diff --git a/src/main/java/org/xerial/snappy/SnappyOutputStream.java b/src/main/java/org/xerial/snappy/SnappyOutputStream.java index c930a4b..709c804 100755 --- a/src/main/java/org/xerial/snappy/SnappyOutputStream.java +++ b/src/main/java/org/xerial/snappy/SnappyOutputStream.java @@ -65,10 +65,12 @@ public class SnappyOutputStream extends OutputStream { private final BufferAllocator inputBufferAllocator; private final BufferAllocator outputBufferAllocator; - protected final byte[] inputBuffer; - protected final byte[] outputBuffer; + // The input and output buffer fields are set to null when closing this stream: + protected byte[] inputBuffer; + protected byte[] outputBuffer; private int inputCursor = 0; private int outputCursor = 0; + private boolean closed; public SnappyOutputStream(OutputStream out) { this(out, DEFAULT_BLOCK_SIZE); @@ -231,6 +233,9 @@ public class SnappyOutputStream extends OutputStream { * @throws IOException */ public void rawWrite(Object array, int byteOffset, int byteLength) throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } int cursor = 0; while(cursor < byteLength) { int readLen = Math.min(byteLength - cursor, blockSize - inputCursor); @@ -258,6 +263,9 @@ public class SnappyOutputStream extends OutputStream { */ @Override public void write(int b) throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } if(inputCursor >= inputBuffer.length) { compressInput(); } @@ -269,6 +277,9 @@ public class SnappyOutputStream extends OutputStream { */ @Override public void flush() throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } compressInput(); dumpOutput(); out.flush(); @@ -320,12 +331,18 @@ public class SnappyOutputStream extends OutputStream { */ @Override public void close() throws IOException { + if (closed) { + return; + } try { flush(); out.close(); } finally { + closed = true; inputBufferAllocator.release(inputBuffer); outputBufferAllocator.release(outputBuffer); + inputBuffer = null; + outputBuffer = null; } } diff --git a/src/test/java/org/xerial/snappy/SnappyOutputStreamTest.java b/src/test/java/org/xerial/snappy/SnappyOutputStreamTest.java index 10850a8..0942eb0 100755 --- a/src/test/java/org/xerial/snappy/SnappyOutputStreamTest.java +++ b/src/test/java/org/xerial/snappy/SnappyOutputStreamTest.java @@ -29,8 +29,13 @@ import static org.junit.Assert.*; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.lang.ref.WeakReference; import org.junit.Test; +import org.xerial.snappy.buffer.BufferAllocatorFactory; +import org.xerial.snappy.buffer.CachedBufferAllocator; +import org.xerial.snappy.buffer.DefaultBufferAllocator; import org.xerial.util.FileResource; import org.xerial.util.log.Logger; @@ -157,6 +162,83 @@ public class SnappyOutputStreamTest } } + @Test + public void closeShouldBeIdempotent() throws Exception { + // Regression test for issue #107, a bug where close() was non-idempotent and would release + // its buffers to the allocator multiple times, which could cause scenarios where two open + // SnappyOutputStreams could share the same buffers, leading to stream corruption issues. + final BufferAllocatorFactory bufferAllocatorFactory = CachedBufferAllocator.factory; + final int BLOCK_SIZE = 4096; + // Create a stream, use it, then close it once: + ByteArrayOutputStream ba1 = new ByteArrayOutputStream(); + SnappyOutputStream os1 = new SnappyOutputStream(ba1, BLOCK_SIZE, bufferAllocatorFactory); + os1.write(42); + os1.close(); + // Create a new output stream, which should end up re-using the first stream's freed buffers + ByteArrayOutputStream ba2 = new ByteArrayOutputStream(); + SnappyOutputStream os2 = new SnappyOutputStream(ba2, BLOCK_SIZE, bufferAllocatorFactory); + // Close the first stream a second time, which is supposed to be safe due to idempotency: + os1.close(); + // Allocate a third output stream, which is supposed to get its own fresh set of buffers: + ByteArrayOutputStream ba3 = new ByteArrayOutputStream(); + SnappyOutputStream os3 = new SnappyOutputStream(ba3, BLOCK_SIZE, bufferAllocatorFactory); + // Since the second and third streams should have distinct sets of buffers, writes to these + // streams should not interfere with one another: + os2.write(2); + os3.write(3); + os2.close(); + os3.close(); + SnappyInputStream in2 = new SnappyInputStream(new ByteArrayInputStream(ba2.toByteArray())); + assertEquals(2, in2.read()); + in2.close(); + SnappyInputStream in3 = new SnappyInputStream(new ByteArrayInputStream(ba3.toByteArray())); + assertEquals(3, in3.read()); + in3.close(); + } + + @Test + public void writingToClosedStreamShouldThrowIOException() throws IOException { + ByteArrayOutputStream b = new ByteArrayOutputStream(); + SnappyOutputStream os = new SnappyOutputStream(b); + os.close(); + try { + os.write(4); + fail("Expected write() to throw IOException"); + } catch (IOException e) { + // Expected exception + } + try { + os.write(new int[] { 1, 2, 3, 4}); + fail("Expected write() to throw IOException"); + } catch (IOException e) { + // Expected exception + } + } + + @Test + public void flushingClosedStreamShouldThrowIOException() throws IOException { + ByteArrayOutputStream b = new ByteArrayOutputStream(); + SnappyOutputStream os = new SnappyOutputStream(b); + os.close(); + try { + os.flush(); + } catch (IOException e) { + // Expected exception + } + } + + @Test + public void closingStreamShouldMakeBuffersEligibleForGarbageCollection() throws IOException { + ByteArrayOutputStream b = new ByteArrayOutputStream(); + SnappyOutputStream os = new SnappyOutputStream(b, 4095, DefaultBufferAllocator.factory); + WeakReference inputBuffer = new WeakReference(os.inputBuffer); + WeakReference outputBuffer = new WeakReference(os.inputBuffer); + os.close(); + System.gc(); + assertNull(inputBuffer.get()); + assertNull(outputBuffer.get()); + } + @Test public void longArrayCompress() throws Exception { long[] l = new long[10];