From e94001da2e4c7f106b98480f8ab841db1c393a87 Mon Sep 17 00:00:00 2001 From: Brett Okken Date: Mon, 4 May 2020 13:13:46 -0500 Subject: [PATCH] Pool byte[] and direct ByteBuffer (#234) * https://github.com/xerial/snappy-java/issues/233 Pool byte[] and direct ByteBuffer instances for reuse by Framed implementation. * Remove use of google VisibleForTesting annotation https://github.com/xerial/snappy-java/issues/233 * javadoc --- .../java/org/xerial/snappy/SnappyFramed.java | 139 ----------- .../snappy/SnappyFramedInputStream.java | 137 +++++++++-- .../snappy/SnappyFramedOutputStream.java | 118 +++++++++- .../org/xerial/snappy/pool/BufferPool.java | 53 +++++ .../xerial/snappy/pool/CachingBufferPool.java | 216 ++++++++++++++++++ .../snappy/pool/DefaultPoolFactory.java | 37 +++ .../xerial/snappy/pool/DirectByteBuffers.java | 150 ++++++++++++ .../snappy/pool/QuiescentBufferPool.java | 55 +++++ .../snappy/pool/CachingBufferPoolTest.java | 184 +++++++++++++++ 9 files changed, 917 insertions(+), 172 deletions(-) create mode 100644 src/main/java/org/xerial/snappy/pool/BufferPool.java create mode 100644 src/main/java/org/xerial/snappy/pool/CachingBufferPool.java create mode 100644 src/main/java/org/xerial/snappy/pool/DefaultPoolFactory.java create mode 100644 src/main/java/org/xerial/snappy/pool/DirectByteBuffers.java create mode 100644 src/main/java/org/xerial/snappy/pool/QuiescentBufferPool.java create mode 100644 src/test/java/org/xerial/snappy/pool/CachingBufferPoolTest.java diff --git a/src/main/java/org/xerial/snappy/SnappyFramed.java b/src/main/java/org/xerial/snappy/SnappyFramed.java index 97f0147..df8a82d 100644 --- a/src/main/java/org/xerial/snappy/SnappyFramed.java +++ b/src/main/java/org/xerial/snappy/SnappyFramed.java @@ -3,24 +3,9 @@ */ package org.xerial.snappy; -import static java.lang.invoke.MethodHandles.constant; -import static java.lang.invoke.MethodHandles.dropArguments; -import static java.lang.invoke.MethodHandles.filterReturnValue; -import static java.lang.invoke.MethodHandles.guardWithTest; -import static java.lang.invoke.MethodHandles.lookup; -import static java.lang.invoke.MethodType.methodType; - import java.io.IOException; -import java.lang.invoke.MethodHandle; -import java.lang.invoke.MethodHandles.Lookup; -import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; -import java.security.AccessController; -import java.security.PrivilegedExceptionAction; -import java.util.logging.Level; -import java.util.logging.Logger; /** * Constants and utilities for implementing x-snappy-framed. @@ -38,82 +23,6 @@ final class SnappyFramed private static final int MASK_DELTA = 0xa282ead8; - /** - * Sun specific mechanisms to clean up resources associated with direct byte buffers. - */ - @SuppressWarnings("unchecked") - static final Class DIRECT_BUFFER_CLAZZ = (Class) lookupClassQuietly("java.nio.DirectByteBuffer"); - - static final MethodHandle CLEAN_HANDLE; - - static { - // this approach is based off that used by apache lucene and documented here: https://issues.apache.org/jira/browse/LUCENE-6989 - // and https://github.com/apache/lucene-solr/blob/7e03427fa14a024ce257babcb8362d2451941e21/lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java - MethodHandle cleanHandle = null; - try { - final PrivilegedExceptionAction action = new PrivilegedExceptionAction() { - - @Override - public MethodHandle run() throws Exception { - MethodHandle handle = null; - if (DIRECT_BUFFER_CLAZZ != null) { - final Lookup lookup = lookup(); - - try { - // sun.misc.Unsafe unmapping (Java 9+) - final Class unsafeClass = Class.forName("sun.misc.Unsafe"); - // first check if Unsafe has the right method, otherwise we can give up - // without doing any security critical stuff: - final MethodHandle unmapper = lookup.findVirtual(unsafeClass, "invokeCleaner", methodType(void.class, ByteBuffer.class)); - // fetch the unsafe instance and bind it to the virtual MH: - final Field f = unsafeClass.getDeclaredField("theUnsafe"); - f.setAccessible(true); - final Object theUnsafe = f.get(null); - handle = unmapper.bindTo(theUnsafe); - } catch (Exception e) { - Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "unable to use java 9 Unsafe.invokeCleaner", e); - - // sun.misc.Cleaner unmapping (Java 8 and older) - final Method m = DIRECT_BUFFER_CLAZZ.getMethod("cleaner"); - m.setAccessible(true); - final MethodHandle directBufferCleanerMethod = lookup.unreflect(m); - final Class cleanerClass = directBufferCleanerMethod.type().returnType(); - - /* - * "Compile" a MethodHandle that basically is equivalent to the following code: - * void unmapper(ByteBuffer byteBuffer) - * { - * sun.misc.Cleaner cleaner = ((java.nio.DirectByteBuffer) byteBuffer).cleaner(); - * if (nonNull(cleaner)) - * { - * cleaner.clean(); - * } - * else - * { - * // the noop is needed because MethodHandles#guardWithTest always needs ELSE - * noop(cleaner); - * } - * } - */ - final MethodHandle cleanMethod = lookup.findVirtual(cleanerClass, "clean", methodType(void.class)); - final MethodHandle nonNullTest = lookup.findStatic(SnappyFramed.class, "nonNull", methodType(boolean.class, Object.class)).asType(methodType(boolean.class, cleanerClass)); - final MethodHandle noop = dropArguments(constant(Void.class, null).asType(methodType(void.class)), 0, cleanerClass); - handle = filterReturnValue(directBufferCleanerMethod, guardWithTest(nonNullTest, cleanMethod, noop)).asType(methodType(void.class, ByteBuffer.class)); - } - } - - return handle; - } - }; - - cleanHandle = AccessController.doPrivileged(action); - - } catch (Throwable t) { - Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "Exception occurred attempting to lookup Sun specific DirectByteBuffer cleaner classes.", t); - } - CLEAN_HANDLE = cleanHandle; - } - /** * The header consists of the stream identifier flag, 3 bytes indicating a * length of 6, and "sNaPpY" in ASCII. @@ -213,52 +122,4 @@ final class SnappyFramed buffer.clear(); return skip - toSkip; } - - private static Class lookupClassQuietly(String name) - { - try { - return SnappyFramed.class.getClassLoader().loadClass(name); - } - catch (Throwable t) { - Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "Did not find requested class: " + name, t); - } - - return null; - } - - /** - * Provides jvm implementation specific operation to aggressively release resources associated with buffer. - * - * @param buffer The {@code ByteBuffer} to release. Must not be {@code null}. Must be {@link ByteBuffer#isDirect() direct}. - */ - static void releaseDirectByteBuffer(final ByteBuffer buffer) - { - assert buffer != null && buffer.isDirect(); - - if (CLEAN_HANDLE != null && DIRECT_BUFFER_CLAZZ.isInstance(buffer)) { - try { - final PrivilegedExceptionAction pea = new PrivilegedExceptionAction() { - @Override - public Void run() throws Exception { - try { - CLEAN_HANDLE.invokeExact(buffer); - } catch (Exception e) { - throw e; - } catch (Throwable t) { - //this will be an error - throw new RuntimeException(t); - } - return null; - } - }; - AccessController.doPrivileged(pea); - } catch (Throwable t) { - Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "Exception occurred attempting to clean up Sun specific DirectByteBuffer.", t); - } - } - } - - static boolean nonNull(Object o) { - return o != null; - } } diff --git a/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java b/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java index 21eb30c..d96b866 100644 --- a/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java +++ b/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java @@ -9,7 +9,6 @@ import static org.xerial.snappy.SnappyFramed.HEADER_BYTES; import static org.xerial.snappy.SnappyFramed.STREAM_IDENTIFIER_FLAG; import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG; import static org.xerial.snappy.SnappyFramed.readBytes; -import static org.xerial.snappy.SnappyFramed.releaseDirectByteBuffer; import static org.xerial.snappy.SnappyFramedOutputStream.MAX_BLOCK_SIZE; import java.io.EOFException; @@ -23,6 +22,9 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.util.Arrays; +import org.xerial.snappy.pool.BufferPool; +import org.xerial.snappy.pool.DefaultPoolFactory; + /** * Implements the + * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. + *

+ * + * @param in the underlying input stream. Must not be {@code null}. + * @throws IOException */ public SnappyFramedInputStream(InputStream in) throws IOException { - this(in, true); + this(in, true, DefaultPoolFactory.getDefaultPool()); + } + + /** + * Creates a Snappy input stream to read data from the specified underlying + * input stream. + * + * @param in the underlying input stream. Must not be {@code null}. + * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. + * @throws IOException + */ + public SnappyFramedInputStream(InputStream in, BufferPool bufferPool) + throws IOException + { + this(in, true, bufferPool); + } + + /** + * Creates a Snappy input stream to read data from the specified underlying + * input stream. + *

+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. + *

+ * + * @param in the underlying input stream. Must not be {@code null}. + * @param verifyChecksums if true, checksums in input stream will be verified + * @throws IOException + */ + public SnappyFramedInputStream(InputStream in, boolean verifyChecksums) + throws IOException + { + this(in, verifyChecksums, DefaultPoolFactory.getDefaultPool()); } /** @@ -95,11 +133,14 @@ public final class SnappyFramedInputStream * * @param in the underlying input stream. Must not be {@code null}. * @param verifyChecksums if true, checksums in input stream will be verified + * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. + * @throws IOException */ - public SnappyFramedInputStream(InputStream in, boolean verifyChecksums) + public SnappyFramedInputStream(InputStream in, boolean verifyChecksums, + BufferPool bufferPool) throws IOException { - this(Channels.newChannel(in), verifyChecksums); + this(Channels.newChannel(in), verifyChecksums, bufferPool); } /** @@ -107,6 +148,24 @@ public final class SnappyFramedInputStream * channel. * * @param in the underlying readable channel. Must not be {@code null}. + * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. + * @throws IOException + */ + public SnappyFramedInputStream(ReadableByteChannel in, BufferPool bufferPool) + throws IOException + { + this(in, true, bufferPool); + } + + /** + * Creates a Snappy input stream to read data from the specified underlying + * channel. + *

+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. + *

+ * + * @param in the underlying readable channel. Must not be {@code null}. + * @throws IOException */ public SnappyFramedInputStream(ReadableByteChannel in) throws IOException @@ -117,18 +176,43 @@ public final class SnappyFramedInputStream /** * Creates a Snappy input stream to read data from the specified underlying * channel. + *

+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. + *

* * @param in the underlying readable channel. Must not be {@code null}. * @param verifyChecksums if true, checksums in input stream will be verified + * @throws IOException */ public SnappyFramedInputStream(ReadableByteChannel in, boolean verifyChecksums) throws IOException + { + this(in, verifyChecksums, DefaultPoolFactory.getDefaultPool()); + } + + /** + * Creates a Snappy input stream to read data from the specified underlying + * channel. + * + * @param in the underlying readable channel. Must not be {@code null}. + * @param verifyChecksums if true, checksums in input stream will be verified + * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. + * @throws IOException + */ + public SnappyFramedInputStream(ReadableByteChannel in, + boolean verifyChecksums, BufferPool bufferPool) + throws IOException { if (in == null) { throw new NullPointerException("in is null"); } + if (bufferPool == null) { + throw new NullPointerException("bufferPool is null"); + } + + this.bufferPool = bufferPool; this.rbc = in; this.verifyChecksums = verifyChecksums; @@ -155,19 +239,22 @@ public final class SnappyFramedInputStream */ private void allocateBuffersBasedOnSize(int size) { - if (input != null) { - releaseDirectByteBuffer(input); + bufferPool.releaseDirect(input); } if (uncompressedDirect != null) { - releaseDirectByteBuffer(uncompressedDirect); + bufferPool.releaseDirect(uncompressedDirect); } - input = ByteBuffer.allocateDirect(size); + if (buffer != null) { + bufferPool.releaseArray(buffer); + } + + input = bufferPool.allocateDirect(size); final int maxCompressedLength = Snappy.maxCompressedLength(size); - uncompressedDirect = ByteBuffer.allocateDirect(maxCompressedLength); - buffer = new byte[maxCompressedLength]; + uncompressedDirect = bufferPool.allocateDirect(maxCompressedLength); + buffer = bufferPool.allocateArray(maxCompressedLength); } @Override @@ -359,14 +446,21 @@ public final class SnappyFramedInputStream finally { if (!closed) { closed = true; - } - if (input != null) { - releaseDirectByteBuffer(input); - } + if (input != null) { + bufferPool.releaseDirect(input); + input = null; + } - if (uncompressedDirect != null) { - releaseDirectByteBuffer(uncompressedDirect); + if (uncompressedDirect != null) { + bufferPool.releaseDirect(uncompressedDirect); + uncompressedDirect = null; + } + + if (buffer != null) { + bufferPool.releaseArray(buffer); + buffer = null; + } } } } @@ -456,9 +550,10 @@ public final class SnappyFramedInputStream final int uncompressedLength = Snappy.uncompressedLength(input); if (uncompressedLength > uncompressedDirect.capacity()) { - uncompressedDirect = ByteBuffer - .allocateDirect(uncompressedLength); - buffer = new byte[Math.max(input.capacity(), uncompressedLength)]; + bufferPool.releaseDirect(uncompressedDirect); + bufferPool.releaseArray(buffer); + uncompressedDirect = bufferPool.allocateDirect(uncompressedLength); + buffer = bufferPool.allocateArray(uncompressedLength); } uncompressedDirect.clear(); diff --git a/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java b/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java index 7d4d1a7..6797b6c 100644 --- a/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java +++ b/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java @@ -7,7 +7,6 @@ import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG; import static org.xerial.snappy.SnappyFramed.HEADER_BYTES; import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG; import static org.xerial.snappy.SnappyFramed.maskedCrc32c; -import static org.xerial.snappy.SnappyFramed.releaseDirectByteBuffer; import java.io.IOException; import java.io.InputStream; @@ -19,6 +18,9 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; +import org.xerial.snappy.pool.BufferPool; +import org.xerial.snappy.pool.DefaultPoolFactory; + /** * Implements the
+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. + *

* * @param out The underlying {@link OutputStream} to write to. Must not be * {@code null}. @@ -80,11 +87,29 @@ public final class SnappyFramedOutputStream public SnappyFramedOutputStream(OutputStream out) throws IOException { - this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO); + this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, DefaultPoolFactory.getDefaultPool()); + } + + /** + * Creates a new {@link SnappyFramedOutputStream} using the {@link #DEFAULT_BLOCK_SIZE} + * and {@link #DEFAULT_MIN_COMPRESSION_RATIO}. + * + * @param out The underlying {@link OutputStream} to write to. Must not be + * {@code null}. + * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. + * @throws IOException + */ + public SnappyFramedOutputStream(OutputStream out, BufferPool bufferPool) + throws IOException + { + this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, bufferPool); } /** * Creates a new {@link SnappyFramedOutputStream} instance. + *

+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. + *

* * @param out The underlying {@link OutputStream} to write to. Must not be * {@code null}. @@ -99,12 +124,35 @@ public final class SnappyFramedOutputStream double minCompressionRatio) throws IOException { - this(Channels.newChannel(out), blockSize, minCompressionRatio); + this(Channels.newChannel(out), blockSize, minCompressionRatio, DefaultPoolFactory.getDefaultPool()); + } + + /** + * Creates a new {@link SnappyFramedOutputStream} instance. + * + * @param out The underlying {@link OutputStream} to write to. Must not be + * {@code null}. + * @param blockSize The block size (of raw data) to compress before writing frames + * to out. Must be in (0, 65536]. + * @param minCompressionRatio Defines the minimum compression ratio ( + * {@code compressedLength / rawLength}) that must be achieved to + * write the compressed data. This must be in (0, 1.0]. + * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. + * @throws IOException + */ + public SnappyFramedOutputStream(OutputStream out, int blockSize, + double minCompressionRatio, BufferPool bufferPool) + throws IOException + { + this(Channels.newChannel(out), blockSize, minCompressionRatio, bufferPool); } /** * Creates a new {@link SnappyFramedOutputStream} using the * {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}. + *

+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. + *

* * @param out The underlying {@link WritableByteChannel} to write to. Must * not be {@code null}. @@ -114,7 +162,25 @@ public final class SnappyFramedOutputStream public SnappyFramedOutputStream(WritableByteChannel out) throws IOException { - this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO); + this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, DefaultPoolFactory.getDefaultPool()); + } + + /** + * Creates a new {@link SnappyFramedOutputStream} using the + * {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}. + *

+ * Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers. + *

+ * + * @param out The underlying {@link WritableByteChannel} to write to. Must + * not be {@code null}. + * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. + * @throws IOException + */ + public SnappyFramedOutputStream(WritableByteChannel out, BufferPool bufferPool) + throws IOException + { + this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, bufferPool); } /** @@ -133,9 +199,34 @@ public final class SnappyFramedOutputStream public SnappyFramedOutputStream(WritableByteChannel out, int blockSize, double minCompressionRatio) throws IOException + { + this(out, blockSize, minCompressionRatio, DefaultPoolFactory.getDefaultPool()); + } + + /** + * Creates a new {@link SnappyFramedOutputStream} instance. + * + * @param out The underlying {@link WritableByteChannel} to write to. Must + * not be {@code null}. + * @param blockSize The block size (of raw data) to compress before writing frames + * to out. Must be in (0, 65536]. + * @param minCompressionRatio Defines the minimum compression ratio ( + * {@code compressedLength / rawLength}) that must be achieved to + * write the compressed data. This must be in (0, 1.0]. + * @param bufferPool Used to obtain buffer instances. Must not be {@code null}. + * @throws IOException + * @since 1.1.1 + */ + public SnappyFramedOutputStream(WritableByteChannel out, int blockSize, + double minCompressionRatio, BufferPool bufferPool) + throws IOException { if (out == null) { - throw new NullPointerException(); + throw new NullPointerException("out is null"); + } + + if (bufferPool == null) { + throw new NullPointerException("buffer pool is null"); } if (minCompressionRatio <= 0 || minCompressionRatio > 1.0) { @@ -147,12 +238,14 @@ public final class SnappyFramedOutputStream throw new IllegalArgumentException("block size " + blockSize + " must be in (0, 65536]"); } - + this.blockSize = blockSize; this.out = out; this.minCompressionRatio = minCompressionRatio; - buffer = ByteBuffer.allocate(blockSize); - directInputBuffer = ByteBuffer.allocateDirect(blockSize); - outputBuffer = ByteBuffer.allocateDirect(Snappy + + this.bufferPool = bufferPool; + buffer = ByteBuffer.wrap(bufferPool.allocateArray(blockSize), 0, blockSize); + directInputBuffer = bufferPool.allocateDirect(blockSize); + outputBuffer = bufferPool.allocateDirect(Snappy .maxCompressedLength(blockSize)); writeHeader(out); @@ -370,9 +463,9 @@ public final class SnappyFramedOutputStream } finally { closed = true; - - releaseDirectByteBuffer(directInputBuffer); - releaseDirectByteBuffer(outputBuffer); + bufferPool.releaseArray(buffer.array()); + bufferPool.releaseDirect(directInputBuffer); + bufferPool.releaseDirect(outputBuffer); } } @@ -389,6 +482,7 @@ public final class SnappyFramedOutputStream buffer.flip(); writeCompressed(buffer); buffer.clear(); + buffer.limit(blockSize); } } diff --git a/src/main/java/org/xerial/snappy/pool/BufferPool.java b/src/main/java/org/xerial/snappy/pool/BufferPool.java new file mode 100644 index 0000000..ae39a76 --- /dev/null +++ b/src/main/java/org/xerial/snappy/pool/BufferPool.java @@ -0,0 +1,53 @@ +package org.xerial.snappy.pool; + +import java.nio.ByteBuffer; + +/** + * Makes various types of buffers available for use and potential re-use. + * + *

+ * Implementations must be safe for concurrent use by multiple threads. + *

+ * + * @author Brett Okken + */ +public interface BufferPool { + + /** + * Returns a {@code byte[]} of size or greater length. + * @param size The minimum size array required. Must be {@code >= 0}. + * @return A {@code byte[]} with length of at least size. + * @see #releaseArray(byte[]) + */ + public byte[] allocateArray(int size); + + /** + * Returns instance to pool for potential future reuse. + *

+ * Must not be returned more than 1 time. Must not be used by caller after return. + *

+ * @param buffer Instance to return to pool. Must not be {@code null}. + * Must not be returned more than 1 time. Must not be used by caller after return. + */ + public void releaseArray(byte[] buffer); + + /** + * Returns a {@link ByteBuffer#allocateDirect(int) direct ByteBuffer} of size or + * greater {@link ByteBuffer#capacity() capacity}. + * @param size The minimum size buffer required. Must be {@code >= 0}. + * @return A {@code ByteBuffer} of size or greater {@link ByteBuffer#capacity() capacity}. + * @see #releaseDirect(ByteBuffer) + * @see ByteBuffer#allocateDirect(int) + */ + public ByteBuffer allocateDirect(int size); + + /** + * Returns instance to pool for potential future reuse. + *

+ * Must not be returned more than 1 time. Must not be used by caller after return. + *

+ * @param buffer Instance to return to pool. Must not be {@code null}. + * Must not be returned more than 1 time. Must not be used by caller after return. + */ + public void releaseDirect(ByteBuffer buffer); +} diff --git a/src/main/java/org/xerial/snappy/pool/CachingBufferPool.java b/src/main/java/org/xerial/snappy/pool/CachingBufferPool.java new file mode 100644 index 0000000..80232e3 --- /dev/null +++ b/src/main/java/org/xerial/snappy/pool/CachingBufferPool.java @@ -0,0 +1,216 @@ +package org.xerial.snappy.pool; + +import java.lang.ref.SoftReference; +import java.nio.ByteBuffer; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentMap; + +/** + * A {@link BufferPool} implementation which caches values at fixed sizes. + *

+ * Pooled instances are held as {@link SoftReference} to allow GC if necessary. + *

+ *

+ * The current fixed sizes are calculated as follows: + *

    + *
  • Values < 4KB return 4KB
  • + *
  • 4KB - 32KB to 2KB
  • + *
  • 32KB - 512KB to 16KB
  • + *
  • 512KB - 2MB to 128KB
  • + *
  • 2MB - 16MB to 512KB
  • + *
  • 16MB - 128MB to 4MB
  • + *
  • 128MB - 512MB to 16MB
  • + *
  • 512MB - 1.5 GB to 128MB
  • + *
  • Values > 1.5GB return {@link Integer#MAX_VALUE}
  • + *
+ *

+ * @author Brett Okken + */ +public final class CachingBufferPool implements BufferPool { + + private static interface IntFunction { + public E create(int size); + } + + private static final IntFunction ARRAY_FUNCTION = new IntFunction() { + @Override + public byte[] create(int size) { + return new byte[size]; + } + }; + + private static final IntFunction DBB_FUNCTION = new IntFunction() { + @Override + public ByteBuffer create(int size) { + return ByteBuffer.allocateDirect(size); + } + }; + + private static final CachingBufferPool INSTANCE = new CachingBufferPool(); + + private final ConcurrentMap>> bytes = new ConcurrentHashMap<>(); + private final ConcurrentMap>> buffers = new ConcurrentHashMap<>(); + + private CachingBufferPool() { + } + + /** + * Returns instance of {@link CachingBufferPool} for using cached buffers. + * @return instance of {@link CachingBufferPool} for using cached buffers. + */ + public static BufferPool getInstance() { + return INSTANCE; + } + + /** + * {@inheritDoc} + */ + @Override + public byte[] allocateArray(int size) { + if (size <= 0) { + throw new IllegalArgumentException("size is invalid: " + size); + } + + return getOrCreate(size, bytes, ARRAY_FUNCTION); + } + + /** + * {@inheritDoc} + */ + @Override + public void releaseArray(byte[] buffer) { + if (buffer == null) { + throw new IllegalArgumentException("buffer is null"); + } + returnValue(buffer, buffer.length, bytes); + } + + /** + * {@inheritDoc} + */ + @Override + public ByteBuffer allocateDirect(int size) { + if (size <= 0) { + throw new IllegalArgumentException("size is invalid: " + size); + } + + return getOrCreate(size, buffers, DBB_FUNCTION); + } + + /** + * {@inheritDoc} + */ + @Override + public void releaseDirect(ByteBuffer buffer) { + if (buffer == null) { + throw new IllegalArgumentException("buffer is null"); + } + buffer.clear(); + returnValue(buffer, buffer.capacity(), buffers); + } + + private static E getOrCreate(final int size, final ConcurrentMap>> map, final IntFunction creator) { + assert size > 0; + final int adjustedSize = adjustSize(size); + final ConcurrentLinkedDeque> queue = optimisticGetEntry(adjustedSize, map); + SoftReference entry; + while ((entry = queue.pollFirst()) != null) { + final E val = entry.get(); + if (val != null) { + return val; + } + } + + return creator.create(adjustedSize); + } + + /* + * This is package scope to allow direct unit testing. + */ + static int adjustSize(int size) { + assert size > 0; + + switch (Integer.numberOfLeadingZeros(size)) { + case 1: // 1GB - 2GB + case 2: // 512MB + //if 512MB - 1.5 GB round to nearest 128 MB (2^27), else Integer.MAX_VALUE + return size <= 0x6000_0000 ? roundToPowers(size, 27) : Integer.MAX_VALUE; + case 3: //256MB + case 4: //128MB + //if 128MB - 512MB, round to nearest 16 MB + return roundToPowers(size, 24); + case 5: // 64MB + case 6: // 32MB + case 7: // 16MB + //if 16MB - 128MB, round to nearest 4MB + return roundToPowers(size, 22); + case 8: // 8MB + case 9: // 4MB + case 10: // 2MB + //if 2MB - 16MB, round to nearest 512KB + return roundToPowers(size, 19); + case 11: // 1MB + case 12: //512KB + //if 512KB - 2MB, round to nearest 128KB + return roundToPowers(size, 17); + case 13: //256KB + case 14: //128KB + case 15: // 64KB + case 16: // 32KB + //if 32KB to 512KB, round to nearest 16KB + return roundToPowers(size, 14); + case 17: // 16KB + case 18: // 8KB + case 19: // 4KB + // if 4KB - 32KB, round to nearest 2KB + return roundToPowers(size, 11); + default: + return 4 * 1024; + } + } + + private static int roundToPowers(int number, int bits) { + final int mask = (0x7FFF_FFFF >> bits) << bits; + final int floor = number & mask; + return floor == number ? number : floor + (1 << bits); + } + + private static ConcurrentLinkedDeque> optimisticGetEntry(Integer key, ConcurrentMap>> map) { + ConcurrentLinkedDeque> val = map.get(key); + if (val == null) { + map.putIfAbsent(key, new ConcurrentLinkedDeque>()); + val = map.get(key); + } + return val; + } + + private static void returnValue(E value, Integer size, ConcurrentMap>> map) { + final ConcurrentLinkedDeque> queue = map.get(size); + //no queue will exist if buffer was not originally obtained from this class + if (queue != null) { + //push this value onto deque first so that concurrent request can use it + queue.addFirst(new SoftReference(value)); + + //purge oldest entries have lost references + SoftReference entry; + boolean lastEmpty = true; + while(lastEmpty && (entry = queue.peekLast()) != null) { + if (entry.get() == null) { + queue.removeLastOccurrence(entry); + } else { + lastEmpty = false; + } + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public String toString() { + return "CachingBufferPool [bytes=" + this.bytes + ", buffers=" + this.buffers + "]"; + } +} + diff --git a/src/main/java/org/xerial/snappy/pool/DefaultPoolFactory.java b/src/main/java/org/xerial/snappy/pool/DefaultPoolFactory.java new file mode 100644 index 0000000..fc9447c --- /dev/null +++ b/src/main/java/org/xerial/snappy/pool/DefaultPoolFactory.java @@ -0,0 +1,37 @@ +package org.xerial.snappy.pool; + +/** + * Manages implementation of {@link BufferPool} to use by default. Setting the system property {@link #DISABLE_CACHING_PROPERTY} to {@code true} will + * cause the {@link QuiescentBufferPool} to be used by default. Otherwise, {@link CachingBufferPool} will be used by default. + * {@link #setDefaultPool(BufferPool)} can be used to explicitly control the implementation to use. + */ +public final class DefaultPoolFactory { + + /** + * Name of system property to disable use of {@link CachingBufferPool} by default. + */ + public static final String DISABLE_CACHING_PROPERTY = "org.xerial.snappy.pool.disable"; + + private static volatile BufferPool defaultPool = "true".equalsIgnoreCase(System.getProperty(DISABLE_CACHING_PROPERTY)) + ? QuiescentBufferPool.getInstance() + : CachingBufferPool.getInstance(); + + /** + * @return The default instance to use. + */ + public static BufferPool getDefaultPool() { + return defaultPool; + } + + /** + * Sets the default instance to use. + * @param pool The default instance to use. Must not be {@code null}. + * @see #getDefaultPool() + */ + public static void setDefaultPool(BufferPool pool) { + if (pool == null) { + throw new IllegalArgumentException("pool is null"); + } + defaultPool = pool; + } +} diff --git a/src/main/java/org/xerial/snappy/pool/DirectByteBuffers.java b/src/main/java/org/xerial/snappy/pool/DirectByteBuffers.java new file mode 100644 index 0000000..2d77ed8 --- /dev/null +++ b/src/main/java/org/xerial/snappy/pool/DirectByteBuffers.java @@ -0,0 +1,150 @@ +package org.xerial.snappy.pool; + +import static java.lang.invoke.MethodHandles.constant; +import static java.lang.invoke.MethodHandles.dropArguments; +import static java.lang.invoke.MethodHandles.filterReturnValue; +import static java.lang.invoke.MethodHandles.guardWithTest; +import static java.lang.invoke.MethodHandles.lookup; +import static java.lang.invoke.MethodType.methodType; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles.Lookup; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedExceptionAction; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Utility to facilitate disposing of direct byte buffer instances. + */ +final class DirectByteBuffers { + + /** + * Sun specific mechanisms to clean up resources associated with direct byte buffers. + */ + @SuppressWarnings("unchecked") + static final Class DIRECT_BUFFER_CLAZZ = (Class) lookupClassQuietly("java.nio.DirectByteBuffer"); + + static final MethodHandle CLEAN_HANDLE; + + static { + // this approach is based off that used by apache lucene and documented here: https://issues.apache.org/jira/browse/LUCENE-6989 + // and https://github.com/apache/lucene-solr/blob/7e03427fa14a024ce257babcb8362d2451941e21/lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java + MethodHandle cleanHandle = null; + try { + final PrivilegedExceptionAction action = new PrivilegedExceptionAction() { + + @Override + public MethodHandle run() throws Exception { + MethodHandle handle = null; + if (DIRECT_BUFFER_CLAZZ != null) { + final Lookup lookup = lookup(); + + try { + // sun.misc.Unsafe unmapping (Java 9+) + final Class unsafeClass = Class.forName("sun.misc.Unsafe"); + // first check if Unsafe has the right method, otherwise we can give up + // without doing any security critical stuff: + final MethodHandle unmapper = lookup.findVirtual(unsafeClass, "invokeCleaner", methodType(void.class, ByteBuffer.class)); + // fetch the unsafe instance and bind it to the virtual MH: + final Field f = unsafeClass.getDeclaredField("theUnsafe"); + f.setAccessible(true); + final Object theUnsafe = f.get(null); + handle = unmapper.bindTo(theUnsafe); + } catch (Exception e) { + Logger.getLogger(DirectByteBuffers.class.getName()).log(Level.FINE, "unable to use java 9 Unsafe.invokeCleaner", e); + + // sun.misc.Cleaner unmapping (Java 8 and older) + final Method m = DIRECT_BUFFER_CLAZZ.getMethod("cleaner"); + m.setAccessible(true); + final MethodHandle directBufferCleanerMethod = lookup.unreflect(m); + final Class cleanerClass = directBufferCleanerMethod.type().returnType(); + + /* + * "Compile" a MethodHandle that basically is equivalent to the following code: + * void unmapper(ByteBuffer byteBuffer) + * { + * sun.misc.Cleaner cleaner = ((java.nio.DirectByteBuffer) byteBuffer).cleaner(); + * if (nonNull(cleaner)) + * { + * cleaner.clean(); + * } + * else + * { + * // the noop is needed because MethodHandles#guardWithTest always needs ELSE + * noop(cleaner); + * } + * } + */ + final MethodHandle cleanMethod = lookup.findVirtual(cleanerClass, "clean", methodType(void.class)); + final MethodHandle nonNullTest = lookup.findStatic(DirectByteBuffers.class, "nonNull", methodType(boolean.class, Object.class)).asType(methodType(boolean.class, cleanerClass)); + final MethodHandle noop = dropArguments(constant(Void.class, null).asType(methodType(void.class)), 0, cleanerClass); + handle = filterReturnValue(directBufferCleanerMethod, guardWithTest(nonNullTest, cleanMethod, noop)).asType(methodType(void.class, ByteBuffer.class)); + } + } + + return handle; + } + }; + + cleanHandle = AccessController.doPrivileged(action); + + } catch (Throwable t) { + Logger.getLogger(DirectByteBuffers.class.getName()).log(Level.FINE, "Exception occurred attempting to lookup Sun specific DirectByteBuffer cleaner classes.", t); + } + CLEAN_HANDLE = cleanHandle; + } + + + private static Class lookupClassQuietly(String name) + { + try { + return DirectByteBuffers.class.getClassLoader().loadClass(name); + } + catch (Throwable t) { + Logger.getLogger(DirectByteBuffers.class.getName()).log(Level.FINE, "Did not find requested class: " + name, t); + } + + return null; + } + + + static boolean nonNull(Object o) { + return o != null; + } + + /** + * Provides jvm implementation specific operation to aggressively release resources associated with buffer. + * + * @param buffer The {@code ByteBuffer} to release. Must not be {@code null}. Must be {@link ByteBuffer#isDirect() direct}. + */ + public static void releaseDirectByteBuffer(final ByteBuffer buffer) + { + assert buffer != null && buffer.isDirect(); + + if (CLEAN_HANDLE != null && DIRECT_BUFFER_CLAZZ.isInstance(buffer)) { + try { + final PrivilegedExceptionAction pea = new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + try { + CLEAN_HANDLE.invokeExact(buffer); + } catch (Exception e) { + throw e; + } catch (Throwable t) { + //this will be an error + throw new RuntimeException(t); + } + return null; + } + }; + AccessController.doPrivileged(pea); + } catch (Throwable t) { + Logger.getLogger(DirectByteBuffers.class.getName()).log(Level.FINE, "Exception occurred attempting to clean up Sun specific DirectByteBuffer.", t); + } + } + } +} diff --git a/src/main/java/org/xerial/snappy/pool/QuiescentBufferPool.java b/src/main/java/org/xerial/snappy/pool/QuiescentBufferPool.java new file mode 100644 index 0000000..8a5c489 --- /dev/null +++ b/src/main/java/org/xerial/snappy/pool/QuiescentBufferPool.java @@ -0,0 +1,55 @@ +package org.xerial.snappy.pool; + +import java.nio.ByteBuffer; + +/** + * A {@link BufferPool} implementation which does no pooling. New instances will be created for each call to allocate. + * @author Brett Okken + */ +public final class QuiescentBufferPool implements BufferPool { + + private static final QuiescentBufferPool INSTANCE = new QuiescentBufferPool(); + + private QuiescentBufferPool() { + } + + /** + * @return Instance of {@link BufferPool} which does no caching/reuse of instances. + */ + public static BufferPool getInstance() { + return INSTANCE; + } + + /** + * Creates a new {@code byte[]} of size. + */ + @Override + public byte[] allocateArray(int size) { + return new byte[size]; + } + + /** + * Does nothing. + */ + @Override + public void releaseArray(byte[] buffer) { + } + + /** + * {@link ByteBuffer#allocateDirect(int) Allocates} a direct {@link ByteBuffer} of size. + */ + @Override + public ByteBuffer allocateDirect(int size) { + return ByteBuffer.allocateDirect(size); + } + + /** + * Aggressively releases native resources associated with buffer. + */ + @Override + public void releaseDirect(ByteBuffer buffer) { + assert buffer != null && buffer.isDirect(); + DirectByteBuffers.releaseDirectByteBuffer(buffer); + } + +} diff --git a/src/test/java/org/xerial/snappy/pool/CachingBufferPoolTest.java b/src/test/java/org/xerial/snappy/pool/CachingBufferPoolTest.java new file mode 100644 index 0000000..af51f4a --- /dev/null +++ b/src/test/java/org/xerial/snappy/pool/CachingBufferPoolTest.java @@ -0,0 +1,184 @@ +package org.xerial.snappy.pool; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.lang.ref.Reference; +import java.lang.ref.WeakReference; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; + + +public class CachingBufferPoolTest { + + private static final int LIST_COUNT = 2048; + + @Test + public void testAdjustSize() { + assertEquals(4 * 1024, CachingBufferPool.adjustSize(2)); + assertEquals(4 * 1024, CachingBufferPool.adjustSize(1023)); + assertEquals(4 * 1024, CachingBufferPool.adjustSize(1024)); + assertEquals(4 * 1024, CachingBufferPool.adjustSize(1025)); + assertEquals(4 * 1024, CachingBufferPool.adjustSize(4 * 1024)); + assertEquals((4 + 2) * 1024, CachingBufferPool.adjustSize((4 * 1024) + 1)); + assertEquals(6 * 1024, CachingBufferPool.adjustSize(5 * 1024)); + assertEquals(6 * 1024, CachingBufferPool.adjustSize((5 * 1024) + 1)); + + assertEquals(32 * 1024, CachingBufferPool.adjustSize(32 * 1024)); + assertEquals((32 + 16) * 1024, CachingBufferPool.adjustSize((32 * 1024) + 1)); + + assertEquals(2 * 1024 * 1024, CachingBufferPool.adjustSize(2 * 1024 * 1024)); + assertEquals(((2 * 1024) + 512) * 1024, CachingBufferPool.adjustSize((2 * 1024 * 1024) + 1)); + + assertEquals(16 * 1024 * 1024, CachingBufferPool.adjustSize(16 * 1024 * 1024)); + assertEquals((16 + 4) * 1024 * 1024, CachingBufferPool.adjustSize((16 * 1024 * 1024) + 1)); + + assertEquals(128 * 1024 * 1024, CachingBufferPool.adjustSize(128 * 1024 * 1024)); + assertEquals((128 + 16) * 1024 * 1024, CachingBufferPool.adjustSize((128 * 1024 * 1024) + 1)); + + assertEquals(512 * 1024 * 1024, CachingBufferPool.adjustSize(512 * 1024 * 1024)); + assertEquals((512 + 128) * 1024 * 1024, CachingBufferPool.adjustSize((512 * 1024 * 1024) + 1)); + assertEquals(0x6000_0000, CachingBufferPool.adjustSize(0x6000_0000)); + assertEquals(0x6000_0000, CachingBufferPool.adjustSize(0x6000_0000 - 1)); + assertEquals(Integer.MAX_VALUE, CachingBufferPool.adjustSize(0x6000_0001)); + assertEquals(Integer.MAX_VALUE, CachingBufferPool.adjustSize(Integer.MAX_VALUE)); + assertEquals(Integer.MAX_VALUE, CachingBufferPool.adjustSize(Integer.MAX_VALUE - 1)); + } + + @Test + public void testDirectByteBuffers() throws Exception { + + BufferPool pool = CachingBufferPool.getInstance(); + + ByteBuffer bb1 = pool.allocateDirect(12 * 1024); + assertNotNull(bb1); + assertEquals(12 * 1024, bb1.limit()); + assertEquals(12 * 1024, bb1.capacity()); + assertEquals(0, bb1.position()); + + ByteBuffer bb2 = pool.allocateDirect(12 * 1024); + assertNotNull(bb2); + assertEquals(12 * 1024, bb2.limit()); + assertEquals(12 * 1024, bb2.capacity()); + assertEquals(0, bb2.position()); + + assertNotSame(bb1, bb2); + + bb2.position(18); + pool.releaseDirect(bb2); + + ByteBuffer bb3 = pool.allocateDirect(12 * 1024); + assertNotNull(bb3); + assertEquals(12 * 1024, bb3.limit()); + assertEquals(12 * 1024, bb3.capacity()); + assertEquals(0, bb3.position()); + + assertNotSame(bb1, bb2); + assertSame(bb2, bb3); + + pool.releaseDirect(bb1); + + ByteBuffer bb4 = pool.allocateDirect((12 * 1024) - 1); + assertNotNull(bb4); + assertEquals(12 * 1024, bb4.limit()); + assertEquals(12 * 1024, bb4.capacity()); + assertEquals(0, bb4.position()); + + assertSame(bb1, bb4); + } + + @Test + public void testArrays() throws Exception { + + BufferPool pool = CachingBufferPool.getInstance(); + + byte[] bb1 = pool.allocateArray(12 * 1024); + assertNotNull(bb1); + assertEquals(12 * 1024, bb1.length); + + byte[] bb2 = pool.allocateArray(12 * 1024); + assertNotNull(bb2); + assertEquals(12 * 1024, bb2.length); + + assertNotSame(bb1, bb2); + + pool.releaseArray(bb2); + + byte[] bb3 = pool.allocateArray(12 * 1024); + assertNotNull(bb3); + assertEquals(12 * 1024, bb3.length); + + assertNotSame(bb1, bb2); + assertSame(bb2, bb3); + + pool.releaseArray(bb1); + + byte[] bb4 = pool.allocateArray((12 * 1024) - 1); + assertNotNull(bb4); + assertEquals(12 * 1024, bb4.length); + + assertSame(bb1, bb4); + } + + @Test + public void testSoftReferences() { + + BufferPool pool = CachingBufferPool.getInstance(); + byte[] bb1 = pool.allocateArray(8 * 1024); + Reference ref = new WeakReference(bb1); + bb1[0] = 123; + bb1[8000] = -74; + int bb1HC = System.identityHashCode(bb1); + + pool.releaseArray(bb1); + + byte[] bb1_copy = pool.allocateArray(8 * 1024); + assertSame(bb1, bb1_copy); + assertEquals(123, bb1_copy[0]); + assertEquals(-74, bb1_copy[8000]); + assertEquals(bb1HC, System.identityHashCode(bb1_copy)); + + //release back into pool (again) + pool.releaseArray(bb1); + + //release strong references + bb1_copy = null; + bb1 = null; + assertNotNull(ref.get()); + + //force an OOME to for SoftReferences to be collected + List vals = forceOOMEGC(LIST_COUNT); + assertTrue("count: " + vals.size(), vals.size() < LIST_COUNT); + + //assert that our test reference has been cleared + assertNull(ref.get()); + + //get another value from the pool + byte[] bb2 = pool.allocateArray(8 * 1024); + //assert that it is indeed a new value, and not same from previous + assertNotEquals(123, bb2[0]); + assertNotEquals(-74, bb2[8000]); + assertNotEquals(bb1HC, System.identityHashCode(bb2)); + } + + private static List forceOOMEGC(int count) { + final List vals = new ArrayList<>(count); + + try { + for (int i=0; i