将给定ID的任务绑定到同一线程的线程池

是否存在线程池(在Java中)的任何实现,以确保在同一线程上执行相同逻辑ID的所有任务?

我所追求的逻辑是,如果已经在给定逻辑ID的特定线程上执行了任务,则在同一线程上调度具有相同ID的新任务。 如果没有线程为同一ID执行任务,则可以使用任何线程。

这将允许并行执行不相关ID的任务,但是同一ID的任务将以串行和提交的顺序执行。

如果没有,是否有任何关于如何扩展ThreadPoolExecutor以获得此行为的建议(如果可能的话)?

UPDATE

花了更长时间考虑这个问题,我实际上并不要求在同一个线程上执行相同逻辑ID的任务,只是它们不会同时执行。

这方面的一个例子是处理客户订单的系统,可以同时处理多个订单,但不能同一个客户(并且必须按顺序处理同一客户的所有订单)。

我现在采用的方法是使用标准的ThreadPoolExecutor,使用自定义的BlockingQueue并使用自定义包装器包装RunnableRunnable包装器逻辑是:

  1. primefaces地尝试将ID添加到并发“运行”集( ConcurrentHashMap )以查看当前是否正在运行相同ID的任务
    • 如果添加失败,请将任务重新推送到队列的前面并立即返回
    • 如果成功,继续
  2. 运行任务
  3. 从“正在运行”的集合中删除任务的关联ID

然后队列的poll()方法只返回具有当前不在“运行”集中的ID的任务。

这样做的麻烦在于我确信会有很多我没有想过的极端情况,因此需要进行大量的测试。

创建每个运行一个线程的执行程序服务数组,并通过项ID的哈希代码将队列条目分配给它们。 该数组可以是任何大小,具体取决于您最多想要使用多少线程。

这将限制我们可以从执行程序服务使用但仍允许使用其function在不再需要时关闭唯一的线程(使用allowCoreThreadTimeOut(true) )并根据需要重新启动它。 此外,所有排队的东西都可以在不重写的情况下工作。

最简单的想法可能是这样的:

有一个固定的BlockingQueue地图。 使用哈希机制根据任务ID选择队列。 哈希算法应为相同的ID选择相同的队列。 为每个队列启动一个单独的线程。 每个线程都会从它自己的专用队列中选择一个任务并执行它。

ps适当的解决方案很大程度上取决于您为线程分配的工作类型

UPDATE

好的,这个疯狂的想法怎么样,请耐心等我:)

比如,我们有一个ConcurrentHashMap ,它包含引用id -> OrderQueue

 ID1->Q1, ID2->Q2, ID3->Q3, ... 

这意味着现在每个id都与它自己的队列相关联。 OrderQueue是一个自定义阻塞队列,带有一个额外的布尔标志 – isAssociatedWithWorkingThread

还有一个常规的BlockingQueue ,我们现在称之为amortizationQueue ,稍后你会看到它的使用。

接下来,我们有N工作线程。 每个工作线程都有自己的工作队列,这是一个包含与该线程关联的ID的BlockingQueue

当新ID出现时,我们会执行以下操作:

 create a new OrderQueue(isAssociatedWithWorkingThread=false) put the task to the queue put id->OrderQueue to the map put this OrderQueue to amortizationQueue 

当现有ID的更新到来时,我们执行以下操作:

 pick OrderQueue from the map put the task to the queue if isAssociatedWithWorkingThread == false put this OrderQueue to amortizationQueue 

每个工作线程执行以下操作:

 take next id from the working queue take the OrderQueue associated with this id from the map take all tasks from this queue execute them mark isAssociatedWithWorkingThread=false for this OrderQueue put this OrderQueue to amortizationQueue 

很简单。 现在到有趣的部分 – 偷工作:)

如果某个工作线程在某个时间点发现自己有空的工作队列,那么它会执行以下操作:

 go to the pool of all working threads pick one (say, one with the longest working queue) steal id from *the tail* of that thread's working queue put this id to it's own working queue continue with regular execution 

