RxJava:如何用依赖关系组合多个Observable并在最后收集所有结果?
我正在学习RxJava,并且,作为我的第一个实验,试图在这段代码中重写代码 (在Netflix的博客上引用作为RxJava可以帮助解决的问题),以使用RxJava改进其异步性,即所以它在继续执行其余代码之前,不会等待第一个Future( f1.get()
)的结果。
f3
取决于f1
。 我看到如何处理这个问题, flatMap
似乎可以解决这个问题:
Observable f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA())) .flatMap(new Func1<String, Observable>() { @Override public Observable call(String s) { return Observable.from(executor.submit(new CallToRemoteServiceC(s))); } });
接下来, f4
和f5
取决于f2
。 我有这个:
final Observable f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB())) .flatMap(new Func1<Integer, Observable>() { @Override public Observable call(Integer i) { Observable f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i))); Observable f5Observable = Observable.from(executor.submit(new CallToRemoteServiceE(i))); return Observable.merge(f4Observable, f5Observable); } });
哪个开始变得怪异( merge
它们可能不是我想要的……)但是允许我最后这样做,而不是我想要的:
f3Observable.subscribe(new Action1() { @Override public void call(String s) { System.out.println("Observed from f3: " + s); f4And5Observable.subscribe(new Action1() { @Override public void call(Integer i) { System.out.println("Observed from f4 and f5: " + i); } }); } });
这给了我:
Observed from f3: responseB_responseA Observed from f4 and f5: 140 Observed from f4 and f5: 5100
这是所有数字,但不幸的是我在单独的调用中得到结果,所以我不能完全替换原始代码中的最终println:
System.out.println(f3.get() + " => " + (f4.get() * f5.get()));
我不明白如何在同一行上访问这两个返回值。 我认为可能有一些function性编程我在这里缺少。 我怎样才能做到这一点? 谢谢。
看起来你真正需要的是对如何使用RX的更多鼓励和观点。 我建议你阅读更多文档和大理石图表(我知道它们并不总是有用)。 我还建议调查lift()
函数和运算符。
- 可观察的整个点是将数据流和数据操作连接成单个对象
-
map
,flatMap
和filter
的调用点是操纵数据流中的数据 - 合并点是组合数据流
-
运算符的要点是允许您中断稳定的可观察流并在数据流上定义自己的操作。 例如,我编码了一个移动平均算子。 这总结了一个Observable of
double
中的n
倍,以返回移动平均线流。 代码字面上看起来像这样Observable movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize))
你会感到欣慰的是,许多你认为理所当然的过滤方法都有lift()
。
照这样说; 合并多个依赖项所需的全部是:
- 使用
map
或flatMap
将所有传入数据更改为标准数据类型 - 将标准数据类型合并到流中
- 如果一个对象需要在另一个对象上等待,或者您需要在流中订购数据,则使用自定义运算符。 注意:这种方法会减慢流的速度
- 用于列出或订阅以收集所有这些数据
编辑:有人将我作为编辑问题添加的以下文本转换为答案,我很欣赏,并且理解可能是正确的SO要做的事情,但我不认为这是一个答案,因为它显然不是正确的方法 。 我不会使用此代码,也不会建议任何人复制它。 其他/更好的解决方案和评论欢迎!
我能用以下方法解决这个问题。 我没有意识到你可以不止一次flatMap
一个observable,我假设结果只能被消耗一次。 所以我只是flatMap
f2Observable两次(对不起,我在我原来的post中重命名代码中的一些东西),然后在所有的Observables上zip
,然后订阅。 由于类型杂乱,在zip
中聚合值的Map
是不可取的。 其他/更好的解决方案和评论欢迎! 完整代码可以在要点中查看 。 谢谢。
Future f2 = executor.submit(new CallToRemoteServiceB()); Observable f2Observable = Observable.from(f2); Observable f4Observable = f2Observable .flatMap(new Func1>() { @Override public Observable call(Integer integer) { System.out.println("Observed from f2: " + integer); Future f4 = executor.submit(new CallToRemoteServiceD(integer)); return Observable.from(f4); } }); Observable f5Observable = f2Observable .flatMap(new Func1>() { @Override public Observable call(Integer integer) { System.out.println("Observed from f2: " + integer); Future f5 = executor.submit(new CallToRemoteServiceE(integer)); return Observable.from(f5); } }); Observable.zip(f3Observable, f4Observable, f5Observable, new Func3>() { @Override public Map call(String s, Integer integer, Integer integer2) { Map map = new HashMap(); map.put("f3", s); map.put("f4", String.valueOf(integer)); map.put("f5", String.valueOf(integer2)); return map; } }).subscribe(new Action1
这会产生我想要的输出:
responseB_responseA => 714000
我认为你要找的是switchmap。 我们遇到了类似的问题,我们有一个会话服务来处理从api获取新会话,我们需要该会话才能获得更多数据。 我们可以添加会话observable,它返回sessionToken以便在我们的数据调用中使用。
getSession返回一个observable;
public getSession(): Observable{ if (this.sessionToken) return Observable.of(this.sessionToken); else if(this.sessionObservable) return this.sessionObservable; else { // simulate http call this.sessionObservable = Observable.of(this.sessonTokenResponse) .map(res => { this.sessionObservable = null; return res.headers["X-Session-Token"]; }) .delay(500) .share(); return this.sessionObservable; } }
和getData获取该可观察对象并附加到它。
public getData() { if (this.dataObservable) return this.dataObservable; else { this.dataObservable = this.sessionService.getSession() .switchMap((sessionToken:string, index:number) =>{ //simulate data http call that needed sessionToken return Observable.of(this.dataResponse) .map(res => { this.dataObservable = null; return res.body; }) .delay(1200) }) .map ( data => { return data; }) .catch(err => { console.log("err in data service", err); // return err; }) .share(); return this.dataObservable; } }
您仍然需要一个flatmap来组合不依赖的observable。
Plunkr: http ://plnkr.co/edit/hiA1jP?p = info
我有想法使用开关地图: http : //blog.thoughtram.io/angular/2016/01/06/taking-advantage-of-observables-in-angular2.html