Java并行工作迭代器?

我正在寻找一个类,我可以覆盖一个方法来完成工作,并像迭代器一样返回结果。 像这样的东西:

ParallelWorkIterator itr = new ParallelWorkIterator(trials,threads) { public Result work() { //do work here for a single trial... return answer; } }; while (itr.hasNext()) { Result result = itr.next(); //process result... } 

这主要用于像monte carlo模拟这样的东西,但我不想每次都要处理设置线程池和管理返回线程。 我推出了自己的课程, 希望能够完成这一课,但我对此并不充分,并且认为我会检查这样的事情是否已经存在。

编辑:要清楚,我希望它在后台运行并在每个工作方法返回后排队结果,直到所有试验都完成。 因此,下一个方法可能会等待返回,直到队列中出现结果。

看看ExecutorCompletionService 。 它做你想要的一切。

  void solve(Executor e, Collection> solvers) throws InterruptedException, ExecutionException { //This class will hold and execute your tasks CompletionService ecs = new ExecutorCompletionService(e); //Submit (start) all the tasks asynchronously for (Callable s : solvers) ecs.submit(s); //Retrieve completed task results and use them int n = solvers.size(); for (int i = 0; i < n; ++i) { Result r = ecs.take().get(); if (r != null) use(r); } } 

使用CompletionService的好处是它总是返回第一个完成的结果。 这可以确保您不会等待任务完成,并且可以让未完成的任务在后台运行。

我建议看一下Java Executors 。

您提交了许多任务,并为每个任务获取Future对象。 您的工作在后台处理,并迭代Future对象(就像您在上面所做的那样)。 每个Future在结果可用时返回结果(通过调用get() – 这将阻塞,直到在单独的线程中生成结果)

我能想到的最接近的事情是使用CompletionService在结果CompletionService时累积结果。

简单的例子:

 ExecutorService executor = Executors.newSingleThreadExecutor(); // Create vanilla executor service. CompletionService completionService = new ExecutorCompletionService(executor); // Completion service wraps executor and is notified of results as they complete. Callable callable = new MyCallable(); executor.submit(callable); // Do not store handle to Future here but rather obtain from CompletionService when we *know* the result is complete. Future fut = completionService.take(); // Will block until a completed result is available. Result result = fut.get(); // Will not block as we know this future represents a completed result. 

我不建议将它包装在Iterator接口后面,因为Future get()方法可以抛出两个可能的已检查exception: ExecutionExceptionInterruptedException ,因此您需要捕获并吞下它们或者将它们重新抛出为RuntimeException ,两者都不是一件非常好的事情。 此外,如果正在进行任务,您的IteratorhasNext()next()方法可能需要阻止,这对于使用Iterator客户端来说可能被认为是违反直觉的。 相反,我会实现自己更具描述性的界面; 例如

 public interface BlockingResultSet { /** * Returns next result when it is ready, blocking is required. * Returns null if no more results are available. */ Result take() throws InterruptedException, ExecutionException; } 

(名为take()方法通常表示java.util.concurrent包中的阻塞调用)。