如何取消Java 8可完成的未来?

我正在玩Java 8可完成的期货。 我有以下代码:

CountDownLatch waitLatch = new CountDownLatch(1); CompletableFuture future = CompletableFuture.runAsync(() -> { try { System.out.println("Wait"); waitLatch.await(); //cancel should interrupt System.out.println("Done"); } catch (InterruptedException e) { System.out.println("Interrupted"); throw new RuntimeException(e); } }); sleep(10); //give it some time to start (ugly, but works) future.cancel(true); System.out.println("Cancel called"); assertTrue(future.isCancelled()); assertTrue(future.isDone()); sleep(100); //give it some time to finish 

使用runAsync我计划执行等待锁存器的代码。 接下来我取消了未来,期望被抛入中断的exception。 但似乎线程在await调用上仍然被阻塞,即使未来被取消(断言传递),也不会抛出InterruptedException。 使用ExecutorService的等效代码按预期工作。 它是CompletableFuture中的错误还是我的示例中的错误?

显然,这是故意的。 CompletableFuture :: cancel方法的Javadoc状态:

[参数:] mayInterruptIfRunning – 此值在此实现中无效,因为中断不用于控制处理。

有趣的是, ForkJoinTask :: cancel方法对参数mayInterruptIfRunning使用几乎相同的措辞。

我猜这个问题:

  • 中断旨在用于阻塞操作,如睡眠等待或I / O操作,
  • CompletableFutureForkJoinTask都不打算用于阻塞操作。

CompletableFuture应该创建一个新的CompletionStage ,而不是阻塞,而cpu绑定任务是fork-join模型的先决条件。 因此,使用其中任何一个中断都会破坏他们的目的。 而另一方面,它可能会增加复杂性,如果按预期使用则不需要。

当您调用CompletableFuture#cancel ,您只会停止链的下游部分。 上游部分,即最终将调用complete(...)completeExceptionally(...) ,不会得到任何不再需要结果的信号。

什么是’上游’和’下游’的东西?

我们考虑以下代码:

 CompletableFuture .supplyAsync(() -> "hello") //1 .thenApply(s -> s + " world!") //2 .thenAccept(s -> System.out.println(s)); //3 

在这里,数据从上到下流动 – 从供应商创建,到function修改,再到println消费。 上述特定步骤的部分称为上游部分,下部部分称为下游部分。 E. g。 步骤1和2是步骤3的上游。

这是幕后发生的事情。 这不是精确的,而是一个方便的思维模型。

  1. 正在执行供应商(步骤1)(在JVM的常见ForkJoinPool )。
  2. 然后供应商的结果通过complete(...)传递到下一个CompletableFuture下游。
  3. 在收到结果后, CompletableFuture调用下一步 – 一个函数(步骤2),它接收前一步结果并返回一些将进一步传递给下游CompletableFuturecomplete(...)
  4. 在收到步骤2结果后,步骤3 CompletableFuture调用消费者System.out.println(s) 。 消费者完成后,下游的CompletableFuture将收到它的值, (Void) null

正如我们所看到的,此链中的每个CompletableFuture必须知道下游有谁在等待将值传递给它们的complete(...) (或者completeExceptionally(...) )。 但CompletableFuture不必知道它的上游(或上游 – 可能有几个)。

因此,在步骤3中调用cancel()不会中止步骤1和2,因为从步骤3到步骤2没有链接。

假设你正在使用CompletableFuture那么你的步数就足够小了,如果执行几个额外的步骤就没有坏处。

如果要将取消传播到上游,您有两种选择:

  • 自己实现 – 创建一个专用的CompletableFuture (名称就像cancelled ),在每一步之后检查(类似step.applyToEither(cancelled, Function.identity())
  • 使用反应堆栈,如RxJava 2,ProjectReactor / Flux或Akka Streams

您需要CompletionStage的替代实现来完成真正的线程中断。 我刚刚发布了一个小型库,正是为了这个目的 – https://github.com/vsilaev/tascalate-concurrent

CancellationException是内部ForkJoin取消例程的一部分。 检索未来结果时会出现exception:

 try { future.get(); } catch (Exception e){ System.out.println(e.toString()); } 

花了一段时间才在调试器中看到这个。 JavaDoc并不清楚发生了什么或者你应该期待什么。