Add transferFrom/transferTo methods to provide transfer optimizations

https://github.com/xerial/snappy-java/issues/57
This commit is contained in:
bokken 2013-11-11 10:25:58 -06:00
parent 682a839317
commit bb10b5064e
3 changed files with 269 additions and 5 deletions

View File

@ -4,16 +4,22 @@
package org.xerial.snappy;
import static java.lang.Math.min;
import static org.xerial.snappy.SnappyFramed.*;
import static org.xerial.snappy.SnappyFramedOutputStream.*;
import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG;
import static org.xerial.snappy.SnappyFramed.HEADER_BYTES;
import static org.xerial.snappy.SnappyFramed.STREAM_IDENTIFIER_FLAG;
import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG;
import static org.xerial.snappy.SnappyFramed.readBytes;
import static org.xerial.snappy.SnappyFramedOutputStream.MAX_BLOCK_SIZE;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
/**
@ -236,6 +242,91 @@ public final class SnappyFramedInputStream extends InputStream implements
return size;
}
/**
* Transfers the entire content of this {@link InputStream} to <i>os</i>.
* This potentially limits the amount of buffering required to decompress
* content.
* <p>
* Unlike {@link #read(byte[], int, int)}, this method does not need to be
* called multiple times. A single call will transfer all available content.
* Any calls after the source has been exhausted will result in a return
* value of {@code 0}.
* </p>
*
* @param os
* The destination to write decompressed content to.
* @return The number of bytes transferred.
* @throws IOException
* @since 1.2
*/
public long transferTo(OutputStream os) throws IOException {
if (os == null) {
throw new IllegalArgumentException("os is null");
}
if (closed) {
throw new ClosedChannelException();
}
long totTransfered = 0;
while (ensureBuffer()) {
final int available = available();
os.write(buffer, position, available);
position += available;
totTransfered += available;
}
return totTransfered;
}
/**
* Transfers the entire content of this {@link ReadableByteChannel} to
* <i>wbc</i>. This potentially limits the amount of buffering required to
* decompress content.
*
* <p>
* Unlike {@link #read(ByteBuffer)}, this method does not need to be called
* multiple times. A single call will transfer all available content. Any
* calls after the source has been exhausted will result in a return value
* of {@code 0}.
* </p>
*
* @param wbc
* The destination to write decompressed content to.
* @return The number of bytes transferred.
* @throws IOException
* @since 1.2
*/
public long transferTo(WritableByteChannel wbc) throws IOException {
if (wbc == null) {
throw new IllegalArgumentException("wbc is null");
}
if (closed) {
throw new ClosedChannelException();
}
final ByteBuffer bb = ByteBuffer.wrap(buffer);
long totTransfered = 0;
while (ensureBuffer()) {
bb.clear();
bb.position(position);
bb.limit(position + available());
wbc.write(bb);
final int written = bb.position() - position;
position += written;
totTransfered += written;
}
return totTransfered;
}
@Override
public void close() throws IOException {
try {

View File

@ -9,11 +9,13 @@ import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG;
import static org.xerial.snappy.SnappyFramed.maskedCrc32c;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
/**
@ -186,7 +188,24 @@ public final class SnappyFramedOutputStream extends OutputStream implements
throw new IOException("Stream is closed");
}
write(ByteBuffer.wrap(input, offset, length));
if (input == null) {
throw new NullPointerException();
} else if ((offset < 0) || (offset > input.length) || (length < 0)
|| ((offset + length) > input.length)
|| ((offset + length) < 0)) {
throw new IndexOutOfBoundsException();
}
while (length > 0) {
if (buffer.remaining() <= 0) {
flushBuffer();
}
final int toPut = Math.min(length, buffer.remaining());
buffer.put(input, offset, toPut);
offset += toPut;
length -= toPut;
}
}
/**
@ -229,6 +248,87 @@ public final class SnappyFramedOutputStream extends OutputStream implements
return srcLength;
}
/**
* Transfers all the content from <i>is</i> to this {@link OutputStream}.
* This potentially limits the amount of buffering required to compress
* content.
*
* @param is
* The source of data to compress.
* @return The number of bytes read from <i>is</i>.
* @throws IOException
* @since 1.2
*/
public long transferFrom(InputStream is) throws IOException {
if (closed) {
throw new ClosedChannelException();
}
if (is == null) {
throw new NullPointerException();
}
if (buffer.remaining() == 0) {
flushBuffer();
}
assert buffer.hasArray();
final byte[] bytes = buffer.array();
final int arrayOffset = buffer.arrayOffset();
long totTransfered = 0;
int read;
while ((read = is.read(bytes, arrayOffset + buffer.position(),
buffer.remaining())) != -1) {
buffer.position(buffer.position() + read);
if (buffer.remaining() == 0) {
flushBuffer();
}
totTransfered += read;
}
return totTransfered;
}
/**
* Transfers all the content from <i>rbc</i> to this
* {@link WritableByteChannel}. This potentially limits the amount of
* buffering required to compress content.
*
* @param rbc
* The source of data to compress.
* @return The number of bytes read from <i>rbc</i>.
* @throws IOException
* @since 1.2
*/
public long transferFrom(ReadableByteChannel rbc) throws IOException {
if (closed) {
throw new ClosedChannelException();
}
if (rbc == null) {
throw new NullPointerException();
}
if (buffer.remaining() == 0) {
flushBuffer();
}
long totTransfered = 0;
int read;
while ((read = rbc.read(buffer)) != -1) {
if (buffer.remaining() == 0) {
flushBuffer();
}
totTransfered += read;
}
return totTransfered;
}
@Override
public final void flush() throws IOException {
if (closed) {

View File

@ -3,8 +3,13 @@
*/
package org.xerial.snappy;
import static org.xerial.snappy.SnappyFramed.*;
import static org.junit.Assert.*;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG;
import static org.xerial.snappy.SnappyFramed.HEADER_BYTES;
import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG;
import static org.xerial.snappy.SnappyFramed.maskedCrc32c;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -12,6 +17,8 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.Arrays;
import org.junit.Test;
@ -159,6 +166,72 @@ public class SnappyFramedStreamTest {
new byte[] { 'a' });
}
@Test
public void testTransferFrom_InputStream() throws IOException {
final byte[] random = getRandom(0.5, 100000);
final ByteArrayOutputStream baos = new ByteArrayOutputStream(
random.length);
final SnappyFramedOutputStream sfos = new SnappyFramedOutputStream(baos);
sfos.transferFrom(new ByteArrayInputStream(random));
sfos.close();
final byte[] uncompressed = uncompress(baos.toByteArray());
assertArrayEquals(random, uncompressed);
}
@Test
public void testTransferFrom_ReadableByteChannel() throws IOException {
final byte[] random = getRandom(0.5, 100000);
final ByteArrayOutputStream baos = new ByteArrayOutputStream(
random.length);
final SnappyFramedOutputStream sfos = new SnappyFramedOutputStream(baos);
sfos.transferFrom(Channels.newChannel(new ByteArrayInputStream(random)));
sfos.close();
final byte[] uncompressed = uncompress(baos.toByteArray());
assertArrayEquals(random, uncompressed);
}
@Test
public void testTransferTo_OutputStream() throws IOException {
final byte[] random = getRandom(0.5, 100000);
final byte[] compressed = compress(random);
final SnappyFramedInputStream sfis = new SnappyFramedInputStream(
new ByteArrayInputStream(compressed));
final ByteArrayOutputStream baos = new ByteArrayOutputStream(
random.length);
sfis.transferTo(baos);
assertArrayEquals(random, baos.toByteArray());
}
@Test
public void testTransferTo_WritableByteChannel() throws IOException {
final byte[] random = getRandom(0.5, 100000);
final byte[] compressed = compress(random);
final SnappyFramedInputStream sfis = new SnappyFramedInputStream(
new ByteArrayInputStream(compressed));
final ByteArrayOutputStream baos = new ByteArrayOutputStream(
random.length);
final WritableByteChannel wbc = Channels.newChannel(baos);
sfis.transferTo(wbc);
wbc.close();
assertArrayEquals(random, baos.toByteArray());
}
@Test
public void testLargerFrames_raw_() throws IOException {
final byte[] random = getRandom(0.5, 100000);