#103: Support reading concatenated streams in SnappyInputStream

This commit is contained in:
Taro L. Saito 2015-05-12 01:43:23 +09:00
parent 81536ea146
commit 1c702ba366
4 changed files with 94 additions and 21 deletions

View File

@ -52,6 +52,13 @@ public class SnappyCodec
// Eight-byte magic marker identifying a stream header.
public static final byte[] MAGIC_HEADER = new byte[] { -126, 'S', 'N', 'A', 'P', 'P', 'Y', 0 };
// Number of bytes in the magic marker.
public static final int MAGIC_LEN = MAGIC_HEADER.length;
// Total header length: the magic marker plus 8 additional header bytes.
public static final int HEADER_SIZE = MAGIC_LEN + 8;
// The first and the last four bytes of the magic marker, each decoded as an int by
// SnappyOutputStream.readInt. SnappyInputStream compares a freshly read 4-byte chunk-size
// field against MAGIC_HEADER_HEAD to detect the start of a concatenated stream.
public static final int MAGIC_HEADER_HEAD = SnappyOutputStream.readInt(MAGIC_HEADER, 0);
public static final int MAGIC_HEADER_TAIL = SnappyOutputStream.readInt(MAGIC_HEADER, 4);
static {
// NOTE(review): presumably the head must be negative so that it can never be mistaken for a
// valid (non-negative) chunk size -- confirm. This check only runs when assertions are enabled (-ea).
assert(MAGIC_HEADER_HEAD < 0);
}
// Codec version numbers; readers reject headers whose version is below MINIMUM_COMPATIBLE_VERSION.
public static final int DEFAULT_VERSION = 1;
public static final int MINIMUM_COMPATIBLE_VERSION = 1;

View File

@ -45,7 +45,7 @@ public class SnappyInputStream extends InputStream
private int uncompressedCursor = 0;
private int uncompressedLimit = 0;
private byte[] chunkSizeBuf = new byte[4];
private byte[] header = new byte[SnappyCodec.headerSize()];
/**
* Create a filter for reading compressed data as a uncompressed stream
@ -73,7 +73,6 @@ public class SnappyInputStream extends InputStream
}
protected void readHeader() throws IOException {
byte[] header = new byte[SnappyCodec.headerSize()];
int readBytes = 0;
while (readBytes < header.length) {
int ret = in.read(header, readBytes, header.length - readBytes);
@ -93,22 +92,28 @@ public class SnappyInputStream extends InputStream
return;
}
SnappyCodec codec = SnappyCodec.readHeader(new ByteArrayInputStream(header));
if (codec.isValidMagicHeader()) {
// The input data is compressed by SnappyOutputStream
if (codec.version < SnappyCodec.MINIMUM_COMPATIBLE_VERSION) {
throw new SnappyIOException(SnappyErrorCode.INCOMPATIBLE_VERSION, String.format(
"Compressed with an incompatible codec version %d. At least version %d is required",
codec.version, SnappyCodec.MINIMUM_COMPATIBLE_VERSION));
}
}
else {
if(!isValidHeader(header)) {
// (probably) compressed by Snappy.compress(byte[])
readFully(header, readBytes);
return;
}
}
/**
 * Parses the given bytes as a codec header and reports whether they carry a valid magic marker.
 *
 * @param header raw header bytes to inspect
 * @return {@code true} when the magic marker is valid and the codec version is acceptable;
 *         {@code false} when the magic marker is not valid
 * @throws SnappyIOException with {@code INCOMPATIBLE_VERSION} when the magic marker is valid but
 *         the recorded version is lower than {@code SnappyCodec.MINIMUM_COMPATIBLE_VERSION}
 * @throws IOException if the header cannot be parsed
 */
