如何取消Java 8可完成的未来?
我正在玩Java 8可完成的期货。 我有以下代码:
CountDownLatch waitLatch = new CountDownLatch(1); CompletableFuture future = CompletableFuture.runAsync(() -> { try { System.out.println("Wait"); waitLatch.await(); //cancel should interrupt System.out.println("Done"); } catch (InterruptedException e) { System.out.println("Interrupted"); throw new RuntimeException(e); } }); sleep(10); //give it some time to start (ugly, but works) future.cancel(true); System.out.println("Cancel called"); assertTrue(future.isCancelled()); assertTrue(future.isDone()); sleep(100); //give it some time to finish
使用runAsync我计划执行等待锁存器的代码。 接下来我取消了未来,期望被抛入中断的exception。 但似乎线程在await调用上仍然被阻塞,即使未来被取消(断言传递),也不会抛出InterruptedException。 使用ExecutorService的等效代码按预期工作。 它是CompletableFuture中的错误还是我的示例中的错误?
显然,这是故意的。 CompletableFuture :: cancel方法的Javadoc状态:
[参数:] mayInterruptIfRunning – 此值在此实现中无效,因为中断不用于控制处理。
有趣的是, ForkJoinTask :: cancel方法对参数mayInterruptIfRunning使用几乎相同的措辞。
我猜这个问题:
- 中断旨在用于阻塞操作,如睡眠 , 等待或I / O操作,
- 但CompletableFuture和ForkJoinTask都不打算用于阻塞操作。
CompletableFuture应该创建一个新的CompletionStage ,而不是阻塞,而cpu绑定任务是fork-join模型的先决条件。 因此,使用其中任何一个中断都会破坏他们的目的。 而另一方面,它可能会增加复杂性,如果按预期使用则不需要。
当您调用CompletableFuture#cancel
,您只会停止链的下游部分。 上游部分,即最终将调用complete(...)
或completeExceptionally(...)
,不会得到任何不再需要结果的信号。
什么是’上游’和’下游’的东西?
我们考虑以下代码:
CompletableFuture .supplyAsync(() -> "hello") //1 .thenApply(s -> s + " world!") //2 .thenAccept(s -> System.out.println(s)); //3
在这里,数据从上到下流动 – 从供应商创建,到function修改,再到println
消费。 上述特定步骤的部分称为上游部分,下部部分称为下游部分。 E. g。 步骤1和2是步骤3的上游。
这是幕后发生的事情。 这不是精确的,而是一个方便的思维模型。
- 正在执行供应商(步骤1)(在JVM的常见
ForkJoinPool
)。 - 然后供应商的结果通过
complete(...)
传递到下一个CompletableFuture
下游。 - 在收到结果后,
CompletableFuture
调用下一步 – 一个函数(步骤2),它接收前一步结果并返回一些将进一步传递给下游CompletableFuture
的complete(...)
。 - 在收到步骤2结果后,步骤3
CompletableFuture
调用消费者System.out.println(s)
。 消费者完成后,下游的CompletableFuture
将收到它的值,(Void) null
正如我们所看到的,此链中的每个CompletableFuture
必须知道下游有谁在等待将值传递给它们的complete(...)
(或者completeExceptionally(...)
)。 但CompletableFuture
不必知道它的上游(或上游 – 可能有几个)。
因此,在步骤3中调用cancel()
不会中止步骤1和2,因为从步骤3到步骤2没有链接。
假设你正在使用CompletableFuture
那么你的步数就足够小了,如果执行几个额外的步骤就没有坏处。
如果要将取消传播到上游,您有两种选择:
- 自己实现 – 创建一个专用的
CompletableFuture
(名称就像cancelled
),在每一步之后检查(类似step.applyToEither(cancelled, Function.identity())
) - 使用反应堆栈,如RxJava 2,ProjectReactor / Flux或Akka Streams
您需要CompletionStage的替代实现来完成真正的线程中断。 我刚刚发布了一个小型库,正是为了这个目的 – https://github.com/vsilaev/tascalate-concurrent
CancellationException是内部ForkJoin取消例程的一部分。 检索未来结果时会出现exception:
try { future.get(); } catch (Exception e){ System.out.println(e.toString()); }
花了一段时间才在调试器中看到这个。 JavaDoc并不清楚发生了什么或者你应该期待什么。