CompletableFuture:等待第一个正常返回?

我有一些CompletableFuture ,我想并行运行它们,等待正常返回的第一个。

我知道我可以使用CompletableFuture.anyOf等待第一个返回,但这将正常exception返回。 我想忽略exception。

 List<CompletableFuture> futures = names.stream().map( (String name) -> CompletableFuture.supplyAsync( () -> // this calling may throw exceptions. new Task(name).run() ) ).collect(Collectors.toList()); //FIXME Can not ignore exceptionally returned takes. Future any = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[]{})); try { logger.info(any.get().toString()); } catch (Exception e) { e.printStackTrace(); } 

您可以使用以下帮助程序方法:

 public static  CompletableFuture anyOf(List> l) { CompletableFuture f=new CompletableFuture<>(); Consumer complete=f::complete; l.forEach(s -> s.thenAccept(complete)); return f; } 

您可以像这样使用,以certificate它将忽略先前的exception但返回第一个提供的值:

 List> futures = Arrays.asList( CompletableFuture.supplyAsync( () -> { throw new RuntimeException("failing immediately"); } ), CompletableFuture.supplyAsync( () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5)); return "with 5s delay"; }), CompletableFuture.supplyAsync( () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10)); return "with 10s delay"; }) ); CompletableFuture c = anyOf(futures); logger.info(c.join()); 

该解决方案的一个缺点是,如果所有期货都exception完成,它将永远不会完成。 一个解决方案,如果成功计算将提供第一个值,但如果根本没有成功计算则会exception失败,这个解决方案涉及更多:

 public static  CompletableFuture anyOf(List> l) { CompletableFuture f=new CompletableFuture<>(); Consumer complete=f::complete; CompletableFuture.allOf( l.stream().map(s -> s.thenAccept(complete)).toArray(CompletableFuture[]::new) ).exceptionally(ex -> { f.completeExceptionally(ex); return null; }); return f; } 

它利用了allOf的特殊处理程序仅在所有期货完成后(exception或非allOf )被调用的事实,并且未来只能完成一次(让特殊的东西像obtrude…一样)。 执行exception处理程序时,如果有结果,则已完成任何使用结果完成未来的尝试,因此,如果之前没有成功完成,则exception完成它的尝试只会成功。

它可以与第一个解决方案完全相同的方式使用,并且只有在所有计算都失败时才会表现出不同的行为,例如:

 List> futures = Arrays.asList( CompletableFuture.supplyAsync( () -> { throw new RuntimeException("failing immediately"); } ), CompletableFuture.supplyAsync( // delayed to demonstrate that the solution will wait for all completions // to ensure it doesn't miss a possible successful computation () -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5)); throw new RuntimeException("failing later"); } ) ); CompletableFuture c = anyOf(futures); try { logger.info(c.join()); } catch(CompletionException ex) { logger.severe(ex.toString()); } 

上面的示例使用了一个延迟,certificate解决方案将在没有成功时等待所有完成,而ideone上的这个示例将演示以后的成功将如何将结果转化为成功。 请注意,由于Ideones缓存结果,您可能不会注意到延迟。

请注意,如果所有期货都失败,则无法保证会报告哪些例外情况。 由于它在错误的情况下等待所有完成,任何可以使它达到最终结果。

考虑到:

  1. Java哲学的基础之一是防止或阻止糟糕的编程实践。

    (它在多大程度上成功地这样做是另一场辩论的主题;这一点仍然表明,这无疑是该语言的主要目标之一。)

  2. 忽略exception是一种非常糟糕的做法。

    一个例外应该总是重新抛出上面的层,或者处理,或者至少报告。 具体而言, 永远不应该悄悄地吞下exception

  3. 应尽早报告错误。

    例如,查看运行时经历的痛苦,以便提供失败的快速迭代器,如果在迭代时修改了集合,则会抛出ConcurrentModificationException

  4. 忽略exception完成的CompletableFuture意味着a)您没有尽早报告错误,并且b)您可能计划根本不报告错误。

  5. 无法简单地等待第一次非特殊完成而不必受到特殊完成的困扰并不会带来任何重大负担,因为您总是可以从列表中删除特殊完成的项目,(同时不要忘记报告失败, 对吗? )并重复等待。

