mirror of
https://github.com/xerial/snappy-java.git
synced 2025-07-21 21:14:31 +02:00
Optimize SnappyFramedOutputStream for NIO.
https://github.com/xerial/snappy-java/issues/49
This commit is contained in:
parent
bbb30d35b8
commit
682a839317
@ -11,6 +11,8 @@ import static org.xerial.snappy.SnappyFramed.maskedCrc32c;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.ByteOrder;
|
||||||
|
import java.nio.channels.Channels;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.nio.channels.WritableByteChannel;
|
import java.nio.channels.WritableByteChannel;
|
||||||
|
|
||||||
@ -49,12 +51,14 @@ public final class SnappyFramedOutputStream extends OutputStream implements
|
|||||||
*/
|
*/
|
||||||
public static final double DEFAULT_MIN_COMPRESSION_RATIO = 0.85d;
|
public static final double DEFAULT_MIN_COMPRESSION_RATIO = 0.85d;
|
||||||
|
|
||||||
// private final int blockSize;
|
private final ByteBuffer headerBuffer = ByteBuffer.allocate(8).order(
|
||||||
|
ByteOrder.LITTLE_ENDIAN);
|
||||||
private final ByteBuffer buffer;
|
private final ByteBuffer buffer;
|
||||||
private final byte[] outputBuffer;
|
private final ByteBuffer directInputBuffer;
|
||||||
|
private final ByteBuffer outputBuffer;
|
||||||
private final double minCompressionRatio;
|
private final double minCompressionRatio;
|
||||||
|
|
||||||
private final OutputStream out;
|
private final WritableByteChannel out;
|
||||||
|
|
||||||
// private int position;
|
// private int position;
|
||||||
private boolean closed;
|
private boolean closed;
|
||||||
@ -88,6 +92,39 @@ public final class SnappyFramedOutputStream extends OutputStream implements
|
|||||||
*/
|
*/
|
||||||
public SnappyFramedOutputStream(OutputStream out, int blockSize,
|
public SnappyFramedOutputStream(OutputStream out, int blockSize,
|
||||||
double minCompressionRatio) throws IOException {
|
double minCompressionRatio) throws IOException {
|
||||||
|
this(Channels.newChannel(out), blockSize, minCompressionRatio);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new {@link SnappyFramedOutputStream} using the
|
||||||
|
* {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}.
|
||||||
|
*
|
||||||
|
* @param out
|
||||||
|
* The underlying {@link WritableByteChannel} to write to. Must
|
||||||
|
* not be {@code null}.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public SnappyFramedOutputStream(WritableByteChannel out) throws IOException {
|
||||||
|
this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new {@link SnappyFramedOutputStream} instance.
|
||||||
|
*
|
||||||
|
* @param out
|
||||||
|
* The underlying {@link WritableByteChannel} to write to. Must
|
||||||
|
* not be {@code null}.
|
||||||
|
* @param blockSize
|
||||||
|
* The block size (of raw data) to compress before writing frames
|
||||||
|
* to <i>out</i>. Must be in (0, 65536].
|
||||||
|
* @param minCompressionRatio
|
||||||
|
* Defines the minimum compression ratio (
|
||||||
|
* {@code compressedLength / rawLength}) that must be achieved to
|
||||||
|
* write the compressed data. This must be in (0, 1.0].
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public SnappyFramedOutputStream(WritableByteChannel out, int blockSize,
|
||||||
|
double minCompressionRatio) throws IOException {
|
||||||
if (out == null) {
|
if (out == null) {
|
||||||
throw new NullPointerException();
|
throw new NullPointerException();
|
||||||
}
|
}
|
||||||
@ -105,7 +142,9 @@ public final class SnappyFramedOutputStream extends OutputStream implements
|
|||||||
this.out = out;
|
this.out = out;
|
||||||
this.minCompressionRatio = minCompressionRatio;
|
this.minCompressionRatio = minCompressionRatio;
|
||||||
buffer = ByteBuffer.allocate(blockSize);
|
buffer = ByteBuffer.allocate(blockSize);
|
||||||
outputBuffer = new byte[Snappy.maxCompressedLength(blockSize)];
|
directInputBuffer = ByteBuffer.allocateDirect(blockSize);
|
||||||
|
outputBuffer = ByteBuffer.allocateDirect(Snappy
|
||||||
|
.maxCompressedLength(blockSize));
|
||||||
|
|
||||||
writeHeader(out);
|
writeHeader(out);
|
||||||
}
|
}
|
||||||
@ -118,8 +157,8 @@ public final class SnappyFramedOutputStream extends OutputStream implements
|
|||||||
* The underlying {@link OutputStream}.
|
* The underlying {@link OutputStream}.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private void writeHeader(OutputStream out) throws IOException {
|
private void writeHeader(WritableByteChannel out) throws IOException {
|
||||||
out.write(HEADER_BYTES);
|
out.write(ByteBuffer.wrap(HEADER_BYTES));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -196,7 +235,6 @@ public final class SnappyFramedOutputStream extends OutputStream implements
|
|||||||
throw new IOException("Stream is closed");
|
throw new IOException("Stream is closed");
|
||||||
}
|
}
|
||||||
flushBuffer();
|
flushBuffer();
|
||||||
out.flush();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -246,33 +284,28 @@ public final class SnappyFramedOutputStream extends OutputStream implements
|
|||||||
final int length = buffer.remaining();
|
final int length = buffer.remaining();
|
||||||
|
|
||||||
// crc is based on the user supplied input data
|
// crc is based on the user supplied input data
|
||||||
final int crc32c = calculateCRC32C(input, 0, length);
|
final int crc32c = maskedCrc32c(input, 0, length);
|
||||||
|
|
||||||
final int compressedLength = Snappy.compress(input, 0, length,
|
directInputBuffer.clear();
|
||||||
outputBuffer, 0);
|
directInputBuffer.put(buffer);
|
||||||
|
directInputBuffer.flip();
|
||||||
|
|
||||||
// only use the compressed data if copmression ratio is <= the
|
outputBuffer.clear();
|
||||||
|
Snappy.compress(directInputBuffer, outputBuffer);
|
||||||
|
|
||||||
|
final int compressedLength = outputBuffer.remaining();
|
||||||
|
|
||||||
|
// only use the compressed data if compression ratio is <= the
|
||||||
// minCompressonRatio
|
// minCompressonRatio
|
||||||
if (((double) compressedLength / (double) length) <= minCompressionRatio) {
|
if (((double) compressedLength / (double) length) <= minCompressionRatio) {
|
||||||
writeBlock(out, outputBuffer, 0, compressedLength, true, crc32c);
|
writeBlock(out, outputBuffer, true, crc32c);
|
||||||
} else {
|
} else {
|
||||||
// otherwise use the uncomprssed data.
|
// otherwise use the uncompressed data.
|
||||||
writeBlock(out, input, 0, length, false, crc32c);
|
buffer.flip();
|
||||||
|
writeBlock(out, buffer, false, crc32c);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Calculates a masked CRC32C checksum over the data.
|
|
||||||
*
|
|
||||||
* @param data
|
|
||||||
* @param offset
|
|
||||||
* @param length
|
|
||||||
* @return The CRC32 checksum.
|
|
||||||
*/
|
|
||||||
private int calculateCRC32C(byte[] data, int offset, int length) {
|
|
||||||
return maskedCrc32c(data, offset, length);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write a frame (block) to <i>out</i>.
|
* Write a frame (block) to <i>out</i>.
|
||||||
*
|
*
|
||||||
@ -292,26 +325,30 @@ public final class SnappyFramedOutputStream extends OutputStream implements
|
|||||||
* The calculated checksum.
|
* The calculated checksum.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private void writeBlock(final OutputStream out, byte[] data, int offset,
|
private void writeBlock(final WritableByteChannel out, ByteBuffer data,
|
||||||
int length, boolean compressed, int crc32c) throws IOException {
|
boolean compressed, int crc32c) throws IOException {
|
||||||
out.write(compressed ? COMPRESSED_DATA_FLAG : UNCOMPRESSED_DATA_FLAG);
|
|
||||||
|
headerBuffer.clear();
|
||||||
|
headerBuffer.put((byte) (compressed ? COMPRESSED_DATA_FLAG
|
||||||
|
: UNCOMPRESSED_DATA_FLAG));
|
||||||
|
|
||||||
// the length written out to the header is both the checksum and the
|
// the length written out to the header is both the checksum and the
|
||||||
// frame
|
// frame
|
||||||
final int headerLength = length + 4;
|
final int headerLength = data.remaining() + 4;
|
||||||
|
|
||||||
// write length
|
// write length
|
||||||
out.write(headerLength);
|
headerBuffer.put((byte) headerLength);
|
||||||
out.write(headerLength >>> 8);
|
headerBuffer.put((byte) (headerLength >>> 8));
|
||||||
out.write(headerLength >>> 16);
|
headerBuffer.put((byte) (headerLength >>> 16));
|
||||||
|
|
||||||
// write crc32c of user input data
|
// write crc32c of user input data
|
||||||
out.write(crc32c);
|
headerBuffer.putInt(crc32c);
|
||||||
out.write(crc32c >>> 8);
|
|
||||||
out.write(crc32c >>> 16);
|
|
||||||
out.write(crc32c >>> 24);
|
|
||||||
|
|
||||||
// write data
|
headerBuffer.flip();
|
||||||
out.write(data, offset, length);
|
|
||||||
|
// write the header
|
||||||
|
out.write(headerBuffer);
|
||||||
|
// write the raw data
|
||||||
|
out.write(data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user