还有+1额外的线程,提供摊销工作:

 while (true) take next OrderQueue from amortizationQueue if queue is not empty and isAssociatedWithWorkingThread == false set isAssociatedWithWorkingThread=true pick any working thread and add the id to it's working queue 

将不得不花更多的时间思考你是否可以使用AtomicBoolean来获取AtomicBoolean标志,或者需要使其阻塞操作以检查/更改此标志。

我最近不得不处理类似的情况。

我最终得到了一个类似于你的设计。 唯一的区别是“当前”是地图而不是集合:从ID到Runnables队列的映射。 当任务的runnable周围的包装器看到它的ID存在于映射中时,它将任务的runnable添加到ID的队列并立即返回。 否则,ID将添加到具有空队列的映射中,并执行任务。

任务完成后,包装器再次检查ID的队列。 如果队列不为空,则选择runnable。 否则它将从地图中删除,我们就完成了。

我将关闭和取消作为练习留给读者:)

我们的方法类似于原始问题的更新。 我们有一个包装类,它是一个包含队列(LinkedTransferQueue)的runnable,我们称之为RunnableQueue。 可运行队列具有以下基本API:

 public class RunnableQueue implements Runnable { public RunnableQueue(String name, Executor executor); public void run(); public void execute(Runnable runnable); } 

当用户通过执行调用提交第一个Runnable时,RunnableQueue会在执行程序上排队。 后续的执行调用get在RunnableQueue内的队列中排队。 当ThreadPool执行可运行队列时(通过其run方法),它通过逐个串行执行runnables来开始“排空”内部队列。 如果在RunnableQueue执行时调用execute,则新的runnables只会附加到内部队列。 排空队列后,可运行队列的run方法完成,并“离开”执行程序池。 冲洗重复。

在RunnableQueue将自身重新发布到执行程序池之前,我们还有其他一些优化可以让一些runnable运行(例如四个)。

内部唯一真正棘手的问题并不是很难)是在发布到执行程序时是否同步,以便它不会重新发布,或者在它应该发布时错过。

总的来说,我们发现这很好用。 我们的“ID”(语义上下文)是可运行的队列。 我们需要(即一个插件)引用RunnableQueue而不是执行器池,因此它被强制通过RunnableQueue专门工作。 这不仅保证所有访问都是顺序序列(线程限制),而且让RunnableQueue“适度”插件的工作负载。 此外,它不需要集中管理结构或其他争用点。

我必须实现一个类似的解决方案,并且h22创建一个执行程序服务数组的建议对我来说似乎是最好的方法,我需要注意ID的模数% (假设它是long / int的原始ID)或者哈希码)相对于某个所需的最大大小并使用该结果作为新ID,这样我就可以在以尽可能多的执行者服务对象结束同时仍然在处理中获得大量并发性之间取得平衡。

 import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExecutorServiceRouter { private List services; private int size; public ExecutorServiceRouter(int size) { services = new ArrayList(size); this.size = size; for (int i = 0; i < size; i++) { services.add(Executors.newSingleThreadExecutor()); } } public void route(long id, Runnable r) { services.get((int) (id % size)).execute(r); } public void shutdown() { for (ExecutorService service : services) { service.shutdown(); } } } 

扩展ThreadPoolExecutor会非常困难。 我建议你去生产者 – 消费者系统。 这是我的建议。

  1. 您可以创建典型的生产者消费者系统 查看此问题中提到的代码。
  2. 现在,这些系统中的每一个都将具有队列和单个使用者线程,它将串行处理队列中的任务
  3. 现在,创建一个这样的单独系统池
  4. 当您为相关ID提交任务时,查看是否已经为当前正在处理任务的相关ID标记了系统,如果是,则提交任务,
  5. 如果它没有处理任何任务,则使用这个新的相关ID标记该系统并提交任务。
  6. 这样,单个系统将仅适用于一个逻辑相关ID。

在这里,我假设一个相关的ID是一堆逻辑的个人ID,并且将为相关的ID而不是个人ID创建生产者消费者系统。