#82: Improving SnappyOutputStream performance by reducing the numbe of data copies

This commit is contained in:
Taro L. Saito 2014-07-18 18:05:51 +09:00
parent 5b5a3a379d
commit f695e84cac
3 changed files with 125 additions and 111 deletions

View File

@ -50,7 +50,8 @@ import java.util.Arrays;
public class SnappyCodec public class SnappyCodec
{ {
public static final byte[] MAGIC_HEADER = new byte[] { -126, 'S', 'N', 'A', 'P', 'P', 'Y', 0 }; public static final byte[] MAGIC_HEADER = new byte[] { -126, 'S', 'N', 'A', 'P', 'P', 'Y', 0 };
public static final int MAGIC_LEN = 8; public static final int MAGIC_LEN = MAGIC_HEADER.length;
public static final int HEADER_SIZE = MAGIC_LEN + 8;
public static final int DEFAULT_VERSION = 1; public static final int DEFAULT_VERSION = 1;
public static final int MINIMUM_COMPATIBLE_VERSION = 1; public static final int MINIMUM_COMPATIBLE_VERSION = 1;
@ -58,11 +59,25 @@ public class SnappyCodec
public final byte[] magic; public final byte[] magic;
public final int version; public final int version;
public final int compatibleVersion; public final int compatibleVersion;
private final byte[] headerArray;
private SnappyCodec(byte[] magic, int version, int compatibleVersion) { private SnappyCodec(byte[] magic, int version, int compatibleVersion) {
this.magic = magic; this.magic = magic;
this.version = version; this.version = version;
this.compatibleVersion = compatibleVersion; this.compatibleVersion = compatibleVersion;
ByteArrayOutputStream header = new ByteArrayOutputStream(HEADER_SIZE);
DataOutputStream d = new DataOutputStream(header);
try {
d.write(magic, 0, MAGIC_LEN);
d.writeInt(version);
d.writeInt(compatibleVersion);
d.close();
}
catch(IOException e) {
throw new RuntimeException(e);
}
headerArray = header.toByteArray();
} }
@Override @Override
@ -71,17 +86,17 @@ public class SnappyCodec
} }
public static int headerSize() { public static int headerSize() {
return MAGIC_LEN + 4 * 2; return HEADER_SIZE;
} }
public void writeHeader(OutputStream out) throws IOException { public int writeHeader(byte[] dst, int dstOffset) {
ByteArrayOutputStream header = new ByteArrayOutputStream(); System.arraycopy(headerArray, 0, dst, dstOffset, headerArray.length);
DataOutputStream d = new DataOutputStream(header); return headerArray.length;
d.write(magic, 0, MAGIC_LEN); }
d.writeInt(version);
d.writeInt(compatibleVersion); public int writeHeader(OutputStream out) throws IOException {
d.close(); out.write(headerArray, 0, headerArray.length);
out.write(header.toByteArray(), 0, header.size()); return headerArray.length;
} }
public boolean isValidMagicHeader() { public boolean isValidMagicHeader() {
@ -97,8 +112,6 @@ public class SnappyCodec
return new SnappyCodec(magic, version, compatibleVersion); return new SnappyCodec(magic, version, compatibleVersion);
} }
public static SnappyCodec currentHeader() { public static SnappyCodec currentHeader = new SnappyCodec(MAGIC_HEADER, DEFAULT_VERSION, MINIMUM_COMPATIBLE_VERSION);
return new SnappyCodec(MAGIC_HEADER, DEFAULT_VERSION, MINIMUM_COMPATIBLE_VERSION);
}
} }

View File

