Pool byte[] and direct ByteBuffer (#234)

* https://github.com/xerial/snappy-java/issues/233

Pool byte[] and direct ByteBuffer instances for reuse by Framed
implementation.

* Remove use of google VisibleForTesting annotation

https://github.com/xerial/snappy-java/issues/233

* javadoc
This commit is contained in:
Brett Okken 2020-05-04 13:13:46 -05:00 committed by GitHub
parent 990f73398d
commit e94001da2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 917 additions and 172 deletions

View File

@ -3,24 +3,9 @@
*/
package org.xerial.snappy;
import static java.lang.invoke.MethodHandles.constant;
import static java.lang.invoke.MethodHandles.dropArguments;
import static java.lang.invoke.MethodHandles.filterReturnValue;
import static java.lang.invoke.MethodHandles.guardWithTest;
import static java.lang.invoke.MethodHandles.lookup;
import static java.lang.invoke.MethodType.methodType;
import java.io.IOException;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles.Lookup;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.security.AccessController;
import java.security.PrivilegedExceptionAction;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Constants and utilities for implementing x-snappy-framed.
@ -38,82 +23,6 @@ final class SnappyFramed
private static final int MASK_DELTA = 0xa282ead8;
/**
* Sun specific mechanisms to clean up resources associated with direct byte buffers.
*/
@SuppressWarnings("unchecked")
static final Class<? extends ByteBuffer> DIRECT_BUFFER_CLAZZ = (Class<? extends ByteBuffer>) lookupClassQuietly("java.nio.DirectByteBuffer");
static final MethodHandle CLEAN_HANDLE;
static {
// this approach is based off that used by apache lucene and documented here: https://issues.apache.org/jira/browse/LUCENE-6989
// and https://github.com/apache/lucene-solr/blob/7e03427fa14a024ce257babcb8362d2451941e21/lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java
MethodHandle cleanHandle = null;
try {
final PrivilegedExceptionAction<MethodHandle> action = new PrivilegedExceptionAction<MethodHandle>() {
@Override
public MethodHandle run() throws Exception {
MethodHandle handle = null;
if (DIRECT_BUFFER_CLAZZ != null) {
final Lookup lookup = lookup();
try {
// sun.misc.Unsafe unmapping (Java 9+)
final Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
// first check if Unsafe has the right method, otherwise we can give up
// without doing any security critical stuff:
final MethodHandle unmapper = lookup.findVirtual(unsafeClass, "invokeCleaner", methodType(void.class, ByteBuffer.class));
// fetch the unsafe instance and bind it to the virtual MH:
final Field f = unsafeClass.getDeclaredField("theUnsafe");
f.setAccessible(true);
final Object theUnsafe = f.get(null);
handle = unmapper.bindTo(theUnsafe);
} catch (Exception e) {
Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "unable to use java 9 Unsafe.invokeCleaner", e);
// sun.misc.Cleaner unmapping (Java 8 and older)
final Method m = DIRECT_BUFFER_CLAZZ.getMethod("cleaner");
m.setAccessible(true);
final MethodHandle directBufferCleanerMethod = lookup.unreflect(m);
final Class<?> cleanerClass = directBufferCleanerMethod.type().returnType();
/*
* "Compile" a MethodHandle that basically is equivalent to the following code:
* void unmapper(ByteBuffer byteBuffer)
* {
* sun.misc.Cleaner cleaner = ((java.nio.DirectByteBuffer) byteBuffer).cleaner();
* if (nonNull(cleaner))
* {
* cleaner.clean();
* }
* else
* {
* // the noop is needed because MethodHandles#guardWithTest always needs ELSE
* noop(cleaner);
* }
* }
*/
final MethodHandle cleanMethod = lookup.findVirtual(cleanerClass, "clean", methodType(void.class));
final MethodHandle nonNullTest = lookup.findStatic(SnappyFramed.class, "nonNull", methodType(boolean.class, Object.class)).asType(methodType(boolean.class, cleanerClass));
final MethodHandle noop = dropArguments(constant(Void.class, null).asType(methodType(void.class)), 0, cleanerClass);
handle = filterReturnValue(directBufferCleanerMethod, guardWithTest(nonNullTest, cleanMethod, noop)).asType(methodType(void.class, ByteBuffer.class));
}
}
return handle;
}
};
cleanHandle = AccessController.doPrivileged(action);
} catch (Throwable t) {
Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "Exception occurred attempting to lookup Sun specific DirectByteBuffer cleaner classes.", t);
}
CLEAN_HANDLE = cleanHandle;
}
/**
* The header consists of the stream identifier flag, 3 bytes indicating a
* length of 6, and "sNaPpY" in ASCII.
@ -213,52 +122,4 @@ final class SnappyFramed
buffer.clear();
return skip - toSkip;
}
private static Class<?> lookupClassQuietly(String name)
{
try {
return SnappyFramed.class.getClassLoader().loadClass(name);
}
catch (Throwable t) {
Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "Did not find requested class: " + name, t);
}
return null;
}
/**
* Provides jvm implementation specific operation to aggressively release resources associated with <i>buffer</i>.
*
* @param buffer The {@code ByteBuffer} to release. Must not be {@code null}. Must be {@link ByteBuffer#isDirect() direct}.
*/
static void releaseDirectByteBuffer(final ByteBuffer buffer)
{
assert buffer != null && buffer.isDirect();
if (CLEAN_HANDLE != null && DIRECT_BUFFER_CLAZZ.isInstance(buffer)) {
try {
final PrivilegedExceptionAction<Void> pea = new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
try {
CLEAN_HANDLE.invokeExact(buffer);
} catch (Exception e) {
throw e;
} catch (Throwable t) {
//this will be an error
throw new RuntimeException(t);
}
return null;
}
};
AccessController.doPrivileged(pea);
} catch (Throwable t) {
Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "Exception occurred attempting to clean up Sun specific DirectByteBuffer.", t);
}
}
}
static boolean nonNull(Object o) {
return o != null;
}
}

