001    /*
002     * Copyright (c) 2003, The Regents of the University of California, through
003     * Lawrence Berkeley National Laboratory (subject to receipt of any required
004     * approvals from the U.S. Dept. of Energy). All rights reserved.
005     */
006    package gov.lbl.dsd.sea.nio;
007    
008    import gov.lbl.dsd.sea.EventHandler;
009    import gov.lbl.dsd.sea.event.IllegalEventException;
010    import gov.lbl.dsd.sea.nio.event.ChannelResponse;
011    
012    /**
013    Abstract base class simplifying the implementation of event
014    handlers sending requests to an agent, and receiving responses from the agent.
015    <p>
016    Override protected <code>onXYZ</code> event handling methods for application specific behaviour.
017    <p>
018    For example, a typical non-blocking agent event handler accumulates partial reads along the following lines:
019    
020    <pre>
021    public class MyAgentEventHandler extends AgentEventHandler {
022            // simple variable msg size protocol: MESSAGE = [payLoadLength(4 bytes) payLoad]
023            private static final Charset CHARSET = Charset.forName("UTF-8");
024            
025            protected void onAccepted(ChannelResponse.Accepted rsp) {
026                    // prepare read buffer and register READ interest
027                    rsp.getKey().attach(new gov.lbl.dsd.sea.nio.util.ArrayByteList());
028                    rsp.getAgent().enqueue(
029                                    new ChannelRequest.Register(this.getStage(), rsp.getKey()
030                                                    .channel(), SelectionKey.OP_READ));
031            }
032            
033    
034            protected void onRead(ChannelResponse.Read rsp) {
035                    ArrayByteList readBuffer = (ArrayByteList) rsp.getKey().attachment();
036                    readBuffer.add(rsp.getBuffer());
037                    rsp.getAgent().getReadBufferPool().put(rsp.getBuffer()); // recycle buffer
038                    while (readBuffer.size() >= 4) {
039                            // message header containing payload length has arrived
040                            int payloadLength = readBuffer.asByteBuffer().getInt(0);
041                            if (4 + payloadLength > readBuffer.size()) {
042                                    break; // payload not yet fully received, wait for more data
043                            }
044                            else {
045                                    // we have received the entire variable length payload
046                                    String payload = readBuffer.toString(4, 4 + payloadLength, CHARSET);
047                                    readBuffer.remove(0, 4 + payloadLength); // remove header and payload
048                                    
049                                    // do something useful with payload, here we just print it
050                                    System.out.println("payload=" + payload);
051                            }
052                    }
053            }
054            
055            // A more efficient but less readable/understandable alternative:
056            protected void onRead(ChannelResponse.Read rsp) {
057                    ArrayByteList readBuffer = (ArrayByteList) rsp.getKey().attachment();
058                    readBuffer.add(rsp.getBuffer());
059                    rsp.getAgent().getReadBufferPool().put(rsp.getBuffer()); // recycle buffer
060                    int i = 0;
061                    while (i + 4 <= readBuffer.size()) {
062                            // message header containing payload length has arrived
063                            int payloadLength = readBuffer.asByteBuffer().getInt(i);
064                            if (i + 4 + payloadLength > readBuffer.size()) {
065                                    // payload not yet fully received - wait for more data
066                                    break; 
067                            }
068                            else {
069                                    i += 4;
070                                    // we have received the entire variable length payload
071                                    String payload = readBuffer.toString(i, i + payloadLength, CHARSET);
072                                    i += payLoadLength;
073                                    
074                                    // do something useful with payload, here we just print it
075                                    System.out.println("payload=" + payload);
076                            }
077                    }
078                    // remove fully processed headers and payloads, if any
079                    readBuffer.remove(0, i); 
080            }
081            
082            ...
083    }
084    </pre>
085    
086    @author whoschek@lbl.gov
087    @author $Author: hoschek3 $
088    @version $Revision: 1.12 $, $Date: 2004/08/04 23:24:56 $
089    */
090    public abstract class AgentEventHandler extends EventHandler {
091    
092            private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(AgentEventHandler.class);
093    
094            protected AgentEventHandler() {}
095            
096            /** Called when an event has been received */
097            public void handle(Object event) {
098                    if (log.isTraceEnabled()) log.trace("handling event=" + event);
099    
100                    if (event instanceof ChannelResponse) {
101                            if (event instanceof ChannelResponse.Closed) { 
102                                    // handle without suppressing exception check
103                                    this.onClosed((ChannelResponse.Closed) event);
104                            } else {                                
105                                    ChannelResponse rsp = (ChannelResponse) event;
106                                    if (rsp.getException() != null) {
107                                            // ignore and wait for autoclosing ChannelResponse.Closed;
108                                            // handle cleanup later in onClosed(...)
109                                            if (log.isDebugEnabled()) log.debug("Got exception response=" + rsp);
110                                    } else if (event instanceof ChannelResponse.Read) {
111                                            this.onRead((ChannelResponse.Read) event);
112                                    } else if (event instanceof ChannelResponse.Write) {
113                                            this.onWrite((ChannelResponse.Write) event);
114                                    } else if (event instanceof ChannelResponse.Registered) {
115                                            this.onRegistered((ChannelResponse.Registered) event);
116                                    } else if (event instanceof ChannelResponse.Accepted) {
117                                            this.onAccepted((ChannelResponse.Accepted) event);
118                                    } else if (event instanceof ChannelResponse.Connected) {
119                                            this.onConnected((ChannelResponse.Connected) event);
120                                    } else {
121                                            throw new IllegalEventException("Agent ERROR - should never happen", event, this.getStage());
122                                    }
123                            }
124                    } else {
125                            this.onApplicationEvent(event);
126                    }
127            }
128    
129            /**
130             * Called when an event other than a ChannelResponse (that is, an
131             * application specific event) has been received; The default implementation
132             * throws an exception.
133             */
134            protected void onApplicationEvent(Object event) {
135                    throw new IllegalEventException(event, this.getStage());
136            }
137    
138            /**
139             * Called when an accepted response has been received; that is, when the
140             * (server) agent has accepted a new client channel (connection).
141             */
142            abstract protected void onAccepted(ChannelResponse.Accepted response);
143    
144            /**
145             * Called when a closed response has been received; that is, when the agent
146             * has closed a channel (connection).
147             */
148            abstract protected void onClosed(ChannelResponse.Closed response);
149    
150            /**
151             * Called when a connected response has been received; that is, when the
152             * (client) agent has connected a new channel (connection) to a remote
153             * server.
154             */
155            abstract protected void onConnected(ChannelResponse.Connected response);
156    
157            /**
158             * Called when a registered response has been received; that is, when the
159             * agent has processed a {@link ChannelRequest.Registered} request.
160             */
161            abstract protected void onRegistered(ChannelResponse.Registered response);
162    
163            /**
164             * Called when a read response has been received; that is, when the agent
165             * has read data from a channel (connection).
166             */
167            abstract protected void onRead(ChannelResponse.Read response);
168    
169            /**
170             * Called when a write response has been received; that is, when the agent
171             * has written data to a channel (connection).
172             */
173            abstract protected void onWrite(ChannelResponse.Write response);
174    
175    }