From 7b86642f75c280debf3c1983053ea7f8635b48a5 Mon Sep 17 00:00:00 2001 From: "Taro L. Saito" Date: Wed, 22 Oct 2014 00:05:13 +0900 Subject: [PATCH] Fixes #88 by introducing thread-safe BufferAllocator --- project/build.properties | 2 +- .../org/xerial/snappy/BufferRecycler.java | 176 ------------------ .../org/xerial/snappy/SnappyOutputStream.java | 40 ++-- .../xerial/snappy/buffer/BufferAllocator.java | 11 ++ .../snappy/buffer/BufferAllocatorFactory.java | 10 + .../snappy/buffer/CachedBufferAllocator.java | 55 ++++++ .../snappy/buffer/DefaultBufferAllocator.java | 26 +++ version.sbt | 2 +- 8 files changed, 132 insertions(+), 190 deletions(-) delete mode 100644 src/main/java/org/xerial/snappy/BufferRecycler.java create mode 100644 src/main/java/org/xerial/snappy/buffer/BufferAllocator.java create mode 100644 src/main/java/org/xerial/snappy/buffer/BufferAllocatorFactory.java create mode 100644 src/main/java/org/xerial/snappy/buffer/CachedBufferAllocator.java create mode 100644 src/main/java/org/xerial/snappy/buffer/DefaultBufferAllocator.java diff --git a/project/build.properties b/project/build.properties index 6137a3a..005786e 100755 --- a/project/build.properties +++ b/project/build.properties @@ -1,2 +1,2 @@ -sbt.version=0.13.5 +sbt.version=0.13.6 diff --git a/src/main/java/org/xerial/snappy/BufferRecycler.java b/src/main/java/org/xerial/snappy/BufferRecycler.java deleted file mode 100644 index b8382b6..0000000 --- a/src/main/java/org/xerial/snappy/BufferRecycler.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Copyright (C) 2011 the original author or authors. - * See the NOTICE file distributed with this work for additional - * information regarding copyright ownership. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.xerial.snappy; -import java.lang.ref.SoftReference; - -/** - * Simple helper class to encapsulate details of basic buffer - * recycling scheme, which helps a lot (as per profiling) for - * smaller encoding cases. - * - * @author tatu - */ -class BufferRecycler -{ - private final static int MIN_ENCODING_BUFFER = 4000; - - private final static int MIN_OUTPUT_BUFFER = 8000; - - /** - * This ThreadLocal contains a {@link java.lang.ref.SoftReference} - * to a {@link BufferRecycler} used to provide a low-cost - * buffer recycling for buffers we need for encoding, decoding. - */ - final protected static ThreadLocal> recyclerRef - = new ThreadLocal>(); - - - private byte[] inputBuffer; - private byte[] outputBuffer; - - private byte[] decodingBuffer; - private byte[] encodingBuffer; - - private short[] encodingHash; - - - /** - * Accessor to get thread-local recycler instance - */ - public static BufferRecycler instance() - { - SoftReference ref = recyclerRef.get(); - - BufferRecycler bufferRecycler; - if (ref == null) { - bufferRecycler = null; - } - else { - bufferRecycler = ref.get(); - } - - if (bufferRecycler == null) { - bufferRecycler = new BufferRecycler(); - recyclerRef.set(new SoftReference(bufferRecycler)); - } - return bufferRecycler; - } - - /////////////////////////////////////////////////////////////////////// - // Buffers for encoding (output) - /////////////////////////////////////////////////////////////////////// - - public byte[] allocEncodingBuffer(int minSize) - { - byte[] buf = encodingBuffer; - if (buf == null || buf.length < minSize) { - buf = new byte[Math.max(minSize, MIN_ENCODING_BUFFER)]; - } - else { - encodingBuffer = null; - } - return buf; - } - - public void releaseEncodeBuffer(byte[] buffer) - { - if (encodingBuffer == null || buffer.length > encodingBuffer.length) { - encodingBuffer = buffer; - } - } - - public byte[] allocOutputBuffer(int minSize) - { - byte[] buf = outputBuffer; - if (buf == null || buf.length < minSize) { - buf = new byte[Math.max(minSize, MIN_OUTPUT_BUFFER)]; - } - else { - outputBuffer = null; - } - return buf; - } - - public void releaseOutputBuffer(byte[] buffer) - { - if (outputBuffer == null || (buffer != null && buffer.length > outputBuffer.length)) { - outputBuffer = buffer; - } - } - - public short[] allocEncodingHash(int suggestedSize) - { - short[] buf = encodingHash; - if (buf == null || buf.length < suggestedSize) { - buf = new short[suggestedSize]; - } - else { - encodingHash = null; - } - return buf; - } - - public void releaseEncodingHash(short[] buffer) - { - if (encodingHash == null || (buffer != null && buffer.length > encodingHash.length)) { - encodingHash = buffer; - } - } - - /////////////////////////////////////////////////////////////////////// - // Buffers for decoding (input) - /////////////////////////////////////////////////////////////////////// - - public byte[] allocInputBuffer(int minSize) - { - byte[] buf = inputBuffer; - if (buf == null || buf.length < minSize) { - buf = new byte[Math.max(minSize, MIN_OUTPUT_BUFFER)]; - } - else { - inputBuffer = null; - } - return buf; - } - - public void releaseInputBuffer(byte[] buffer) - { - if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) { - inputBuffer = buffer; - } - } - - public byte[] allocDecodeBuffer(int size) - { - byte[] buf = decodingBuffer; - if (buf == null || buf.length < size) { - buf = new byte[size]; - } - else { - decodingBuffer = null; - } - return buf; - } - - public void releaseDecodeBuffer(byte[] buffer) - { - if (decodingBuffer == null || (buffer != null && buffer.length > decodingBuffer.length)) { - decodingBuffer = buffer; - } - } -} \ No newline at end of file diff --git a/src/main/java/org/xerial/snappy/SnappyOutputStream.java b/src/main/java/org/xerial/snappy/SnappyOutputStream.java index 3547c95..805bb39 100755 --- a/src/main/java/org/xerial/snappy/SnappyOutputStream.java +++ b/src/main/java/org/xerial/snappy/SnappyOutputStream.java @@ -24,6 +24,10 @@ //-------------------------------------- package org.xerial.snappy; +import org.xerial.snappy.buffer.BufferAllocatorFactory; +import org.xerial.snappy.buffer.BufferAllocator; +import org.xerial.snappy.buffer.CachedBufferAllocator; + import java.io.IOException; import java.io.OutputStream; @@ -56,9 +60,11 @@ public class SnappyOutputStream extends OutputStream { static final int DEFAULT_BLOCK_SIZE = 32 * 1024; // Use 32kb for the default block size protected final OutputStream out; - - private final BufferRecycler recycler; private final int blockSize; + + private final BufferAllocator inputBufferAllocator; + private final BufferAllocator outputBufferAllocator; + protected final byte[] inputBuffer; protected final byte[] outputBuffer; private int inputCursor = 0; @@ -74,14 +80,25 @@ public class SnappyOutputStream extends OutputStream { * @throws IOException */ public SnappyOutputStream(OutputStream out, int blockSize) { + this(out, blockSize, CachedBufferAllocator.factory); + } + + public SnappyOutputStream(OutputStream out, int blockSize, BufferAllocatorFactory bufferAllocatorFactory) { this.out = out; - this.recycler = BufferRecycler.instance(); this.blockSize = Math.max(MIN_BLOCK_SIZE, blockSize); - inputBuffer = recycler.allocInputBuffer(this.blockSize); - outputBuffer = recycler.allocOutputBuffer(SnappyCodec.HEADER_SIZE + 4 + Snappy.maxCompressedLength(this.blockSize)); + int inputSize = blockSize; + int outputSize = SnappyCodec.HEADER_SIZE + 4 + Snappy.maxCompressedLength(blockSize); + + this.inputBufferAllocator = bufferAllocatorFactory.getBufferAllocator(inputSize); + this.outputBufferAllocator = bufferAllocatorFactory.getBufferAllocator(outputSize); + + inputBuffer = inputBufferAllocator.allocate(inputSize); + outputBuffer = inputBufferAllocator.allocate(outputSize); + outputCursor = SnappyCodec.currentHeader.writeHeader(outputBuffer, 0); } + /* (non-Javadoc) * @see java.io.OutputStream#write(byte[], int, int) */ @@ -265,9 +282,9 @@ public class SnappyOutputStream extends OutputStream { static void writeInt(byte[] dst, int offset, int v) { dst[offset] = (byte) ((v >> 24) & 0xFF); - dst[offset+1] = (byte) ((v >> 16) & 0xFF); - dst[offset+2] = (byte) ((v >> 8) & 0xFF); - dst[offset+3] = (byte) ((v >> 0) & 0xFF); + dst[offset + 1] = (byte) ((v >> 16) & 0xFF); + dst[offset + 2] = (byte) ((v >> 8) & 0xFF); + dst[offset + 3] = (byte) ((v >> 0) & 0xFF); } static int readInt(byte[] buffer, int pos) { @@ -312,10 +329,9 @@ public class SnappyOutputStream extends OutputStream { try { flush(); out.close(); - } - finally { - recycler.releaseInputBuffer(inputBuffer); - recycler.releaseOutputBuffer(outputBuffer); + } finally { + inputBufferAllocator.release(inputBuffer); + outputBufferAllocator.release(outputBuffer); } } diff --git a/src/main/java/org/xerial/snappy/buffer/BufferAllocator.java b/src/main/java/org/xerial/snappy/buffer/BufferAllocator.java new file mode 100644 index 0000000..205a6ea --- /dev/null +++ b/src/main/java/org/xerial/snappy/buffer/BufferAllocator.java @@ -0,0 +1,11 @@ +package org.xerial.snappy.buffer; + +/** + * BufferAllocator interface. The implementation of this interface must be thread-safe + */ +public interface BufferAllocator { + + public byte[] allocate(int size); + public void release(byte[] buffer); + +} diff --git a/src/main/java/org/xerial/snappy/buffer/BufferAllocatorFactory.java b/src/main/java/org/xerial/snappy/buffer/BufferAllocatorFactory.java new file mode 100644 index 0000000..dda87b6 --- /dev/null +++ b/src/main/java/org/xerial/snappy/buffer/BufferAllocatorFactory.java @@ -0,0 +1,10 @@ +package org.xerial.snappy.buffer; + +/** + * + */ +public interface BufferAllocatorFactory { + + BufferAllocator getBufferAllocator(int minSize); +} + diff --git a/src/main/java/org/xerial/snappy/buffer/CachedBufferAllocator.java b/src/main/java/org/xerial/snappy/buffer/CachedBufferAllocator.java new file mode 100644 index 0000000..2ad9926 --- /dev/null +++ b/src/main/java/org/xerial/snappy/buffer/CachedBufferAllocator.java @@ -0,0 +1,55 @@ +package org.xerial.snappy.buffer; + +import java.lang.ref.SoftReference; +import java.util.*; + +/** + * Cached buffer + */ +public class CachedBufferAllocator implements BufferAllocator { + + public static BufferAllocatorFactory factory = new BufferAllocatorFactory() { + @Override + public BufferAllocator getBufferAllocator(int bufferSize) { + return CachedBufferAllocator.getAllocator(bufferSize); + } + }; + + /** + * Use SoftReference so that having this queueTable does not prevent the GC of CachedBufferAllocator instances + */ + public static Map> queueTable = new HashMap>(); + + private final int bufferSize; + private final Deque bufferQueue; + + public CachedBufferAllocator(int bufferSize) { + this.bufferSize = bufferSize; + this.bufferQueue = new ArrayDeque(); + } + + public static synchronized CachedBufferAllocator getAllocator(int bufferSize) { + if(!queueTable.containsKey(bufferSize)) { + queueTable.put(bufferSize, new SoftReference(new CachedBufferAllocator(bufferSize))); + } + return queueTable.get(bufferSize).get(); + } + + @Override + public byte[] allocate(int size) { + synchronized(this) { + if(bufferQueue.isEmpty()) { + return new byte[size]; + } + else { + return bufferQueue.pollFirst(); + } + } + } + @Override + public void release(byte[] buffer) { + synchronized(this) { + bufferQueue.addLast(buffer); + } + } +} diff --git a/src/main/java/org/xerial/snappy/buffer/DefaultBufferAllocator.java b/src/main/java/org/xerial/snappy/buffer/DefaultBufferAllocator.java new file mode 100644 index 0000000..0c459bc --- /dev/null +++ b/src/main/java/org/xerial/snappy/buffer/DefaultBufferAllocator.java @@ -0,0 +1,26 @@ +package org.xerial.snappy.buffer; + +/** + * Simple buffer allocator, which does not reuse the allocated buffer + */ +public class DefaultBufferAllocator implements BufferAllocator { + + public static BufferAllocatorFactory factory = new BufferAllocatorFactory() { + public BufferAllocator singleton = new DefaultBufferAllocator(); + @Override + public BufferAllocator getBufferAllocator(int bufferSize) { + return singleton; + } + }; + + @Override + public byte[] allocate(int size) { + return new byte[size]; + } + + @Override + public void release(byte[] buffer) { + // do nothing + } + +} diff --git a/version.sbt b/version.sbt index 7aab699..ffd3363 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "1.1.1.3" +version in ThisBuild := "1.1.1.4-SNAPSHOT"