使用CompletableFuture处理Java 8供应商exception
请考虑以下代码 –
public class TestCompletableFuture { BiConsumer biConsumer = (x,y) -> { System.out.println(x); System.out.println(y); }; public static void main(String args[]) { TestCompletableFuture testF = new TestCompletableFuture(); testF.start(); } public void start() { Supplier numberSupplier = new Supplier() { @Override public Integer get() { return SupplyNumbers.sendNumbers(); } }; CompletableFuture testFuture = CompletableFuture.supplyAsync(numberSupplier).whenComplete(biConsumer); } } class SupplyNumbers { public static Integer sendNumbers(){ return 25; // just for working sake its not correct. } }
以上的事情很好。 但是sendNumbers
也可以在我的情况下抛出一个检查过的exception,例如:
class SupplyNumbers { public static Integer sendNumbers() throws Exception { return 25; // just for working sake its not correct. } }
现在我想在我的biConsumer
处理这个exception。 这将有助于我在单个函数( biConsumer
)中处理结果以及exception(如果有)。
有任何想法吗? 我可以在这里或其他任何地方使用CompletableFuture.exceptionally(fn)
吗?
当您想要处理已检查的exception时,使用标准function接口的工厂方法没有帮助。 当您将代码捕获到lambda表达式中时,您遇到的问题是catch子句需要CompletableFuture
实例来设置exception,而工厂方法需要Supplier
,鸡蛋和鸡蛋。
您可以使用类的实例字段在创建后允许变异,但最终,生成的代码不是干净的,而且比基于Executor
的直接解决方案更复杂。 CompletableFuture
的文档说:
- 使用
ForkJoinPool.commonPool()
执行所有没有显式Executor参数的异步方法…
因此,您知道以下代码将直接处理已检查的exception时显示CompletableFuture.supplyAsync(Supplier)
的标准行为:
CompletableFuture f=new CompletableFuture<>(); ForkJoinPool.commonPool().submit(()-> { try { f.complete(SupplyNumbers.sendNumbers()); } catch(Exception ex) { f.completeExceptionally(ex); } });
文档还说:
…为了简化监视,调试和跟踪,所有生成的异步任务都是标记接口
CompletableFuture.AsynchronousCompletionTask
实例。
如果您希望遵循此约定以使解决方案更像原始的supplyAsync
方法,请将代码更改为:
CompletableFuture f=new CompletableFuture<>(); ForkJoinPool.commonPool().submit( (Runnable&CompletableFuture.AsynchronousCompletionTask)()-> { try { f.complete(SupplyNumbers.sendNumbers()); } catch(Exception ex) { f.completeExceptionally(ex); } });
您已经在y
捕获了exception。 也许你没有看到它,因为在CompletableFuture有机会完成之前, main
退出了?
下面的代码按预期打印“null”和“Hello”:
public static void main(String args[]) throws InterruptedException { TestCompletableFuture testF = new TestCompletableFuture(); testF.start(); Thread.sleep(1000); //wait for the CompletableFuture to complete } public static class TestCompletableFuture { BiConsumer biConsumer = (x, y) -> { System.out.println(x); System.out.println(y); }; public void start() { CompletableFuture.supplyAsync(SupplyNumbers::sendNumbers) .whenComplete(biConsumer); } } static class SupplyNumbers { public static Integer sendNumbers() { throw new RuntimeException("Hello"); } }
我不太确定你想要实现的目标。 如果您的供应商抛出exception,当您调用testFuture .get()
您将获得由供应商抛出的任何exception引起的java.util.concurrent.ExecutionException
,您可以通过在ExecutionException
上调用getCause()
来检索。
或者,正如您所提到的,您可以在CompletableFuture
使用exceptionally
。 这段代码:
public class TestCompletableFuture { private static BiConsumer biConsumer = (x,y) -> { System.out.println(x); System.out.println(y); }; public static void main(String args[]) throws Exception { Supplier numberSupplier = () -> { throw new RuntimeException(); // or return integer }; CompletableFuture testFuture = CompletableFuture.supplyAsync(numberSupplier) .whenComplete(biConsumer) .exceptionally(exception -> 7); System.out.println("result = " + testFuture.get()); } }
打印此结果:
null java.util.concurrent.CompletionException: java.lang.RuntimeException result = 7
编辑:
如果您已检查exception,则只需添加try-catch即可。
原始代码:
Supplier numberSupplier = new Supplier () { @Override public Integer get() { return SupplyNumbers.sendNumbers(); } };
修改后的代码
Supplier numberSupplier = new Supplier () { @Override public Integer get() { try { return SupplyNumbers.sendNumbers(); } catch (Excetpion e) { throw new RuntimeExcetpion(e); } } };
也许您可以使用新的Object来包装整数和错误,如下所示:
public class Result { private Integer integer; private Exception exception; // getter setter }
接着:
public void start(){ Supplier numberSupplier = new Supplier () { @Override public Result get() { Result r = new Result(); try { r.setInteger(SupplyNumbers.sendNumbers()); } catch (Exception e){ r.setException(e); } return r; } }; CompletableFuture testFuture = CompletableFuture.supplyAsync(numberSupplier).whenComplete(biConsumer); }
使用completeExceptionally()
时,在CompletableFuture
exception处理需要考虑的另一点是, handle()
和whenComplete()
可以使用确切的exception,但在调用join()
或转发到的时它将被包装在CompletionException
中任何下游阶段。
因此,应用于下游阶段的handle()
或exceptionally()
将看到CompletionException
而不是原始的,并且必须查看其原因以找到原始exception。
此外,任何操作(包括supplyAsync()
)抛出的任何RuntimeException
也包含在CompletionException
,除非它已经是CompletionException
。
考虑到这一点,最好在安全方面进行播放并让exception处理程序解包CompletionException
。
如果你这样做,就没有必要在CompletableFuture
上设置确切的(已检查的)exception,并且直接在CompletionException
包装已检查的exception要简单得多:
Supplier numberSupplier = () -> { try { return SupplyNumbers.sendNumbers(); } catch (Exception e) { throw new CompletionException(e); } };
为了将这种方法与Holger的方法进行比较,我使用2个解决方案调整了代码( simpleWrap()
如上所示, customWrap()
是Holger的代码):
public class TestCompletableFuture { public static void main(String args[]) { TestCompletableFuture testF = new TestCompletableFuture(); System.out.println("Simple wrap"); testF.handle(testF.simpleWrap()); System.out.println("Custom wrap"); testF.handle(testF.customWrap()); } private void handle(CompletableFuture future) { future.whenComplete((x1, y) -> { System.out.println("Before thenApply(): " + y); }); future.thenApply(x -> x).whenComplete((x1, y) -> { System.out.println("After thenApply(): " + y); }); try { future.join(); } catch (Exception e) { System.out.println("Join threw " + e); } try { future.get(); } catch (Exception e) { System.out.println("Get threw " + e); } } public CompletableFuture simpleWrap() { Supplier numberSupplier = () -> { try { return SupplyNumbers.sendNumbers(); } catch (Exception e) { throw new CompletionException(e); } }; return CompletableFuture.supplyAsync(numberSupplier); } public CompletableFuture customWrap() { CompletableFuture f = new CompletableFuture<>(); ForkJoinPool.commonPool().submit( (Runnable & CompletableFuture.AsynchronousCompletionTask) () -> { try { f.complete(SupplyNumbers.sendNumbers()); } catch (Exception ex) { f.completeExceptionally(ex); } }); return f; } } class SupplyNumbers { public static Integer sendNumbers() throws Exception { throw new Exception("test"); // just for working sake its not correct. } }
输出:
Simple wrap After thenApply(): java.util.concurrent.CompletionException: java.lang.Exception: test Before thenApply(): java.util.concurrent.CompletionException: java.lang.Exception: test Join threw java.util.concurrent.CompletionException: java.lang.Exception: test Get threw java.util.concurrent.ExecutionException: java.lang.Exception: test Custom wrap After thenApply(): java.util.concurrent.CompletionException: java.lang.Exception: test Before thenApply(): java.lang.Exception: test Join threw java.util.concurrent.CompletionException: java.lang.Exception: test Get threw java.util.concurrent.ExecutionException: java.lang.Exception: test
正如您将注意到的,唯一的区别是thenApply()
在customWrap()
情况下的customWrap()
之前看到原始exception。 在thenApply()
,在所有其他情况下,原始exception被包装。
最令人惊讶的是get()
将在“Simple wrap”情况下解包CompletionException
,并用ExecutionException
替换它。