Applied a patch from Alec Wysocker to postpone the buffer allocation in SnappyInputStream

This commit is contained in:
Taro L. Saito 2011-06-20 09:42:46 +09:00
parent ae6a126d1c
commit f027c52899

View File

@ -1,189 +1,183 @@
/*-------------------------------------------------------------------------- /*--------------------------------------------------------------------------
* Copyright 2011 Taro L. Saito * Copyright 2011 Taro L. Saito
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*--------------------------------------------------------------------------*/ *--------------------------------------------------------------------------*/
//-------------------------------------- //--------------------------------------
// XerialJ // XerialJ
// //
// SnappyInputStream.java // SnappyInputStream.java
// Since: 2011/03/31 20:14:56 // Since: 2011/03/31 20:14:56
// //
// $URL$ // $URL$
// $Author$ // $Author$
//-------------------------------------- //--------------------------------------
package org.xerial.snappy; package org.xerial.snappy;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
/** /**
* A stream filter for reading data compressed by {@link SnappyOutputStream}. * A stream filter for reading data compressed by {@link SnappyOutputStream}.
* *
* @author leo * @author leo
* *
*/ */
public class SnappyInputStream extends InputStream public class SnappyInputStream extends InputStream
{ {
private boolean finishedReading = false; private boolean finishedReading = false;
protected final InputStream in; protected final InputStream in;
private int blockSize = SnappyOutputStream.DEFAULT_BLOCK_SIZE;
private byte[] compressed;
private byte[] compressed; private byte[] uncompressed;
private byte[] uncompressed; private int uncompressedCursor = 0;
private int uncompressedCursor = 0; private int uncompressedLimit = 0;
private int uncompressedLimit = 0;
private byte[] chunkSizeBuf = new byte[4];
private byte[] chunkSizeBuf = new byte[4];
public SnappyInputStream(InputStream input) throws IOException {
public SnappyInputStream(InputStream input) throws IOException { this.in = input;
this.in = input; readHeader();
readHeader(); }
if (compressed == null) protected void readHeader() throws IOException {
compressed = new byte[blockSize]; byte[] header = new byte[SnappyCodec.headerSize()];
if (uncompressed == null) int readBytes = in.read(header, 0, header.length);
uncompressed = new byte[blockSize];
} // Quick test of the header
if (header[0] != SnappyCodec.MAGIC_HEADER[0]) {
protected void readHeader() throws IOException { // do the default uncompression
byte[] header = new byte[SnappyCodec.headerSize()]; readFully(header, readBytes);
int readBytes = in.read(header, 0, header.length); return;
}
// Quick test of the header
if (header[0] != SnappyCodec.MAGIC_HEADER[0]) { SnappyCodec codec = SnappyCodec.readHeader(new ByteArrayInputStream(header));
// do the default uncompression if (codec.isValidMagicHeader()) {
readFully(header, readBytes); // The input data is compressed by SnappyOutputStream
return; if (codec.version < SnappyCodec.MINIMUM_COMPATIBLE_VERSION) {
} throw new IOException(String.format(
"compressed with imcompatible codec version %d. At least version %d is required",
SnappyCodec codec = SnappyCodec.readHeader(new ByteArrayInputStream(header)); codec.version, SnappyCodec.MINIMUM_COMPATIBLE_VERSION));
if (codec.isValidMagicHeader()) { }
// compressed by SnappyOutputStream }
if (codec.version < SnappyCodec.MINIMUM_COMPATIBLE_VERSION) { else {
throw new IOException(String.format( // (probably) compressed by Snappy.compress(byte[])
"compressed with imcompatible codec version %d. At least version %d is required", readFully(header, readBytes);
codec.version, SnappyCodec.MINIMUM_COMPATIBLE_VERSION)); return;
} }
} }
else {
// (probably) compressed by Snappy.compress(byte[]) protected void readFully(byte[] fragment, int fragmentLength) throws IOException {
readFully(header, readBytes); // read the entire input data to the buffer
return; compressed = new byte[Math.max(8 * 1024, fragmentLength)]; // 8K
} System.arraycopy(fragment, 0, compressed, 0, fragmentLength);
} int cursor = fragmentLength;
for (int readBytes = 0; (readBytes = in.read(compressed, cursor, compressed.length - cursor)) != -1;) {
protected void readFully(byte[] fragment, int fragmentLength) throws IOException { cursor += readBytes;
// read the entire input data to the buffer if (cursor >= compressed.length) {
compressed = new byte[Math.max(8 * 1024, fragmentLength)]; // 8K byte[] newBuf = new byte[(compressed.length * 2)];
System.arraycopy(fragment, 0, compressed, 0, fragmentLength); System.arraycopy(compressed, 0, newBuf, 0, compressed.length);
int cursor = fragmentLength; compressed = newBuf;
for (int readBytes = 0; (readBytes = in.read(compressed, cursor, compressed.length - cursor)) != -1;) { }
cursor += readBytes; }
if (cursor >= compressed.length) {
byte[] newBuf = new byte[(compressed.length * 2)]; finishedReading = true;
System.arraycopy(compressed, 0, newBuf, 0, compressed.length);
compressed = newBuf; // Uncompress
} try {
} int uncompressedLength = Snappy.uncompressedLength(compressed, 0, cursor);
uncompressed = new byte[uncompressedLength];
finishedReading = true; Snappy.uncompress(compressed, 0, cursor, uncompressed, 0);
this.uncompressedCursor = 0;
// Uncompress this.uncompressedLimit = uncompressedLength;
try { }
int uncompressedLength = Snappy.uncompressedLength(compressed, 0, cursor); catch (SnappyException e) {
uncompressed = new byte[uncompressedLength]; throw new IOException(e.getMessage());
Snappy.uncompress(compressed, 0, cursor, uncompressed, 0); }
this.uncompressedCursor = 0;
this.uncompressedLimit = uncompressedLength; }
}
catch (SnappyException e) { @Override
throw new IOException(e.getMessage()); public int read(byte[] b, int off, int len) throws IOException {
} int writtenBytes = 0;
for (; writtenBytes < len;) {
} if (uncompressedCursor >= uncompressedLimit) {
if (hasNextChunk())
@Override continue;
public int read(byte[] b, int off, int len) throws IOException { else {
int writtenBytes = 0; return writtenBytes == 0 ? -1 : writtenBytes;
for (; writtenBytes < len;) { }
if (uncompressedCursor >= uncompressedLimit) { }
if (hasNextChunk()) int bytesToWrite = Math.min(uncompressedLimit - uncompressedCursor, len - writtenBytes);
continue; System.arraycopy(uncompressed, uncompressedCursor, b, off + writtenBytes, bytesToWrite);
else { writtenBytes += bytesToWrite;
return writtenBytes == 0 ? -1 : writtenBytes; uncompressedCursor += bytesToWrite;
} }
}
int bytesToWrite = Math.min(uncompressedLimit - uncompressedCursor, len - writtenBytes); return writtenBytes;
System.arraycopy(uncompressed, uncompressedCursor, b, off + writtenBytes, bytesToWrite); }
writtenBytes += bytesToWrite;
uncompressedCursor += bytesToWrite; protected boolean hasNextChunk() throws IOException {
} if (finishedReading)
return false;
return writtenBytes;
} uncompressedCursor = 0;
uncompressedLimit = 0;
protected boolean hasNextChunk() throws IOException {
if (finishedReading) int chunkSizeDataLen = in.read(chunkSizeBuf, 0, 4);
return false; if (chunkSizeDataLen < 4) {
finishedReading = true;
uncompressedCursor = 0; return false;
uncompressedLimit = 0; }
int chunkSize = SnappyOutputStream.readInt(chunkSizeBuf, 0);
int chunkSizeDataLen = in.read(chunkSizeBuf, 0, 4); // extend the compressed data buffer size
if (chunkSizeDataLen < 4) { if (compressed == null || chunkSize > compressed.length) {
finishedReading = true; compressed = new byte[chunkSize];
return false; }
} int readBytes = in.read(compressed, 0, chunkSize);
int chunkSize = SnappyOutputStream.readInt(chunkSizeBuf, 0); if (readBytes < chunkSize) {
// extend the compressed data buffer size throw new IOException("failed to read chunk");
if (chunkSize > compressed.length) { }
compressed = new byte[chunkSize]; try {
} int uncompressedLength = Snappy.uncompressedLength(compressed, 0, chunkSize);
int readBytes = in.read(compressed, 0, chunkSize); if (uncompressed == null || uncompressedLength > uncompressed.length) {
if (readBytes < chunkSize) { uncompressed = new byte[uncompressedLength];
throw new IOException("failed to read chunk"); }
} int actualUncompressedLength = Snappy.uncompress(compressed, 0, chunkSize, uncompressed, 0);
try { if (uncompressedLength != actualUncompressedLength) {
int uncompressedLength = Snappy.uncompressedLength(compressed, 0, chunkSize); throw new IOException("invalid uncompressed byte size");
if (uncompressedLength > uncompressed.length) { }
uncompressed = new byte[uncompressedLength]; uncompressedLimit = actualUncompressedLength;
} }
int actualUncompressedLength = Snappy.uncompress(compressed, 0, chunkSize, uncompressed, 0); catch (SnappyException e) {
if (uncompressedLength != actualUncompressedLength) { throw new IOException("failed to uncompress the chunk: " + e.getMessage());
throw new IOException("invalid uncompressed byte size"); }
}
uncompressedLimit = actualUncompressedLength; return true;
} }
catch (SnappyException e) {
throw new IOException("failed to uncompress the chunk: " + e.getMessage()); @Override
} public int read() throws IOException {
if (uncompressedCursor < uncompressedLimit) {
return true; return uncompressed[uncompressedCursor++] & 0xFF;
} }
else {
@Override if (hasNextChunk())
public int read() throws IOException { return read();
if (uncompressedCursor < uncompressedLimit) { else
return uncompressed[uncompressedCursor++] & 0xFF; return -1;
} }
else { }
if (hasNextChunk())
return read(); }
else
return -1;
}
}
}