@ -30,10 +30,10 @@ import java.io.OutputStream;
/** /**
* This class implements a stream filter for writing compressed data using * This class implements a stream filter for writing compressed data using
* Snappy. * Snappy.
* * <p>
* The input data is blocked into 32kb size (in default), and each block is * The input data is blocked into 32kb size (in default), and each block is
* compressed and then passed to the given {@link OutputStream}. * compressed and then passed to the given {@link OutputStream}.
* * </p>
* The output data format is: * The output data format is:
* <ol> * <ol>
* <li>snappy codec header defined in {@link SnappyCodec} (8 bytes) * <li>snappy codec header defined in {@link SnappyCodec} (8 bytes)
@ -42,7 +42,7 @@ import java.io.OutputStream;
* <li>compressed block 2 * <li>compressed block 2
* <li>... * <li>...
* </ol> * </ol>
* * <p/>
* Note that the compressed data created by {@link SnappyOutputStream} cannot be * Note that the compressed data created by {@link SnappyOutputStream} cannot be
* uncompressed by {@link Snappy#uncompress(byte[])} since the output formats of * uncompressed by {@link Snappy#uncompress(byte[])} since the output formats of
* {@link Snappy#compress(byte[])} and {@link SnappyOutputStream} are different. * {@link Snappy#compress(byte[])} and {@link SnappyOutputStream} are different.
@ -50,47 +50,35 @@ import java.io.OutputStream;
* {@link SnappyOutputStream}. * {@link SnappyOutputStream}.
* *
* @author leo * @author leo
*
*/ */
public class SnappyOutputStream extends OutputStream public class SnappyOutputStream extends OutputStream {
{ static final int MIN_BLOCK_SIZE = 1 * 1024;
static final int DEFAULT_BLOCK_SIZE = 32 * 1024; // Use 32kb for the default block size static final int DEFAULT_BLOCK_SIZE = 32 * 1024; // Use 32kb for the default block size
protected final OutputStream out; protected final OutputStream out;
private final int blockSize; private final int blockSize;
private int cursor = 0; private int inputCursor = 0;
protected byte[] uncompressed; protected byte[] uncompressed;
protected byte[] compressed; private int outputCursor = 0;
protected byte[] outputBuffer;
public SnappyOutputStream(OutputStream out) throws IOException { public SnappyOutputStream(OutputStream out) {
this(out, DEFAULT_BLOCK_SIZE); this(out, DEFAULT_BLOCK_SIZE);
} }
/** /**
* @param out * @param out
* @param blockSize * @param blockSize byte size of the internal buffer size
* byte size of the internal buffer size
* @throws IOException * @throws IOException
*/ */
public SnappyOutputStream(OutputStream out, int blockSize) throws IOException { public SnappyOutputStream(OutputStream out, int blockSize) {
this.out = out; this.out = out;
this.blockSize = blockSize; this.blockSize = Math.max(MIN_BLOCK_SIZE, blockSize);
uncompressed = new byte[blockSize]; uncompressed = new byte[blockSize];
compressed = new byte[Snappy.maxCompressedLength(blockSize)]; outputBuffer = new byte[SnappyCodec.HEADER_SIZE + 4 + Snappy.maxCompressedLength(blockSize)];
writeHeader(); outputCursor = SnappyCodec.currentHeader.writeHeader(outputBuffer, 0);
} }
protected void writeHeader() throws IOException {
SnappyCodec.currentHeader().writeHeader(out);
}
/**
* Writes len bytes from the specified byte array starting at offset off to
* this output stream. The general contract for write(b, off, len) is that
* some of the bytes in the array b are written to the output stream in
* order; element b[off] is the first byte written and b[off+len-1] is the
* last byte written by this operation.
*/
/* (non-Javadoc) /* (non-Javadoc)
* @see java.io.OutputStream#write(byte[], int, int) * @see java.io.OutputStream#write(byte[], int, int)
*/ */
@ -102,12 +90,9 @@ public class SnappyOutputStream extends OutputStream
/** /**
* Compress the input long array data * Compress the input long array data
* *
* @param d * @param d input array
* input array * @param off offset in the array
* @param off * @param len the number of elements in the array to copy
* offset in the array
* @param len
* the number of elements in the array to copy
* @throws IOException * @throws IOException
*/ */
public void write(long[] d, int off, int len) throws IOException { public void write(long[] d, int off, int len) throws IOException {
@ -117,12 +102,9 @@ public class SnappyOutputStream extends OutputStream
/** /**
* Compress the input double array data * Compress the input double array data
* *
* @param f * @param f input array
* input array * @param off offset in the array
* @param off * @param len the number of elements in the array to copy
* offset in the array
* @param len
* the number of elements in the array to copy
* @throws IOException * @throws IOException
*/ */
public void write(double[] f, int off, int len) throws IOException { public void write(double[] f, int off, int len) throws IOException {
@ -132,12 +114,9 @@ public class SnappyOutputStream extends OutputStream
/** /**
* Compress the input float array data * Compress the input float array data
* *
* @param f * @param f input array
* input array * @param off offset in the array
* @param off * @param len the number of elements in the array to copy
* offset in the array
* @param len
* the number of elements in the array to copy
* @throws IOException * @throws IOException
*/ */
public void write(float[] f, int off, int len) throws IOException { public void write(float[] f, int off, int len) throws IOException {
@ -147,12 +126,9 @@ public class SnappyOutputStream extends OutputStream
/** /**
* Compress the input int array data * Compress the input int array data
* *
* @param f * @param f input array
* input array * @param off offset in the array
* @param off * @param len the number of elements in the array to copy
* offset in the array
* @param len
* the number of elements in the array to copy
* @throws IOException * @throws IOException
*/ */
public void write(int[] f, int off, int len) throws IOException { public void write(int[] f, int off, int len) throws IOException {
@ -162,12 +138,9 @@ public class SnappyOutputStream extends OutputStream
/** /**
* Compress the input short array data * Compress the input short array data
* *
* @param f * @param f input array
* input array * @param off offset in the array
* @param off * @param len the number of elements in the array to copy
* offset in the array
* @param len
* the number of elements in the array to copy
* @throws IOException * @throws IOException
*/ */
public void write(short[] f, int off, int len) throws IOException { public void write(short[] f, int off, int len) throws IOException {
@ -224,25 +197,39 @@ public class SnappyOutputStream extends OutputStream
write(f, 0, f.length); write(f, 0, f.length);
} }
private boolean hasSufficientOutputBufferFor(int inputSize) {
int maxCompressedSize = Snappy.maxCompressedLength(inputSize);
return maxCompressedSize < outputBuffer.length - outputCursor - 4;
}
/** /**
* Compress the raw byte array data. * Compress the raw byte array data.
* *
* @param array * @param array array data of any type (e.g., byte[], float[], long[], ...)
* array data of any type (e.g., byte[], float[], long[], ...)
* @param byteOffset * @param byteOffset
* @param byteLength * @param byteLength
* @throws IOException * @throws IOException
*/ */
public void rawWrite(Object array, int byteOffset, int byteLength) throws IOException { public void rawWrite(Object array, int byteOffset, int byteLength) throws IOException {
for (int readBytes = 0; readBytes < byteLength;) {
int copyLen = Math.min(uncompressed.length - cursor, byteLength - readBytes);
Snappy.arrayCopy(array, byteOffset + readBytes, copyLen, uncompressed, cursor);
readBytes += copyLen;
cursor += copyLen;
if (cursor >= uncompressed.length) { if(inputCursor + byteLength < MIN_BLOCK_SIZE) {
dump(); // copy the input data to uncompressed buffer
Snappy.arrayCopy(array, byteOffset, byteLength, uncompressed, inputCursor);
inputCursor += byteLength;
return;
} }
compressInput();
for(int readBytes = 0; readBytes < byteLength; ) {
int inputLen = Math.min(blockSize, byteLength - readBytes);
if(!hasSufficientOutputBufferFor(inputLen)) {
dumpOutput();
}
int compressedSize = Snappy.rawCompress(array, byteOffset + readBytes, inputLen, outputBuffer, outputCursor + 4);
writeInt(outputBuffer, outputCursor, compressedSize);
outputCursor += 4 + compressedSize;
readBytes += inputLen;
} }
} }
@ -257,10 +244,10 @@ public class SnappyOutputStream extends OutputStream
*/ */
@Override @Override
public void write(int b) throws IOException { public void write(int b) throws IOException {
if (cursor >= uncompressed.length) { if(inputCursor >= uncompressed.length) {
dump(); compressInput();
} }
uncompressed[cursor++] = (byte) b; uncompressed[inputCursor++] = (byte) b;
} }
/* (non-Javadoc) /* (non-Javadoc)
@ -268,15 +255,17 @@ public class SnappyOutputStream extends OutputStream
*/ */
@Override @Override
public void flush() throws IOException { public void flush() throws IOException {
dump(); compressInput();
dumpOutput();
out.flush(); out.flush();
} }
static void writeInt(OutputStream out, int value) throws IOException { static void writeInt(byte[] dst, int offset, int v) {
out.write((value >> 24) & 0xFF); int p = offset;
out.write((value >> 16) & 0xFF); dst[offset++] = (byte) ((v >> 24) & 0xFF);
out.write((value >> 8) & 0xFF); dst[offset++] = (byte) ((v >> 16) & 0xFF);
out.write((value >> 0) & 0xFF); dst[offset++] = (byte) ((v >> 8) & 0xFF);
dst[offset++] = (byte) ((v >> 0) & 0xFF);
} }
static int readInt(byte[] buffer, int pos) { static int readInt(byte[] buffer, int pos) {
@ -287,15 +276,27 @@ public class SnappyOutputStream extends OutputStream
return b1 | b2 | b3 | b4; return b1 | b2 | b3 | b4;
} }
protected void dump() throws IOException { protected void dumpOutput() throws IOException {
if (cursor <= 0) if(outputCursor > 0) {
out.write(outputBuffer, 0, outputCursor);
outputCursor = 0;
}
}
protected void compressInput() throws IOException {
if(inputCursor <= 0) {
return; // no need to dump return; // no need to dump
}
// Compress and dump the buffer content // Compress and dump the buffer content
int compressedSize = Snappy.compress(uncompressed, 0, cursor, compressed, 0); if(!hasSufficientOutputBufferFor(inputCursor)) {
writeInt(out, compressedSize); dumpOutput();
out.write(compressed, 0, compressedSize); }
cursor = 0; int compressedSize = Snappy.compress(uncompressed, 0, inputCursor, outputBuffer, outputCursor + 4);
// Write compressed data size
writeInt(outputBuffer, outputCursor, compressedSize);
outputCursor += 4 + compressedSize;
inputCursor = 0;
} }
/** /**
@ -307,8 +308,6 @@ public class SnappyOutputStream extends OutputStream
@Override @Override
public void close() throws IOException { public void close() throws IOException {
flush(); flush();
super.close();
out.close(); out.close();
} }

View File

@ -13,7 +13,7 @@ import scala.util.Random
class SnappyPerformanceTest extends SnappySpec { class SnappyPerformanceTest extends SnappySpec {
lazy val data = { lazy val data = {
val a = new Array[Byte](32 * 1024 * 1024) val a = new Array[Byte](128 * 1024 * 1024)
for (i <- (0 until a.length).par) { for (i <- (0 until a.length).par) {
a(i) = Math.sin(i * 0.01).toByte a(i) = Math.sin(i * 0.01).toByte
@ -29,10 +29,12 @@ class SnappyPerformanceTest extends SnappySpec {
val input = data val input = data
time("compression", repeat=100, logLevel = LogLevel.INFO) { time("compression", repeat=100, logLevel = LogLevel.INFO) {
// 0.037 sec. => 0.026
block("default") { block("default") {
val out = new ByteArrayOutputStream() val out = new ByteArrayOutputStream()
val sout = new SnappyOutputStream(out) val sout = new SnappyOutputStream(out)
sout.write(input) sout.write(input)
sout.close()
out.close() out.close()
} }