001    /*
002     * Copyright (c) 2003, The Regents of the University of California, through
003     * Lawrence Berkeley National Laboratory (subject to receipt of any required
004     * approvals from the U.S. Dept. of Energy). All rights reserved.
005     */
006    package gov.lbl.dsd.sea.nio;
007    
008    import gov.lbl.dsd.sea.Stage;
009    import gov.lbl.dsd.sea.event.IllegalEventException;
010    import gov.lbl.dsd.sea.nio.auth.HostAuthorizer;
011    import gov.lbl.dsd.sea.nio.auth.SmartHostAuthorizationRules;
012    import gov.lbl.dsd.sea.nio.auth.SmartHostAuthorizer;
013    import gov.lbl.dsd.sea.nio.event.AdminRequest;
014    import gov.lbl.dsd.sea.nio.event.ChannelRequest;
015    import gov.lbl.dsd.sea.nio.event.ChannelResponse;
016    import gov.lbl.dsd.sea.nio.util.ByteBufferPool;
017    import gov.lbl.dsd.sea.nio.util.NioUtil;
018    import gov.lbl.dsd.sea.nio.util.SocketOpts;
019    
020    import java.io.IOException;
021    import java.net.InetSocketAddress;
022    import java.nio.ByteBuffer;
023    import java.nio.channels.Channel;
024    import java.nio.channels.ClosedChannelException;
025    import java.nio.channels.DatagramChannel;
026    import java.nio.channels.ReadableByteChannel;
027    import java.nio.channels.SelectableChannel;
028    import java.nio.channels.SelectionKey;
029    import java.nio.channels.Selector;
030    import java.nio.channels.ServerSocketChannel;
031    import java.nio.channels.SocketChannel;
032    import java.nio.channels.WritableByteChannel;
033    import java.util.ArrayList;
034    import java.util.HashMap;
035    import java.util.Iterator;
036    import java.util.LinkedList;
037    import java.util.List;
038    import java.util.Map;
039    
040    import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
041    
042    /**
043     * Efficient and scalable NIO based non-blocking single-threaded network agent
044     * that can be used both as a client and a server, both for TCP and UDP
045     * transports; Can handle large amounts of concurrent client and server
046     * connections in a single thread; Automatically deals with the subtle grunt
047     * work of life-cycle issues, threading, I/O multiplexing and NIO gotchas; This
048     * is the main work horse of this package.
049     * <p>
050     * General usage pattern:
051     * 
052     * <ol>
053     * <li>Construct a new agent</li>
054     * <li>Configure it via setters and getters. Convenience methods simplify
055     * starting outgoing client connections and listening on ports for incoming
056     * connections.</li>
057     * <li>Start the agent via <code>start()</code> or by enqueuing a
058     * {@link gov.lbl.dsd.sea.nio.event.AdminRequest.Start}</li>
 * <li>Have your application enqueue some request events from the
 * {@link gov.lbl.dsd.sea.nio.event} package via <code>enqueue(request)</code>
 * onto the agent. The agent will process the requests and enqueue responses to
 * the requests onto the (synchronous or asynchronous) observer stage of the
 * given network channel (connection), for example an
 * {@link AgentEventHandler}'s stage.</li>
065     * <li>Finally, stop the agent via <code>stop()</code> or by enqueuing a
066     * {@link gov.lbl.dsd.sea.nio.event.AdminRequest.Stop}</li>
067     * </ol>
068     * <p>
069     * See the demo client and servers in the {@link gov.lbl.dsd.sea.nio.demo}
070     * package for simple and complex example usage.
071     * <p>
072     * Whenever an IOException or EOS (end-of-stream) is encountered as a result of
073     * a channel request, the following happens, in that order:
074     * <ol>
 * <li>A ChannelResponse with the IOException is posted to the observer stage
 * (e.g. a {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Closed} or
 * {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Write} or
 * {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Read} or
 * {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Connected} or
 * {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Registered}, depending on
 * the type of request in use).</li>
082     * <li>The agent automatically closes the channel</li>
 * <li>A {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Closed} response is
 * posted to the observer stage. The response contains the very same exception
 * (unless closing throws yet another exception).</li>
086     * </ol>
087     * 
088     * Hence, most applications can ignore responses containing IOExceptions,
089     * unless it is a {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Closed}.
090     * <p>
091     * For a nice introduction to java networking, see <a
092     * href="http://lgl.epfl.ch/teaching/software_project/documentation/tutorials/networking.pdf">here
093     * </a> and <a target="_blank"
094     * href="http://www.theserverside.com/blogs/showblog.tss?id=DispellingNIOMyths">there
095     * </a> and <a target="_blank"
096     * href="http://www.ii.uib.no/~khalid/atij/atij-nio-web/atij-nio-2x1.pdf">also
097     * there </a>.
098     * 
099     * @author whoschek@lbl.gov
100     * @author $Author: hoschek3 $
101     * @version $Revision: 1.14 $, $Date: 2004/08/17 18:26:54 $
102     */
103    public final class NetAgent { // class made final until internal API is clean
104    
        // Agent state. Most of it is (re)initialized in start() and only touched
        // from the thread that runs start(); see the individual notes below.

        private volatile boolean shutdown;      // flag to indicate agent shutdown initiation; true until start() runs, set again by stop()
        private Selector selector;  // interface to I/O signals from OS; null until start() opens it
        private EDU.oswego.cs.dl.util.concurrent.Channel pendingEvents; // events to be handed from user thread to agent thread
        private long selectTimeout; // the maximum time to block in Selector.select(); 0 means block indefinitely
        private ByteBufferPool readBufferPool; // buffers to read into from network
        private Map writeQueues; // Map<SelectionKey, List<ByteBuffer>>; created in start()
        // since a SelectionKey corresponds to a registered channel, the map
        // maintains a separate write buffer queue for each channel
        // We could simply use key.attach to register a write queue with a key, but
        // we explicitly do not take that approach in order to not pollute
        // key.attachment(). That way user space apps can use key attachments for their custom
        // application specific associations.
        
        private Map observerStages; // Map<SelectableChannel, Stage>; created in start()
        private SocketOpts socketOptions; // options to set for new connections
        private HostAuthorizer hostAuthorizer; // consulted in onAcceptReady() to allow/deny an accepted connection
        
        // Lock pair used by start() and toDebugString(): the selector loop holds
        // innerLock while processing events and selecting, and briefly passes
        // through outerLock between iterations so that toDebugString() (which
        // takes outerLock, wakes the selector, then takes innerLock) can get in.
        private final Object outerLock = new Object();
        private final Object innerLock = new Object(); // also doubles as the end-of-batch marker object in doEvents()
        private int nopLoops; // number of select() calls that returned zero updated keys since the last reset
        
        private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NetAgent.class);
