Add SnappyInputStream

This commit is contained in:
Taro L. Saito 2011-03-31 22:49:27 +09:00
parent 0fcb9f8b9a
commit 91a087f449
3 changed files with 67 additions and 40 deletions

View File

@ -36,40 +36,83 @@ import java.io.InputStream;
public class SnappyInputStream extends InputStream public class SnappyInputStream extends InputStream
{ {
protected final InputStream in; protected final InputStream in;
private int blockSize = SnappyOutputStream.DEFAULT_BLOCK_SIZE; private boolean finishedReading = false;
private int blockSize = SnappyOutputStream.DEFAULT_BLOCK_SIZE;
private byte[] compressed; private byte[] compressed = new byte[blockSize];
private byte[] uncompressed;
private byte[] uncompressed = new byte[blockSize];
private int uncompressedCursor = 0;
private int uncompressedLimit = 0;
private byte[] chunkSizeBuf = new byte[4];
public SnappyInputStream(InputStream input) throws IOException { public SnappyInputStream(InputStream input) throws IOException {
this.in = input; this.in = input;
readHeader();
} }
protected void readHeader() throws IOException { @Override
byte[] header = new byte[SnappyOutputStream.PREAMBLE_SIZE]; public int read(byte[] b, int off, int len) throws IOException {
int readBytes = in.read(header); int wroteBytes = 0;
if (readBytes != header.length) { for (; wroteBytes < len;) {
throw new IOException("Invalid Snappy stream"); if (uncompressedCursor >= uncompressedLimit) {
} if (hasNextChunk())
String headerStr = new String(header, 0, SnappyOutputStream.HEADER_SIZE, "UTF-8"); continue;
if (!headerStr.startsWith(SnappyOutputStream.STREAM_FORMAT_VERSION)) { else {
throw new IOException("Incompatible stream version"); return wroteBytes == 0 ? -1 : wroteBytes;
} }
blockSize = SnappyOutputStream.readInt(header, SnappyOutputStream.HEADER_SIZE); }
if (blockSize < 0) { int bytesToWrite = Math.min(uncompressedLimit - uncompressedCursor, len);
throw new IOException("Invalid block size: " + blockSize); System.arraycopy(uncompressed, uncompressedCursor, b, off + wroteBytes, bytesToWrite);
wroteBytes += bytesToWrite;
uncompressedCursor += bytesToWrite;
} }
compressed = new byte[blockSize]; return wroteBytes;
uncompressed = new byte[blockSize]; }
protected boolean hasNextChunk() throws IOException {
if (finishedReading)
return false;
uncompressedCursor = 0;
int chunkSizeDataLen = in.read(chunkSizeBuf, 0, 4);
if (chunkSizeDataLen < 4) {
finishedReading = true;
return false;
}
int chunkSize = SnappyOutputStream.readInt(chunkSizeBuf, 0);
// extend the compressed data buffer size
if (chunkSize > compressed.length) {
compressed = new byte[chunkSize];
}
int readBytes = in.read(compressed, 0, chunkSize);
if (readBytes < chunkSize) {
throw new IOException("failed to read chunk");
}
try {
int uncompressedLength = Snappy.uncompressedLength(compressed, 0, chunkSize);
if (uncompressedLength > uncompressed.length) {
uncompressed = new byte[uncompressedLength];
}
int actualUncompressedLength = Snappy.uncompress(compressed, 0, chunkSize, uncompressed, 0);
if (uncompressedLength != actualUncompressedLength) {
throw new IOException("invalid uncompressed byte size");
}
uncompressedLimit = actualUncompressedLength;
}
catch (SnappyException e) {
throw new IOException("failed to uncompress the chunk: " + e.getMessage());
}
return true;
} }
@Override @Override
public int read() throws IOException { public int read() throws IOException {
// TODO Auto-generated method stub byte[] buf = new byte[1];
return 0; return read(buf, 0, 1);
} }
} }

View File

@ -26,7 +26,6 @@ package org.xerial.snappy;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.Arrays;
/** /**
* This class implements a stream filter for writing compressed data using * This class implements a stream filter for writing compressed data using
@ -43,15 +42,11 @@ import java.util.Arrays;
*/ */
public class SnappyOutputStream extends OutputStream public class SnappyOutputStream extends OutputStream
{ {
public final static String STREAM_FORMAT_VERSION = "snappy-1.0.1"; static final int DEFAULT_BLOCK_SIZE = 1 << 15; // use 2^15 = 32KB as block size
public static final int HEADER_SIZE = 16; // version (16 bytes)
public static final int PREAMBLE_SIZE = HEADER_SIZE + 4; // version (16 bytes) & block size (int: 4 bytes)
static final int DEFAULT_BLOCK_SIZE = 1 << 15; // use 2^15 = 32KB as block size
protected final OutputStream out; protected final OutputStream out;
private final int blockSize; private final int blockSize;
private int cursor = 0; private int cursor = 0;
protected byte[] uncompressed; protected byte[] uncompressed;
protected byte[] compressed; protected byte[] compressed;
@ -64,17 +59,6 @@ public class SnappyOutputStream extends OutputStream
this.blockSize = blockSize; this.blockSize = blockSize;
uncompressed = new byte[blockSize]; uncompressed = new byte[blockSize];
compressed = new byte[Snappy.maxCompressedLength(blockSize)]; compressed = new byte[Snappy.maxCompressedLength(blockSize)];
writeHeader();
}
protected void writeHeader() throws IOException {
byte[] header = new byte[16]; // header size
Arrays.fill(header, (byte) 0);
byte[] version = STREAM_FORMAT_VERSION.getBytes("UTF-8");
assert (version.length <= 16);
System.arraycopy(version, 0, header, 0, version.length);
out.write(header);
writeInt(out, blockSize);
} }
@Override @Override

View File

@ -61,7 +61,7 @@ public class SnappyOutputStreamTest
ByteArrayOutputStream decompressed = new ByteArrayOutputStream(); ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
byte[] compressed = buf.toByteArray(); byte[] compressed = buf.toByteArray();
// decompress // decompress
for (int cursor = SnappyOutputStream.HEADER_SIZE; cursor < compressed.length;) { for (int cursor = 0; cursor < compressed.length;) {
int chunkSize = SnappyOutputStream.readInt(compressed, cursor); int chunkSize = SnappyOutputStream.readInt(compressed, cursor);
cursor += 4; cursor += 4;
byte[] tmpOut = new byte[Snappy.uncompressedLength(compressed, cursor, chunkSize)]; byte[] tmpOut = new byte[Snappy.uncompressedLength(compressed, cursor, chunkSize)];