如果需要处理太多数据,我如何让ThreadPoolExecutor命令等待?

我从队列服务器获取数据,我需要处理它并发送确认。 像这样的东西:

while (true) { queueserver.get.data ThreadPoolExecutor //send data to thread queueserver.acknowledgement 

我不完全理解线程中发生了什么,但我认为这个程序获取数据,发送线程然后立即确认它。 因此,即使我有一个每个队列的限制只能有200个未确认的项目,它只会拉到它可以接收它的速度。 当我在单个服务器上编写程序时,这很好,但如果我使用多个工作程序,那么这就成了一个问题,因为线程队列中的项目数量不是它完成的工作的反映,而是它的速度有多快可以从队列服务器获取项目。

如果线程队列充满工作,有什么办法可以让程序等待吗?

如果需要处理太多数据,我如何让ThreadPoolExecutor命令等待?

我不是百分百肯定我在这里理解你的问题。 当然,您可以使用具有限制的BlockingQueue而不是开放式队列:

 BlockingQueue queue = new ArrayBlockingQueue(200); 

就提交给ExecutorService的作业而言,您可以创建自己的队列,而不是使用使用ExecutorService创建的默认ExecutorService (使用无界队列)。

 return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(200)); 

队列填满后,将导致它拒绝任何提交的新任务。 您需要设置一个提交到队列的RejectedExecutionHandler 。 就像是:

 final BlockingQueue queue = new ArrayBlockingQueue(200); ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, queue); // by default (unfortunately) the ThreadPoolExecutor will throw an exception // when you submit the 201st job, to have it block you do: threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // this will block if the queue is full executor.getQueue().put(r); } }); 

我认为Java没有ThreadPoolExecutor.CallerBlocksPolicy是一个主要的错过。

如果您希望在工作人员开始处理任务时进行确认,则可以创建一个自定义ThreadFactory ,在执行实际工作之前从线程发送确认。 或者你可以覆盖ThreadPoolExecutor

如果您希望在为新任务释放新工作时进行确认,我认为您可以使用SynchronousQueueThreadPoolExecutor.CallerRunsPolicy初始化ThreadPoolExecutor.CallerRunsPolicy ,或者使用您自己的策略来调用调用程序。

首先,我认为你的态度是错误的,因为你在伪代码中所做的是忙着等待,你应该阅读java toturial http://docs.oracle.com/javase/tutorial/essential/concurrency/的Concurrency教程。

忽略这一点,恶意为你提供忙碌等待的解决方案(不推荐):

  ExecutorService e1 = Executors.newFixedThreadPool(20); while (true) { if (!serverq.isEmpty() && !myq.isFull()) myq.enq(serverq.poll()); if (!myq.isEmpty()) e1.execute(myq.poll()); } 

笔记:

1.确保你的myq是同步的,如其他答案所述。 您可以扩展一些阻塞队列以确保同步正确。

2.你实现了一个runnable类,它在服务的迭代中完成你从服务器中执行的操作,那些runnables必须将myq作为参数提供给构造函数并将其保存为全局变量。

3.myq获取runnables,在run方法的最后,你必须确保runnable从myq中删除自己

如何在提交201任务之前使用一个不会执行超过200个任务并等待任务完成的blockingPool。 我在我的应用程序中使用信号量实现了它。 您还可以通过将值传递给其构造函数来更改限制。

只有@Gray回答的区别在于,在这种情况下很少会有任何任务被拒绝。 除非其他任务结束,否则信号量将使任何201任务等待。 然而,我们有拒绝处理程序,以便在任何拒绝的情况下将该任务重新提交给执行人。

 private class BlockingPool extends ThreadPoolExecutor { private final Semaphore semaphore; public BlockingPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, int tasksAllowedInThreads){ super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { executor.execute(r); } }); semaphore = new Semaphore(tasksAllowedInThreads); } @Override public void execute(Runnable task){ boolean acquired = false; do{ try{ semaphore.acquire(); acquired = true; } catch (final InterruptedException e){ // log } } while (!acquired); // run in loop to handle InterruptedException try{ super.execute(task); } catch (final RejectedExecutionException e){ System.out.println("Task Rejected"); semaphore.release(); throw e; } } @Override protected void afterExecute(Runnable r, Throwable t){ super.afterExecute(r, t); if (t != null){ t.printStackTrace(); } semaphore.release(); } } 

这有意义吗!