001 /* 002 * Copyright (c) 2003, The Regents of the University of California, through 003 * Lawrence Berkeley National Laboratory (subject to receipt of any required 004 * approvals from the U.S. Dept. of Energy). All rights reserved. 005 */ 006 package gov.lbl.dsd.sea.nio.demo; 007 008 import gov.lbl.dsd.sea.ExecutorFactory; 009 import gov.lbl.dsd.sea.Stage; 010 import gov.lbl.dsd.sea.StageManager; 011 import gov.lbl.dsd.sea.nio.AgentEventHandler; 012 import gov.lbl.dsd.sea.nio.NetAgent; 013 import gov.lbl.dsd.sea.nio.event.AdminRequest; 014 import gov.lbl.dsd.sea.nio.event.ChannelRequest; 015 import gov.lbl.dsd.sea.nio.event.ChannelResponse; 016 import gov.lbl.dsd.sea.nio.util.ByteBufferPool; 017 import gov.lbl.dsd.sea.nio.util.SocketOpts; 018 019 import java.io.IOException; 020 import java.net.InetSocketAddress; 021 import java.net.SocketException; 022 import java.nio.ByteBuffer; 023 import java.nio.channels.SelectionKey; 024 import java.nio.channels.SocketChannel; 025 026 /** 027 * TODO. Asynchronous non-blocking streaming benchmark; Sends messages back and forth 028 * between client and server and measures throughput. 029 * <p> 030 * Example server usage (large messages): fire-java gov.lbl.dsd.sea.nio.demo.StreamingBench 031 * server 9000 2000 2000 032 * <p> 033 * Example client usage (large messages): fire-java gov.lbl.dsd.sea.nio.demo.StreamingBench 034 * client localhost 9000 2000 1000000 2000 2000 035 * <p> 036 * Example server usage (micro messages): fire-java gov.lbl.dsd.sea.nio.demo.StreamingBench 037 * server 9000 1 1 038 * <p> 039 * Example client usage (micro messages): fire-java gov.lbl.dsd.sea.nio.demo.StreamingBench 040 * client localhost 9000 0.003 600 1 1 041 * <p> 042 * Using localhost with large messages should report on the order of 190 MB/s throughput 043 * (appropriately large packet sizes and TCP buffer sizes are critical). 044 * <p> 045 * Using localhost with micro messages should report on the order of 30000 messages/s throughput. 046 * <p> 047 * Set log level to ERROR to avoid logging becoming the bottleneck! 048 * 049 * @author whoschek@lbl.gov 050 * @author $Author: gegles $ 051 * @version $Revision: 1.5 $, $Date: 2004/09/16 16:57:15 $ 052 */ 053 public class StreamingBench extends AgentEventHandler { 054 055 private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(StreamingBench.class); 056 057 protected int chunkSize = 8192; 058 protected long todo = 10; 059 protected long done = 0; 060 protected boolean isClient = true; 061 protected long startTime; 062 protected long endTime; 063 064 protected long reads = 0; 065 066 static public void main(String args[]) throws IOException { 067 new StreamingBench(args); 068 } 069 070 public StreamingBench(String args[]) throws IOException { 071 NetAgent agent = new NetAgent(); 072 073 String hostName = "localhost"; 074 int port = 9000; 075 076 int k=-1; 077 if (args.length > ++k) { 078 isClient = args[k].equals("client"); 079 } 080 if (isClient && args.length > ++k) { 081 hostName = args[k]; 082 } 083 if (args.length > ++k) { 084 port = Integer.parseInt(args[k]); 085 } 086 //if (isClient && args.length > ++k) { 087 if (args.length > ++k) { 088 chunkSize = Math.round(1024 * Float.parseFloat(args[k])); 089 } 090 if (isClient && args.length > ++k) { 091 todo = Math.round(1024 * Float.parseFloat(args[k])); 092 } 093 if (args.length > ++k) { 094 // setting a buffer pool is a purely optional performance optimization 095 int n = Math.round(1024 * Float.parseFloat(args[k])); 096 agent.setReadBufferPool(new ByteBufferPool(5 * n, n, true, null)); 097 } 098 099 SocketOpts options = new SocketOpts(); 100 if (args.length > ++k) { 101 int n = Math.round(1024 * Float.parseFloat(args[k])); 102 options.setOption(SocketOpts.SO_RCVBUF, new Integer(n)); 103 options.setOption(SocketOpts.SO_SNDBUF, new Integer(n)); 104 } 105 106 if (args.length > ++k) { 107 options.setOption(SocketOpts.TCP_NODELAY, new Boolean(args[k])); 108 } 109 agent.setSocketOptions(options); 110 111 ExecutorFactory execFactory = StageManager.QUEUED; 112 if (args.length > ++k) { 113 if (args[k].equals("single")) execFactory = StageManager.DIRECT; 114 } 115 Stage myStage = new StageManager(execFactory).createStage(this).start(); 116 117 if (isClient) { 118 agent.addConnectAddress(myStage, new InetSocketAddress(hostName, port)); 119 } 120 else { 121 agent.addListenPort(myStage, port); 122 } 123 124 agent.start(); 125 } 126 127 protected void onClosed(ChannelResponse.Closed rsp) { 128 log.error("*************got channel closed=" + rsp); 129 if (rsp.getException() != null) { 130 ; // nothing special to be done 131 } 132 if (isClient) { 133 this.shutDown(rsp); 134 if (this.endTime > 0) { 135 this.printStats(rsp, endTime - startTime); 136 try { 137 Thread.sleep(100); 138 } catch (InterruptedException e) { 139 // TODO Auto-generated catch block 140 e.printStackTrace(); 141 } 142 this.printStats(rsp, endTime - startTime); 143 } 144 } 145 else { 146 this.endTime = System.currentTimeMillis(); 147 this.printStats(rsp, endTime - startTime); // no problem; continueing 148 try { 149 Thread.sleep(100); 150 } catch (InterruptedException e) { 151 // TODO Auto-generated catch block 152 e.printStackTrace(); 153 } 154 this.printStats(rsp, endTime - startTime); // no problem; continueing 155 this.endTime = 0; 156 } 157 } 158 159 protected void onAccepted(ChannelResponse.Accepted rsp) { 160 // server has accepted new connection from remote client 161 // register read interest with new socket channel 162 // this triggers future ChannelResponse.Read events to be delivered to us 163 log.error("*************got accepted=" + rsp); 164 try { 165 log.error("socketOpts=" + new SocketOpts(((SocketChannel) rsp.getKey().channel()).socket())); 166 } catch (SocketException e) { 167 throw new RuntimeException(e); 168 } 169 this.startTime = System.currentTimeMillis(); 170 ByteBuffer buf = ByteBuffer.allocateDirect(chunkSize); 171 //ByteBuffer buf = NioUtil.toAsciiByteBuffer("hello"); 172 173 rsp.getAgent().enqueue(new ChannelRequest.Register(this.getStage(), rsp.getKey().channel(), SelectionKey.OP_READ)); 174 rsp.getAgent().enqueue(new ChannelRequest.WriteData(rsp.getKey().channel(), buf)); 175 log.error("*************enqueued register"); 176 } 177 178 protected void onConnected(ChannelResponse.Connected rsp) { 179 log.error("*************got connected=" + rsp); 180 181 try { 182 log.error("socketOpts=" + new SocketOpts(((SocketChannel) rsp.getKey().channel()).socket())); 183 } catch (SocketException e) { 184 throw new RuntimeException(e); 185 } 186 187 this.startTime = System.currentTimeMillis(); 188 ByteBuffer buf = ByteBuffer.allocateDirect(chunkSize); 189 //ByteBuffer buf = NioUtil.toAsciiByteBuffer("hello"); 190 191 rsp.getAgent().enqueue(new ChannelRequest.Register(this.getStage(), rsp.getKey().channel(), SelectionKey.OP_READ)); 192 rsp.getAgent().enqueue(new ChannelRequest.WriteData(rsp.getKey().channel(), buf)); 193 log.info("*************writeData1 enqueued"); 194 } 195 196 protected void onRegistered(ChannelResponse.Registered rsp) { 197 log.error("*************got registered=" + rsp); 198 } 199 200 protected void onRead(ChannelResponse.Read rsp) { 201 // we have read some data 202 this.reads++; 203 //byte[] bytes = NioUtil.toByteArray(rsp.getBuffer()); 204 //String str = new String(bytes); 205 //log.error("******************** read ***********\n" + str); 206 done += rsp.getBuffer().remaining(); 207 log.info("########### done reads so far = " + done); 208 if (isClient && done > todo) { 209 if (this.endTime > 0) { 210 done -= rsp.getBuffer().remaining(); 211 } 212 else { 213 this.endTime = System.currentTimeMillis(); 214 rsp.getAgent().enqueue(new ChannelRequest.Close(rsp.getKey().channel())); 215 } 216 } 217 // recycling to buffer pool is a purely optional performance optimization 218 rsp.getAgent().getReadBufferPool().put(rsp.getBuffer()); 219 } 220 221 protected void onWrite(ChannelResponse.Write rsp) { 222 // we have written some data 223 ByteBuffer buf = rsp.getBuffer(); 224 log.info("******************** wrote all " + rsp.getBuffer().limit() + " bytes"); 225 ByteBuffer writeBuffer = rsp.getBuffer(); 226 writeBuffer.flip(); 227 rsp.getAgent().enqueue(new ChannelRequest.WriteData(rsp.getKey().channel(), writeBuffer)); 228 log.info("*************writeData2 enqueued"); 229 } 230 231 private void shutDown(ChannelResponse rsp) { 232 log.error("now shutting down demo"); 233 this.getStage().stop(); 234 rsp.getAgent().enqueue(new AdminRequest.Stop()); 235 } 236 237 private void printStats(ChannelResponse rsp, long time) { 238 long realdone = this.done * 2; // we have written and read those bytes 239 long realreads = this.reads * 2; // we have written and read those bytes 240 System.out.println("Summary statistics:"); 241 System.out.println("*******************"); 242 System.out.println(realdone / (1024.0f * 1024) + " MB echoed back and forth"); 243 System.out.println(realdone + " bytes echoed back and forth"); 244 System.out.println(time / 1000.0f + " seconds"); 245 System.out.println(1.0f * realdone / ( 1024 * time) + " MB/s throughput"); 246 System.out.println(); 247 System.out.println(realreads + " messages"); 248 System.out.println(realreads / (time / 1000.0f) + " messages/s"); 249 System.out.println("\nagent=" + rsp.getAgent().toDebugString()); 250 } 251 252 }