Java中的可迭代gzip deflate / inflate

对于隐藏在互联网中的ByteBuffers而言,是否存在用于gzip-deflating的库? 是什么让我们可以推送原始数据然后拉出缩小的数据? 我们已经搜索过它,但只找到了处理InputStreams和OutputStreams的库。

我们的任务是创建gzipfilter,用于在管道体系结构中缩小ByteBuffers流。 这是一种拉结构,其中最后一个元素从早期元素中提取数据。 我们的gzipfilter处理ByteBuffers流,没有单个Stream对象可用。

我们已经玩弄了将数据流调整为某种InputStream然后使用GZipOutputStream来满足我们的要求,但适配器代码的数量至少令人讨厌。

接受后编辑 :为了记录,我们的架构类似于GStreamer等。

马克·阿德勒(Mark Adler)建议采用这种方法非常有用,这比我原来的答案要好得多。

package stack; import java.io.*; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.zip.CRC32; import java.util.zip.Deflater; public class BufferDeflate2 { /** The standard 10 byte GZIP header */ private static final byte[] GZIP_HEADER = new byte[] { 0x1f, (byte) 0x8b, Deflater.DEFLATED, 0, 0, 0, 0, 0, 0, 0 }; /** CRC-32 of uncompressed data. */ private final CRC32 crc = new CRC32(); /** Deflater to deflate data */ private final Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION, true); /** Output buffer building area */ private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); /** Internal transfer space */ private final byte[] transfer = new byte[1000]; /** The flush mode to use at the end of each buffer */ private final int flushMode; /** * New buffer deflater * * @param syncFlush * if true, all data in buffer can be immediately decompressed * from output buffer */ public BufferDeflate2(boolean syncFlush) { flushMode = syncFlush ? Deflater.SYNC_FLUSH : Deflater.NO_FLUSH; buffer.write(GZIP_HEADER, 0, GZIP_HEADER.length); } /** * Deflate the buffer * * @param in * the buffer to deflate * @return deflated representation of the buffer */ public ByteBuffer deflate(ByteBuffer in) { // convert buffer to bytes byte[] inBytes; int off = in.position(); int len = in.remaining(); if( in.hasArray() ) { inBytes = in.array(); } else { off = 0; inBytes = new byte[len]; in.get(inBytes); } // update CRC and deflater crc.update(inBytes, off, len); deflater.setInput(inBytes, off, len); while( !deflater.needsInput() ) { int r = deflater.deflate(transfer, 0, transfer.length, flushMode); buffer.write(transfer, 0, r); } byte[] outBytes = buffer.toByteArray(); buffer.reset(); return ByteBuffer.wrap(outBytes); } /** * Write the final buffer. This writes any remaining compressed data and the GZIP trailer. * @return the final buffer */ public ByteBuffer doFinal() { // finish deflating deflater.finish(); // write all remaining data int r; do { r = deflater.deflate(transfer, 0, transfer.length, Deflater.FULL_FLUSH); buffer.write(transfer, 0, r); } while( r == transfer.length ); // write GZIP trailer writeInt((int) crc.getValue()); writeInt((int) deflater.getBytesRead()); // reset deflater deflater.reset(); // final output byte[] outBytes = buffer.toByteArray(); buffer.reset(); return ByteBuffer.wrap(outBytes); } /** * Write a 32 bit value in little-endian order * * @param v * the value to write */ private void writeInt(int v) { System.out.println("v="+v); buffer.write(v & 0xff); buffer.write((v >> 8) & 0xff); buffer.write((v >> 16) & 0xff); buffer.write((v >> 24) & 0xff); } /** * For testing. Pass in the name of a file to GZIP compress * @param args * @throws IOException */ public static void main(String[] args) throws IOException { File inFile = new File(args[0]); File outFile = new File(args[0]+".test.gz"); FileChannel inChan = (new FileInputStream(inFile)).getChannel(); FileChannel outChan = (new FileOutputStream(outFile)).getChannel(); BufferDeflate2 def = new BufferDeflate2(false); ByteBuffer buf = ByteBuffer.allocate(500); while( true ) { buf.clear(); int r = inChan.read(buf); if( r==-1 ) break; buf.flip(); ByteBuffer compBuf = def.deflate(buf); outChan.write(compBuf); } ByteBuffer compBuf = def.doFinal(); outChan.write(compBuf); inChan.close(); outChan.close(); } } 

我不明白“隐藏在互联网”部分,但zlib做内存gzip格式压缩和解压缩。 java.util.zip API提供了对zlib的一些访问,尽管它是有限的。 由于接口限制,您无法请求zlib直接生成和使用gzip流。 但是,您可以使用nowrap选项生成和使用原始deflate数据。 然后使用java.util.zipCRC32类轻松地滚动自己的gzip头和尾部。 您可以添加一个固定的10字节标头,附加四字节CRC,然后以小端顺序附加四字节未压缩长度(模2 32 ),你就可以了。

处理ByteBuffers并不难。 请参阅下面的示例代码。 您需要知道如何创建缓冲区。 选项是:

  1. 每个缓冲区独立压缩。 这很容易处理我假设情况并非如此。 您只需将缓冲区转换为字节数组并将其包装在GZIPInputStream中的ByteArrayInputStream中。
  2. 每个缓冲区由编写器以SYNC_FLUSH结束,因此包括流内的整个数据块。 读写器可以立即读取写入器写入缓冲区的所有数据。
  3. 每个缓冲区只是GZIP流的一部分。 无法保证读者可以从缓冲区中读取任何内容。

必须按顺序处理GZIP生成的数据。 ByteBuffers必须按照它们生成的相同顺序进行处理。

示例代码:

 package stack; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.Pipe; import java.nio.channels.SelectableChannel; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.GZIPInputStream; public class BufferDeflate { static AtomicInteger idSrc = new AtomicInteger(1); /** Queue for transferring buffers */ final BlockingQueue buffers = new LinkedBlockingQueue(); /** The entry point for deflated buffers */ final Pipe.SinkChannel bufSink; /** The source for the inflater */ final Pipe.SourceChannel infSource; /** The destination for the inflater */ final Pipe.SinkChannel infSink; /** The source for the outside world */ public final SelectableChannel source; class Relayer extends Thread { public Relayer(int id) { super("BufferRelayer" + id); } public void run() { try { while( true ) { ByteBuffer buf = buffers.take(); if( buf != null ) { bufSink.write(buf); } else { bufSink.close(); break; } } } catch (Exception e) { e.printStackTrace(); } } } class Inflater extends Thread { public Inflater(int id) { super("BufferInflater" + id); } public void run() { try { InputStream in = Channels.newInputStream(infSource); GZIPInputStream gzip = new GZIPInputStream(in); OutputStream out = Channels.newOutputStream(infSink); int ch; while( (ch = gzip.read()) != -1 ) { out.write(ch); } out.close(); } catch (Exception e) { e.printStackTrace(); } } } /** * New buffer inflater */ public BufferDeflate() throws IOException { Pipe pipe = Pipe.open(); bufSink = pipe.sink(); infSource = pipe.source(); pipe = Pipe.open(); infSink = pipe.sink(); source = pipe.source().configureBlocking(false); int id = idSrc.incrementAndGet(); Thread thread = new Relayer(id); thread.setDaemon(true); thread.start(); thread = new Inflater(id); thread.setDaemon(true); thread.start(); } /** * Add the buffer to the stream. A null buffer closes the stream * * @param buf * the buffer to add * @throws IOException */ public void add(ByteBuffer buf) throws IOException { buffers.offer(buf); } } 

只需将缓冲区传递给add方法并从公共source通道读取即可。 在处理给定数量的字节之后可以从GZIP读取的数据量是不可能预测的。 因此,我已将source通道设置为非阻塞,因此您可以在添加字节缓冲区的同一线程中安全地读取它。