/*
 * Copyright (c) 2003, The Regents of the University of California, through
 * Lawrence Berkeley National Laboratory (subject to receipt of any required
 * approvals from the U.S. Dept. of Energy). All rights reserved.
 */
package gov.lbl.dsd.sea.nio.demo;

import gov.lbl.dsd.sea.ExecutorFactory;
import gov.lbl.dsd.sea.Stage;
import gov.lbl.dsd.sea.StageManager;
import gov.lbl.dsd.sea.nio.AgentEventHandler;
import gov.lbl.dsd.sea.nio.NetAgent;
import gov.lbl.dsd.sea.nio.event.AdminRequest;
import gov.lbl.dsd.sea.nio.event.ChannelRequest;
import gov.lbl.dsd.sea.nio.event.ChannelResponse;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.charset.Charset;

/**
 * Simple non-blocking hello world client and server, echoing
 * messages back and forth.
 * <p>
 * Example server usage: fire-java gov.lbl.dsd.sea.nio.demo.PingPong server
 * 9000
 * <p>
 * Example client usage: fire-java gov.lbl.dsd.sea.nio.demo.PingPong client
 * localhost 9000
 * <p>
 * Command line arguments, all optional and positional:
 * <ol>
 * <li>mode: <code>client</code> selects client mode, any other value selects
 * server mode (default when absent: client)</li>
 * <li>host name; only consumed in client mode (default: localhost)</li>
 * <li>port (default: 9000)</li>
 * <li><code>single</code> selects <code>StageManager.DIRECT</code> instead of
 * the default <code>StageManager.QUEUED</code> executor factory</li>
 * </ol>
 *
 * @author whoschek@lbl.gov
 * @author $Author: gegles $
 * @version $Revision: 1.4 $, $Date: 2004/09/16 16:57:15 $
 */
public class PingPong extends AgentEventHandler {

	/** Am I a client or a server? */
	protected boolean isClient = true;

	/** Charset used to encode and decode all messages exchanged by this demo. */
	private static final Charset CHARSET = Charset.forName("US-ASCII");

	private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
			.getLog(PingPong.class);

	/**
	 * Program entry point; simply constructs a PingPong from the arguments.
	 *
	 * @param args
	 *            command line arguments, see the class description
	 * @throws IOException
	 *             if thrown by the constructor
	 */
	public static void main(String[] args) throws IOException {
		new PingPong(args);
	}

	/**
	 * Parses the command line, creates and starts a stage handled by this
	 * object, registers either a connect address (client) or a listen port
	 * (server) with a new agent, and starts the agent.
	 *
	 * @param args
	 *            command line arguments, see the class description
	 * @throws IOException
	 *             declared by the original signature; presumably propagated
	 *             from the agent set-up calls (TODO confirm)
	 * @throws NumberFormatException
	 *             if the port argument is not a valid integer
	 */
	public PingPong(String[] args) throws IOException {
		NetAgent agent = new NetAgent();

		String hostName = "localhost";
		int port = 9000;
		ExecutorFactory execFactory = StageManager.QUEUED;

		// Positional parsing: 'next' always points at the argument to consume next.
		int next = 0;
		if (args.length > next) {
			isClient = "client".equals(args[next]);
		}
		next++;

		// The host name slot exists in client mode only; a server goes straight
		// from the mode argument to the port argument.
		if (isClient) {
			if (args.length > next) {
				hostName = args[next];
			}
			next++;
		}

		if (args.length > next) {
			port = Integer.parseInt(args[next]);
		}
		next++;

		if (args.length > next && "single".equals(args[next])) {
			execFactory = StageManager.DIRECT;
		}

		Stage myStage = new StageManager(execFactory).createStage(this).start();

		if (isClient) {
			agent.addConnectAddress(myStage, new InetSocketAddress(hostName, port));
		} else {
			agent.addListenPort(myStage, port);
		}

		agent.start();
	}

