通过Streams并行执行多个查询

我有以下方法:

public String getResult() { List serversList = getServerListFromDB(); List appList = getAppListFromDB(); List userList = getUserFromDB(); return getResult(serversList, appList, userList); } 

在这里,我按顺序调用三个方法,然后点击DB并获取结果,然后我对从DB命中获得的结果进行后处理。 我知道如何通过使用Threads同时调用这三种方法。 但我想使用Java 8 Parallel Stream来实现这一目标。 有人可以指导我如何通过Parallel Streams实现相同的目标吗?

编辑我只想通过Stream并行调用方法。

 private void getInformation() { method1(); method2(); method3(); method4(); method5(); } 

您可以这样使用CompletableFuture

 public String getResult() { // Create Stream of tasks: Stream>> tasks = Stream.of( () -> getServerListFromDB(), () -> getAppListFromDB(), () -> getUserFromDB()); List> lists = tasks // Supply all the tasks for execution and collect CompletableFutures .map(CompletableFuture::supplyAsync).collect(Collectors.toList()) // Join all the CompletableFutures to gather the results .stream() .map(CompletableFuture::join).collect(Collectors.toList()); // Use the results. They are guaranteed to be ordered in the same way as the tasks return getResult(lists.get(0), lists.get(1), lists.get(2)); } 

foreach是用于side-effects ,你可以在parallel stream上调用foreach 。 例如:

 listOfTasks.parallelStream().foreach(list->{ submitToDb(list); }); 

但是, parallelStream使用常见的ForkJoinPool ,这可能对IO-bound任务不利。

考虑使用CompletableFuture并提供适当的ExecutorService 。 它提供了更大的灵活性( continuation ,配置)。 例如:

 ExecutorService executorService = Executors.newCachedThreadPool(); List allFutures = new ArrayList<>(); for(Query query:queries){ CompletableFuture query = CompletableFuture.supplyAsync(() -> { // submit query to db return result; }, executorService); allFutures.add(query); } CompletableFuture all = CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[allFutures.size()])); 

如前所述,标准并行流可能不是最适合您的用例。 我将使用ExecutorService异步完成每个任务,并在调用getResult方法时“加入”它们:

 ExecutorService es = Executors.newFixedThreadPool(3); Future> serversList = es.submit(() -> getServerListFromDB()); Future> appList = es.submit(() -> getAppListFromDB()); Future> userList = es.submit(() -> getUserFromDB()); return getResult(serversList.get(), appList.get(), userList.get()); 

不太清楚你的意思是什么,但如果你只是想在并行的这些列表上运行一些进程,你可以这样做:

  List list1 = Arrays.asList("1", "234", "33"); List list2 = Arrays.asList("a", "b", "cddd"); List list3 = Arrays.asList("1331", "22", "33"); List> listOfList = Arrays.asList(list1, list2, list3); listOfList.parallelStream().forEach(list -> System.out.println(list.stream().max((o1, o2) -> Integer.compare(o1.length(), o2.length())))); 

(它将打印每个列表中最冗长的元素)。