View File

@ -9,7 +9,6 @@ import static org.xerial.snappy.SnappyFramed.HEADER_BYTES;
import static org.xerial.snappy.SnappyFramed.STREAM_IDENTIFIER_FLAG;
import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG;
import static org.xerial.snappy.SnappyFramed.readBytes;
import static org.xerial.snappy.SnappyFramed.releaseDirectByteBuffer;
import static org.xerial.snappy.SnappyFramedOutputStream.MAX_BLOCK_SIZE;
import java.io.EOFException;
@ -23,6 +22,9 @@ import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import org.xerial.snappy.pool.BufferPool;
import org.xerial.snappy.pool.DefaultPoolFactory;
/**
* Implements the <a
* href="http://snappy.googlecode.com/svn/trunk/framing_format.txt"
@ -41,6 +43,7 @@ public final class SnappyFramedInputStream
private final ReadableByteChannel rbc;
private final ByteBuffer frameHeader;
private final boolean verifyChecksums;
private final BufferPool bufferPool;
/**
* A single frame read from the underlying {@link InputStream}.
@ -80,13 +83,48 @@ public final class SnappyFramedInputStream
/**
* Creates a Snappy input stream to read data from the specified underlying
* input stream.
*
* @param in the underlying input stream. Must not be {@code null}.
* <p>
* Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
* </p>
*
* @param in the underlying input stream. Must not be {@code null}.
* @throws IOException
*/
public SnappyFramedInputStream(InputStream in)
throws IOException
{
this(in, true);
this(in, true, DefaultPoolFactory.getDefaultPool());
}
/**
* Creates a Snappy input stream to read data from the specified underlying
* input stream.
*
* @param in the underlying input stream. Must not be {@code null}.
* @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
* @throws IOException
*/
public SnappyFramedInputStream(InputStream in, BufferPool bufferPool)
throws IOException
{
this(in, true, bufferPool);
}
/**
* Creates a Snappy input stream to read data from the specified underlying
* input stream.
* <p>
* Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
* </p>
*
* @param in the underlying input stream. Must not be {@code null}.
* @param verifyChecksums if true, checksums in input stream will be verified
* @throws IOException
*/
public SnappyFramedInputStream(InputStream in, boolean verifyChecksums)
throws IOException
{
this(in, verifyChecksums, DefaultPoolFactory.getDefaultPool());
}
/**
@ -95,11 +133,14 @@ public final class SnappyFramedInputStream
*
* @param in the underlying input stream. Must not be {@code null}.
* @param verifyChecksums if true, checksums in input stream will be verified
* @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
* @throws IOException
*/
public SnappyFramedInputStream(InputStream in, boolean verifyChecksums)
public SnappyFramedInputStream(InputStream in, boolean verifyChecksums,
BufferPool bufferPool)
throws IOException
{
this(Channels.newChannel(in), verifyChecksums);
this(Channels.newChannel(in), verifyChecksums, bufferPool);
}
/**
@ -107,6 +148,24 @@ public final class SnappyFramedInputStream
* channel.
*
* @param in the underlying readable channel. Must not be {@code null}.
* @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
* @throws IOException
*/
public SnappyFramedInputStream(ReadableByteChannel in, BufferPool bufferPool)
throws IOException
{
this(in, true, bufferPool);
}
/**
* Creates a Snappy input stream to read data from the specified underlying
* channel.
* <p>
* Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
* </p>
*
* @param in the underlying readable channel. Must not be {@code null}.
* @throws IOException
*/
public SnappyFramedInputStream(ReadableByteChannel in)
throws IOException
@ -117,18 +176,43 @@ public final class SnappyFramedInputStream
/**
* Creates a Snappy input stream to read data from the specified underlying
* channel.
* <p>
* Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
* </p>
*
* @param in the underlying readable channel. Must not be {@code null}.
* @param verifyChecksums if true, checksums in input stream will be verified
* @throws IOException
*/
public SnappyFramedInputStream(ReadableByteChannel in,
boolean verifyChecksums)
throws IOException
{
this(in, verifyChecksums, DefaultPoolFactory.getDefaultPool());
}
/**
* Creates a Snappy input stream to read data from the specified underlying
* channel.
*
* @param in the underlying readable channel. Must not be {@code null}.
* @param verifyChecksums if true, checksums in input stream will be verified
* @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
* @throws IOException
*/
public SnappyFramedInputStream(ReadableByteChannel in,
boolean verifyChecksums, BufferPool bufferPool)
throws IOException
{
if (in == null) {
throw new NullPointerException("in is null");
}
if (bufferPool == null) {
throw new NullPointerException("bufferPool is null");
}
this.bufferPool = bufferPool;
this.rbc = in;
this.verifyChecksums = verifyChecksums;
@ -155,19 +239,22 @@ public final class SnappyFramedInputStream
*/
private void allocateBuffersBasedOnSize(int size)
{
if (input != null) {
releaseDirectByteBuffer(input);
bufferPool.releaseDirect(input);
}
if (uncompressedDirect != null) {
releaseDirectByteBuffer(uncompressedDirect);
bufferPool.releaseDirect(uncompressedDirect);
}
input = ByteBuffer.allocateDirect(size);
if (buffer != null) {
bufferPool.releaseArray(buffer);
}
input = bufferPool.allocateDirect(size);
final int maxCompressedLength = Snappy.maxCompressedLength(size);
uncompressedDirect = ByteBuffer.allocateDirect(maxCompressedLength);
buffer = new byte[maxCompressedLength];
uncompressedDirect = bufferPool.allocateDirect(maxCompressedLength);
buffer = bufferPool.allocateArray(maxCompressedLength);
}
@Override
@ -359,14 +446,21 @@ public final class SnappyFramedInputStream
finally {
if (!closed) {
closed = true;
}
if (input != null) {
releaseDirectByteBuffer(input);
}
if (input != null) {
bufferPool.releaseDirect(input);
input = null;
}
if (uncompressedDirect != null) {
releaseDirectByteBuffer(uncompressedDirect);
if (uncompressedDirect != null) {
bufferPool.releaseDirect(uncompressedDirect);
uncompressedDirect = null;
}
if (buffer != null) {
bufferPool.releaseArray(buffer);
buffer = null;
}
}
}
}
@ -456,9 +550,10 @@ public final class SnappyFramedInputStream
final int uncompressedLength = Snappy.uncompressedLength(input);
if (uncompressedLength > uncompressedDirect.capacity()) {
uncompressedDirect = ByteBuffer
.allocateDirect(uncompressedLength);
buffer = new byte[Math.max(input.capacity(), uncompressedLength)];
bufferPool.releaseDirect(uncompressedDirect);
bufferPool.releaseArray(buffer);
uncompressedDirect = bufferPool.allocateDirect(uncompressedLength);
buffer = bufferPool.allocateArray(uncompressedLength);
}
uncompressedDirect.clear();

