Tag: blockingqueue

ArrayBlockingQueue:并发put和take

为什么没有使用LinkedBlockingQueue的方式实现ABQ。 我们可以使用AtomicInteger来保持ABQ中的跟踪计数,也与LBQ一样。 我们也可以使用Two Locks for ABQ。 我偶然发现了关于SO的类似问题。 ArrayBlockingQueue使用单个锁进行插入和删除,但LinkedBlockingQueue使用2个单独的锁 但我无法理解这个问题的答案。 我需要帮助来理解如果我们使用两个锁实现ABQ会出现的问题。 如果有人可以举一个可能失败的竞争条件的例子,那将是非常好的。 这个问题可以标记为重复,但我真的在寻找更具描述性的答案。 那将是一个很大的帮助。 我在这里贴了一个代码http://pastebin.com/ZD1uFy7S 。 任何人都可以显示粘贴的代码中是否存在可能的竞争条件。

如何以multithreading方式调用不同类的相同方法

我在我的两个类中有一个名为process的方法,比如说CLASS-A and CLASS-B 。 现在在下面的循环中,我调用我的两个类的process method顺序意义一个接一个,它工作正常,但这不是我想要的方式。 for (ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) { final Map response = entry.getPlugin().process(outputs); // write to database System.out.println(response); } 有什么办法,我可以用multithreading方式调用我的两个类的进程方法。 这意味着一个线程将调用CLASS-A的进程方法,第二个线程将调用CLASS-B的进程方法。 之后我想将process方法返回的数据写入数据库。 所以我可以再写一个线程来写入数据库。 下面是我以multithreading方式提出的代码,但不知何故它根本没有运行。 public void writeEvents(final Map data) { // Three threads: one thread for the database writer, two threads for the plugin processors final ExecutorService executor = Executors.newFixedThreadPool(3); final […]

为什么LogWriter中的竞争条件会导致生产者阻塞?

首先要防止标记问题由不喜欢读到最后的人重复我已经阅读了生产者 – 消费者日志服务,并且关闭问题的方式不可靠 。 但它没有完全回答问题和答案与书中的文字相矛盾。 在书中提供以下代码: public class LogWriter { private final BlockingQueue queue; private final LoggerThread logger; private static final int CAPACITY = 1000; public LogWriter(Writer writer) { this.queue = new LinkedBlockingQueue(CAPACITY); this.logger = new LoggerThread(writer); } public void start() { logger.start(); } public void log(String msg) throws InterruptedException { queue.put(msg); } private class […]

如何在ForkJoinPool中阻止队列?

我需要在其队列已满时阻止ForkJoinPool上的线程。 这可以在标准的ThreadPoolExecutor中完成,例如: private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { return new ThreadPoolExecutor(nThreads, nThreads, 5000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); } 我知道,ForkJoinPool中有一些Dequeue,但是我无法通过它访问它。 更新:请参阅下面的答案。

Java:线程生成器使用者等待生成数据的最有效方法是什么

使用BlockingQueue消耗生成的数据时,等待数据出现的最有效方法是什么? 场景: 步骤1)数据列表将是添加时间戳的数据存储。 这些时间戳需要按最接近当前时间优先级排序。 此列表可能为空。 线程将时间戳插入其中。 生产 步骤2)我想在另一个线程中使用此处的数据,该线程将从数据中获取时间戳并检查它们是否在当前时间之后。 消费者然后生产 步骤3)如果它们在当前时间之后,则将它们发送到另一个线程以供消费和处理。 在此处理时间戳数据后,从步骤1数据存储中删除。 消费然后编辑原始列表。 在下面的代码中,数据字段引用步骤1中的数据存储。结果是在当前时间之后发送的时间戳列表。 步骤2.然后将结果消耗在步骤3中。 private BlockingQueue data; private final LinkedBlockingQueue results = new LinkedBlockingQueue(); @Override public void run() { while (!data.isEmpty()) { for (LocalTime dataTime : data) { if (new LocalTime().isAfter(dataTime)) { results.put(result); } } } } 问题等待数据列表中可能可能为空的数据的最有效方法是什么? 专注于: while (!data.isEmpty()) 从之前的问题开始。

