Java 8 CompletableFuture中具有默认值的超时
假设我有一些异步计算,例如:
CompletableFuture .supplyAsync(() -> createFoo()) .thenAccept(foo -> doStuffWithFoo(foo));
如果异步供应商根据某些指定的超时超时,是否有一种很好的方法为foo提供默认值? 理想情况下,此类function也会尝试取消运行缓慢的供应商。 例如,是否存在类似于以下假设代码的标准库function:
CompletableFuture .supplyAsync(() -> createFoo()) .acceptEither( CompletableFuture.completedAfter(50, TimeUnit.MILLISECONDS, DEFAULT_FOO), foo -> doStuffWithFoo(foo));
或者甚至更好:
CompletableFuture .supplyAsync(() -> createFoo()) .withDefault(DEFAULT_FOO, 50, TimeUnit.MILLISECONDS) .thenAccept(foo -> doStuffWithFoo(foo));
我知道get(timeout, unit)
,但我想知道是否有一种更好的标准方法,以异步和反应方式应用超时,如上面的代码所示。
编辑:这是一个受Java 8启发的解决方案:在lambda表达式中强制检查exception处理。 为什么强制,不是可选的? ,但不幸的是它阻止了一个线程。 如果我们依赖createFoo()异步检查超时并抛出自己的超时exception,它可以在不阻塞线程的情况下工作,但会给供应商的创建者带来更多负担,并且仍然会产生创建exception的成本(可以没有“快速投掷”而昂贵
static Supplier wrapped(Callable callable) { return () -> { try { return callable.call(); } catch (RuntimeException e1) { throw e1; } catch (Throwable e2) { throw new RuntimeException(e2); } }; } CompletableFuture .supplyAsync(wrapped(() -> CompletableFuture.supplyAsync(() -> createFoo()).get(50, TimeUnit.MILLISECONDS))) .exceptionally(e -> "default") .thenAcceptAsync(s -> doStuffWithFoo(foo));
CompletableFuture.supplyAsync只是一个帮助方法,可以为您创建CompletableFuture,并将任务提交给ForkJoin池。
您可以按照以下要求创建自己的supplyAsync:
private static final ScheduledExecutorService schedulerExecutor = Executors.newScheduledThreadPool(10); private static final ExecutorService executorService = Executors.newCachedThreadPool(); public static CompletableFuture supplyAsync( final Supplier supplier, long timeoutValue, TimeUnit timeUnit, T defaultValue) { final CompletableFuture cf = new CompletableFuture (); // as pointed out by Peti, the ForkJoinPool.commonPool() delivers a // ForkJoinTask implementation of Future, that doesn't interrupt when cancelling // Using Executors.newCachedThreadPool instead in the example // submit task Future> future = executorService.submit(() -> { try { cf.complete(supplier.get()); } catch (Throwable ex) { cf.completeExceptionally(ex); } }); //schedule watcher schedulerExecutor.schedule(() -> { if (!cf.isDone()) { cf.complete(defaultValue); future.cancel(true); } }, timeoutValue, timeUnit); return cf; }
使用该帮助器创建CompletableFuture就像在CompletableFuture中使用静态方法一样简单:
CompletableFuture a = supplyAsync(() -> "hi", 1, TimeUnit.SECONDS, "default");
测试它:
a = supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e1) { // ignore } return "hi"; }, 1, TimeUnit.SECONDS, "default");
在Java 9中,将有completeOnTimeout(T值,长超时,TimeUnit单元) ,它可以满足你的需要,尽管它不会取消缓慢的供应商。
还有一个orTimeout(长超时,TimeUnit单元) ,在超时的情况下exception完成。
我认为在提供默认值的时候,你总是需要额外的线程监控。 我可能会选择两个supplyAsync调用,默认情况下包含在一个实用程序API中,由acceptEither链接。 如果您更愿意包装您的供应商,那么您可以使用实用程序API为您做出“要么”的调用:
public class TimeoutDefault { public static CompletableFuture with(T t, int ms) { return CompletableFuture.supplyAsync(() -> { try { Thread.sleep(ms); } catch (InterruptedException e) { } return t; }); } public static Supplier with(Supplier supplier, T t, int ms) { return () -> CompletableFuture.supplyAsync(supplier) .applyToEither(TimeoutDefault.with(t, ms), i -> i).join(); } } CompletableFuture future = CompletableFuture .supplyAsync(Example::createFoo) .acceptEither( TimeoutDefault.with("default", 1000), Example::doStuffWithFoo); CompletableFuture future = CompletableFuture .supplyAsync(TimeoutDefault.with(Example::createFoo, "default", 1000)) .thenAccept(Example::doStuffWithFoo);
DZone有一篇很好的文章如何解决这个问题: https : //dzone.com/articles/asynchronous-timeouts
我不确定代码的版权,因此我不能在这里复制它。 解决方案非常类似于Dane White的解决方案,但它使用带有单个线程和schedule()
的线程池,以避免浪费线程只是为了等待超时。
它还会抛出TimeoutException
而不是返回默认值。
没有标准的库方法来构造一个在超时后提供值的CompletableFuture。 也就是说,以最小的资源开销自行推出是非常简单的:
private static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor(); public static CompletableFuture delayedValue(final T value, final Duration delay) { final CompletableFuture result = new CompletableFuture<>(); EXECUTOR.schedule(() -> result.complete(value), delay.toMillis(), TimeUnit.MILLISECONDS); return result; }
它可以与CompleteableFuture
的“ either
”方法一起使用:
-
accceptEither
,acceptEitherAsync
-
applyToEither
,applyToEitherAsync
-
runAfterEither
,runAfterEitherAsync
如果远程服务调用超过某个延迟阈值,则一个应用程序使用缓存值:
interface RemoteServiceClient { CompletableFuture getFoo(); } final RemoteServiceClient client = /* ... */; final Foo cachedFoo = /* ... */; final Duration timeout = /* ... */; client.getFoos() .exceptionally(ignoredException -> cachedFoo) .acceptEither(delayedValue(cachedFoo, timeout), foo -> /* do something with foo */) .join();
如果远程客户端调用exception完成(例如SocketTimeoutException
),我们可以快速失败并立即使用缓存值。
CompletableFuture.anyOf(CompletableFuture>...)
可与此delayedValue
原语组合,以使用上述语义包装CompletableFuture
:
@SuppressWarnings("unchecked") public static CompletableFuture withDefault(final CompletableFuture cf, final T defaultValue, final Duration timeout) { return (CompletableFuture ) CompletableFuture.anyOf( cf.exceptionally(ignoredException -> defaultValue), delayedValue(defaultValue, timeout)); }
这很好地简化了上面的远程服务调用示例:
withDefault(client.getFoos(), cachedFoo, timeout) .thenAccept(foo -> /* do something with foo */) .join();
CompletableFuture
更准确地称为承诺,因为它们将Future
创建与其完成分开。 确保使用专用线程池来进行繁重的CPU工作。 要为昂贵的计算创建CompletableFuture
,您应该使用CompletableFuture#supplyAsync(Supplier, Executor)
重载,因为#supplyAsync(Supplier)
重载默认为公共ForkJoinPool
。 返回的CompletableFuture
无法取消其任务,因为Executor
接口未公开此function。 更一般地,依赖的CompletableFuture
不会取消他们的父母,例如cf.thenApply(f).cancel(true)
不取消cf.thenApply(f).cancel(true)
如果你需要这个function,我建议坚持使用ExecutorService
返回的Future
。