View File

@ -7,7 +7,6 @@ import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG;
import static org.xerial.snappy.SnappyFramed.HEADER_BYTES;
import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG;
import static org.xerial.snappy.SnappyFramed.maskedCrc32c;
import static org.xerial.snappy.SnappyFramed.releaseDirectByteBuffer;
import java.io.IOException;
import java.io.InputStream;
@ -19,6 +18,9 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import org.xerial.snappy.pool.BufferPool;
import org.xerial.snappy.pool.DefaultPoolFactory;
/**
* Implements the <a
* href="http://snappy.googlecode.com/svn/trunk/framing_format.txt"
@ -59,6 +61,8 @@ public final class SnappyFramedOutputStream
private final ByteBuffer headerBuffer = ByteBuffer.allocate(8).order(
ByteOrder.LITTLE_ENDIAN);
private final BufferPool bufferPool;
private final int blockSize;
private final ByteBuffer buffer;
private final ByteBuffer directInputBuffer;
private final ByteBuffer outputBuffer;
@ -72,6 +76,9 @@ public final class SnappyFramedOutputStream
/**
* Creates a new {@link SnappyFramedOutputStream} using the {@link #DEFAULT_BLOCK_SIZE}
* and {@link #DEFAULT_MIN_COMPRESSION_RATIO}.
* <p>
* Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
* </p>
*
* @param out The underlying {@link OutputStream} to write to. Must not be
* {@code null}.
@ -80,11 +87,29 @@ public final class SnappyFramedOutputStream
public SnappyFramedOutputStream(OutputStream out)
throws IOException
{
this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO);
this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, DefaultPoolFactory.getDefaultPool());
}
/**
* Creates a new {@link SnappyFramedOutputStream} using the {@link #DEFAULT_BLOCK_SIZE}
* and {@link #DEFAULT_MIN_COMPRESSION_RATIO}.
*
* @param out The underlying {@link OutputStream} to write to. Must not be
* {@code null}.
* @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
* @throws IOException
*/
public SnappyFramedOutputStream(OutputStream out, BufferPool bufferPool)
throws IOException
{
this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, bufferPool);
}
/**
* Creates a new {@link SnappyFramedOutputStream} instance.
* <p>
* Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
* </p>
*
* @param out The underlying {@link OutputStream} to write to. Must not be
* {@code null}.
@ -99,12 +124,35 @@ public final class SnappyFramedOutputStream
double minCompressionRatio)
throws IOException
{
this(Channels.newChannel(out), blockSize, minCompressionRatio);
this(Channels.newChannel(out), blockSize, minCompressionRatio, DefaultPoolFactory.getDefaultPool());
}
/**
* Creates a new {@link SnappyFramedOutputStream} instance.
*
* @param out The underlying {@link OutputStream} to write to. Must not be
* {@code null}.
* @param blockSize The block size (of raw data) to compress before writing frames
* to <i>out</i>. Must be in (0, 65536].
* @param minCompressionRatio Defines the minimum compression ratio (
* {@code compressedLength / rawLength}) that must be achieved to
* write the compressed data. This must be in (0, 1.0].
* @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
* @throws IOException
*/
public SnappyFramedOutputStream(OutputStream out, int blockSize,
double minCompressionRatio, BufferPool bufferPool)
throws IOException
{
this(Channels.newChannel(out), blockSize, minCompressionRatio, bufferPool);
}
/**
* Creates a new {@link SnappyFramedOutputStream} using the
* {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}.
* <p>
* Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
* </p>
*
* @param out The underlying {@link WritableByteChannel} to write to. Must
* not be {@code null}.
@ -114,7 +162,25 @@ public final class SnappyFramedOutputStream
public SnappyFramedOutputStream(WritableByteChannel out)
throws IOException
{
this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO);
this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, DefaultPoolFactory.getDefaultPool());
}
/**
* Creates a new {@link SnappyFramedOutputStream} using the
* {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}.
* <p>
* Uses {@link DefaultPoolFactory} to obtain {@link BufferPool} for buffers.
* </p>
*
* @param out The underlying {@link WritableByteChannel} to write to. Must
* not be {@code null}.
* @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
* @throws IOException
*/
public SnappyFramedOutputStream(WritableByteChannel out, BufferPool bufferPool)
throws IOException
{
this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO, bufferPool);
}
/**
@ -133,9 +199,34 @@ public final class SnappyFramedOutputStream
public SnappyFramedOutputStream(WritableByteChannel out, int blockSize,
double minCompressionRatio)
throws IOException
{
this(out, blockSize, minCompressionRatio, DefaultPoolFactory.getDefaultPool());
}
/**
* Creates a new {@link SnappyFramedOutputStream} instance.
*
* @param out The underlying {@link WritableByteChannel} to write to. Must
* not be {@code null}.
* @param blockSize The block size (of raw data) to compress before writing frames
* to <i>out</i>. Must be in (0, 65536].
* @param minCompressionRatio Defines the minimum compression ratio (
* {@code compressedLength / rawLength}) that must be achieved to
* write the compressed data. This must be in (0, 1.0].
* @param bufferPool Used to obtain buffer instances. Must not be {@code null}.
* @throws IOException
* @since 1.1.1
*/
public SnappyFramedOutputStream(WritableByteChannel out, int blockSize,
double minCompressionRatio, BufferPool bufferPool)
throws IOException
{
if (out == null) {
throw new NullPointerException();
throw new NullPointerException("out is null");
}
if (bufferPool == null) {
throw new NullPointerException("buffer pool is null");
}
if (minCompressionRatio <= 0 || minCompressionRatio > 1.0) {
@ -147,12 +238,14 @@ public final class SnappyFramedOutputStream
throw new IllegalArgumentException("block size " + blockSize
+ " must be in (0, 65536]");
}
this.blockSize = blockSize;
this.out = out;
this.minCompressionRatio = minCompressionRatio;
buffer = ByteBuffer.allocate(blockSize);
directInputBuffer = ByteBuffer.allocateDirect(blockSize);
outputBuffer = ByteBuffer.allocateDirect(Snappy
this.bufferPool = bufferPool;
buffer = ByteBuffer.wrap(bufferPool.allocateArray(blockSize), 0, blockSize);
directInputBuffer = bufferPool.allocateDirect(blockSize);
outputBuffer = bufferPool.allocateDirect(Snappy
.maxCompressedLength(blockSize));
writeHeader(out);
@ -370,9 +463,9 @@ public final class SnappyFramedOutputStream
}
finally {
closed = true;
releaseDirectByteBuffer(directInputBuffer);
releaseDirectByteBuffer(outputBuffer);
bufferPool.releaseArray(buffer.array());
bufferPool.releaseDirect(directInputBuffer);
bufferPool.releaseDirect(outputBuffer);
}
}
@ -389,6 +482,7 @@ public final class SnappyFramedOutputStream
buffer.flip();
writeCompressed(buffer);
buffer.clear();
buffer.limit(blockSize);
}
}

