Merge pull request #104 from xerial/chunked-input

Concatenated input support
This commit is contained in:
Taro L. Saito 2015-05-13 10:51:37 +09:00
commit d1745ebfcb
10 changed files with 120 additions and 33 deletions

View File

@ -1,6 +1,6 @@
TARGET:=target TARGET:=target
SRC:=src/main/java SRC:=src/main/java
include $(SRC)/org/xerial/snappy/VERSION include src/main/resources/org/xerial/snappy/VERSION
ifndef JAVA_HOME ifndef JAVA_HOME
$(error Set JAVA_HOME environment variable) $(error Set JAVA_HOME environment variable)

View File

@ -1,6 +1,3 @@
import SonatypeKeys._
sonatypeSettings
name := "snappy-java" name := "snappy-java"
@ -10,7 +7,7 @@ organizationName := "xerial.org"
description := "snappy-java: A fast compression/decompression library" description := "snappy-java: A fast compression/decompression library"
profileName := "org.xerial" sonatypeProfileName := "org.xerial"
pomExtra := { pomExtra := {
<url>https://github.comm/xerial/snappy-java</url> <url>https://github.comm/xerial/snappy-java</url>
@ -47,7 +44,7 @@ pomExtra := {
</scm> </scm>
} }
scalaVersion := "2.11.1" scalaVersion := "2.11.6"
javacOptions in (Compile, compile) ++= Seq("-encoding", "UTF-8", "-Xlint:unchecked", "-Xlint:deprecation", "-source", "1.6", "-target", "1.6") javacOptions in (Compile, compile) ++= Seq("-encoding", "UTF-8", "-Xlint:unchecked", "-Xlint:deprecation", "-source", "1.6", "-target", "1.6")
@ -114,3 +111,20 @@ OsgiKeys.additionalHeaders := Map(
"Bundle-ActivationPolicy" -> "lazy", "Bundle-ActivationPolicy" -> "lazy",
"Bundle-Name" -> "snappy-java: A fast compression/decompression library" "Bundle-Name" -> "snappy-java: A fast compression/decompression library"
) )
import ReleaseTransformations._
releaseProcess := Seq[ReleaseStep](
checkSnapshotDependencies,
inquireVersions,
runClean,
runTest,
setReleaseVersion,
commitReleaseVersion,
tagRelease,
ReleaseStep(action = Command.process("publishSigned", _)),
setNextVersion,
commitNextVersion,
ReleaseStep(action = Command.process("sonatypeReleaseAll", _)),
pushChanges
)

View File

@ -1,2 +1,2 @@
sbt.version=0.13.6 sbt.version=0.13.8

View File

@ -1,9 +1,9 @@
addSbtPlugin("com.github.gseitz" % "sbt-release" % "0.7.1") addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.0")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "0.2.1") addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "0.5.0")
addSbtPlugin("com.typesafe.sbt" % "sbt-pgp" % "0.8.3") addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
addSbtPlugin("de.johoop" % "findbugs4sbt" % "1.3.0") addSbtPlugin("de.johoop" % "findbugs4sbt" % "1.3.0")

View File

@ -52,6 +52,13 @@ public class SnappyCodec
public static final byte[] MAGIC_HEADER = new byte[] { -126, 'S', 'N', 'A', 'P', 'P', 'Y', 0 }; public static final byte[] MAGIC_HEADER = new byte[] { -126, 'S', 'N', 'A', 'P', 'P', 'Y', 0 };
public static final int MAGIC_LEN = MAGIC_HEADER.length; public static final int MAGIC_LEN = MAGIC_HEADER.length;
public static final int HEADER_SIZE = MAGIC_LEN + 8; public static final int HEADER_SIZE = MAGIC_LEN + 8;
public static final int MAGIC_HEADER_HEAD = SnappyOutputStream.readInt(MAGIC_HEADER, 0);
public static final int MAGIC_HEADER_TAIL = SnappyOutputStream.readInt(MAGIC_HEADER, 4);
static {
assert(MAGIC_HEADER_HEAD < 0);
}
public static final int DEFAULT_VERSION = 1; public static final int DEFAULT_VERSION = 1;
public static final int MINIMUM_COMPATIBLE_VERSION = 1; public static final int MINIMUM_COMPATIBLE_VERSION = 1;

View File