因此,如果Java中有意遗漏所寻求的function,我不会感到惊讶,我愿意争辩说它是理所当然的

(抱歉Sotirios,没有规范的答案。)

那么,这是框架应该支持的方法。 首先,我认为CompletionStage.applyToEither做了类似的事情,但事实certificate它没有。 所以我提出了这个解决方案:

 public static  CompletionStage firstCompleted(Collection> stages) { final int count = stages.size(); if (count <= 0) { throw new IllegalArgumentException("stages must not be empty"); } final AtomicInteger settled = new AtomicInteger(); final CompletableFuture future = new CompletableFuture(); BiConsumer consumer = (val, exc) -> { if (exc == null) { future.complete(val); } else { if (settled.incrementAndGet() >= count) { // Complete with the last exception. You can aggregate all the exceptions if you wish. future.completeExceptionally(exc); } } }; for (CompletionStage item : stages) { item.whenComplete(consumer); } return future; } 

要看到它的实际效果,这里有一些用法:

 import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; public class Main { public static  CompletionStage firstCompleted(Collection> stages) { final int count = stages.size(); if (count <= 0) { throw new IllegalArgumentException("stages must not be empty"); } final AtomicInteger settled = new AtomicInteger(); final CompletableFuture future = new CompletableFuture(); BiConsumer consumer = (val, exc) -> { if (exc == null) { future.complete(val); } else { if (settled.incrementAndGet() >= count) { // Complete with the last exception. You can aggregate all the exceptions if you wish. future.completeExceptionally(exc); } } }; for (CompletionStage item : stages) { item.whenComplete(consumer); } return future; } private static final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor(); public static  CompletionStage delayed(final U value, long delay) { CompletableFuture future = new CompletableFuture(); worker.schedule(() -> { future.complete(value); }, delay, TimeUnit.MILLISECONDS); return future; } public static  CompletionStage delayedExceptionally(final Throwable value, long delay) { CompletableFuture future = new CompletableFuture(); worker.schedule(() -> { future.completeExceptionally(value); }, delay, TimeUnit.MILLISECONDS); return future; } public static void main(String[] args) throws InterruptedException, ExecutionException { System.out.println("Started..."); /* // Looks like applyToEither doesn't work as expected CompletableFuture a = CompletableFuture.completedFuture(99); CompletableFuture b = Main.completedExceptionally(new Exception("Exc")).toCompletableFuture(); System.out.println(b.applyToEither(a, x -> x).get()); // throws Exc */ try { List> futures = new ArrayList<>(); futures.add(Main.delayedExceptionally(new Exception("Exception #1"), 100)); futures.add(Main.delayedExceptionally(new Exception("Exception #2"), 200)); futures.add(delayed(1, 1000)); futures.add(Main.delayedExceptionally(new Exception("Exception #4"), 400)); futures.add(delayed(2, 500)); futures.add(Main.delayedExceptionally(new Exception("Exception #5"), 600)); Integer value = firstCompleted(futures).toCompletableFuture().get(); System.out.println("Completed normally: " + value); } catch (Exception ex) { System.out.println("Completed exceptionally"); ex.printStackTrace(); } try { List> futures = new ArrayList<>(); futures.add(Main.delayedExceptionally(new Exception("Exception B#1"), 400)); futures.add(Main.delayedExceptionally(new Exception("Exception B#2"), 200)); Integer value = firstCompleted(futures).toCompletableFuture().get(); System.out.println("Completed normally: " + value); } catch (Exception ex) { System.out.println("Completed exceptionally"); ex.printStackTrace(); } System.out.println("End..."); } } 

这会有用吗? 返回正常完成的所有期货的流,并返回其中一个。

 futures.stream() .filter(f -> { try{ f.get(); return true; }catch(ExecutionException | InterruptedException e){ return false; } }) .findAny();