001    /*
002     * Copyright (c) 2003, The Regents of the University of California, through
003     * Lawrence Berkeley National Laboratory (subject to receipt of any required
004     * approvals from the U.S. Dept. of Energy). All rights reserved.
005     */
006    package gov.lbl.dsd.sea.nio.demo;
007    import gov.lbl.dsd.sea.nio.NetAgent;
008    import gov.lbl.dsd.sea.nio.util.BufferPool;
009    import gov.lbl.dsd.sea.nio.util.SocketOpts;
010    
011    import java.io.IOException;
012    import java.net.InetSocketAddress;
013    import java.net.SocketException;
014    import java.nio.ByteBuffer;
015    import java.nio.channels.SelectionKey;
016    import java.nio.channels.SocketChannel;
017    
018    /**
019     * Simple single-threaded ping pong benchmark.
020     * 
021     * @author whoschek@lbl.gov
022     * @author $Author: hoschek3 $
023     * @version $Revision: 1.1 $, $Date: 2004/06/15 00:36:05 $
024     */
025    public class XSinglePingPongBench extends NetAgent {
026    
027            private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(XSinglePingPongBench.class);
028    
029            protected int chunkSize = 8192;
030            protected long todo = 10;
031            protected long done = 0;
032            protected boolean isClient = true;
033            protected boolean autoClose = true;
034            protected long start;
035            
036            protected long reads = 0;
037            protected long partialWrites = 0;
038            
039            static public void main(String args[]) throws IOException {
040                    new XSinglePingPongBench(args);
041            }
042            
043            public XSinglePingPongBench(String args[]) throws IOException {
044                    this.setFailFast(false);
045                    
046                    String hostName = "localhost";
047                    int port = 9000;
048                    
049                    int k=-1;
050                    if (args.length > ++k) {
051                            isClient = args[k].equals("client");
052                    }
053                    if (isClient && args.length > ++k) {            
054                            hostName = args[k]; 
055                    }
056                    if (args.length > ++k) {
057                            port = Integer.parseInt(args[k]);                       
058                    }
059                    if (isClient && args.length > ++k) {
060                            chunkSize = Math.round(1024 * Float.parseFloat(args[k]));                       
061                    }
062                    if (isClient && args.length > ++k) {
063                            todo = Math.round(1024 * Float.parseFloat(args[k]));            
064                    }
065                    if (args.length > ++k) {
066                            // setting a buffer pool is a purely optional performance optimization
067                            int n = 1024 * Integer.parseInt(args[k]);
068                            this.setReadBufferPool(new BufferPool(15 * n, n, true, null));                  
069                            //agent.setReadBufferCapacity(1024 * Integer.parseInt(args[k]));                        
070                    }
071                    
072                    SocketOpts options = new SocketOpts();
073                    if (args.length > ++k) {
074                            int val = Math.round(1024 * Float.parseFloat(args[k]));
075                            options.setOption(SocketOpts.SO_RCVBUF, new Integer(val));                      
076                            options.setOption(SocketOpts.SO_SNDBUF, new Integer(val));                      
077                    }
078                    
079                    if (args.length > ++k) {
080                            this.autoClose = new Boolean(args[k]).booleanValue();
081                    }
082                    this.setAutoClose(this.autoClose);      
083                    log.error("using autoClose=" + autoClose);
084    
085                    if (args.length > ++k) {
086                            options.setOption(SocketOpts.TCP_NODELAY, new Boolean(args[k]));                        
087                    }
088                    this.setSocketOptions(options);
089    
090                    if (isClient) {
091                            this.addConnectAddress(null, new InetSocketAddress(hostName, port));
092                    }
093                    else {
094                            this.addListenPort(null, port);
095                    }
096    
097                    this.start();
098            }
099    
100            protected void onConnectDone(SelectionKey key, IOException exception) {
101                    super.onConnectDone(key, exception);
102                    key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
103                    ByteBuffer buf = ByteBuffer.allocateDirect(chunkSize);
104                    //ByteBuffer buf = NioUtil.toAsciiByteBuffer("hello");
105                    
106                    try {
107                            log.error("socketOpts=" + new SocketOpts(((SocketChannel) key.channel()).socket()));
108                    } catch (SocketException e) {
109                            throw new RuntimeException(e);
110                    }
111                    this.start = System.currentTimeMillis();
112    
113                    this.onWriteRequest(key, buf);
114                    //this.onWriteRequest(key, NioUtil.toAsciiByteBuffer(createHTTPRequest()));
115            }
116    
117            protected void onAcceptDone(SelectionKey key, IOException exception) {
118                    super.onAcceptDone(key, exception);
119                    key.interestOps(SelectionKey.OP_READ);
120                    try {
121                            log.error("socketOpts=" + new SocketOpts(((SocketChannel) key.channel()).socket()));
122                    } catch (SocketException e) {
123                            throw new RuntimeException(e);
124                    }
125                    this.start = System.currentTimeMillis();
126            }
127            
128            protected void onCloseDone(SelectionKey key, IOException exception) {
129                    super.onCloseDone(key, exception);
130                    if (!isClient) this.printStats();
131                    if (isClient) this.shutDown();
132            }
133    
134            protected void onReadDone(SelectionKey key, ByteBuffer buffer, IOException exception) {
135                    super.onReadDone(key, buffer, exception);
136                    this.reads++;
137                    //byte[] bytes = NioUtil.toByteArray(buffer);
138                    //String str = new String(bytes);
139                    //log.error("******************** read ***********\n" + str);
140                    done += buffer.limit();
141                    log.warn("########### done reads so far = " + done);
142                    if (isClient && done > todo) {
143                            if (this.start < 0) return; // avoid reporting more than once (possible if multiple Reads responses are in flight) 
144                            this.printStats();
145                            this.start = -1;
146                            this.shutDown();
147                    }
148                    else {
149                            // echo the same data back to client                                    
150                            boolean wantPartialWriteResponses = false;
151                            
152                            ByteBuffer writeBuf = buffer;
153                            //writeBuf.flip();
154                            
155                            if (wantPartialWriteResponses) writeBuf.mark(); // marking indicates we want to get partial write reponses
156                            
157                            this.onWriteRequest(key, writeBuf);
158                            log.info("*************writeData2 enqueued");   
159                    }
160            }
161    
162            protected void onWriteDone(SelectionKey key, ByteBuffer buffer, int n, IOException exception) {
163                    super.onWriteDone(key, buffer, n, exception);
164                    ByteBuffer buf = buffer;
165                    log.warn("******************** total written so far = " + buf.position() + " bytes");
166                    if (buf.position() < buf.limit()) { // we got a partial write
167                            this.partialWrites++;
168                            log.info(buf.limit() - buf.position() + " bytes left to be written");
169                    }
170                    else { // we got a full write; 
171                            log.info("all written");
172                            // recycling to buffer pool is a purely optional performance optimization
173                            this.getReadBufferPool().put(buf);
174                    }
175            }
176            
177            protected String createHTTPRequest() {
178                    String str = "GET / HTTP/1.0" + "\r\n" + "\r\n";
179                    return str;
180            }
181    
182            private void shutDown() {
183                    log.error("now shutting down demo");
184                    this.stop();            
185            }
186    
187            private void printStats() {
188                    long realdone = done * 2; // we have written and read those bytes
189                    long realreads = this.reads * 2; // we have written and read those bytes
190                    long time = System.currentTimeMillis() - start;
191                    System.out.println(realdone / (1024.0f * 1024) + " MB echoed back and forth");
192                    System.out.println(realdone + " bytes echoed back and forth");
193                    System.out.println(time / 1000.0f + " seconds");
194                    System.out.println(1.0f * realdone / ( 1024 * time) + " MB/s throughput");
195                    System.out.println();
196                    System.out.println(realreads + " messages");
197                    System.out.println(realreads / (time / 1000.0f) + " messages/s");
198                    System.out.println(this.partialWrites + " partial writes");
199                    System.out.println("bufferPool= " + this.getReadBufferPool());
200            }       
201    
202    }