127    
128            /**
129             * Constructs a new agent with default configuration options; 
130             * the agent is not yet started.
131             */
132            public NetAgent() {
133                    this.shutdown = true; // initially it is shutdown; run() toggles that
134                    this.selector = null;
135                    this.pendingEvents = new LinkedQueue();
136                    this.selectTimeout = 0; // wait indefinitely
137                    //this.selectTimeout = 1000; // wait for at most 1 sec
138                    this.setSocketOptions(new SocketOpts());
139                    this.setReadBufferPool(new ByteBufferPool(8 * 128 * 1024, 128 * 1024, true, null));
140                    
141                    // default: allow unlimited accepts from all hosts
142                    SmartHostAuthorizer auth = new SmartHostAuthorizer(
143                                    true,
144                                    new SmartHostAuthorizationRules().addHost(SmartHostAuthorizationRules.ALL),
145                                    new SmartHostAuthorizationRules()
146                            );
147                    this.setAcceptHostAuthorizer(auth);
148            }
149    
150            /**
151             * Tells the agent to start listening as a server for TCP connections on the
152             * given port. To register multiple listen ports, call this method multiple
153             * times.
154             * 
155             * @param observer
156             *            the stage onto which responses to the new server channel
157             *            should be enqueued (may be null)
158             * @param port
159             *            the port to listen on
160             * @return the agent itself (for method chaining convenience)
161             * @throws IOException
162             */
163            public NetAgent addListenPort(Stage observer, int port) throws IOException {
164                    return this.addChannel(observer, NioUtil
165                                    .createServerChannel(port), SelectionKey.OP_ACCEPT);
166            }
167    
168            /** 
169             * Tells the agent to start a TCP client connection to the given remote address.
170             * To register multiple connections, call this method multiple times.
171             * 
172             * @param observer
173             *            the stage onto which responses to the new client channel
174             *            should be enqueued (may be null)
175             * @param address the address to connect to
176             * @return the agent itself (for method chaining convenience)
177             * @throws IOException
178             */
179            public NetAgent addConnectAddress(Stage observer, InetSocketAddress address) throws IOException {
180                    SocketChannel channel = NioUtil.createClientChannel(address);
181                    int interestOps = SelectionKey.OP_CONNECT;
182                    if (! channel.isConnectionPending()) {
183                            log.debug("socket channel already connected");
184                            interestOps = 0;
185                    }
186                    else {
187                            log.debug("socket channel not yet connected");
188                    }
189                    return this.addChannel(observer, channel, interestOps);
190            }
191    
192            /** 
193             * Add the given channel with the given interest set and attachment to the agent;
194             * for example so that it starts listening on a given port. 
195             * To register multiple channels, call this method multiple times.
196             * 
197             * @param observer
198             *            the stage onto which responses to the channel
199             *            should be enqueued (may be null)
200             * @param channel the channel to register
201             * @param interestOps the NIO interest set
202             * @return the agent itself (for method chaining convenience)
203             * @throws IOException
204             */
205            protected NetAgent addChannel(Stage observer, SelectableChannel channel, int interestOps) {
206                    this.enqueue(new ChannelRequest.Register(observer, channel, interestOps));
207                    return this;
208            }
209    
210            /**
211             * Sets the host authorizer the server uses to allow/deny accepting
212             * connections from the network.
213             * 
214             * @param hostAuthorizer
215             *            the host authorizer to use.
216             */
217            public void setAcceptHostAuthorizer(HostAuthorizer hostAuthorizer) {
218                    if (hostAuthorizer == null) throw new IllegalArgumentException("hostAuthorizer must not be null");
219                    this.checkIsShutdown();
220                    this.hostAuthorizer = hostAuthorizer;
221            }
222            
223            /**
224             * Sets the buffer pool to be used on reading from the network. Use this
225             * method for maximum performance; For casual usage you can safely and
226             * entirely ignore ALL buffer pool related concerns.
227             * 
228             * @param readBufferPool
229             *            the pool to use.
230             */
231            public void setReadBufferPool(ByteBufferPool readBufferPool) {
232                    if (readBufferPool == null) throw new IllegalArgumentException("readBufferPool must not be null");
233                    this.checkIsShutdown();
234                    this.readBufferPool = readBufferPool;
235            }
236            
237            /**
238             * Returns the buffer pool to be used on reading from the network.
239             */
240            public ByteBufferPool getReadBufferPool() {
241                    return this.readBufferPool;
242            }
243            
244            /**
245             * Set the socket options to be used for newly accepted connections as well
246             * as client connections (on OP_ACCEPT and OP_CONNECT).
247             * 
248             * @param options
249             *            the options to be set for the socket.
250             */
251            public void setSocketOptions(SocketOpts options) {
252                    if (options == null) throw new IllegalArgumentException("options must not be null");
253                    this.checkIsShutdown();
254                    this.socketOptions = options;
255            }       
256            
257            /**
258             * Hand a request event from the {@link gov.lbl.dsd.sea.nio.event} package
259             * to the agent; the request will be processed the next time the agent thread
260             * comes around the select loop.
261             * 
262             * @param event
263             *            the event to hand to the agent.
264             */
265            public void enqueue(Object event) {
266                    if (event instanceof AdminRequest.Start) {
267                            if (this.shutdown) {
268                                    new Thread(
269                                            new Runnable() {
270                                                    public void run() {
271                                                            start();
272                                                    }
273                                            }
274                                    ).start();
275                            }
276                            else {
277                                    ; // ignore (we are already running)
278                            }
279                    }
280                    else {
281                            try {
282                                    this.pendingEvents.put(event);
283                            } catch (InterruptedException e) {
284                                    log.warn("interrupted", e);
285                            }
286                            Selector sel = this.selector; // avoid potential NPE race by caching in local var
287                            if (sel != null) sel.wakeup();
288                            //if (log.isTraceEnabled()) log.trace("handed event off to selector thread="+event);
289                    }
290            }
291            
292            /**
293             * The main selector loop; Runs the agent selector in the current thread.
294             */
295            public void start() {
296                    try {
297                            this.shutdown = false;
298                            this.selector = Selector.open();
299                            this.writeQueues = new HashMap();
300                            this.observerStages = new HashMap();
301                            this.nopLoops = 0;
302    
303                            // main selector loop
304                            while (! this.shutdown) {
305                                    synchronized (outerLock) {} // sync up with toDebugString()
306                                    synchronized (innerLock) {
307                                            this.doEvents(); // do any selector updates within our thread
308                                            this.doSelect(this.selector); // this callback is the main workhorse
309                                    }
310                            }
311                    } catch (IOException e) {
312                            log.fatal(e);
313                    }
314                    finally {
315                            synchronized (innerLock) {                      
316                                    try {
317                                            this.doCloseAll();
318                                    } catch (IOException e1) {
319                                            log.fatal(e1);
320                                    }
321                            }
322                    }
323            }
324    
325            /**
326             * Cleanly shut the agent down, releasing acquired resources.
327             */
328            public void stop() {
329                    if (this.shutdown) return; // nothing to do anymore
330                    this.shutdown = true;
331                    this.selector.wakeup(); // break out of blocking select() within doSelect() of start()
332            }
333            
334            /** 
335             * Returns a summary string representation of the receiver.
336             */
337            public String toDebugString() {
338                    String s = this.getClass().getName() + ": ";
339    
340                    Selector sel = this.selector;
341                    if (sel != null) {
342                            synchronized (outerLock) { // sync up with start() and select()
343                                    sel.wakeup(); // ensure we can acquire the lock below to safely access selector state
344                                    synchronized (innerLock) {
345                                            s += "writeQueues=" + this.writeQueues;
346                                            s += ", observerStages=" + this.observerStages;
347                                            s += "\nselector=" + NioUtil.toString(sel);
348                                    }
349                            }
350                    }
351    
352                    s += "\n\nreadBufferPool=" + this.getReadBufferPool().toString();
353                    return s;
354            }
355    
356            /**
357             * Handle events in the selector thread (and ONLY in the selector thread)
358             * Otherwise the selector methods may block and deadlock us! Takes user
359             * level events from the given queue (without ever blocking), and processes
360             * them.
361             */
362            protected void doEvents() {
363                    try {
364                            Object event;
365                            this.pendingEvents.put(this.innerLock); // terminator flag prevents potentially infinite race loops
366                            while ((event = this.pendingEvents.poll(0)) != null && event != this.innerLock) {
367                                    this.doEvent(event);
368                                    this.nopLoops = 0;
369                            }               
370    //                      while ((event = this.pendingEvents.poll(0)) != null) {
371    //                              this.doEvent(event);
372    //                              this.nopLoops = 0;
373    //                      }
374                    } catch (InterruptedException e) {
375                            log.warn("interrupted", e);
376                    }
377            }
378    
379            /**
380             * Handle given event in the selector thread (and ONLY in the selector thread)
381             * 
382             * @param event
383             *            the event to handle.
384             */
385            protected void doEvent(Object event) {
386                    if (log.isTraceEnabled()) log.trace("doEvent=" + event);
387    
388                    // shut the agent down
389                    if (event instanceof AdminRequest.Stop) {
390                            this.stop();
391                    }
392                    
393                    // close a channel
394                    else if (event instanceof ChannelRequest.Close) {
395                            ChannelRequest.Close req = (ChannelRequest.Close) event;
396                            //this.observerStages.put(req.getChannel(), req.getSource());
397                            this.onClose(req.getChannel().keyFor(this.selector));
398                    }
399                    
400                    // schedule writing data to a channel
401                    else if (event instanceof ChannelRequest.WriteData) {
402                            ChannelRequest.WriteData req = (ChannelRequest.WriteData) event;
403                            this.onWriteRequest(req.getChannel().keyFor(this.selector), req.getBuffer());
404                    }
405                    
406                    // register event interest in a channel
407                    else if (event instanceof ChannelRequest.Register) {
408                            ChannelRequest.Register req = (ChannelRequest.Register) event;
409                            this.observerStages.put(req.getChannel(), req.getSource());
410                            Object attachment = req.hasAttachment() ? req.getAttachment() : null;
411                            this.onRegisterSelectorInterest(req.getChannel(), req.getInterestOps(), attachment, req.hasAttachment());
412                    }
413                    
414                    // OOPSLA
415                    else throw new IllegalEventException("Illegal event enqueued", event, null);
416            }
417    
418            /**
419             * Wait for I/O signals from OS, then dispatch them via selection key.
420             * Override for custom behaviour.
421             * 
422             * @param selector
423             *            the IO multiplexer interface to the OS
424             */
425            protected void doSelect(Selector selector) throws IOException   {       
426                    // block until OS ready event or wakeup() or timeout or thread interruption occurs              
427                    int     numUpdatedReadyKeys = selector.select(this.selectTimeout);
428                    
429                    boolean isFineTraceEnabled = false;
430                    if (isFineTraceEnabled) {
431                            log.error("num="+numUpdatedReadyKeys + ", readySet.size="+selector.selectedKeys().size() + ", keyset.size="+selector.keys().size()); 
432                            //log.trace("selector=" + NioUtil.toString(selector));
433                            //log.trace("channels=" + this.getRegisteredChannels());
434                            //log.trace("selKeys="+selector.selectedKeys());
435                            //log.trace("selector="+selector);
436                    }
437    
438                    if (numUpdatedReadyKeys == 0) { // nothing to do
439                            this.nopLoops++;
440                            if (this.nopLoops > 100) { // FIXME TODO
441                                    log.fatal("no operation loop detected; unnecessarily eating CPU");
442                                    log.error("num="+numUpdatedReadyKeys + ", readySet.size="+selector.selectedKeys().size() + ", keyset.size="+selector.keys().size()); 
443                                    this.nopLoops = 0;
444                            }
445                            //return;
446                    }
447                    Iterator iter = selector.selectedKeys().iterator();
448                    while (iter.hasNext()) {
449                            SelectionKey key = (SelectionKey) iter.next();                  
450                            if (key.isValid()) {
451                                    onKeyReady(key);
452                            }
453                            iter.remove(); // remove key from readySet !!
454                            if (!key.isValid()) {
455                                    this.onKeyInvalidation(key);
456                            }
457                    }
458            }
459    
460            protected void onKeyInvalidation(SelectionKey key) {
461                    if (log.isTraceEnabled()) {
462                            log.trace("******** Garbage collecting write buffers for invalid key=" + NioUtil.toString(key));
463                    }
464                    this.writeQueues.remove(key);
465                    this.observerStages.remove(key.channel());
466            }
467            
468            /**
469             * Handle a selection key that has become ready.
470             * 
471             * @param key
472             *            the selection key that has become ready.
473             * @return true if the key should be removed from the selector's ready-set,
474             *         false otherwise.
475             */
476            protected void onKeyReady(SelectionKey key) {
477                    // potentially handle multiple ready ops together
478                    if (log.isTraceEnabled()) log.trace("onKeyReady: key=" + NioUtil.toString(key));
479    
480                    if (key.isValid() && key.isWritable()) {
481                            this.onWriteReady(key);
482                    }
483                    if (key.isValid() && key.isReadable()) {
484                            this.onReadReady(key);
485                    }
486                    if (key.isValid() && key.isAcceptable()) { 
487                            this.onAcceptReady(key);
488                    }
489                    if (key.isValid() && key.isConnectable()) {
490                            this.onConnectReady(key);
491                    }
492            }
493                    
494            /**
495             * Handle an accept-ready signal selected from OS, for examply by accepting
496             * the new connection.
497             * 
498             * @param key
499             *            the selection key that has become ready.
500             */
501            protected void onAcceptReady(SelectionKey key) {
502                    try {
503                            if (log.isDebugEnabled()) 
504                                    log.debug("Accepting a new channel on server channel=" + key.channel());
505                            
506                            ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
507                            SocketChannel clientChannel = serverChannel.accept();
508                            if (clientChannel == null) {
509                                    log.error("null channel accepted (should never happen)");
510                            }
511                            else {
512                                    synchronized (this.hostAuthorizer) {
513                                            if (! this.hostAuthorizer.isAllowed(clientChannel.socket().getInetAddress()))   {
514                                                    if (log.isWarnEnabled()) log.warn("Accept authorization denied to " + clientChannel.socket().getInetAddress());
515                                                    clientChannel.close();
516                                                    return;
517                                            }
518                                    }
519    
520                                    this.socketOptions.copyTo(clientChannel.socket());
521                                    clientChannel.configureBlocking(false);
522                                    SelectionKey newKey = clientChannel.register(this.selector, 0);
523                                    
524                                    if (this.observerStages.get(serverChannel) != null) {
525                                            // Initially the client socket inherits its observer stage from the server socket.
526                                            // It will stay that way until a ChannelRequest.Register
527                                            // for the client socket is later received.
528                                            this.observerStages.put(clientChannel, this.observerStages.get(serverChannel));
529                                    }
530                                    newKey.attach(key.attachment()); // initially inherited from server channel
531                                    
532                                    this.onAcceptDone(newKey, null);
533                            }
534                    } catch (IOException e) {
535                            this.onAcceptDone(key, e);
536                    }
537            }
538            
539            /**
540             * Called when an accept-ready signal has been successfully processed.
541             * Override this method for custom behaviour (e.g forwarding to another
542             * stage).
543             * 
544             * @param key
545             *            the selection key that has become ready.
546             */
547            protected void onAcceptDone(SelectionKey key, IOException exception) {
548                    this.onException(exception);
549                    if (log.isTraceEnabled()) {
550                            if (key == null) 
551                                    log.warn("Oopsla, new null channel accepted!");
552                            else
553                                    log.trace("Accepted new channel=" + key.channel());                             
554                    }
555                    ChannelResponse.Accepted event = new ChannelResponse.Accepted(this, key, exception);
556                    this.notifyObservers(key, event);
557            }
558    
559            /**
560             * Handle a connect-ready signal selected from OS, for example by completing
561             * to connect.
562             * 
563             * @param key
564             *            the selection key that has become ready.
565             */
566            protected void onConnectReady(SelectionKey key) {
567                    if (log.isTraceEnabled()) log.trace("now connecting channel with key=" + NioUtil.toString(key));
568                    SocketChannel channel = (SocketChannel) key.channel();
569                    try {
570                            long start = System.currentTimeMillis();
571                            if (channel.finishConnect()) {
572                                    long end = System.currentTimeMillis();
573                                    log.debug("finishConnect took [ms] = " + (end-start));
574                                    this.socketOptions.copyTo(channel.socket());
575                                    if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_CONNECT);
576                                    this.onConnectDone(key, null);
577                            }
578                            else { // connection process is not yet complete
579                                    ;  // wait for completion
580                            }
581                    } catch (IOException e) {
582                            if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_CONNECT);
583                            this.onConnectDone(key, e);
584                            this.onClose(key, e);
585                    }
586            }
587                    
588            /**
589             * Called when a connect-ready signal has been successfully processed.
590             * Override this method for custom behaviour (e.g forwarding to another
591             * stage).
592             * 
593             * @param key
594             *            the selection key that has become ready.
595             */
596            protected void onConnectDone(SelectionKey key, IOException exception) {
597                    this.onException(exception);
598                    if (exception == null && log.isTraceEnabled()) {
599                            log.trace("Connected to channel=" + key.channel());
600                    }
601                    ChannelResponse.Connected event = new ChannelResponse.Connected(this, key, exception);
602                    this.notifyObservers(key, event);
603            }
604            
        /**
         * Closes the channel associated with the given key, without a causing
         * exception.
         * @param key the selection key whose channel should be closed.
         */
