Java并行工作迭代器?
我正在寻找一个类,我可以覆盖一个方法来完成工作,并像迭代器一样返回结果。 像这样的东西:
ParallelWorkIterator itr = new ParallelWorkIterator(trials,threads) { public Result work() { //do work here for a single trial... return answer; } }; while (itr.hasNext()) { Result result = itr.next(); //process result... }
这主要用于像monte carlo模拟这样的东西,但我不想每次都要处理设置线程池和管理返回线程。 我推出了自己的课程, 希望能够完成这一课,但我对此并不充分,并且认为我会检查这样的事情是否已经存在。
编辑:要清楚,我希望它在后台运行并在每个工作方法返回后排队结果,直到所有试验都完成。 因此,下一个方法可能会等待返回,直到队列中出现结果。
看看ExecutorCompletionService 。 它做你想要的一切。
void solve(Executor e, Collection> solvers) throws InterruptedException, ExecutionException { //This class will hold and execute your tasks CompletionService ecs = new ExecutorCompletionService (e); //Submit (start) all the tasks asynchronously for (Callable s : solvers) ecs.submit(s); //Retrieve completed task results and use them int n = solvers.size(); for (int i = 0; i < n; ++i) { Result r = ecs.take().get(); if (r != null) use(r); } }
使用CompletionService的好处是它总是返回第一个完成的结果。 这可以确保您不会等待任务完成,并且可以让未完成的任务在后台运行。
我建议看一下Java Executors 。
您提交了许多任务,并为每个任务获取Future对象。 您的工作在后台处理,并迭代Future对象(就像您在上面所做的那样)。 每个Future在结果可用时返回结果(通过调用get()
– 这将阻塞,直到在单独的线程中生成结果)
我能想到的最接近的事情是使用CompletionService
在结果CompletionService
时累积结果。
简单的例子:
ExecutorService executor = Executors.newSingleThreadExecutor(); // Create vanilla executor service. CompletionService completionService = new ExecutorCompletionService (executor); // Completion service wraps executor and is notified of results as they complete. Callable callable = new MyCallable(); executor.submit(callable); // Do not store handle to Future here but rather obtain from CompletionService when we *know* the result is complete. Future fut = completionService.take(); // Will block until a completed result is available. Result result = fut.get(); // Will not block as we know this future represents a completed result.
我不建议将它包装在Iterator
接口后面,因为Future
get()
方法可以抛出两个可能的已检查exception: ExecutionException
和InterruptedException
,因此您需要捕获并吞下它们或者将它们重新抛出为RuntimeException
,两者都不是一件非常好的事情。 此外,如果正在进行任务,您的Iterator
的hasNext()
或next()
方法可能需要阻止,这对于使用Iterator
客户端来说可能被认为是违反直觉的。 相反,我会实现自己更具描述性的界面; 例如
public interface BlockingResultSet { /** * Returns next result when it is ready, blocking is required. * Returns null if no more results are available. */ Result take() throws InterruptedException, ExecutionException; }
(名为take()
方法通常表示java.util.concurrent
包中的阻塞调用)。