如何使用CompletableFuture.thenComposeAsync()?

鉴于:

public class Test { public static void main(String[] args) { int nThreads = 1; Executor e = Executors.newFixedThreadPool(nThreads); CompletableFuture.runAsync(() -> { System.out.println("Task 1. Thread: " + Thread.currentThread().getId()); }, e).thenComposeAsync((Void unused) -> { return CompletableFuture.runAsync(() -> { System.out.println("Task 2. Thread: " + Thread.currentThread().getId()); }, e); }, e).join(); System.out.println("finished"); } } 

我期待一个执行程序线程运行任务1,然后执行任务2.相反,如果nThreads小于2,代码将挂起。

  1. 请解释代码挂起的原因。 我可以看到它在CompletableFuture中被阻止:616等待一些Future完成,但目前尚不清楚原因。
  2. 如果我允许使用2个线程,那么每个线程用于什么?

简而言之,请帮助我理解thenComposeAsync()实际上是如何工作的。 Javadoc看起来像是为机器人而不是人类而写的:)

  1. thenComposeAsync方法为执行程序放置一个新任务,该任务抓取单个线程并等待Task 2完成。 但是这个没有更多的线程可以运行。 您可以改为使用在与Task 1相同的线程中执行的thenCompose方法来避免死锁。

  2. 一个线程正在执行Task 1Task 2 ,第二个线程正在处理组合两者的结果。

注意: CompletableFuture (s)最适合使用ForkJoinPool ,它可以更有效地处理生成新任务的任务。 出于此目的,在Java 8中添加了默认的ForkJoinPool ,如果未指定执行程序来运行任务,则默认使用它。

以下是关于这些新function在何处发挥作用以及它们如何工作的良好演示: Java 8期货的反应式编程模式 。

它阻塞在runAsync中的thenComposeAsyncthenComposeAsync在执行器e内的线程中运行提供的函数。 但是你给它的函数会尝试在同一个执行器中执行runAsync体。

您可以通过添加另一个跟踪输出来更好地了解正在发生的事情:

 CompletableFuture.runAsync(() -> { System.out.println("Task 1. Thread: " + Thread.currentThread().getId()); }, e).thenComposeAsync((Void unused) -> { System.out.println("Task 1 1/2. Thread: " + Thread.currentThread().getId()); return CompletableFuture.runAsync(() -> { System.out.println("Task 2. Thread: " + Thread.currentThread().getId()); }, e); }, e).join(); 

现在,如果使用双线程执行程序运行它,您将看到任务1 1/2和任务2在不同的线程上运行。

修复它的方法是用常规的thenCompose替换thenCompose 。 我不确定为什么你会使用thenComposeAsync 。 如果您有一个返回CompletableFuture的方法,可能该方法不会阻塞,也不需要异步运行。