View File

@ -0,0 +1,53 @@
package org.xerial.snappy.pool;
import java.nio.ByteBuffer;
/**
* Makes various types of buffers available for use and potential re-use.
*
* <p>
* Implementations must be safe for concurrent use by multiple threads.
* </p>
*
* @author Brett Okken
*/
public interface BufferPool {
/**
* Returns a {@code byte[]} of <i>size</i> or greater length.
* @param size The minimum size array required. Must be {@code >= 0}.
* @return A {@code byte[]} with length of at least <i>size</i>.
* @see #releaseArray(byte[])
*/
public byte[] allocateArray(int size);
/**
* Returns instance to pool for potential future reuse.
* <p>
* Must not be returned more than 1 time. Must not be used by caller after return.
* </p>
* @param buffer Instance to return to pool. Must not be {@code null}.
* Must not be returned more than 1 time. Must not be used by caller after return.
*/
public void releaseArray(byte[] buffer);
/**
* Returns a {@link ByteBuffer#allocateDirect(int) direct ByteBuffer} of <i>size</i> or
* greater {@link ByteBuffer#capacity() capacity}.
* @param size The minimum size buffer required. Must be {@code >= 0}.
* @return A {@code ByteBuffer} of <i>size</i> or greater {@link ByteBuffer#capacity() capacity}.
* @see #releaseDirect(ByteBuffer)
* @see ByteBuffer#allocateDirect(int)
*/
public ByteBuffer allocateDirect(int size);
/**
* Returns instance to pool for potential future reuse.
* <p>
* Must not be returned more than 1 time. Must not be used by caller after return.
* </p>
* @param buffer Instance to return to pool. Must not be {@code null}.
* Must not be returned more than 1 time. Must not be used by caller after return.
*/
public void releaseDirect(ByteBuffer buffer);
}

