From 5b5a3a379db72143fbcf6a4c90cba27258e611b5 Mon Sep 17 00:00:00 2001 From: "Taro L. Saito" Date: Fri, 18 Jul 2014 16:22:59 +0900 Subject: [PATCH 1/3] Add test code to see the performance --- build.sbt | 8 +++- .../xerial/snappy/SnappyPerformanceTest.scala | 48 +++++++++++++++++++ .../scala/org/xerial/snappy/SnappySpec.scala | 22 +++++++++ 3 files changed, 76 insertions(+), 2 deletions(-) create mode 100644 src/test/scala/org/xerial/snappy/SnappyPerformanceTest.scala create mode 100644 src/test/scala/org/xerial/snappy/SnappySpec.scala diff --git a/build.sbt b/build.sbt index 58c57de..dab77b0 100644 --- a/build.sbt +++ b/build.sbt @@ -47,11 +47,13 @@ pomExtra := { } +scalaVersion := "2.11.1" + javacOptions in (Compile, compile) ++= Seq("-encoding", "UTF-8", "-Xlint:unchecked", "-Xlint:deprecation", "-source", "1.6", "-target", "1.6") testOptions += Tests.Argument(TestFrameworks.JUnit, "-q", "-v") -//concurrentRestrictions in Global := Seq(Tags.limit(Tags.Test, 1)) +concurrentRestrictions in Global := Seq(Tags.limit(Tags.Test, 1)) autoScalaLibrary := false @@ -64,7 +66,9 @@ incOptions := incOptions.value.withNameHashing(true) libraryDependencies ++= Seq( "junit" % "junit" % "4.8.2" % "test", "org.codehaus.plexus" % "plexus-classworlds" % "2.4" % "test", - "org.xerial" % "xerial-core" % "1.0.21" % "test", + "org.xerial.java" % "xerial-core" % "2.1" % "test", + "org.xerial" % "xerial-core" % "3.2.3" % "test", + "org.scalatest" % "scalatest_2.11" % "2.2.0" % "test", "org.osgi" % "org.osgi.core" % "4.3.0" % "provided", "com.novocode" % "junit-interface" % "0.10" % "test" ) diff --git a/src/test/scala/org/xerial/snappy/SnappyPerformanceTest.scala b/src/test/scala/org/xerial/snappy/SnappyPerformanceTest.scala new file mode 100644 index 0000000..d60c164 --- /dev/null +++ b/src/test/scala/org/xerial/snappy/SnappyPerformanceTest.scala @@ -0,0 +1,48 @@ +package org.xerial.snappy + +import java.io.{ByteArrayOutputStream, ByteArrayInputStream} + +import xerial.core.log.LogLevel 
+ +import scala.util.Random + + +/** + * + */ +class SnappyPerformanceTest extends SnappySpec { + + lazy val data = { + val a = new Array[Byte](32 * 1024 * 1024) + + for (i <- (0 until a.length).par) { + a(i) = Math.sin(i * 0.01).toByte + } + a + } + + + "SnappyOutputStream" should { + + "improve output performance" taggedAs("out") in { + + val input = data + + time("compression", repeat=100, logLevel = LogLevel.INFO) { + block("default") { + val out = new ByteArrayOutputStream() + val sout = new SnappyOutputStream(out) + sout.write(input) + out.close() + } + + } + + //info(f"compressed size: ${compressed.length}%,d, input: ${data.length}%,d") + } + + + + } + +} diff --git a/src/test/scala/org/xerial/snappy/SnappySpec.scala b/src/test/scala/org/xerial/snappy/SnappySpec.scala new file mode 100644 index 0000000..1b66135 --- /dev/null +++ b/src/test/scala/org/xerial/snappy/SnappySpec.scala @@ -0,0 +1,22 @@ +package org.xerial.snappy + +import org.scalatest._ +import xerial.core.log.Logger +import xerial.core.util.Timer + +/** + * + */ +trait SnappySpec + extends WordSpec + with Matchers + with GivenWhenThen + with OptionValues + with BeforeAndAfter + with Timer + with Logger +{ + + implicit def toTag(s:String) : Tag = Tag(s) + +} From f695e84cac17441ff8e35c02ba8fb0cf44d651b1 Mon Sep 17 00:00:00 2001 From: "Taro L. 
Saito" Date: Fri, 18 Jul 2014 18:05:51 +0900 Subject: [PATCH 2/3] #82: Improving SnappyOutputStream performance by reducing the number of data copies --- .../java/org/xerial/snappy/SnappyCodec.java | 39 ++-- .../org/xerial/snappy/SnappyOutputStream.java | 193 +++++++++--------- .../xerial/snappy/SnappyPerformanceTest.scala | 4 +- 3 files changed, 125 insertions(+), 111 deletions(-) diff --git a/src/main/java/org/xerial/snappy/SnappyCodec.java b/src/main/java/org/xerial/snappy/SnappyCodec.java index 8358a1c..1a15b8c 100755 --- a/src/main/java/org/xerial/snappy/SnappyCodec.java +++ b/src/main/java/org/xerial/snappy/SnappyCodec.java @@ -50,7 +50,8 @@ import java.util.Arrays; public class SnappyCodec { public static final byte[] MAGIC_HEADER = new byte[] { -126, 'S', 'N', 'A', 'P', 'P', 'Y', 0 }; - public static final int MAGIC_LEN = 8; + public static final int MAGIC_LEN = MAGIC_HEADER.length; + public static final int HEADER_SIZE = MAGIC_LEN + 8; public static final int DEFAULT_VERSION = 1; public static final int MINIMUM_COMPATIBLE_VERSION = 1; @@ -58,11 +59,25 @@ public class SnappyCodec public final byte[] magic; public final int version; public final int compatibleVersion; + private final byte[] headerArray; private SnappyCodec(byte[] magic, int version, int compatibleVersion) { this.magic = magic; this.version = version; this.compatibleVersion = compatibleVersion; + + ByteArrayOutputStream header = new ByteArrayOutputStream(HEADER_SIZE); + DataOutputStream d = new DataOutputStream(header); + try { + d.write(magic, 0, MAGIC_LEN); + d.writeInt(version); + d.writeInt(compatibleVersion); + d.close(); + } + catch(IOException e) { + throw new RuntimeException(e); + } + headerArray = header.toByteArray(); } @Override @@ -71,17 +86,17 @@ public class SnappyCodec } public static int headerSize() { - return MAGIC_LEN + 4 * 2; + return HEADER_SIZE; } - public void writeHeader(OutputStream out) throws IOException { - ByteArrayOutputStream header = new 
ByteArrayOutputStream(); - DataOutputStream d = new DataOutputStream(header); - d.write(magic, 0, MAGIC_LEN); - d.writeInt(version); - d.writeInt(compatibleVersion); - d.close(); - out.write(header.toByteArray(), 0, header.size()); + public int writeHeader(byte[] dst, int dstOffset) { + System.arraycopy(headerArray, 0, dst, dstOffset, headerArray.length); + return headerArray.length; + } + + public int writeHeader(OutputStream out) throws IOException { + out.write(headerArray, 0, headerArray.length); + return headerArray.length; } public boolean isValidMagicHeader() { @@ -97,8 +112,6 @@ public class SnappyCodec return new SnappyCodec(magic, version, compatibleVersion); } - public static SnappyCodec currentHeader() { - return new SnappyCodec(MAGIC_HEADER, DEFAULT_VERSION, MINIMUM_COMPATIBLE_VERSION); - } + public static SnappyCodec currentHeader = new SnappyCodec(MAGIC_HEADER, DEFAULT_VERSION, MINIMUM_COMPATIBLE_VERSION); } diff --git a/src/main/java/org/xerial/snappy/SnappyOutputStream.java b/src/main/java/org/xerial/snappy/SnappyOutputStream.java index d523a95..21fcbbc 100755 --- a/src/main/java/org/xerial/snappy/SnappyOutputStream.java +++ b/src/main/java/org/xerial/snappy/SnappyOutputStream.java @@ -30,10 +30,10 @@ import java.io.OutputStream; /** * This class implements a stream filter for writing compressed data using * Snappy. - * + *

* The input data is blocked into 32kb size (in default), and each block is * compressed and then passed to the given {@link OutputStream}. - * + *

* The output data format is: *
    *
  1. snappy codec header defined in {@link SnappyCodec} (8 bytes) @@ -42,55 +42,43 @@ import java.io.OutputStream; *
  2. compressed block 2 *
  3. ... *
- * + *

* Note that the compressed data created by {@link SnappyOutputStream} cannot be * uncompressed by {@link Snappy#uncompress(byte[])} since the output formats of * {@link Snappy#compress(byte[])} and {@link SnappyOutputStream} are different. * Use {@link SnappyInputStream} for uncompress the data generated by * {@link SnappyOutputStream}. - * + * * @author leo - * */ -public class SnappyOutputStream extends OutputStream -{ - static final int DEFAULT_BLOCK_SIZE = 32 * 1024; // Use 32kb for the default block size +public class SnappyOutputStream extends OutputStream { + static final int MIN_BLOCK_SIZE = 1 * 1024; + static final int DEFAULT_BLOCK_SIZE = 32 * 1024; // Use 32kb for the default block size protected final OutputStream out; - private final int blockSize; - private int cursor = 0; - protected byte[] uncompressed; - protected byte[] compressed; + private final int blockSize; + private int inputCursor = 0; + protected byte[] uncompressed; + private int outputCursor = 0; + protected byte[] outputBuffer; - public SnappyOutputStream(OutputStream out) throws IOException { + public SnappyOutputStream(OutputStream out) { this(out, DEFAULT_BLOCK_SIZE); } /** * @param out - * @param blockSize - * byte size of the internal buffer size + * @param blockSize byte size of the internal buffer size * @throws IOException */ - public SnappyOutputStream(OutputStream out, int blockSize) throws IOException { + public SnappyOutputStream(OutputStream out, int blockSize) { this.out = out; - this.blockSize = blockSize; + this.blockSize = Math.max(MIN_BLOCK_SIZE, blockSize); uncompressed = new byte[blockSize]; - compressed = new byte[Snappy.maxCompressedLength(blockSize)]; - writeHeader(); + outputBuffer = new byte[SnappyCodec.HEADER_SIZE + 4 + Snappy.maxCompressedLength(blockSize)]; + outputCursor = SnappyCodec.currentHeader.writeHeader(outputBuffer, 0); } - protected void writeHeader() throws IOException { - SnappyCodec.currentHeader().writeHeader(out); - } - - /** - * Writes len 
bytes from the specified byte array starting at offset off to - * this output stream. The general contract for write(b, off, len) is that - * some of the bytes in the array b are written to the output stream in - * order; element b[off] is the first byte written and b[off+len-1] is the - * last byte written by this operation. - */ /* (non-Javadoc) * @see java.io.OutputStream#write(byte[], int, int) */ @@ -101,13 +89,10 @@ public class SnappyOutputStream extends OutputStream /** * Compress the input long array data - * - * @param d - * input array - * @param off - * offset in the array - * @param len - * the number of elements in the array to copy + * + * @param d input array + * @param off offset in the array + * @param len the number of elements in the array to copy * @throws IOException */ public void write(long[] d, int off, int len) throws IOException { @@ -116,13 +101,10 @@ public class SnappyOutputStream extends OutputStream /** * Compress the input double array data - * - * @param f - * input array - * @param off - * offset in the array - * @param len - * the number of elements in the array to copy + * + * @param f input array + * @param off offset in the array + * @param len the number of elements in the array to copy * @throws IOException */ public void write(double[] f, int off, int len) throws IOException { @@ -131,13 +113,10 @@ public class SnappyOutputStream extends OutputStream /** * Compress the input float array data - * - * @param f - * input array - * @param off - * offset in the array - * @param len - * the number of elements in the array to copy + * + * @param f input array + * @param off offset in the array + * @param len the number of elements in the array to copy * @throws IOException */ public void write(float[] f, int off, int len) throws IOException { @@ -146,13 +125,10 @@ public class SnappyOutputStream extends OutputStream /** * Compress the input int array data - * - * @param f - * input array - * @param off - * offset in the array - * 
@param len - * the number of elements in the array to copy + * + * @param f input array + * @param off offset in the array + * @param len the number of elements in the array to copy * @throws IOException */ public void write(int[] f, int off, int len) throws IOException { @@ -161,13 +137,10 @@ public class SnappyOutputStream extends OutputStream /** * Compress the input short array data - * - * @param f - * input array - * @param off - * offset in the array - * @param len - * the number of elements in the array to copy + * + * @param f input array + * @param off offset in the array + * @param len the number of elements in the array to copy * @throws IOException */ public void write(short[] f, int off, int len) throws IOException { @@ -176,7 +149,7 @@ public class SnappyOutputStream extends OutputStream /** * Compress the input array data - * + * * @param d * @throws IOException */ @@ -186,7 +159,7 @@ public class SnappyOutputStream extends OutputStream /** * Compress the input array data - * + * * @param f * @throws IOException */ @@ -196,7 +169,7 @@ public class SnappyOutputStream extends OutputStream /** * Compress the input array data - * + * * @param f * @throws IOException */ @@ -206,7 +179,7 @@ public class SnappyOutputStream extends OutputStream /** * Compress the input array data - * + * * @param f * @throws IOException */ @@ -216,7 +189,7 @@ public class SnappyOutputStream extends OutputStream /** * Compress the input array data - * + * * @param f * @throws IOException */ @@ -224,25 +197,39 @@ public class SnappyOutputStream extends OutputStream write(f, 0, f.length); } + private boolean hasSufficientOutputBufferFor(int inputSize) { + int maxCompressedSize = Snappy.maxCompressedLength(inputSize); + return maxCompressedSize < outputBuffer.length - outputCursor - 4; + } + /** * Compress the raw byte array data. - * - * @param array - * array data of any type (e.g., byte[], float[], long[], ...) 
+ * + * @param array array data of any type (e.g., byte[], float[], long[], ...) * @param byteOffset * @param byteLength * @throws IOException */ public void rawWrite(Object array, int byteOffset, int byteLength) throws IOException { - for (int readBytes = 0; readBytes < byteLength;) { - int copyLen = Math.min(uncompressed.length - cursor, byteLength - readBytes); - Snappy.arrayCopy(array, byteOffset + readBytes, copyLen, uncompressed, cursor); - readBytes += copyLen; - cursor += copyLen; - if (cursor >= uncompressed.length) { - dump(); + if(inputCursor + byteLength < MIN_BLOCK_SIZE) { + // copy the input data to uncompressed buffer + Snappy.arrayCopy(array, byteOffset, byteLength, uncompressed, inputCursor); + inputCursor += byteLength; + return; + } + + compressInput(); + + for(int readBytes = 0; readBytes < byteLength; ) { + int inputLen = Math.min(blockSize, byteLength - readBytes); + if(!hasSufficientOutputBufferFor(inputLen)) { + dumpOutput(); } + int compressedSize = Snappy.rawCompress(array, byteOffset + readBytes, inputLen, outputBuffer, outputCursor + 4); + writeInt(outputBuffer, outputCursor, compressedSize); + outputCursor += 4 + compressedSize; + readBytes += inputLen; } } @@ -257,10 +244,10 @@ public class SnappyOutputStream extends OutputStream */ @Override public void write(int b) throws IOException { - if (cursor >= uncompressed.length) { - dump(); + if(inputCursor >= uncompressed.length) { + compressInput(); } - uncompressed[cursor++] = (byte) b; + uncompressed[inputCursor++] = (byte) b; } /* (non-Javadoc) @@ -268,15 +255,17 @@ public class SnappyOutputStream extends OutputStream */ @Override public void flush() throws IOException { - dump(); + compressInput(); + dumpOutput(); out.flush(); } - static void writeInt(OutputStream out, int value) throws IOException { - out.write((value >> 24) & 0xFF); - out.write((value >> 16) & 0xFF); - out.write((value >> 8) & 0xFF); - out.write((value >> 0) & 0xFF); + static void writeInt(byte[] dst, int offset, 
int v) { + int p = offset; + dst[offset++] = (byte) ((v >> 24) & 0xFF); + dst[offset++] = (byte) ((v >> 16) & 0xFF); + dst[offset++] = (byte) ((v >> 8) & 0xFF); + dst[offset++] = (byte) ((v >> 0) & 0xFF); } static int readInt(byte[] buffer, int pos) { @@ -287,15 +276,27 @@ public class SnappyOutputStream extends OutputStream return b1 | b2 | b3 | b4; } - protected void dump() throws IOException { - if (cursor <= 0) + protected void dumpOutput() throws IOException { + if(outputCursor > 0) { + out.write(outputBuffer, 0, outputCursor); + outputCursor = 0; + } + } + + protected void compressInput() throws IOException { + if(inputCursor <= 0) { return; // no need to dump + } // Compress and dump the buffer content - int compressedSize = Snappy.compress(uncompressed, 0, cursor, compressed, 0); - writeInt(out, compressedSize); - out.write(compressed, 0, compressedSize); - cursor = 0; + if(!hasSufficientOutputBufferFor(inputCursor)) { + dumpOutput(); + } + int compressedSize = Snappy.compress(uncompressed, 0, inputCursor, outputBuffer, outputCursor + 4); + // Write compressed data size + writeInt(outputBuffer, outputCursor, compressedSize); + outputCursor += 4 + compressedSize; + inputCursor = 0; } /** @@ -307,8 +308,6 @@ public class SnappyOutputStream extends OutputStream @Override public void close() throws IOException { flush(); - - super.close(); out.close(); } diff --git a/src/test/scala/org/xerial/snappy/SnappyPerformanceTest.scala b/src/test/scala/org/xerial/snappy/SnappyPerformanceTest.scala index d60c164..7eeb60d 100644 --- a/src/test/scala/org/xerial/snappy/SnappyPerformanceTest.scala +++ b/src/test/scala/org/xerial/snappy/SnappyPerformanceTest.scala @@ -13,7 +13,7 @@ import scala.util.Random class SnappyPerformanceTest extends SnappySpec { lazy val data = { - val a = new Array[Byte](32 * 1024 * 1024) + val a = new Array[Byte](128 * 1024 * 1024) for (i <- (0 until a.length).par) { a(i) = Math.sin(i * 0.01).toByte @@ -29,10 +29,12 @@ class SnappyPerformanceTest 
extends SnappySpec { val input = data time("compression", repeat=100, logLevel = LogLevel.INFO) { + // 0.037 sec. => 0.026 block("default") { val out = new ByteArrayOutputStream() val sout = new SnappyOutputStream(out) sout.write(input) + sout.close() out.close() } From dbc610ef6b7f2956745f209b43cf186f213b3cea Mon Sep 17 00:00:00 2001 From: "Taro L. Saito" Date: Fri, 18 Jul 2014 18:41:38 +0900 Subject: [PATCH 3/3] version 1.1.1.2 --- version.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.sbt b/version.sbt index 4979d78..50b13ff 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "1.1.1.1" +version in ThisBuild := "1.1.1.2"