diff --git a/Makefile.common b/Makefile.common index 20c8513..cdc339e 100755 --- a/Makefile.common +++ b/Makefile.common @@ -1,6 +1,6 @@ TARGET:=target SRC:=src/main/java -include $(SRC)/org/xerial/snappy/VERSION +include src/main/resources/org/xerial/snappy/VERSION ifndef JAVA_HOME $(error Set JAVA_HOME environment variable) diff --git a/build.sbt b/build.sbt index 5118fa0..104b2fa 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,3 @@ -import SonatypeKeys._ - -sonatypeSettings name := "snappy-java" @@ -10,7 +7,7 @@ organizationName := "xerial.org" description := "snappy-java: A fast compression/decompression library" -profileName := "org.xerial" +sonatypeProfileName := "org.xerial" pomExtra := { https://github.comm/xerial/snappy-java @@ -47,7 +44,7 @@ pomExtra := { } -scalaVersion := "2.11.1" +scalaVersion := "2.11.6" javacOptions in (Compile, compile) ++= Seq("-encoding", "UTF-8", "-Xlint:unchecked", "-Xlint:deprecation", "-source", "1.6", "-target", "1.6") @@ -114,3 +111,20 @@ OsgiKeys.additionalHeaders := Map( "Bundle-ActivationPolicy" -> "lazy", "Bundle-Name" -> "snappy-java: A fast compression/decompression library" ) + +import ReleaseTransformations._ + +releaseProcess := Seq[ReleaseStep]( + checkSnapshotDependencies, + inquireVersions, + runClean, + runTest, + setReleaseVersion, + commitReleaseVersion, + tagRelease, + ReleaseStep(action = Command.process("publishSigned", _)), + setNextVersion, + commitNextVersion, + ReleaseStep(action = Command.process("sonatypeReleaseAll", _)), + pushChanges +) diff --git a/project/build.properties b/project/build.properties index 005786e..07b0ebb 100755 --- a/project/build.properties +++ b/project/build.properties @@ -1,2 +1,2 @@ -sbt.version=0.13.6 +sbt.version=0.13.8 diff --git a/project/plugins.sbt b/project/plugins.sbt index 609b225..223eaec 100755 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,9 +1,9 @@ -addSbtPlugin("com.github.gseitz" % "sbt-release" % "0.7.1") +addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.0") -addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "0.2.1") +addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "0.5.0") -addSbtPlugin("com.typesafe.sbt" % "sbt-pgp" % "0.8.3") +addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0") addSbtPlugin("de.johoop" % "findbugs4sbt" % "1.3.0") diff --git a/src/main/java/org/xerial/snappy/SnappyCodec.java b/src/main/java/org/xerial/snappy/SnappyCodec.java index 1a15b8c..b267f86 100755 --- a/src/main/java/org/xerial/snappy/SnappyCodec.java +++ b/src/main/java/org/xerial/snappy/SnappyCodec.java @@ -52,6 +52,13 @@ public class SnappyCodec public static final byte[] MAGIC_HEADER = new byte[] { -126, 'S', 'N', 'A', 'P', 'P', 'Y', 0 }; public static final int MAGIC_LEN = MAGIC_HEADER.length; public static final int HEADER_SIZE = MAGIC_LEN + 8; + public static final int MAGIC_HEADER_HEAD = SnappyOutputStream.readInt(MAGIC_HEADER, 0); + public static final int MAGIC_HEADER_TAIL = SnappyOutputStream.readInt(MAGIC_HEADER, 4); + + static { + assert(MAGIC_HEADER_HEAD < 0); + } + public static final int DEFAULT_VERSION = 1; public static final int MINIMUM_COMPATIBLE_VERSION = 1; diff --git a/src/main/java/org/xerial/snappy/SnappyInputStream.java b/src/main/java/org/xerial/snappy/SnappyInputStream.java index a56dff1..3e0a3ba 100755 --- a/src/main/java/org/xerial/snappy/SnappyInputStream.java +++ b/src/main/java/org/xerial/snappy/SnappyInputStream.java @@ -45,7 +45,7 @@ public class SnappyInputStream extends InputStream private int uncompressedCursor = 0; private int uncompressedLimit = 0; - private byte[] chunkSizeBuf = new byte[4]; + private byte[] header = new byte[SnappyCodec.headerSize()]; /** * Create a filter for reading compressed data as a uncompressed stream @@ -73,7 +73,6 @@ public class SnappyInputStream extends InputStream } protected void readHeader() throws IOException { - byte[] header = new byte[SnappyCodec.headerSize()]; int readBytes = 0; while (readBytes < header.length) { int ret = in.read(header, readBytes, header.length - readBytes); @@ -93,22 +92,28 @@ public class SnappyInputStream extends InputStream return; } - SnappyCodec codec = SnappyCodec.readHeader(new ByteArrayInputStream(header)); - if (codec.isValidMagicHeader()) { - // The input data is compressed by SnappyOutputStream - if (codec.version < SnappyCodec.MINIMUM_COMPATIBLE_VERSION) { - throw new SnappyIOException(SnappyErrorCode.INCOMPATIBLE_VERSION, String.format( - "Compressed with an incompatible codec version %d. At least version %d is required", - codec.version, SnappyCodec.MINIMUM_COMPATIBLE_VERSION)); - } - } - else { + if(!isValidHeader(header)) { // (probably) compressed by Snappy.compress(byte[]) readFully(header, readBytes); return; } } + private static boolean isValidHeader(byte[] header) throws IOException { + SnappyCodec codec = SnappyCodec.readHeader(new ByteArrayInputStream(header)); + if (codec.isValidMagicHeader()) { + // The input data is compressed by SnappyOutputStream + if(codec.version < SnappyCodec.MINIMUM_COMPATIBLE_VERSION) { + throw new SnappyIOException(SnappyErrorCode.INCOMPATIBLE_VERSION, String.format( + "Compressed with an incompatible codec version %d. At least version %d is required", + codec.version, SnappyCodec.MINIMUM_COMPATIBLE_VERSION)); + } + return true; + } + else + return false; + } + protected void readFully(byte[] fragment, int fragmentLength) throws IOException { if(fragmentLength == 0) { finishedReading = true; @@ -324,6 +329,26 @@ public class SnappyInputStream extends InputStream return read(d, 0, d.length); } + /** + * Read next len bytes + * @param dest + * @param offset + * @param len + * @return read bytes + */ + private int readNext(byte[] dest, int offset, int len) throws IOException { + int readBytes = 0; + while (readBytes < len) { + int ret = in.read(dest, readBytes + offset, len - readBytes); + if (ret == -1) { + finishedReading = true; + return readBytes; + } + readBytes += ret; + } + return readBytes; + } + protected boolean hasNextChunk() throws IOException { if (finishedReading) return false; @@ -331,16 +356,24 @@ public class SnappyInputStream extends InputStream uncompressedCursor = 0; uncompressedLimit = 0; - int readBytes = 0; - while (readBytes < 4) { - int ret = in.read(chunkSizeBuf, readBytes, 4 - readBytes); - if (ret == -1) { - finishedReading = true; + int readBytes = readNext(header, 0, 4); + if(readBytes < 4) + return false; + + int chunkSize = SnappyOutputStream.readInt(header, 0); + if(chunkSize == SnappyCodec.MAGIC_HEADER_HEAD) { + // Concatenated data + int remainingHeaderSize = SnappyCodec.headerSize() - 4; + readBytes = readNext(header, 4, remainingHeaderSize); + if(readBytes < remainingHeaderSize) + return false; + + if(isValidHeader(header)) + return hasNextChunk(); + else return false; - } - readBytes += ret; } - int chunkSize = SnappyOutputStream.readInt(chunkSizeBuf, 0); + // extend the compressed data buffer size if (compressed == null || chunkSize > compressed.length) { compressed = new byte[chunkSize]; diff --git a/src/main/java/org/xerial/snappy/VERSION b/src/main/java/org/xerial/snappy/VERSION deleted file mode 100755 index 333039a..0000000 --- a/src/main/java/org/xerial/snappy/VERSION +++ /dev/null @@ -1,2 +0,0 @@ -VERSION=1.1.1 - diff --git a/src/main/resources/org/xerial/snappy/VERSION b/src/main/resources/org/xerial/snappy/VERSION new file mode 100755 index 0000000..4efb1af --- /dev/null +++ b/src/main/resources/org/xerial/snappy/VERSION @@ -0,0 +1 @@ +VERSION=1.1.2 \ No newline at end of file diff --git a/src/test/java/org/xerial/snappy/SnappyInputStreamTest.java b/src/test/java/org/xerial/snappy/SnappyInputStreamTest.java index 7e66c58..bb5f26e 100755 --- a/src/test/java/org/xerial/snappy/SnappyInputStreamTest.java +++ b/src/test/java/org/xerial/snappy/SnappyInputStreamTest.java @@ -35,6 +35,7 @@ import java.io.InputStream; import org.junit.Test; import org.xerial.util.FileResource; import org.xerial.util.log.Logger; +import scala.Array; public class SnappyInputStreamTest { @@ -142,4 +143,37 @@ public class SnappyInputStreamTest } } + public static byte[] compressResource(String resourcePath) throws Exception { + ByteArrayOutputStream compressedBuf = new ByteArrayOutputStream(); + SnappyOutputStream snappyOut = new SnappyOutputStream(compressedBuf); + byte[] orig = readResourceFile(resourcePath); + snappyOut.write(orig); + snappyOut.close(); + return compressedBuf.toByteArray(); + } + + @Test + public void chunkRead() throws Exception { + byte[] chunk1 = compressResource("alice29.txt"); + byte[] chunk2 = compressResource("testdata/calgary/paper6"); + + byte[] concatenated = new byte[chunk1.length + chunk2.length]; + System.arraycopy(chunk1, 0, concatenated, 0, chunk1.length); + System.arraycopy(chunk2, 0, concatenated, chunk1.length, chunk2.length); + + SnappyInputStream in = new SnappyInputStream(new ByteArrayInputStream(concatenated)); + byte[] uncompressed = readFully(in); + + byte[] orig1 = readResourceFile("alice29.txt"); + byte[] orig2 = readResourceFile("testdata/calgary/paper6"); + assertEquals(orig1.length + orig2.length, uncompressed.length); + byte[] uncompressed1 = new byte[orig1.length]; + byte[] uncompressed2 = new byte[orig2.length]; + System.arraycopy(uncompressed, 0, uncompressed1, 0, orig1.length); + System.arraycopy(uncompressed, orig1.length, uncompressed2, 0, orig2.length); + + assertArrayEquals(orig1, uncompressed1); + assertArrayEquals(orig2, uncompressed2); + } + } diff --git a/version.sbt b/version.sbt index a45b76d..2523c7f 100644 --- a/version.sbt +++ b/version.sbt @@ -1,2 +1,2 @@ -version in ThisBuild := "1.1.1.8-SNAPSHOT" +version in ThisBuild := "1.1.2-SNAPSHOT"