View File

@ -0,0 +1,216 @@
package org.xerial.snappy.pool;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
/**
* A {@link BufferPool} implementation which caches values at fixed sizes.
* <p>
* Pooled instances are held as {@link SoftReference} to allow GC if necessary.
* </p>
* <p>
* The current fixed sizes are calculated as follows:
* <ul>
* <li>Values < 4KB return 4KB</li>
* <li>4KB - 32KB to 2KB</li>
* <li>32KB - 512KB to 16KB</li>
* <li>512KB - 2MB to 128KB</li>
* <li>2MB - 16MB to 512KB</li>
* <li>16MB - 128MB to 4MB</li>
* <li>128MB - 512MB to 16MB</li>
* <li>512MB - 1.5 GB to 128MB</li>
* <li>Values > 1.5GB return {@link Integer#MAX_VALUE}</li>
* </ul>
* </p>
* @author Brett Okken
*/
public final class CachingBufferPool implements BufferPool {
private static interface IntFunction<E> {
public E create(int size);
}
private static final IntFunction<byte[]> ARRAY_FUNCTION = new IntFunction<byte[]>() {
@Override
public byte[] create(int size) {
return new byte[size];
}
};
private static final IntFunction<ByteBuffer> DBB_FUNCTION = new IntFunction<ByteBuffer>() {
@Override
public ByteBuffer create(int size) {
return ByteBuffer.allocateDirect(size);
}
};
private static final CachingBufferPool INSTANCE = new CachingBufferPool();
private final ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<byte[]>>> bytes = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<ByteBuffer>>> buffers = new ConcurrentHashMap<>();
private CachingBufferPool() {
}
/**
* Returns instance of {@link CachingBufferPool} for using cached buffers.
* @return instance of {@link CachingBufferPool} for using cached buffers.
*/
public static BufferPool getInstance() {
return INSTANCE;
}
/**
* {@inheritDoc}
*/
@Override
public byte[] allocateArray(int size) {
if (size <= 0) {
throw new IllegalArgumentException("size is invalid: " + size);
}
return getOrCreate(size, bytes, ARRAY_FUNCTION);
}
/**
* {@inheritDoc}
*/
@Override
public void releaseArray(byte[] buffer) {
if (buffer == null) {
throw new IllegalArgumentException("buffer is null");
}
returnValue(buffer, buffer.length, bytes);
}
/**
* {@inheritDoc}
*/
@Override
public ByteBuffer allocateDirect(int size) {
if (size <= 0) {
throw new IllegalArgumentException("size is invalid: " + size);
}
return getOrCreate(size, buffers, DBB_FUNCTION);
}
/**
* {@inheritDoc}
*/
@Override
public void releaseDirect(ByteBuffer buffer) {
if (buffer == null) {
throw new IllegalArgumentException("buffer is null");
}
buffer.clear();
returnValue(buffer, buffer.capacity(), buffers);
}
private static <E> E getOrCreate(final int size, final ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map, final IntFunction<E> creator) {
assert size > 0;
final int adjustedSize = adjustSize(size);
final ConcurrentLinkedDeque<SoftReference<E>> queue = optimisticGetEntry(adjustedSize, map);
SoftReference<E> entry;
while ((entry = queue.pollFirst()) != null) {
final E val = entry.get();
if (val != null) {
return val;
}
}
return creator.create(adjustedSize);
}
/*
* This is package scope to allow direct unit testing.
*/
static int adjustSize(int size) {
assert size > 0;
switch (Integer.numberOfLeadingZeros(size)) {
case 1: // 1GB - 2GB
case 2: // 512MB
//if 512MB - 1.5 GB round to nearest 128 MB (2^27), else Integer.MAX_VALUE
return size <= 0x6000_0000 ? roundToPowers(size, 27) : Integer.MAX_VALUE;
case 3: //256MB
case 4: //128MB
//if 128MB - 512MB, round to nearest 16 MB
return roundToPowers(size, 24);
case 5: // 64MB
case 6: // 32MB
case 7: // 16MB
//if 16MB - 128MB, round to nearest 4MB
return roundToPowers(size, 22);
case 8: // 8MB
case 9: // 4MB
case 10: // 2MB
//if 2MB - 16MB, round to nearest 512KB
return roundToPowers(size, 19);
case 11: // 1MB
case 12: //512KB
//if 512KB - 2MB, round to nearest 128KB
return roundToPowers(size, 17);
case 13: //256KB
case 14: //128KB
case 15: // 64KB
case 16: // 32KB
//if 32KB to 512KB, round to nearest 16KB
return roundToPowers(size, 14);
case 17: // 16KB
case 18: // 8KB
case 19: // 4KB
// if 4KB - 32KB, round to nearest 2KB
return roundToPowers(size, 11);
default:
return 4 * 1024;
}
}
private static int roundToPowers(int number, int bits) {
final int mask = (0x7FFF_FFFF >> bits) << bits;
final int floor = number & mask;
return floor == number ? number : floor + (1 << bits);
}
private static <E> ConcurrentLinkedDeque<SoftReference<E>> optimisticGetEntry(Integer key, ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map) {
ConcurrentLinkedDeque<SoftReference<E>> val = map.get(key);
if (val == null) {
map.putIfAbsent(key, new ConcurrentLinkedDeque<SoftReference<E>>());
val = map.get(key);
}
return val;
}
private static <E> void returnValue(E value, Integer size, ConcurrentMap<Integer, ConcurrentLinkedDeque<SoftReference<E>>> map) {
final ConcurrentLinkedDeque<SoftReference<E>> queue = map.get(size);
//no queue will exist if buffer was not originally obtained from this class
if (queue != null) {
//push this value onto deque first so that concurrent request can use it
queue.addFirst(new SoftReference<E>(value));
//purge oldest entries have lost references
SoftReference<E> entry;
boolean lastEmpty = true;
while(lastEmpty && (entry = queue.peekLast()) != null) {
if (entry.get() == null) {
queue.removeLastOccurrence(entry);
} else {
lastEmpty = false;
}
}
}
}
/**
* {@inheritDoc}
*/
@Override
public String toString() {
return "CachingBufferPool [bytes=" + this.bytes + ", buffers=" + this.buffers + "]";
}
}

