diff --git a/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java b/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java index 221e242..65a2f1b 100644 --- a/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java +++ b/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java @@ -11,6 +11,8 @@ import static org.xerial.snappy.SnappyFramed.maskedCrc32c; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.Channels; import java.nio.channels.ClosedChannelException; import java.nio.channels.WritableByteChannel; @@ -49,12 +51,14 @@ public final class SnappyFramedOutputStream extends OutputStream implements */ public static final double DEFAULT_MIN_COMPRESSION_RATIO = 0.85d; - // private final int blockSize; + private final ByteBuffer headerBuffer = ByteBuffer.allocate(8).order( + ByteOrder.LITTLE_ENDIAN); private final ByteBuffer buffer; - private final byte[] outputBuffer; + private final ByteBuffer directInputBuffer; + private final ByteBuffer outputBuffer; private final double minCompressionRatio; - private final OutputStream out; + private final WritableByteChannel out; // private int position; private boolean closed; @@ -88,6 +92,39 @@ public final class SnappyFramedOutputStream extends OutputStream implements */ public SnappyFramedOutputStream(OutputStream out, int blockSize, double minCompressionRatio) throws IOException { + this(Channels.newChannel(out), blockSize, minCompressionRatio); + } + + /** + * Creates a new {@link SnappyFramedOutputStream} using the + * {@link #DEFAULT_BLOCK_SIZE} and {@link #DEFAULT_MIN_COMPRESSION_RATIO}. + * + * @param out + * The underlying {@link WritableByteChannel} to write to. Must + * not be {@code null}. + * @throws IOException + */ + public SnappyFramedOutputStream(WritableByteChannel out) throws IOException { + this(out, DEFAULT_BLOCK_SIZE, DEFAULT_MIN_COMPRESSION_RATIO); + } + + /** + * Creates a new {@link SnappyFramedOutputStream} instance. + * + * @param out + * The underlying {@link WritableByteChannel} to write to. Must + * not be {@code null}. + * @param blockSize + * The block size (of raw data) to compress before writing frames + * to out. Must be in (0, 65536]. + * @param minCompressionRatio + * Defines the minimum compression ratio ( + * {@code compressedLength / rawLength}) that must be achieved to + * write the compressed data. This must be in (0, 1.0]. + * @throws IOException + */ + public SnappyFramedOutputStream(WritableByteChannel out, int blockSize, + double minCompressionRatio) throws IOException { if (out == null) { throw new NullPointerException(); } @@ -105,7 +142,9 @@ public final class SnappyFramedOutputStream extends OutputStream implements this.out = out; this.minCompressionRatio = minCompressionRatio; buffer = ByteBuffer.allocate(blockSize); - outputBuffer = new byte[Snappy.maxCompressedLength(blockSize)]; + directInputBuffer = ByteBuffer.allocateDirect(blockSize); + outputBuffer = ByteBuffer.allocateDirect(Snappy + .maxCompressedLength(blockSize)); writeHeader(out); } @@ -118,8 +157,8 @@ public final class SnappyFramedOutputStream extends OutputStream implements * The underlying {@link OutputStream}. * @throws IOException */ - private void writeHeader(OutputStream out) throws IOException { - out.write(HEADER_BYTES); + private void writeHeader(WritableByteChannel out) throws IOException { + out.write(ByteBuffer.wrap(HEADER_BYTES)); } /** @@ -196,7 +235,6 @@ public final class SnappyFramedOutputStream extends OutputStream implements throw new IOException("Stream is closed"); } flushBuffer(); - out.flush(); } @Override @@ -246,33 +284,28 @@ public final class SnappyFramedOutputStream extends OutputStream implements final int length = buffer.remaining(); // crc is based on the user supplied input data - final int crc32c = calculateCRC32C(input, 0, length); + final int crc32c = maskedCrc32c(input, 0, length); - final int compressedLength = Snappy.compress(input, 0, length, - outputBuffer, 0); + directInputBuffer.clear(); + directInputBuffer.put(buffer); + directInputBuffer.flip(); - // only use the compressed data if copmression ratio is <= the + outputBuffer.clear(); + Snappy.compress(directInputBuffer, outputBuffer); + + final int compressedLength = outputBuffer.remaining(); + + // only use the compressed data if compression ratio is <= the // minCompressonRatio if (((double) compressedLength / (double) length) <= minCompressionRatio) { - writeBlock(out, outputBuffer, 0, compressedLength, true, crc32c); + writeBlock(out, outputBuffer, true, crc32c); } else { - // otherwise use the uncomprssed data. - writeBlock(out, input, 0, length, false, crc32c); + // otherwise use the uncompressed data. + buffer.flip(); + writeBlock(out, buffer, false, crc32c); } } - /** - * Calculates a masked CRC32C checksum over the data. - * - * @param data - * @param offset - * @param length - * @return The CRC32 checksum. - */ - private int calculateCRC32C(byte[] data, int offset, int length) { - return maskedCrc32c(data, offset, length); - } - /** * Write a frame (block) to out. * @@ -292,26 +325,30 @@ public final class SnappyFramedOutputStream extends OutputStream implements * The calculated checksum. * @throws IOException */ - private void writeBlock(final OutputStream out, byte[] data, int offset, - int length, boolean compressed, int crc32c) throws IOException { - out.write(compressed ? COMPRESSED_DATA_FLAG : UNCOMPRESSED_DATA_FLAG); + private void writeBlock(final WritableByteChannel out, ByteBuffer data, + boolean compressed, int crc32c) throws IOException { + + headerBuffer.clear(); + headerBuffer.put((byte) (compressed ? COMPRESSED_DATA_FLAG + : UNCOMPRESSED_DATA_FLAG)); // the length written out to the header is both the checksum and the // frame - final int headerLength = length + 4; + final int headerLength = data.remaining() + 4; // write length - out.write(headerLength); - out.write(headerLength >>> 8); - out.write(headerLength >>> 16); + headerBuffer.put((byte) headerLength); + headerBuffer.put((byte) (headerLength >>> 8)); + headerBuffer.put((byte) (headerLength >>> 16)); // write crc32c of user input data - out.write(crc32c); - out.write(crc32c >>> 8); - out.write(crc32c >>> 16); - out.write(crc32c >>> 24); + headerBuffer.putInt(crc32c); - // write data - out.write(data, offset, length); + headerBuffer.flip(); + + // write the header + out.write(headerBuffer); + // write the raw data + out.write(data); } }