Java 8 CompletableFuture中具有默认值的超时

假设我有一些异步计算,例如:

CompletableFuture .supplyAsync(() -> createFoo()) .thenAccept(foo -> doStuffWithFoo(foo)); 

如果异步供应商根据某些指定的超时超时,是否有一种很好的方法为foo提供默认值? 理想情况下,此类function也会尝试取消运行缓慢的供应商。 例如,是否存在类似于以下假设代码的标准库function:

 CompletableFuture .supplyAsync(() -> createFoo()) .acceptEither( CompletableFuture.completedAfter(50, TimeUnit.MILLISECONDS, DEFAULT_FOO), foo -> doStuffWithFoo(foo)); 

或者甚至更好:

 CompletableFuture .supplyAsync(() -> createFoo()) .withDefault(DEFAULT_FOO, 50, TimeUnit.MILLISECONDS) .thenAccept(foo -> doStuffWithFoo(foo)); 

我知道get(timeout, unit) ,但我想知道是否有一种更好的标准方法,以异步和反应方式应用超时,如上面的代码所示。

编辑:这是一个受Java 8启发的解决方案:在lambda表达式中强制检查exception处理。 为什么强制,不是可选的? ,但不幸的是它阻止了一个线程。 如果我们依赖createFoo()异步检查超时并抛出自己的超时exception,它可以在不阻塞线程的情况下工作,但会给供应商的创建者带来更多负担,并且仍然会产生创建exception的成本(可以没有“快速投掷”而昂贵

 static  Supplier wrapped(Callable callable) { return () -> { try { return callable.call(); } catch (RuntimeException e1) { throw e1; } catch (Throwable e2) { throw new RuntimeException(e2); } }; } CompletableFuture .supplyAsync(wrapped(() -> CompletableFuture.supplyAsync(() -> createFoo()).get(50, TimeUnit.MILLISECONDS))) .exceptionally(e -> "default") .thenAcceptAsync(s -> doStuffWithFoo(foo)); 

CompletableFuture.supplyAsync只是一个帮助方法,可以为您创建CompletableFuture,并将任务提交给ForkJoin池。

您可以按照以下要求创建自己的supplyAsync:

 private static final ScheduledExecutorService schedulerExecutor = Executors.newScheduledThreadPool(10); private static final ExecutorService executorService = Executors.newCachedThreadPool(); public static  CompletableFuture supplyAsync( final Supplier supplier, long timeoutValue, TimeUnit timeUnit, T defaultValue) { final CompletableFuture cf = new CompletableFuture(); // as pointed out by Peti, the ForkJoinPool.commonPool() delivers a // ForkJoinTask implementation of Future, that doesn't interrupt when cancelling // Using Executors.newCachedThreadPool instead in the example // submit task Future future = executorService.submit(() -> { try { cf.complete(supplier.get()); } catch (Throwable ex) { cf.completeExceptionally(ex); } }); //schedule watcher schedulerExecutor.schedule(() -> { if (!cf.isDone()) { cf.complete(defaultValue); future.cancel(true); } }, timeoutValue, timeUnit); return cf; } 

使用该帮助器创建CompletableFuture就像在CompletableFuture中使用静态方法一样简单:

  CompletableFuture a = supplyAsync(() -> "hi", 1, TimeUnit.SECONDS, "default"); 

测试它:

  a = supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e1) { // ignore } return "hi"; }, 1, TimeUnit.SECONDS, "default"); 

在Java 9中,将有completeOnTimeout(T值,长超时,TimeUnit单元) ,它可以满足你的需要,尽管它不会取消缓慢的供应商。

还有一个orTimeout(长超时,TimeUnit单元) ,在超时的情况下exception完成。

我认为在提供默认值的时候,你总是需要额外的线程监控。 我可能会选择两个supplyAsync调用,默认情况下包含在一个实用程序API中,由acceptEither链接。 如果您更愿意包装您的供应商,那么您可以使用实用程序API为您做出“要么”的调用:

 public class TimeoutDefault { public static  CompletableFuture with(T t, int ms) { return CompletableFuture.supplyAsync(() -> { try { Thread.sleep(ms); } catch (InterruptedException e) { } return t; }); } public static  Supplier with(Supplier supplier, T t, int ms) { return () -> CompletableFuture.supplyAsync(supplier) .applyToEither(TimeoutDefault.with(t, ms), i -> i).join(); } } CompletableFuture future = CompletableFuture .supplyAsync(Example::createFoo) .acceptEither( TimeoutDefault.with("default", 1000), Example::doStuffWithFoo); CompletableFuture future = CompletableFuture .supplyAsync(TimeoutDefault.with(Example::createFoo, "default", 1000)) .thenAccept(Example::doStuffWithFoo); 

DZone有一篇很好的文章如何解决这个问题: https : //dzone.com/articles/asynchronous-timeouts

我不确定代码的版权,因此我不能在这里复制它。 解决方案非常类似于Dane White的解决方案,但它使用带有单个线程和schedule()的线程池,以避免浪费线程只是为了等待超时。

它还会抛出TimeoutException而不是返回默认值。

没有标准的库方法来构造一个在超时后提供值的CompletableFuture。 也就是说,以最小的资源开销自行推出是非常简单的:

 private static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor(); public static  CompletableFuture delayedValue(final T value, final Duration delay) { final CompletableFuture result = new CompletableFuture<>(); EXECUTOR.schedule(() -> result.complete(value), delay.toMillis(), TimeUnit.MILLISECONDS); return result; } 

它可以与CompleteableFuture的“ either ”方法一起使用:

  • accceptEitheracceptEitherAsync
  • applyToEitherapplyToEitherAsync
  • runAfterEitherrunAfterEitherAsync

如果远程服务调用超过某个延迟阈值,则一个应用程序使用缓存值:

 interface RemoteServiceClient { CompletableFuture getFoo(); } final RemoteServiceClient client = /* ... */; final Foo cachedFoo = /* ... */; final Duration timeout = /* ... */; client.getFoos() .exceptionally(ignoredException -> cachedFoo) .acceptEither(delayedValue(cachedFoo, timeout), foo -> /* do something with foo */) .join(); 

如果远程客户端调用exception完成(例如SocketTimeoutException ),我们可以快速失败并立即使用缓存值。

CompletableFuture.anyOf(CompletableFuture...)可与此delayedValue原语组合,以使用上述语义包装CompletableFuture

 @SuppressWarnings("unchecked") public static  CompletableFuture withDefault(final CompletableFuture cf, final T defaultValue, final Duration timeout) { return (CompletableFuture) CompletableFuture.anyOf( cf.exceptionally(ignoredException -> defaultValue), delayedValue(defaultValue, timeout)); } 

这很好地简化了上面的远程服务调用示例:

 withDefault(client.getFoos(), cachedFoo, timeout) .thenAccept(foo -> /* do something with foo */) .join(); 

CompletableFuture更准确地称为承诺,因为它们将Future创建与其完成分开。 确保使用专用线程池来进行繁重的CPU工作。 要为昂贵的计算创建CompletableFuture ,您应该使用CompletableFuture#supplyAsync(Supplier, Executor)重载,因为#supplyAsync(Supplier)重载默认为公共ForkJoinPool 。 返回的CompletableFuture无法取消其任务,因为Executor接口未公开此function。 更一般地,依赖的CompletableFuture不会取消他们的父母,例如cf.thenApply(f).cancel(true)不取消cf.thenApply(f).cancel(true) 如果你需要这个function,我建议坚持使用ExecutorService返回的Future