View File

@ -0,0 +1,37 @@
package org.xerial.snappy.pool;
/**
* Manages implementation of {@link BufferPool} to use by default. Setting the system property {@link #DISABLE_CACHING_PROPERTY} to {@code true} will
* cause the {@link QuiescentBufferPool} to be used by default. Otherwise, {@link CachingBufferPool} will be used by default.
* {@link #setDefaultPool(BufferPool)} can be used to explicitly control the implementation to use.
*/
public final class DefaultPoolFactory {
/**
* Name of system property to disable use of {@link CachingBufferPool} by default.
*/
public static final String DISABLE_CACHING_PROPERTY = "org.xerial.snappy.pool.disable";
private static volatile BufferPool defaultPool = "true".equalsIgnoreCase(System.getProperty(DISABLE_CACHING_PROPERTY))
? QuiescentBufferPool.getInstance()
: CachingBufferPool.getInstance();
/**
* @return The default instance to use.
*/
public static BufferPool getDefaultPool() {
return defaultPool;
}
/**
* Sets the default instance to use.
* @param pool The default instance to use. Must not be {@code null}.
* @see #getDefaultPool()
*/
public static void setDefaultPool(BufferPool pool) {
if (pool == null) {
throw new IllegalArgumentException("pool is null");
}
defaultPool = pool;
}
}

View File

@ -0,0 +1,150 @@
package org.xerial.snappy.pool;
import static java.lang.invoke.MethodHandles.constant;
import static java.lang.invoke.MethodHandles.dropArguments;
import static java.lang.invoke.MethodHandles.filterReturnValue;
import static java.lang.invoke.MethodHandles.guardWithTest;
import static java.lang.invoke.MethodHandles.lookup;
import static java.lang.invoke.MethodType.methodType;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles.Lookup;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.PrivilegedExceptionAction;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Utility to facilitate disposing of direct byte buffer instances.
*/
final class DirectByteBuffers {
/**
* Sun specific mechanisms to clean up resources associated with direct byte buffers.
*/
@SuppressWarnings("unchecked")
static final Class<? extends ByteBuffer> DIRECT_BUFFER_CLAZZ = (Class<? extends ByteBuffer>) lookupClassQuietly("java.nio.DirectByteBuffer");
static final MethodHandle CLEAN_HANDLE;
static {
// this approach is based off that used by apache lucene and documented here: https://issues.apache.org/jira/browse/LUCENE-6989
// and https://github.com/apache/lucene-solr/blob/7e03427fa14a024ce257babcb8362d2451941e21/lucene/core/src/java/org/apache/lucene/store/MMapDirectory.java
MethodHandle cleanHandle = null;
try {
final PrivilegedExceptionAction<MethodHandle> action = new PrivilegedExceptionAction<MethodHandle>() {
@Override
public MethodHandle run() throws Exception {
MethodHandle handle = null;
if (DIRECT_BUFFER_CLAZZ != null) {
final Lookup lookup = lookup();
try {
// sun.misc.Unsafe unmapping (Java 9+)
final Class<?> unsafeClass = Class.forName("sun.misc.Unsafe");
// first check if Unsafe has the right method, otherwise we can give up
// without doing any security critical stuff:
final MethodHandle unmapper = lookup.findVirtual(unsafeClass, "invokeCleaner", methodType(void.class, ByteBuffer.class));
// fetch the unsafe instance and bind it to the virtual MH:
final Field f = unsafeClass.getDeclaredField("theUnsafe");
f.setAccessible(true);
final Object theUnsafe = f.get(null);
handle = unmapper.bindTo(theUnsafe);
} catch (Exception e) {
Logger.getLogger(DirectByteBuffers.class.getName()).log(Level.FINE, "unable to use java 9 Unsafe.invokeCleaner", e);
// sun.misc.Cleaner unmapping (Java 8 and older)
final Method m = DIRECT_BUFFER_CLAZZ.getMethod("cleaner");
m.setAccessible(true);
final MethodHandle directBufferCleanerMethod = lookup.unreflect(m);
final Class<?> cleanerClass = directBufferCleanerMethod.type().returnType();
/*
* "Compile" a MethodHandle that basically is equivalent to the following code:
* void unmapper(ByteBuffer byteBuffer)
* {
* sun.misc.Cleaner cleaner = ((java.nio.DirectByteBuffer) byteBuffer).cleaner();
* if (nonNull(cleaner))
* {
* cleaner.clean();
* }
* else
* {
* // the noop is needed because MethodHandles#guardWithTest always needs ELSE
* noop(cleaner);
* }
* }
*/
final MethodHandle cleanMethod = lookup.findVirtual(cleanerClass, "clean", methodType(void.class));
final MethodHandle nonNullTest = lookup.findStatic(DirectByteBuffers.class, "nonNull", methodType(boolean.class, Object.class)).asType(methodType(boolean.class, cleanerClass));
final MethodHandle noop = dropArguments(constant(Void.class, null).asType(methodType(void.class)), 0, cleanerClass);
handle = filterReturnValue(directBufferCleanerMethod, guardWithTest(nonNullTest, cleanMethod, noop)).asType(methodType(void.class, ByteBuffer.class));
}
}
return handle;
}
};
cleanHandle = AccessController.doPrivileged(action);
} catch (Throwable t) {
Logger.getLogger(DirectByteBuffers.class.getName()).log(Level.FINE, "Exception occurred attempting to lookup Sun specific DirectByteBuffer cleaner classes.", t);
}
CLEAN_HANDLE = cleanHandle;
}
private static Class<?> lookupClassQuietly(String name)
{
try {
return DirectByteBuffers.class.getClassLoader().loadClass(name);
}
catch (Throwable t) {
Logger.getLogger(DirectByteBuffers.class.getName()).log(Level.FINE, "Did not find requested class: " + name, t);
}
return null;
}
static boolean nonNull(Object o) {
return o != null;
}
/**
* Provides jvm implementation specific operation to aggressively release resources associated with <i>buffer</i>.
*
* @param buffer The {@code ByteBuffer} to release. Must not be {@code null}. Must be {@link ByteBuffer#isDirect() direct}.
*/
public static void releaseDirectByteBuffer(final ByteBuffer buffer)
{
assert buffer != null && buffer.isDirect();
if (CLEAN_HANDLE != null && DIRECT_BUFFER_CLAZZ.isInstance(buffer)) {
try {
final PrivilegedExceptionAction<Void> pea = new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
try {
CLEAN_HANDLE.invokeExact(buffer);
} catch (Exception e) {
throw e;
} catch (Throwable t) {
//this will be an error
throw new RuntimeException(t);
}
return null;
}
};
AccessController.doPrivileged(pea);
} catch (Throwable t) {
Logger.getLogger(DirectByteBuffers.class.getName()).log(Level.FINE, "Exception occurred attempting to clean up Sun specific DirectByteBuffer.", t);
}
}
}
}