609            protected void onClose(SelectionKey key) {
610                    this.onClose(key, null);
611            }
612            
613            protected void onClose(SelectionKey key, IOException reason) {
614                    if (log.isTraceEnabled())
615                            log.trace("onClose with key=" + (key==null ? "null" : NioUtil.toString(key)) + ", reason=" + reason);
616                    
617                    if (key == null) return; // ignore
618                    try {
619                            if (key.isValid()) key.interestOps(0);
620                            key.cancel();
621                            Channel channel = key.channel();
622                            if (channel.isOpen()) {
623                                    if (log.isTraceEnabled()) log.trace("Closing channel=" + channel);
624                                    channel.close();
625                                    if (channel instanceof SocketChannel) {
626                                            ((SocketChannel) channel).socket().close();
627                                            //((SocketChannel) channel).socket().shutdownOutput();
628                                            //((SocketChannel) channel).socket().shutdownInput();
629                                    }
630                                    if (channel instanceof ServerSocketChannel) {
631                                            // Even with this conservative code, on MacOSX, the server socket is
632                                            // sometimes not closed properly (probably yet another vm bug).
633                                            // On Linux it seems to work fine.
634                                            ((ServerSocketChannel) channel).socket().close();
635                                    }
636                                    if (channel instanceof DatagramChannel) {
637                                            ((DatagramChannel) channel).socket().close();
638                                            ((DatagramChannel) channel).socket().disconnect();
639                                    }
640                            }
641                            this.onCloseDone(key, reason);
642                    } catch (IOException e) {
643                            if (log.isErrorEnabled()) log.error("closing itself threw exception", e);
644                            this.onCloseDone(key, e);
645                    }
646            }
647            
648            /**
649             * Called when a close signal has been successfully processed.
650             * Override this method for custom behaviour (e.g forwarding to another stage).
651             * 
652             * @param key the selection key that has become ready.
653             */
654            protected void onCloseDone(SelectionKey key, IOException exception) {
655                    this.onException(exception);
656                    if (log.isTraceEnabled()) log.trace("Closed channel=" + key.channel());
657                    ChannelResponse.Closed event = new ChannelResponse.Closed(this, key, exception);
658                    this.notifyObservers(key, event);
659            }
660            
661            /**
662             * Handle a read-ready signal selected from OS, for example by reading bytes
663             * from the key's channel.
664             * 
665             * @param key
666             *            the selection key that has become ready.
667             */
668            protected void onReadReady(SelectionKey key) {
669                    ReadableByteChannel channel = (ReadableByteChannel) key.channel();
670                    //ByteBuffer readBuffer = ByteBuffer.allocate(this.readBufferPool.minBufferCapacity);
671                    ByteBuffer readBuffer = this.readBufferPool.take();
672                    
673                    int n;
674                    try {
675                            n = NioUtil.readMany(channel, readBuffer);
676                    } catch (IOException e) {
677                            if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_READ);
678                            readBuffer.flip(); // prepare for user reads
679                            this.onReadDone(key, readBuffer, e);
680                            this.onClose(key, e);
681                            return;
682                    }
683                    boolean eos = false;
684                    if (n < 0) {
685                            eos = true;
686                            n = -(n + 1);
687                    }
688                    
689                    if (n > 0) {
690                            readBuffer.flip(); // prepare for user reads
691                            this.onReadDone(key, readBuffer, null);
692                    }
693                    else { // assert: n == 0, buffer does not get handed to user, so reuse immediately 
694                            this.readBufferPool.put(readBuffer);
695                    }
696                    
697                    if (eos) {  
698                            log.debug("Reached end-of-stream; remote host seems to have closed or lost connection");
699                            if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_READ);
700                            IOException e = new IOException("Agent reached EOS");
701                            this.onReadDone(key, ByteBuffer.allocate(0), e);
702                            this.onClose(key, e);                   
703                    }
704            }
705            
706            /**
707             * Called when a read-ready signal has been successfully processed.
708             * 
709             * @param key the selection key that has become ready.
710             */
711            protected void onReadDone(SelectionKey key, ByteBuffer buffer, IOException exception) {
712                    this.onException(exception);
713                    if (exception == null && log.isTraceEnabled()) {
714                            log.trace("Read " + buffer.position() + " bytes from channel=" + key.channel() + ", into buffer=" + buffer);
715                    }
716                    ChannelResponse.Read event = new ChannelResponse.Read(this, key, exception, buffer);
717                    this.notifyObservers(key, event);
718            }
719            
720            /**
721             * Schedules the given buffer to be written to the key's channel once it
         * becomes write-ready. Call this method repeatedly to schedule multiple
723             * buffers for later writing. As usual with NIO buffers, the buffer contents
724             * between index 0 and buffer.limit() are written in relative mode. After
725             * invocation of this method, you MUST NOT modify the buffer in user space
726             * until onWriteDone with buffer.hasRemaining()==false is called back. Once
727             * that is called back you MAY again modify and/or reuse the buffer (e.g.
728             * with a buffer pool).
729             * 
730             * @param key
731             *            the selection key to write to.
732             * @param buffer
733             *            the buffer to read from.
734             */
735            protected void onWriteRequest(SelectionKey key, ByteBuffer buffer) {
736                    if (key == null || !key.isValid()) return; // ignore
737                    if (log.isTraceEnabled()) {
738                            log.trace("adding write request to key" + NioUtil.toString(key));
739                    }
740                    List buffersToWrite = (List) this.writeQueues.get(key); // List<ByteBuffer>
741                    if (buffersToWrite == null) {
742                            buffersToWrite = new LinkedList(); // linked list for efficiency
743                            this.writeQueues.put(key, buffersToWrite); 
744                    }
745                    buffersToWrite.add(buffer);
746                    
747                    if (key.isValid()) NioUtil.addInterestBits(key, SelectionKey.OP_WRITE);
748                    //if (buffersToWrite.size() == 1) {
749                    this.onWriteReady(key); // optimization: try to write immediately without waiting for write ready to bubble up from selector
750                    //}
751            }
752            
753            /**
         * Handle a write-ready signal selected from OS, for example by writing
755             * bytes to the key's channel.
756             * 
757             * @param key
758             *            the selection key that has become ready.
759             */
760            protected void onWriteReady(SelectionKey key) {
761                    List buffersToWrite = (List) this.writeQueues.get(key); // List<ByteBuffer>
762                    if (buffersToWrite == null) { 
763                            log.warn("Nothing to write - really should not happen");
764                            return;
765                    }
766                    WritableByteChannel channel = (WritableByteChannel) key.channel();
767                    // try to write as many buffers as possible
768                    while (buffersToWrite.size() > 0) {
769                            ByteBuffer buffer = (ByteBuffer) buffersToWrite.get(0);
770                            if (! buffer.hasRemaining()) { // notify empty buffer write (correctly handle pathological case)!
771                                    buffersToWrite.remove(0);
772                                    this.onWriteDone(key, buffer, 0, null);
773                                    continue;
774                            }
775                            int n;
776                            try {
777                                    n = NioUtil.writeMany(channel, buffer);
778                            } catch (IOException e) {
779                                    if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_WRITE);
780                                    this.onWriteDone(key, buffer, 0, e);                                    
781                                    this.onClose(key, e);
782                                    return;
783                            }
784                            if (n == 0) {
785                                    // apparently not much can be written right now.
786                                    // wait for next write-ready signal and then resume writing
787                                    break;
788                            }
789                            else {
790                                    if (buffer.hasRemaining()) { // just a little could be written
791                                            break;
792                                    }
793                                    else {
794                                            buffersToWrite.remove(0); // remove fully written buffer
795                                            this.onWriteDone(key, buffer, n, null);
796                                    }
797                            }
798                    }
799                    
800                    // nothing more to write? if so deregister WRITE interest
801                    if (buffersToWrite.size() == 0 && key.isValid()) {
802                            NioUtil.removeInterestBits(key, SelectionKey.OP_WRITE);
803                    }
804            }
805    
806            /**
807             * Called when N bytes have been successfully written from the given buffer
808             * to the key's channel. There MAY still be remaining bytes in the buffer
809             * waiting to be written in the future. In such a "partial write" case
810             * <code>buffer.hasRemaining()</code> will return true. 
811             * 
812             * @param key
813             *            the selection key that has become ready.
814             * @param buffer
815             *            the buffer read from
816             * @param n
817             *            the number of bytes written
818             */
819            protected void onWriteDone(SelectionKey key, ByteBuffer buffer, int n, IOException exception) {
820                    this.onException(exception);
821                    if (exception == null && log.isTraceEnabled()) {
822                            log.trace("Fully wrote " + n + " bytes to channel=" + key.channel() + ", from buffer=" + buffer);
823                    }
824                    ChannelResponse.Write event = new ChannelResponse.Write(this, key, exception, buffer);
825                    this.notifyObservers(key, event);
826            }
827    
828            /**
829             * Handle channel interest registration request.
830             */
831            protected void onRegisterSelectorInterest(SelectableChannel channel, int ops, Object attachment, boolean hasAttachment) {
832                    ops = ops & ~SelectionKey.OP_WRITE; // enqueuing a write toggles that automatically
833                    SelectionKey key = channel.keyFor(this.selector);
834                    try {
835                            if (key != null) {
836                                    if (hasAttachment) key.attach(attachment);
837                                    if (!key.isValid()) throw new ClosedChannelException();
838                                    List buffersToWrite = (List) this.writeQueues.get(key); // List<ByteBuffer>
839                                    if (buffersToWrite != null && buffersToWrite.size() > 0) {
840                                            ops = ops | SelectionKey.OP_WRITE; // we still need to write stuff to the network
841                                    }
842                                    key.interestOps(ops);
843                            }
844                            else {
845                                    key = channel.register(this.selector, ops);
846                                    if (hasAttachment) key.attach(attachment);
847                            }
848                            
849                            this.onRegisterSelectorInterestDone(key, ops, null);
850                    } catch (ClosedChannelException e) {
851                            this.onRegisterSelectorInterestDone(key, ops, e);
852                            if (key != null) {
853                                    this.onClose(key, e);
854                            }
855                    }
856            }
857            
858            
859            /**
860             * Done registering interest 
861             */
862            protected void onRegisterSelectorInterestDone(SelectionKey key, int ops, ClosedChannelException exception) {
863                    this.onException(exception);
864                    if (exception == null && log.isTraceEnabled()) {
865                            log.trace("Registered interest = " + NioUtil.toString(ops) + ", key=" + (key == null ? "null" : NioUtil.toString(key)));
866                    }
867                    ChannelResponse.Registered event = new ChannelResponse.Registered(this, key, exception, ops);
868                    this.notifyObservers(key, event);
869            }
870    
871            /**
872             * Take care of exception
873             * @param exception
874             */
875            protected void onException(IOException exception) {
876                    if (exception != null && log.isTraceEnabled()) 
877                            log.trace("Gracefully forwarding exception = ", exception);
878            }
879            
880            /**
881             * Cleanly shut the agent down, releasing acquired resources.
882             * @throws IOException
883             */
884            protected void doCloseAll() throws IOException {
885                    if (log.isTraceEnabled()) log.trace("doCloseAll");
886                    List channels = this.getRegisteredChannels();                   
887                    for (int i = 0; i < channels.size(); i++) {
888                            this.onClose(((SelectableChannel) channels.get(i)).keyFor(this.selector));
889                    }
890                    
891                    if (log.isTraceEnabled()) log.trace("selector before selector.close()=" + NioUtil.toString(this.selector));
892                    try {
893                            this.selector.close(); 
894                    } catch (IOException e) {
895                            // vm bug on MacOSX & FreeBSD produces BadFileDescriptor exception, see
896                            // http://freepastry.rice.edu/FreePastry/README-1.3.2.html
897                            // http://list.droso.net/15/15756
898                            if (System.getProperty("os.name").startsWith("Mac") || 
899                                            System.getProperty("os.name").startsWith("Free") 
900                                            && "Bad file descriptor".equals(e.getMessage())) {
901                                    ; // ignore
902                            }
903                            else throw e; // rethrow
904                    }
905                    if (log.isTraceEnabled()) log.trace("selector after selector.close()=" + NioUtil.toString(this.selector));
906    
907                    this.selector = null; // help garbage collector
908                    this.pendingEvents = new LinkedQueue();
909                    this.readBufferPool.clear();
910                    this.writeQueues = null;
911                    this.observerStages = null;
912            }
913            
914            /**
915             * Enqueues the given event onto the observer stage associated with the given channel.
916             * 
917             * @param key
918             * @param event
919             */
920            protected void notifyObservers(SelectionKey key, Object event) {
921                    Channel channel = key.channel();
922                    Object observer = this.observerStages.get(channel);
923                    if (observer instanceof Stage) {
924                            if (log.isTraceEnabled()) log.trace("Agent enqueuing to observer: event="+event+", observerStage=" + observer);
925                            ((Stage) observer).enqueue(event);
926                    }
927                    //else {
928                    //      if (log.isTraceEnabled()) log.trace("No observer defined for channel="+channel);
929                    //}
930            }
931            
932            /**
933             * Returns all selectable channels registered with this agent,
934             * excluding channels with invalid keys.
935             * 
936             * @return the channels
937             */
938            protected List getRegisteredChannels() {
939                    Selector sel = this.selector;
940                    if (sel == null) 
941                            return new ArrayList(0);
942                    else
943                            return NioUtil.getRegisteredChannels(sel);
944            }
945    
946            /**
947             * Checks if the agent is running in the selector loop, and throws an
         * exception if it is running.
949             */
950            protected void checkIsShutdown() {
951                    if (this.shutdown == false) throw new IllegalStateException("must not be invoked on running agent.");
952            }
953    
954    }