001 /* 002 * Copyright (c) 2003, The Regents of the University of California, through 003 * Lawrence Berkeley National Laboratory (subject to receipt of any required 004 * approvals from the U.S. Dept. of Energy). All rights reserved. 005 */ 006 package gov.lbl.dsd.sea.nio; 007 008 import gov.lbl.dsd.sea.EventHandler; 009 import gov.lbl.dsd.sea.event.IllegalEventException; 010 import gov.lbl.dsd.sea.nio.event.ChannelResponse; 011 012 /** 013 Abstract base class simplifying the implementation of event 014 handlers sending requests to an agent, and receiving responses from the agent. 015 <p> 016 Override protected <code>onXYZ</code> event handling methods for application specific behaviour. 017 <p> 018 For example, a typical non-blocking agent event handler accumulates partial reads along the following lines: 019 020 <pre> 021 public class MyAgentEventHandler extends AgentEventHandler { 022 // simple variable msg size protocol: MESSAGE = [payLoadLength(4 bytes) payLoad] 023 private static final Charset CHARSET = Charset.forName("UTF-8"); 024 025 protected void onAccepted(ChannelResponse.Accepted rsp) { 026 // prepare read buffer and register READ interest 027 rsp.getKey().attach(new gov.lbl.dsd.sea.nio.util.ArrayByteList()); 028 rsp.getAgent().enqueue( 029 new ChannelRequest.Register(this.getStage(), rsp.getKey() 030 .channel(), SelectionKey.OP_READ)); 031 } 032 033 034 protected void onRead(ChannelResponse.Read rsp) { 035 ArrayByteList readBuffer = (ArrayByteList) rsp.getKey().attachment(); 036 readBuffer.add(rsp.getBuffer()); 037 rsp.getAgent().getReadBufferPool().put(rsp.getBuffer()); // recycle buffer 038 while (readBuffer.size() >= 4) { 039 // message header containing payload length has arrived 040 int payloadLength = readBuffer.asByteBuffer().getInt(0); 041 if (4 + payloadLength > readBuffer.size()) { 042 break; // payload not yet fully received, wait for more data 043 } 044 else { 045 // we have received the entire variable length payload 046 String payload = readBuffer.toString(4, 4 + payloadLength, CHARSET); 047 readBuffer.remove(0, 4 + payloadLength); // remove header and payload 048 049 // do something useful with payload, here we just print it 050 System.out.println("payload=" + payload); 051 } 052 } 053 } 054 055 // A more efficient but less readable/understandable alternative: 056 protected void onRead(ChannelResponse.Read rsp) { 057 ArrayByteList readBuffer = (ArrayByteList) rsp.getKey().attachment(); 058 readBuffer.add(rsp.getBuffer()); 059 rsp.getAgent().getReadBufferPool().put(rsp.getBuffer()); // recycle buffer 060 int i = 0; 061 while (i + 4 <= readBuffer.size()) { 062 // message header containing payload length has arrived 063 int payloadLength = readBuffer.asByteBuffer().getInt(i); 064 if (i + 4 + payloadLength > readBuffer.size()) { 065 // payload not yet fully received - wait for more data 066 break; 067 } 068 else { 069 i += 4; 070 // we have received the entire variable length payload 071 String payload = readBuffer.toString(i, i + payloadLength, CHARSET); 072 i += payLoadLength; 073 074 // do something useful with payload, here we just print it 075 System.out.println("payload=" + payload); 076 } 077 } 078 // remove fully processed headers and payloads, if any 079 readBuffer.remove(0, i); 080 } 081 082 ... 083 } 084 </pre> 085 086 @author whoschek@lbl.gov 087 @author $Author: hoschek3 $ 088 @version $Revision: 1.12 $, $Date: 2004/08/04 23:24:56 $ 089 */ 090 public abstract class AgentEventHandler extends EventHandler { 091 092 private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(AgentEventHandler.class); 093 094 protected AgentEventHandler() {} 095 096 /** Called when an event has been received */ 097 public void handle(Object event) { 098 if (log.isTraceEnabled()) log.trace("handling event=" + event); 099 100 if (event instanceof ChannelResponse) { 101 if (event instanceof ChannelResponse.Closed) { 102 // handle without suppressing exception check 103 this.onClosed((ChannelResponse.Closed) event); 104 } else { 105 ChannelResponse rsp = (ChannelResponse) event; 106 if (rsp.getException() != null) { 107 // ignore and wait for autoclosing ChannelResponse.Closed; 108 // handle cleanup later in onClosed(...) 109 if (log.isDebugEnabled()) log.debug("Got exception response=" + rsp); 110 } else if (event instanceof ChannelResponse.Read) { 111 this.onRead((ChannelResponse.Read) event); 112 } else if (event instanceof ChannelResponse.Write) { 113 this.onWrite((ChannelResponse.Write) event); 114 } else if (event instanceof ChannelResponse.Registered) { 115 this.onRegistered((ChannelResponse.Registered) event); 116 } else if (event instanceof ChannelResponse.Accepted) { 117 this.onAccepted((ChannelResponse.Accepted) event); 118 } else if (event instanceof ChannelResponse.Connected) { 119 this.onConnected((ChannelResponse.Connected) event); 120 } else { 121 throw new IllegalEventException("Agent ERROR - should never happen", event, this.getStage()); 122 } 123 } 124 } else { 125 this.onApplicationEvent(event); 126 } 127 } 128 129 /** 130 * Called when an event other than a ChannelResponse (that is, an 131 * application specific event) has been received; The default implementation 132 * throws an exception. 133 */ 134 protected void onApplicationEvent(Object event) { 135 throw new IllegalEventException(event, this.getStage()); 136 } 137 138 /** 139 * Called when an accepted response has been received; that is, when the 140 * (server) agent has accepted a new client channel (connection). 141 */ 142 abstract protected void onAccepted(ChannelResponse.Accepted response); 143 144 /** 145 * Called when a closed response has been received; that is, when the agent 146 * has closed a channel (connection). 147 */ 148 abstract protected void onClosed(ChannelResponse.Closed response); 149 150 /** 151 * Called when a connected response has been received; that is, when the 152 * (client) agent has connected a new channel (connection) to a remote 153 * server. 154 */ 155 abstract protected void onConnected(ChannelResponse.Connected response); 156 157 /** 158 * Called when a registered response has been received; that is, when the 159 * agent has processed a {@link ChannelRequest.Registered} request. 160 */ 161 abstract protected void onRegistered(ChannelResponse.Registered response); 162 163 /** 164 * Called when a read response has been received; that is, when the agent 165 * has read data from a channel (connection). 166 */ 167 abstract protected void onRead(ChannelResponse.Read response); 168 169 /** 170 * Called when a write response has been received; that is, when the agent 171 * has written data to a channel (connection). 172 */ 173 abstract protected void onWrite(ChannelResponse.Write response); 174 175 }