View File

@ -0,0 +1,55 @@
package org.xerial.snappy.pool;
import java.nio.ByteBuffer;
/**
* A {@link BufferPool} implementation which does no pooling. New instances will be created for each call to allocate.
* @author Brett Okken
*/
public final class QuiescentBufferPool implements BufferPool {
private static final QuiescentBufferPool INSTANCE = new QuiescentBufferPool();
private QuiescentBufferPool() {
}
/**
* @return Instance of {@link BufferPool} which does no caching/reuse of instances.
*/
public static BufferPool getInstance() {
return INSTANCE;
}
/**
* Creates a new {@code byte[]} of <i>size</i>.
*/
@Override
public byte[] allocateArray(int size) {
return new byte[size];
}
/**
* Does nothing.
*/
@Override
public void releaseArray(byte[] buffer) {
}
/**
* {@link ByteBuffer#allocateDirect(int) Allocates} a direct {@link ByteBuffer} of <i>size</i>.
*/
@Override
public ByteBuffer allocateDirect(int size) {
return ByteBuffer.allocateDirect(size);
}
/**
* Aggressively releases native resources associated with <i>buffer</i>.
*/
@Override
public void releaseDirect(ByteBuffer buffer) {
assert buffer != null && buffer.isDirect();
DirectByteBuffers.releaseDirectByteBuffer(buffer);
}
}

View File

