multithreading搜索操作

我有一个方法需要一系列查询,我需要针对不同的搜索引擎Web API运行它们,例如谷歌或雅虎。 为了并行化进程,为每个查询生成一个线程,然后在最后join ,因为我的应用程序只能获得每个查询的结果才能继续。 我目前有以下几点:

 public abstract class class Query extends Thread { private String query; public abstract Result[] querySearchEngine(); @Override public void run() { Result[] results = querySearchEngine(query); Querier.addResults(results); } } public class GoogleQuery extends Query { public Result querySearchEngine(String query) { // access google rest API } } public class Querier { /* Every class that implements Query fills this array */ private static ArrayList aggregatedResults; public static void addResults(Result[]) { // add to aggregatedResults } public static Result[] queryAll(Query[] queries) { /* for each thread, start it, to aggregate results */ for (Query query : queries) { query.start(); } for (Query query : queries) { query.join(); } return aggregatedResults; } } 

最近,我发现Java中有一个用于执行并发作业的 API。 即, Callable接口, FutureTaskExecutorService 。 我想知道这个新API是否应该使用,如果它们比传统的API更有效, RunnableThread

在研究了这个新API之后,我想出了以下代码(简化版):

  public abstract class Query implements Callable { private final String query; // gets set in the constructor public abstract Result[] querySearchEngine(); @Override public Result[] call() { return querySearchEngine(query); } } public class Querier { private ArrayList aggregatedResults; public Result[] queryAll(Query[] queries) { List<Future> futures = new ArrayList<Future>(queries.length); final ExecutorService service = Executors.newFixedThreadPool(queries.length); for (Query query : queries) { futures.add(service.submit(query)); } for (Future future : futures) { aggregatedResults.add(future.get()); // get() is somewhat similar to join? } return aggregatedResults; } } 

我是这个并发API的新手,我想知道在上面的代码中是否有可以改进的东西,如果它比第一个选项(使用Thread )更好。 我没有探索过一些类,比如FutureTask等等。 我也很乐意听到任何建议。

你的代码有几个问题。

  1. 您可能应该使用ExecutorService.invokeAll()方法。 创建新线程和新线程池的成本可能很高(尽管可能与调用外部搜索引擎无比)。 invokeAll()可以为您管理线程。
  2. 您可能不希望混合数组和generics。
  3. 您正在调用aggregatedResults.add()而不是addAll()。
  4. 当它们可能是queryAll()函数调用的本地变量时,您不需要使用成员变量。

所以,像下面这样的东西应该工作:

 public abstract class Query implements Callable> { private final String query; // gets set in the constructor public abstract List querySearchEngine(); @Override public List call() { return querySearchEngine(query); } } public class Querier { private static final ExecutorService executor = Executors.newCachedThreadPool(); public List queryAll(List queries) { List>> futures = executor.submitAll(queries); List aggregatedResults = new ArrayList(); for (Future> future : futures) { aggregatedResults.addAll(future.get()); // get() is somewhat similar to join? } return aggregatedResults; } } 

作为进一步的改进,您可以考虑使用CompletionService它将提交和检索的顺序分离,而是将所有未来结果放在一个队列中,您可以按照它们完成的顺序从中获取结果。

我可以建议您使用Future.get()超时吗?

否则它只会让一个搜索引擎无法响应才能使一切停止(如果您在最后遇到网络问题,它甚至不需要成为搜索引擎问题)