为什么我的RxJava Observable仅向第一个消费者发出?

有人可以解释为什么以下测试失败?

public class ObservableTest { @Test public void badObservableUsedTwiceDoesNotEmitToSecondConsumer() { // Any simpler observable makes the test pass Observable badObservable = Observable.just(1) .zipWith(Observable.just(2), (one, two) -> Observable.just(3)) .flatMap(observable -> observable); ObservableCalculator calc1 = new ObservableCalculator(badObservable); ObservableCalculator calc2 = new ObservableCalculator(badObservable); // zipping causes the failure // Calling calculate().toBlocking().subscribe() on each calc passes // Observable.from(listOfCalcs).flatMap(calc -> calc.calculate()) passes Observable.zip(ImmutableList.of(calc1.calculate(), calc2.calculate()), results -> results) .toBlocking() .subscribe(); assertThat(calc1.hasCalculated).isTrue(); assertThat(calc2.hasCalculated).isTrue(); // this fails } private static class ObservableCalculator { private final Observable observable; public boolean hasCalculated = false; public ObservableCalculator(Observable observable) { this.observable = observable; } public Observable calculate() { return observable.concatMap(o -> { hasCalculated = true; // returning Observable.just(null) makes the test pass return Observable.empty(); }); } } } 

我试图进一步简化“坏”观察,但找不到任何我可以删除的东西,以使其更简单。

然而,我目前的理解是,它是一个Observable(无论它是如何构造的),应该发出一个值然后完成。 然后,我们基于Observable创建一个对象的两个类似实例,并在那些使用Observable的对象上调用一个方法,记下这样做,然后返回Observable.empty()。

任何人都可以解释为什么使用这个observable导致测试失败 (当使用更简单的observable导致测试通过)?

也可以通过串行调用calculate()。toBlocking()。subscribe()而不是使用zip,或者使计算返回Observable.just(null)来使测试通过。 这对我来说是有道理的(如果calc1为空,zip将不会订阅calc2,因为在那种情况下zip可能永远不会产生任何东西),但不是完全意义(我不明白为什么zip不像那样表现badObservable的一个更简单的版本 – 不管输入如何,calculate()方法仍然返回空。

如果您使用某些内容压缩空源,则操作员会检测到它不再产生任何值,并取消其所有来源的订阅。 有一个zip和merge的混合,merge合并取消订阅:它根本不发出值3,因此concatMap也不会调用第二个源的映射函数。