@ -0,0 +1,184 @@
package org.xerial.snappy.pool;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
public class CachingBufferPoolTest {
private static final int LIST_COUNT = 2048;
@Test
public void testAdjustSize() {
assertEquals(4 * 1024, CachingBufferPool.adjustSize(2));
assertEquals(4 * 1024, CachingBufferPool.adjustSize(1023));
assertEquals(4 * 1024, CachingBufferPool.adjustSize(1024));
assertEquals(4 * 1024, CachingBufferPool.adjustSize(1025));
assertEquals(4 * 1024, CachingBufferPool.adjustSize(4 * 1024));
assertEquals((4 + 2) * 1024, CachingBufferPool.adjustSize((4 * 1024) + 1));
assertEquals(6 * 1024, CachingBufferPool.adjustSize(5 * 1024));
assertEquals(6 * 1024, CachingBufferPool.adjustSize((5 * 1024) + 1));
assertEquals(32 * 1024, CachingBufferPool.adjustSize(32 * 1024));
assertEquals((32 + 16) * 1024, CachingBufferPool.adjustSize((32 * 1024) + 1));
assertEquals(2 * 1024 * 1024, CachingBufferPool.adjustSize(2 * 1024 * 1024));
assertEquals(((2 * 1024) + 512) * 1024, CachingBufferPool.adjustSize((2 * 1024 * 1024) + 1));
assertEquals(16 * 1024 * 1024, CachingBufferPool.adjustSize(16 * 1024 * 1024));
assertEquals((16 + 4) * 1024 * 1024, CachingBufferPool.adjustSize((16 * 1024 * 1024) + 1));
assertEquals(128 * 1024 * 1024, CachingBufferPool.adjustSize(128 * 1024 * 1024));
assertEquals((128 + 16) * 1024 * 1024, CachingBufferPool.adjustSize((128 * 1024 * 1024) + 1));
assertEquals(512 * 1024 * 1024, CachingBufferPool.adjustSize(512 * 1024 * 1024));
assertEquals((512 + 128) * 1024 * 1024, CachingBufferPool.adjustSize((512 * 1024 * 1024) + 1));
assertEquals(0x6000_0000, CachingBufferPool.adjustSize(0x6000_0000));
assertEquals(0x6000_0000, CachingBufferPool.adjustSize(0x6000_0000 - 1));
assertEquals(Integer.MAX_VALUE, CachingBufferPool.adjustSize(0x6000_0001));
assertEquals(Integer.MAX_VALUE, CachingBufferPool.adjustSize(Integer.MAX_VALUE));
assertEquals(Integer.MAX_VALUE, CachingBufferPool.adjustSize(Integer.MAX_VALUE - 1));
}
@Test
public void testDirectByteBuffers() throws Exception {
BufferPool pool = CachingBufferPool.getInstance();
ByteBuffer bb1 = pool.allocateDirect(12 * 1024);
assertNotNull(bb1);
assertEquals(12 * 1024, bb1.limit());
assertEquals(12 * 1024, bb1.capacity());
assertEquals(0, bb1.position());
ByteBuffer bb2 = pool.allocateDirect(12 * 1024);
assertNotNull(bb2);
assertEquals(12 * 1024, bb2.limit());
assertEquals(12 * 1024, bb2.capacity());
assertEquals(0, bb2.position());
assertNotSame(bb1, bb2);
bb2.position(18);
pool.releaseDirect(bb2);
ByteBuffer bb3 = pool.allocateDirect(12 * 1024);
assertNotNull(bb3);
assertEquals(12 * 1024, bb3.limit());
assertEquals(12 * 1024, bb3.capacity());
assertEquals(0, bb3.position());
assertNotSame(bb1, bb2);
assertSame(bb2, bb3);
pool.releaseDirect(bb1);
ByteBuffer bb4 = pool.allocateDirect((12 * 1024) - 1);
assertNotNull(bb4);
assertEquals(12 * 1024, bb4.limit());
assertEquals(12 * 1024, bb4.capacity());
assertEquals(0, bb4.position());
assertSame(bb1, bb4);
}
@Test
public void testArrays() throws Exception {
BufferPool pool = CachingBufferPool.getInstance();
byte[] bb1 = pool.allocateArray(12 * 1024);
assertNotNull(bb1);
assertEquals(12 * 1024, bb1.length);
byte[] bb2 = pool.allocateArray(12 * 1024);
assertNotNull(bb2);
assertEquals(12 * 1024, bb2.length);
assertNotSame(bb1, bb2);
pool.releaseArray(bb2);
byte[] bb3 = pool.allocateArray(12 * 1024);
assertNotNull(bb3);
assertEquals(12 * 1024, bb3.length);
assertNotSame(bb1, bb2);
assertSame(bb2, bb3);
pool.releaseArray(bb1);
byte[] bb4 = pool.allocateArray((12 * 1024) - 1);
assertNotNull(bb4);
assertEquals(12 * 1024, bb4.length);
assertSame(bb1, bb4);
}
@Test
public void testSoftReferences() {
BufferPool pool = CachingBufferPool.getInstance();
byte[] bb1 = pool.allocateArray(8 * 1024);
Reference<byte[]> ref = new WeakReference<byte[]>(bb1);
bb1[0] = 123;
bb1[8000] = -74;
int bb1HC = System.identityHashCode(bb1);
pool.releaseArray(bb1);
byte[] bb1_copy = pool.allocateArray(8 * 1024);
assertSame(bb1, bb1_copy);
assertEquals(123, bb1_copy[0]);
assertEquals(-74, bb1_copy[8000]);
assertEquals(bb1HC, System.identityHashCode(bb1_copy));
//release back into pool (again)
pool.releaseArray(bb1);
//release strong references
bb1_copy = null;
bb1 = null;
assertNotNull(ref.get());
//force an OOME to for SoftReferences to be collected
List<byte[]> vals = forceOOMEGC(LIST_COUNT);
assertTrue("count: " + vals.size(), vals.size() < LIST_COUNT);
//assert that our test reference has been cleared
assertNull(ref.get());
//get another value from the pool
byte[] bb2 = pool.allocateArray(8 * 1024);
//assert that it is indeed a new value, and not same from previous
assertNotEquals(123, bb2[0]);
assertNotEquals(-74, bb2[8000]);
assertNotEquals(bb1HC, System.identityHashCode(bb2));
}
private static List<byte[]> forceOOMEGC(int count) {
final List<byte[]> vals = new ArrayList<>(count);
try {
for (int i=0; i<count; ++i) {
vals.add(new byte[10 * 1024 * 1024]);
}
} catch(Error e) {
}
return vals;
}
}