001    /*
002     * Copyright (c) 2003, The Regents of the University of California, through
003     * Lawrence Berkeley National Laboratory (subject to receipt of any required
004     * approvals from the U.S. Dept. of Energy). All rights reserved.
005     */
006    package gov.lbl.dsd.sea.nio.util;
007    
008    import java.io.IOException;
009    import java.net.InetSocketAddress;
010    import java.net.SocketAddress;
011    import java.nio.Buffer;
012    import java.nio.ByteBuffer;
013    import java.nio.InvalidMarkException;
014    import java.nio.channels.AsynchronousCloseException;
015    import java.nio.channels.ClosedByInterruptException;
016    import java.nio.channels.ClosedChannelException;
017    import java.nio.channels.DatagramChannel;
018    import java.nio.channels.NonReadableChannelException;
019    import java.nio.channels.NonWritableChannelException;
020    import java.nio.channels.ReadableByteChannel;
021    import java.nio.channels.SelectionKey;
022    import java.nio.channels.Selector;
023    import java.nio.channels.ServerSocketChannel;
024    import java.nio.channels.SocketChannel;
025    import java.nio.channels.WritableByteChannel;
026    import java.util.ArrayList;
027    import java.util.Iterator;
028    import java.util.List;
029    import java.util.Set;
030    
031    /**
032     * Various utilities related to the {@link java.nio} package.
033     * 
034     * @author whoschek@lbl.gov
035     * @author $Author: hoschek3 $
036     * @version $Revision: 1.8 $, $Date: 2004/08/17 20:17:22 $
037     */
038    public class NioUtil {
039    
040            private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(NioUtil.class);
041    
042            private NioUtil() {} // not instantiable
043            
044            /**
045             * Creates and returns a non-blocking TCP server channel bound to the given
046             * network port.
047             * 
048             * @param port
049             *            the port to bind to.
050             * @return the channel
051             * @throws IOException if the bind operation fails, or if the socket
052             *                         is already bound or if some other I/O error occurs
053             */
054            public static ServerSocketChannel createServerChannel(int port) throws IOException {
055                    return createServerChannel(new InetSocketAddress(port), 0);
056            }
057            
058            /**
059             * Creates and returns a non-blocking TCP server channel bound to the given
060             * network address.
061             * <p>
062             * If a connection indication arrives when the backlog queue is full, the
063             * connection is refused. The <code>backlog</code> argument must be a
064             * positive value greater than 0. If the value passed if equal or less than
065             * 0, then the default value will be assumed.
066             * 
067             * @param address
068             *            the address to bind to.
069             * @param backlog
070             *            The listen backlog length.
071             * @return the channel
072             * @throws IOException if the bind operation fails, or if the socket
073             *                         is already bound or if some other I/O error occurs
074             */ 
075            public static ServerSocketChannel createServerChannel(SocketAddress address, int backlog) throws IOException {
076                    ServerSocketChannel channel = ServerSocketChannel.open();
077                    channel.configureBlocking(false);
078                    channel.socket().bind(address, backlog);
079                    return channel;
080            }
081            
082            /**
083             * Creates and returns a non-blocking TCP server channel bound to the given
084             * network host and port.
085             * 
086             * @param hostName
087             *            the host to bind to.
088             * @param port
089             *            the port to bind to.
090             * @return the channel
091             * @throws  IOException If an I/O error occurs
092             */
093            public static SocketChannel createClientChannel(String hostName, int port) throws IOException {
094                    return createClientChannel(new InetSocketAddress(hostName, port));
095            }
096            
097            /**
098             * Creates and returns a non-blocking TCP client channel bound to the given
099             * network address.
100             * 
101             * @param address
102             *            the address to bind to.
103             * @return the channel
104             * @throws  IOException If an I/O error occurs
105             */ 
106            public static SocketChannel createClientChannel(SocketAddress address) throws IOException {
107                    SocketChannel channel = SocketChannel.open();
108                    channel.configureBlocking(false);
109                    long start = System.currentTimeMillis();
110                    channel.connect(address); 
111                    long end = System.currentTimeMillis();
112                    log.debug("connect took [ms]=" + (end-start));
113                    return channel;
114            }
115            
116            /**
117             * Creates and returns a non-blocking UDP datagram channel bound to the given network
118             * port.
119             * 
120             * @param port
121             *            the port to bind to.
122             * @return the channel
123             * @throws  IOException If an I/O error occurs
124             */ 
125            public static DatagramChannel createDatagramChannel(int port) throws IOException {
126                    return createDatagramChannel(new InetSocketAddress(port));
127            }
128            
129            /**
130             * Creates and returns a non-blocking UDP datagram channel bound to the given network
131             * address.
132             * 
133             * @param address
134             *            the address to bind to.
135             * @return the channel
136             * @throws  IOException If an I/O error occurs
137             */ 
138            public static DatagramChannel createDatagramChannel(SocketAddress address) throws IOException {
139                    DatagramChannel channel = DatagramChannel.open();
140                    channel.configureBlocking(false);
141                    channel.socket().bind(address);
142                    return channel;
143            }
144    
145            /**
146             * Returns whether or not a given buffer has the mark defined.
147             * @param buffer the buffer to check
148             * @return true if it has a mark
149             */
150            public static boolean hasMark(Buffer buffer) {
151                    // unfortunately this seems to be the only way to figure it out :-(
152                    boolean hasMark = true;
153                    int pos = buffer.position();
154                    try {
155                            buffer.reset();
156                            buffer.position(pos);
157                    }
158                    catch (InvalidMarkException e) {
159                            hasMark = false;
160                    }
161                    return hasMark;
162            }
163    
164            /**
165             * Efficiently reads (without ever blocking) as many bytes as possible from
166             * the given non-blocking channel into the given buffer. Returns the total
167             * number of bytes that have been read. The algorithm strongly improves
168             * throughput at the expense of somewhat increased latency. This method
169             * simply propagates exceptions thrown in the underlying
170             * {@link ReadableByteChannel#read(ByteBuffer)}NIO code - it never causes
171             * an exception itself.
172             * 
173             * @param channel -
174             *            the channel to read from
175             * @param buffer -
176             *            the buffer to read into
177             * @return the total number of bytes read, possibly zero. If end-of-stream
178             *         (EOS) is encountered then the number returned is precisely
179             *         <code>-(total+1)</code>. Hence, if EOS is encountered the
180             *         returned number is always < 0, and else >= 0. In the EOS
181             *         case, the caller can determine the total number of read bytes
182             *         with <code>-(readMany(...)+1)</code>. (The same trick is used
183             *         in {@link java.util.Arrays#binarySearch(long[], long)}to
184             *         effectively return two return parameters within one parameter).
185             * 
186             * <p>
187             * Example usage:
188             * 
189             * <pre>
190             * 
191             * 
192             * int total = readMany(channel, buffer);
193             * boolean eos = false;
194             * if (total < 0) {
195             *      eos = true;
196             *      total = -(total + 1);
197             * }
198             * 
199             * System.out.println(total + "bytes read");
200             * 
201             * // some meaningful processing of the read bytes goes here
202             * 
203             * if (eos) {
204             *      System.out.println(total + "bytes read, EOS reached, now closing channel");
205             *      channel.close();
206             *      System.exit(0);
207             * }
208             * 
209             * 
210             * </pre>
211             * 
212             * @throws NonReadableChannelException
213             *             If this channel was not opened for reading
214             * 
215             * @throws ClosedChannelException
216             *             If this channel is closed
217             * 
218             * @throws AsynchronousCloseException
219             *             If another thread closes this channel while the read
220             *             operation is in progress
221             * 
222             * @throws ClosedByInterruptException
223             *             If another thread interrupts the current thread while the
224             *             read operation is in progress, thereby closing the channel
225             *             and setting the current thread's interrupt status
226             * 
227             * @throws IOException
228             *             If some other I/O error occurs
229             */
230            public static int readMany(ReadableByteChannel channel, ByteBuffer buffer)
231                            throws IOException {
232                    // total  -->   -(total+1)   -->   -(newtotal+1) 
233                    //
234                    //   0    -->      -1        -->        0
235                    //   1    -->      -2        -->        1
236                    //   2    -->      -3        -->        2
237                    
238                    /*
239                     * For efficiency try to iterate more than once even when zero or only
240                     * very few bytes are currently read-ready. The way java.nio is
241                     * implemented, there is a high chance that almost immediately after the
242                     * first few bytes become ready, a much larger block becomes ready. We
243                     * try to wait for larger block in a short-lived, tight spin loop,
244                     * avoiding thread context switches and selector registration business
245                     * as much as possible!!! This approach strongly improves throughput at
246                     * the expense of somewhat increased latency.
247                     */
248                    int minIters = 2; // for efficiency iterate more than once
249    
250                    int startPos = buffer.position();
251                    int nr = 0;
252    
253                    while ((minIters > 0 || nr > 0) && buffer.hasRemaining() && (nr = channel.read(buffer)) >= 0) {
254                            if (minIters > 0) minIters--; // safely avoid underflows
255                    }
256    
257                    int total = buffer.position() - startPos;
258                    
259                    if (nr < 0) { 
260                            // Reached end-of-stream, e.g. because remote host closed or lost connection
261                            total = -(total + 1);
262                    }
263                    return total;
264            }
265    
266            /**
267             * Efficiently writes (without ever blocking) as many bytes as possible from
268             * the given buffer to the given non-blocking channel. Returns the total
269             * number of bytes that have been written. The algorithm strongly improves
270             * throughput at the expense of somewhat increased latency. This method
271             * simply propagates exceptions thrown in the underlying
272             * {@link WritableByteChannel#write(ByteBuffer)}NIO code - it never causes
273             * an exception itself.
274             * 
275             * @param channel -
276             *            the channel to write to
277             * @param buffer -
278             *            the buffer to read from
279             * @return the total number of bytes written, possibly zero.
280             * 
281             * @throws NonWritableChannelException
282             *             If this channel was not opened for writing
283             * 
284             * @throws ClosedChannelException
285             *             If this channel is closed
286             * 
287             * @throws AsynchronousCloseException
288             *             If another thread closes this channel while the write
289             *             operation is in progress
290             * 
291             * @throws ClosedByInterruptException
292             *             If another thread interrupts the current thread while the
293             *             write operation is in progress, thereby closing the channel
294             *             and setting the current thread's interrupt status
295             * 
296             * @throws IOException
297             *             If some other I/O error occurs
298             */ 
299            public static int writeMany(WritableByteChannel channel, ByteBuffer buffer)
300                            throws IOException {
301                    int minIters = 2; // for efficiency iterate more than once
302    
303                    int startPos = buffer.position();
304                    int nr = 0;
305                    
306                    while ((minIters > 0 || nr > 0) && buffer.hasRemaining()) {
307                            nr = channel.write(buffer);
308                            if (minIters > 0) minIters--; // safely avoid underflows
309                    }
310    
311                    int total = buffer.position() - startPos;
312                    return total;
313            }
314    
315            /**
316             * Returns all selectable channels registered with the given selector,
317             * excluding channels of invalid keys.
318             * 
319             * @param selector
320             * @return the channels
321             */
322            public static List getRegisteredChannels(Selector selector) {
323                    // TODO: return as array, set, hashset?
324                    List channels = new ArrayList();
325                    Iterator iter = selector.keys().iterator();
326                    while (iter.hasNext()) {
327                            SelectionKey key = (SelectionKey) iter.next();
328                            if (key.isValid()) {
329                                    channels.add(key.channel());
330                            }
331                    }
332                    return channels;
333            }
334            
335            /**
336             * Returns the number of ready operations of the given selection key.
337             * 
338             * @param key
339             * @return the number
340             */
341            public static int getNumberOfReadyOps(SelectionKey key) {
342                    return bitCount(key.readyOps());
343                    /*
344                    int n = 0;
345                    if (key.isAcceptable())  n++;
346                    if (key.isReadable())    n++;
347                    if (key.isWritable())    n++;
348                    if (key.isConnectable()) n++;
349                    return n;
350                    */
351            }
352            
353             /**
354              * Returns the number of one-bits in the two's complement binary
355              * representation of the specified <tt>int</tt> value. This function is
356              * sometimes referred to as the <i>population count </i>.
357              * 
358              * @param i the value to count on.
359              * @return the number of one-bits in the two's complement binary
360              *         representation of the specified <tt>int</tt> value.
361              */
362        private static int bitCount(int i) {
363                    // very efficient magic
364                    // HD, Figure 5-2
365                    i = i - ((i >>> 1) & 0x55555555);
366                    i = (i & 0x33333333) + ((i >>> 2) & 0x33333333);
367                    i = (i + (i >>> 4)) & 0x0f0f0f0f;
368                    i = i + (i >>> 8);
369                    i = i + (i >>> 16);
370                    return i & 0x3f;
371            }
372    
373            /**
374             * Adds the given operation bits to the interest set of the given key.
375             * 
376             * @param key
377             *            the selection key to work on.
378             * @param opBits
379             *            the bits to add.
380             */
381            public static void addInterestBits(SelectionKey key, int opBits) {
382                    int ops = key.interestOps();
383                    if ((ops | opBits) != ops) 
384                            key.interestOps(ops | opBits);
385            }
386            
387            /**
388             * Removes the given operation bits from the interest set of the given key.
389             * 
390             * @param key
391             *            the selection key to work on.
392             * @param opBits
393             *            the bits to remove.
394             */
395            public static void removeInterestBits(SelectionKey key, int opBits) {
396                    int ops = key.interestOps();
397                    if ((ops & ~opBits) != ops)
398                            key.interestOps(ops & ~opBits);
399            }
400            
401            /**
402             * Returns a detailed string representation of the given selection key for
403             * debugging purposes.
404             * 
405             * @param key
406             *            the object to represent
407             * @return a string representation
408             */
409            public static String toString(SelectionKey key) {               
410                    String str = key.isValid() ? "Valid key [" : "INVALID key [";
411                    if (key.isValid()) { // check avoids exceptions!
412                            str = str + "readyOps=" + toString(key.readyOps());
413                            str = str + ", interestedOps=" + toString(key.interestOps());
414                            str = str + ", ";
415                    }
416                    if (!(key.attachment() instanceof ArrayByteList)) {
417                            str = str + "attachment=" + key.attachment();
418                    }
419                    str = str + ", channel=" + key.channel();
420                    return str + "]";
421            }
422    
423            /**
424             * Returns a detailed string representation of the given selection key
425             * readyOps or interestOps for debugging purposes.
426             * 
427             * @param selectionKeyOps
428             *            the object to represent
429             * @return a string representation
430             */
431            public static String toString(int selectionKeyOps) {
432                    String str = "{";
433                    final String[] names = { "OP_READ", "OP_WRITE", "OP_CONNECT", "OP_ACCEPT"};
434                    final int[] values = { SelectionKey.OP_READ, SelectionKey.OP_WRITE, SelectionKey.OP_CONNECT, SelectionKey.OP_ACCEPT};
435                    boolean first = true;
436                    for (int i=0; i < values.length; i++) {
437                            if ((selectionKeyOps & values[i]) != 0) { 
438                                    if (! first) str = str + "|";
439                                    str = str + names[i];
440                                    first = false;
441                            }
442                    }
443                    return str + "}";
444            }
445    
446            /**
447             * Returns a detailed string representation for debugging purposes.
448             * 
449             * @param keySet
450             *            a selector key set (e.g. selector.keys() or
451             *            selector.selectedKeys()).
452             * @return a string representation
453             */
454            public static String toString(Set keySet) {
455                    if (keySet == null) return "[]";
456                    List list = new ArrayList(keySet.size());
457                    Iterator iter = keySet.iterator();
458                    while (iter.hasNext()) {
459                            list.add(toString((SelectionKey) iter.next())); 
460                    }
461                    return list.toString();
462            }
463    
464            /**
465             * Returns a detailed string representation of the given selector for
466             * debugging purposes.
467             * 
468             * @param selector
469             *            the object to represent
470             * @return a string representation
471             */
472            public static String toString(Selector selector) {
473                    StringBuffer str = new StringBuffer();
474                    str.append(selector.getClass().getName()).append(" {");
475                    str.append("isOpen=").append(selector.isOpen()).append(",");
476                    if (selector.isOpen()) {
477                            str.append("\nValid registered channels=").append(getRegisteredChannels(selector));
478                            str.append("\nInterestSet=").append(toString(selector.keys()));
479                            str.append("\nReadySet=").append(toString(selector.selectedKeys()));
480                    }
481                    str.append("}");
482                    return str.toString();
483            }
484            
485    }