具有限制/吞吐量控制的Java Executor

我正在寻找一个Java Executor,它允许我指定限制/吞吐量/起搏限制,例如,不超过100个任务可以在一秒钟内处理 – 如果更多的任务被提交,他们应该排队并稍后执行。 这样做的主要目的是避免在访问外部API或服务器时遇到限制。

我想知道基础Java(我怀疑,因为我检查过)或其他可靠的地方(例如Apache Commons)是否提供了这个,或者我是否必须编写自己的。 最好是轻量级的。 我不介意自己写,但如果某个地方有“标准”版本,我至少要先看一下。

看看guavas RateLimiter :

速率限制器。 从概念上讲,速率限制器以可配置的速率分配许可。 如果需要,每个acquire()都会阻止,直到有许可证可用,然后接受它。 获得后,不需要发放许可证。 速率限制器通常用于限制访问某些物理或逻辑资源的速率。 这与信号量相反,信号量限制并发访问的数量而不是速率(注意并发和速率密切相关,例如参见Little’s定律)。

它的线程安全,但仍然是@Beta 。 无论如何,可能值得一试。

您必须针对速率限制器将每次调用包装到Executor 。 对于更干净的解决方案,您可以为ExecutorService创建某种包装ExecutorService

来自javadoc:

  final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second" void submitTasks(List tasks, Executor executor) { for (Runnable task : tasks) { rateLimiter.acquire(); // may wait executor.execute(task); } } 

Java Executor不提供这样的限制,仅限于线程数量,这不是您要寻找的。

一般来说,执行程序是限制此类操作的错误位置,它应该是在线程尝试调用外部服务器的时刻。 例如,您可以通过在提交请求之前使用线程等待的限制信号量来执行此操作。

调用线程:

 public void run() { // ... requestLimiter.acquire(); connection.send(); // ... } 

同时您定期(例如每60秒)安排一个(单个)辅助线程释放获取的资源:

  public void run() { // ... requestLimiter.drainPermits(); // make sure not more than max are released by draining the Semaphore empty requestLimiter.release(MAX_NUM_REQUESTS); // ... } 

只需要说一秒就可以处理100个任务 – 如果提交了更多任务,他们应该排队并稍后执行

您需要查看Executors.newFixedThreadPool(int limit) 。 这将允许您限制可以同时执行的线程数。 如果您提交多个线程,它们将排队并稍后执行。

 ExecutorService threadPool = Executors.newFixedThreadPool(100); Future result1 = threadPool.submit(runnable1); Future result2 = threadPool.submit(runnable2); Futurte result3 = threadPool.submit(callable1); ... 

上面的代码片段显示了如何使用ExecutorService ,它允许同时执行不超过100个线程。

更新:
在回顾完评论之后,我就提出了这个问题(有点愚蠢)。 如何手动保持要执行的线程的跟踪? 如何首先将它们存储在ArrayList ,然后根据最后一秒内已执行的线程数将它们提交给Executor
因此,假设已经将200个任务提交到我们维护的ArrayList ,我们可以迭代并向Executor添加100个任务。 当第二个传递时,我们可以根据在Executor完成的线程数添加更multithreading,依此类推

我个人觉得这个场景非常有趣。 就我而言,我想强调一个有趣的节流阶段是消费方面,就像在经典的生产者/消费者并发理论中一样。 这与之前的一些建议答案相反。 这是,我们不想阻止提交线程,而是基于速率(任务/秒)策略阻止消费线程。 因此,即使队列中有任务准备就绪,执行/消耗线程也可能阻止等待满足节点策略。

也就是说,我认为一个好的候选者将是Executors.newScheduledThreadPool(int corePoolSize)。 这样,您需要在执行程序前面有一个简单的队列(一个简单的LinkedBlockingQueue适合),然后安排一个定期任务从队列中选择实际任务(ScheduledExecutorService.scheduleAtFixedRate)。 因此,这不是一个简单的解决方案,但如果您尝试按照之前的讨论限制消费者,它应该足够执行goog。

根据场景,以及之前的一个响应中的建议,ThreadPoolExecutor的基本function可以解决问题。

但是如果线程池由多个客户端共享并且您想要限制,限制每个客户端的使用,确保一个客户端不会使用所有线程,那么BoundedExecutor将完成工作。

可以在以下示例中找到更多详细信息:

http://jcip.net/listings/BoundedExecutor.java