From 6f75b02cd927aeab07b02ff00b63c0a3ecc94141 Mon Sep 17 00:00:00 2001 From: bokken Date: Fri, 7 Feb 2014 09:49:46 -0600 Subject: [PATCH] Aggressively reclaim direct byte buffers https://github.com/xerial/snappy-java/issues/64 --- .../java/org/xerial/snappy/SnappyFramed.java | 58 +++++++++++++++++++ .../snappy/SnappyFramedInputStream.java | 21 ++++++- .../snappy/SnappyFramedOutputStream.java | 4 ++ 3 files changed, 80 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/xerial/snappy/SnappyFramed.java b/src/main/java/org/xerial/snappy/SnappyFramed.java index 41fbe8d..2925945 100644 --- a/src/main/java/org/xerial/snappy/SnappyFramed.java +++ b/src/main/java/org/xerial/snappy/SnappyFramed.java @@ -4,8 +4,11 @@ package org.xerial.snappy; import java.io.IOException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Constants and utilities for implementing x-snappy-framed. @@ -21,6 +24,33 @@ final class SnappyFramed { public static final int STREAM_IDENTIFIER_FLAG = 0xff; private static final int MASK_DELTA = 0xa282ead8; + + /** + * Sun specific mechanisms to clean up resources associated with direct byte buffers. + */ + @SuppressWarnings("unchecked") + private static final Class SUN_DIRECT_BUFFER = (Class) lookupClassQuietly("sun.nio.ch.DirectBuffer"); + private static final Method SUN_BUFFER_CLEANER; + private static final Method SUN_CLEANER_CLEAN; + + static + { + Method bufferCleaner = null; + Method cleanerClean = null; + try { + //operate under the assumption that if the sun direct buffer class exists, + //all of the sun classes exist + if (SUN_DIRECT_BUFFER != null) { + bufferCleaner = SUN_DIRECT_BUFFER.getMethod("cleaner", (Class[])null); + Class cleanClazz = lookupClassQuietly("sun.misc.Cleaner"); + cleanerClean = cleanClazz.getMethod("clean", (Class[])null); + } + } catch(Throwable t) { + Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "Exception occurred attempting to lookup Sun specific DirectByteBuffer cleaner classes.", t); + } + SUN_BUFFER_CLEANER = bufferCleaner; + SUN_CLEANER_CLEAN = cleanerClean; + } /** * The header consists of the stream identifier flag, 3 bytes indicating a @@ -125,4 +155,32 @@ final class SnappyFramed { buffer.clear(); return skip - toSkip; } + + private static Class lookupClassQuietly(String name) { + try { + return SnappyFramed.class.getClassLoader().loadClass(name); + } catch (Throwable t) { + Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "Did not find requested class: " + name, t); + } + + return null; + } + + /** + * Provides jvm implementation specific operation to aggressively release resources associated with buffer. + * @param buffer The {@code ByteBuffer} to release. Must not be {@code null}. Must be {@link ByteBuffer#isDirect() direct}. + */ + static void releaseDirectByteBuffer(ByteBuffer buffer) + { + assert buffer != null && buffer.isDirect(); + + if (SUN_DIRECT_BUFFER != null && SUN_DIRECT_BUFFER.isAssignableFrom(buffer.getClass())) { + try { + Object cleaner = SUN_BUFFER_CLEANER.invoke(buffer, (Object[]) null); + SUN_CLEANER_CLEAN.invoke(cleaner, (Object[]) null); + } catch (Throwable t) { + Logger.getLogger(SnappyFramed.class.getName()).log(Level.FINE, "Exception occurred attempting to clean up Sun specific DirectByteBuffer.", t); + } + } + } } diff --git a/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java b/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java index d10ba46..b400d37 100644 --- a/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java +++ b/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java @@ -9,6 +9,7 @@ import static org.xerial.snappy.SnappyFramed.HEADER_BYTES; import static org.xerial.snappy.SnappyFramed.STREAM_IDENTIFIER_FLAG; import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG; import static org.xerial.snappy.SnappyFramed.readBytes; +import static org.xerial.snappy.SnappyFramed.releaseDirectByteBuffer; import static org.xerial.snappy.SnappyFramedOutputStream.MAX_BLOCK_SIZE; import java.io.EOFException; @@ -136,9 +137,7 @@ public final class SnappyFramedInputStream extends InputStream implements final byte[] actualHeader = new byte[expectedHeader.length]; final ByteBuffer actualBuffer = ByteBuffer.wrap(actualHeader); - // assume that if the input cannot read 4 bytes that something is - // wrong. - final int read = in.read(actualBuffer); + final int read = SnappyFramed.readBytes(in, actualBuffer); if (read < expectedHeader.length) { throw new EOFException( "encountered EOF while reading stream header"); @@ -153,6 +152,14 @@ public final class SnappyFramedInputStream extends InputStream implements */ private void allocateBuffersBasedOnSize(int size) { + if (input != null) { + releaseDirectByteBuffer(input); + } + + if (uncompressedDirect != null) { + releaseDirectByteBuffer(uncompressedDirect); + } + input = ByteBuffer.allocateDirect(size); final int maxCompressedLength = Snappy.maxCompressedLength(size); uncompressedDirect = ByteBuffer.allocateDirect(maxCompressedLength); @@ -335,6 +342,14 @@ public final class SnappyFramedInputStream extends InputStream implements if (!closed) { closed = true; } + + if (input != null) { + releaseDirectByteBuffer(input); + } + + if (uncompressedDirect != null) { + releaseDirectByteBuffer(uncompressedDirect); + } } } diff --git a/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java b/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java index f7474c7..0af72b1 100644 --- a/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java +++ b/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java @@ -7,6 +7,7 @@ import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG; import static org.xerial.snappy.SnappyFramed.HEADER_BYTES; import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG; import static org.xerial.snappy.SnappyFramed.maskedCrc32c; +import static org.xerial.snappy.SnappyFramed.releaseDirectByteBuffer; import java.io.IOException; import java.io.InputStream; @@ -349,6 +350,9 @@ public final class SnappyFramedOutputStream extends OutputStream implements out.close(); } finally { closed = true; + + releaseDirectByteBuffer(directInputBuffer); + releaseDirectByteBuffer(outputBuffer); } }