使用ReactiveX for Java进行Http调用

我是ReactiveX for Java的新手,我有以下代码块来进行外部http调用,但它不是异步的。 我们使用的是rxjava 1.2和Java 1.8

private ResponseEntity callExternalUrl(String url, String json, HttpMethod method) { RestTemplate restTemplate; HttpEntity request; request = new HttpEntity(jsonContent, httpHeaders); return restTemplate.exchange(url, httpMethod, request, String.class); } 

我在网上找到了以下代码块,但我无法理解它以及如何将其应用到我的代码库中。

 private RxClient httpClient; public  Observable fetchResult(String url, Func1 mapper) { return httpClient.target(url) .request() .rx() .get() .subscribeOn(Schedulers.io()) .map(mapper); } 

如果我理解正确,你需要这样的东西来包装你现有的callExternalUrl

 static Observable callExternalUrlAsync(String url, String json, HttpMethod method) { return Observable.fromCallable(() -> callExternalUrl(url, json, method)) .subscribeOn(Schedulers.io()) .flatMap(re -> { if (re.hasBody()) return Observable.just(re.getBody()); else return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode())); }, e -> Observable.error(e), (Func0>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-( .observeOn(Schedulers.computation()); } 

代码简述:

  1. 它计划在Schedulers.io上执行现有的callExternalUrl
  2. ResponseEntity最小化转换为成功的T和错误情况。 它也发生在io调度程序上,但它并不重要,因为它非常简短。 (如果callExternalUrl有exception, callExternalUrl传递。)
  3. 使订阅者在Schedulers.computation上执行结果

警告

  1. 您可能希望将自定义调度程序用于subscribeOnobserveOn
  2. 您可能希望在传递给flatMap的第一个lambda中有一些更好的逻辑来区分成功和错误,并且您肯定需要一些更具体的exception类型。

高阶魔术

如果您愿意使用高阶函数并交换一点性能以减少代码重复,您可以执行以下操作:

 // Universal wrapper method static  Observable wrapCallExternalAsAsync(Func3> externalCall, String url, String json, HttpMethod method) { return Observable.fromCallable(() -> externalCall.call(url, json, method)) .subscribeOn(Schedulers.io()) .flatMap(re -> { if (re.hasBody()) return Observable.just(re.getBody()); else return Observable.error(new RuntimeException("Bad response status " + re.getStatusCode())); }, e -> Observable.error(e), (Func0>) (() -> Observable.empty())) // I need explicit cast or it won't compile :-( .observeOn(Schedulers.computation()); } static Observable callExternalUrlAsync_HigherOrder(String url, String json, HttpMethod method) { return wrapCallExternalAsAsync(MyClass::callExternalUrl, url, json, method); } 

MyClass就在你的callExternalUrl所在的地方。


更新 (仅限异步调用)

private static RxClient httpClient = Rx.newClient(RxObservableInvoker.class); //这里你可以传递自定义ExecutorService

 private  Observable executeHttpAsync(String url, String httpMethod, Entity entity) { return httpClient.target(url) .request() .headers(httpHeaders) // assuming httpHeaders is something global as in your example .rx() .method(httpMethod, entity) .map(resp -> { if (200 != resp.getStatus()) { throw new RuntimeException("Bad status code " + resp.getStatus()); } else { if (!resp.hasEntity()) { // return null; // or error? throw new RuntimeException("Empty response"); // or empty? } else { try { return resp.readEntity(String.class); } catch (Exception ex) { throw new RuntimeException(ex); // wrap exception into unchecked } } } }) .observeOn(Schedulers.computation()); } private Observable executeGetAsync(String url) { return executeHttpAsync(url, "GET", null); } private Observable executePostAsync(String url, String json) { return executeHttpAsync(url, "POST", Entity.json(json)); } 

类似的警告同样适用:

  1. 您可能希望将自定义调度程序用于newClient调用和observeOn
  2. 您可能希望有一些更好的error handling逻辑,而不仅仅是检查它是否是HTTP 200,并且您肯定想要一些更具体的exception类型。 但这是所有业务逻辑特定的,因此它取决于您。

此外,从您的示例中还不清楚请求的主体( HttpEntity )是如何构建的,以及您是否确实总是希望String作为响应,就像在原始示例中一样。 我仍然按原样复制你的逻辑。 如果您需要更多内容,可以参考https://jersey.java.net/documentation/2.25/media.html#json上的文档。