Pool table instances to make PureJavaSnappy thread safe (#271)

https://github.com/xerial/snappy-java/issues/270

Co-authored-by: BO8979 <BO8979@W1971362.northamerica.cerner.net>
This commit is contained in:
Brett Okken 2021-01-20 13:20:22 -06:00 committed by GitHub
parent 7f47cf744a
commit 110727ed69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 83 additions and 18 deletions

View File

@ -1,26 +1,41 @@
package org.xerial.snappy.pure;
import org.xerial.snappy.SnappyApi;
import java.io.IOException;
import java.nio.ByteBuffer;
import static org.xerial.snappy.pure.UnsafeUtil.getAddress;
import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.xerial.snappy.SnappyApi;
/**
* A pure-java Snappy implementation using https://github.com/airlift/aircompressor
*/
public class PureJavaSnappy implements SnappyApi
{
private final short[] table = new short[SnappyRawCompressor.MAX_HASH_TABLE_SIZE];
/**
* Using a {@link ConcurrentLinkedDeque}, with values constantly popped and pushed from the head, leads to the fewest
* {@code short[]} instances remaining live over time.
*/
private final static ConcurrentLinkedDeque<SoftReference<short[]>> CACHED_TABLES = new ConcurrentLinkedDeque<>();
private final static int MAX_OUTPUT_LENGTH = Integer.MAX_VALUE;
@Override
public long rawCompress(long inputAddr, long inputSize, long destAddr)
throws IOException
{
return SnappyRawCompressor.compress(null, inputAddr, inputSize, null, destAddr, MAX_OUTPUT_LENGTH, table);
final short[] table = getTable();
try
{
return SnappyRawCompressor.compress(null, inputAddr, inputSize, null, destAddr, MAX_OUTPUT_LENGTH, table);
}
finally
{
returnTable(table);
}
}
@Override
@ -76,16 +91,24 @@ public class PureJavaSnappy implements SnappyApi
// collected in a block, and technically, the JVM is allowed to eliminate these locks.
synchronized (input) {
synchronized (compressed) {
int written = SnappyRawCompressor.compress(
inputBase,
inputAddress,
inputLimit,
outputBase,
outputAddress,
outputLimit,
table);
compressed.position(compressed.position() + written);
return written;
final short[] table = getTable();
try
{
int written = SnappyRawCompressor.compress(
inputBase,
inputAddress,
inputLimit,
outputBase,
outputAddress,
outputLimit,
table);
compressed.position(compressed.position() + written);
return written;
}
finally
{
returnTable(table);
}
}
}
}
@ -99,7 +122,15 @@ public class PureJavaSnappy implements SnappyApi
long outputAddress = ARRAY_BYTE_BASE_OFFSET + outputOffset;
long outputLimit = outputAddress + MAX_OUTPUT_LENGTH;
return SnappyRawCompressor.compress(input, inputAddress, inputLimit, output, outputAddress, outputLimit, table);
final short[] table = getTable();
try
{
return SnappyRawCompressor.compress(input, inputAddress, inputLimit, output, outputAddress, outputLimit, table);
}
finally
{
returnTable(table);
}
}
@Override
@ -241,4 +272,38 @@ public class PureJavaSnappy implements SnappyApi
{
System.arraycopy(src, offset, dest, dOffset, byteLength);
}
private static short[] getTable()
{
SoftReference<short[]> existingRef;
while((existingRef = CACHED_TABLES.poll()) != null)
{
short[] table = existingRef.get();
if (table != null)
{
//purge oldest entries have lost references
SoftReference<short[]> entry;
boolean lastEmpty = true;
while (lastEmpty && (entry = CACHED_TABLES.peekLast()) != null)
{
if (entry.get() == null)
{
CACHED_TABLES.removeLastOccurrence(entry);
}
else
{
lastEmpty = false;
}
}
return table;
}
}
return new short[SnappyRawCompressor.MAX_HASH_TABLE_SIZE];
}
private static void returnTable(short[] table)
{
CACHED_TABLES.addFirst(new SoftReference<short[]>(table));
}
}