在java中并行化任务的最简单方法是什么?

说我有一个类似的任务:

for(Object object: objects) { Result result = compute(objects); list.add(result); } 

并行化每个compute()的最简单方法是什么(假设它们已经可并行化)?

我不需要一个严格符合上述代码的答案,只是一般答案。 但是如果您需要更多信息:我的任务是IO绑定的,这是针对Spring Web应用程序的,任务将在HTTP请求中执行。

我建议看看ExecutorService 。

特别是这样的事情:

 ExecutorService EXEC = Executors.newCachedThreadPool(); List> tasks = new ArrayList>(); for (final Object object: objects) { Callable c = new Callable() { @Override public Result call() throws Exception { return compute(object); } }; tasks.add(c); } List> results = EXEC.invokeAll(tasks); 

请注意,如果objects是一个大列表,使用newCachedThreadPool可能会很糟糕。 缓存的线程池可以为每个任务创建一个线程! 您可能希望使用newFixedThreadPool(n) ,其中n是合理的(例如,您拥有的核心数,假设compute()受CPU限制)。

这是实际运行的完整代码:

 import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ExecutorServiceExample { private static final Random PRNG = new Random(); private static class Result { private final int wait; public Result(int code) { this.wait = code; } } public static Result compute(Object obj) throws InterruptedException { int wait = PRNG.nextInt(3000); Thread.sleep(wait); return new Result(wait); } public static void main(String[] args) throws InterruptedException, ExecutionException { List objects = new ArrayList(); for (int i = 0; i < 100; i++) { objects.add(new Object()); } List> tasks = new ArrayList>(); for (final Object object : objects) { Callable c = new Callable() { @Override public Result call() throws Exception { return compute(object); } }; tasks.add(c); } ExecutorService exec = Executors.newCachedThreadPool(); // some other exectuors you could try to see the different behaviours // ExecutorService exec = Executors.newFixedThreadPool(3); // ExecutorService exec = Executors.newSingleThreadExecutor(); try { long start = System.currentTimeMillis(); List> results = exec.invokeAll(tasks); int sum = 0; for (Future fr : results) { sum += fr.get().wait; System.out.println(String.format("Task waited %d ms", fr.get().wait)); } long elapsed = System.currentTimeMillis() - start; System.out.println(String.format("Elapsed time: %d ms", elapsed)); System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d))); } finally { exec.shutdown(); } } } 

有关更详细的解答,请阅读Java Concurrency in Practice并使用java.util.concurrent 。

这是我在自己的项目中使用的东西:

 public class ParallelTasks { private final Collection tasks = new ArrayList(); public ParallelTasks() { } public void add(final Runnable task) { tasks.add(task); } public void go() throws InterruptedException { final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime() .availableProcessors()); try { final CountDownLatch latch = new CountDownLatch(tasks.size()); for (final Runnable task : tasks) threads.execute(new Runnable() { public void run() { try { task.run(); } finally { latch.countDown(); } } }); latch.await(); } finally { threads.shutdown(); } } } // ... public static void main(final String[] args) throws Exception { ParallelTasks tasks = new ParallelTasks(); final Runnable waitOneSecond = new Runnable() { public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { } } }; tasks.add(waitOneSecond); tasks.add(waitOneSecond); tasks.add(waitOneSecond); tasks.add(waitOneSecond); final long start = System.currentTimeMillis(); tasks.go(); System.err.println(System.currentTimeMillis() - start); } 

在我的双核盒子上打印超过2000。

您可以使用ThreadPoolExecutor 。 以下是示例代码: http : //programmingexamples.wikidot.com/threadpoolexecutor (太长了,无法将其带到这里)

Fork / Join的并行数组是一种选择

人们可以简单地创建一些线程并获得结果。

 Thread t = new Mythread(object); if (t.done()) { // get result // add result } 

编辑:我认为其他解决方案更酷。

我要提到一个执行者课程。 以下是您将在执行程序类中放置的一些示例代码。

  private static ExecutorService threadLauncher = Executors.newFixedThreadPool(4); private List> callableList = new ArrayList>(); public void addCallable(Callable callable) { this.callableList.add(callable); } public void clearCallables(){ this.callableList.clear(); } public void executeThreads(){ try { threadLauncher.invokeAll(this.callableList); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } public Object[] getResult() { List> resultList = null; Object[] resultArray = null; try { resultList = threadLauncher.invokeAll(this.callableList); resultArray = new Object[resultList.size()]; for (int i = 0; i < resultList.size(); i++) { resultArray[i] = resultList.get(i).get(); } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } return resultArray; } 

然后使用它,您将调用执行程序类来填充和执行它。

 executor.addCallable( some implementation of callable) // do this once for each task Object[] results = executor.getResult(); 

使用Java8及更高版本,您可以创建流,然后与parallelStream并行执行处理:

 List objects = ...; List result = objects.parallelStream().map(object -> { return compute(object); }).collect(Collectors.toList()); 

注意:结果的顺序可能与列表中对象的顺序不匹配。

有关如何设置正确数量的线程的详细信息,请参阅此stackoverflow问题how-many-threads-are-spawned-in-parallelstream-in-java-8