如何使用AsyncRestTemplate同时进行多个调用?

我不明白如何有效地使用AsyncRestTemplate进行外部服务调用。 对于以下代码:

 class Foo { public void doStuff() { Future<ResponseEntity> future1 = asyncRestTemplate.getForEntity( url1, String.class); String response1 = future1.get(); Future<ResponseEntity> future2 = asyncRestTemplate.getForEntity( url2, String.class); String response2 = future2.get(); Future<ResponseEntity> future3 = asyncRestTemplate.getForEntity( url3, String.class); String response3 = future3.get(); } } 

理想情况下,我希望同时执行所有3个调用,并在完成所有操作后处理结果。 但是,在调用get()之前不会获取每个外部服务调用,但get()阻止get() 。 那么这不是打败了AsyncRestTemplate的目的吗? 我不妨使用RestTemplate

所以我不明白我怎么能让他们同时执行?

在调度所有异步调用之前,不要调用阻塞get()

 class Foo { public void doStuff() { ListenableFuture> future1 = asyncRestTemplate .getForEntity(url1, String.class); ListenableFuture> future2 = asyncRestTemplate .getForEntity(url2, String.class); ListenableFuture> future3 = asyncRestTemplate .getForEntity(url3, String.class); String response1 = future1.get(); String response2 = future2.get(); String response3 = future3.get(); } } 

您可以同时执行调度和循环,但请注意,当前的结果收集效率低下,因为它会陷入下一个未完成的未来。

您可以将所有期货添加到集合中,并迭代测试每个未来非阻塞isDone() 。 当该调用返回true时,您可以调用get()

这样,您的整体结果收集将被优化,而不是按照调用get() s的顺序等待下一个缓慢的未来结果。

更好的是,您可以在ListenableFuture返回的每个ListenableFuture注册回调(运行时),您不必担心周期性地检查潜在结果。

