mirror of
https://github.com/xerial/snappy-java.git
synced 2025-04-08 19:35:08 +02:00
Merge pull request #58 from bokken/develop
Add transferFrom/transferTo methods to provide transfer optimizations
This commit is contained in:
commit
d6457b2912
@ -4,16 +4,22 @@
|
|||||||
package org.xerial.snappy;
|
package org.xerial.snappy;
|
||||||
|
|
||||||
import static java.lang.Math.min;
|
import static java.lang.Math.min;
|
||||||
import static org.xerial.snappy.SnappyFramed.*;
|
import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG;
|
||||||
import static org.xerial.snappy.SnappyFramedOutputStream.*;
|
import static org.xerial.snappy.SnappyFramed.HEADER_BYTES;
|
||||||
|
import static org.xerial.snappy.SnappyFramed.STREAM_IDENTIFIER_FLAG;
|
||||||
|
import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG;
|
||||||
|
import static org.xerial.snappy.SnappyFramed.readBytes;
|
||||||
|
import static org.xerial.snappy.SnappyFramedOutputStream.MAX_BLOCK_SIZE;
|
||||||
|
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.Channels;
|
import java.nio.channels.Channels;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.nio.channels.ReadableByteChannel;
|
import java.nio.channels.ReadableByteChannel;
|
||||||
|
import java.nio.channels.WritableByteChannel;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -236,6 +242,91 @@ public final class SnappyFramedInputStream extends InputStream implements
|
|||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transfers the entire content of this {@link InputStream} to <i>os</i>.
|
||||||
|
* This potentially limits the amount of buffering required to decompress
|
||||||
|
* content.
|
||||||
|
* <p>
|
||||||
|
* Unlike {@link #read(byte[], int, int)}, this method does not need to be
|
||||||
|
* called multiple times. A single call will transfer all available content.
|
||||||
|
* Any calls after the source has been exhausted will result in a return
|
||||||
|
* value of {@code 0}.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @param os
|
||||||
|
* The destination to write decompressed content to.
|
||||||
|
* @return The number of bytes transferred.
|
||||||
|
* @throws IOException
|
||||||
|
* @since 1.2
|
||||||
|
*/
|
||||||
|
public long transferTo(OutputStream os) throws IOException {
|
||||||
|
if (os == null) {
|
||||||
|
throw new IllegalArgumentException("os is null");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (closed) {
|
||||||
|
throw new ClosedChannelException();
|
||||||
|
}
|
||||||
|
|
||||||
|
long totTransfered = 0;
|
||||||
|
|
||||||
|
while (ensureBuffer()) {
|
||||||
|
final int available = available();
|
||||||
|
os.write(buffer, position, available);
|
||||||
|
position += available;
|
||||||
|
totTransfered += available;
|
||||||
|
}
|
||||||
|
|
||||||
|
return totTransfered;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transfers the entire content of this {@link ReadableByteChannel} to
|
||||||
|
* <i>wbc</i>. This potentially limits the amount of buffering required to
|
||||||
|
* decompress content.
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* Unlike {@link #read(ByteBuffer)}, this method does not need to be called
|
||||||
|
* multiple times. A single call will transfer all available content. Any
|
||||||
|
* calls after the source has been exhausted will result in a return value
|
||||||
|
* of {@code 0}.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @param wbc
|
||||||
|
* The destination to write decompressed content to.
|
||||||
|
* @return The number of bytes transferred.
|
||||||
|
* @throws IOException
|
||||||
|
* @since 1.2
|
||||||
|
*/
|
||||||
|
public long transferTo(WritableByteChannel wbc) throws IOException {
|
||||||
|
if (wbc == null) {
|
||||||
|
throw new IllegalArgumentException("wbc is null");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (closed) {
|
||||||
|
throw new ClosedChannelException();
|
||||||
|
}
|
||||||
|
|
||||||
|
final ByteBuffer bb = ByteBuffer.wrap(buffer);
|
||||||
|
|
||||||
|
long totTransfered = 0;
|
||||||
|
|
||||||
|
while (ensureBuffer()) {
|
||||||
|
bb.clear();
|
||||||
|
bb.position(position);
|
||||||
|
bb.limit(position + available());
|
||||||
|
|
||||||
|
wbc.write(bb);
|
||||||
|
|
||||||
|
final int written = bb.position() - position;
|
||||||
|
position += written;
|
||||||
|
|
||||||
|
totTransfered += written;
|
||||||
|
}
|
||||||
|
|
||||||
|
return totTransfered;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
try {
|
try {
|
||||||
|
@ -9,11 +9,13 @@ import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG;
|
|||||||
import static org.xerial.snappy.SnappyFramed.maskedCrc32c;
|
import static org.xerial.snappy.SnappyFramed.maskedCrc32c;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.ByteOrder;
|
import java.nio.ByteOrder;
|
||||||
import java.nio.channels.Channels;
|
import java.nio.channels.Channels;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
|
import java.nio.channels.ReadableByteChannel;
|
||||||
import java.nio.channels.WritableByteChannel;
|
import java.nio.channels.WritableByteChannel;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -186,7 +188,24 @@ public final class SnappyFramedOutputStream extends OutputStream implements
|
|||||||
throw new IOException("Stream is closed");
|
throw new IOException("Stream is closed");
|
||||||
}
|
}
|
||||||
|
|
||||||
write(ByteBuffer.wrap(input, offset, length));
|
if (input == null) {
|
||||||
|
throw new NullPointerException();
|
||||||
|
} else if ((offset < 0) || (offset > input.length) || (length < 0)
|
||||||
|
|| ((offset + length) > input.length)
|
||||||
|
|| ((offset + length) < 0)) {
|
||||||
|
throw new IndexOutOfBoundsException();
|
||||||
|
}
|
||||||
|
|
||||||
|
while (length > 0) {
|
||||||
|
if (buffer.remaining() <= 0) {
|
||||||
|
flushBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
final int toPut = Math.min(length, buffer.remaining());
|
||||||
|
buffer.put(input, offset, toPut);
|
||||||
|
offset += toPut;
|
||||||
|
length -= toPut;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -229,6 +248,87 @@ public final class SnappyFramedOutputStream extends OutputStream implements
|
|||||||
return srcLength;
|
return srcLength;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transfers all the content from <i>is</i> to this {@link OutputStream}.
|
||||||
|
* This potentially limits the amount of buffering required to compress
|
||||||
|
* content.
|
||||||
|
*
|
||||||
|
* @param is
|
||||||
|
* The source of data to compress.
|
||||||
|
* @return The number of bytes read from <i>is</i>.
|
||||||
|
* @throws IOException
|
||||||
|
* @since 1.2
|
||||||
|
*/
|
||||||
|
public long transferFrom(InputStream is) throws IOException {
|
||||||
|
if (closed) {
|
||||||
|
throw new ClosedChannelException();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (is == null) {
|
||||||
|
throw new NullPointerException();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (buffer.remaining() == 0) {
|
||||||
|
flushBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
assert buffer.hasArray();
|
||||||
|
final byte[] bytes = buffer.array();
|
||||||
|
|
||||||
|
final int arrayOffset = buffer.arrayOffset();
|
||||||
|
long totTransfered = 0;
|
||||||
|
int read;
|
||||||
|
while ((read = is.read(bytes, arrayOffset + buffer.position(),
|
||||||
|
buffer.remaining())) != -1) {
|
||||||
|
buffer.position(buffer.position() + read);
|
||||||
|
|
||||||
|
if (buffer.remaining() == 0) {
|
||||||
|
flushBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
totTransfered += read;
|
||||||
|
}
|
||||||
|
|
||||||
|
return totTransfered;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transfers all the content from <i>rbc</i> to this
|
||||||
|
* {@link WritableByteChannel}. This potentially limits the amount of
|
||||||
|
* buffering required to compress content.
|
||||||
|
*
|
||||||
|
* @param rbc
|
||||||
|
* The source of data to compress.
|
||||||
|
* @return The number of bytes read from <i>rbc</i>.
|
||||||
|
* @throws IOException
|
||||||
|
* @since 1.2
|
||||||
|
*/
|
||||||
|
public long transferFrom(ReadableByteChannel rbc) throws IOException {
|
||||||
|
if (closed) {
|
||||||
|
throw new ClosedChannelException();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rbc == null) {
|
||||||
|
throw new NullPointerException();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (buffer.remaining() == 0) {
|
||||||
|
flushBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
long totTransfered = 0;
|
||||||
|
int read;
|
||||||
|
while ((read = rbc.read(buffer)) != -1) {
|
||||||
|
if (buffer.remaining() == 0) {
|
||||||
|
flushBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
totTransfered += read;
|
||||||
|
}
|
||||||
|
|
||||||
|
return totTransfered;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void flush() throws IOException {
|
public final void flush() throws IOException {
|
||||||
if (closed) {
|
if (closed) {
|
||||||
|
@ -3,8 +3,13 @@
|
|||||||
*/
|
*/
|
||||||
package org.xerial.snappy;
|
package org.xerial.snappy;
|
||||||
|
|
||||||
import static org.xerial.snappy.SnappyFramed.*;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.xerial.snappy.SnappyFramed.COMPRESSED_DATA_FLAG;
|
||||||
|
import static org.xerial.snappy.SnappyFramed.HEADER_BYTES;
|
||||||
|
import static org.xerial.snappy.SnappyFramed.UNCOMPRESSED_DATA_FLAG;
|
||||||
|
import static org.xerial.snappy.SnappyFramed.maskedCrc32c;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
@ -12,6 +17,8 @@ import java.io.EOFException;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
import java.nio.channels.Channels;
|
||||||
|
import java.nio.channels.WritableByteChannel;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -159,6 +166,72 @@ public class SnappyFramedStreamTest {
|
|||||||
new byte[] { 'a' });
|
new byte[] { 'a' });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTransferFrom_InputStream() throws IOException {
|
||||||
|
final byte[] random = getRandom(0.5, 100000);
|
||||||
|
|
||||||
|
final ByteArrayOutputStream baos = new ByteArrayOutputStream(
|
||||||
|
random.length);
|
||||||
|
final SnappyFramedOutputStream sfos = new SnappyFramedOutputStream(baos);
|
||||||
|
|
||||||
|
sfos.transferFrom(new ByteArrayInputStream(random));
|
||||||
|
|
||||||
|
sfos.close();
|
||||||
|
|
||||||
|
final byte[] uncompressed = uncompress(baos.toByteArray());
|
||||||
|
|
||||||
|
assertArrayEquals(random, uncompressed);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTransferFrom_ReadableByteChannel() throws IOException {
|
||||||
|
final byte[] random = getRandom(0.5, 100000);
|
||||||
|
|
||||||
|
final ByteArrayOutputStream baos = new ByteArrayOutputStream(
|
||||||
|
random.length);
|
||||||
|
final SnappyFramedOutputStream sfos = new SnappyFramedOutputStream(baos);
|
||||||
|
|
||||||
|
sfos.transferFrom(Channels.newChannel(new ByteArrayInputStream(random)));
|
||||||
|
|
||||||
|
sfos.close();
|
||||||
|
|
||||||
|
final byte[] uncompressed = uncompress(baos.toByteArray());
|
||||||
|
|
||||||
|
assertArrayEquals(random, uncompressed);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTransferTo_OutputStream() throws IOException {
|
||||||
|
final byte[] random = getRandom(0.5, 100000);
|
||||||
|
|
||||||
|
final byte[] compressed = compress(random);
|
||||||
|
final SnappyFramedInputStream sfis = new SnappyFramedInputStream(
|
||||||
|
new ByteArrayInputStream(compressed));
|
||||||
|
|
||||||
|
final ByteArrayOutputStream baos = new ByteArrayOutputStream(
|
||||||
|
random.length);
|
||||||
|
sfis.transferTo(baos);
|
||||||
|
|
||||||
|
assertArrayEquals(random, baos.toByteArray());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTransferTo_WritableByteChannel() throws IOException {
|
||||||
|
final byte[] random = getRandom(0.5, 100000);
|
||||||
|
|
||||||
|
final byte[] compressed = compress(random);
|
||||||
|
final SnappyFramedInputStream sfis = new SnappyFramedInputStream(
|
||||||
|
new ByteArrayInputStream(compressed));
|
||||||
|
|
||||||
|
final ByteArrayOutputStream baos = new ByteArrayOutputStream(
|
||||||
|
random.length);
|
||||||
|
final WritableByteChannel wbc = Channels.newChannel(baos);
|
||||||
|
sfis.transferTo(wbc);
|
||||||
|
wbc.close();
|
||||||
|
|
||||||
|
assertArrayEquals(random, baos.toByteArray());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLargerFrames_raw_() throws IOException {
|
public void testLargerFrames_raw_() throws IOException {
|
||||||
final byte[] random = getRandom(0.5, 100000);
|
final byte[] random = getRandom(0.5, 100000);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user