Java,在multithreadingevnironments中通过散列统一划分传入的工作
我已经实现了一个java代码来执行传入任务(作为Runnable
),其中n个Threads基于他们的hashCode模块nThreads
。 理想情况下,工作应该在这些线程中一致地传播。 具体来说,我们将dispatchId
作为每个Task的字符串。
这是这个java代码片段:
int nThreads = Runtime.getRuntime().availableProcessors(); // Number of threads Worker[] workers = new Worker[nThreads]; // Those threads, Worker is just a thread class that can run incoming tasks ... Worker getWorker(String dispatchId) { // Get a thread for this Task return workers[(dispatchId.hashCode() & Integer.MAX_VALUE) % nThreads]; }
重要提示:在大多数情况下,dispatchId是:
String dispatchId = 'SomePrefix' + counter.next()
但是,我担心nThreads的模除法不是一个好的选择,因为nThreads应该是一个素数,用于更均匀地分配dispatId键。
关于如何更好地推广工作还有其他选择吗?
更新1:
每个Worker都有一个队列: Queue tasks = new ConcurrentLinkedQueue();
工作人员从中获取任务并执行它们。 任务可以从其他线程添加到此队列。
更新2:
具有相同dispatchId
任务可以多次出现,因此我们需要通过dispatchId
找到它们的线程。
最重要的是,每个Worker线程必须按顺序处理其传入的任务。 因此,上面的更新1中存在数据结构队列。
更新3:此外,一些线程可能很忙,而其他线程是免费的。 因此,我们需要以某种方式将队列与线程分离,但是为任务执行维护相同dispatchId
的FIFO顺序。
解决方案:我已经实现了Ben Manes的想法(他的答案如下),代码可以在这里找到。
听起来你需要每个调度ID的FIFO排序,所以理想的情况是将调度队列作为抽象。 这可以解释您对散列的关注,因为散列不能提供统一的分布,因为一些调度队列可能比其他队列更活跃,并且在工作者之间不公平地平衡。 通过将队列与worker分开,您可以保留FIFO语义并均匀地分散工作。
HawtDispatch是一个提供此抽象的非活动库。 它与Java 6兼容。
一个非常简单的Java 8方法是使用CompletableFuture作为排队机制,使用ConcurrentHashMap进行注册,使用Executor(例如ForkJoinPool )进行计算。 有关此想法的实现,请参阅EventDispatcher ,其中注册是显式的。 如果您的调度员更具动态性,那么您可能需要定期修剪地图。 基本思路如下。
ConcurrentMap> dispatchQueues = ... public CompletableFuture dispatch(String queueName, Runnable task) { return dispatchQueues.compute(queueName, (k, queue) -> { return (queue == null) ? CompletableFuture.runAsync(task) : queue.thenRunAsync(task); }); }
更新(JDK7)
上述想法的后退将与番石榴翻译成类似的东西,
ListeningExecutorService executor = ... Striped locks = Striped.lock(256); ConcurrentMap> dispatchQueues = ... public ListenableFuture> dispatch(String queueName, final Runnable task) { Lock lock = locks.get(queueName); lock.lock(); try { ListenableFuture> future = dispatchQueues.get(queueName); if (future == null) { future = executor.submit(task); } else { final SettableFuture next = SettableFuture.create(); future.addListener(new Runnable() { try { task.run(); } finally { next.set(null); } }, executor); future = next; } dispatchQueues.put(queueName, future); } finally { lock.unlock(); } }