Compressing an InputStream with gzip

I want to compress an input stream in Java using gzip compression.

Say we have an uncompressed input stream (1 GB of data..). I want a compressed input stream, fed from the source, as the result:

public InputStream getCompressedStream(InputStream unCompressedStream) {
    // Not working, because it's uncompressing the stream; I want the opposite.
    return new GZIPInputStream(unCompressedStream);
}

To compress the data you need GZIPOutputStream. But since you need to read the data back as from an InputStream, you have to convert the OutputStream into an InputStream. You can do this by writing into a ByteArrayOutputStream and calling toByteArray():

ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gout = new GZIPOutputStream(out);
// ... code to read from your original uncompressed data and write it to gout ...
gout.close();
// Convert to InputStream.
InputStream compressed = new ByteArrayInputStream(out.toByteArray());

This approach has one limitation, though: you need to read in all the data first, which means you must have enough memory to hold that buffer.
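A minimal, self-contained sketch of this buffer-everything approach (class and method names are mine, not from the answer):

```java
import java.io.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class BufferedGzipExample {
    // Compresses everything into memory first, then exposes it as an InputStream.
    static InputStream compressToStream(InputStream source) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
            byte[] chunk = new byte[8192];
            int n;
            while ((n = source.read(chunk)) > 0) {
                gzip.write(chunk, 0, n);
            }
        } // close() flushes the deflater and writes the gzip trailer
        return new ByteArrayInputStream(buffer.toByteArray());
    }

    public static void main(String[] args) throws IOException {
        byte[] original = "hello hello hello".getBytes("UTF-8");
        InputStream compressed = compressToStream(new ByteArrayInputStream(original));
        // Round-trip: decompress and compare with the original
        byte[] restored = new GZIPInputStream(compressed).readAllBytes();
        System.out.println("roundtrip ok: " + java.util.Arrays.equals(original, restored));
    }
}
```

For a 1 GB source, as the question describes, this needs roughly the compressed size in heap, which is exactly the limitation the answer points out.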

An alternative using pipes is mentioned in this thread: How to convert an OutputStream into an InputStream?

DeflaterInputStream is not what you want, because it lacks the gzip header/trailer and uses a slightly different compression.

If you change from an OutputStream (push) to an InputStream (pull), you need to do things differently.

What GZIPOutputStream does is:

  • Write a static gzip header
  • Write the deflated stream using DeflaterOutputStream. While the stream is written, a CRC32 checksum is built from the uncompressed data and the number of bytes is counted
  • Write a trailer containing the CRC32 checksum and the byte count
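The trailer step can be illustrated in isolation. Here is a sketch (the helper name is mine) that computes the 8-byte gzip trailer, i.e. the CRC-32 and the input size, both little-endian, from uncompressed data:

```java
import java.util.zip.CRC32;

public class GzipTrailerExample {
    // Builds the 8-byte gzip trailer: CRC-32 of the uncompressed data,
    // then its length, both as 32-bit little-endian integers.
    static byte[] trailer(byte[] uncompressed) {
        CRC32 crc = new CRC32();
        crc.update(uncompressed, 0, uncompressed.length);
        long crcValue = crc.getValue();
        long size = uncompressed.length; // modulo 2^32 per the gzip format
        byte[] t = new byte[8];
        for (int i = 0; i < 4; i++) {
            t[i]     = (byte) (crcValue >> (8 * i));
            t[i + 4] = (byte) (size     >> (8 * i));
        }
        return t;
    }

    public static void main(String[] args) {
        byte[] t = trailer("abc".getBytes());
        // ISIZE for 3 bytes of input is 03 00 00 00 (little-endian)
        System.out.println("size bytes: " + t[4] + " " + t[5] + " " + t[6] + " " + t[7]);
    }
}
```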

If you want to do the same with InputStreams, you need a stream that contains:

  • The header
  • The deflated content
  • The trailer

The best way to do this is to provide three separate streams and combine them into one. Fortunately, there is SequenceInputStream, which combines streams for you.
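To illustrate how SequenceInputStream chains streams, a tiny sketch (the string contents are just placeholders for header, content and trailer):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;

public class SequenceExample {
    public static void main(String[] args) throws IOException {
        // SequenceInputStream reads the given streams one after another,
        // which is exactly how header + deflated content + trailer are chained.
        InputStream combined = new SequenceInputStream(
            new ByteArrayInputStream("header|".getBytes()),
            new SequenceInputStream(
                new ByteArrayInputStream("content|".getBytes()),
                new ByteArrayInputStream("trailer".getBytes())));
        System.out.println(new String(combined.readAllBytes()));
    }
}
```

The implementation below uses the Enumeration-based constructor instead of nesting, so the content and trailer streams can be created lazily.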

Here is my implementation plus a simple unit test:

import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Enumeration;
import java.util.zip.CRC32;
import java.util.zip.Deflater;
import java.util.zip.DeflaterInputStream;

/**
 * @author mwyraz
 * Wraps an input stream and compresses its contents. Similar to DeflaterInputStream but adds GZIP header and trailer.
 * See GZIPOutputStream for details.
 * LICENSE: Free to use. Contains some lines from GZIPOutputStream, so oracle's license might apply as well!
 */
public class GzipCompressingInputStream extends SequenceInputStream
{
    public GzipCompressingInputStream(InputStream in) throws IOException
    {
        this(in, 512);
    }
    public GzipCompressingInputStream(InputStream in, int bufferSize) throws IOException
    {
        super(new StatefullGzipStreamEnumerator(in, bufferSize));
    }

    static enum StreamState
    {
        HEADER,
        CONTENT,
        TRAILER
    }

    protected static class StatefullGzipStreamEnumerator implements Enumeration<InputStream>
    {
        protected final InputStream in;
        protected final int bufferSize;
        protected StreamState state;

        public StatefullGzipStreamEnumerator(InputStream in, int bufferSize)
        {
            this.in = in;
            this.bufferSize = bufferSize;
            state = StreamState.HEADER;
        }

        public boolean hasMoreElements()
        {
            return state != null;
        }
        public InputStream nextElement()
        {
            switch (state)
            {
                case HEADER:
                    state = StreamState.CONTENT;
                    return createHeaderStream();
                case CONTENT:
                    state = StreamState.TRAILER;
                    return createContentStream();
                case TRAILER:
                    state = null;
                    return createTrailerStream();
            }
            return null;
        }

        static final int GZIP_MAGIC = 0x8b1f;
        static final byte[] GZIP_HEADER = new byte[] {
            (byte) GZIP_MAGIC,       // Magic number (short)
            (byte)(GZIP_MAGIC >> 8), // Magic number (short)
            Deflater.DEFLATED,       // Compression method (CM)
            0,                       // Flags (FLG)
            0,                       // Modification time MTIME (int)
            0,                       // Modification time MTIME (int)
            0,                       // Modification time MTIME (int)
            0,                       // Modification time MTIME (int)
            0,                       // Extra flags (XFLG)
            0                        // Operating system (OS)
        };
        protected InputStream createHeaderStream()
        {
            return new ByteArrayInputStream(GZIP_HEADER);
        }

        protected InternalGzipCompressingInputStream contentStream;
        protected InputStream createContentStream()
        {
            contentStream = new InternalGzipCompressingInputStream(new CRC32InputStream(in), bufferSize);
            return contentStream;
        }
        protected InputStream createTrailerStream()
        {
            return new ByteArrayInputStream(contentStream.createTrailer());
        }
    }

    /**
     * Internal stream that tracks the CRC32 checksum and byte count of the uncompressed data
     */
    protected static class CRC32InputStream extends FilterInputStream
    {
        protected CRC32 crc = new CRC32();
        protected long byteCount;

        public CRC32InputStream(InputStream in)
        {
            super(in);
        }

        @Override
        public int read() throws IOException
        {
            int val = super.read();
            if (val >= 0)
            {
                crc.update(val);
                byteCount++;
            }
            return val;
        }
        @Override
        public int read(byte[] b, int off, int len) throws IOException
        {
            len = super.read(b, off, len);
            if (len >= 0)
            {
                crc.update(b, off, len);
                byteCount += len;
            }
            return len;
        }
        public long getCrcValue()
        {
            return crc.getValue();
        }
        public long getByteCount()
        {
            return byteCount;
        }
    }

    /**
     * Internal stream without header/trailer
     */
    protected static class InternalGzipCompressingInputStream extends DeflaterInputStream
    {
        protected final CRC32InputStream crcIn;

        public InternalGzipCompressingInputStream(CRC32InputStream in, int bufferSize)
        {
            super(in, new Deflater(Deflater.DEFAULT_COMPRESSION, true), bufferSize);
            crcIn = in;
        }
        public void close() throws IOException
        {
            if (in != null)
            {
                try
                {
                    def.end();
                    in.close();
                }
                finally
                {
                    in = null;
                }
            }
        }

        protected final static int TRAILER_SIZE = 8;

        public byte[] createTrailer()
        {
            byte[] trailer = new byte[TRAILER_SIZE];
            writeTrailer(trailer, 0);
            return trailer;
        }

        /*
         * Writes GZIP member trailer to a byte array, starting at a given
         * offset.
         */
        private void writeTrailer(byte[] buf, int offset)
        {
            writeInt((int) crcIn.getCrcValue(), buf, offset);      // CRC-32 of uncompr. data
            writeInt((int) crcIn.getByteCount(), buf, offset + 4); // Number of uncompr. bytes
        }

        /*
         * Writes integer in Intel byte order to a byte array, starting at a
         * given offset.
         */
        private void writeInt(int i, byte[] buf, int offset)
        {
            writeShort(i & 0xffff, buf, offset);
            writeShort((i >> 16) & 0xffff, buf, offset + 2);
        }

        /*
         * Writes short integer in Intel byte order to a byte array, starting
         * at a given offset
         */
        private void writeShort(int s, byte[] buf, int offset)
        {
            buf[offset] = (byte) (s & 0xff);
            buf[offset + 1] = (byte) ((s >> 8) & 0xff);
        }
    }
}

import static org.junit.Assert.*;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;

import org.junit.Test;

public class TestGzipCompressingInputStream
{
    @Test
    public void test() throws Exception
    {
        testCompressor("test1 test2 test3");
        testCompressor("1MB binary data", createTestPattern(1024 * 1024));
        for (int i = 0; i < 4096; i++)
        {
            testCompressor(i + " bytes of binary data", createTestPattern(i));
        }
    }

    protected byte[] createTestPattern(int size)
    {
        byte[] data = new byte[size];
        byte pattern = 0;
        for (int i = 0; i < size; i++)
        {
            data[i] = pattern++;
        }
        return data;
    }

    // The rest of this test was truncated in the source; the helpers below are a
    // minimal reconstruction: compress with GzipCompressingInputStream, decompress
    // with the JRE's GZIPInputStream and compare with the original data.
    protected void testCompressor(String data) throws IOException
    {
        testCompressor("String " + data, data.getBytes());
    }
    protected void testCompressor(String dataInfo, byte[] data) throws IOException
    {
        InputStream uncompressedIn = new ByteArrayInputStream(data);
        InputStream compressedIn = new GzipCompressingInputStream(uncompressedIn);
        InputStream uncompressedOut = new GZIPInputStream(compressedIn);

        ByteArrayOutputStream result = new ByteArrayOutputStream();
        byte[] buffer = new byte[512];
        int read;
        while ((read = uncompressedOut.read(buffer)) >= 0)
        {
            result.write(buffer, 0, read);
        }
        uncompressedOut.close();

        assertTrue("Test failed for: " + dataInfo, Arrays.equals(data, result.toByteArray()));
    }
}

If you don't want to load the content into a big byte array and need a true streaming solution:

package xyz;

import org.apache.commons.io.IOUtils;

import java.io.*;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipOutputStream;

/**
 * Stream Compression Utility
 *
 * @author Thamme Gowda N
 */
public enum CompressionUtil {
    INSTANCE;

    public static final int NUM_THREADS = 5;
    private final ExecutorService pool;

    CompressionUtil() {
        this.pool = Executors.newFixedThreadPool(NUM_THREADS);
    }

    public static CompressionUtil getInstance() {
        return INSTANCE;
    }

    /**
     * Supported compression type names
     */
    public static enum CompressionType {
        GZIP,
        ZIP
    }

    /**
     * Wraps the given stream in a compressor stream based on the given type
     * @param sourceStream : stream to be wrapped
     * @param type : compression type
     * @return source stream wrapped in a compressor stream
     * @throws IOException when something bad happens
     */
    public static OutputStream getCompressionWrapper(OutputStream sourceStream,
                                                     CompressionType type) throws IOException {
        switch (type) {
            case GZIP:
                return new GZIPOutputStream(sourceStream);
            case ZIP:
                return new ZipOutputStream(sourceStream);
            default:
                throw new IllegalArgumentException("Possible values :"
                        + Arrays.toString(CompressionType.values()));
        }
    }

    /**
     * Gets a compressed stream for the given input stream
     * @param sourceStream : input stream to be compressed
     * @param type : compression type such as GZIP
     * @return compressed stream
     * @throws IOException when something bad happens
     */
    public static InputStream getCompressedStream(final InputStream sourceStream,
                                                  CompressionType type) throws IOException {

        if (sourceStream == null) {
            throw new IllegalArgumentException("Source Stream cannot be NULL");
        }

        /*
         * sourceStream --> zipperOutStream(->intermediateStream -)--> resultStream
         */
        final PipedInputStream resultStream = new PipedInputStream();
        final PipedOutputStream intermediateStream = new PipedOutputStream(resultStream);
        final OutputStream zipperOutStream = getCompressionWrapper(intermediateStream, type);

        Runnable copyTask = new Runnable() {
            @Override
            public void run() {
                try {
                    int c;
                    while ((c = sourceStream.read()) >= 0) {
                        zipperOutStream.write(c);
                    }
                    zipperOutStream.flush();
                } catch (IOException e) {
                    IOUtils.closeQuietly(resultStream); // close it on error case only
                    throw new RuntimeException(e);
                } finally {
                    // close source stream and intermediate streams
                    IOUtils.closeQuietly(sourceStream);
                    IOUtils.closeQuietly(zipperOutStream);
                    IOUtils.closeQuietly(intermediateStream);
                }
            }
        };
        getInstance().pool.submit(copyTask);
        return resultStream;
    }

    public static void main(String[] args) throws IOException {
        String input = "abcdefghij";
        InputStream sourceStream = new ByteArrayInputStream(input.getBytes());
        InputStream compressedStream = getCompressedStream(sourceStream, CompressionType.GZIP);
        GZIPInputStream decompressedStream = new GZIPInputStream(compressedStream);
        List<String> lines = IOUtils.readLines(decompressedStream);
        String output = lines.get(0);
        System.out.println("test passed ? " + input.equals(output));
    }
}

A working example of a compressing input stream can be found in the popular open source ESB Mule: GZIPCompressorInputStream.

It uses the DeflaterInputStream provided by the JRE for compression, prepends the gzip header and appends the gzip trailer (a.k.a. footer).

Unfortunately, it is under the CPAL license, which does not seem very common. Also, there appear to be no unit tests.

I seem to be 3 years late, but maybe it will be useful to someone. My solution is similar to @Michael Wyraz's; the only difference is that mine is based on FilterInputStream.

import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.CRC32;
import java.util.zip.Deflater;

public class GZipInputStreamDeflater extends FilterInputStream {

    private static enum Stage {
        HEADER,
        DATA,
        FINALIZATION,
        TRAILER,
        FINISH
    }

    private GZipInputStreamDeflater.Stage stage = Stage.HEADER;

    private final Deflater deflater = new Deflater( Deflater.DEFLATED, true );
    private final CRC32 crc = new CRC32();

    /* GZIP header magic number */
    private final static int GZIP_MAGIC = 0x8b1f;

    private ByteArrayInputStream trailer = null;

    private ByteArrayInputStream header = new ByteArrayInputStream( new byte[] {
        (byte) GZIP_MAGIC,          // Magic number (short)
        (byte) ( GZIP_MAGIC >> 8 ), // Magic number (short)
        Deflater.DEFLATED,          // Compression method (CM)
        0,                          // Flags (FLG)
        0,                          // Modification time MTIME (int)
        0,                          // Modification time MTIME (int)
        0,                          // Modification time MTIME (int)
        0,                          // Modification time MTIME (int)
        0,                          // Extra flags (XFLG)
        0,                          // Operating system (OS)
    } );

    public GZipInputStreamDeflater(InputStream in) {
        super( in );
        crc.reset();
    }

    @Override
    public int read( byte[] b, int off, int len ) throws IOException {
        int read = -1;
        switch( stage ) {
            case FINISH:
                return -1;
            case HEADER:
                read = header.read( b, off, len );
                if( header.available() == 0 ) {
                    stage = Stage.DATA;
                }
                return read;
            case DATA:
                byte[] b2 = new byte[len];
                read = super.read( b2, 0, len );
                if( read <= 0 ) {
                    stage = Stage.FINALIZATION;
                    deflater.finish();
                    return 0;
                }
                else {
                    deflater.setInput( b2, 0, read );
                    crc.update( b2, 0, read );
                    read = 0;
                    while( !deflater.needsInput() && len - read > 0 ) {
                        read += deflater.deflate( b, off + read, len - read, Deflater.NO_FLUSH );
                    }
                    return read;
                }
            case FINALIZATION:
                if( deflater.finished() ) {
                    stage = Stage.TRAILER;
                    int crcValue = (int) crc.getValue();
                    int totalIn = deflater.getTotalIn();
                    trailer = new ByteArrayInputStream( new byte[] {
                        (byte) ( crcValue >> 0 ),
                        (byte) ( crcValue >> 8 ),
                        (byte) ( crcValue >> 16 ),
                        (byte) ( crcValue >> 24 ),
                        (byte) ( totalIn >> 0 ),
                        (byte) ( totalIn >> 8 ),
                        (byte) ( totalIn >> 16 ),
                        (byte) ( totalIn >> 24 ),
                    } );
                    return 0;
                }
                else {
                    read = deflater.deflate( b, off, len, Deflater.FULL_FLUSH );
                    return read;
                }
            case TRAILER:
                read = trailer.read( b, off, len );
                if( trailer.available() == 0 ) {
                    stage = Stage.FINISH;
                }
                return read;
        }
        return -1;
    }

    @Override
    public void close( ) throws IOException {
        super.close();
        deflater.end();
        if( trailer != null ) {
            trailer.close();
        }
        header.close();
    }
}

Usage:

AmazonS3Client s3client = new AmazonS3Client( ... );
try ( InputStream in = new GZipInputStreamDeflater(
        new URL( "http://....../very-big-file.csv" ).openStream() ); ) {
    PutObjectRequest putRequest = new PutObjectRequest(
        "BUCKET-NAME", "/object/key", in, new ObjectMetadata() );
    s3client.putObject( putRequest );
}

Shouldn't you be looking at GZIPOutputStream in this case?

public OutputStream getCompressedStream(InputStream input) {
    OutputStream output = new GZIPOutputStream(new ByteArrayOutputStream());
    IOUtils.copy(input, output);
    return output;
}

There is no DeflatingGZIPInputStream in the JRE. To deflate using the "deflate" compression format, use java.util.zip.DeflaterInputStream or java.util.zip.DeflaterOutputStream.

public InputStream getCompressedStream(InputStream unCompressedStream) {
    return new DeflaterInputStream(unCompressedStream);
}
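A quick round-trip sketch (class name is mine) showing that DeflaterInputStream produces raw "deflate" data, which pairs with InflaterInputStream rather than GZIPInputStream:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.DeflaterInputStream;
import java.util.zip.InflaterInputStream;

public class DeflateRoundTrip {
    public static void main(String[] args) throws IOException {
        byte[] original = "deflate me, deflate me".getBytes("UTF-8");

        // DeflaterInputStream compresses while you read from it, but emits
        // "deflate" output: no gzip header and no CRC-32 trailer.
        InputStream compressed = new DeflaterInputStream(new ByteArrayInputStream(original));
        byte[] deflated = compressed.readAllBytes();

        // The matching decompressor is InflaterInputStream; GZIPInputStream
        // would reject this data for lack of a gzip header.
        byte[] restored = new InflaterInputStream(new ByteArrayInputStream(deflated)).readAllBytes();
        System.out.println("restored equals original: " + java.util.Arrays.equals(original, restored));
    }
}
```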

You could derive a class from java.util.zip.DeflaterInputStream that deflates in GZIP format by looking at the source of java.util.zip.GZIPOutputStream.

You can use EasyStream.

try (final InputStreamFromOutputStream isOs = new InputStreamFromOutputStream() {
    @Override
    protected void produce(final OutputStream dataSink) throws Exception {
        // Compress the source into the sink (the original snippet wrapped the
        // source in GZIPInputStream, which decompresses; that is the wrong direction).
        OutputStream out = new GZIPOutputStream(dataSink);
        IOUtils.copy(unCompressedStream, out);
        out.close();
    }
}) {
    // You can use the compressed input stream here
} catch (final IOException e) {
    // Handle exceptions here
}

PipedOutputStream lets you write to a GZIPOutputStream and expose that data through an InputStream. It has a fixed memory cost, unlike the other solutions that buffer the entire stream of data into an array or file. The only problem is that you cannot read and write from the same thread; you have to use a separate one.

private InputStream gzipInputStream(InputStream in) throws IOException {
    PipedInputStream zipped = new PipedInputStream();
    PipedOutputStream pipe = new PipedOutputStream(zipped);
    new Thread(
        () -> {
            try (OutputStream zipper = new GZIPOutputStream(pipe)) {
                IOUtils.copy(in, zipper);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    ).start();
    return zipped;
}

I suggest the GzipCompressorInputStream from Apache Commons Compress.