Tag: producer consumer

排队多个下载,寻找生产者消费者API

我有一个应用程序(一个servlet,但这不是很重要),它下载一组文件并解析它们以提取信息。 到目前为止,我在循环中完成了这些操作: – 在Internet上获取新文件 – 分析它。 multithreading下载管理器似乎是解决此问题的更好解决方案,我希望以尽可能最快的方式实现它。 某些下载依赖于其他下载(因此,此集合已部分订购)。 multithreading编程很难,如果我能找到一个API来做到这一点我会很高兴。 我需要将一组文件(已排序)放入队列中,并获取完全下载的第一组文件。 你知道我可以用来实现的任何库吗? 此致,Stéphane

Java:线程生成器使用者等待生成数据的最有效方法是什么

使用BlockingQueue消耗生成的数据时,等待数据出现的最有效方法是什么? 场景: 步骤1)数据列表将是添加时间戳的数据存储。 这些时间戳需要按最接近当前时间优先级排序。 此列表可能为空。 线程将时间戳插入其中。 生产 步骤2)我想在另一个线程中使用此处的数据,该线程将从数据中获取时间戳并检查它们是否在当前时间之后。 消费者然后生产 步骤3)如果它们在当前时间之后,则将它们发送到另一个线程以供消费和处理。 在此处理时间戳数据后,从步骤1数据存储中删除。 消费然后编辑原始列表。 在下面的代码中,数据字段引用步骤1中的数据存储。结果是在当前时间之后发送的时间戳列表。 步骤2.然后将结果消耗在步骤3中。 private BlockingQueue data; private final LinkedBlockingQueue results = new LinkedBlockingQueue(); @Override public void run() { while (!data.isEmpty()) { for (LocalTime dataTime : data) { if (new LocalTime().isAfter(dataTime)) { results.put(result); } } } } 问题等待数据列表中可能可能为空的数据的最有效方法是什么? 专注于: while (!data.isEmpty()) 从之前的问题开始。

生产者 – Java中的消费者multithreading

我想在Java中使用multithreading等待和通知方法编写程序。 该程序有一个堆栈(max-length = 5)。 生产者永远生成数字并将其放入堆栈中,消费者从堆栈中选择它。 当堆栈已满时,生产者必须等待,当堆栈为空时,消费者必须等待。 问题是它只运行一次,我的意思是一旦它产生5个数字就会停止,但是我将run方法放入while(true)块以运行不间断但它没有。 这是我到目前为止所尝试的。 制片人类: package trail; import java.util.Random; import java.util.Stack; public class Thread1 implements Runnable { int result; Random rand = new Random(); Stack A = new Stack(); public Thread1(Stack A) { this.A = A; } public synchronized void produce() { while (A.size() >= 5) { System.out.println(“List is Full”); try […]

生产者 – 消费; 消费者如何停止?

所以我模拟了我的生产者消费者问题,我有下面的代码。 我的问题是:如果消费者处于不变的状态,消费者将如何停止(真实)。 在下面的代码中,我添加了 if (queue.peek()==null) Thread.currentThread().interrupt(); 在这个例子中很好用。 但是在我的真实世界设计中,这不起作用(有时生产者需要更长的时间来“放置”数据,因此消费者抛出的exception是不正确的。一般来说,我知道我可以放一个’毒’数据比如Object是XYZ,我可以在消费者中检查它。但这种毒药使代码看起来真的很糟糕。想知道是否有人采用不同的方法。 public class ConsumerThread implements Runnable { private BlockingQueue queue; private String name; private boolean isFirstTimeConsuming = true; public ConsumerThread(String name, BlockingQueue queue) { this.queue=queue; this.name=name; } @Override public void run() { try { while (true) { if (isFirstTimeConsuming) { System.out.println(name+” is initilizing…”); Thread.sleep(4000); isFirstTimeConsuming=false; } try{ if […]

多生产者多消费者multithreadingJava

我正在尝试多生产者 – 生产者 – 消费者问题的多个消费者使用案例。 我正在使用BlockingQueue在多个生产者/消费者之间共享公共队列。 以下是我的代码。 制片人 import java.util.concurrent.BlockingQueue; public class Producer implements Runnable { private BlockingQueue inputQueue; private static volatile int i = 0; private volatile boolean isRunning = true; public Producer(BlockingQueue q){ this.inputQueue=q; } public synchronized void run() { //produce messages for(i=0; i<10; i++) { try { inputQueue.put(new Integer(i)); Thread.sleep(1000); } catch […]

RabbitMQ:快速的生产者和缓慢的消费者

我有一个应用程序使用RabbitMQ作为消息队列在两个组件之间发送/接收消息:发送方和接收方。 发件人以非常快的方式发送消息。 接收器接收消息然后执行一些非常耗时的任务(主要是针对非常大的数据大小的数据库写入)。 由于接收器需要很长时间才能完成任务,然后检索队列中的下一条消息,因此发送方将继续快速填满队列。 所以我的问题是:这会导致消息队列溢出吗? 消息使用者如下所示: public void onMessage() throws IOException, InterruptedException { channel.exchangeDeclare(EXCHANGE_NAME, “fanout”); String queueName = channel.queueDeclare(“allDataCase”, true, false, false, null).getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, “”); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(” [x] Received ‘” + message + “‘”); JSONObject […]

生产者/消费者工作队列

我正在努力实现我的处理管道的最佳方法。 我的制作人将作品提供给BlockingQueue。 在消费者方面,我轮询队列,包装我在Runnable任务中获得的内容,并将其提交给ExecutorService。 while (!isStopping()) { String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS); if (work == null) { break; } executorService.execute(new Worker(work)); // needs to block if no threads! } 这不理想; 当然,ExecutorService有自己的队列,所以真正发生的事情是我总是完全耗尽工作队列并填充任务队列,随着任务的完成,队列会慢慢排空。 我意识到我可以在生产者端排队任务,但我真的不愿意这样做 – 我喜欢我的工作队列的间接/隔离是愚蠢的字符串; 这真的不是制片人的任何事情会发生在他们身上。 迫使生产者对Runnable或Callable进行排队会打破抽象,恕我直言。 但我确实希望共享工作队列代表当前的处理状态。 如果消费者没有跟上,我希望能够阻止生产者。 我喜欢使用Executors,但我觉得我正在与他们的设计作斗争。 我可以部分喝Kool-ade,还是我必须吞下它? 我是否在拒绝排队任务方面做错了? (我怀疑我可以设置ThreadPoolExecutor来使用1任务队列并覆盖它的执行方法来阻止而不是拒绝队列满,但这感觉很糟糕。) 建议?

在Consumer和Producer Threads中等待并通知

刚开始学习multithreading。 我有多个线程的5个生产者和2个消费者。 基本上这个程序会在队列中添加100个项目。 当队列大小为100时,生产者将停止添加。我希望消费者在消费者从队列中删除所有项目时通知生产者,以便生产者可以再次开始添加。 目前生产者将等待,但永远不会得到消费者的通知。 制片人: public class Producer implements Runnable { private BlockingQueue sharedQueue; private final int queueSize; private Object lock = new Object(); public Producer(BlockingQueue sharedQueue, int queueSize){ this.sharedQueue = sharedQueue; this.queueSize = queueSize; } public void run() { while(true) { if(sharedQueue.size()== queueSize){ try { synchronized (lock) { sharedQueue.wait(); } } catch (InterruptedException […]