001    /*
002     * Copyright (c) 2003, The Regents of the University of California, through
003     * Lawrence Berkeley National Laboratory (subject to receipt of any required
004     * approvals from the U.S. Dept. of Energy). All rights reserved.
005     */
006    package gov.lbl.dsd.sea.nio.demo;
007    
008    import gov.lbl.dsd.sea.ExecutorFactory;
009    import gov.lbl.dsd.sea.Stage;
010    import gov.lbl.dsd.sea.StageManager;
011    import gov.lbl.dsd.sea.nio.AgentEventHandler;
012    import gov.lbl.dsd.sea.nio.NetAgent;
013    import gov.lbl.dsd.sea.nio.event.AdminRequest;
014    import gov.lbl.dsd.sea.nio.event.ChannelRequest;
015    import gov.lbl.dsd.sea.nio.event.ChannelResponse;
016    import gov.lbl.dsd.sea.nio.util.ByteBufferPool;
017    import gov.lbl.dsd.sea.nio.util.SocketOpts;
018    
019    import java.io.IOException;
020    import java.net.InetSocketAddress;
021    import java.net.SocketException;
022    import java.nio.ByteBuffer;
023    import java.nio.channels.SelectionKey;
024    import java.nio.channels.SocketChannel;
025    
026    /**
027     * Asynchronous non-blocking ping pong benchmark; Sends messages back and forth
028     * between client and server and measures throughput.
029     * <p>
030     * Example server usage (large messages): fire-java gov.lbl.dsd.sea.nio.demo.PingPongBench
031     * server 9000 2000 2000
032     * <p>
033     * Example client usage (large messages): fire-java gov.lbl.dsd.sea.nio.demo.PingPongBench
034     * client localhost 9000 2000 1000000 2000 2000 
035     * <p>
036     * Example server usage (micro messages): fire-java gov.lbl.dsd.sea.nio.demo.PingPongBench
037     * server 9000 1 1
038     * <p>
039     * Example client usage (micro messages): fire-java gov.lbl.dsd.sea.nio.demo.PingPongBench
040     * client localhost 9000 0.003 600 1 1 
041     * <p>
042     * Using localhost with large messages should report on the order of 190 MB/s throughput 
043     * (appropriately large packet sizes and TCP buffer sizes are critical).
044     * <p>
045     * Using localhost with micro messages should report on the order of 30000 messages/s throughput.
046     * <p>
047     * Set log level to ERROR to avoid logging becoming the bottleneck!
048     * 
049     * @author whoschek@lbl.gov
050     * @author $Author: gegles $
051     * @version $Revision: 1.5 $, $Date: 2004/09/16 16:57:15 $
052     */
053    public class PingPongBench extends AgentEventHandler {
054    
055            private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(PingPongBench.class);
056    
057            protected int chunkSize = 8192;
058            protected long todo = 10;
059            protected long done = 0;
060            protected boolean isClient = true;
061            protected long startTime;
062            protected long endTime;
063            
064            protected long reads = 0;
065            
066            static public void main(String args[]) throws IOException {
067                    new PingPongBench(args);
068            }
069            
070            public PingPongBench(String args[]) throws IOException {
071                    NetAgent agent = new NetAgent();
072                    
073                    String hostName = "localhost";
074                    int port = 9000;
075                    
076                    int k=-1;
077                    if (args.length > ++k) {
078                            isClient = args[k].equals("client");
079                    }
080                    if (isClient && args.length > ++k) {            
081                            hostName = args[k]; 
082                    }
083                    if (args.length > ++k) {
084                            port = Integer.parseInt(args[k]);                       
085                    }
086                    if (isClient && args.length > ++k) {
087                            chunkSize = Math.round(1024 * Float.parseFloat(args[k]));                       
088                    }
089                    if (isClient && args.length > ++k) {
090                            todo = Math.round(1024 * Float.parseFloat(args[k]));            
091                    }
092                    if (args.length > ++k) {
093                            // setting a buffer pool is a purely optional performance optimization
094                            int n = Math.round(1024 * Float.parseFloat(args[k]));
095                            agent.setReadBufferPool(new ByteBufferPool(5 * n, n, true, null));                      
096                    }
097                    
098                    SocketOpts options = new SocketOpts();
099                    if (args.length > ++k) {
100                            int n = Math.round(1024 * Float.parseFloat(args[k]));
101                            options.setOption(SocketOpts.SO_RCVBUF, new Integer(n));                        
102                            options.setOption(SocketOpts.SO_SNDBUF, new Integer(n));                        
103                    }
104                    
105                    if (args.length > ++k) {
106                            options.setOption(SocketOpts.TCP_NODELAY, new Boolean(args[k]));                        
107                    }
108                    agent.setSocketOptions(options);
109    
110                    ExecutorFactory execFactory = StageManager.QUEUED;
111                    if (args.length > ++k) {
112                            if (args[k].equals("single")) execFactory = StageManager.DIRECT;
113                    }
114                    Stage myStage = new StageManager(execFactory).createStage(this).start();        
115    
116                    if (isClient) {
117                            agent.addConnectAddress(myStage, new InetSocketAddress(hostName, port));
118                    }
119                    else {
120                            agent.addListenPort(myStage, port);
121                    }
122    
123                    agent.start();
124            }
125    
126            protected void onAccepted(ChannelResponse.Accepted rsp) {
127                    // server has accepted new connection from remote client
128                    // register read interest with new socket channel
129                    // this triggers future ChannelResponse.Read events to be delivered to us
130                    log.error("*************got accepted=" + rsp);
131                    try {
132                            log.error("socketOpts=" + new SocketOpts(((SocketChannel) rsp.getKey().channel()).socket()));
133                    } catch (SocketException e) {
134                            throw new RuntimeException(e);
135                    }
136                    this.startTime = System.currentTimeMillis();
137                    rsp.getAgent().enqueue(new ChannelRequest.Register(this.getStage(), rsp.getKey().channel(), SelectionKey.OP_READ));
138                    log.error("*************enqueued register");
139            }
140    
141            protected void onClosed(ChannelResponse.Closed rsp) {
142                    log.error("*************got channel closed=" + rsp);
143                    if (rsp.getException() != null) { 
144                            ; // nothing special to be done
145                    }
146                    if (isClient) {
147                            this.shutDown(rsp);
148                            if (this.endTime > 0) {
149                                    this.printStats(rsp, endTime - startTime);
150                                    try {
151                                            Thread.sleep(100);
152                                    } catch (InterruptedException e) {
153                                            // TODO Auto-generated catch block
154                                            e.printStackTrace();
155                                    }
156                                    this.printStats(rsp, endTime - startTime);
157                            }
158                    }
159                    else {
160                            this.endTime = System.currentTimeMillis();
161                            this.printStats(rsp, endTime - startTime); // no problem; continueing
162                            try {
163                                    Thread.sleep(100);
164                            } catch (InterruptedException e) {
165                                    // TODO Auto-generated catch block
166                                    e.printStackTrace();
167                            }
168                            this.printStats(rsp, endTime - startTime); // no problem; continueing
169                            this.endTime = 0;
170                    }               
171            }
172    
173            protected void onConnected(ChannelResponse.Connected rsp) {
174                    log.error("*************got connected=" + rsp);
175                    
176                    ByteBuffer buf = ByteBuffer.allocateDirect(chunkSize);
177                    //ByteBuffer buf = NioUtil.toAsciiByteBuffer("hello");
178                    
179                    try {
180                            log.error("socketOpts=" + new SocketOpts(((SocketChannel) rsp.getKey().channel()).socket()));
181                    } catch (SocketException e) {
182                            throw new RuntimeException(e);
183                    }
184    
185                    this.startTime = System.currentTimeMillis();
186                    
187                    rsp.getAgent().enqueue(new ChannelRequest.Register(this.getStage(), rsp.getKey().channel(), SelectionKey.OP_READ));
188                    rsp.getAgent().enqueue(new ChannelRequest.WriteData(rsp.getKey().channel(), buf));
189                    log.info("*************writeData1 enqueued");                           
190            }
191    
192            protected void onRegistered(ChannelResponse.Registered rsp) {
193                    log.error("*************got registered=" + rsp);
194            }
195    
196            protected void onRead(ChannelResponse.Read rsp) {
197                    // we have read some data
198                    this.reads++;
199                    //byte[] bytes = NioUtil.toByteArray(rsp.getBuffer());
200                    //String str = new String(bytes);
201                    //log.error("******************** read ***********\n" + str);
202                    done += rsp.getBuffer().remaining();
203                    log.info("########### done reads so far = " + done);
204                    if (isClient && done > todo) {
205                            if (this.endTime > 0) {
206                                    done -= rsp.getBuffer().remaining();
207                            }
208                            else {
209                                    this.endTime = System.currentTimeMillis();
210                                    rsp.getAgent().enqueue(new ChannelRequest.Close(rsp.getKey().channel()));
211                                    
212                            }
213                    }
214                    else {
215                            // echo the same data back to client                                    
216                            ByteBuffer writeBuf = rsp.getBuffer();
217                            rsp.getAgent().enqueue(new ChannelRequest.WriteData(rsp.getKey().channel(), writeBuf));
218                            log.info("*************writeData2 enqueued");   
219                    }               
220            }
221    
222            protected void onWrite(ChannelResponse.Write rsp) {
223                    // we have written some data
224                    log.info("******************** wrote all " + rsp.getBuffer().limit() + " bytes");
225                    // recycling to buffer pool is a purely optional performance optimization
226                    rsp.getAgent().getReadBufferPool().put(rsp.getBuffer());
227            }
228    
229            private void shutDown(ChannelResponse rsp) {
230                    log.error("now shutting down demo");
231                    this.getStage().stop();         
232                    rsp.getAgent().enqueue(new AdminRequest.Stop());
233            }
234    
235            private void printStats(ChannelResponse rsp, long time) {
236                    long realdone = this.done * 2; // we have written and read those bytes
237                    long realreads = this.reads * 2; // we have written and read those bytes
238                    System.out.println("Summary statistics:");
239                    System.out.println("*******************");
240                    System.out.println(realdone / (1024.0f * 1024) + " MB echoed back and forth");
241                    System.out.println(realdone + " bytes echoed back and forth");
242                    System.out.println(time / 1000.0f + " seconds");
243                    System.out.println(1.0f * realdone / ( 1024 * time) + " MB/s throughput");
244                    System.out.println();
245                    System.out.println(realreads + " messages");
246                    System.out.println(realreads / (time / 1000.0f) + " messages/s");
247                    System.out.println("\nagent=" + rsp.getAgent().toDebugString());
248            }       
249    
250    }