001 /* 002 * Copyright (c) 2003, The Regents of the University of California, through 003 * Lawrence Berkeley National Laboratory (subject to receipt of any required 004 * approvals from the U.S. Dept. of Energy). All rights reserved. 005 */ 006 package gov.lbl.dsd.sea.nio.demo; 007 008 import gov.lbl.dsd.sea.ExecutorFactory; 009 import gov.lbl.dsd.sea.Stage; 010 import gov.lbl.dsd.sea.StageManager; 011 import gov.lbl.dsd.sea.nio.AgentEventHandler; 012 import gov.lbl.dsd.sea.nio.NetAgent; 013 import gov.lbl.dsd.sea.nio.event.AdminRequest; 014 import gov.lbl.dsd.sea.nio.event.ChannelRequest; 015 import gov.lbl.dsd.sea.nio.event.ChannelResponse; 016 import gov.lbl.dsd.sea.nio.util.ByteBufferPool; 017 import gov.lbl.dsd.sea.nio.util.SocketOpts; 018 019 import java.io.IOException; 020 import java.net.InetSocketAddress; 021 import java.net.SocketException; 022 import java.nio.ByteBuffer; 023 import java.nio.channels.SelectionKey; 024 import java.nio.channels.SocketChannel; 025 026 /** 027 * Asynchronous non-blocking ping pong benchmark; Sends messages back and forth 028 * between client and server and measures throughput. 029 * <p> 030 * Example server usage (large messages): fire-java gov.lbl.dsd.sea.nio.demo.PingPongBench 031 * server 9000 2000 2000 032 * <p> 033 * Example client usage (large messages): fire-java gov.lbl.dsd.sea.nio.demo.PingPongBench 034 * client localhost 9000 2000 1000000 2000 2000 035 * <p> 036 * Example server usage (micro messages): fire-java gov.lbl.dsd.sea.nio.demo.PingPongBench 037 * server 9000 1 1 038 * <p> 039 * Example client usage (micro messages): fire-java gov.lbl.dsd.sea.nio.demo.PingPongBench 040 * client localhost 9000 0.003 600 1 1 041 * <p> 042 * Using localhost with large messages should report on the order of 190 MB/s throughput 043 * (appropriately large packet sizes and TCP buffer sizes are critical). 044 * <p> 045 * Using localhost with micro messages should report on the order of 30000 messages/s throughput. 046 * <p> 047 * Set log level to ERROR to avoid logging becoming the bottleneck! 048 * 049 * @author whoschek@lbl.gov 050 * @author $Author: gegles $ 051 * @version $Revision: 1.5 $, $Date: 2004/09/16 16:57:15 $ 052 */ 053 public class PingPongBench extends AgentEventHandler { 054 055 private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(PingPongBench.class); 056 057 protected int chunkSize = 8192; 058 protected long todo = 10; 059 protected long done = 0; 060 protected boolean isClient = true; 061 protected long startTime; 062 protected long endTime; 063 064 protected long reads = 0; 065 066 static public void main(String args[]) throws IOException { 067 new PingPongBench(args); 068 } 069 070 public PingPongBench(String args[]) throws IOException { 071 NetAgent agent = new NetAgent(); 072 073 String hostName = "localhost"; 074 int port = 9000; 075 076 int k=-1; 077 if (args.length > ++k) { 078 isClient = args[k].equals("client"); 079 } 080 if (isClient && args.length > ++k) { 081 hostName = args[k]; 082 } 083 if (args.length > ++k) { 084 port = Integer.parseInt(args[k]); 085 } 086 if (isClient && args.length > ++k) { 087 chunkSize = Math.round(1024 * Float.parseFloat(args[k])); 088 } 089 if (isClient && args.length > ++k) { 090 todo = Math.round(1024 * Float.parseFloat(args[k])); 091 } 092 if (args.length > ++k) { 093 // setting a buffer pool is a purely optional performance optimization 094 int n = Math.round(1024 * Float.parseFloat(args[k])); 095 agent.setReadBufferPool(new ByteBufferPool(5 * n, n, true, null)); 096 } 097 098 SocketOpts options = new SocketOpts(); 099 if (args.length > ++k) { 100 int n = Math.round(1024 * Float.parseFloat(args[k])); 101 options.setOption(SocketOpts.SO_RCVBUF, new Integer(n)); 102 options.setOption(SocketOpts.SO_SNDBUF, new Integer(n)); 103 } 104 105 if (args.length > ++k) { 106 options.setOption(SocketOpts.TCP_NODELAY, new Boolean(args[k])); 107 } 108 agent.setSocketOptions(options); 109 110 ExecutorFactory execFactory = StageManager.QUEUED; 111 if (args.length > ++k) { 112 if (args[k].equals("single")) execFactory = StageManager.DIRECT; 113 } 114 Stage myStage = new StageManager(execFactory).createStage(this).start(); 115 116 if (isClient) { 117 agent.addConnectAddress(myStage, new InetSocketAddress(hostName, port)); 118 } 119 else { 120 agent.addListenPort(myStage, port); 121 } 122 123 agent.start(); 124 } 125 126 protected void onAccepted(ChannelResponse.Accepted rsp) { 127 // server has accepted new connection from remote client 128 // register read interest with new socket channel 129 // this triggers future ChannelResponse.Read events to be delivered to us 130 log.error("*************got accepted=" + rsp); 131 try { 132 log.error("socketOpts=" + new SocketOpts(((SocketChannel) rsp.getKey().channel()).socket())); 133 } catch (SocketException e) { 134 throw new RuntimeException(e); 135 } 136 this.startTime = System.currentTimeMillis(); 137 rsp.getAgent().enqueue(new ChannelRequest.Register(this.getStage(), rsp.getKey().channel(), SelectionKey.OP_READ)); 138 log.error("*************enqueued register"); 139 } 140 141 protected void onClosed(ChannelResponse.Closed rsp) { 142 log.error("*************got channel closed=" + rsp); 143 if (rsp.getException() != null) { 144 ; // nothing special to be done 145 } 146 if (isClient) { 147 this.shutDown(rsp); 148 if (this.endTime > 0) { 149 this.printStats(rsp, endTime - startTime); 150 try { 151 Thread.sleep(100); 152 } catch (InterruptedException e) { 153 // TODO Auto-generated catch block 154 e.printStackTrace(); 155 } 156 this.printStats(rsp, endTime - startTime); 157 } 158 } 159 else { 160 this.endTime = System.currentTimeMillis(); 161 this.printStats(rsp, endTime - startTime); // no problem; continueing 162 try { 163 Thread.sleep(100); 164 } catch (InterruptedException e) { 165 // TODO Auto-generated catch block 166 e.printStackTrace(); 167 } 168 this.printStats(rsp, endTime - startTime); // no problem; continueing 169 this.endTime = 0; 170 } 171 } 172 173 protected void onConnected(ChannelResponse.Connected rsp) { 174 log.error("*************got connected=" + rsp); 175 176 ByteBuffer buf = ByteBuffer.allocateDirect(chunkSize); 177 //ByteBuffer buf = NioUtil.toAsciiByteBuffer("hello"); 178 179 try { 180 log.error("socketOpts=" + new SocketOpts(((SocketChannel) rsp.getKey().channel()).socket())); 181 } catch (SocketException e) { 182 throw new RuntimeException(e); 183 } 184 185 this.startTime = System.currentTimeMillis(); 186 187 rsp.getAgent().enqueue(new ChannelRequest.Register(this.getStage(), rsp.getKey().channel(), SelectionKey.OP_READ)); 188 rsp.getAgent().enqueue(new ChannelRequest.WriteData(rsp.getKey().channel(), buf)); 189 log.info("*************writeData1 enqueued"); 190 } 191 192 protected void onRegistered(ChannelResponse.Registered rsp) { 193 log.error("*************got registered=" + rsp); 194 } 195 196 protected void onRead(ChannelResponse.Read rsp) { 197 // we have read some data 198 this.reads++; 199 //byte[] bytes = NioUtil.toByteArray(rsp.getBuffer()); 200 //String str = new String(bytes); 201 //log.error("******************** read ***********\n" + str); 202 done += rsp.getBuffer().remaining(); 203 log.info("########### done reads so far = " + done); 204 if (isClient && done > todo) { 205 if (this.endTime > 0) { 206 done -= rsp.getBuffer().remaining(); 207 } 208 else { 209 this.endTime = System.currentTimeMillis(); 210 rsp.getAgent().enqueue(new ChannelRequest.Close(rsp.getKey().channel())); 211 212 } 213 } 214 else { 215 // echo the same data back to client 216 ByteBuffer writeBuf = rsp.getBuffer(); 217 rsp.getAgent().enqueue(new ChannelRequest.WriteData(rsp.getKey().channel(), writeBuf)); 218 log.info("*************writeData2 enqueued"); 219 } 220 } 221 222 protected void onWrite(ChannelResponse.Write rsp) { 223 // we have written some data 224 log.info("******************** wrote all " + rsp.getBuffer().limit() + " bytes"); 225 // recycling to buffer pool is a purely optional performance optimization 226 rsp.getAgent().getReadBufferPool().put(rsp.getBuffer()); 227 } 228 229 private void shutDown(ChannelResponse rsp) { 230 log.error("now shutting down demo"); 231 this.getStage().stop(); 232 rsp.getAgent().enqueue(new AdminRequest.Stop()); 233 } 234 235 private void printStats(ChannelResponse rsp, long time) { 236 long realdone = this.done * 2; // we have written and read those bytes 237 long realreads = this.reads * 2; // we have written and read those bytes 238 System.out.println("Summary statistics:"); 239 System.out.println("*******************"); 240 System.out.println(realdone / (1024.0f * 1024) + " MB echoed back and forth"); 241 System.out.println(realdone + " bytes echoed back and forth"); 242 System.out.println(time / 1000.0f + " seconds"); 243 System.out.println(1.0f * realdone / ( 1024 * time) + " MB/s throughput"); 244 System.out.println(); 245 System.out.println(realreads + " messages"); 246 System.out.println(realreads / (time / 1000.0f) + " messages/s"); 247 System.out.println("\nagent=" + rsp.getAgent().toDebugString()); 248 } 249 250 }