mirror of
https://github.com/xerial/snappy-java.git
synced 2025-07-23 14:04:39 +02:00
Fixes #88 by introducing thread-safe BufferAllocator
This commit is contained in:
parent
5eebef89c6
commit
7b86642f75
@ -1,2 +1,2 @@
|
|||||||
sbt.version=0.13.5
|
sbt.version=0.13.6
|
||||||
|
|
||||||
|
@ -1,176 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright (C) 2011 the original author or authors.
|
|
||||||
* See the NOTICE file distributed with this work for additional
|
|
||||||
* information regarding copyright ownership.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.xerial.snappy;
|
|
||||||
import java.lang.ref.SoftReference;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Simple helper class to encapsulate details of basic buffer
|
|
||||||
* recycling scheme, which helps a lot (as per profiling) for
|
|
||||||
* smaller encoding cases.
|
|
||||||
*
|
|
||||||
* @author tatu
|
|
||||||
*/
|
|
||||||
class BufferRecycler
|
|
||||||
{
|
|
||||||
private final static int MIN_ENCODING_BUFFER = 4000;
|
|
||||||
|
|
||||||
private final static int MIN_OUTPUT_BUFFER = 8000;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This <code>ThreadLocal</code> contains a {@link java.lang.ref.SoftReference}
|
|
||||||
* to a {@link BufferRecycler} used to provide a low-cost
|
|
||||||
* buffer recycling for buffers we need for encoding, decoding.
|
|
||||||
*/
|
|
||||||
final protected static ThreadLocal<SoftReference<BufferRecycler>> recyclerRef
|
|
||||||
= new ThreadLocal<SoftReference<BufferRecycler>>();
|
|
||||||
|
|
||||||
|
|
||||||
private byte[] inputBuffer;
|
|
||||||
private byte[] outputBuffer;
|
|
||||||
|
|
||||||
private byte[] decodingBuffer;
|
|
||||||
private byte[] encodingBuffer;
|
|
||||||
|
|
||||||
private short[] encodingHash;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Accessor to get thread-local recycler instance
|
|
||||||
*/
|
|
||||||
public static BufferRecycler instance()
|
|
||||||
{
|
|
||||||
SoftReference<BufferRecycler> ref = recyclerRef.get();
|
|
||||||
|
|
||||||
BufferRecycler bufferRecycler;
|
|
||||||
if (ref == null) {
|
|
||||||
bufferRecycler = null;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
bufferRecycler = ref.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (bufferRecycler == null) {
|
|
||||||
bufferRecycler = new BufferRecycler();
|
|
||||||
recyclerRef.set(new SoftReference<BufferRecycler>(bufferRecycler));
|
|
||||||
}
|
|
||||||
return bufferRecycler;
|
|
||||||
}
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////
|
|
||||||
// Buffers for encoding (output)
|
|
||||||
///////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
public byte[] allocEncodingBuffer(int minSize)
|
|
||||||
{
|
|
||||||
byte[] buf = encodingBuffer;
|
|
||||||
if (buf == null || buf.length < minSize) {
|
|
||||||
buf = new byte[Math.max(minSize, MIN_ENCODING_BUFFER)];
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
encodingBuffer = null;
|
|
||||||
}
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void releaseEncodeBuffer(byte[] buffer)
|
|
||||||
{
|
|
||||||
if (encodingBuffer == null || buffer.length > encodingBuffer.length) {
|
|
||||||
encodingBuffer = buffer;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public byte[] allocOutputBuffer(int minSize)
|
|
||||||
{
|
|
||||||
byte[] buf = outputBuffer;
|
|
||||||
if (buf == null || buf.length < minSize) {
|
|
||||||
buf = new byte[Math.max(minSize, MIN_OUTPUT_BUFFER)];
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
outputBuffer = null;
|
|
||||||
}
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void releaseOutputBuffer(byte[] buffer)
|
|
||||||
{
|
|
||||||
if (outputBuffer == null || (buffer != null && buffer.length > outputBuffer.length)) {
|
|
||||||
outputBuffer = buffer;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public short[] allocEncodingHash(int suggestedSize)
|
|
||||||
{
|
|
||||||
short[] buf = encodingHash;
|
|
||||||
if (buf == null || buf.length < suggestedSize) {
|
|
||||||
buf = new short[suggestedSize];
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
encodingHash = null;
|
|
||||||
}
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void releaseEncodingHash(short[] buffer)
|
|
||||||
{
|
|
||||||
if (encodingHash == null || (buffer != null && buffer.length > encodingHash.length)) {
|
|
||||||
encodingHash = buffer;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////
|
|
||||||
// Buffers for decoding (input)
|
|
||||||
///////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
public byte[] allocInputBuffer(int minSize)
|
|
||||||
{
|
|
||||||
byte[] buf = inputBuffer;
|
|
||||||
if (buf == null || buf.length < minSize) {
|
|
||||||
buf = new byte[Math.max(minSize, MIN_OUTPUT_BUFFER)];
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
inputBuffer = null;
|
|
||||||
}
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void releaseInputBuffer(byte[] buffer)
|
|
||||||
{
|
|
||||||
if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) {
|
|
||||||
inputBuffer = buffer;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public byte[] allocDecodeBuffer(int size)
|
|
||||||
{
|
|
||||||
byte[] buf = decodingBuffer;
|
|
||||||
if (buf == null || buf.length < size) {
|
|
||||||
buf = new byte[size];
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
decodingBuffer = null;
|
|
||||||
}
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void releaseDecodeBuffer(byte[] buffer)
|
|
||||||
{
|
|
||||||
if (decodingBuffer == null || (buffer != null && buffer.length > decodingBuffer.length)) {
|
|
||||||
decodingBuffer = buffer;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -24,6 +24,10 @@
|
|||||||
//--------------------------------------
|
//--------------------------------------
|
||||||
package org.xerial.snappy;
|
package org.xerial.snappy;
|
||||||
|
|
||||||
|
import org.xerial.snappy.buffer.BufferAllocatorFactory;
|
||||||
|
import org.xerial.snappy.buffer.BufferAllocator;
|
||||||
|
import org.xerial.snappy.buffer.CachedBufferAllocator;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
|
||||||
@ -56,9 +60,11 @@ public class SnappyOutputStream extends OutputStream {
|
|||||||
static final int DEFAULT_BLOCK_SIZE = 32 * 1024; // Use 32kb for the default block size
|
static final int DEFAULT_BLOCK_SIZE = 32 * 1024; // Use 32kb for the default block size
|
||||||
|
|
||||||
protected final OutputStream out;
|
protected final OutputStream out;
|
||||||
|
|
||||||
private final BufferRecycler recycler;
|
|
||||||
private final int blockSize;
|
private final int blockSize;
|
||||||
|
|
||||||
|
private final BufferAllocator inputBufferAllocator;
|
||||||
|
private final BufferAllocator outputBufferAllocator;
|
||||||
|
|
||||||
protected final byte[] inputBuffer;
|
protected final byte[] inputBuffer;
|
||||||
protected final byte[] outputBuffer;
|
protected final byte[] outputBuffer;
|
||||||
private int inputCursor = 0;
|
private int inputCursor = 0;
|
||||||
@ -74,14 +80,25 @@ public class SnappyOutputStream extends OutputStream {
|
|||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public SnappyOutputStream(OutputStream out, int blockSize) {
|
public SnappyOutputStream(OutputStream out, int blockSize) {
|
||||||
|
this(out, blockSize, CachedBufferAllocator.factory);
|
||||||
|
}
|
||||||
|
|
||||||
|
public SnappyOutputStream(OutputStream out, int blockSize, BufferAllocatorFactory bufferAllocatorFactory) {
|
||||||
this.out = out;
|
this.out = out;
|
||||||
this.recycler = BufferRecycler.instance();
|
|
||||||
this.blockSize = Math.max(MIN_BLOCK_SIZE, blockSize);
|
this.blockSize = Math.max(MIN_BLOCK_SIZE, blockSize);
|
||||||
inputBuffer = recycler.allocInputBuffer(this.blockSize);
|
int inputSize = blockSize;
|
||||||
outputBuffer = recycler.allocOutputBuffer(SnappyCodec.HEADER_SIZE + 4 + Snappy.maxCompressedLength(this.blockSize));
|
int outputSize = SnappyCodec.HEADER_SIZE + 4 + Snappy.maxCompressedLength(blockSize);
|
||||||
|
|
||||||
|
this.inputBufferAllocator = bufferAllocatorFactory.getBufferAllocator(inputSize);
|
||||||
|
this.outputBufferAllocator = bufferAllocatorFactory.getBufferAllocator(outputSize);
|
||||||
|
|
||||||
|
inputBuffer = inputBufferAllocator.allocate(inputSize);
|
||||||
|
outputBuffer = inputBufferAllocator.allocate(outputSize);
|
||||||
|
|
||||||
outputCursor = SnappyCodec.currentHeader.writeHeader(outputBuffer, 0);
|
outputCursor = SnappyCodec.currentHeader.writeHeader(outputBuffer, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* (non-Javadoc)
|
/* (non-Javadoc)
|
||||||
* @see java.io.OutputStream#write(byte[], int, int)
|
* @see java.io.OutputStream#write(byte[], int, int)
|
||||||
*/
|
*/
|
||||||
@ -265,9 +282,9 @@ public class SnappyOutputStream extends OutputStream {
|
|||||||
|
|
||||||
static void writeInt(byte[] dst, int offset, int v) {
|
static void writeInt(byte[] dst, int offset, int v) {
|
||||||
dst[offset] = (byte) ((v >> 24) & 0xFF);
|
dst[offset] = (byte) ((v >> 24) & 0xFF);
|
||||||
dst[offset+1] = (byte) ((v >> 16) & 0xFF);
|
dst[offset + 1] = (byte) ((v >> 16) & 0xFF);
|
||||||
dst[offset+2] = (byte) ((v >> 8) & 0xFF);
|
dst[offset + 2] = (byte) ((v >> 8) & 0xFF);
|
||||||
dst[offset+3] = (byte) ((v >> 0) & 0xFF);
|
dst[offset + 3] = (byte) ((v >> 0) & 0xFF);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int readInt(byte[] buffer, int pos) {
|
static int readInt(byte[] buffer, int pos) {
|
||||||
@ -312,10 +329,9 @@ public class SnappyOutputStream extends OutputStream {
|
|||||||
try {
|
try {
|
||||||
flush();
|
flush();
|
||||||
out.close();
|
out.close();
|
||||||
}
|
} finally {
|
||||||
finally {
|
inputBufferAllocator.release(inputBuffer);
|
||||||
recycler.releaseInputBuffer(inputBuffer);
|
outputBufferAllocator.release(outputBuffer);
|
||||||
recycler.releaseOutputBuffer(outputBuffer);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
11
src/main/java/org/xerial/snappy/buffer/BufferAllocator.java
Normal file
11
src/main/java/org/xerial/snappy/buffer/BufferAllocator.java
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
package org.xerial.snappy.buffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* BufferAllocator interface. The implementation of this interface must be thread-safe
|
||||||
|
*/
|
||||||
|
public interface BufferAllocator {
|
||||||
|
|
||||||
|
public byte[] allocate(int size);
|
||||||
|
public void release(byte[] buffer);
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,10 @@
|
|||||||
|
package org.xerial.snappy.buffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public interface BufferAllocatorFactory {
|
||||||
|
|
||||||
|
BufferAllocator getBufferAllocator(int minSize);
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,55 @@
|
|||||||
|
package org.xerial.snappy.buffer;
|
||||||
|
|
||||||
|
import java.lang.ref.SoftReference;
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cached buffer
|
||||||
|
*/
|
||||||
|
public class CachedBufferAllocator implements BufferAllocator {
|
||||||
|
|
||||||
|
public static BufferAllocatorFactory factory = new BufferAllocatorFactory() {
|
||||||
|
@Override
|
||||||
|
public BufferAllocator getBufferAllocator(int bufferSize) {
|
||||||
|
return CachedBufferAllocator.getAllocator(bufferSize);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Use SoftReference so that having this queueTable does not prevent the GC of CachedBufferAllocator instances
|
||||||
|
*/
|
||||||
|
public static Map<Integer, SoftReference<CachedBufferAllocator>> queueTable = new HashMap<Integer, SoftReference<CachedBufferAllocator>>();
|
||||||
|
|
||||||
|
private final int bufferSize;
|
||||||
|
private final Deque<byte[]> bufferQueue;
|
||||||
|
|
||||||
|
public CachedBufferAllocator(int bufferSize) {
|
||||||
|
this.bufferSize = bufferSize;
|
||||||
|
this.bufferQueue = new ArrayDeque<byte[]>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static synchronized CachedBufferAllocator getAllocator(int bufferSize) {
|
||||||
|
if(!queueTable.containsKey(bufferSize)) {
|
||||||
|
queueTable.put(bufferSize, new SoftReference<CachedBufferAllocator>(new CachedBufferAllocator(bufferSize)));
|
||||||
|
}
|
||||||
|
return queueTable.get(bufferSize).get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] allocate(int size) {
|
||||||
|
synchronized(this) {
|
||||||
|
if(bufferQueue.isEmpty()) {
|
||||||
|
return new byte[size];
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return bufferQueue.pollFirst();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void release(byte[] buffer) {
|
||||||
|
synchronized(this) {
|
||||||
|
bufferQueue.addLast(buffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,26 @@
|
|||||||
|
package org.xerial.snappy.buffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple buffer allocator, which does not reuse the allocated buffer
|
||||||
|
*/
|
||||||
|
public class DefaultBufferAllocator implements BufferAllocator {
|
||||||
|
|
||||||
|
public static BufferAllocatorFactory factory = new BufferAllocatorFactory() {
|
||||||
|
public BufferAllocator singleton = new DefaultBufferAllocator();
|
||||||
|
@Override
|
||||||
|
public BufferAllocator getBufferAllocator(int bufferSize) {
|
||||||
|
return singleton;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] allocate(int size) {
|
||||||
|
return new byte[size];
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void release(byte[] buffer) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -1 +1 @@
|
|||||||
version in ThisBuild := "1.1.1.3"
|
version in ThisBuild := "1.1.1.4-SNAPSHOT"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user