	/**
	 * Logs the closed channel (and the exception that came with it, if any).
	 * A client shuts itself down on close; a server simply keeps running.
	 * Finally prints the agent's debug status.
	 *
	 * @param rsp
	 *            the channel-closed response
	 */
	protected void onClosed(ChannelResponse.Closed rsp) {
		log.info("*************got channel closed=" + rsp);
		if (rsp.getException() != null) {
			// Nothing special needs to be done, but make the cause visible
			// instead of silently discarding it.
			log.warn("channel closed with exception: " + rsp.getException());
		}
		if (isClient) {
			this.shutDown(rsp);
		}
		// A server has nothing to do here; it just continues.
		System.out.println("agent status=" + rsp.getAgent().toDebugString());
	}

	/**
	 * Registers read interest for the connected channel and enqueues the
	 * initial "hello world" message for writing.
	 *
	 * @param rsp
	 *            the channel-connected response
	 */
	protected void onConnected(ChannelResponse.Connected rsp) {
		ByteBuffer greeting = CHARSET.encode("hello world");
		rsp.getAgent().enqueue(
				new ChannelRequest.Register(this.getStage(), rsp.getKey().channel(),
						SelectionKey.OP_READ));
		rsp.getAgent().enqueue(
				new ChannelRequest.WriteData(rsp.getKey().channel(), greeting));
	}

	/**
	 * Registers read interest for the accepted channel.
	 *
	 * @param rsp
	 *            the channel-accepted response
	 */
	protected void onAccepted(ChannelResponse.Accepted rsp) {
		rsp.getAgent().enqueue(
				new ChannelRequest.Register(this.getStage(), rsp.getKey().channel(),
						SelectionKey.OP_READ));
	}

	/**
	 * Prints the received message, waits half a second, and echoes the message
	 * back with a "." appended.
	 *
	 * @param rsp
	 *            the read response carrying the received buffer
	 * @throws RuntimeException
	 *             wrapping the InterruptedException if the thread is
	 *             interrupted during the artificial delay; the thread's
	 *             interrupt status is restored before throwing
	 */
	protected void onRead(ChannelResponse.Read rsp) {
		String str = CHARSET.decode(rsp.getBuffer()).toString();
		System.out.println("******************** read *********** " + str);

		try { // artificially slow down to better see progress on terminal
			Thread.sleep(500);
		} catch (InterruptedException e) {
			// Catching InterruptedException clears the interrupt flag; restore
			// it so that code further up the stack can still observe it.
			Thread.currentThread().interrupt();
			throw new RuntimeException(e);
		}

		ByteBuffer writeBuffer = CHARSET.encode(str + ".");

		// recycling to buffer pool is an optional optimization
		rsp.getAgent().getReadBufferPool().put(rsp.getBuffer());

		rsp.getAgent().enqueue(
				new ChannelRequest.WriteData(rsp.getKey().channel(), writeBuffer));
	}

	/**
	 * Prints the content of the buffer reported by the write response.
	 *
	 * @param rsp
	 *            the write response carrying the written buffer
	 */
	protected void onWrite(ChannelResponse.Write rsp) {
		// flip so that decode() sees the buffer from its start up to the
		// current position
		rsp.getBuffer().flip();
		System.out.println("*************** fully wrote *********** "
				+ CHARSET.decode(rsp.getBuffer()).toString());

		// recycling to buffer pool is an optional optimization
		// NOTE(review): this hands a buffer created by CHARSET.encode() to the
		// *read* buffer pool - confirm that the pool accepts such buffers.
		rsp.getAgent().getReadBufferPool().put(rsp.getBuffer());
	}

	/**
	 * Announces that the server is listening once a registration with accept
	 * interest has been reported; other registrations are ignored.
	 *
	 * @param rsp
	 *            the registered response
	 */
	protected void onRegistered(ChannelResponse.Registered rsp) {
		if (rsp.getInterestOps() == SelectionKey.OP_ACCEPT) {
			System.out.println("******** server started; now listening *****");
		}
	}

	/**
	 * Stops this handler's stage and asks the agent to stop.
	 *
	 * @param rsp
	 *            the response whose agent should be stopped
	 */
	private void shutDown(ChannelResponse rsp) {
		System.out.println("now shutting down...");
		this.getStage().stop();
		rsp.getAgent().enqueue(new AdminRequest.Stop());
	}

}