001 /* 002 * Copyright (c) 2003, The Regents of the University of California, through 003 * Lawrence Berkeley National Laboratory (subject to receipt of any required 004 * approvals from the U.S. Dept. of Energy). All rights reserved. 005 */ 006 package gov.lbl.dsd.sea.nio.demo; 007 008 import gov.lbl.dsd.sea.Stage; 009 import gov.lbl.dsd.sea.StageManager; 010 import gov.lbl.dsd.sea.nio.AgentEventHandler; 011 import gov.lbl.dsd.sea.nio.NetAgent; 012 import gov.lbl.dsd.sea.nio.event.AdminRequest; 013 import gov.lbl.dsd.sea.nio.event.ChannelRequest; 014 import gov.lbl.dsd.sea.nio.event.ChannelResponse; 015 016 import java.io.IOException; 017 import java.nio.channels.SelectionKey; 018 import java.nio.charset.Charset; 019 020 /** 021 * Simple non-blocking echo server that can be tested in 022 * conjunction with a standard UNIX telnet client. 023 * 024 * @author whoschek@lbl.gov 025 * @author $Author: gegles $ 026 * @version $Revision: 1.5 $, $Date: 2004/09/16 16:57:15 $ 027 */ 028 public class EchoServer extends AgentEventHandler { 029 030 private static final Charset CHARSET = Charset.forName("US-ASCII"); 031 032 private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory 033 .getLog(EchoServer.class); 034 035 static public void main(String args[]) throws IOException { 036 new EchoServer(args); 037 } 038 039 public EchoServer(String args[]) throws IOException { 040 NetAgent agent = new NetAgent(); 041 Stage myStage = new StageManager().createStage(this).start(); 042 043 for (int i = 0; i < args.length; i++) { 044 int port = Integer.parseInt(args[i]); 045 agent.addListenPort(myStage, port); 046 } 047 048 agent.start(); 049 } 050 051 protected void onClosed(ChannelResponse.Closed rsp) { 052 log.info("*************got channel closed=" + rsp); 053 if (rsp.getException() != null) { 054 ; // nothing special to be done 055 } 056 System.out.println("agent status=" + rsp.getAgent().toDebugString()); 057 } 058 059 protected void onConnected(ChannelResponse.Connected rsp) { 060 throw new UnsupportedOperationException(); 061 } 062 063 protected void onAccepted(ChannelResponse.Accepted rsp) { 064 rsp.getAgent().enqueue( 065 new ChannelRequest.Register(this.getStage(), rsp.getKey() 066 .channel(), SelectionKey.OP_READ)); 067 } 068 069 protected void onRead(ChannelResponse.Read rsp) { 070 String str = CHARSET.decode(rsp.getBuffer().duplicate()).toString(); 071 System.out.println("******************** read *********** " + str); 072 System.out.println("******************** read bytes ***** " + str.length()); 073 074 if (str.startsWith("exit")) { 075 this.shutDown(rsp); 076 } else if (str.startsWith("close")) { 077 rsp.getAgent().enqueue(new ChannelRequest.Close(rsp.getKey().channel())); 078 } else { 079 rsp.getAgent().enqueue(new ChannelRequest.WriteData(rsp.getKey().channel(), rsp.getBuffer())); 080 } 081 } 082 083 protected void onWrite(ChannelResponse.Write rsp) { 084 rsp.getBuffer().flip(); 085 System.out.println("******************** wrote *********** " 086 + CHARSET.decode(rsp.getBuffer()).toString()); 087 088 // recycling to buffer pool is an optional optimization 089 rsp.getAgent().getReadBufferPool().put(rsp.getBuffer()); 090 } 091 092 protected void onRegistered(ChannelResponse.Registered rsp) { 093 if (rsp.getInterestOps() == SelectionKey.OP_ACCEPT) { 094 System.out.println("******** server started; now listening *****"); 095 } 096 } 097 098 private void shutDown(ChannelResponse rsp) { 099 System.out.println("now shutting down..."); 100 this.getStage().stop(); 101 rsp.getAgent().enqueue(new AdminRequest.Stop()); 102 } 103 104 }