From 8b83f6e2d4ed860373203367879c95bd1491d361 Mon Sep 17 00:00:00 2001 From: "Taro L. Saito" Date: Sat, 19 Jul 2014 05:29:11 +0900 Subject: [PATCH] Using buffer recycler to minimize memory allocation cost --- .../org/xerial/snappy/BufferRecycler.java | 176 ++++++++++++++++++ .../org/xerial/snappy/SnappyOutputStream.java | 29 ++- version.sbt | 2 +- 3 files changed, 196 insertions(+), 11 deletions(-) create mode 100644 src/main/java/org/xerial/snappy/BufferRecycler.java diff --git a/src/main/java/org/xerial/snappy/BufferRecycler.java b/src/main/java/org/xerial/snappy/BufferRecycler.java new file mode 100644 index 0000000..b8382b6 --- /dev/null +++ b/src/main/java/org/xerial/snappy/BufferRecycler.java @@ -0,0 +1,176 @@ +/* + * Copyright (C) 2011 the original author or authors. + * See the NOTICE file distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.xerial.snappy; +import java.lang.ref.SoftReference; + +/** + * Simple helper class to encapsulate details of basic buffer + * recycling scheme, which helps a lot (as per profiling) for + * smaller encoding cases. 
+ *
+ * @author tatu
+ */
+class BufferRecycler
+{
+    private static final int MIN_ENCODING_BUFFER = 4000;
+
+    private static final int MIN_OUTPUT_BUFFER = 8000;
+
+    /**
+     * This ThreadLocal contains a {@link java.lang.ref.SoftReference}
+     * to a {@link BufferRecycler} used to provide a low-cost
+     * buffer recycling for buffers we need for encoding, decoding.
+     */
+    protected static final ThreadLocal<SoftReference<BufferRecycler>> recyclerRef
+            = new ThreadLocal<SoftReference<BufferRecycler>>();
+
+    // At most one spare array is cached per role; a field is null when nothing is cached.
+    private byte[] inputBuffer;
+    private byte[] outputBuffer;
+    private byte[] decodingBuffer;
+    private byte[] encodingBuffer;
+    private short[] encodingHash;
+
+    /**
+     * Accessor to get thread-local recycler instance. A new recycler is created
+     * and stored behind a fresh SoftReference when the calling thread has none yet
+     * or when the previously stored reference has been cleared.
+     *
+     * @return the recycler for the calling thread; never null
+     */
+    public static BufferRecycler instance() {
+        SoftReference<BufferRecycler> ref = recyclerRef.get();
+        BufferRecycler bufferRecycler = (ref == null) ? null : ref.get();
+        if (bufferRecycler == null) {
+            bufferRecycler = new BufferRecycler();
+            recyclerRef.set(new SoftReference<BufferRecycler>(bufferRecycler));
+        }
+        return bufferRecycler;
+    }
+
+    ///////////////////////////////////////////////////////////////////////
+    // Buffers for encoding (output)
+    ///////////////////////////////////////////////////////////////////////
+
+    /**
+     * Hands out an encoding buffer of at least {@code minSize} bytes: the cached array when it is
+     * big enough (it then leaves the cache), else a new one of max(minSize, MIN_ENCODING_BUFFER).
+     */
+    public byte[] allocEncodingBuffer(int minSize) {
+        byte[] buf = encodingBuffer;
+        if (buf == null || buf.length < minSize) {
+            return new byte[Math.max(minSize, MIN_ENCODING_BUFFER)];
+        }
+        encodingBuffer = null;
+        return buf;
+    }
+
+    /**
+     * Offers an encoding buffer back for reuse. It replaces the cached array only when nothing is
+     * cached or it is larger; a null argument is tolerated, like in the other release methods.
+     */
+    public void releaseEncodeBuffer(byte[] buffer) {
+        if (encodingBuffer == null || (buffer != null && buffer.length > encodingBuffer.length)) {
+            encodingBuffer = buffer;
+        }
+    }
+
+    /**
+     * Hands out an output buffer of at least {@code minSize} bytes: the cached array when it is
+     * big enough (it then leaves the cache), else a new one of max(minSize, MIN_OUTPUT_BUFFER).
+     */
+    public byte[] allocOutputBuffer(int minSize) {
+        byte[] buf = outputBuffer;
+        if (buf == null || buf.length < minSize) {
+            return new byte[Math.max(minSize, MIN_OUTPUT_BUFFER)];
+        }
+        outputBuffer = null;
+        return buf;
+    }
+
+    /** Offers an output buffer back; kept only if nothing is cached or it is larger. */
+    public void releaseOutputBuffer(byte[] buffer) {
+        if (outputBuffer == null || (buffer != null && buffer.length > outputBuffer.length)) {
+            outputBuffer = buffer;
+        }
+    }
+
+    /**
+     * Hands out a hash table of at least {@code suggestedSize} entries: the cached array when it
+     * is big enough (it then leaves the cache), else a new one of exactly that size.
+     */
+    public short[] allocEncodingHash(int suggestedSize) {
+        short[] buf = encodingHash;
+        if (buf == null || buf.length < suggestedSize) {
+            return new short[suggestedSize];
+        }
+        encodingHash = null;
+        return buf;
+    }
+
+    /** Offers a hash table back; kept only if nothing is cached or it is larger. */
+    public void releaseEncodingHash(short[] buffer) {
+        if (encodingHash == null || (buffer != null && buffer.length > encodingHash.length)) {
+            encodingHash = buffer;
+        }
+    }
+
+    ///////////////////////////////////////////////////////////////////////
+    // Buffers for decoding (input)
+    ///////////////////////////////////////////////////////////////////////
+
+    /**
+     * Hands out an input buffer of at least {@code minSize} bytes: the cached array when it is
+     * big enough (it then leaves the cache), else a new one of max(minSize, MIN_OUTPUT_BUFFER).
+     */
+    public byte[] allocInputBuffer(int minSize) {
+        byte[] buf = inputBuffer;
+        if (buf == null || buf.length < minSize) {
+            return new byte[Math.max(minSize, MIN_OUTPUT_BUFFER)];
+        }
+        inputBuffer = null;
+        return buf;
+    }
+
+    /** Offers an input buffer back; kept only if nothing is cached or it is larger. */
+    public void releaseInputBuffer(byte[] buffer) {
+        if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) {
+            inputBuffer = buffer;
+        }
+    }
+
+    /**
+     * Hands out a decoding buffer of at least {@code size} bytes: the cached array when it is
+     * big enough (it then leaves the cache), else a new one of exactly that size.
+     */
+    public byte[] allocDecodeBuffer(int size) {
+        byte[] buf = decodingBuffer;
+        if (buf == null || buf.length < size) {
+            return new byte[size];
+        }
+        decodingBuffer = null;
+        return buf;
+    }
+
+    /** Offers a decoding buffer back; kept only if nothing is cached or it is larger. */
+    public void releaseDecodeBuffer(byte[] buffer) {
+        if (decodingBuffer == null || (buffer != null && buffer.length > decodingBuffer.length)) {
+            decodingBuffer = buffer;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/xerial/snappy/SnappyOutputStream.java b/src/main/java/org/xerial/snappy/SnappyOutputStream.java
index 9542658..3547c95 100755
--- a/src/main/java/org/xerial/snappy/SnappyOutputStream.java
+++ b/src/main/java/org/xerial/snappy/SnappyOutputStream.java
@@ -56,11 +56,13 @@ public class SnappyOutputStream extends OutputStream { static final int DEFAULT_BLOCK_SIZE = 32 * 1024; // Use 32kb for the default block size protected final OutputStream out; + + private final 
BufferRecycler recycler; private final int blockSize; + protected final byte[] inputBuffer; + protected final byte[] outputBuffer; private int inputCursor = 0; - protected byte[] uncompressed; private int outputCursor = 0; - protected byte[] outputBuffer; public SnappyOutputStream(OutputStream out) { this(out, DEFAULT_BLOCK_SIZE); @@ -73,9 +75,10 @@ public class SnappyOutputStream extends OutputStream { */ public SnappyOutputStream(OutputStream out, int blockSize) { this.out = out; + this.recycler = BufferRecycler.instance(); this.blockSize = Math.max(MIN_BLOCK_SIZE, blockSize); - uncompressed = new byte[this.blockSize]; - outputBuffer = new byte[SnappyCodec.HEADER_SIZE + 4 + Snappy.maxCompressedLength(this.blockSize)]; + inputBuffer = recycler.allocInputBuffer(this.blockSize); + outputBuffer = recycler.allocOutputBuffer(SnappyCodec.HEADER_SIZE + 4 + Snappy.maxCompressedLength(this.blockSize)); outputCursor = SnappyCodec.currentHeader.writeHeader(outputBuffer, 0); } @@ -214,7 +217,7 @@ public class SnappyOutputStream extends OutputStream { if(inputCursor + byteLength < MIN_BLOCK_SIZE) { // copy the input data to uncompressed buffer - Snappy.arrayCopy(array, byteOffset, byteLength, uncompressed, inputCursor); + Snappy.arrayCopy(array, byteOffset, byteLength, inputBuffer, inputCursor); inputCursor += byteLength; return; } @@ -244,10 +247,10 @@ public class SnappyOutputStream extends OutputStream { */ @Override public void write(int b) throws IOException { - if(inputCursor >= uncompressed.length) { + if(inputCursor >= inputBuffer.length) { compressInput(); } - uncompressed[inputCursor++] = (byte) b; + inputBuffer[inputCursor++] = (byte) b; } /* (non-Javadoc) @@ -291,7 +294,7 @@ public class SnappyOutputStream extends OutputStream { if(!hasSufficientOutputBufferFor(inputCursor)) { dumpOutput(); } - int compressedSize = Snappy.compress(uncompressed, 0, inputCursor, outputBuffer, outputCursor + 4); + int compressedSize = Snappy.compress(inputBuffer, 0, inputCursor, 
outputBuffer, outputCursor + 4); // Write compressed data size writeInt(outputBuffer, outputCursor, compressedSize); outputCursor += 4 + compressedSize; @@ -306,8 +309,14 @@ public class SnappyOutputStream extends OutputStream { */ @Override public void close() throws IOException { - flush(); - out.close(); + try { + flush(); + out.close(); + } + finally { + recycler.releaseInputBuffer(inputBuffer); + recycler.releaseOutputBuffer(outputBuffer); + } } } diff --git a/version.sbt b/version.sbt index 50b13ff..f906b2e 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "1.1.1.2" +version in ThisBuild := "1.1.1.3-SNAPSHOT"