Merge master

parent a5de996866
commit f80b3f0041

build.sbt: 23 changed lines
build.sbt

@@ -67,6 +67,26 @@ javacOptions in doc := {
   opts
 }
 
+// Configuration for SnappyHadoopCompatibleOutputStream testing
+fork in Test := true
+import java.io.File
+val libTemp = {
+  val path = s"${System.getProperty("java.io.tmpdir")}/snappy_test_${System.currentTimeMillis()}"
+  // certain older Linux systems (debian/trusty in Travis CI) requires the libsnappy.so, loaded by
+  // libhadoop.so, be copied to the temp path before the child JVM is forked.
+  // because of that, cannot define as an additional task in Test scope
+  IO.copyFile(file("src/test/resources/lib/Linux/libsnappy.so"), file(s"$path/libsnappy.so"))
+  IO.copyFile(file("src/test/resources/lib/Linux/libsnappy.so"), file(s"$path/libsnappy.so.1"))
+  path
+}
+
+val macOSXLibPath = s"$libTemp:${System.getenv("DYLD_LIBRARY_PATH")}"
+val linuxLibPath = s"$libTemp:${System.getenv("LD_LIBRARY_PATH")}"
+
+// have to add to system dynamic library path since hadoop native library indirectly loads libsnappy.1
+// can't use javaOptions in Test because it causes the expression to eval twice, yielding different temp path values
+envVars in Test := Map("XERIAL_SNAPPY_LIB" -> libTemp, "DYLD_LIBRARY_PATH" -> macOSXLibPath, "LD_LIBRARY_PATH" -> linuxLibPath)
+
 testOptions += Tests.Argument(TestFrameworks.JUnit, "-q", "-v")
 concurrentRestrictions in Global := Seq(Tags.limit(Tags.Test, 1))
 autoScalaLibrary := false
@@ -83,7 +103,8 @@ libraryDependencies ++= Seq(
   "org.wvlet.airframe" %% "airframe-log" % "0.25" % "test",
   "org.scalatest" %% "scalatest" % "3.0.4" % "test",
   "org.osgi" % "org.osgi.core" % "4.3.0" % "provided",
-  "com.novocode" % "junit-interface" % "0.11" % "test"
+  "com.novocode" % "junit-interface" % "0.11" % "test",
+  "org.apache.hadoop" % "hadoop-common" % "2.7.3" % "test" exclude ("org.xerial.snappy", "snappy-java")
 )
 
 enablePlugins(SbtOsgi)
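The envVars handoff above is what the forked test JVM sees at runtime: the test class added later in this commit reads XERIAL_SNAPPY_LIB to locate the copied native libraries. As a rough illustration only (a minimal sketch, not part of this commit; the class name and printed values are assumptions and depend on the local environment):

import java.io.File;

// Minimal probe sketch: inside the forked test JVM, the temp directory prepared by
// build.sbt is reachable via the XERIAL_SNAPPY_LIB environment variable and already
// contains the copied libsnappy.so / libsnappy.so.1.
public class NativeLibEnvProbe {
    public static void main(String[] args) {
        String libDir = System.getenv("XERIAL_SNAPPY_LIB");
        if (libDir == null) {
            System.out.println("not running under the sbt test configuration shown above");
            return;
        }
        System.out.println("libsnappy.so present:   " + new File(libDir, "libsnappy.so").exists());
        System.out.println("libsnappy.so.1 present: " + new File(libDir, "libsnappy.so.1").exists());
        System.out.println("LD_LIBRARY_PATH = " + System.getenv("LD_LIBRARY_PATH"));
    }
}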
SnappyHadoopCompatibleOutputStream.java (new file)

@@ -0,0 +1,30 @@
+package org.xerial.snappy;
+
+import java.io.OutputStream;
+
+import org.xerial.snappy.buffer.CachedBufferAllocator;
+
+public class SnappyHadoopCompatibleOutputStream extends SnappyOutputStream
+{
+    public SnappyHadoopCompatibleOutputStream(OutputStream out)
+    {
+        this(out, DEFAULT_BLOCK_SIZE);
+    }
+
+    public SnappyHadoopCompatibleOutputStream(OutputStream out, int blockSize)
+    {
+        super(out, blockSize, CachedBufferAllocator.getBufferAllocatorFactory());
+    }
+
+    @Override
+    protected int writeHeader()
+    {
+        return 0;
+    }
+
+    @Override
+    protected void writeBlockPreemble()
+    {
+        writeCurrentDataSize();
+    }
+}
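For context, a minimal usage sketch of the new class (not part of the diff; the file name "data.snappy" is a placeholder chosen for illustration). The stream is used exactly like SnappyOutputStream, but its output can be read back by Hadoop's SnappyCodec, which is what the test added later in this commit verifies:

import java.io.FileOutputStream;
import java.io.OutputStream;
import org.xerial.snappy.SnappyHadoopCompatibleOutputStream;

// Minimal sketch: write Hadoop-readable snappy block data.
public class WriteHadoopSnappy {
    public static void main(String[] args) throws Exception {
        byte[] payload = "some data to compress".getBytes("UTF-8");
        try (OutputStream out = new SnappyHadoopCompatibleOutputStream(new FileOutputStream("data.snappy"))) {
            out.write(payload);
        }
        // The resulting file can be decompressed with Hadoop's
        // org.apache.hadoop.io.compress.SnappyCodec (see the test below).
    }
}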
SnappyOutputStream.java

@@ -72,6 +72,7 @@ public class SnappyOutputStream
     protected byte[] outputBuffer;
     private int inputCursor = 0;
     private int outputCursor = 0;
+    private boolean headerWritten;
     private boolean closed;
 
     public SnappyOutputStream(OutputStream out)
@@ -101,8 +102,6 @@ public class SnappyOutputStream
 
         inputBuffer = inputBufferAllocator.allocate(inputSize);
         outputBuffer = outputBufferAllocator.allocate(outputSize);
-
-        outputCursor = SnappyCodec.currentHeader.writeHeader(outputBuffer, 0);
     }
 
     /* (non-Javadoc)
@@ -369,10 +368,18 @@ public class SnappyOutputStream
             return; // no need to dump
         }
 
+        if (!headerWritten) {
+            outputCursor = writeHeader();
+            headerWritten = true;
+        }
+
         // Compress and dump the buffer content
         if (!hasSufficientOutputBufferFor(inputCursor)) {
             dumpOutput();
         }
+
+        writeBlockPreemble();
+
         int compressedSize = Snappy.compress(inputBuffer, 0, inputCursor, outputBuffer, outputCursor + 4);
         // Write compressed data size
         writeInt(outputBuffer, outputCursor, compressedSize);
@@ -380,6 +387,24 @@ public class SnappyOutputStream
         inputCursor = 0;
     }
 
+    protected int writeHeader() {
+        return SnappyCodec.currentHeader.writeHeader(outputBuffer, 0);
+    }
+
+    /**
+     * Optionally write a preamble before a block. Hadoop requires the actual block data size to be written. This base
+     * implementation does nothing. Derived implementations can call {@code writeCurrentDataSize()}.
+     */
+    protected void writeBlockPreemble()
+    {
+        // do nothing
+    }
+
+    protected void writeCurrentDataSize() {
+        writeInt(outputBuffer, outputCursor, inputCursor);
+        outputCursor += 4;
+    }
+
     /**
      * close the stream
      */
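Taken together, the overridden hooks mean the Hadoop-compatible stream emits no magic header and frames each flushed block as a 4-byte raw (uncompressed) length, a 4-byte compressed length, and then the compressed bytes, with both lengths big-endian as writeInt produces. A rough reader sketch under that assumed framing (illustrative only, not an API of this commit; Hadoop's SnappyCodec is the intended consumer, as the test below shows, and "data.snappy" is again a placeholder name):

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import org.xerial.snappy.Snappy;

// Illustrative sketch: walk the [raw size][compressed size][data] blocks written by
// SnappyHadoopCompatibleOutputStream and decompress each one with snappy-java.
public class ReadHadoopSnappyBlocks {
    public static void main(String[] args) throws Exception {
        try (DataInputStream in = new DataInputStream(new FileInputStream("data.snappy"))) {
            while (true) {
                int rawSize;
                try {
                    rawSize = in.readInt();          // uncompressed size from writeCurrentDataSize()
                } catch (EOFException eof) {
                    break;                            // clean end of stream
                }
                int compressedSize = in.readInt();    // compressed size written by dump()
                byte[] compressed = new byte[compressedSize];
                in.readFully(compressed);
                byte[] raw = Snappy.uncompress(compressed);
                assert raw.length == rawSize;
            }
        }
    }
}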
SnappyHadoopCompatibleOutputStreamTest.java (new file)

@@ -0,0 +1,140 @@
+package org.xerial.snappy;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.SystemUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.*;
+import java.lang.reflect.Field;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class SnappyHadoopCompatibleOutputStreamTest
+{
+
+    private static File tempNativeLibFolder;
+
+    @BeforeClass
+    public static void loadHadoopNativeLibrary() throws Exception
+    {
+        final String libResourceFolder;
+        Map<String, String> libraryNames = new LinkedHashMap<>();
+        if (SystemUtils.IS_OS_LINUX) {
+            libResourceFolder = "/lib/Linux";
+            libraryNames.put("libhadoop.so", "libhadoop.so");
+            // certain Linux systems need these shared libraries copied before the JVM is started, see build.sbt
+            libraryNames.put("libsnappy.so", "libsnappy.so");
+            libraryNames.put("libsnappy.so.1", "libsnappy.so");
+        } else if (SystemUtils.IS_OS_MAC_OSX) {
+            libResourceFolder = "/lib/MacOSX";
+            libraryNames.put("libhadoop.dylib", "libhadoop.dylib");
+            libraryNames.put("libsnappy.dylib", "libsnappy.dylib");
+            libraryNames.put("libsnappy.1.dylib", "libsnappy.dylib");
+        } else {
+            return; // not supported
+        }
+
+        String testLibDir = System.getenv("XERIAL_SNAPPY_LIB");
+
+        tempNativeLibFolder = new File(testLibDir);
+        tempNativeLibFolder.mkdirs();
+
+        for (Map.Entry<String, String> entry : libraryNames.entrySet()) {
+            copyNativeLibraryToFS(libResourceFolder, entry.getValue(), entry.getKey());
+        }
+
+        System.setProperty("java.library.path", tempNativeLibFolder.getAbsolutePath());
+
+        // credit: https://stackoverflow.com/questions/15409223/adding-new-paths-for-native-libraries-at-runtime-in-java
+        // set sys_paths to null so that java.library.path will be re-evaluated the next time it is needed
+        final Field sysPathsField = ClassLoader.class.getDeclaredField("sys_paths");
+        sysPathsField.setAccessible(true);
+        sysPathsField.set(null, null);
+    }
+
+    private static void copyNativeLibraryToFS(String libResourceFolder, String libraryName, String toLibraryName) {
+        final String libraryResourceName = libResourceFolder + "/" + libraryName;
+        final File libraryPath = new File(tempNativeLibFolder, toLibraryName);
+        try (InputStream inputStream = SnappyHadoopCompatibleOutputStream.class.getResourceAsStream(libraryResourceName);
+             FileOutputStream outputStream = new FileOutputStream(libraryPath)) {
+            IOUtils.copy(inputStream, outputStream);
+            FileDescriptor fd = outputStream.getFD();
+            fd.sync();
+        } catch (IOException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    @AfterClass
+    public static void cleanUpLibraryFolder()
+    {
+        FileUtils.deleteQuietly(tempNativeLibFolder);
+    }
+
+    @Test
+    public void testXerialCompressionHadoopDecompressionCodec() throws Exception
+    {
+        File inputFile = File.createTempFile("TEST_hadoop_compatibility", ".txt");
+        File snappyFile = File.createTempFile("TEST_hadoop_compatibility", ".snappy");
+        InputStream snappyInput = null;
+        FileOutputStream outputStream = new FileOutputStream(inputFile);
+        try {
+            String text = "";
+            for (int i = 0; i < 1024; i++) {
+                text += "Some long long strings to be compressed. Some long long strings to be compressed.";
+            }
+            text += "odd bytes";
+            final byte[] bytes = text.getBytes("UTF-8");
+            outputStream.write(bytes);
+            outputStream.flush();
+            outputStream.close();
+
+            compress(inputFile, snappyFile);
+
+            if (tempNativeLibFolder != null) {
+                SnappyCodec hadoopCodec = new SnappyCodec();
+                hadoopCodec.setConf(new Configuration());
+                snappyInput = hadoopCodec.createInputStream(new FileInputStream(snappyFile));
+                byte[] buf = new byte[bytes.length];
+                int byteRead = IOUtils.read(snappyInput, buf);
+                String decompressed = new String(buf, 0, byteRead, "UTF-8");
+                Assert.assertEquals(decompressed, text);
+            } else {
+                System.err.println("WARNING: no hadoop library for this platform. skipping hadoop decompression test");
+            }
+        } finally {
+            if (snappyInput != null) {
+                snappyInput.close();
+            }
+            inputFile.delete();
+            snappyFile.delete();
+            outputStream.close();
+        }
+    }
+
+    private void compress(File inputPath, File outputPath) throws Exception
+    {
+        FileInputStream fileInputStream = new FileInputStream(inputPath);
+        FileOutputStream fileOutputStream = new FileOutputStream(outputPath);
+        try {
+            InputStream inputStream = new BufferedInputStream(fileInputStream);
+            OutputStream outputStream = new SnappyHadoopCompatibleOutputStream(fileOutputStream);
+            int readCount;
+            byte[] buffer = new byte[64 * 1024];
+            while ((readCount = inputStream.read(buffer)) > 0) {
+                outputStream.write(buffer, 0, readCount);
+            }
+            inputStream.close();
+            outputStream.close();
+        } finally {
+            fileInputStream.close();
+            fileOutputStream.close();
+        }
+    }
+}
4 binary files not shown (native test libraries, e.g. libhadoop/libsnappy under src/test/resources/lib).
log4j.properties (new test resource)

@@ -0,0 +1,8 @@
+log4j.rootLogger = INFO, console
+
+# Set the appender named X to be a File appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+
+# Define the layout for X appender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.conversionPattern=%m%n