队列已满,在阻塞队列的深度,需要澄清

从文件内容填充队列时,深度似乎不会增加,因为此实现中未添加元素。 BlockingQueue q = new SynchronousQueue(); … fstream = new FileInputStream(“/path/to/file.txt”); … while ((line = br.readLine()) != null) { if (q.offer(line)) System.out.println(“Depth: ” + q.size()); //0 } 用add替换offer时,抛出exception Exception in thread “main” java.lang.IllegalStateException: Queue full … 我做错了什么? 插入第一个元素后,为什么队列立即满了?

多生产者多消费者multithreadingJava

我正在尝试多生产者 – 生产者 – 消费者问题的多个消费者使用案例。 我正在使用BlockingQueue在多个生产者/消费者之间共享公共队列。 以下是我的代码。 制片人 import java.util.concurrent.BlockingQueue; public class Producer implements Runnable { private BlockingQueue inputQueue; private static volatile int i = 0; private volatile boolean isRunning = true; public Producer(BlockingQueue q){ this.inputQueue=q; } public synchronized void run() { //produce messages for(i=0; i<10; i++) { try { inputQueue.put(new Integer(i)); Thread.sleep(1000); } catch […]

java BlockingQueue没有阻塞偷看?

我有一个阻塞队列的对象。 我想写一个阻塞的线程,直到队列中有一个对象。 与BlockingQueue.take()提供的function类似。 但是,由于我不知道我是否能够成功处理对象,我想只是peek()而不是删除对象。 我想删除该对象只有我能够成功处理它。 所以,我想要一个阻塞的peek()函数。 目前,peek()只是在队列为空时根据javadoc返回。 我错过了什么吗? 有没有其他方法来实现此function? 编辑: 如果我只使用线程安全队列而偷看和睡觉的任何想法? public void run() { while (!__exit) { while (__queue.size() != 0) { Object o = __queue.peek(); if (o != null) { if (consume(o) == true) { __queue.remove(); } else { Thread.sleep(10000); //need to backoff (60s) and try again } } } Thread.sleep(1000); //wait 1s […]

在java中实现自己的阻塞队列

我知道这个问题之前已被多次询问和回答,但我无法弄清楚互联网上的例子,比如这个或那个 。 这两个解决方案都检查阻塞队列的数组/队列/链表的空白,以便在get()方法中通知put()方法中的所有等待线程,反之亦然。 第二个链接中的评论强调了这种情况,并提到这不是必要的。 所以问题是; 检查队列是否为空,对我来说似乎有点奇怪 完全通知所有等待的线程。 有任何想法吗? 提前致谢。

“关闭”阻塞队列

我在一个非常简单的生产者 – 消费者场景中使用java.util.concurrent.BlockingQueue 。 例如,这个伪代码描述了消费者部分: class QueueConsumer implements Runnable { @Override public void run() { while(true) { try { ComplexObject complexObject = myBlockingQueue.take(); //do something with the complex object } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } 到现在为止还挺好。 在阻塞队列的javadoc中,我读到: BlockingQueue本质上不支持任何类型的“关闭”或“关闭”操作,以指示不再添加任何项目。 这些function的需求和使用倾向于依赖于实现。 例如,一种常见的策略是生产者插入特殊的流末端或毒物对象,这些对象在被消费者采用时会相应地进行解释。 不幸的是,由于使用的generics和ComplexObject的性质,将“毒物对象”推入队列并非易事。 所以这种“常用策略”在我的场景中并不是很方便。 我的问题是:我可以用什么其他好的策略/模式来“关闭”队列? 谢谢!