001    /*
002     * Copyright (c) 2003, The Regents of the University of California, through
003     * Lawrence Berkeley National Laboratory (subject to receipt of any required
004     * approvals from the U.S. Dept. of Energy). All rights reserved.
005     */
006    package gov.lbl.dsd.sea.nio.demo;
007    
008    import gov.lbl.dsd.sea.ExecutorFactory;
009    import gov.lbl.dsd.sea.Stage;
010    import gov.lbl.dsd.sea.StageManager;
011    import gov.lbl.dsd.sea.nio.AgentEventHandler;
012    import gov.lbl.dsd.sea.nio.NetAgent;
013    import gov.lbl.dsd.sea.nio.event.AdminRequest;
014    import gov.lbl.dsd.sea.nio.event.ChannelRequest;
015    import gov.lbl.dsd.sea.nio.event.ChannelResponse;
016    
017    import java.io.IOException;
018    import java.net.InetSocketAddress;
019    import java.nio.ByteBuffer;
020    import java.nio.channels.SelectionKey;
021    import java.nio.charset.Charset;
022    
023    /**
024     * Simple non-blocking hello world client and server, echoing
025     * messages back and forth.
026     * <p>
027     * Example server usage: fire-java gov.lbl.dsd.sea.nio.demo.PingPong server
028     * 9000
029     * <p>
030     * Example client usage: fire-java gov.lbl.dsd.sea.nio.demo.PingPong client
031     * localhost 9000
032     * 
033     * @author whoschek@lbl.gov
034     * @author $Author: gegles $
035     * @version $Revision: 1.4 $, $Date: 2004/09/16 16:57:15 $
036     */
037    public class PingPong extends AgentEventHandler {
038    
039            protected boolean isClient = true; // am I a client or a server?
040            private static final Charset CHARSET = Charset.forName("US-ASCII");
041    
042            private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
043                            .getLog(PingPong.class);
044    
045            static public void main(String args[]) throws IOException {
046                    new PingPong(args);
047            }
048    
049            public PingPong(String args[]) throws IOException {
050                    NetAgent agent = new NetAgent();
051    
052                    String hostName = "localhost";
053                    int port = 9000;
054    
055                    int k = -1;
056                    if (args.length > ++k) {
057                            isClient = args[k].equals("client");
058                    }
059                    if (isClient && args.length > ++k) {
060                            hostName = args[k];
061                    }
062                    if (args.length > ++k) {
063                            port = Integer.parseInt(args[k]);
064                    }
065    
066                    ExecutorFactory execFactory = StageManager.QUEUED;
067                    if (args.length > ++k) {
068                            if (args[k].equals("single")) execFactory = StageManager.DIRECT;
069                    }               
070                    Stage myStage = new StageManager(execFactory).createStage(this).start();
071    
072                    if (isClient) {
073                            agent.addConnectAddress(myStage, new InetSocketAddress(hostName,
074                                            port));
075                    } else {
076                            agent.addListenPort(myStage, port);
077                    }
078    
079                    agent.start();
080            }
081    
082            protected void onClosed(ChannelResponse.Closed rsp) {
083                    log.info("*************got channel closed=" + rsp);
084                    if (rsp.getException() != null) {
085                            ; // nothing special to be done
086                    }
087                    if (isClient) {
088                            this.shutDown(rsp);
089                    } else {
090                            // no problem; continueing
091                    }
092                    System.out.println("agent status=" + rsp.getAgent().toDebugString());
093            }
094    
095            protected void onConnected(ChannelResponse.Connected rsp) {
096                    ByteBuffer buffer = CHARSET.encode("hello world");
097                    rsp.getAgent().enqueue(
098                                    new ChannelRequest.Register(this.getStage(), rsp.getKey()
099                                                    .channel(), SelectionKey.OP_READ));
100                    rsp.getAgent().enqueue(
101                                    new ChannelRequest.WriteData(rsp.getKey().channel(), buffer));
102            }
103    
104            protected void onAccepted(ChannelResponse.Accepted rsp) {
105                    rsp.getAgent().enqueue(
106                                    new ChannelRequest.Register(this.getStage(), rsp.getKey()
107                                                    .channel(), SelectionKey.OP_READ));
108            }
109    
110            protected void onRead(ChannelResponse.Read rsp) {
111                    String str = CHARSET.decode(rsp.getBuffer()).toString();
112                    System.out.println("********************  read *********** " + str);
113                    
114                    try { // artificially slow down to better see progress on terminal
115                            Thread.sleep(500);
116                    } catch (InterruptedException e) {
117                            throw new RuntimeException(e);
118                    }
119                    
120                    ByteBuffer writeBuffer = CHARSET.encode(str + ".");
121                    
122                    // recycling to buffer pool is an optional optimization
123                    rsp.getAgent().getReadBufferPool().put(rsp.getBuffer());
124    
125                    rsp.getAgent().enqueue(
126                                    new ChannelRequest.WriteData(rsp.getKey().channel(), writeBuffer));
127            }
128    
129            protected void onWrite(ChannelResponse.Write rsp) {
130                    rsp.getBuffer().flip();
131                    System.out.println("*************** fully wrote *********** "
132                                    + CHARSET.decode(rsp.getBuffer()).toString());
133    
134                    // recycling to buffer pool is an optional optimization
135                    rsp.getAgent().getReadBufferPool().put(rsp.getBuffer());
136            }
137    
138            protected void onRegistered(ChannelResponse.Registered rsp) {
139                    if (rsp.getInterestOps() == SelectionKey.OP_ACCEPT) {
140                            System.out.println("******** server started; now listening *****");
141                    }
142            }
143    
144            private void shutDown(ChannelResponse rsp) {
145                    System.out.println("now shutting down...");
146                    this.getStage().stop();
147                    rsp.getAgent().enqueue(new AdminRequest.Stop());
148            }
149    
150    }