001    /*
002     * Copyright (c) 2003, The Regents of the University of California, through
003     * Lawrence Berkeley National Laboratory (subject to receipt of any required
004     * approvals from the U.S. Dept. of Energy). All rights reserved.
005     */
006    package gov.lbl.dsd.sea.nio.demo;
007    
008    import gov.lbl.dsd.sea.Stage;
009    import gov.lbl.dsd.sea.StageManager;
010    import gov.lbl.dsd.sea.nio.AgentEventHandler;
011    import gov.lbl.dsd.sea.nio.NetAgent;
012    import gov.lbl.dsd.sea.nio.event.AdminRequest;
013    import gov.lbl.dsd.sea.nio.event.ChannelRequest;
014    import gov.lbl.dsd.sea.nio.event.ChannelResponse;
015    
016    import java.io.IOException;
017    import java.nio.channels.SelectionKey;
018    import java.nio.charset.Charset;
019    
020    /**
021     * Simple non-blocking echo server that can be tested in
022     * conjunction with a standard UNIX telnet client.
023     * 
024     * @author whoschek@lbl.gov
025     * @author $Author: gegles $
026     * @version $Revision: 1.5 $, $Date: 2004/09/16 16:57:15 $
027     */
028    public class EchoServer extends AgentEventHandler {
029            
030            private static final Charset CHARSET = Charset.forName("US-ASCII");
031    
032            private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
033                            .getLog(EchoServer.class);
034    
035            static public void main(String args[]) throws IOException {
036                    new EchoServer(args);
037            }
038    
039            public EchoServer(String args[]) throws IOException {
040                    NetAgent agent = new NetAgent();
041                    Stage myStage = new StageManager().createStage(this).start();
042    
043                    for (int i = 0; i < args.length; i++) {
044                            int port = Integer.parseInt(args[i]);           
045                            agent.addListenPort(myStage, port);
046                    }
047    
048                    agent.start();
049            }
050    
051            protected void onClosed(ChannelResponse.Closed rsp) {
052                    log.info("*************got channel closed=" + rsp);
053                    if (rsp.getException() != null) {
054                            ; // nothing special to be done
055                    }
056                    System.out.println("agent status=" + rsp.getAgent().toDebugString());
057            }
058    
059            protected void onConnected(ChannelResponse.Connected rsp) {
060                    throw new UnsupportedOperationException();
061            }
062    
063            protected void onAccepted(ChannelResponse.Accepted rsp) {
064                    rsp.getAgent().enqueue(
065                                    new ChannelRequest.Register(this.getStage(), rsp.getKey()
066                                                    .channel(), SelectionKey.OP_READ));
067            }
068    
069            protected void onRead(ChannelResponse.Read rsp) {
070                    String str = CHARSET.decode(rsp.getBuffer().duplicate()).toString();
071                    System.out.println("********************  read *********** " + str);
072                    System.out.println("********************  read bytes ***** " + str.length());
073                    
074                    if (str.startsWith("exit")) {
075                            this.shutDown(rsp);
076                    } else if (str.startsWith("close")) {
077                            rsp.getAgent().enqueue(new ChannelRequest.Close(rsp.getKey().channel()));
078                    } else {
079                            rsp.getAgent().enqueue(new ChannelRequest.WriteData(rsp.getKey().channel(), rsp.getBuffer()));
080                    }
081            }
082    
083            protected void onWrite(ChannelResponse.Write rsp) {
084                    rsp.getBuffer().flip();
085                    System.out.println("******************** wrote *********** "
086                                    + CHARSET.decode(rsp.getBuffer()).toString());
087    
088                    // recycling to buffer pool is an optional optimization
089                    rsp.getAgent().getReadBufferPool().put(rsp.getBuffer());
090            }
091    
092            protected void onRegistered(ChannelResponse.Registered rsp) {
093                    if (rsp.getInterestOps() == SelectionKey.OP_ACCEPT) {
094                            System.out.println("******** server started; now listening *****");
095                    }
096            }
097    
098            private void shutDown(ChannelResponse rsp) {
099                    System.out.println("now shutting down...");
100                    this.getStage().stop();
101                    rsp.getAgent().enqueue(new AdminRequest.Stop());
102            }
103    
104    }