001    /*
002     * Copyright (c) 2003, The Regents of the University of California, through
003     * Lawrence Berkeley National Laboratory (subject to receipt of any required
004     * approvals from the U.S. Dept. of Energy). All rights reserved.
005     */
006    package gov.lbl.dsd.sea.nio.demo;
007    
008    import gov.lbl.dsd.sea.ExecutorFactory;
009    import gov.lbl.dsd.sea.Stage;
010    import gov.lbl.dsd.sea.StageManager;
011    import gov.lbl.dsd.sea.nio.AgentEventHandler;
012    import gov.lbl.dsd.sea.nio.NetAgent;
013    import gov.lbl.dsd.sea.nio.event.AdminRequest;
014    import gov.lbl.dsd.sea.nio.event.ChannelRequest;
015    import gov.lbl.dsd.sea.nio.event.ChannelResponse;
016    import gov.lbl.dsd.sea.nio.util.ByteBufferPool;
017    import gov.lbl.dsd.sea.nio.util.SocketOpts;
018    
019    import java.io.IOException;
020    import java.net.InetSocketAddress;
021    import java.net.SocketException;
022    import java.nio.ByteBuffer;
023    import java.nio.channels.SelectionKey;
024    import java.nio.channels.SocketChannel;
025    
026    /**
027     * TODO. Asynchronous non-blocking streaming benchmark; Sends messages back and forth
028     * between client and server and measures throughput.
029     * <p>
030     * Example server usage (large messages): fire-java gov.lbl.dsd.sea.nio.demo.StreamingBench
031     * server 9000 2000 2000
032     * <p>
033     * Example client usage (large messages): fire-java gov.lbl.dsd.sea.nio.demo.StreamingBench
034     * client localhost 9000 2000 1000000 2000 2000 
035     * <p>
036     * Example server usage (micro messages): fire-java gov.lbl.dsd.sea.nio.demo.StreamingBench
037     * server 9000 1 1
038     * <p>
039     * Example client usage (micro messages): fire-java gov.lbl.dsd.sea.nio.demo.StreamingBench
040     * client localhost 9000 0.003 600 1 1 
041     * <p>
042     * Using localhost with large messages should report on the order of 190 MB/s throughput 
043     * (appropriately large packet sizes and TCP buffer sizes are critical).
044     * <p>
045     * Using localhost with micro messages should report on the order of 30000 messages/s throughput.
046     * <p>
047     * Set log level to ERROR to avoid logging becoming the bottleneck!
048     * 
049     * @author whoschek@lbl.gov
050     * @author $Author: gegles $
051     * @version $Revision: 1.5 $, $Date: 2004/09/16 16:57:15 $
052     */
053    public class StreamingBench extends AgentEventHandler {
054    
055            private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(StreamingBench.class);
056    
057            protected int chunkSize = 8192;
058            protected long todo = 10;
059            protected long done = 0;
060            protected boolean isClient = true;
061            protected long startTime;
062            protected long endTime;
063            
064            protected long reads = 0;
065            
066            static public void main(String args[]) throws IOException {
067                    new StreamingBench(args);
068            }
069            
070            public StreamingBench(String args[]) throws IOException {
071                    NetAgent agent = new NetAgent();
072                    
073                    String hostName = "localhost";
074                    int port = 9000;
075                    
076                    int k=-1;
077                    if (args.length > ++k) {
078                            isClient = args[k].equals("client");
079                    }
080                    if (isClient && args.length > ++k) {            
081                            hostName = args[k]; 
082                    }
083                    if (args.length > ++k) {
084                            port = Integer.parseInt(args[k]);                       
085                    }
086                    //if (isClient && args.length > ++k) {
087                    if (args.length > ++k) {
088                            chunkSize = Math.round(1024 * Float.parseFloat(args[k]));                       
089                    }
090                    if (isClient && args.length > ++k) {
091                            todo = Math.round(1024 * Float.parseFloat(args[k]));            
092                    }
093                    if (args.length > ++k) {
094                            // setting a buffer pool is a purely optional performance optimization
095                            int n = Math.round(1024 * Float.parseFloat(args[k]));
096                            agent.setReadBufferPool(new ByteBufferPool(5 * n, n, true, null));                      
097                    }
098                    
099                    SocketOpts options = new SocketOpts();
100                    if (args.length > ++k) {
101                            int n = Math.round(1024 * Float.parseFloat(args[k]));
102                            options.setOption(SocketOpts.SO_RCVBUF, new Integer(n));                        
103                            options.setOption(SocketOpts.SO_SNDBUF, new Integer(n));                        
104                    }
105                    
106                    if (args.length > ++k) {
107                            options.setOption(SocketOpts.TCP_NODELAY, new Boolean(args[k]));                        
108                    }
109                    agent.setSocketOptions(options);
110    
111                    ExecutorFactory execFactory = StageManager.QUEUED;
112                    if (args.length > ++k) {
113                            if (args[k].equals("single")) execFactory = StageManager.DIRECT;
114                    }
115                    Stage myStage = new StageManager(execFactory).createStage(this).start();        
116    
117                    if (isClient) {
118                            agent.addConnectAddress(myStage, new InetSocketAddress(hostName, port));
119                    }
120                    else {
121                            agent.addListenPort(myStage, port);
122                    }
123    
124                    agent.start();
125            }
126    
127            protected void onClosed(ChannelResponse.Closed rsp) {
128                    log.error("*************got channel closed=" + rsp);
129                    if (rsp.getException() != null) { 
130                            ; // nothing special to be done
131                    }
132                    if (isClient) {
133                            this.shutDown(rsp);
134                            if (this.endTime > 0) {
135                                    this.printStats(rsp, endTime - startTime);
136                                    try {
137                                            Thread.sleep(100);
138                                    } catch (InterruptedException e) {
139                                            // TODO Auto-generated catch block
140                                            e.printStackTrace();
141                                    }
142                                    this.printStats(rsp, endTime - startTime);
143                            }
144                    }
145                    else {
146                            this.endTime = System.currentTimeMillis();
147                            this.printStats(rsp, endTime - startTime); // no problem; continueing
148                            try {
149                                    Thread.sleep(100);
150                            } catch (InterruptedException e) {
151                                    // TODO Auto-generated catch block
152                                    e.printStackTrace();
153                            }
154                            this.printStats(rsp, endTime - startTime); // no problem; continueing
155                            this.endTime = 0;
156                    }               
157            }
158    
159            protected void onAccepted(ChannelResponse.Accepted rsp) {
160                    // server has accepted new connection from remote client
161                    // register read interest with new socket channel
162                    // this triggers future ChannelResponse.Read events to be delivered to us
163                    log.error("*************got accepted=" + rsp);
164                    try {
165                            log.error("socketOpts=" + new SocketOpts(((SocketChannel) rsp.getKey().channel()).socket()));
166                    } catch (SocketException e) {
167                            throw new RuntimeException(e);
168                    }
169                    this.startTime = System.currentTimeMillis();
170                    ByteBuffer buf = ByteBuffer.allocateDirect(chunkSize);
171                    //ByteBuffer buf = NioUtil.toAsciiByteBuffer("hello");
172    
173                    rsp.getAgent().enqueue(new ChannelRequest.Register(this.getStage(), rsp.getKey().channel(), SelectionKey.OP_READ));
174                    rsp.getAgent().enqueue(new ChannelRequest.WriteData(rsp.getKey().channel(), buf));
175                    log.error("*************enqueued register");
176            }
177    
178            protected void onConnected(ChannelResponse.Connected rsp) {
179                    log.error("*************got connected=" + rsp);
180                    
181                    try {
182                            log.error("socketOpts=" + new SocketOpts(((SocketChannel) rsp.getKey().channel()).socket()));
183                    } catch (SocketException e) {
184                            throw new RuntimeException(e);
185                    }
186    
187                    this.startTime = System.currentTimeMillis();
188                    ByteBuffer buf = ByteBuffer.allocateDirect(chunkSize);
189                    //ByteBuffer buf = NioUtil.toAsciiByteBuffer("hello");
190                    
191                    rsp.getAgent().enqueue(new ChannelRequest.Register(this.getStage(), rsp.getKey().channel(), SelectionKey.OP_READ));
192                    rsp.getAgent().enqueue(new ChannelRequest.WriteData(rsp.getKey().channel(), buf));
193                    log.info("*************writeData1 enqueued");                           
194            }
195    
196            protected void onRegistered(ChannelResponse.Registered rsp) {
197                    log.error("*************got registered=" + rsp);
198            }
199    
200            protected void onRead(ChannelResponse.Read rsp) {
201                    // we have read some data
202                    this.reads++;
203                    //byte[] bytes = NioUtil.toByteArray(rsp.getBuffer());
204                    //String str = new String(bytes);
205                    //log.error("******************** read ***********\n" + str);
206                    done += rsp.getBuffer().remaining();
207                    log.info("########### done reads so far = " + done);
208                    if (isClient && done > todo) {
209                            if (this.endTime > 0) {
210                                    done -= rsp.getBuffer().remaining();
211                            }
212                            else {
213                                    this.endTime = System.currentTimeMillis();
214                                    rsp.getAgent().enqueue(new ChannelRequest.Close(rsp.getKey().channel()));
215                            }
216                    }
217                    // recycling to buffer pool is a purely optional performance optimization
218                    rsp.getAgent().getReadBufferPool().put(rsp.getBuffer());
219            }
220    
221            protected void onWrite(ChannelResponse.Write rsp) {
222                    // we have written some data
223                    ByteBuffer buf = rsp.getBuffer();
224                    log.info("******************** wrote all " + rsp.getBuffer().limit() + " bytes");
225                    ByteBuffer writeBuffer = rsp.getBuffer();
226                    writeBuffer.flip();
227                    rsp.getAgent().enqueue(new ChannelRequest.WriteData(rsp.getKey().channel(), writeBuffer));
228                    log.info("*************writeData2 enqueued");   
229            }
230    
231            private void shutDown(ChannelResponse rsp) {
232                    log.error("now shutting down demo");
233                    this.getStage().stop();         
234                    rsp.getAgent().enqueue(new AdminRequest.Stop());
235            }
236    
237            private void printStats(ChannelResponse rsp, long time) {
238                    long realdone = this.done * 2; // we have written and read those bytes
239                    long realreads = this.reads * 2; // we have written and read those bytes
240                    System.out.println("Summary statistics:");
241                    System.out.println("*******************");
242                    System.out.println(realdone / (1024.0f * 1024) + " MB echoed back and forth");
243                    System.out.println(realdone + " bytes echoed back and forth");
244                    System.out.println(time / 1000.0f + " seconds");
245                    System.out.println(1.0f * realdone / ( 1024 * time) + " MB/s throughput");
246                    System.out.println();
247                    System.out.println(realreads + " messages");
248                    System.out.println(realreads / (time / 1000.0f) + " messages/s");
249                    System.out.println("\nagent=" + rsp.getAgent().toDebugString());
250            }       
251    
252    }