@ -45,7 +45,7 @@ public class SnappyInputStream extends InputStream
private int uncompressedCursor = 0; private int uncompressedCursor = 0;
private int uncompressedLimit = 0; private int uncompressedLimit = 0;
private byte[] chunkSizeBuf = new byte[4]; private byte[] header = new byte[SnappyCodec.headerSize()];
/** /**
* Create a filter for reading compressed data as a uncompressed stream * Create a filter for reading compressed data as a uncompressed stream
@ -73,7 +73,6 @@ public class SnappyInputStream extends InputStream
} }
protected void readHeader() throws IOException { protected void readHeader() throws IOException {
byte[] header = new byte[SnappyCodec.headerSize()];
int readBytes = 0; int readBytes = 0;
while (readBytes < header.length) { while (readBytes < header.length) {
int ret = in.read(header, readBytes, header.length - readBytes); int ret = in.read(header, readBytes, header.length - readBytes);
@ -93,22 +92,28 @@ public class SnappyInputStream extends InputStream
return; return;
} }
SnappyCodec codec = SnappyCodec.readHeader(new ByteArrayInputStream(header)); if(!isValidHeader(header)) {
if (codec.isValidMagicHeader()) {
// The input data is compressed by SnappyOutputStream
if (codec.version < SnappyCodec.MINIMUM_COMPATIBLE_VERSION) {
throw new SnappyIOException(SnappyErrorCode.INCOMPATIBLE_VERSION, String.format(
"Compressed with an incompatible codec version %d. At least version %d is required",
codec.version, SnappyCodec.MINIMUM_COMPATIBLE_VERSION));
}
}
else {
// (probably) compressed by Snappy.compress(byte[]) // (probably) compressed by Snappy.compress(byte[])
readFully(header, readBytes); readFully(header, readBytes);
return; return;
} }
} }
private static boolean isValidHeader(byte[] header) throws IOException {
SnappyCodec codec = SnappyCodec.readHeader(new ByteArrayInputStream(header));
if (codec.isValidMagicHeader()) {
// The input data is compressed by SnappyOutputStream
if(codec.version < SnappyCodec.MINIMUM_COMPATIBLE_VERSION) {
throw new SnappyIOException(SnappyErrorCode.INCOMPATIBLE_VERSION, String.format(
"Compressed with an incompatible codec version %d. At least version %d is required",
codec.version, SnappyCodec.MINIMUM_COMPATIBLE_VERSION));
}
return true;
}
else
return false;
}
protected void readFully(byte[] fragment, int fragmentLength) throws IOException { protected void readFully(byte[] fragment, int fragmentLength) throws IOException {
if(fragmentLength == 0) { if(fragmentLength == 0) {
finishedReading = true; finishedReading = true;
@ -324,6 +329,26 @@ public class SnappyInputStream extends InputStream
return read(d, 0, d.length); return read(d, 0, d.length);
} }
/**
* Read next len bytes
* @param dest
* @param offset
* @param len
* @return read bytes
*/
private int readNext(byte[] dest, int offset, int len) throws IOException {
int readBytes = 0;
while (readBytes < len) {
int ret = in.read(dest, readBytes + offset, len - readBytes);
if (ret == -1) {
finishedReading = true;
return readBytes;
}
readBytes += ret;
}
return readBytes;
}
protected boolean hasNextChunk() throws IOException { protected boolean hasNextChunk() throws IOException {
if (finishedReading) if (finishedReading)
return false; return false;
@ -331,16 +356,24 @@ public class SnappyInputStream extends InputStream
uncompressedCursor = 0; uncompressedCursor = 0;
uncompressedLimit = 0; uncompressedLimit = 0;
int readBytes = 0; int readBytes = readNext(header, 0, 4);
while (readBytes < 4) { if(readBytes < 4)
int ret = in.read(chunkSizeBuf, readBytes, 4 - readBytes); return false;
if (ret == -1) {
finishedReading = true; int chunkSize = SnappyOutputStream.readInt(header, 0);
if(chunkSize == SnappyCodec.MAGIC_HEADER_HEAD) {
// Concatenated data
int remainingHeaderSize = SnappyCodec.headerSize() - 4;
readBytes = readNext(header, 4, remainingHeaderSize);
if(readBytes < remainingHeaderSize)
return false;
if(isValidHeader(header))
return hasNextChunk();
else
return false; return false;
}
readBytes += ret;
} }
int chunkSize = SnappyOutputStream.readInt(chunkSizeBuf, 0);
// extend the compressed data buffer size // extend the compressed data buffer size
if (compressed == null || chunkSize > compressed.length) { if (compressed == null || chunkSize > compressed.length) {
compressed = new byte[chunkSize]; compressed = new byte[chunkSize];

View File

@ -1,2 +0,0 @@
VERSION=1.1.1

View File

@ -0,0 +1 @@
VERSION=1.1.2

View File

@ -35,6 +35,7 @@ import java.io.InputStream;
import org.junit.Test; import org.junit.Test;
import org.xerial.util.FileResource; import org.xerial.util.FileResource;
import org.xerial.util.log.Logger; import org.xerial.util.log.Logger;
import scala.Array;
public class SnappyInputStreamTest public class SnappyInputStreamTest
{ {
@ -142,4 +143,37 @@ public class SnappyInputStreamTest
} }
} }
public static byte[] compressResource(String resourcePath) throws Exception {
ByteArrayOutputStream compressedBuf = new ByteArrayOutputStream();
SnappyOutputStream snappyOut = new SnappyOutputStream(compressedBuf);
byte[] orig = readResourceFile(resourcePath);
snappyOut.write(orig);
snappyOut.close();
return compressedBuf.toByteArray();
}
@Test
public void chunkRead() throws Exception {
byte[] chunk1 = compressResource("alice29.txt");
byte[] chunk2 = compressResource("testdata/calgary/paper6");
byte[] concatenated = new byte[chunk1.length + chunk2.length];
System.arraycopy(chunk1, 0, concatenated, 0, chunk1.length);
System.arraycopy(chunk2, 0, concatenated, chunk1.length, chunk2.length);
SnappyInputStream in = new SnappyInputStream(new ByteArrayInputStream(concatenated));
byte[] uncompressed = readFully(in);
byte[] orig1 = readResourceFile("alice29.txt");
byte[] orig2 = readResourceFile("testdata/calgary/paper6");
assertEquals(orig1.length + orig2.length, uncompressed.length);
byte[] uncompressed1 = new byte[orig1.length];
byte[] uncompressed2 = new byte[orig2.length];
System.arraycopy(uncompressed, 0, uncompressed1, 0, orig1.length);
System.arraycopy(uncompressed, orig1.length, uncompressed2, 0, orig2.length);
assertArrayEquals(orig1, uncompressed1);
assertArrayEquals(orig2, uncompressed2);
}
} }

View File

@ -1,2 +1,2 @@
version in ThisBuild := "1.1.1.8-SNAPSHOT" version in ThisBuild := "1.1.2-SNAPSHOT"