Merge pull request #50 from bokken/develop

Optimize SnappyFramedOutputStream for NIO.
This commit is contained in:
Taro L. Saito 2013-10-17 18:09:10 -07:00
commit 0b110d55da
1 changed files with 76 additions and 39 deletions

View File

@ -11,6 +11,8 @@ import static org.xerial.snappy.SnappyFramed.maskedCrc32c;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.WritableByteChannel;
@ -49,12 +51,14 @@ public final class SnappyFramedOutputStream extends OutputStream implements
*/
public static final double DEFAULT_MIN_COMPRESSION_RATIO = 0.85d;
// private final int blockSize;
private final ByteBuffer headerBuffer = ByteBuffer.allocate(8).order(
ByteOrder.LITTLE_ENDIAN);
private final ByteBuffer buffer;
private final byte[] outputBuffer;
private final ByteBuffer directInputBuffer;
private final ByteBuffer outputBuffer;
private final double minCompressionRatio;
private final OutputStream out;
private final WritableByteChannel out;
// private int position;
private boolean closed;
@ -88,6 +92,39 @@ public final class SnappyFramedOutputStream extends OutputStream implements
*/
public SnappyFramedOutputStream(OutputStream out, int blockSize,
double minCompressionRatio) throws IOException {
this(Channels.newChannel(out), blockSize, minCompressionRatio);
}
/**
* Creates a new {@link SnappyFramedOutputStream} using the
* {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}.
*
* @param out
* The underlying {@link WritableByteChannel} to write to. Must
* not be {@code null}.
* @throws IOException
*/
public SnappyFramedOutputStream(WritableByteChannel out) throws IOException {
this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO);
}
/**
* Creates a new {@link SnappyFramedOutputStream} instance.
*
* @param out
* The underlying {@link WritableByteChannel} to write to. Must
* not be {@code null}.
* @param blockSize
* The block size (of raw data) to compress before writing frames
* to <i>out</i>. Must be in (0, 65536].
* @param minCompressionRatio
* Defines the minimum compression ratio (
* {@code compressedLength / rawLength}) that must be achieved to
* write the compressed data. This must be in (0, 1.0].
* @throws IOException
*/
public SnappyFramedOutputStream(WritableByteChannel out, int blockSize,
double minCompressionRatio) throws IOException {
if (out == null) {
throw new NullPointerException();
}
@ -105,7 +142,9 @@ public final class SnappyFramedOutputStream extends OutputStream implements
this.out = out;
this.minCompressionRatio = minCompressionRatio;
buffer = ByteBuffer.allocate(blockSize);
outputBuffer = new byte[Snappy.maxCompressedLength(blockSize)];
directInputBuffer = ByteBuffer.allocateDirect(blockSize);
outputBuffer = ByteBuffer.allocateDirect(Snappy
.maxCompressedLength(blockSize));
writeHeader(out);
}
@ -118,8 +157,8 @@ public final class SnappyFramedOutputStream extends OutputStream implements
* The underlying {@link OutputStream}.
* @throws IOException
*/
private void writeHeader(OutputStream out) throws IOException {
out.write(HEADER_BYTES);
private void writeHeader(WritableByteChannel out) throws IOException {
out.write(ByteBuffer.wrap(HEADER_BYTES));
}
/**
@ -196,7 +235,6 @@ public final class SnappyFramedOutputStream extends OutputStream implements
throw new IOException("Stream is closed");
}
flushBuffer();
out.flush();
}
@Override
@ -246,33 +284,28 @@ public final class SnappyFramedOutputStream extends OutputStream implements
final int length = buffer.remaining();
// crc is based on the user supplied input data
final int crc32c = calculateCRC32C(input, 0, length);
final int crc32c = maskedCrc32c(input, 0, length);
final int compressedLength = Snappy.compress(input, 0, length,
outputBuffer, 0);
directInputBuffer.clear();
directInputBuffer.put(buffer);
directInputBuffer.flip();
// only use the compressed data if copmression ratio is <= the
outputBuffer.clear();
Snappy.compress(directInputBuffer, outputBuffer);
final int compressedLength = outputBuffer.remaining();
// only use the compressed data if compression ratio is <= the
// minCompressonRatio
if (((double) compressedLength / (double) length) <= minCompressionRatio) {
writeBlock(out, outputBuffer, 0, compressedLength, true, crc32c);
writeBlock(out, outputBuffer, true, crc32c);
} else {
// otherwise use the uncomprssed data.
writeBlock(out, input, 0, length, false, crc32c);
// otherwise use the uncompressed data.
buffer.flip();
writeBlock(out, buffer, false, crc32c);
}
}
/**
* Calculates a masked CRC32C checksum over the data.
*
* @param data
* @param offset
* @param length
* @return The CRC32 checksum.
*/
private int calculateCRC32C(byte[] data, int offset, int length) {
return maskedCrc32c(data, offset, length);
}
/**
* Write a frame (block) to <i>out</i>.
*
@ -292,26 +325,30 @@ public final class SnappyFramedOutputStream extends OutputStream implements
* The calculated checksum.
* @throws IOException
*/
private void writeBlock(final OutputStream out, byte[] data, int offset,
int length, boolean compressed, int crc32c) throws IOException {
out.write(compressed ? COMPRESSED_DATA_FLAG : UNCOMPRESSED_DATA_FLAG);
private void writeBlock(final WritableByteChannel out, ByteBuffer data,
boolean compressed, int crc32c) throws IOException {
headerBuffer.clear();
headerBuffer.put((byte) (compressed ? COMPRESSED_DATA_FLAG
: UNCOMPRESSED_DATA_FLAG));
// the length written out to the header is both the checksum and the
// frame
final int headerLength = length + 4;
final int headerLength = data.remaining() + 4;
// write length
out.write(headerLength);
out.write(headerLength >>> 8);
out.write(headerLength >>> 16);
headerBuffer.put((byte) headerLength);
headerBuffer.put((byte) (headerLength >>> 8));
headerBuffer.put((byte) (headerLength >>> 16));
// write crc32c of user input data
out.write(crc32c);
out.write(crc32c >>> 8);
out.write(crc32c >>> 16);
out.write(crc32c >>> 24);
headerBuffer.putInt(crc32c);
// write data
out.write(data, offset, length);
headerBuffer.flip();
// write the header
out.write(headerBuffer);
// write the raw data
out.write(data);
}
}