diff --git a/build.sbt b/build.sbt
index 309c60f..44ab21a 100644
--- a/build.sbt
+++ b/build.sbt
@@ -67,6 +67,26 @@ javacOptions in doc := {
   opts
 }
 
+// Configuration for SnappyHadoopCompatibleOutputStream testing
+fork in Test := true
+import java.io.File
+val libTemp = {
+  val path = s"${System.getProperty("java.io.tmpdir")}/snappy_test_${System.currentTimeMillis()}"
+  // certain older Linux systems (debian/trusty in Travis CI) require that libsnappy.so, loaded by
+  // libhadoop.so, be copied to the temp path before the child JVM is forked;
+  // because of that, this cannot be defined as an additional task in the Test scope
+  IO.copyFile(file("src/test/resources/lib/Linux/libsnappy.so"), file(s"$path/libsnappy.so"))
+  IO.copyFile(file("src/test/resources/lib/Linux/libsnappy.so"), file(s"$path/libsnappy.so.1"))
+  path
+}
+
+val macOSXLibPath = s"$libTemp:${System.getenv("DYLD_LIBRARY_PATH")}"
+val linuxLibPath = s"$libTemp:${System.getenv("LD_LIBRARY_PATH")}"
+
+// have to add to the system dynamic library path since the hadoop native library indirectly loads libsnappy.1;
+// can't use javaOptions in Test because it causes the expression to evaluate twice, yielding different temp path values
+envVars in Test := Map("XERIAL_SNAPPY_LIB" -> libTemp, "DYLD_LIBRARY_PATH" -> macOSXLibPath, "LD_LIBRARY_PATH" -> linuxLibPath)
+
 testOptions += Tests.Argument(TestFrameworks.JUnit, "-q", "-v")
 concurrentRestrictions in Global := Seq(Tags.limit(Tags.Test, 1))
 autoScalaLibrary := false
@@ -83,7 +103,8 @@ libraryDependencies ++= Seq(
   "org.wvlet.airframe" %% "airframe-log" % "0.25" % "test",
   "org.scalatest" %% "scalatest" % "3.0.4" % "test",
   "org.osgi" % "org.osgi.core" % "4.3.0" % "provided",
-  "com.novocode" % "junit-interface" % "0.11" % "test"
+  "com.novocode" % "junit-interface" % "0.11" % "test",
+  "org.apache.hadoop" % "hadoop-common" % "2.7.3" % "test" exclude ("org.xerial.snappy", "snappy-java")
 )
 
 enablePlugins(SbtOsgi)
diff --git a/src/main/java/org/xerial/snappy/SnappyHadoopCompatibleOutputStream.java b/src/main/java/org/xerial/snappy/SnappyHadoopCompatibleOutputStream.java
new file mode 100644
index 0000000..67d2d18
--- /dev/null
+++ b/src/main/java/org/xerial/snappy/SnappyHadoopCompatibleOutputStream.java
@@ -0,0 +1,30 @@
+package org.xerial.snappy;
+
+import java.io.OutputStream;
+
+import org.xerial.snappy.buffer.CachedBufferAllocator;
+
+public class SnappyHadoopCompatibleOutputStream extends SnappyOutputStream
+{
+    public SnappyHadoopCompatibleOutputStream(OutputStream out)
+    {
+        this(out, DEFAULT_BLOCK_SIZE);
+    }
+
+    public SnappyHadoopCompatibleOutputStream(OutputStream out, int blockSize)
+    {
+        super(out, blockSize, CachedBufferAllocator.getBufferAllocatorFactory());
+    }
+
+    @Override
+    protected int writeHeader()
+    {
+        return 0;
+    }
+
+    @Override
+    protected void writeBlockPreemble()
+    {
+        writeCurrentDataSize();
+    }
+}
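Note: the new class is intended as a drop-in replacement for SnappyOutputStream whenever the output must be readable by Hadoop's SnappyCodec. A minimal usage sketch, not part of the patch (the output file name is illustrative):

    // Sketch: compress data so that Hadoop's SnappyCodec can read it back.
    import java.io.FileOutputStream;
    import java.io.OutputStream;
    import org.xerial.snappy.SnappyHadoopCompatibleOutputStream;

    public class CompressForHadoop
    {
        public static void main(String[] args) throws Exception
        {
            byte[] payload = "some data to compress".getBytes("UTF-8");
            // "data.snappy" is an illustrative name; any OutputStream works
            try (OutputStream out = new SnappyHadoopCompatibleOutputStream(new FileOutputStream("data.snappy"))) {
                out.write(payload);
            }
        }
    }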
diff --git a/src/main/java/org/xerial/snappy/SnappyOutputStream.java b/src/main/java/org/xerial/snappy/SnappyOutputStream.java
index 0812679..4cfe771 100755
--- a/src/main/java/org/xerial/snappy/SnappyOutputStream.java
+++ b/src/main/java/org/xerial/snappy/SnappyOutputStream.java
@@ -72,6 +72,7 @@ public class SnappyOutputStream
     protected byte[] outputBuffer;
     private int inputCursor = 0;
     private int outputCursor = 0;
+    private boolean headerWritten;
     private boolean closed;
 
     public SnappyOutputStream(OutputStream out)
@@ -101,8 +102,6 @@ public class SnappyOutputStream
 
         inputBuffer = inputBufferAllocator.allocate(inputSize);
         outputBuffer = outputBufferAllocator.allocate(outputSize);
-
-        outputCursor = SnappyCodec.currentHeader.writeHeader(outputBuffer, 0);
     }
 
     /* (non-Javadoc)
@@ -369,10 +368,18 @@ public class SnappyOutputStream
             return; // no need to dump
         }
 
+        if (!headerWritten) {
+            outputCursor = writeHeader();
+            headerWritten = true;
+        }
+
         // Compress and dump the buffer content
         if (!hasSufficientOutputBufferFor(inputCursor)) {
             dumpOutput();
         }
+
+        writeBlockPreemble();
+
         int compressedSize = Snappy.compress(inputBuffer, 0, inputCursor, outputBuffer, outputCursor + 4);
         // Write compressed data size
         writeInt(outputBuffer, outputCursor, compressedSize);
@@ -380,6 +387,24 @@ public class SnappyOutputStream
         inputCursor = 0;
     }
 
+    protected int writeHeader(){
+        return SnappyCodec.currentHeader.writeHeader(outputBuffer, 0);
+    }
+
+    /**
+     * Optionally writes a preamble before a block. Hadoop requires the actual block data size to be written; this
+     * base implementation does nothing. Derived implementations can call {@code writeCurrentDataSize()}.
+     */
+    protected void writeBlockPreemble()
+    {
+        // do nothing
+    }
+
+    protected void writeCurrentDataSize(){
+        writeInt(outputBuffer, outputCursor, inputCursor);
+        outputCursor += 4;
+    }
+
     /**
      * close the stream
      */
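Note: these two hooks determine the on-disk framing. Because the override of writeHeader() returns 0, no snappy-java stream header is emitted, and because writeBlockPreemble() calls writeCurrentDataSize(), each block is laid out as [4-byte uncompressed size][4-byte compressed size][compressed bytes], with sizes written big-endian by writeInt — the framing Hadoop's codec expects. A minimal manual reader under that assumption, not part of the patch:

    // Sketch: decode the Hadoop-compatible framing by hand.
    // Assumes each block carries exactly one compressed chunk, which matches
    // what SnappyHadoopCompatibleOutputStream produces (blocks are compressed whole).
    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.FileInputStream;
    import java.io.IOException;
    import org.xerial.snappy.Snappy;

    public class ReadHadoopFraming
    {
        public static void main(String[] args) throws Exception
        {
            try (DataInputStream in = new DataInputStream(new FileInputStream("data.snappy"))) {
                while (true) {
                    int rawLen;
                    try {
                        rawLen = in.readInt();           // uncompressed block size (big-endian)
                    } catch (EOFException eof) {
                        break;                           // clean end of stream
                    }
                    int compressedLen = in.readInt();    // compressed chunk size (big-endian)
                    byte[] compressed = new byte[compressedLen];
                    in.readFully(compressed);
                    byte[] raw = Snappy.uncompress(compressed);
                    if (raw.length != rawLen) {
                        throw new IOException("corrupt block: expected " + rawLen + " bytes, got " + raw.length);
                    }
                    // ... consume raw ...
                }
            }
        }
    }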
diff --git a/src/test/java/org/xerial/snappy/SnappyHadoopCompatibleOutputStreamTest.java b/src/test/java/org/xerial/snappy/SnappyHadoopCompatibleOutputStreamTest.java
new file mode 100644
index 0000000..d478102
--- /dev/null
+++ b/src/test/java/org/xerial/snappy/SnappyHadoopCompatibleOutputStreamTest.java
@@ -0,0 +1,140 @@
+package org.xerial.snappy;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.SystemUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.*;
+import java.lang.reflect.Field;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class SnappyHadoopCompatibleOutputStreamTest
+{
+    private static File tempNativeLibFolder;
+
+    @BeforeClass
+    public static void loadHadoopNativeLibrary() throws Exception
+    {
+        final String libResourceFolder;
+        Map<String, String> libraryNames = new LinkedHashMap<>();
+        if (SystemUtils.IS_OS_LINUX) {
+            libResourceFolder = "/lib/Linux";
+            libraryNames.put("libhadoop.so", "libhadoop.so");
+            // certain Linux systems need these shared libraries copied before the JVM starts; see build.sbt
+            libraryNames.put("libsnappy.so", "libsnappy.so");
+            libraryNames.put("libsnappy.so.1", "libsnappy.so");
+        } else if (SystemUtils.IS_OS_MAC_OSX) {
+            libResourceFolder = "/lib/MacOSX";
+            libraryNames.put("libhadoop.dylib", "libhadoop.dylib");
+            libraryNames.put("libsnappy.dylib", "libsnappy.dylib");
+            libraryNames.put("libsnappy.1.dylib", "libsnappy.dylib");
+        } else {
+            return; // not supported
+        }
+
+        String testLibDir = System.getenv("XERIAL_SNAPPY_LIB");
+
+        tempNativeLibFolder = new File(testLibDir);
+        tempNativeLibFolder.mkdirs();
+
+        for (Map.Entry<String, String> entry : libraryNames.entrySet()) {
+            copyNativeLibraryToFS(libResourceFolder, entry.getValue(), entry.getKey());
+        }
+
+        System.setProperty("java.library.path", tempNativeLibFolder.getAbsolutePath());
+
+        // credit: https://stackoverflow.com/questions/15409223/adding-new-paths-for-native-libraries-at-runtime-in-java
+        // set sys_paths to null so that java.library.path will be re-evaluated the next time it is needed
+        final Field sysPathsField = ClassLoader.class.getDeclaredField("sys_paths");
+        sysPathsField.setAccessible(true);
+        sysPathsField.set(null, null);
+    }
+
+    private static void copyNativeLibraryToFS(String libResourceFolder, String libraryName, String toLibraryName) {
+        final String libraryResourceName = libResourceFolder + "/" + libraryName;
+        final File libraryPath = new File(tempNativeLibFolder, toLibraryName);
+        try (InputStream inputStream = SnappyHadoopCompatibleOutputStream.class.getResourceAsStream(libraryResourceName);
+             FileOutputStream outputStream = new FileOutputStream(libraryPath)) {
+            IOUtils.copy(inputStream, outputStream);
+            FileDescriptor fd = outputStream.getFD();
+            fd.sync();
+        } catch (IOException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @AfterClass
+    public static void cleanUpLibraryFolder()
+    {
+        FileUtils.deleteQuietly(tempNativeLibFolder);
+    }
+
+    @Test
+    public void testXerialCompressionHadoopDecompressionCodec() throws Exception
+    {
+        File inputFile = File.createTempFile("TEST_hadoop_compatibility", ".txt");
+        File snappyFile = File.createTempFile("TEST_hadoop_compatibility", ".snappy");
+        InputStream snappyInput = null;
+        FileOutputStream outputStream = new FileOutputStream(inputFile);
+        try {
+            String text = "";
+            for (int i = 0; i < 1024; i++) {
+                text += "Some long long strings to be compressed. Some long long strings to be compressed.";
+            }
+            text += "odd bytes";
+            final byte[] bytes = text.getBytes("UTF-8");
+            outputStream.write(bytes);
+            outputStream.flush();
+            outputStream.close();
+
+            compress(inputFile, snappyFile);
+
+            if (tempNativeLibFolder != null) {
+                SnappyCodec hadoopCodec = new SnappyCodec();
+                hadoopCodec.setConf(new Configuration());
+                snappyInput = hadoopCodec.createInputStream(new FileInputStream(snappyFile));
+                byte[] buf = new byte[bytes.length];
+                int byteRead = IOUtils.read(snappyInput, buf);
+                String decompressed = new String(buf, 0, byteRead, "UTF-8");
+                Assert.assertEquals(decompressed, text);
+            } else {
+                System.err.println("WARNING: no hadoop library for this platform; skipping the hadoop decompression test");
+            }
+        } finally {
+            if (snappyInput != null) {
+                snappyInput.close();
+            }
+            inputFile.delete();
+            snappyFile.delete();
+            outputStream.close();
+        }
+    }
+
+    private void compress(File inputPath, File outputPath) throws Exception
+    {
+        FileInputStream fileInputStream = new FileInputStream(inputPath);
+        FileOutputStream fileOutputStream = new FileOutputStream(outputPath);
+        try {
+            InputStream inputStream = new BufferedInputStream(fileInputStream);
+            OutputStream outputStream = new SnappyHadoopCompatibleOutputStream(fileOutputStream);
+            int readCount;
+            byte[] buffer = new byte[64 * 1024];
+            while ((readCount = inputStream.read(buffer)) > 0) {
+                outputStream.write(buffer, 0, readCount);
+            }
+            inputStream.close();
+            outputStream.close();
+        } finally {
+            fileInputStream.close();
+            fileOutputStream.close();
+        }
+    }
+}
skip hadoop decompression test"); + } + } finally { + if (snappyInput != null) { + snappyInput.close(); + } + inputFile.delete(); + snappyFile.delete(); + outputStream.close(); + } + } + + private void compress(File inputPath, File outputPath) throws Exception + { + FileInputStream fileInputStream = new FileInputStream(inputPath); + FileOutputStream fileOutputStream = new FileOutputStream(outputPath); + try { + InputStream inputStream = new BufferedInputStream(fileInputStream); + OutputStream outputStream = new SnappyHadoopCompatibleOutputStream(fileOutputStream); + int readCount; + byte[] buffer = new byte[64 * 1024]; + while ((readCount = inputStream.read(buffer)) > 0) { + outputStream.write(buffer, 0, readCount); + } + inputStream.close(); + outputStream.close(); + } finally { + fileInputStream.close(); + fileOutputStream.close(); + } + } +} diff --git a/src/test/resources/lib/Linux/libhadoop.so b/src/test/resources/lib/Linux/libhadoop.so new file mode 100755 index 0000000..bc75a17 Binary files /dev/null and b/src/test/resources/lib/Linux/libhadoop.so differ diff --git a/src/test/resources/lib/Linux/libsnappy.so b/src/test/resources/lib/Linux/libsnappy.so new file mode 100755 index 0000000..74e33f5 Binary files /dev/null and b/src/test/resources/lib/Linux/libsnappy.so differ diff --git a/src/test/resources/lib/MacOSX/libhadoop.dylib b/src/test/resources/lib/MacOSX/libhadoop.dylib new file mode 100755 index 0000000..ec07a89 Binary files /dev/null and b/src/test/resources/lib/MacOSX/libhadoop.dylib differ diff --git a/src/test/resources/lib/MacOSX/libsnappy.dylib b/src/test/resources/lib/MacOSX/libsnappy.dylib new file mode 100755 index 0000000..d2b91f5 Binary files /dev/null and b/src/test/resources/lib/MacOSX/libsnappy.dylib differ diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties new file mode 100644 index 0000000..9f79d47 --- /dev/null +++ b/src/test/resources/log4j.properties @@ -0,0 +1,8 @@ +log4j.rootLogger = INFO, console + +# Set the appender named X to be a File appender +log4j.appender.console=org.apache.log4j.ConsoleAppender + +# Define the layout for X appender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.conversionPattern=%m%n