mirror of
https://github.com/xerial/snappy-java.git
synced 2025-09-23 18:18:03 +02:00
Merge pull request #108 from JoshRosen/close-should-be-idempotent
Make SnappyOutputStream.close() idempotent; guard against writing to closed stream
This commit is contained in:
commit
5bd06f4526
@ -65,10 +65,12 @@ public class SnappyOutputStream extends OutputStream {
|
|||||||
private final BufferAllocator inputBufferAllocator;
|
private final BufferAllocator inputBufferAllocator;
|
||||||
private final BufferAllocator outputBufferAllocator;
|
private final BufferAllocator outputBufferAllocator;
|
||||||
|
|
||||||
protected final byte[] inputBuffer;
|
// The input and output buffer fields are set to null when closing this stream:
|
||||||
protected final byte[] outputBuffer;
|
protected byte[] inputBuffer;
|
||||||
|
protected byte[] outputBuffer;
|
||||||
private int inputCursor = 0;
|
private int inputCursor = 0;
|
||||||
private int outputCursor = 0;
|
private int outputCursor = 0;
|
||||||
|
private boolean closed;
|
||||||
|
|
||||||
public SnappyOutputStream(OutputStream out) {
|
public SnappyOutputStream(OutputStream out) {
|
||||||
this(out, DEFAULT_BLOCK_SIZE);
|
this(out, DEFAULT_BLOCK_SIZE);
|
||||||
@ -231,6 +233,9 @@ public class SnappyOutputStream extends OutputStream {
|
|||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void rawWrite(Object array, int byteOffset, int byteLength) throws IOException {
|
public void rawWrite(Object array, int byteOffset, int byteLength) throws IOException {
|
||||||
|
if (closed) {
|
||||||
|
throw new IOException("Stream is closed");
|
||||||
|
}
|
||||||
int cursor = 0;
|
int cursor = 0;
|
||||||
while(cursor < byteLength) {
|
while(cursor < byteLength) {
|
||||||
int readLen = Math.min(byteLength - cursor, blockSize - inputCursor);
|
int readLen = Math.min(byteLength - cursor, blockSize - inputCursor);
|
||||||
@ -258,6 +263,9 @@ public class SnappyOutputStream extends OutputStream {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void write(int b) throws IOException {
|
public void write(int b) throws IOException {
|
||||||
|
if (closed) {
|
||||||
|
throw new IOException("Stream is closed");
|
||||||
|
}
|
||||||
if(inputCursor >= inputBuffer.length) {
|
if(inputCursor >= inputBuffer.length) {
|
||||||
compressInput();
|
compressInput();
|
||||||
}
|
}
|
||||||
@ -269,6 +277,9 @@ public class SnappyOutputStream extends OutputStream {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void flush() throws IOException {
|
public void flush() throws IOException {
|
||||||
|
if (closed) {
|
||||||
|
throw new IOException("Stream is closed");
|
||||||
|
}
|
||||||
compressInput();
|
compressInput();
|
||||||
dumpOutput();
|
dumpOutput();
|
||||||
out.flush();
|
out.flush();
|
||||||
@ -320,12 +331,18 @@ public class SnappyOutputStream extends OutputStream {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
|
if (closed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
flush();
|
flush();
|
||||||
out.close();
|
out.close();
|
||||||
} finally {
|
} finally {
|
||||||
|
closed = true;
|
||||||
inputBufferAllocator.release(inputBuffer);
|
inputBufferAllocator.release(inputBuffer);
|
||||||
outputBufferAllocator.release(outputBuffer);
|
outputBufferAllocator.release(outputBuffer);
|
||||||
|
inputBuffer = null;
|
||||||
|
outputBuffer = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,8 +29,13 @@ import static org.junit.Assert.*;
|
|||||||
import java.io.BufferedInputStream;
|
import java.io.BufferedInputStream;
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.ref.WeakReference;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.xerial.snappy.buffer.BufferAllocatorFactory;
|
||||||
|
import org.xerial.snappy.buffer.CachedBufferAllocator;
|
||||||
|
import org.xerial.snappy.buffer.DefaultBufferAllocator;
|
||||||
import org.xerial.util.FileResource;
|
import org.xerial.util.FileResource;
|
||||||
import org.xerial.util.log.Logger;
|
import org.xerial.util.log.Logger;
|
||||||
|
|
||||||
@ -157,6 +162,83 @@ public class SnappyOutputStreamTest
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void closeShouldBeIdempotent() throws Exception {
|
||||||
|
// Regression test for issue #107, a bug where close() was non-idempotent and would release
|
||||||
|
// its buffers to the allocator multiple times, which could cause scenarios where two open
|
||||||
|
// SnappyOutputStreams could share the same buffers, leading to stream corruption issues.
|
||||||
|
final BufferAllocatorFactory bufferAllocatorFactory = CachedBufferAllocator.factory;
|
||||||
|
final int BLOCK_SIZE = 4096;
|
||||||
|
// Create a stream, use it, then close it once:
|
||||||
|
ByteArrayOutputStream ba1 = new ByteArrayOutputStream();
|
||||||
|
SnappyOutputStream os1 = new SnappyOutputStream(ba1, BLOCK_SIZE, bufferAllocatorFactory);
|
||||||
|
os1.write(42);
|
||||||
|
os1.close();
|
||||||
|
// Create a new output stream, which should end up re-using the first stream's freed buffers
|
||||||
|
ByteArrayOutputStream ba2 = new ByteArrayOutputStream();
|
||||||
|
SnappyOutputStream os2 = new SnappyOutputStream(ba2, BLOCK_SIZE, bufferAllocatorFactory);
|
||||||
|
// Close the first stream a second time, which is supposed to be safe due to idempotency:
|
||||||
|
os1.close();
|
||||||
|
// Allocate a third output stream, which is supposed to get its own fresh set of buffers:
|
||||||
|
ByteArrayOutputStream ba3 = new ByteArrayOutputStream();
|
||||||
|
SnappyOutputStream os3 = new SnappyOutputStream(ba3, BLOCK_SIZE, bufferAllocatorFactory);
|
||||||
|
// Since the second and third streams should have distinct sets of buffers, writes to these
|
||||||
|
// streams should not interfere with one another:
|
||||||
|
os2.write(2);
|
||||||
|
os3.write(3);
|
||||||
|
os2.close();
|
||||||
|
os3.close();
|
||||||
|
SnappyInputStream in2 = new SnappyInputStream(new ByteArrayInputStream(ba2.toByteArray()));
|
||||||
|
assertEquals(2, in2.read());
|
||||||
|
in2.close();
|
||||||
|
SnappyInputStream in3 = new SnappyInputStream(new ByteArrayInputStream(ba3.toByteArray()));
|
||||||
|
assertEquals(3, in3.read());
|
||||||
|
in3.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void writingToClosedStreamShouldThrowIOException() throws IOException {
|
||||||
|
ByteArrayOutputStream b = new ByteArrayOutputStream();
|
||||||
|
SnappyOutputStream os = new SnappyOutputStream(b);
|
||||||
|
os.close();
|
||||||
|
try {
|
||||||
|
os.write(4);
|
||||||
|
fail("Expected write() to throw IOException");
|
||||||
|
} catch (IOException e) {
|
||||||
|
// Expected exception
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
os.write(new int[] { 1, 2, 3, 4});
|
||||||
|
fail("Expected write() to throw IOException");
|
||||||
|
} catch (IOException e) {
|
||||||
|
// Expected exception
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void flushingClosedStreamShouldThrowIOException() throws IOException {
|
||||||
|
ByteArrayOutputStream b = new ByteArrayOutputStream();
|
||||||
|
SnappyOutputStream os = new SnappyOutputStream(b);
|
||||||
|
os.close();
|
||||||
|
try {
|
||||||
|
os.flush();
|
||||||
|
} catch (IOException e) {
|
||||||
|
// Expected exception
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void closingStreamShouldMakeBuffersEligibleForGarbageCollection() throws IOException {
|
||||||
|
ByteArrayOutputStream b = new ByteArrayOutputStream();
|
||||||
|
SnappyOutputStream os = new SnappyOutputStream(b, 4095, DefaultBufferAllocator.factory);
|
||||||
|
WeakReference<byte[]> inputBuffer = new WeakReference<byte[]>(os.inputBuffer);
|
||||||
|
WeakReference<byte[]> outputBuffer = new WeakReference<byte[]>(os.inputBuffer);
|
||||||
|
os.close();
|
||||||
|
System.gc();
|
||||||
|
assertNull(inputBuffer.get());
|
||||||
|
assertNull(outputBuffer.get());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void longArrayCompress() throws Exception {
|
public void longArrayCompress() throws Exception {
|
||||||
long[] l = new long[10];
|
long[] l = new long[10];
|
||||||
|
Loading…
x
Reference in New Issue
Block a user