diff --git a/project/build.properties b/project/build.properties
index 6137a3a..005786e 100755
--- a/project/build.properties
+++ b/project/build.properties
@@ -1,2 +1,2 @@
-sbt.version=0.13.5
+sbt.version=0.13.6
diff --git a/src/main/java/org/xerial/snappy/BufferRecycler.java b/src/main/java/org/xerial/snappy/BufferRecycler.java
deleted file mode 100644
index b8382b6..0000000
--- a/src/main/java/org/xerial/snappy/BufferRecycler.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Copyright (C) 2011 the original author or authors.
- * See the NOTICE file distributed with this work for additional
- * information regarding copyright ownership.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.xerial.snappy;
-import java.lang.ref.SoftReference;
-
-/**
- * Simple helper class to encapsulate details of basic buffer
- * recycling scheme, which helps a lot (as per profiling) for
- * smaller encoding cases.
- *
- * @author tatu
- */
-class BufferRecycler
-{
- private final static int MIN_ENCODING_BUFFER = 4000;
-
- private final static int MIN_OUTPUT_BUFFER = 8000;
-
- /**
- * This ThreadLocal
contains a {@link java.lang.ref.SoftReference}
- * to a {@link BufferRecycler} used to provide a low-cost
- * buffer recycling for buffers we need for encoding, decoding.
- */
- final protected static ThreadLocal> recyclerRef
- = new ThreadLocal>();
-
-
- private byte[] inputBuffer;
- private byte[] outputBuffer;
-
- private byte[] decodingBuffer;
- private byte[] encodingBuffer;
-
- private short[] encodingHash;
-
-
- /**
- * Accessor to get thread-local recycler instance
- */
- public static BufferRecycler instance()
- {
- SoftReference ref = recyclerRef.get();
-
- BufferRecycler bufferRecycler;
- if (ref == null) {
- bufferRecycler = null;
- }
- else {
- bufferRecycler = ref.get();
- }
-
- if (bufferRecycler == null) {
- bufferRecycler = new BufferRecycler();
- recyclerRef.set(new SoftReference(bufferRecycler));
- }
- return bufferRecycler;
- }
-
- ///////////////////////////////////////////////////////////////////////
- // Buffers for encoding (output)
- ///////////////////////////////////////////////////////////////////////
-
- public byte[] allocEncodingBuffer(int minSize)
- {
- byte[] buf = encodingBuffer;
- if (buf == null || buf.length < minSize) {
- buf = new byte[Math.max(minSize, MIN_ENCODING_BUFFER)];
- }
- else {
- encodingBuffer = null;
- }
- return buf;
- }
-
- public void releaseEncodeBuffer(byte[] buffer)
- {
- if (encodingBuffer == null || buffer.length > encodingBuffer.length) {
- encodingBuffer = buffer;
- }
- }
-
- public byte[] allocOutputBuffer(int minSize)
- {
- byte[] buf = outputBuffer;
- if (buf == null || buf.length < minSize) {
- buf = new byte[Math.max(minSize, MIN_OUTPUT_BUFFER)];
- }
- else {
- outputBuffer = null;
- }
- return buf;
- }
-
- public void releaseOutputBuffer(byte[] buffer)
- {
- if (outputBuffer == null || (buffer != null && buffer.length > outputBuffer.length)) {
- outputBuffer = buffer;
- }
- }
-
- public short[] allocEncodingHash(int suggestedSize)
- {
- short[] buf = encodingHash;
- if (buf == null || buf.length < suggestedSize) {
- buf = new short[suggestedSize];
- }
- else {
- encodingHash = null;
- }
- return buf;
- }
-
- public void releaseEncodingHash(short[] buffer)
- {
- if (encodingHash == null || (buffer != null && buffer.length > encodingHash.length)) {
- encodingHash = buffer;
- }
- }
-
- ///////////////////////////////////////////////////////////////////////
- // Buffers for decoding (input)
- ///////////////////////////////////////////////////////////////////////
-
- public byte[] allocInputBuffer(int minSize)
- {
- byte[] buf = inputBuffer;
- if (buf == null || buf.length < minSize) {
- buf = new byte[Math.max(minSize, MIN_OUTPUT_BUFFER)];
- }
- else {
- inputBuffer = null;
- }
- return buf;
- }
-
- public void releaseInputBuffer(byte[] buffer)
- {
- if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) {
- inputBuffer = buffer;
- }
- }
-
- public byte[] allocDecodeBuffer(int size)
- {
- byte[] buf = decodingBuffer;
- if (buf == null || buf.length < size) {
- buf = new byte[size];
- }
- else {
- decodingBuffer = null;
- }
- return buf;
- }
-
- public void releaseDecodeBuffer(byte[] buffer)
- {
- if (decodingBuffer == null || (buffer != null && buffer.length > decodingBuffer.length)) {
- decodingBuffer = buffer;
- }
- }
-}
\ No newline at end of file
diff --git a/src/main/java/org/xerial/snappy/SnappyOutputStream.java b/src/main/java/org/xerial/snappy/SnappyOutputStream.java
index 3547c95..805bb39 100755
--- a/src/main/java/org/xerial/snappy/SnappyOutputStream.java
+++ b/src/main/java/org/xerial/snappy/SnappyOutputStream.java
@@ -24,6 +24,10 @@
//--------------------------------------
package org.xerial.snappy;
+import org.xerial.snappy.buffer.BufferAllocatorFactory;
+import org.xerial.snappy.buffer.BufferAllocator;
+import org.xerial.snappy.buffer.CachedBufferAllocator;
+
import java.io.IOException;
import java.io.OutputStream;
@@ -56,9 +60,11 @@ public class SnappyOutputStream extends OutputStream {
static final int DEFAULT_BLOCK_SIZE = 32 * 1024; // Use 32kb for the default block size
protected final OutputStream out;
-
- private final BufferRecycler recycler;
private final int blockSize;
+
+ private final BufferAllocator inputBufferAllocator;
+ private final BufferAllocator outputBufferAllocator;
+
protected final byte[] inputBuffer;
protected final byte[] outputBuffer;
private int inputCursor = 0;
@@ -74,14 +80,25 @@ public class SnappyOutputStream extends OutputStream {
* @throws IOException
*/
public SnappyOutputStream(OutputStream out, int blockSize) {
+ this(out, blockSize, CachedBufferAllocator.factory);
+ }
+
+ public SnappyOutputStream(OutputStream out, int blockSize, BufferAllocatorFactory bufferAllocatorFactory) {
this.out = out;
- this.recycler = BufferRecycler.instance();
this.blockSize = Math.max(MIN_BLOCK_SIZE, blockSize);
- inputBuffer = recycler.allocInputBuffer(this.blockSize);
- outputBuffer = recycler.allocOutputBuffer(SnappyCodec.HEADER_SIZE + 4 + Snappy.maxCompressedLength(this.blockSize));
+ int inputSize = blockSize;
+ int outputSize = SnappyCodec.HEADER_SIZE + 4 + Snappy.maxCompressedLength(blockSize);
+
+ this.inputBufferAllocator = bufferAllocatorFactory.getBufferAllocator(inputSize);
+ this.outputBufferAllocator = bufferAllocatorFactory.getBufferAllocator(outputSize);
+
+ inputBuffer = inputBufferAllocator.allocate(inputSize);
+ outputBuffer = inputBufferAllocator.allocate(outputSize);
+
outputCursor = SnappyCodec.currentHeader.writeHeader(outputBuffer, 0);
}
+
/* (non-Javadoc)
* @see java.io.OutputStream#write(byte[], int, int)
*/
@@ -265,9 +282,9 @@ public class SnappyOutputStream extends OutputStream {
static void writeInt(byte[] dst, int offset, int v) {
dst[offset] = (byte) ((v >> 24) & 0xFF);
- dst[offset+1] = (byte) ((v >> 16) & 0xFF);
- dst[offset+2] = (byte) ((v >> 8) & 0xFF);
- dst[offset+3] = (byte) ((v >> 0) & 0xFF);
+ dst[offset + 1] = (byte) ((v >> 16) & 0xFF);
+ dst[offset + 2] = (byte) ((v >> 8) & 0xFF);
+ dst[offset + 3] = (byte) ((v >> 0) & 0xFF);
}
static int readInt(byte[] buffer, int pos) {
@@ -312,10 +329,9 @@ public class SnappyOutputStream extends OutputStream {
try {
flush();
out.close();
- }
- finally {
- recycler.releaseInputBuffer(inputBuffer);
- recycler.releaseOutputBuffer(outputBuffer);
+ } finally {
+ inputBufferAllocator.release(inputBuffer);
+ outputBufferAllocator.release(outputBuffer);
}
}
diff --git a/src/main/java/org/xerial/snappy/buffer/BufferAllocator.java b/src/main/java/org/xerial/snappy/buffer/BufferAllocator.java
new file mode 100644
index 0000000..205a6ea
--- /dev/null
+++ b/src/main/java/org/xerial/snappy/buffer/BufferAllocator.java
@@ -0,0 +1,11 @@
+package org.xerial.snappy.buffer;
+
+/**
+ * BufferAllocator interface. The implementation of this interface must be thread-safe
+ */
+public interface BufferAllocator {
+
+ public byte[] allocate(int size);
+ public void release(byte[] buffer);
+
+}
diff --git a/src/main/java/org/xerial/snappy/buffer/BufferAllocatorFactory.java b/src/main/java/org/xerial/snappy/buffer/BufferAllocatorFactory.java
new file mode 100644
index 0000000..dda87b6
--- /dev/null
+++ b/src/main/java/org/xerial/snappy/buffer/BufferAllocatorFactory.java
@@ -0,0 +1,10 @@
+package org.xerial.snappy.buffer;
+
+/**
+ *
+ */
+public interface BufferAllocatorFactory {
+
+ BufferAllocator getBufferAllocator(int minSize);
+}
+
diff --git a/src/main/java/org/xerial/snappy/buffer/CachedBufferAllocator.java b/src/main/java/org/xerial/snappy/buffer/CachedBufferAllocator.java
new file mode 100644
index 0000000..2ad9926
--- /dev/null
+++ b/src/main/java/org/xerial/snappy/buffer/CachedBufferAllocator.java
@@ -0,0 +1,55 @@
+package org.xerial.snappy.buffer;
+
+import java.lang.ref.SoftReference;
+import java.util.*;
+
+/**
+ * Cached buffer
+ */
+public class CachedBufferAllocator implements BufferAllocator {
+
+ public static BufferAllocatorFactory factory = new BufferAllocatorFactory() {
+ @Override
+ public BufferAllocator getBufferAllocator(int bufferSize) {
+ return CachedBufferAllocator.getAllocator(bufferSize);
+ }
+ };
+
+ /**
+ * Use SoftReference so that having this queueTable does not prevent the GC of CachedBufferAllocator instances
+ */
+ public static Map> queueTable = new HashMap>();
+
+ private final int bufferSize;
+ private final Deque bufferQueue;
+
+ public CachedBufferAllocator(int bufferSize) {
+ this.bufferSize = bufferSize;
+ this.bufferQueue = new ArrayDeque();
+ }
+
+ public static synchronized CachedBufferAllocator getAllocator(int bufferSize) {
+ if(!queueTable.containsKey(bufferSize)) {
+ queueTable.put(bufferSize, new SoftReference(new CachedBufferAllocator(bufferSize)));
+ }
+ return queueTable.get(bufferSize).get();
+ }
+
+ @Override
+ public byte[] allocate(int size) {
+ synchronized(this) {
+ if(bufferQueue.isEmpty()) {
+ return new byte[size];
+ }
+ else {
+ return bufferQueue.pollFirst();
+ }
+ }
+ }
+ @Override
+ public void release(byte[] buffer) {
+ synchronized(this) {
+ bufferQueue.addLast(buffer);
+ }
+ }
+}
diff --git a/src/main/java/org/xerial/snappy/buffer/DefaultBufferAllocator.java b/src/main/java/org/xerial/snappy/buffer/DefaultBufferAllocator.java
new file mode 100644
index 0000000..0c459bc
--- /dev/null
+++ b/src/main/java/org/xerial/snappy/buffer/DefaultBufferAllocator.java
@@ -0,0 +1,26 @@
+package org.xerial.snappy.buffer;
+
+/**
+ * Simple buffer allocator, which does not reuse the allocated buffer
+ */
+public class DefaultBufferAllocator implements BufferAllocator {
+
+ public static BufferAllocatorFactory factory = new BufferAllocatorFactory() {
+ public BufferAllocator singleton = new DefaultBufferAllocator();
+ @Override
+ public BufferAllocator getBufferAllocator(int bufferSize) {
+ return singleton;
+ }
+ };
+
+ @Override
+ public byte[] allocate(int size) {
+ return new byte[size];
+ }
+
+ @Override
+ public void release(byte[] buffer) {
+ // do nothing
+ }
+
+}
diff --git a/version.sbt b/version.sbt
index 7aab699..ffd3363 100644
--- a/version.sbt
+++ b/version.sbt
@@ -1 +1 @@
-version in ThisBuild := "1.1.1.3"
+version in ThisBuild := "1.1.1.4-SNAPSHOT"