private static boolean isValidHeader(byte[] header) throws IOException {
    SnappyCodec parsed = SnappyCodec.readHeader(new ByteArrayInputStream(header));
    if (!parsed.isValidMagicHeader()) {
        return false;
    }

    // A valid magic marker means the data was produced by SnappyOutputStream,
    // so the version recorded in the header must be one this reader understands.
    if (parsed.version < SnappyCodec.MINIMUM_COMPATIBLE_VERSION) {
        String message = String.format(
                "Compressed with an incompatible codec version %d. At least version %d is required",
                parsed.version, SnappyCodec.MINIMUM_COMPATIBLE_VERSION);
        throw new SnappyIOException(SnappyErrorCode.INCOMPATIBLE_VERSION, message);
    }
    return true;
}
protected void readFully(byte[] fragment, int fragmentLength) throws IOException {
if(fragmentLength == 0) {
finishedReading = true;
@ -324,6 +329,25 @@ public class SnappyInputStream extends InputStream
return read(d, 0, d.length);
}
/**
 * Reads from the underlying input stream into {@code dest} until {@code len} bytes have been
 * stored or the stream reports end-of-input. When end-of-input is hit, {@code finishedReading}
 * is set to {@code true} and the number of bytes stored so far is returned.
 *
 * @param dest   buffer that receives the bytes
 * @param offset index in {@code dest} at which the first byte is stored
 * @param len    number of bytes requested
 * @return the number of bytes actually read; less than {@code len} only if the stream ended
 * @throws IOException if the underlying stream fails
 */
private int readNext(byte[] dest, int offset, int len) throws IOException {
    int total = 0;
    while (total < len) {
        int n = in.read(dest, offset + total, len - total);
        if (n == -1) {
            // End of the underlying stream: remember it and hand back the partial count.
            finishedReading = true;
            break;
        }
        total += n;
    }
    return total;
}
protected boolean hasNextChunk() throws IOException {
if (finishedReading)
return false;
@ -331,16 +355,24 @@ public class SnappyInputStream extends InputStream
uncompressedCursor = 0;
uncompressedLimit = 0;
int readBytes = 0;
while (readBytes < 4) {
int ret = in.read(chunkSizeBuf, readBytes, 4 - readBytes);
if (ret == -1) {
finishedReading = true;
int readBytes = readNext(header, 0, 4);
if(readBytes < 4)
return false;
int chunkSize = SnappyOutputStream.readInt(header, 0);
if(chunkSize == SnappyCodec.MAGIC_HEADER_HEAD) {
// Concatenated data
int remainingHeaderSize = SnappyCodec.headerSize() - 4;
readBytes = readNext(header, 4, remainingHeaderSize);
if(readBytes < remainingHeaderSize)
return false;
if(isValidHeader(header))
return hasNextChunk();
else
return false;
}
readBytes += ret;
}
int chunkSize = SnappyOutputStream.readInt(chunkSizeBuf, 0);
// extend the compressed data buffer size
if (compressed == null || chunkSize > compressed.length) {
compressed = new byte[chunkSize];

View File

@ -35,6 +35,7 @@ import java.io.InputStream;
import org.junit.Test;
import org.xerial.util.FileResource;
import org.xerial.util.log.Logger;
import scala.Array;
public class SnappyInputStreamTest
{
@ -142,4 +143,37 @@ public class SnappyInputStreamTest
}
}
/**
 * Compresses the content of a test resource with {@link SnappyOutputStream}.
 *
 * @param resourcePath path of the resource, as accepted by {@code readResourceFile}
 * @return the complete compressed stream produced for the resource
 * @throws Exception if the resource cannot be read or compression fails
 */
public static byte[] compressResource(String resourcePath) throws Exception {
    // Load the input before opening the compressor so a missing resource cannot leak a stream.
    byte[] orig = readResourceFile(resourcePath);
    ByteArrayOutputStream compressedBuf = new ByteArrayOutputStream();
    SnappyOutputStream snappyOut = new SnappyOutputStream(compressedBuf);
    try {
        snappyOut.write(orig);
    }
    finally {
        // Always close the compressor; on the success path this also flushes the
        // pending data into compressedBuf before it is read below.
        snappyOut.close();
    }
    return compressedBuf.toByteArray();
}
/**
 * Two independently compressed streams placed back to back must decompress, through a single
 * SnappyInputStream, to the first original followed by the second original.
 */
@Test
public void chunkRead() throws Exception {
    byte[] firstCompressed = compressResource("alice29.txt");
    byte[] secondCompressed = compressResource("testdata/calgary/paper6");

    // Join the two compressed streams into one byte sequence.
    ByteArrayOutputStream joined = new ByteArrayOutputStream(firstCompressed.length + secondCompressed.length);
    joined.write(firstCompressed, 0, firstCompressed.length);
    joined.write(secondCompressed, 0, secondCompressed.length);

    SnappyInputStream in = new SnappyInputStream(new ByteArrayInputStream(joined.toByteArray()));
    byte[] uncompressed = readFully(in);

    byte[] expectedFirst = readResourceFile("alice29.txt");
    byte[] expectedSecond = readResourceFile("testdata/calgary/paper6");
    assertEquals(expectedFirst.length + expectedSecond.length, uncompressed.length);

    // Split the decompressed output at the boundary between the two originals.
    byte[] actualFirst = new byte[expectedFirst.length];
    System.arraycopy(uncompressed, 0, actualFirst, 0, actualFirst.length);
    byte[] actualSecond = new byte[expectedSecond.length];
    System.arraycopy(uncompressed, expectedFirst.length, actualSecond, 0, actualSecond.length);

    assertArrayEquals(expectedFirst, actualFirst);
    assertArrayEquals(expectedSecond, actualSecond);
}
}

View File

@ -1,2 +1,2 @@
version in ThisBuild := "1.1.1.8-SNAPSHOT"
version in ThisBuild := "1.1.2-SNAPSHOT"