缓冲后台InputStream实现

我编写了包含其他流的后台InputStream (和OutputStream )实现,并在后台线程上OutputStream读,主要允许解压缩/压缩在解压缩流处理的不同线程中发生。

这是一个相当标准的生产者/消费者模型。

这似乎是一种简单的方法,可以通过简单的流程读取,处理和写入数据来充分利用多核CPU,从而更有效地利用CPU和磁盘资源。 也许“高效”不是最好的词,但与直接从ZipInputStream读取并直接写入ZipOutputStream相比,它提供了更高的利用率,对我来说更感兴趣,减少了运行时间。

我很高兴发布代码,但我的问题是我是否正在重新发明现有(和运算量更大)库中现有的东西?

编辑 – 发布代码……

我的BackgroundInputStream代码如下( BackgroundOutputStream非常相似),但我想改进它的各个方面。

  1. 看起来我的工作太过艰难,无法前后传递缓冲区。
  2. 如果调用代码抛弃了对BackgroundInputStream引用,则backgroundReaderThread将永远挂起。
  3. 信号eof需要改进。
  4. 应将exception传播到前台线程。
  5. 我想允许使用来自提供的Executor的线程。
  6. close()方法应该通知后台线程,并且不应该关闭包装的流,因为包装的流应该由从中读取的后台线程拥有。
  7. 在关闭之后做一些愚蠢的事情应该适当地照顾。

 package nz.co.datacute.io; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; import java.util.concurrent.LinkedBlockingQueue; public class BackgroundInputStream extends InputStream { private static final int DEFAULT_QUEUE_SIZE = 1; private static final int DEFAULT_BUFFER_SIZE = 64*1024; private final int queueSize; private final int bufferSize; private volatile boolean eof = false; private LinkedBlockingQueue bufferQueue; private final InputStream wrappedInputStream; private byte[] currentBuffer; private volatile byte[] freeBuffer; private int pos; public BackgroundInputStream(InputStream wrappedInputStream) { this(wrappedInputStream, DEFAULT_QUEUE_SIZE, DEFAULT_BUFFER_SIZE); } public BackgroundInputStream(InputStream wrappedInputStream,int queueSize,int bufferSize) { this.wrappedInputStream = wrappedInputStream; this.queueSize = queueSize; this.bufferSize = bufferSize; } @Override public int read() throws IOException { if (bufferQueue == null) { bufferQueue = new LinkedBlockingQueue(queueSize); BackgroundReader backgroundReader = new BackgroundReader(); Thread backgroundReaderThread = new Thread(backgroundReader, "Background InputStream"); backgroundReaderThread.start(); } if (currentBuffer == null) { try { if ((!eof) || (bufferQueue.size() > 0)) { currentBuffer = bufferQueue.take(); pos = 0; } else { return -1; } } catch (InterruptedException e) { e.printStackTrace(); } } int b = currentBuffer[pos++]; if (pos == currentBuffer.length) { freeBuffer = currentBuffer; currentBuffer = null; } return b; } @Override public int available() throws IOException { if (currentBuffer == null) return 0; return currentBuffer.length; } @Override public void close() throws IOException { wrappedInputStream.close(); currentBuffer = null; freeBuffer = null; } class BackgroundReader implements Runnable { @Override public void run() { try { while (!eof) { byte[] newBuffer; if (freeBuffer != null) { newBuffer = freeBuffer; freeBuffer = null; } else { newBuffer = new byte[bufferSize]; } int bytesRead = 0; int writtenToBuffer = 0; while (((bytesRead = wrappedInputStream.read(newBuffer, writtenToBuffer, bufferSize - writtenToBuffer)) != -1) && (writtenToBuffer  0) { if (writtenToBuffer < bufferSize) { newBuffer = Arrays.copyOf(newBuffer, writtenToBuffer); } bufferQueue.put(newBuffer); } if (bytesRead == -1) { eof = true; } } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } } 

听起来不错。 我从来没有遇到任何开箱即用的事情,但如果它可用,尝试使用空闲核心进行压缩是完全合理的。

也许你可以使用Commons I / O–它是一个经过良好测试的lib,可以帮助处理一些更无聊的东西,让你专注于扩展酷的并行部分。 也许你甚至可以将你的代码贡献给Commons项目;-)

我会感兴趣的。 我已经考虑过一个类似的项目,但无法弄清楚如何处理无序完成压缩的部分。