diff --git a/src/main/java/org/xerial/snappy/SnappyInputStream.java b/src/main/java/org/xerial/snappy/SnappyInputStream.java index 0cc9dde..40a0d53 100755 --- a/src/main/java/org/xerial/snappy/SnappyInputStream.java +++ b/src/main/java/org/xerial/snappy/SnappyInputStream.java @@ -36,40 +36,83 @@ import java.io.InputStream; public class SnappyInputStream extends InputStream { protected final InputStream in; - private int blockSize = SnappyOutputStream.DEFAULT_BLOCK_SIZE; + private boolean finishedReading = false; + private int blockSize = SnappyOutputStream.DEFAULT_BLOCK_SIZE; - private byte[] compressed; - private byte[] uncompressed; + private byte[] compressed = new byte[blockSize]; + + private byte[] uncompressed = new byte[blockSize]; + private int uncompressedCursor = 0; + private int uncompressedLimit = 0; + + private byte[] chunkSizeBuf = new byte[4]; public SnappyInputStream(InputStream input) throws IOException { this.in = input; - readHeader(); } - protected void readHeader() throws IOException { - byte[] header = new byte[SnappyOutputStream.PREAMBLE_SIZE]; - int readBytes = in.read(header); - if (readBytes != header.length) { - throw new IOException("Invalid Snappy stream"); - } - String headerStr = new String(header, 0, SnappyOutputStream.HEADER_SIZE, "UTF-8"); - if (!headerStr.startsWith(SnappyOutputStream.STREAM_FORMAT_VERSION)) { - throw new IOException("Incompatible stream version"); - } - blockSize = SnappyOutputStream.readInt(header, SnappyOutputStream.HEADER_SIZE); - if (blockSize < 0) { - throw new IOException("Invalid block size: " + blockSize); + @Override + public int read(byte[] b, int off, int len) throws IOException { + int wroteBytes = 0; + for (; wroteBytes < len;) { + if (uncompressedCursor >= uncompressedLimit) { + if (hasNextChunk()) + continue; + else { + return wroteBytes == 0 ? -1 : wroteBytes; + } + } + int bytesToWrite = Math.min(uncompressedLimit - uncompressedCursor, len); + System.arraycopy(uncompressed, uncompressedCursor, b, off + wroteBytes, bytesToWrite); + wroteBytes += bytesToWrite; + uncompressedCursor += bytesToWrite; } - compressed = new byte[blockSize]; - uncompressed = new byte[blockSize]; + return wroteBytes; + } + protected boolean hasNextChunk() throws IOException { + if (finishedReading) + return false; + + uncompressedCursor = 0; + + int chunkSizeDataLen = in.read(chunkSizeBuf, 0, 4); + if (chunkSizeDataLen < 4) { + finishedReading = true; + return false; + } + int chunkSize = SnappyOutputStream.readInt(chunkSizeBuf, 0); + // extend the compressed data buffer size + if (chunkSize > compressed.length) { + compressed = new byte[chunkSize]; + } + int readBytes = in.read(compressed, 0, chunkSize); + if (readBytes < chunkSize) { + throw new IOException("failed to read chunk"); + } + try { + int uncompressedLength = Snappy.uncompressedLength(compressed, 0, chunkSize); + if (uncompressedLength > uncompressed.length) { + uncompressed = new byte[uncompressedLength]; + } + int actualUncompressedLength = Snappy.uncompress(compressed, 0, chunkSize, uncompressed, 0); + if (uncompressedLength != actualUncompressedLength) { + throw new IOException("invalid uncompressed byte size"); + } + uncompressedLimit = actualUncompressedLength; + } + catch (SnappyException e) { + throw new IOException("failed to uncompress the chunk: " + e.getMessage()); + } + + return true; } @Override public int read() throws IOException { - // TODO Auto-generated method stub - return 0; + byte[] buf = new byte[1]; + return read(buf, 0, 1); } } diff --git a/src/main/java/org/xerial/snappy/SnappyOutputStream.java b/src/main/java/org/xerial/snappy/SnappyOutputStream.java index b6d0859..ddfd394 100755 --- a/src/main/java/org/xerial/snappy/SnappyOutputStream.java +++ b/src/main/java/org/xerial/snappy/SnappyOutputStream.java @@ -26,7 +26,6 @@ package org.xerial.snappy; import java.io.IOException; import java.io.OutputStream; -import java.util.Arrays; /** * This class implements a stream filter for writing compressed data using @@ -43,15 +42,11 @@ import java.util.Arrays; */ public class SnappyOutputStream extends OutputStream { - public final static String STREAM_FORMAT_VERSION = "snappy-1.0.1"; - public static final int HEADER_SIZE = 16; // version (16 bytes) - public static final int PREAMBLE_SIZE = HEADER_SIZE + 4; // version (16 bytes) & block size (int: 4 bytes) - - static final int DEFAULT_BLOCK_SIZE = 1 << 15; // use 2^15 = 32KB as block size + static final int DEFAULT_BLOCK_SIZE = 1 << 15; // use 2^15 = 32KB as block size protected final OutputStream out; private final int blockSize; - private int cursor = 0; + private int cursor = 0; protected byte[] uncompressed; protected byte[] compressed; @@ -64,17 +59,6 @@ public class SnappyOutputStream extends OutputStream this.blockSize = blockSize; uncompressed = new byte[blockSize]; compressed = new byte[Snappy.maxCompressedLength(blockSize)]; - writeHeader(); - } - - protected void writeHeader() throws IOException { - byte[] header = new byte[16]; // header size - Arrays.fill(header, (byte) 0); - byte[] version = STREAM_FORMAT_VERSION.getBytes("UTF-8"); - assert (version.length <= 16); - System.arraycopy(version, 0, header, 0, version.length); - out.write(header); - writeInt(out, blockSize); } @Override diff --git a/src/test/java/org/xerial/snappy/SnappyOutputStreamTest.java b/src/test/java/org/xerial/snappy/SnappyOutputStreamTest.java index 3ccbb31..7a7c756 100755 --- a/src/test/java/org/xerial/snappy/SnappyOutputStreamTest.java +++ b/src/test/java/org/xerial/snappy/SnappyOutputStreamTest.java @@ -61,7 +61,7 @@ public class SnappyOutputStreamTest ByteArrayOutputStream decompressed = new ByteArrayOutputStream(); byte[] compressed = buf.toByteArray(); // decompress - for (int cursor = SnappyOutputStream.HEADER_SIZE; cursor < compressed.length;) { + for (int cursor = 0; cursor < compressed.length;) { int chunkSize = SnappyOutputStream.readInt(compressed, cursor); cursor += 4; byte[] tmpOut = new byte[Snappy.uncompressedLength(compressed, cursor, chunkSize)];