001 /* 002 * Copyright (c) 2003, The Regents of the University of California, through 003 * Lawrence Berkeley National Laboratory (subject to receipt of any required 004 * approvals from the U.S. Dept. of Energy). All rights reserved. 005 */ 006 package gov.lbl.dsd.sea.nio.demo; 007 import gov.lbl.dsd.sea.nio.NetAgent; 008 import gov.lbl.dsd.sea.nio.util.BufferPool; 009 import gov.lbl.dsd.sea.nio.util.SocketOpts; 010 011 import java.io.IOException; 012 import java.net.InetSocketAddress; 013 import java.net.SocketException; 014 import java.nio.ByteBuffer; 015 import java.nio.channels.SelectionKey; 016 import java.nio.channels.SocketChannel; 017 018 /** 019 * Simple single-threaded ping pong benchmark. 020 * 021 * @author whoschek@lbl.gov 022 * @author $Author: hoschek3 $ 023 * @version $Revision: 1.1 $, $Date: 2004/06/15 00:36:05 $ 024 */ 025 public class XSinglePingPongBench extends NetAgent { 026 027 private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(XSinglePingPongBench.class); 028 029 protected int chunkSize = 8192; 030 protected long todo = 10; 031 protected long done = 0; 032 protected boolean isClient = true; 033 protected boolean autoClose = true; 034 protected long start; 035 036 protected long reads = 0; 037 protected long partialWrites = 0; 038 039 static public void main(String args[]) throws IOException { 040 new XSinglePingPongBench(args); 041 } 042 043 public XSinglePingPongBench(String args[]) throws IOException { 044 this.setFailFast(false); 045 046 String hostName = "localhost"; 047 int port = 9000; 048 049 int k=-1; 050 if (args.length > ++k) { 051 isClient = args[k].equals("client"); 052 } 053 if (isClient && args.length > ++k) { 054 hostName = args[k]; 055 } 056 if (args.length > ++k) { 057 port = Integer.parseInt(args[k]); 058 } 059 if (isClient && args.length > ++k) { 060 chunkSize = Math.round(1024 * Float.parseFloat(args[k])); 061 } 062 if (isClient && args.length > ++k) { 063 todo = Math.round(1024 * Float.parseFloat(args[k])); 064 } 065 if (args.length > ++k) { 066 // setting a buffer pool is a purely optional performance optimization 067 int n = 1024 * Integer.parseInt(args[k]); 068 this.setReadBufferPool(new BufferPool(15 * n, n, true, null)); 069 //agent.setReadBufferCapacity(1024 * Integer.parseInt(args[k])); 070 } 071 072 SocketOpts options = new SocketOpts(); 073 if (args.length > ++k) { 074 int val = Math.round(1024 * Float.parseFloat(args[k])); 075 options.setOption(SocketOpts.SO_RCVBUF, new Integer(val)); 076 options.setOption(SocketOpts.SO_SNDBUF, new Integer(val)); 077 } 078 079 if (args.length > ++k) { 080 this.autoClose = new Boolean(args[k]).booleanValue(); 081 } 082 this.setAutoClose(this.autoClose); 083 log.error("using autoClose=" + autoClose); 084 085 if (args.length > ++k) { 086 options.setOption(SocketOpts.TCP_NODELAY, new Boolean(args[k])); 087 } 088 this.setSocketOptions(options); 089 090 if (isClient) { 091 this.addConnectAddress(null, new InetSocketAddress(hostName, port)); 092 } 093 else { 094 this.addListenPort(null, port); 095 } 096 097 this.start(); 098 } 099 100 protected void onConnectDone(SelectionKey key, IOException exception) { 101 super.onConnectDone(key, exception); 102 key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ); 103 ByteBuffer buf = ByteBuffer.allocateDirect(chunkSize); 104 //ByteBuffer buf = NioUtil.toAsciiByteBuffer("hello"); 105 106 try { 107 log.error("socketOpts=" + new SocketOpts(((SocketChannel) key.channel()).socket())); 108 } catch (SocketException e) { 109 throw new RuntimeException(e); 110 } 111 this.start = System.currentTimeMillis(); 112 113 this.onWriteRequest(key, buf); 114 //this.onWriteRequest(key, NioUtil.toAsciiByteBuffer(createHTTPRequest())); 115 } 116 117 protected void onAcceptDone(SelectionKey key, IOException exception) { 118 super.onAcceptDone(key, exception); 119 key.interestOps(SelectionKey.OP_READ); 120 try { 121 log.error("socketOpts=" + new SocketOpts(((SocketChannel) key.channel()).socket())); 122 } catch (SocketException e) { 123 throw new RuntimeException(e); 124 } 125 this.start = System.currentTimeMillis(); 126 } 127 128 protected void onCloseDone(SelectionKey key, IOException exception) { 129 super.onCloseDone(key, exception); 130 if (!isClient) this.printStats(); 131 if (isClient) this.shutDown(); 132 } 133 134 protected void onReadDone(SelectionKey key, ByteBuffer buffer, IOException exception) { 135 super.onReadDone(key, buffer, exception); 136 this.reads++; 137 //byte[] bytes = NioUtil.toByteArray(buffer); 138 //String str = new String(bytes); 139 //log.error("******************** read ***********\n" + str); 140 done += buffer.limit(); 141 log.warn("########### done reads so far = " + done); 142 if (isClient && done > todo) { 143 if (this.start < 0) return; // avoid reporting more than once (possible if multiple Reads responses are in flight) 144 this.printStats(); 145 this.start = -1; 146 this.shutDown(); 147 } 148 else { 149 // echo the same data back to client 150 boolean wantPartialWriteResponses = false; 151 152 ByteBuffer writeBuf = buffer; 153 //writeBuf.flip(); 154 155 if (wantPartialWriteResponses) writeBuf.mark(); // marking indicates we want to get partial write reponses 156 157 this.onWriteRequest(key, writeBuf); 158 log.info("*************writeData2 enqueued"); 159 } 160 } 161 162 protected void onWriteDone(SelectionKey key, ByteBuffer buffer, int n, IOException exception) { 163 super.onWriteDone(key, buffer, n, exception); 164 ByteBuffer buf = buffer; 165 log.warn("******************** total written so far = " + buf.position() + " bytes"); 166 if (buf.position() < buf.limit()) { // we got a partial write 167 this.partialWrites++; 168 log.info(buf.limit() - buf.position() + " bytes left to be written"); 169 } 170 else { // we got a full write; 171 log.info("all written"); 172 // recycling to buffer pool is a purely optional performance optimization 173 this.getReadBufferPool().put(buf); 174 } 175 } 176 177 protected String createHTTPRequest() { 178 String str = "GET / HTTP/1.0" + "\r\n" + "\r\n"; 179 return str; 180 } 181 182 private void shutDown() { 183 log.error("now shutting down demo"); 184 this.stop(); 185 } 186 187 private void printStats() { 188 long realdone = done * 2; // we have written and read those bytes 189 long realreads = this.reads * 2; // we have written and read those bytes 190 long time = System.currentTimeMillis() - start; 191 System.out.println(realdone / (1024.0f * 1024) + " MB echoed back and forth"); 192 System.out.println(realdone + " bytes echoed back and forth"); 193 System.out.println(time / 1000.0f + " seconds"); 194 System.out.println(1.0f * realdone / ( 1024 * time) + " MB/s throughput"); 195 System.out.println(); 196 System.out.println(realreads + " messages"); 197 System.out.println(realreads / (time / 1000.0f) + " messages/s"); 198 System.out.println(this.partialWrites + " partial writes"); 199 System.out.println("bufferPool= " + this.getReadBufferPool()); 200 } 201 202 }