/*
 * Copyright (c) 2003, The Regents of the University of California, through
 * Lawrence Berkeley National Laboratory (subject to receipt of any required
 * approvals from the U.S. Dept. of Energy). All rights reserved.
 */
package gov.lbl.dsd.sea.nio.util;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.InvalidMarkException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.Channel;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.NonReadableChannelException;
import java.nio.channels.NonWritableChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/**
 * Various utilities related to the {@link java.nio} package.
 * <p>
 * All methods are static; the class cannot be instantiated.
 *
 * @author whoschek@lbl.gov
 * @author $Author: hoschek3 $
 * @version $Revision: 1.8 $, $Date: 2004/08/17 20:17:22 $
 */
public class NioUtil {

    private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NioUtil.class);

    private NioUtil() {} // not instantiable

    /**
     * Creates and returns a non-blocking TCP server channel bound to the given
     * network port, using the default listen backlog.
     *
     * @param port
     *            the port to bind to.
     * @return the channel
     * @throws IOException if the bind operation fails, or if the socket
     *             is already bound or if some other I/O error occurs
     */
    public static ServerSocketChannel createServerChannel(int port) throws IOException {
        return createServerChannel(new InetSocketAddress(port), 0);
    }

    /**
     * Creates and returns a non-blocking TCP server channel bound to the given
     * network address.
     * <p>
     * If a connection indication arrives when the backlog queue is full, the
     * connection is refused. The <code>backlog</code> argument must be a
     * positive value greater than 0. If the value passed is equal or less than
     * 0, then the default value will be assumed.
     * <p>
     * If configuring or binding fails, the freshly opened channel is closed
     * before the exception is propagated, so no socket is leaked.
     *
     * @param address
     *            the address to bind to.
     * @param backlog
     *            The listen backlog length.
     * @return the channel
     * @throws IOException if the bind operation fails, or if the socket
     *             is already bound or if some other I/O error occurs
     */
    public static ServerSocketChannel createServerChannel(SocketAddress address, int backlog) throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        boolean success = false;
        try {
            channel.configureBlocking(false);
            channel.socket().bind(address, backlog);
            success = true;
            return channel;
        } finally {
            if (!success) {
                closeQuietly(channel);
            }
        }
    }

    /**
     * Creates and returns a non-blocking TCP client channel on which a
     * connection to the given network host and port has been initiated.
     * See {@link #createClientChannel(SocketAddress)} for details.
     *
     * @param hostName
     *            the host to connect to.
     * @param port
     *            the port to connect to.
     * @return the channel
     * @throws IOException If an I/O error occurs
     */
    public static SocketChannel createClientChannel(String hostName, int port) throws IOException {
        return createClientChannel(new InetSocketAddress(hostName, port));
    }

    /**
     * Creates and returns a non-blocking TCP client channel on which a
     * connection to the given network address has been initiated.
     * <p>
     * Because the channel is in non-blocking mode, the connection may still be
     * pending when this method returns; in that case the caller completes it
     * later via {@link SocketChannel#finishConnect()}.
     * <p>
     * If configuring or connecting fails, the freshly opened channel is closed
     * before the exception is propagated, so no socket is leaked.
     *
     * @param address
     *            the address to connect to.
     * @return the channel
     * @throws IOException If an I/O error occurs
     */
    public static SocketChannel createClientChannel(SocketAddress address) throws IOException {
        SocketChannel channel = SocketChannel.open();
        boolean success = false;
        try {
            channel.configureBlocking(false);
            long start = System.currentTimeMillis();
            channel.connect(address);
            // Only pay for the message construction when it will be emitted.
            if (log.isDebugEnabled()) {
                log.debug("connect took [ms]=" + (System.currentTimeMillis() - start));
            }
            success = true;
            return channel;
        } finally {
            if (!success) {
                closeQuietly(channel);
            }
        }
    }

    /**
     * Creates and returns a non-blocking UDP datagram channel bound to the given network
     * port.
     *
     * @param port
     *            the port to bind to.
     * @return the channel
     * @throws IOException If an I/O error occurs
     */
    public static DatagramChannel createDatagramChannel(int port) throws IOException {
        return createDatagramChannel(new InetSocketAddress(port));
    }

    /**
     * Creates and returns a non-blocking UDP datagram channel bound to the given network
     * address.
     * <p>
     * If configuring or binding fails, the freshly opened channel is closed
     * before the exception is propagated, so no socket is leaked.
     *
     * @param address
     *            the address to bind to.
     * @return the channel
     * @throws IOException If an I/O error occurs
     */
    public static DatagramChannel createDatagramChannel(SocketAddress address) throws IOException {
        DatagramChannel channel = DatagramChannel.open();
        boolean success = false;
        try {
            channel.configureBlocking(false);
            channel.socket().bind(address);
            success = true;
            return channel;
        } finally {
            if (!success) {
                closeQuietly(channel);
            }
        }
    }

    /**
     * Closes a channel whose initialization failed, without letting a
     * secondary close failure replace the exception already in flight.
     *
     * @param channel
     *            the channel to close.
     */
    private static void closeQuietly(Channel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            // Deliberately not rethrown: the caller is already propagating
            // the failure that made this channel unusable.
            log.debug("Ignoring failure to close half-initialized channel", e);
        }
    }

    /**
     * Returns whether or not a given buffer has the mark defined.
     * <p>
     * The buffer's position and mark are the same after the call as before.
     *
     * @param buffer the buffer to check
     * @return true if it has a mark
     */
    public static boolean hasMark(Buffer buffer) {
        // unfortunately this seems to be the only way to figure it out :-(
        int pos = buffer.position();
        try {
            buffer.reset(); // throws InvalidMarkException if no mark is set
            // reset() moved the position back to the mark; restore it. The
            // mark survives because the restored position is not below it.
            buffer.position(pos);
            return true;
        } catch (InvalidMarkException e) {
            return false;
        }
    }

    /**
     * Efficiently reads (without ever blocking) as many bytes as possible from
     * the given non-blocking channel into the given buffer. Returns the total
     * number of bytes that have been read. The algorithm strongly improves
     * throughput at the expense of somewhat increased latency. This method
     * simply propagates exceptions thrown in the underlying
     * {@link ReadableByteChannel#read(ByteBuffer)} NIO code - it never causes
     * an exception itself.
     *
     * @param channel -
     *            the channel to read from
     * @param buffer -
     *            the buffer to read into
     * @return the total number of bytes read, possibly zero. If end-of-stream
     *         (EOS) is encountered then the number returned is precisely
     *         <code>-(total+1)</code>. Hence, if EOS is encountered the
     *         returned number is always &lt; 0, and else &gt;= 0. In the EOS
     *         case, the caller can determine the total number of read bytes
     *         with <code>-(readMany(...)+1)</code>. (The same trick is used
     *         in {@link java.util.Arrays#binarySearch(long[], long)} to
     *         effectively return two return parameters within one parameter).
     *
     * <p>
     * Example usage:
     *
     * <pre>
     * int total = readMany(channel, buffer);
     * boolean eos = false;
     * if (total &lt; 0) {
     *     eos = true;
     *     total = -(total + 1);
     * }
     *
     * System.out.println(total + " bytes read");
     *
     * // some meaningful processing of the read bytes goes here
     *
     * if (eos) {
     *     System.out.println(total + " bytes read, EOS reached, now closing channel");
     *     channel.close();
     *     System.exit(0);
     * }
     * </pre>
     *
     * @throws NonReadableChannelException
     *             If this channel was not opened for reading
     *
     * @throws ClosedChannelException
     *             If this channel is closed
     *
     * @throws AsynchronousCloseException
     *             If another thread closes this channel while the read
     *             operation is in progress
     *
     * @throws ClosedByInterruptException
     *             If another thread interrupts the current thread while the
     *             read operation is in progress, thereby closing the channel
     *             and setting the current thread's interrupt status
     *
     * @throws IOException
     *             If some other I/O error occurs
     */
    public static int readMany(ReadableByteChannel channel, ByteBuffer buffer)
            throws IOException {
        // total --> -(total+1) --> -(newtotal+1)
        //
        // 0 --> -1 --> 0
        // 1 --> -2 --> 1
        // 2 --> -3 --> 2

        /*
         * For efficiency try to iterate more than once even when zero or only
         * very few bytes are currently read-ready. The way java.nio is
         * implemented, there is a high chance that almost immediately after the
         * first few bytes become ready, a much larger block becomes ready. We
         * try to wait for larger block in a short-lived, tight spin loop,
         * avoiding thread context switches and selector registration business
         * as much as possible!!! This approach strongly improves throughput at
         * the expense of somewhat increased latency.
         */
        int minIters = 2; // for efficiency iterate more than once

        int startPos = buffer.position();
        int nr = 0;

        // Keep reading while (a) the guaranteed iterations are not used up or
        // the previous read made progress, (b) there is room in the buffer,
        // and (c) the channel has not signalled end-of-stream (nr < 0).
        while ((minIters > 0 || nr > 0) && buffer.hasRemaining() && (nr = channel.read(buffer)) >= 0) {
            if (minIters > 0) {
                minIters--; // safely avoid underflows
            }
        }

        int total = buffer.position() - startPos;

        if (nr < 0) {
            // Reached end-of-stream, e.g. because remote host closed or lost connection
            total = -(total + 1);
        }
        return total;
    }

    /**
     * Efficiently writes (without ever blocking) as many bytes as possible from
     * the given buffer to the given non-blocking channel. Returns the total
     * number of bytes that have been written. The algorithm strongly improves
     * throughput at the expense of somewhat increased latency. This method
     * simply propagates exceptions thrown in the underlying
     * {@link WritableByteChannel#write(ByteBuffer)} NIO code - it never causes
     * an exception itself.
     *
     * @param channel -
     *            the channel to write to
     * @param buffer -
     *            the buffer to read from
     * @return the total number of bytes written, possibly zero.
     *
     * @throws NonWritableChannelException
     *             If this channel was not opened for writing
     *
     * @throws ClosedChannelException
     *             If this channel is closed
     *
     * @throws AsynchronousCloseException
     *             If another thread closes this channel while the write
     *             operation is in progress
     *
     * @throws ClosedByInterruptException
     *             If another thread interrupts the current thread while the
     *             write operation is in progress, thereby closing the channel
     *             and setting the current thread's interrupt status
     *
     * @throws IOException
     *             If some other I/O error occurs
     */
    public static int writeMany(WritableByteChannel channel, ByteBuffer buffer)
            throws IOException {
        int minIters = 2; // for efficiency iterate more than once

        int startPos = buffer.position();
        int nr = 0;

        // Same spin strategy as readMany: at least minIters attempts, then
        // continue only while the previous write made progress.
        while ((minIters > 0 || nr > 0) && buffer.hasRemaining()) {
            nr = channel.write(buffer);
            if (minIters > 0) {
                minIters--; // safely avoid underflows
            }
        }

        return buffer.position() - startPos;
    }

    /**
     * Returns all selectable channels registered with the given selector,
     * excluding channels of invalid keys.
     *
     * @param selector
     *            the selector whose key set is inspected.
     * @return a new list holding the channels of all currently valid keys
     */
    public static List getRegisteredChannels(Selector selector) {
        // TODO: return as array, set, hashset?
        Set keys = selector.keys();
        List channels = new ArrayList(keys.size());
        Iterator iter = keys.iterator();
        while (iter.hasNext()) {
            SelectionKey key = (SelectionKey) iter.next();
            if (key.isValid()) {
                channels.add(key.channel());
            }
        }
        return channels;
    }

    /**
     * Returns the number of ready operations of the given selection key, i.e.
     * the number of bits set in {@link SelectionKey#readyOps()}.
     *
     * @param key
     *            the selection key to inspect.
     * @return the number
     */
    public static int getNumberOfReadyOps(SelectionKey key) {
        return bitCount(key.readyOps());
    }

    /**
     * Returns the number of one-bits in the two's complement binary
     * representation of the specified <tt>int</tt> value. This function is
     * sometimes referred to as the <i>population count </i>.
     *
     * @param i the value to count on.
     * @return the number of one-bits in the two's complement binary
     *         representation of the specified <tt>int</tt> value.
     */
    private static int bitCount(int i) {
        // very efficient magic
        // HD, Figure 5-2
        i = i - ((i >>> 1) & 0x55555555);
        i = (i & 0x33333333) + ((i >>> 2) & 0x33333333);
        i = (i + (i >>> 4)) & 0x0f0f0f0f;
        i = i + (i >>> 8);
        i = i + (i >>> 16);
        return i & 0x3f;
    }

    /**
     * Adds the given operation bits to the interest set of the given key.
     * The key is only touched if its interest set would actually change.
     *
     * @param key
     *            the selection key to work on.
     * @param opBits
     *            the bits to add.
     */
    public static void addInterestBits(SelectionKey key, int opBits) {
        int ops = key.interestOps();
        int updated = ops | opBits;
        if (updated != ops) {
            key.interestOps(updated);
        }
    }

    /**
     * Removes the given operation bits from the interest set of the given key.
     * The key is only touched if its interest set would actually change.
     *
     * @param key
     *            the selection key to work on.
     * @param opBits
     *            the bits to remove.
     */
    public static void removeInterestBits(SelectionKey key, int opBits) {
        int ops = key.interestOps();
        int updated = ops & ~opBits;
        if (updated != ops) {
            key.interestOps(updated);
        }
    }

    /**
     * Returns a detailed string representation of the given selection key for
     * debugging purposes.
     * <p>
     * The ready and interest sets are only included for valid keys. The
     * attachment is omitted when it is an <code>ArrayByteList</code>.
     *
     * @param key
     *            the object to represent
     * @return a string representation
     */
    public static String toString(SelectionKey key) {
        // Evaluate validity once so the label and the contents agree.
        boolean valid = key.isValid();
        StringBuffer str = new StringBuffer(valid ? "Valid key [" : "INVALID key [");
        if (valid) { // check avoids exceptions!
            str.append("readyOps=").append(toString(key.readyOps()));
            str.append(", interestedOps=").append(toString(key.interestOps()));
            str.append(", ");
        }
        Object attachment = key.attachment();
        if (!(attachment instanceof ArrayByteList)) {
            // The separator belongs to the attachment entry, so skipping the
            // attachment can no longer leave a dangling ", " behind.
            str.append("attachment=").append(attachment).append(", ");
        }
        str.append("channel=").append(key.channel());
        return str.append("]").toString();
    }

    /**
     * Returns a detailed string representation of the given selection key
     * readyOps or interestOps for debugging purposes, for example
     * <code>{OP_READ|OP_WRITE}</code>.
     *
     * @param selectionKeyOps
     *            the object to represent
     * @return a string representation
     */
    public static String toString(int selectionKeyOps) {
        final String[] names = { "OP_READ", "OP_WRITE", "OP_CONNECT", "OP_ACCEPT"};
        final int[] values = { SelectionKey.OP_READ, SelectionKey.OP_WRITE, SelectionKey.OP_CONNECT, SelectionKey.OP_ACCEPT};
        StringBuffer str = new StringBuffer("{");
        boolean first = true;
        for (int i = 0; i < values.length; i++) {
            if ((selectionKeyOps & values[i]) != 0) {
                if (!first) {
                    str.append("|");
                }
                str.append(names[i]);
                first = false;
            }
        }
        return str.append("}").toString();
    }

    /**
     * Returns a detailed string representation for debugging purposes.
     *
     * @param keySet
     *            a selector key set (e.g. selector.keys() or
     *            selector.selectedKeys()); may be <code>null</code>, which
     *            yields <code>"[]"</code>.
     * @return a string representation
     */
    public static String toString(Set keySet) {
        if (keySet == null) {
            return "[]";
        }
        List list = new ArrayList(keySet.size());
        Iterator iter = keySet.iterator();
        while (iter.hasNext()) {
            list.add(toString((SelectionKey) iter.next()));
        }
        return list.toString();
    }

    /**
     * Returns a detailed string representation of the given selector for
     * debugging purposes. Registered channels and key sets are only included
     * while the selector is open.
     *
     * @param selector
     *            the object to represent
     * @return a string representation
     */
    public static String toString(Selector selector) {
        StringBuffer str = new StringBuffer();
        str.append(selector.getClass().getName()).append(" {");
        str.append("isOpen=").append(selector.isOpen()).append(",");
        if (selector.isOpen()) {
            str.append("\nValid registered channels=").append(getRegisteredChannels(selector));
            str.append("\nInterestSet=").append(toString(selector.keys()));
            str.append("\nReadySet=").append(toString(selector.selectedKeys()));
        }
        str.append("}");
        return str.toString();
    }

}