Tag: 生产者 消费者

正确实现生产者 – 消费者场景和“优雅”终止线程池

我正在开发我的第一个multithreading项目,因此有一些我不确定的事情。 关于我的设置的详细信息是在上一个问题上 ,简而言之:我有一个由Executors.newFixedThreadPool(N)实现的线程池。 一个线程被赋予一个动作,该动作对本地和远程资源执行一系列查询,并迭代地填充ArrayBlockingQueue ,而其余​​线程调用队列上的take()方法并处理队列中的对象。 尽管小型和监督测试似乎运行正常,但我不确定如何处理特殊情况,例如开头(队列还没有项目),结束(队列清空)以及任何最终的InterruptedExceptions 。 我在这里做了一些阅读,然后让我读到 Goetz和Kabutz的两篇非常好的文章。 共识似乎是不应忽视这些例外。 但是我不确定提供的示例是如何与我的情况相关的,我没有在我的代码中的任何地方调用thread.interrupt() …说到这,我不确定我是否应该这样做… 总结一下,鉴于下面的代码,我如何最好地处理特殊情况,例如终止条件和InterrruptedExceptions? 希望问题有意义,否则我会尽力进一步描述。 提前致谢, 编辑:我已经在实施了一段时间了,我遇到了一个新的打嗝,所以我想我会更新情况。 我遇到了遇到ConcurrentModificationException的不幸,这很可能是由于线程池的不完全关闭/终止造成的。 一旦我发现我可以使用isTerminated()我试过,然后由于一个不同步的wait()我得到一个IllegalMonitorStateException 。 代码的当前状态如下: 我已经听过@ Jonathan的回答中的一些建议,但我不认为他的建议与我需要/想要的一样。 背景故事与我上面提到的相同,相关的代码如下: 持有/管理池的类,以及提交runnables: public void serve() { try { this.started = true; pool.execute(new QueryingAction(pcqs)); for(;;){ PathwayImpl p = bq.take(); if (p.getId().equals(“0”)){ System.out.println(“–DEBUG: Termination criteria found, shutdown initiated..”); pool.shutdown(); // give 3 minutes per item […]

生产者消费者程序在Java中使用wait()和notify()

我正在使用低级同步和wait()和notify()在Java中执行经典的Producer-Consumer问题。 我知道有更好的实现使用java.util.concurrent包中的结构,但我的问题围绕低级实现: private static ArrayList list = new ArrayList(); static Object obj = new Object(); public static void producer() throws InterruptedException { synchronized (obj) { while (true) { if (list.size() == 10) { System.out.println(“Queue full.. Waiting to Add”); obj.wait(); } else { int value = new Random().nextInt(100); if (value <= 10) { Thread.sleep(200); System.out.println("The element […]

使用Java RealTime的生产者 – 消费者体系结构

我正在使用Java Realtime(Sun JRTS 2.2)设计交易系统,并且想要了解最佳实践的几个问题,因为我害怕发明轮子并且非常确定我的任务已经解决了。 所以我有线程不断读取套接字,解析字节和提取消息(二进制协议)。 之后,我应该向算法发送消息,实际上做了一些计算并决定交易与否。 所以我认为我应该设计这个系统的方法是将它分成两部分。 生产者(定期(?)实时线程从套接字中提取字节,解析它)和消费者(实时线程(周期性/偶发性?)从生产者中提取消息,与它们一起操作等)。 那么第一个问题是如何在这两个线程(生产者/消费者)之间设计高性能通信? 此外,我很想听听有关设计此类系统,建议等的现有经验的评论。 感谢您的帮助!

Java:高性能消息传递(单生产者/单一消费者)

我最初在这里问过这个问题,但我意识到我的问题不是关于一个真正的循环。 我想知道的是,在Java中进行高性能异步消息传递的正确方法是什么? 我想做什么…… 我有大约10,000名消费者,每个消费者都从他们的私人队列中消费消息。 我有一个线程一个接一个地生成消息并将它们放在正确的消费者队列中。 每个使用者无限循环,检查消息是否出现在队列中并进行处理。 我认为这个术语是“单一生产者/单一消费者”,因为有一个生产者,每个消费者只能在他们的私人队列上工作(多个消费者永远不会从同一个队列中读取)。 在Consumer.java里面: @Override public void run() { while (true) { Message msg = messageQueue.poll(); if (msg != null) { … // do something with the message } } } Producer正在快速地将消息放入消费者消息队列中(每秒数百万条消息)。 消费者应该尽快处理这些消息! 注意: while (true) { … }由Producer作为最后一条消息发送的KILL消息终止。 但是,我的问题是关于设计此消息传递的正确方法。 我应该为messageQueue使用什么样的队列? 应该是同步还是异步? 如何设计消息? 我应该使用while-true循环吗? 消费者应该是一个线程,还是其他什么? 10,000个线程会慢慢爬行吗? 什么是线程的替代品? 那么, 在Java中进行高性能消息传递的正确方法是什么?

Java线程等待并通知

我有两个主题。 线程A从队列中提取一些元素,线程B将一些元素添加到队列中。 我希望线程A在队列为空时进入hibernate状态。 当线程B向队列添加一些元素时,应该确保线程A正在工作。 如何在Java中完成?

为生产者消费者问题的变体选择数据结构

现在,我有一个队列,有多个生产者和单个消费者。 消费者线程操作很慢。 此外,使用者通过窥视操作从队列中获取元素,直到消耗操作完成,该元素无法从队列中删除。 这是因为作为副操作的生产者线程还拍摄了在该时间点未完全处理的所有元素的快照。 现在,我想更改我的代码以支持多个消费者。 所以,假设我有三个线程,一个线程将采用第一个元素,可以通过窥视操作读取。 第二个消费者线程可以用于第二个元素,但我无法检索,因为队列不支持检索第二个元素。 因此,使用标准ConcurrentLinkedQueue(我现在正在使用)的选项已经完成。 我正在考虑使用优先级队列,但是我必须将每个元素与一个标志相关联,该标志告诉我这个元素是否已经被某个线程使用过。 哪种数据结构最适合这个问题?