001    /*
002     * Copyright (c) 2003, The Regents of the University of California, through
003     * Lawrence Berkeley National Laboratory (subject to receipt of any required
004     * approvals from the U.S. Dept. of Energy). All rights reserved.
005     */
006    package gov.lbl.dsd.sea.nio.util;
007    
008    import java.nio.ByteBuffer;
009    import java.nio.ByteOrder;
010    import java.util.LinkedList;
011    import java.util.List;
012    
013    /**
014     * Efficient thread-safe pool of {@link ByteBuffer}s for high performance NIO
015     * applications. Using a buffer pool can drastically reduce memory allocation,
016     * memory copying and garbage collection by taking buffers from the pool when
017     * needed, and recycling them back to the pool when they are no more needed.
018     * <p>
019     * There is a trade-off here: The improved performance a pool promises comes at
020     * the expense of larger overall memory footprint, since buffers in the pool are
021     * not subject to intermediate garbage collection (unless the entire pool is no
022     * more referenced or cleared, of course).
023     * <p>
024     * Once you have taken a buffer via the <code>take</code> method from the
025     * pool, you can modify it in any way desired. Once you have recycled a buffer
026     * via the <code>put</code> method back to the pool you MUST NOT modify it
027     * anymore, NOT EVEN it's mark, position or limit, whether directly or
028     * indirectly!
029     * <p>
030     * On <code>put</code> the pool will ignore buffers with
031     * <code>buffer.capacity() < bufferCapacity</code> or when the aggregate
032     * capacity of all buffers in the pool would become larger than
033     * <code>maxPoolCapacity</code>.
034     * <p>
035     * On <code>take</code> the pool will return a cleared buffer with at least
036     * the given <code>bufferCapacity</code>, which will be a direct or heap
037     * buffer, depending on the <code>preferDirect</code> flag. In any case, the
038     * returned buffer will have the given <code>byteOrder</code>, or BIG_ENDIAN
039     * byte order if <code>byteOrder</code> is null.
040     * <p>
041     * If empty on <code>take</code> the pool will create a new buffer and return
042     * that. (The buffer pool is smart in avoiding allocating too many direct
043     * buffers and in its preference strategies).
044     * <p>
045     * Hint: At least in jdk-1.4.2 the total maximum amount of direct buffers that
046     * may be allocated is 64MB by default. You can change this via
047     * <code>java -XX:MaxDirectMemorySize=256m</code>. See bug 4879883 on Java
048     * Bug Parade. See http://iais.kemsu.ru/odocs/javax/JSDK.Src/java/nio/Bits.java
049     * 
050     * @author whoschek@lbl.gov
051     * @author $Author: hoschek3 $
052     * @version $Revision: 1.1 $, $Date: 2004/08/04 23:26:46 $
053     */
054    public class ByteBufferPool {
055            
056            private final List heapBuffers;
057            private final List directBuffers;
058            private final int bufferCapacity;
059            private final long maxPoolCapacity;
060            private final boolean preferDirect;
061            private final ByteOrder byteOrder;
062            private long currentPoolCapacity;
063            
064            // statistics
065            protected long nrAllocations;
066            protected long nrAllocated;
067            protected long nrTakes;
068            protected long nrTakesReused;
069            protected long nrTakenBytes;
070            protected long nrReusedBytes;
071            protected long nrReplacingPuts;
072    
073            private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(ByteBufferPool.class);
074    
075            /**
076             * Creates a new pool with the given properties.
077             */
078            public ByteBufferPool(long maxPoolCapacity, int bufferCapacity, boolean preferDirect, ByteOrder byteOrder) {
079                    if (bufferCapacity <= 0) throw new IllegalArgumentException("bufferCapacity must be > 0");
080                    if (maxPoolCapacity < 0) throw new IllegalArgumentException("maxPoolCapacity must be >= 0");
081                    this.maxPoolCapacity = maxPoolCapacity;
082                    this.bufferCapacity = bufferCapacity;
083                    this.preferDirect = preferDirect;
084                    this.byteOrder = (byteOrder == null ? ByteOrder.BIG_ENDIAN : byteOrder);
085                    this.heapBuffers = new LinkedList();
086                    this.directBuffers = new LinkedList();
087                    
088                    this.clear();
089            }
090            
091            /** 
092             * Returns the buffer capacity.
093             */
094            public int getBufferCapacity() {
095                    return this.bufferCapacity;
096            }
097            
098            /**
099             * Returns the byte order used when returning buffers.
100             */
101            public ByteOrder getByteOrder() {
102                    return this.byteOrder;
103            }
104            
105            /** 
106             * Returns the maximum pool capacity.
107             */
108            public long getMaxPoolCapacity() {
109                    return this.maxPoolCapacity;
110            }
111    
112            /** 
113             * Returns whether or not this pool prefers to return direct or heap buffers.
114             */
115            public boolean getPreferDirect() {
116                    return this.preferDirect;
117            }
118            
119            /**
120             * Recycles a buffer back into the pool (adds it to the pool).
121             * @param buffer the buffer to put into the pool.
122             */
123            synchronized public void put(ByteBuffer buffer) {
124                    if (buffer == null || buffer.capacity() < this.bufferCapacity) {
125                            return; // ignore
126                    }
127                    
128                    if (this.currentPoolCapacity + buffer.capacity() > this.maxPoolCapacity) {
129                            if (buffer.isDirect() != preferDirect) return; // ignore
130                            
131                            // try to drop a non-preferred buffer and see if new buffer fits
132                            List dropBuffers = buffer.isDirect() ? heapBuffers : directBuffers;
133                            if (dropBuffers.size() == 0) return; // ignore
134                            
135                            int cap = ((ByteBuffer) dropBuffers.get(dropBuffers.size()-1)).capacity();
136                            //int cap = this.bufferCapacity;
137                            if (this.currentPoolCapacity - cap + buffer.capacity() > this.maxPoolCapacity) {
138                                    return; // ignore
139                            }
140                            else {
141                                    dropBuffers.remove(dropBuffers.size()-1);
142                                    this.currentPoolCapacity -= cap;
143                                    this.nrReplacingPuts++;
144                            }
145                    }
146                                    
147                    List buffers = buffer.isDirect() ? directBuffers : heapBuffers;
148                    buffers.add(0, buffer);
149                    this.currentPoolCapacity += buffer.capacity();
150            }
151    
152            /**
153             * Returns a cleared buffer from the pool, or creates and returns a new
154             * buffer.
155             * 
156             * @return a buffer from the pool.
157             */
158            public ByteBuffer take() {
159                    ByteBuffer buffer = null;
160                    synchronized (this) {
161                            this.nrTakes ++;
162                            List buffers = preferDirect ? directBuffers : heapBuffers;
163                            
164                            if (buffers.size() > 0) { // try preferred buffers
165                                    buffer = (ByteBuffer) buffers.get(0);
166                            }
167                            else { // try non-preferred buffers
168                                    buffers = preferDirect ? heapBuffers : directBuffers;
169                                    if (buffers.size() > 0) {
170                                            buffer = (ByteBuffer) buffers.get(0);
171                                    }
172                            }
173                            if (buffer != null) {
174                                    buffers.remove(0);
175                                    this.currentPoolCapacity -= buffer.capacity();
176                                    this.nrReusedBytes += buffer.capacity();
177                                    this.nrTakenBytes += buffer.capacity();
178                                    this.nrTakesReused++;
179                            }                       
180                    }
181                    
182                    if (buffer == null) {
183                            boolean allocateDirect;
184                            synchronized (this) {
185                                    // fix for vm bugs limiting max amount of direct buffer mem that may
186                                    // be allocated
187                                    allocateDirect = this.preferDirect
188                                                    && nrAllocated + this.bufferCapacity > maxPoolCapacity ? false
189                                                    : this.preferDirect;
190                                    this.nrAllocated += this.bufferCapacity;
191                                    this.nrTakenBytes += this.bufferCapacity;
192                                    this.nrAllocations++;
193                            }
194                            buffer = this.createBuffer(this.bufferCapacity, allocateDirect);
195                    }
196                    
197                    buffer.clear();
198                    if (buffer.order() != this.byteOrder) {
199                            buffer.order(this.byteOrder);
200                    }
201                    return buffer;
202            }
203            
204            /**
205             * Override this method to create custom bytebuffers.
206             */
207            protected ByteBuffer createBuffer(int capacity, boolean direct) {
208                    if (direct) {
209                            try {
210                                    return ByteBuffer.allocateDirect(capacity);
211                            } catch (OutOfMemoryError e) {
212                                    log.warn("OutOfMemoryError: No more direct buffers available; trying heap buffer instead");
213                            } 
214                    }
215                    return ByteBuffer.allocate(capacity);
216            }
217            
218            /**
219             * Removes all buffers from the pool.
220             */
221            synchronized public void clear() {
222                    this.heapBuffers.clear();
223                    this.directBuffers.clear();
224                    this.currentPoolCapacity = 0;
225                    
226                    this.nrAllocations = 0;
227                    this.nrAllocated = 0;
228                    this.nrTakes = 0;
229                    this.nrTakesReused = 0;
230                    this.nrTakenBytes = 0;
231                    this.nrReusedBytes = 0;         
232                    this.nrReplacingPuts = 0;               
233            }
234    
235            /**
236             * Returns a summary statistics representation of the receiver.
237             */
238            public synchronized String toString() {
239                    String s = this.getClass().getName() + ": ";
240                    s += "nrAllocated=" + mb(nrAllocated) + " MB";
241                    s += ", nrAllocations=" + nrAllocations;
242                    s += ", nrTakes=" + nrTakes;
243                    s += ", nrTakesReused=" + nrTakesReused;
244                    s += ", nrReplacingPuts=" + nrReplacingPuts;
245                    s += ", nrTakenBytes=" + mb(nrTakenBytes) + " MB";
246                    s += ", nrReusedBytes=" + mb(nrReusedBytes) + " MB";
247                    s += ", maxPoolCapacity=" + mb(maxPoolCapacity) + " MB";
248                    s += ", currentPoolCapacity=" + mb(currentPoolCapacity) + " MB";
249                    s += " --> EFFICIENCY=" + (100.0f * nrReusedBytes / nrTakenBytes) + " %";
250                    return s;
251            }
252            
253            private static float mb(long bytes) { 
254                    return bytes / (1024.0f * 1024.0f);
255            }
256    }