/*
 * Copyright (c) 2003, The Regents of the University of California, through
 * Lawrence Berkeley National Laboratory (subject to receipt of any required
 * approvals from the U.S. Dept. of Energy). All rights reserved.
 */
package gov.lbl.dsd.sea.nio;

import gov.lbl.dsd.sea.Stage;
import gov.lbl.dsd.sea.event.IllegalEventException;
import gov.lbl.dsd.sea.nio.auth.HostAuthorizer;
import gov.lbl.dsd.sea.nio.auth.SmartHostAuthorizationRules;
import gov.lbl.dsd.sea.nio.auth.SmartHostAuthorizer;
import gov.lbl.dsd.sea.nio.event.AdminRequest;
import gov.lbl.dsd.sea.nio.event.ChannelRequest;
import gov.lbl.dsd.sea.nio.event.ChannelResponse;
import gov.lbl.dsd.sea.nio.util.ByteBufferPool;
import gov.lbl.dsd.sea.nio.util.NioUtil;
import gov.lbl.dsd.sea.nio.util.SocketOpts;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;

/**
 * Efficient and scalable NIO based non-blocking single-threaded network agent
 * that can be used both as a client and a server, both for TCP and UDP
 * transports; Can handle large amounts of concurrent client and server
 * connections in a single thread; Automatically deals with the subtle grunt
 * work of life-cycle issues, threading, I/O multiplexing and NIO gotchas; This
 * is the main work horse of this package.
 * <p>
 * General usage pattern:
 *
 * <ol>
 * <li>Construct a new agent</li>
 * <li>Configure it via setters and getters. Convenience methods simplify
 * starting outgoing client connections and listening on ports for incoming
 * connections.</li>
 * <li>Start the agent via <code>start()</code> or by enqueuing a
 * {@link gov.lbl.dsd.sea.nio.event.AdminRequest.Start}</li>
 * <li>Have your application enqueue some request events from the
 * {@link gov.lbl.dsd.sea.nio.event} package via <code>enqueue(request)</code>
 * onto the agent. The agent will process the requests and enqueue responses to
 * the requests onto the (synchronous or asynchronous) observer stage of the
 * given network channel (connection), for example an {@link AgentEventHandler}'s
 * stage.</li>
 * <li>Finally, stop the agent via <code>stop()</code> or by enqueuing a
 * {@link gov.lbl.dsd.sea.nio.event.AdminRequest.Stop}</li>
 * </ol>
 * <p>
 * See the demo client and servers in the {@link gov.lbl.dsd.sea.nio.demo}
 * package for simple and complex example usage.
 * <p>
 * Whenever an IOException or EOS (end-of-stream) is encountered as a result of
 * a channel request, the following happens, in that order:
 * <ol>
 * <li>A ChannelResponse with the IOException is posted to the observer stage
 * (e.g. a {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Closed} or
 * {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Write} or
 * {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Read} or
 * {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Connected} or
 * {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Registered}, depending on
 * the type of request in use).</li>
 * <li>The agent automatically closes the channel</li>
 * <li>A {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Closed} response is
 * posted to the observer stage. The response contains the very same exception (unless
 * closing throws yet another exception).</li>
 * </ol>
 *
 * Hence, most applications can ignore responses containing IOExceptions,
 * unless it is a {@link gov.lbl.dsd.sea.nio.event.ChannelResponse.Closed}.
 * <p>
 * For a nice introduction to java networking, see <a
 * href="http://lgl.epfl.ch/teaching/software_project/documentation/tutorials/networking.pdf">here
 * </a> and <a target="_blank"
 * href="http://www.theserverside.com/blogs/showblog.tss?id=DispellingNIOMyths">there
 * </a> and <a target="_blank"
 * href="http://www.ii.uib.no/~khalid/atij/atij-nio-web/atij-nio-2x1.pdf">also
 * there </a>.
 *
 * @author whoschek@lbl.gov
 * @author $Author: hoschek3 $
 * @version $Revision: 1.14 $, $Date: 2004/08/17 18:26:54 $
 */
public final class NetAgent { // class made final until internal API is clean

    private volatile boolean shutdown; // flag to indicate agent shutdown initiation

    // interface to I/O signals from OS; volatile because user threads read it
    // in enqueue(), stop() and toDebugString() while the agent thread assigns it
    private volatile Selector selector;

    private EDU.oswego.cs.dl.util.concurrent.Channel pendingEvents; // events to be handed from user thread to agent thread
    private long selectTimeout; // the maximum time to block in Selector.select()
    private ByteBufferPool readBufferPool; // buffers to read into from network

