diff --git a/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java b/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java index 0c02018..ad6494d 100644 --- a/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java +++ b/src/main/java/org/xerial/snappy/SnappyFramedInputStream.java @@ -4,16 +4,22 @@ package org.xerial.snappy; import static java.lang.Math.min; -import static org.xerial.snappy.SnappyFramed.*; -import static org.xerial.snappy.SnappyFramedOutputStream.*; +import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG; +import static org.xerial.snappy.SnappyFramed.HEADER_BYTES; +import static org.xerial.snappy.SnappyFramed.STREAM_IDENTIFIER_FLAG; +import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG; +import static org.xerial.snappy.SnappyFramed.readBytes; +import static org.xerial.snappy.SnappyFramedOutputStream.MAX_BLOCK_SIZE; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ClosedChannelException; import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; import java.util.Arrays; /** @@ -236,6 +242,91 @@ public final class SnappyFramedInputStream extends InputStream implements return size; } + /** + * Transfers the entire content of this {@link InputStream} to os. + * This potentially limits the amount of buffering required to decompress + * content. + *
+ * Unlike {@link #read(byte[], int, int)}, this method does not need to be + * called multiple times. A single call will transfer all available content. + * Any calls after the source has been exhausted will result in a return + * value of {@code 0}. + *
+ * + * @param os + * The destination to write decompressed content to. + * @return The number of bytes transferred. + * @throws IOException + * @since 1.2 + */ + public long transferTo(OutputStream os) throws IOException { + if (os == null) { + throw new IllegalArgumentException("os is null"); + } + + if (closed) { + throw new ClosedChannelException(); + } + + long totTransfered = 0; + + while (ensureBuffer()) { + final int available = available(); + os.write(buffer, position, available); + position += available; + totTransfered += available; + } + + return totTransfered; + } + + /** + * Transfers the entire content of this {@link ReadableByteChannel} to + * wbc. This potentially limits the amount of buffering required to + * decompress content. + * + *+ * Unlike {@link #read(ByteBuffer)}, this method does not need to be called + * multiple times. A single call will transfer all available content. Any + * calls after the source has been exhausted will result in a return value + * of {@code 0}. + *
+ * + * @param wbc + * The destination to write decompressed content to. + * @return The number of bytes transferred. + * @throws IOException + * @since 1.2 + */ + public long transferTo(WritableByteChannel wbc) throws IOException { + if (wbc == null) { + throw new IllegalArgumentException("wbc is null"); + } + + if (closed) { + throw new ClosedChannelException(); + } + + final ByteBuffer bb = ByteBuffer.wrap(buffer); + + long totTransfered = 0; + + while (ensureBuffer()) { + bb.clear(); + bb.position(position); + bb.limit(position + available()); + + wbc.write(bb); + + final int written = bb.position() - position; + position += written; + + totTransfered += written; + } + + return totTransfered; + } + @Override public void close() throws IOException { try { diff --git a/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java b/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java index 65a2f1b..1f82c6a 100644 --- a/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java +++ b/src/main/java/org/xerial/snappy/SnappyFramedOutputStream.java @@ -9,11 +9,13 @@ import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG; import static org.xerial.snappy.SnappyFramed.maskedCrc32c; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.Channels; import java.nio.channels.ClosedChannelException; +import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; /** @@ -186,7 +188,24 @@ public final class SnappyFramedOutputStream extends OutputStream implements throw new IOException("Stream is closed"); } - write(ByteBuffer.wrap(input, offset, length)); + if (input == null) { + throw new NullPointerException(); + } else if ((offset < 0) || (offset > input.length) || (length < 0) + || ((offset + length) > input.length) + || ((offset + length) < 0)) { + throw new IndexOutOfBoundsException(); + } + + while (length > 0) { + if (buffer.remaining() <= 0) { + flushBuffer(); + } + + final int toPut = Math.min(length, buffer.remaining()); + buffer.put(input, offset, toPut); + offset += toPut; + length -= toPut; + } } /** @@ -229,6 +248,87 @@ public final class SnappyFramedOutputStream extends OutputStream implements return srcLength; } + /** + * Transfers all the content from is to this {@link OutputStream}. + * This potentially limits the amount of buffering required to compress + * content. + * + * @param is + * The source of data to compress. + * @return The number of bytes read from is. + * @throws IOException + * @since 1.2 + */ + public long transferFrom(InputStream is) throws IOException { + if (closed) { + throw new ClosedChannelException(); + } + + if (is == null) { + throw new NullPointerException(); + } + + if (buffer.remaining() == 0) { + flushBuffer(); + } + + assert buffer.hasArray(); + final byte[] bytes = buffer.array(); + + final int arrayOffset = buffer.arrayOffset(); + long totTransfered = 0; + int read; + while ((read = is.read(bytes, arrayOffset + buffer.position(), + buffer.remaining())) != -1) { + buffer.position(buffer.position() + read); + + if (buffer.remaining() == 0) { + flushBuffer(); + } + + totTransfered += read; + } + + return totTransfered; + } + + /** + * Transfers all the content from rbc to this + * {@link WritableByteChannel}. This potentially limits the amount of + * buffering required to compress content. + * + * @param rbc + * The source of data to compress. + * @return The number of bytes read from rbc. + * @throws IOException + * @since 1.2 + */ + public long transferFrom(ReadableByteChannel rbc) throws IOException { + if (closed) { + throw new ClosedChannelException(); + } + + if (rbc == null) { + throw new NullPointerException(); + } + + if (buffer.remaining() == 0) { + flushBuffer(); + } + + long totTransfered = 0; + int read; + while ((read = rbc.read(buffer)) != -1) { + if (buffer.remaining() == 0) { + flushBuffer(); + } + + totTransfered += read; + } + + return totTransfered; + } + @Override public final void flush() throws IOException { if (closed) { diff --git a/src/test/java/org/xerial/snappy/SnappyFramedStreamTest.java b/src/test/java/org/xerial/snappy/SnappyFramedStreamTest.java index decb43e..f2801f7 100644 --- a/src/test/java/org/xerial/snappy/SnappyFramedStreamTest.java +++ b/src/test/java/org/xerial/snappy/SnappyFramedStreamTest.java @@ -3,8 +3,13 @@ */ package org.xerial.snappy; -import static org.xerial.snappy.SnappyFramed.*; -import static org.junit.Assert.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG; +import static org.xerial.snappy.SnappyFramed.HEADER_BYTES; +import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG; +import static org.xerial.snappy.SnappyFramed.maskedCrc32c; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -12,6 +17,8 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; import java.util.Arrays; import org.junit.Test; @@ -159,6 +166,72 @@ public class SnappyFramedStreamTest { new byte[] { 'a' }); } + @Test + public void testTransferFrom_InputStream() throws IOException { + final byte[] random = getRandom(0.5, 100000); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream( + random.length); + final SnappyFramedOutputStream sfos = new SnappyFramedOutputStream(baos); + + sfos.transferFrom(new ByteArrayInputStream(random)); + + sfos.close(); + + final byte[] uncompressed = uncompress(baos.toByteArray()); + + assertArrayEquals(random, uncompressed); + } + + @Test + public void testTransferFrom_ReadableByteChannel() throws IOException { + final byte[] random = getRandom(0.5, 100000); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream( + random.length); + final SnappyFramedOutputStream sfos = new SnappyFramedOutputStream(baos); + + sfos.transferFrom(Channels.newChannel(new ByteArrayInputStream(random))); + + sfos.close(); + + final byte[] uncompressed = uncompress(baos.toByteArray()); + + assertArrayEquals(random, uncompressed); + } + + @Test + public void testTransferTo_OutputStream() throws IOException { + final byte[] random = getRandom(0.5, 100000); + + final byte[] compressed = compress(random); + final SnappyFramedInputStream sfis = new SnappyFramedInputStream( + new ByteArrayInputStream(compressed)); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream( + random.length); + sfis.transferTo(baos); + + assertArrayEquals(random, baos.toByteArray()); + } + + @Test + public void testTransferTo_WritableByteChannel() throws IOException { + final byte[] random = getRandom(0.5, 100000); + + final byte[] compressed = compress(random); + final SnappyFramedInputStream sfis = new SnappyFramedInputStream( + new ByteArrayInputStream(compressed)); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream( + random.length); + final WritableByteChannel wbc = Channels.newChannel(baos); + sfis.transferTo(wbc); + wbc.close(); + + assertArrayEquals(random, baos.toByteArray()); + } + @Test public void testLargerFrames_raw_() throws IOException { final byte[] random = getRandom(0.5, 100000);