如何组成Observable以避免给定的嵌套和依赖回调?

在这个博客中 ,他给出了这个 (复制/粘贴以下代码)回调地狱的例子。 但是,没有提到如何使用Reactive Extensions消除该问题。

所以这里F3取决于F1完成,F4和F5取决于F2完成。

  1. 想知道Rx中的function等价物是什么。
  2. 如何在Rx中表示F1,F2,F3,F4和F5都应该异步拉出?

注意:我目前正试图绕过Rx,所以在提出这个问题之前我没有尝试解决这个例子。

import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; public class CallbackB { /** * Demonstration of nested callbacks which then need to composes their responses together. * 

* Various different approaches for composition can be done but eventually they end up relying upon * synchronization techniques such as the CountDownLatch used here or converge on callback design * changes similar to Rx. */ public static void run() throws Exception { final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue()); /* the following are used to synchronize and compose the asynchronous callbacks */ final CountDownLatch latch = new CountDownLatch(3); final AtomicReference f3Value = new AtomicReference(); final AtomicReference f4Value = new AtomicReference(); final AtomicReference f5Value = new AtomicReference(); try { // get f3 with dependent result from f1 executor.execute(new CallToRemoteServiceA(new Callback() { @Override public void call(String f1) { executor.execute(new CallToRemoteServiceC(new Callback() { @Override public void call(String f3) { // we have f1 and f3 now need to compose with others System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5")); // set to thread-safe variable accessible by external scope f3Value.set(f3); latch.countDown(); } }, f1)); } })); // get f4/f5 after dependency f2 completes executor.execute(new CallToRemoteServiceB(new Callback() { @Override public void call(Integer f2) { executor.execute(new CallToRemoteServiceD(new Callback() { @Override public void call(Integer f4) { // we have f2 and f4 now need to compose with others System.out.println("intermediate callback: f3" + " => " + (f4 + " * f5")); // set to thread-safe variable accessible by external scope f4Value.set(f4); latch.countDown(); } }, f2)); executor.execute(new CallToRemoteServiceE(new Callback() { @Override public void call(Integer f5) { // we have f2 and f5 now need to compose with others System.out.println("intermediate callback: f3" + " => " + ("f4 * " + f5)); // set to thread-safe variable accessible by external scope f5Value.set(f5); latch.countDown(); } }, f2)); } })); /* we must wait for all callbacks to complete */ latch.await(); System.out.println(f3Value.get() + " => " + (f4Value.get() * f5Value.get())); } finally { executor.shutdownNow(); } } public static void main(String[] args) { try { run(); } catch (Exception e) { e.printStackTrace(); } } private static final class CallToRemoteServiceA implements Runnable { private final Callback callback; private CallToRemoteServiceA(Callback callback) { this.callback = callback; } @Override public void run() { // simulate fetching data from remote service try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } callback.call("responseA"); } } private static final class CallToRemoteServiceB implements Runnable { private final Callback callback; private CallToRemoteServiceB(Callback callback) { this.callback = callback; } @Override public void run() { // simulate fetching data from remote service try { Thread.sleep(40); } catch (InterruptedException e) { e.printStackTrace(); } callback.call(100); } } private static final class CallToRemoteServiceC implements Runnable { private final Callback callback; private final String dependencyFromA; private CallToRemoteServiceC(Callback callback, String dependencyFromA) { this.callback = callback; this.dependencyFromA = dependencyFromA; } @Override public void run() { // simulate fetching data from remote service try { Thread.sleep(60); } catch (InterruptedException e) { e.printStackTrace(); } callback.call("responseB_" + dependencyFromA); } } private static final class CallToRemoteServiceD implements Runnable { private final Callback callback; private final Integer dependencyFromB; private CallToRemoteServiceD(Callback callback, Integer dependencyFromB) { this.callback = callback; this.dependencyFromB = dependencyFromB; } @Override public void run() { // simulate fetching data from remote service try { Thread.sleep(140); } catch (InterruptedException e) { e.printStackTrace(); } callback.call(40 + dependencyFromB); } } private static final class CallToRemoteServiceE implements Runnable { private final Callback callback; private final Integer dependencyFromB; private CallToRemoteServiceE(Callback callback, Integer dependencyFromB) { this.callback = callback; this.dependencyFromB = dependencyFromB; } @Override public void run() { // simulate fetching data from remote service try { Thread.sleep(55); } catch (InterruptedException e) { e.printStackTrace(); } callback.call(5000 + dependencyFromB); } } private static interface Callback { public void call(T value); } }

我是关于回调和Java期货的引用博客文章的原作者。 以下是使用flatMap,zip和merge异步进行服务组合的示例。

它获取一个User对象,然后同时获取Social和PersonalizedCatalog数据,然后对于来自PersonalizedCatalog的每个video同时获取书签,评级和元数据,将它们拉到一起,并将所有响应合并为渐进式流输出作为Server-Sent Events 。

 return getUser(userId).flatMap(user -> { Observable> catalog = getPersonalizedCatalog(user) .flatMap(catalogList -> catalogList.videos().> flatMap( video -> { Observable bookmark = getBookmark(video); Observable rating = getRatings(video); Observable metadata = getVideoMetadata(video); return Observable.zip(bookmark, rating, metadata, (b, r, m) -> combineVideoData(video, b, r, m)); })); Observable> social = getSocial(user).map(s -> { return s.getDataAsMap(); }); return Observable.merge(catalog, social); }).flatMap(data -> { String json = SimpleJson.mapToJson(data); return response.writeStringAndFlush("data: " + json + "\n"); }); 

可以在https://github.com/Netflix/ReactiveLab/blob/952362b89a4d4115ae0eecf0e73f273ecb27ba98/reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/routes/上查看function正常的应用程序。 RouteForDeviceHome.java#L33

由于我无法在此提供所有信息,您还可以在https://speakerdeck.com/benjchristensen/reactive-streams-with-rx-at-javaone-找到演示文稿的说明(带有video链接)。 2014?slide = 32 。

根据你的代码。 假设使用Observable完成远程调用。

  Observable callRemoveServiceA() { /* async call */ } /* .... */ Observable callRemoveServiceE(Integer f2) { /* async call */ } 

你想要什么 :

  • 调用serviceA然后使用serviceA的结果调用serviceB
  • 调用serviceC然后使用serviceC的结果调用serviceDserviceC
  • 使用serviceEserviceD的结果,构建一个新值
  • 使用serviceB的结果显示新值

使用RxJava,您将使用以下代码实现此目的:

 Observable f3 = callRemoveServiceA() // call serviceA // call serviceB with the result of serviceA .flatMap((f1) -> callRemoveServiceB(f1)); Observable f4Andf5 = callRemoveServiceC() // call serviceC // call serviceD and serviceE then build a new value .flatMap((f2) -> callRemoveServiceD(f2).zipWith(callRemoveServiceE(f2), (f4, f5) -> f4 * f5)); // compute the string to display from f3, and the f4, f5 pair f3.zipWith(f4Andf5, (childF3, childF4Andf5) -> childF3 + " => " + childF4Andf5) // display the value .subscribe(System.out::println); 

这里重要的部分是使用flapMapzip (或zipWith

你可以在这里获得有关flapMap的更多信息: 你何时在RxJava中使用map vs flatMap?