通过Streams并行执行多个查询
我有以下方法:
public String getResult() { List serversList = getServerListFromDB(); List appList = getAppListFromDB(); List userList = getUserFromDB(); return getResult(serversList, appList, userList); }
在这里,我按顺序调用三个方法,然后点击DB并获取结果,然后我对从DB命中获得的结果进行后处理。 我知道如何通过使用Threads
同时调用这三种方法。 但我想使用Java 8 Parallel Stream
来实现这一目标。 有人可以指导我如何通过Parallel Streams实现相同的目标吗?
编辑我只想通过Stream并行调用方法。
private void getInformation() { method1(); method2(); method3(); method4(); method5(); }
您可以这样使用CompletableFuture
:
public String getResult() { // Create Stream of tasks: Stream>> tasks = Stream.of( () -> getServerListFromDB(), () -> getAppListFromDB(), () -> getUserFromDB()); List> lists = tasks // Supply all the tasks for execution and collect CompletableFutures .map(CompletableFuture::supplyAsync).collect(Collectors.toList()) // Join all the CompletableFutures to gather the results .stream() .map(CompletableFuture::join).collect(Collectors.toList()); // Use the results. They are guaranteed to be ordered in the same way as the tasks return getResult(lists.get(0), lists.get(1), lists.get(2)); }
foreach
是用于side-effects
,你可以在parallel stream
上调用foreach
。 例如:
listOfTasks.parallelStream().foreach(list->{ submitToDb(list); });
但是, parallelStream
使用常见的ForkJoinPool
,这可能对IO-bound
任务不利。
考虑使用CompletableFuture
并提供适当的ExecutorService
。 它提供了更大的灵活性( continuation
,配置)。 例如:
ExecutorService executorService = Executors.newCachedThreadPool(); List allFutures = new ArrayList<>(); for(Query query:queries){ CompletableFuture query = CompletableFuture.supplyAsync(() -> { // submit query to db return result; }, executorService); allFutures.add(query); } CompletableFuture all = CompletableFuture.allOf(allFutures.toArray(new CompletableFuture[allFutures.size()]));
如前所述,标准并行流可能不是最适合您的用例。 我将使用ExecutorService异步完成每个任务,并在调用getResult方法时“加入”它们:
ExecutorService es = Executors.newFixedThreadPool(3); Future> serversList = es.submit(() -> getServerListFromDB()); Future> appList = es.submit(() -> getAppListFromDB()); Future> userList = es.submit(() -> getUserFromDB()); return getResult(serversList.get(), appList.get(), userList.get());
不太清楚你的意思是什么,但如果你只是想在并行的这些列表上运行一些进程,你可以这样做:
List list1 = Arrays.asList("1", "234", "33"); List list2 = Arrays.asList("a", "b", "cddd"); List list3 = Arrays.asList("1331", "22", "33"); List> listOfList = Arrays.asList(list1, list2, list3); listOfList.parallelStream().forEach(list -> System.out.println(list.stream().max((o1, o2) -> Integer.compare(o1.length(), o2.length()))));
(它将打印每个列表中最冗长的元素)。