    private Map writeQueues; // Map<SelectionKey, List<ByteBuffer>>
    // since a SelectionKey corresponds to a registered channel, the map
    // maintains a separate write buffer queue for each channel
    // We could simply use key.attach to register a write queue with a key, but
    // we explicitly do not take that approach in order to not pollute
    // key.attachment(). That way user space apps can use key attachments for their custom
    // application specific associations.

    private Map observerStages; // Map<SelectableChannel, Stage>
    private SocketOpts socketOptions; // options to set for new connections
    private HostAuthorizer hostAuthorizer;

    private final Object outerLock = new Object();
    private final Object innerLock = new Object();
    private int nopLoops;

    private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NetAgent.class);

    /**
     * Constructs a new agent with default configuration options;
     * the agent is not yet started.
     */
    public NetAgent() {
        this.shutdown = true; // initially it is shutdown; start() toggles that
        this.selector = null;
        this.pendingEvents = new LinkedQueue();
        this.selectTimeout = 0; // wait indefinitely
        this.setSocketOptions(new SocketOpts());
        this.setReadBufferPool(new ByteBufferPool(8 * 128 * 1024, 128 * 1024, true, null));

        // default: allow unlimited accepts from all hosts
        SmartHostAuthorizer auth = new SmartHostAuthorizer(
            true,
            new SmartHostAuthorizationRules().addHost(SmartHostAuthorizationRules.ALL),
            new SmartHostAuthorizationRules()
        );
        this.setAcceptHostAuthorizer(auth);
    }

    /**
     * Tells the agent to start listening as a server for TCP connections on the
     * given port. To register multiple listen ports, call this method multiple
     * times.
     *
     * @param observer
     *            the stage onto which responses to the new server channel
     *            should be enqueued (may be null)
     * @param port
     *            the port to listen on
     * @return the agent itself (for method chaining convenience)
     * @throws IOException
     *             if the server channel could not be created
     */
    public NetAgent addListenPort(Stage observer, int port) throws IOException {
        return this.addChannel(observer, NioUtil
                .createServerChannel(port), SelectionKey.OP_ACCEPT);
    }

    /**
     * Tells the agent to start a TCP client connection to the given remote address.
     * To register multiple connections, call this method multiple times.
     *
     * @param observer
     *            the stage onto which responses to the new client channel
     *            should be enqueued (may be null)
     * @param address the address to connect to
     * @return the agent itself (for method chaining convenience)
     * @throws IOException
     *             if the client channel could not be created
     */
    public NetAgent addConnectAddress(Stage observer, InetSocketAddress address) throws IOException {
        SocketChannel channel = NioUtil.createClientChannel(address);
        int interestOps = SelectionKey.OP_CONNECT;
        if (! channel.isConnectionPending()) {
            // no connect is pending, so there is no OP_CONNECT signal to wait for
            log.debug("socket channel already connected");
            interestOps = 0;
        }
        else {
            log.debug("socket channel not yet connected");
        }
        return this.addChannel(observer, channel, interestOps);
    }

    /**
     * Add the given channel with the given interest set to the agent;
     * for example so that it starts listening on a given port.
     * To register multiple channels, call this method multiple times.
     * The registration is performed asynchronously: this method merely enqueues
     * a {@link ChannelRequest.Register} request.
     *
     * @param observer
     *            the stage onto which responses to the channel
     *            should be enqueued (may be null)
     * @param channel the channel to register
     * @param interestOps the NIO interest set
     * @return the agent itself (for method chaining convenience)
     */
    protected NetAgent addChannel(Stage observer, SelectableChannel channel, int interestOps) {
        this.enqueue(new ChannelRequest.Register(observer, channel, interestOps));
        return this;
    }

    /**
     * Sets the host authorizer the server uses to allow/deny accepting
     * connections from the network.
     *
     * @param hostAuthorizer
     *            the host authorizer to use.
     * @throws IllegalArgumentException
     *             if the authorizer is null
     * @throws IllegalStateException
     *             if the agent is currently running
     */
    public void setAcceptHostAuthorizer(HostAuthorizer hostAuthorizer) {
        if (hostAuthorizer == null) throw new IllegalArgumentException("hostAuthorizer must not be null");
        this.checkIsShutdown();
        this.hostAuthorizer = hostAuthorizer;
    }

    /**
     * Sets the buffer pool to be used on reading from the network. Use this
     * method for maximum performance; For casual usage you can safely and
     * entirely ignore ALL buffer pool related concerns.
     *
     * @param readBufferPool
     *            the pool to use.
     * @throws IllegalArgumentException
     *             if the pool is null
     * @throws IllegalStateException
     *             if the agent is currently running
     */
    public void setReadBufferPool(ByteBufferPool readBufferPool) {
        if (readBufferPool == null) throw new IllegalArgumentException("readBufferPool must not be null");
        this.checkIsShutdown();
        this.readBufferPool = readBufferPool;
    }

    /**
     * Returns the buffer pool to be used on reading from the network.
     */
    public ByteBufferPool getReadBufferPool() {
        return this.readBufferPool;
    }

    /**
     * Set the socket options to be used for newly accepted connections as well
     * as client connections (on OP_ACCEPT and OP_CONNECT).
     *
     * @param options
     *            the options to be set for the socket.
     * @throws IllegalArgumentException
     *             if the options are null
     * @throws IllegalStateException
     *             if the agent is currently running
     */
    public void setSocketOptions(SocketOpts options) {
        if (options == null) throw new IllegalArgumentException("options must not be null");
        this.checkIsShutdown();
        this.socketOptions = options;
    }

    /**
     * Hand a request event from the {@link gov.lbl.dsd.sea.nio.event} package
     * to the agent; the request will be processed the next time the agent thread
     * comes around the select loop.
     * <p>
     * An {@link AdminRequest.Start} is special: it is not queued but, if the
     * agent is shut down, runs <code>start()</code> in a newly created thread;
     * it is ignored if the agent is already running.
     *
     * @param event
     *            the event to hand to the agent.
     */
    public void enqueue(Object event) {
        if (event instanceof AdminRequest.Start) {
            if (this.shutdown) {
                new Thread(
                    new Runnable() {
                        public void run() {
                            start();
                        }
                    }
                ).start();
            }
            // else: ignore (we are already running)
        }
        else {
            try {
                this.pendingEvents.put(event);
            } catch (InterruptedException e) {
                log.warn("interrupted", e);
                // we cannot rethrow without breaking the signature, so at least
                // restore the interrupt status for the calling thread
                Thread.currentThread().interrupt();
            }
            Selector sel = this.selector; // avoid potential NPE race by caching in local var
            if (sel != null) sel.wakeup();
        }
    }

    /**
     * The main selector loop; Runs the agent selector in the current thread
     * until the agent is stopped or the selector fails with an IOException.
     * On exit all registered channels and the selector are closed and the agent
     * is marked as shut down, so that it can be reconfigured and started again.
     */
    public void start() {
        try {
            this.shutdown = false;
            this.selector = Selector.open();
            this.writeQueues = new HashMap();
            this.observerStages = new HashMap();
            this.nopLoops = 0;

            // main selector loop
            while (! this.shutdown) {
                synchronized (outerLock) {} // sync up with toDebugString()
                synchronized (innerLock) {
                    this.doEvents(); // do any selector updates within our thread
                    this.doSelect(this.selector); // this callback is the main workhorse
                }
            }
        } catch (IOException e) {
            log.fatal("selector loop terminated by I/O error", e);
        }
        finally {
            // the loop is gone no matter why we got here; without this an
            // IOException would leave the agent looking like it is still running
            this.shutdown = true;
            synchronized (innerLock) {
                try {
                    this.doCloseAll();
                } catch (IOException e1) {
                    log.fatal("could not cleanly release agent resources", e1);
                }
            }
        }
    }

    /**
     * Cleanly shut the agent down, releasing acquired resources.
     * The actual cleanup is done by the agent thread once it leaves its loop.
     */
    public void stop() {
        if (this.shutdown) return; // nothing to do anymore
        this.shutdown = true;
        // The selector may not exist (yet or anymore): start() opens it after
        // clearing the shutdown flag and doCloseAll() resets it to null.
        Selector sel = this.selector;
        if (sel != null) sel.wakeup(); // break out of blocking select() within doSelect() of start()
    }

    /**
     * Returns a summary string representation of the receiver.
     */
    public String toDebugString() {
        String s = this.getClass().getName() + ": ";

        Selector sel = this.selector;
        if (sel != null) {
            synchronized (outerLock) { // sync up with start() and select()
                sel.wakeup(); // ensure we can acquire the lock below to safely access selector state
                synchronized (innerLock) {
                    s += "writeQueues=" + this.writeQueues;
                    s += ", observerStages=" + this.observerStages;
                    s += "\nselector=" + NioUtil.toString(sel);
                }
            }
        }

        s += "\n\nreadBufferPool=" + this.getReadBufferPool().toString();
        return s;
    }

    /**
     * Handle events in the selector thread (and ONLY in the selector thread)
     * Otherwise the selector methods may block and deadlock us! Takes user
     * level events from the given queue (without ever blocking), and processes
     * them.
     */
    protected void doEvents() {
        try {
            Object event;
            this.pendingEvents.put(this.innerLock); // terminator flag prevents potentially infinite race loops
            while ((event = this.pendingEvents.poll(0)) != null && event != this.innerLock) {
                this.doEvent(event);
                this.nopLoops = 0;
            }
        } catch (InterruptedException e) {
            // Deliberately NOT re-interrupting here: this runs in the selector
            // thread, and Selector.select() returns immediately for as long as
            // the thread's interrupt status is set.
            log.warn("interrupted", e);
        }
    }

    /**
     * Handle given event in the selector thread (and ONLY in the selector thread)
     *
     * @param event
     *            the event to handle.
     * @throws IllegalEventException
     *             if the event is of an unknown type
     */
    protected void doEvent(Object event) {
        if (log.isTraceEnabled()) log.trace("doEvent=" + event);

        // shut the agent down
        if (event instanceof AdminRequest.Stop) {
            this.stop();
        }

        // close a channel
        else if (event instanceof ChannelRequest.Close) {
            ChannelRequest.Close req = (ChannelRequest.Close) event;
            this.onClose(req.getChannel().keyFor(this.selector));
        }

        // schedule writing data to a channel
        else if (event instanceof ChannelRequest.WriteData) {
            ChannelRequest.WriteData req = (ChannelRequest.WriteData) event;
            this.onWriteRequest(req.getChannel().keyFor(this.selector), req.getBuffer());
        }

        // register event interest in a channel
        else if (event instanceof ChannelRequest.Register) {
            ChannelRequest.Register req = (ChannelRequest.Register) event;
            this.observerStages.put(req.getChannel(), req.getSource());
            Object attachment = req.hasAttachment() ? req.getAttachment() : null;
            this.onRegisterSelectorInterest(req.getChannel(), req.getInterestOps(), attachment, req.hasAttachment());
        }

        // OOPSLA
        else throw new IllegalEventException("Illegal event enqueued", event, null);
    }

    /**
     * Wait for I/O signals from OS, then dispatch them via selection key.
     * Override for custom behaviour.
     *
     * @param selector
     *            the IO multiplexer interface to the OS
     * @throws IOException
     *             if selecting fails
     */
    protected void doSelect(Selector selector) throws IOException {
        // block until OS ready event or wakeup() or timeout or thread interruption occurs
        int numUpdatedReadyKeys = selector.select(this.selectTimeout);

        if (log.isTraceEnabled()) {
            log.trace("num="+numUpdatedReadyKeys + ", readySet.size="+selector.selectedKeys().size() + ", keyset.size="+selector.keys().size());
        }

        if (numUpdatedReadyKeys == 0) { // nothing to do
            this.nopLoops++;
            if (this.nopLoops > 100) { // FIXME TODO
                log.fatal("no operation loop detected; unnecessarily eating CPU");
                log.error("num="+numUpdatedReadyKeys + ", readySet.size="+selector.selectedKeys().size() + ", keyset.size="+selector.keys().size());
                this.nopLoops = 0;
            }
        }
        Iterator iter = selector.selectedKeys().iterator();
        while (iter.hasNext()) {
            SelectionKey key = (SelectionKey) iter.next();
            if (key.isValid()) {
                onKeyReady(key);
            }
            iter.remove(); // remove key from readySet !!
            if (!key.isValid()) {
                this.onKeyInvalidation(key);
            }
        }
    }

    /**
     * Forgets the per-channel state (pending write buffers and observer stage)
     * the agent keeps for the given key. Safe to call more than once per key.
     *
     * @param key
     *            the selection key that is no longer valid.
     */
    protected void onKeyInvalidation(SelectionKey key) {
        if (log.isTraceEnabled()) {
            log.trace("******** Garbage collecting write buffers for invalid key=" + NioUtil.toString(key));
        }
        this.writeQueues.remove(key);
        this.observerStages.remove(key.channel());
    }

    /**
     * Handle a selection key that has become ready. Each handler may close the
     * channel, hence validity is re-checked before every dispatch.
     *
     * @param key
     *            the selection key that has become ready.
     */
    protected void onKeyReady(SelectionKey key) {
        // potentially handle multiple ready ops together
        if (log.isTraceEnabled()) log.trace("onKeyReady: key=" + NioUtil.toString(key));

        if (key.isValid() && key.isWritable()) {
            this.onWriteReady(key);
        }
        if (key.isValid() && key.isReadable()) {
            this.onReadReady(key);
        }
        if (key.isValid() && key.isAcceptable()) {
            this.onAcceptReady(key);
        }
        if (key.isValid() && key.isConnectable()) {
            this.onConnectReady(key);
        }
    }

    /**
     * Handle an accept-ready signal selected from OS, for example by accepting
     * the new connection.
     *
     * @param key
     *            the selection key that has become ready.
     */
    protected void onAcceptReady(SelectionKey key) {
        SocketChannel clientChannel = null;
        try {
            if (log.isDebugEnabled())
                log.debug("Accepting a new channel on server channel=" + key.channel());

            ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
            clientChannel = serverChannel.accept();
            if (clientChannel == null) {
                log.error("null channel accepted (should never happen)");
                return;
            }

            synchronized (this.hostAuthorizer) {
                if (! this.hostAuthorizer.isAllowed(clientChannel.socket().getInetAddress())) {
                    if (log.isWarnEnabled()) log.warn("Accept authorization denied to " + clientChannel.socket().getInetAddress());
                    clientChannel.close();
                    return;
                }
            }

            this.socketOptions.copyTo(clientChannel.socket());
            clientChannel.configureBlocking(false);
            SelectionKey newKey = clientChannel.register(this.selector, 0);

            if (this.observerStages.get(serverChannel) != null) {
                // Initially the client socket inherits its observer stage from the server socket.
                // It will stay that way until a ChannelRequest.Register
                // for the client socket is later received.
                this.observerStages.put(clientChannel, this.observerStages.get(serverChannel));
            }
            newKey.attach(key.attachment()); // initially inherited from server channel

            this.onAcceptDone(newKey, null);
        } catch (IOException e) {
            // Setting up the accepted connection failed before it got registered;
            // nobody else knows about the channel, so close it here or it leaks.
            if (clientChannel != null && clientChannel.isOpen()) {
                try {
                    clientChannel.close();
                } catch (IOException ignored) {
                    if (log.isTraceEnabled()) log.trace("closing half-accepted channel failed", ignored);
                }
            }
            this.onAcceptDone(key, e);
        }
    }

    /**
     * Called when an accept-ready signal has been processed.
     * Override this method for custom behaviour (e.g forwarding to another
     * stage).
     *
     * @param key
     *            on success the key of the newly accepted client channel; on
     *            failure the key of the server channel.
     * @param exception
     *            the failure, or null if accepting succeeded.
     */
    protected void onAcceptDone(SelectionKey key, IOException exception) {
        this.onException(exception);
        if (exception == null && log.isTraceEnabled()) {
            if (key == null)
                log.warn("Oopsla, new null channel accepted!");
            else
                log.trace("Accepted new channel=" + key.channel());
        }
        ChannelResponse.Accepted event = new ChannelResponse.Accepted(this, key, exception);
        this.notifyObservers(key, event);
    }

    /**
     * Handle a connect-ready signal selected from OS, for example by completing
     * to connect.
     *
     * @param key
     *            the selection key that has become ready.
     */
    protected void onConnectReady(SelectionKey key) {
        if (log.isTraceEnabled()) log.trace("now connecting channel with key=" + NioUtil.toString(key));
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            long start = System.currentTimeMillis();
            if (channel.finishConnect()) {
                long end = System.currentTimeMillis();
                if (log.isDebugEnabled()) log.debug("finishConnect took [ms] = " + (end-start));
                this.socketOptions.copyTo(channel.socket());
                if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_CONNECT);
                this.onConnectDone(key, null);
            }
            // else: connection process is not yet complete; wait for completion
        } catch (IOException e) {
            if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_CONNECT);
            this.onConnectDone(key, e);
            this.onClose(key, e);
        }
    }

    /**
     * Called when a connect-ready signal has been processed.
     * Override this method for custom behaviour (e.g forwarding to another
     * stage).
     *
     * @param key
     *            the selection key that has become ready.
     * @param exception
     *            the failure, or null if connecting succeeded.
     */
    protected void onConnectDone(SelectionKey key, IOException exception) {
        this.onException(exception);
        if (exception == null && log.isTraceEnabled()) {
            log.trace("Connected to channel=" + key.channel());
        }
        ChannelResponse.Connected event = new ChannelResponse.Connected(this, key, exception);
        this.notifyObservers(key, event);
    }

    /**
     * Closes the channel of the given key.
     *
     * @param key
     *            the key of the channel to close (null is ignored).
     */
    protected void onClose(SelectionKey key) {
        this.onClose(key, null);
    }

    /**
     * Cancels the given key, closes its channel, reports the outcome via
     * <code>onCloseDone</code> and finally forgets the per-channel state kept
     * for the key.
     *
     * @param key
     *            the key of the channel to close (null is ignored).
     * @param reason
     *            the exception that caused the close, or null on a regular close;
     *            it is forwarded to the observer unless closing itself fails.
     */
    protected void onClose(SelectionKey key, IOException reason) {
        if (log.isTraceEnabled())
            log.trace("onClose with key=" + (key==null ? "null" : NioUtil.toString(key)) + ", reason=" + reason);

        if (key == null) return; // ignore

        IOException outcome = reason;
        try {
            if (key.isValid()) key.interestOps(0);
            key.cancel();
            Channel channel = key.channel();
            if (channel.isOpen()) {
                if (log.isTraceEnabled()) log.trace("Closing channel=" + channel);
                channel.close();
                if (channel instanceof SocketChannel) {
                    ((SocketChannel) channel).socket().close();
                }
                if (channel instanceof ServerSocketChannel) {
                    // Even with this conservative code, on MacOSX, the server socket is
                    // sometimes not closed properly (probably yet another vm bug).
                    // On Linux it seems to work fine.
                    ((ServerSocketChannel) channel).socket().close();
                }
                if (channel instanceof DatagramChannel) {
                    ((DatagramChannel) channel).socket().close();
                    ((DatagramChannel) channel).socket().disconnect();
                }
            }
        } catch (IOException e) {
            if (log.isErrorEnabled()) log.error("closing itself threw exception", e);
            outcome = e;
        }

        try {
            this.onCloseDone(key, outcome);
        } finally {
            // doSelect() only reclaims state of keys that show up in the ready set;
            // a channel closed from doEvent() never does, so reclaim here as well
            // (after the observer has been notified, since that needs the state).
            this.onKeyInvalidation(key);
        }
    }

    /**
     * Called when a close signal has been processed.
     * Override this method for custom behaviour (e.g forwarding to another stage).
     *
     * @param key
     *            the selection key of the closed channel.
     * @param exception
     *            the reason for, or failure of, the close; may be null.
     */
    protected void onCloseDone(SelectionKey key, IOException exception) {
        this.onException(exception);
        if (log.isTraceEnabled()) log.trace("Closed channel=" + key.channel());
        ChannelResponse.Closed event = new ChannelResponse.Closed(this, key, exception);
        this.notifyObservers(key, event);
    }

    /**
     * Handle a read-ready signal selected from OS, for example by reading bytes
     * from the key's channel.
     *
     * @param key
     *            the selection key that has become ready.
     */
    protected void onReadReady(SelectionKey key) {
        ReadableByteChannel channel = (ReadableByteChannel) key.channel();
        ByteBuffer readBuffer = this.readBufferPool.take();

        int n;
        try {
            n = NioUtil.readMany(channel, readBuffer);
        } catch (IOException e) {
            if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_READ);
            readBuffer.flip(); // prepare for user reads
            this.onReadDone(key, readBuffer, e);
            this.onClose(key, e);
            return;
        }

        // a negative result signals end-of-stream and encodes the byte count as -(count + 1)
        boolean eos = false;
        if (n < 0) {
            eos = true;
            n = -(n + 1);
        }

        if (n > 0) {
            readBuffer.flip(); // prepare for user reads
            this.onReadDone(key, readBuffer, null);
        }
        else { // assert: n == 0, buffer does not get handed to user, so reuse immediately
            this.readBufferPool.put(readBuffer);
        }

        if (eos) {
            log.debug("Reached end-of-stream; remote host seems to have closed or lost connection");
            if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_READ);
            IOException e = new IOException("Agent reached EOS");
            this.onReadDone(key, ByteBuffer.allocate(0), e);
            this.onClose(key, e);
        }
    }

    /**
     * Called when a read-ready signal has been processed.
     *
     * @param key
     *            the selection key that has become ready.
     * @param buffer
     *            the buffer holding the bytes read, already flipped for reading.
     * @param exception
     *            the failure (or EOS marker), or null if reading succeeded.
     */
    protected void onReadDone(SelectionKey key, ByteBuffer buffer, IOException exception) {
        this.onException(exception);
        if (exception == null && log.isTraceEnabled()) {
            // the buffer is flipped, so remaining() (not position()) is the number of bytes read
            log.trace("Read " + buffer.remaining() + " bytes from channel=" + key.channel() + ", into buffer=" + buffer);
        }
        ChannelResponse.Read event = new ChannelResponse.Read(this, key, exception, buffer);
        this.notifyObservers(key, event);
    }

    /**
     * Schedules the given buffer to be written to the key's channel once it
     * becomes write-ready. Call this method repeatedly to schedule multiple
     * buffers for later writing. As usual with NIO buffers, the buffer contents
     * between index 0 and buffer.limit() are written in relative mode. After
     * invocation of this method, you MUST NOT modify the buffer in user space
     * until onWriteDone with buffer.hasRemaining()==false is called back. Once
     * that is called back you MAY again modify and/or reuse the buffer (e.g.
     * with a buffer pool).
     * <p>
     * The request is silently ignored if the key is null or no longer valid.
     *
     * @param key
     *            the selection key to write to.
     * @param buffer
     *            the buffer to read from.
     */
    protected void onWriteRequest(SelectionKey key, ByteBuffer buffer) {
        if (key == null || !key.isValid()) return; // ignore
        if (log.isTraceEnabled()) {
            log.trace("adding write request to key" + NioUtil.toString(key));
        }
        List buffersToWrite = (List) this.writeQueues.get(key); // List<ByteBuffer>
        if (buffersToWrite == null) {
            buffersToWrite = new LinkedList(); // linked list for efficiency
            this.writeQueues.put(key, buffersToWrite);
        }
        buffersToWrite.add(buffer);

        if (key.isValid()) NioUtil.addInterestBits(key, SelectionKey.OP_WRITE);
        this.onWriteReady(key); // optimization: try to write immediately without waiting for write ready to bubble up from selector
    }

    /**
     * Handle a write-ready signal selected from OS, for example by writing
     * bytes to the key's channel.
     *
     * @param key
     *            the selection key that has become ready.
     */
    protected void onWriteReady(SelectionKey key) {
        List buffersToWrite = (List) this.writeQueues.get(key); // List<ByteBuffer>
        if (buffersToWrite == null) {
            log.warn("Nothing to write - really should not happen");
            return;
        }
        WritableByteChannel channel = (WritableByteChannel) key.channel();
        // try to write as many buffers as possible
        while (buffersToWrite.size() > 0) {
            ByteBuffer buffer = (ByteBuffer) buffersToWrite.get(0);
            if (! buffer.hasRemaining()) { // notify empty buffer write (correctly handle pathological case)!
                buffersToWrite.remove(0);
                this.onWriteDone(key, buffer, 0, null);
                continue;
            }
            int n;
            try {
                n = NioUtil.writeMany(channel, buffer);
            } catch (IOException e) {
                if (key.isValid()) NioUtil.removeInterestBits(key, SelectionKey.OP_WRITE);
                this.onWriteDone(key, buffer, 0, e);
                this.onClose(key, e);
                return;
            }
            if (n == 0 || buffer.hasRemaining()) {
                // nothing, or just a little, could be written right now.
                // wait for next write-ready signal and then resume writing
                break;
            }
            buffersToWrite.remove(0); // remove fully written buffer
            this.onWriteDone(key, buffer, n, null);
        }

        // nothing more to write? if so deregister WRITE interest
        if (buffersToWrite.size() == 0 && key.isValid()) {
            NioUtil.removeInterestBits(key, SelectionKey.OP_WRITE);
        }
    }

    /**
     * Called when a buffer has been written to the key's channel, or when
     * writing it failed.
     *
     * @param key
     *            the selection key that has become ready.
     * @param buffer
     *            the buffer read from
     * @param n
     *            the number of bytes written
     * @param exception
     *            the failure, or null if writing succeeded.
     */
    protected void onWriteDone(SelectionKey key, ByteBuffer buffer, int n, IOException exception) {
        this.onException(exception);
        if (exception == null && log.isTraceEnabled()) {
            log.trace("Fully wrote " + n + " bytes to channel=" + key.channel() + ", from buffer=" + buffer);
        }
        ChannelResponse.Write event = new ChannelResponse.Write(this, key, exception, buffer);
        this.notifyObservers(key, event);
    }

    /**
     * Handle channel interest registration request: registers the channel with
     * the selector, or updates the interest set (and optionally the attachment)
     * of a channel that is already registered.
     *
     * @param channel
     *            the channel to register.
     * @param ops
     *            the requested interest set; OP_WRITE is managed by the agent
     *            and therefore ignored.
     * @param attachment
     *            the new key attachment (only used if hasAttachment is true).
     * @param hasAttachment
     *            whether the key attachment should be replaced.
     */
    protected void onRegisterSelectorInterest(SelectableChannel channel, int ops, Object attachment, boolean hasAttachment) {
        ops = ops & ~SelectionKey.OP_WRITE; // enqueuing a write toggles that automatically
        SelectionKey key = channel.keyFor(this.selector);
        try {
            if (key != null) {
                if (hasAttachment) key.attach(attachment);
                if (!key.isValid()) throw new ClosedChannelException();
                List buffersToWrite = (List) this.writeQueues.get(key); // List<ByteBuffer>
                if (buffersToWrite != null && buffersToWrite.size() > 0) {
                    ops = ops | SelectionKey.OP_WRITE; // we still need to write stuff to the network
                }
                key.interestOps(ops);
            }
            else {
                key = channel.register(this.selector, ops);
                if (hasAttachment) key.attach(attachment);
            }

            this.onRegisterSelectorInterestDone(key, ops, null);
        } catch (ClosedChannelException e) {
            this.onRegisterSelectorInterestDone(key, ops, e);
            if (key != null) {
                this.onClose(key, e);
            }
            else {
                // register() itself failed, so there is no key and nothing to close.
                // Drop the observer that doEvent() recorded for the channel,
                // otherwise the entry would never be reclaimed.
                if (log.isWarnEnabled()) log.warn("Could not register closed channel=" + channel);
                this.observerStages.remove(channel);
            }
        }
    }

    /**
     * Done registering interest
     *
     * @param key
     *            the key of the registered channel; null if the channel could
     *            not be registered at all.
     * @param ops
     *            the interest set that was (to be) registered.
     * @param exception
     *            the failure, or null if registering succeeded.
     */
    protected void onRegisterSelectorInterestDone(SelectionKey key, int ops, ClosedChannelException exception) {
        this.onException(exception);
        if (exception == null && log.isTraceEnabled()) {
            log.trace("Registered interest = " + NioUtil.toString(ops) + ", key=" + (key == null ? "null" : NioUtil.toString(key)));
        }
        ChannelResponse.Registered event = new ChannelResponse.Registered(this, key, exception, ops);
        this.notifyObservers(key, event);
    }

    /**
     * Take care of exception; currently merely trace-logs it.
     *
     * @param exception
     *            the exception about to be forwarded to an observer (may be null).
     */
    protected void onException(IOException exception) {
        if (exception != null && log.isTraceEnabled())
            log.trace("Gracefully forwarding exception = ", exception);
    }

    /**
     * Cleanly shut the agent down, releasing acquired resources: closes all
     * registered channels and the selector, then resets the agent state. The
     * state is reset even if closing the selector fails.
     *
     * @throws IOException
     *             if closing the selector fails (other than by the known VM bug
     *             noted below).
     */
    protected void doCloseAll() throws IOException {
        if (log.isTraceEnabled()) log.trace("doCloseAll");
        Selector sel = this.selector; // null if Selector.open() failed in start()
        try {
            if (sel != null) {
                List channels = this.getRegisteredChannels();
                for (int i = 0; i < channels.size(); i++) {
                    this.onClose(((SelectableChannel) channels.get(i)).keyFor(sel));
                }

                if (log.isTraceEnabled()) log.trace("selector before selector.close()=" + NioUtil.toString(sel));
                try {
                    sel.close();
                } catch (IOException e) {
                    // vm bug on MacOSX & FreeBSD produces BadFileDescriptor exception, see
                    // http://freepastry.rice.edu/FreePastry/README-1.3.2.html
                    // http://list.droso.net/15/15756
                    String osName = System.getProperty("os.name");
                    boolean isAffectedOs = osName.startsWith("Mac") || osName.startsWith("Free");
                    // explicit grouping: && binds tighter than ||, so without it
                    // EVERY IOException would be swallowed on MacOSX
                    if (isAffectedOs && "Bad file descriptor".equals(e.getMessage())) {
                        ; // ignore
                    }
                    else throw e; // rethrow
                }
                if (log.isTraceEnabled()) log.trace("selector after selector.close()=" + NioUtil.toString(sel));
            }
        } finally {
            this.selector = null; // help garbage collector
            this.pendingEvents = new LinkedQueue();
            this.readBufferPool.clear();
            this.writeQueues = null;
            this.observerStages = null;
        }
    }

    /**
     * Enqueues the given event onto the observer stage associated with the
     * channel of the given key. Does nothing if no stage is associated with
     * the channel; an event without a key cannot be routed and is dropped.
     *
     * @param key
     *            the key whose channel's observer is to be notified.
     * @param event
     *            the event to enqueue.
     */
    protected void notifyObservers(SelectionKey key, Object event) {
        if (key == null) {
            // happens when a channel could not even be registered; there is no
            // channel to look the observer up with
            if (log.isWarnEnabled()) log.warn("No selection key, hence no observer to notify; dropping event=" + event);
            return;
        }
        Channel channel = key.channel();
        Object observer = this.observerStages.get(channel);
        if (observer instanceof Stage) {
            if (log.isTraceEnabled()) log.trace("Agent enqueuing to observer: event="+event+", observerStage=" + observer);
            ((Stage) observer).enqueue(event);
        }
    }

    /**
     * Returns all selectable channels registered with this agent,
     * excluding channels with invalid keys.
     *
     * @return the channels (empty if the agent has no selector)
     */
    protected List getRegisteredChannels() {
        Selector sel = this.selector;
        if (sel == null)
            return new ArrayList(0);
        else
            return NioUtil.getRegisteredChannels(sel);
    }

    /**
     * Checks if the agent is running in the selector loop, and throws an
     * exception if it is running.
     *
     * @throws IllegalStateException
     *             if the agent is running
     */
    protected void checkIsShutdown() {
        if (this.shutdown == false) throw new IllegalStateException("must not be invoked on running agent.");
    }

}