如果您不必使用’AsyncRestTemplate’,我建议使用RxJava。 RxJava zip运算符正是您所需要的。 检查以下代码:

 private rx.Observable externalCall(String url, int delayMilliseconds) { return rx.Observable.create( subscriber -> { try { Thread.sleep(delayMilliseconds); //simulate long operation subscriber.onNext("response(" + url + ") "); subscriber.onCompleted(); } catch (InterruptedException e) { subscriber.onError(e); } } ); } public void callServices() { rx.Observable call1 = externalCall("url1", 1000).subscribeOn(Schedulers.newThread()); rx.Observable call2 = externalCall("url2", 4000).subscribeOn(Schedulers.newThread()); rx.Observable call3 = externalCall("url3", 5000).subscribeOn(Schedulers.newThread()); rx.Observable.zip(call1, call2, call3, (resp1, resp2, resp3) -> resp1 + resp2 + resp3) .subscribeOn(Schedulers.newThread()) .subscribe(response -> System.out.println("done with: " + response)); } 

所有对外部服务的请求都将在单独的线程中执行,当最后一次调用完成时,将应用转换函数(在示例中简单的字符串连接),结果(连接字符串)将从’zip’可观察到。

我理解你的问题是你有一个预定义的异步方法,你尝试做的是使用RestTemplate类异步调用这个方法。

我写了一个方法,可以帮助你异步调用你的方法。

  public void testMyAsynchronousMethod(String... args) throws Exception { // Start the clock long start = System.currentTimeMillis(); // Kick of multiple, asynchronous lookups Future future1 = asyncRestTemplate .getForEntity(url1, String.class);; Future future2 = asyncRestTemplate .getForEntity(url2, String.class); Future future3 = asyncRestTemplate .getForEntity(url3, String.class); // Wait until they are all done while (!(future1 .isDone() && future2.isDone() && future3.isDone())) { Thread.sleep(10); //10-millisecond pause between each check } // Print results, including elapsed time System.out.println("Elapsed time: " + (System.currentTimeMillis() - start)); System.out.println(future1.get()); System.out.println(future2.get()); System.out.println(future3.get()); } 

您可能想要使用CompletableFuture类( javadoc )。

  1. 将您的调用转换为CompletableFuture 。 例如。

     final CompletableFuture> cf = CompletableFuture.supplyAsync(() -> { try { return future.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }); 
  2. 接下来使用您新创建的3个可完成期货调用CompletableFuture::allOf方法。

  3. 在结果上调用join()方法。 在得到的可完成的未来得到解决后,您可以从步骤3中创建的每个单独的可完成的未来中获得结果。

我想你在这里误解了一些事情。 当您调用getForEntity方法时,请求已被触发。 调用future对象的get()方法时,您只是在等待请求完成。 因此,为了在同一亚秒内触发所有这三个请求,您只需要:

 // Each of the lines below will fire an http request when it's executed Future> future1 = asyncRestTemplate.getForEntity(url1, String.class); Future> future2 = asyncRestTemplate.getForEntity(url2, String.class); Future> future3 = asyncRestTemplate.getForEntity(url3, String.class); 

运行所有这些代码后,所有请求都已被触发(最可能在同一亚秒内)。 然后你可以随心所欲地做任何你想做的事。 只要调用任何get()方法,就会等待每个请求完成。 如果它们已经完成,那么它将立即返回。

 // do whatever you want in the meantime // get the response of the http call and wait if it's not completed String response1 = future1.get(); String response2 = future2.get(); String response3 = future3.get(); 

我不认为以前的任何答案实际上实现了并行性。 @diginoise响应的问题在于它实际上并没有实现并行性。 一旦我们打电话给get ,我们就被阻止了。 考虑到调用非常慢,因此future1需要3秒才能完成, future2 2秒和future3 3秒再次完成。 随着3次接听电话,我们最终等待3 + 2 + 3 = 8秒。 @Vikrant Kashyap同时回答块while (!(future1 .isDone() && future2.isDone() && future3.isDone())) 。 除了while循环是一个非常难看的3个期货的代码片段,如果你有更多呢? @lkz答案使用的技术与您要求的技术不同,即便如此,我也不确定zip是否可以完成这项工作。 来自Observable Javadoc:

zip以严格的顺序应用此函数,因此新Observable发出的第一个项将是应用于每个源Observable发出的第一个项的函数的结果; 新Observable发出的第二个项目将是应用于每个Observable发出的第二个项目的函数的结果; 等等。

由于Spring的广泛普及,他们非常努力地保持向后兼容性,并且这样做有时会对API做出妥协。 返回ListenableFuture AsyncRestTemplate方法就是这种情况。 如果他们致力于Java 8+,则可以使用CompletableFuture 。 为什么? 由于我们不会直接处理线程池,因此我们无法知道所有ListenableFutures何时完成。 CompletableFuture有一个allOf方法,可以创建一个新的CompletableFuture ,它在所有给定的CompletableFutures完成时完成。 由于我们在ListenableFutureListenableFuture ,我们将不得不即兴发挥。 我没有编译下面的代码,但应该清楚我正在尝试做什么。 我使用Java 8因为它是2016年底。

 // Lombok FTW @RequiredArgsConstructor public final class CounterCallback implements ListenableFutureCallback> { private final LongAdder adder; public void onFailure(Throwable ex) { adder.increment(); } public void onSuccess(ResponseEntity result) { adder.increment(); } } ListenableFuture> f1 = asyncRestTemplate .getForEntity(url1, String.class); f1.addCallback(//); // more futures LongAdder adder = new LongAdder(); ListenableFutureCallback> callback = new CounterCallback(adder); Stream.of(f1, f2, f3) .forEach {f -> f.addCallback(callback)} for (int counter = 1; adder.sum() < 3 && counter < 10; counter++) { Thread.sleep(1000); } // either all futures are done or we're done waiting Map> futures = Stream.of(f1, f2, f3) .collect(Collectors.partitioningBy(Future::isDone)); 

现在我们有一个Map ,其中futures.get(Boolean.TRUE)将给我们所有已完成的期货, futures.get(Boolean.FALSE)将给我们那些没有的futures.get(Boolean.FALSE) 。 我们希望取消那些没有完成的。

这段代码做了一些对并行编程很重要的事情:

  1. 它不会阻止。
  2. 它将操作限制在某个最大允许时间。
  3. 它明确区分了成功案例和失败案例。