如何在流中将1个完成的未来分成许多可完成的未来?

例如,我有这样的方法:

public CompletableFuture getPage(int i) { ... } public CompletableFuture getDocument(int i) { ... } public CompletableFuture parseLinks(Document doc) { ... } 

我的流量:

 List list = IntStream .range(0, 10) .mapToObj(i -> getPage(i)) // I want method like this: .thenApplyAndSplit(CompletableFuture page -> { List<CompletableFuture> docs = page.getDocsId() .stream() .map(i -> getDocument(i)) .collect(Collectors.toList()); return docs; }) .map(CompletableFuture future -> { return future.thenApply(Document doc -> parseLink(doc); }) .collect(Collectors.toList()); 

它应该通过flatMap()CompletableFuture ,所以我想实现这个流程:

 List -> Stream<CompletableFuture> -> Stream<CompletableFuture> -> parse each 

UPDATE

 Stream<CompletableFuture> pagesCFS = IntStream .range(0, 10) .mapToObj(i -> getPage(i)); Stream<CompletableFuture> documentCFS = listCFS.flatMap(page -> { // How to return stream of Document when page finishes? // page.thenApply( ... ) }) 

我还想尝试为CompletableFutures流实现Spliterator ,所以这是我的尝试。

请注意,如果您在并行模式下使用此function,请注意为流和CompletableFuture后面运行的任务使用不同的ForkJoinPool 。 流将等待期货完成,因此如果它们共享相同的执行程序,甚至遇到死锁,您实际上可能会失去性能。

所以这是实现:

 public static  Stream flattenStreamOfFutures(Stream> stream, boolean parallel) { return StreamSupport.stream(new CompletableFutureSpliterator(stream), parallel); } public static  Stream flattenStreamOfFuturesOfStream(Stream>> stream, boolean parallel) { return flattenStreamOfFutures(stream, parallel).flatMap(Function.identity()); } public static class CompletableFutureSpliterator implements Spliterator { private List> futures; CompletableFutureSpliterator(Stream> stream) { futures = stream.collect(Collectors.toList()); } CompletableFutureSpliterator(CompletableFuture[] futures) { this.futures = new ArrayList<>(Arrays.asList(futures)); } CompletableFutureSpliterator(final List> futures) { this.futures = new ArrayList<>(futures); } @Override public boolean tryAdvance(final Consumer action) { if (futures.isEmpty()) return false; CompletableFuture.anyOf(futures.stream().toArray(CompletableFuture[]::new)).join(); // now at least one of the futures has finished, get its value and remove it ListIterator> it = futures.listIterator(futures.size()); while (it.hasPrevious()) { final CompletableFuture future = it.previous(); if (future.isDone()) { it.remove(); action.accept(future.join()); return true; } } throw new IllegalStateException("Should not reach here"); } @Override public Spliterator trySplit() { if (futures.size() > 1) { int middle = futures.size() >>> 1; // relies on the constructor copying the list, as it gets modified in place Spliterator result = new CompletableFutureSpliterator<>(futures.subList(0, middle)); futures = futures.subList(middle, futures.size()); return result; } return null; } @Override public long estimateSize() { return futures.size(); } @Override public int characteristics() { return IMMUTABLE | SIZED | SUBSIZED; } } 

它的工作原理是将给定的Stream>转换为那些期货的List – 假设构建流是快速的,艰苦的工作由期货本身完成,因此制作一个列表不应该很贵。 这也确保所有任务都已被触发,因为它强制处理源流。

为了生成输出流,它只是在流式传输其值之前等待任何未来完成。

一个简单的非并行用法示例(执行程序用于CompletableFuture ,以便同时启动它们):

 ExecutorService executor = Executors.newFixedThreadPool(20); long start = System.currentTimeMillis(); flattenStreamOfFutures(IntStream.range(0, 20) .mapToObj(i -> CompletableFuture.supplyAsync(() -> { try { Thread.sleep((i % 10) * 1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } System.out.println("Finished " + i + " @ " + (System.currentTimeMillis() - start) + "ms"); return i; }, executor)), false) .forEach(x -> { System.out.println(Thread.currentThread().getName() + " @ " + (System.currentTimeMillis() - start) + "ms handle result: " + x); }); executor.shutdown(); 

输出:

 Finished 10 @ 103ms Finished 0 @ 105ms main @ 114ms handle result: 10 main @ 114ms handle result: 0 Finished 1 @ 1102ms main @ 1102ms handle result: 1 Finished 11 @ 1104ms main @ 1104ms handle result: 11 Finished 2 @ 2102ms main @ 2102ms handle result: 2 Finished 12 @ 2104ms main @ 2105ms handle result: 12 Finished 3 @ 3102ms main @ 3102ms handle result: 3 Finished 13 @ 3104ms main @ 3105ms handle result: 13 … 

正如您所看到的,即使期货未按顺序完成,流也几乎立即产生价值。

将它应用于问题中的示例,这将给出(假设parseLinks()返回CompletableFuture而不是~ ):

 flattenStreamOfFuturesOfStream(IntStream.range(0, 10) .mapToObj(this::getPage) // the next map() will give a Stream>> // hence the need for flattenStreamOfFuturesOfStream() .map(pcf -> pcf .thenApply(page -> flattenStreamOfFutures(page .getDocsId() .stream() .map(this::getDocument) .map(docCF -> docCF.thenCompose(this::parseLinks)), false))), false) .forEach(System.out::println); 

你真的必须使用Streams吗? 你不能只对你的CompletableFutures一些依赖行动吗? 特别是因为你的上一次调用返回CompletableFutures (当然,也可以使用Collection.forEach

 List> completableFutures = IntStream .range(0, 10) .mapToObj(i -> getPage(i)).collect(Collectors.toList()); for (CompletableFuture page : completableFutures) { page.thenAccept(p -> { List docsId = p.getDocsId(); for (Integer integer : docsId) { getDocument(integer).thenAccept(d-> parseLinks(d)); } }); } 

编辑:那么我做了另一次尝试,但我不确定这是不是一个好主意,因为我不是CompletableFuture的专家。

使用以下方法(可能有更好的实现):

 public static  CompletableFuture> flatMapCF(Stream> stream){ return CompletableFuture.supplyAsync( ()-> stream.map(CompletableFuture::join) ); } Stream> pagesCFS = IntStream .range(0, 10) .mapToObj(i -> getPage(i)); CompletableFuture> pageCF = flatMapCF(pagesCFS); CompletableFuture> docCF= pageCF.thenCompose(a -> flatMapCF(a.flatMap( b -> b.getDocsId() .stream() .map(c -> getDocument(c)) ))); 

问题可能是,只有当所有结果都可用时, CompletableFuture才会返回

如果您不关心操作何时完成,那么以下内容将简单地触发所有文档上的parseLinks()

 IntStream.range(0, 10) .mapToObj(this::getPage) .forEach(pcf -> pcf .thenAccept(page -> page .getDocsId() .stream() .map(this::getDocument) .forEach(docCF -> docCF.thenCompose(this::parseLinks)))); 

否则,当您的上一个操作返回CompletableFuture ,我假设您主要想知道什么时候一切都完成了。 你可以这样做:

 CompletableFuture result = CompletableFuture.allOf(IntStream.range(0, 10) .mapToObj(this::getPage) .map(pcf -> pcf .thenCompose(page -> CompletableFuture.allOf(page .getDocsId() .stream() .map(docId -> getDocument(docId) .thenCompose(this::parseLinks)) .toArray(CompletableFuture[]::new)))) .toArray(CompletableFuture[]::new)); 

如果您对单个CompletableFuture的结果感兴趣,最好的方法是直接在流中,在它们创建的位置处理它们。

你甚至可以用可重用的方法将它们全部包装起来。 例如,如果parseLinks()返回CompletableFuture> ,则可以定义如下方法:

 public CompletableFuture processLinks(Function>, ? extends CompletableFuture> processor) { return CompletableFuture.allOf(IntStream.range(0, 10) .mapToObj(this::getPage) .map(pcf -> pcf .thenCompose(page -> CompletableFuture.allOf(page .getDocsId() .stream() .map(docId -> getDocument(docId) .thenCompose(this::parseLinks)) .map(processor) // here we apply the received function .toArray(CompletableFuture[]::new)))) .toArray(CompletableFuture[]::new)); } 

并处理结果列表,如下所示:

 processLinks(linksCF -> linksCF .thenAccept(links -> links.forEach(System.out::println))); 

一旦打印完所有链接,返回的CompletableFuture就会完成。