Merge pull request #65 from bokken/develop

Aggressively reclaim direct byte buffers
This commit is contained in:
Taro L. Saito 2014-03-12 23:34:48 +09:00
commit fea947e46a
3 changed files with 79 additions and 0 deletions

View File

@ -4,8 +4,11 @@
package org.xerial.snappy;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Constants and utilities for implementing x-snappy-framed.
@ -21,6 +24,33 @@ final class SnappyFramed {
public static final int STREAM_IDENTIFIER_FLAG = 0xff;
private static final int MASK_DELTA = 0xa282ead8;
/**
* Sun specific mechanisms to clean up resources associated with direct byte buffers.
*/
@SuppressWarnings("unchecked")
private static final Class<? extends ByteBuffer> SUN_DIRECT_BUFFER = (Class<? extends ByteBuffer>) lookupClassQuietly("sun.nio.ch.DirectBuffer");
private static final Method SUN_BUFFER_CLEANER;
private static final Method SUN_CLEANER_CLEAN;
static
{
Method bufferCleaner = null;
Method cleanerClean = null;
try {
//operate under the assumption that if the sun direct buffer class exists,
//all of the sun classes exist
if (SUN_DIRECT_BUFFER != null) {
bufferCleaner = SUN_DIRECT_BUFFER.getMethod("cleaner", (Class[])null);
Class<?> cleanClazz = lookupClassQuietly("sun.misc.Cleaner");
cleanerClean = cleanClazz.getMethod("clean", (Class[])null);
}
} catch(Throwable t) {
Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "Exception occurred attempting to lookup Sun specific DirectByteBuffer cleaner classes.", t);
}
SUN_BUFFER_CLEANER = bufferCleaner;
SUN_CLEANER_CLEAN = cleanerClean;
}
/**
* The header consists of the stream identifier flag, 3 bytes indicating a
@ -125,4 +155,32 @@ final class SnappyFramed {
buffer.clear();
return skip - toSkip;
}
private static Class<?> lookupClassQuietly(String name) {
try {
return SnappyFramed.class.getClassLoader().loadClass(name);
} catch (Throwable t) {
Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "Did not find requested class: " + name, t);
}
return null;
}
/**
* Provides jvm implementation specific operation to aggressively release resources associated with <i>buffer</i>.
* @param buffer The {@code ByteBuffer} to release. Must not be {@code null}. Must be {@link ByteBuffer#isDirect() direct}.
*/
static void releaseDirectByteBuffer(ByteBuffer buffer)
{
assert buffer != null && buffer.isDirect();
if (SUN_DIRECT_BUFFER != null && SUN_DIRECT_BUFFER.isAssignableFrom(buffer.getClass())) {
try {
Object cleaner = SUN_BUFFER_CLEANER.invoke(buffer, (Object[]) null);
SUN_CLEANER_CLEAN.invoke(cleaner, (Object[]) null);
} catch (Throwable t) {
Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "Exception occurred attempting to clean up Sun specific DirectByteBuffer.", t);
}
}
}
}

View File

@ -9,6 +9,7 @@ import static org.xerial.snappy.SnappyFramed.HEADER_BYTES;
import static org.xerial.snappy.SnappyFramed.STREAM_IDENTIFIER_FLAG;
import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG;
import static org.xerial.snappy.SnappyFramed.readBytes;
import static org.xerial.snappy.SnappyFramed.releaseDirectByteBuffer;
import static org.xerial.snappy.SnappyFramedOutputStream.MAX_BLOCK_SIZE;
import java.io.EOFException;
@ -151,6 +152,14 @@ public final class SnappyFramedInputStream extends InputStream implements
*/
private void allocateBuffersBasedOnSize(int size) {
if (input != null) {
releaseDirectByteBuffer(input);
}
if (uncompressedDirect != null) {
releaseDirectByteBuffer(uncompressedDirect);
}
input = ByteBuffer.allocateDirect(size);
final int maxCompressedLength = Snappy.maxCompressedLength(size);
uncompressedDirect = ByteBuffer.allocateDirect(maxCompressedLength);
@ -333,6 +342,14 @@ public final class SnappyFramedInputStream extends InputStream implements
if (!closed) {
closed = true;
}
if (input != null) {
releaseDirectByteBuffer(input);
}
if (uncompressedDirect != null) {
releaseDirectByteBuffer(uncompressedDirect);
}
}
}

View File

@ -7,6 +7,7 @@ import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG;
import static org.xerial.snappy.SnappyFramed.HEADER_BYTES;
import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG;
import static org.xerial.snappy.SnappyFramed.maskedCrc32c;
import static org.xerial.snappy.SnappyFramed.releaseDirectByteBuffer;
import java.io.IOException;
import java.io.InputStream;
@ -349,6 +350,9 @@ public final class SnappyFramedOutputStream extends OutputStream implements
out.close();
} finally {
closed = true;
releaseDirectByteBuffer(directInputBuffer);
releaseDirectByteBuffer(outputBuffer);
}
}