Tag: 生成器 消费者

生产者/消费者 – 生产者将数据添加到集合中而不会阻塞,消费者会批量使用集合中的数据

我有一个生产者/消费者用例,这有点不寻常。 我有一些真实世界的用例,我希望他们能够在没有阻塞的情况下将对象添加到集合中。 消费者(只有一个)应该阻止,直到集合中有一定数量的对象可用(例如500),然后批量消费它们。 虽然少于500,但它应该阻止并等待集合填充。 我不介意队列是否超过这个值(700,1000等)很短的时间。 我目前似乎没有找到解决这个确切问题的解决方案。 我正在考虑使用ConcurrentLinkedQueue并让消费者定期检查队列是否有足够的数据,但这似乎适得其反。 另一个想法是使用LinkedBlockingQueue。 生产者不会阻止(除非队列已满,这意味着它有Integer.MAX_VALUE值 – 这不是我的情况,所以这一切都很好)。 使用者将执行queue.take()并将元素添加到内部集合中。 当内部集合达到500个元素时,它将批量使用它们。 你有什么建议吗? 谢谢!

缓冲后台InputStream实现

我编写了包含其他流的后台InputStream (和OutputStream )实现,并在后台线程上OutputStream读,主要允许解压缩/压缩在解压缩流处理的不同线程中发生。 这是一个相当标准的生产者/消费者模型。 这似乎是一种简单的方法,可以通过简单的流程读取,处理和写入数据来充分利用多核CPU,从而更有效地利用CPU和磁盘资源。 也许“高效”不是最好的词,但与直接从ZipInputStream读取并直接写入ZipOutputStream相比,它提供了更高的利用率,对我来说更感兴趣,减少了运行时间。 我很高兴发布代码,但我的问题是我是否正在重新发明现有(和运算量更大)库中现有的东西? 编辑 – 发布代码…… 我的BackgroundInputStream代码如下( BackgroundOutputStream非常相似),但我想改进它的各个方面。 看起来我的工作太过艰难,无法前后传递缓冲区。 如果调用代码抛弃了对BackgroundInputStream引用,则backgroundReaderThread将永远挂起。 信号eof需要改进。 应将exception传播到前台线程。 我想允许使用来自提供的Executor的线程。 close()方法应该通知后台线程,并且不应该关闭包装的流,因为包装的流应该由从中读取的后台线程拥有。 在关闭之后做一些愚蠢的事情应该适当地照顾。 package nz.co.datacute.io; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; import java.util.concurrent.LinkedBlockingQueue; public class BackgroundInputStream extends InputStream { private static final int DEFAULT_QUEUE_SIZE = 1; private static final int DEFAULT_BUFFER_SIZE = 64*1024; private final int queueSize; private final […]