RxJava Observable“Iteration”如何工作?

我开始玩RxJava和ReactFX,我对此非常着迷。 但是当我在试验时,我有几十个问题而且我一直在研究答案。

我正在观察的一件事(没有双关语)当然是懒惰的执行。 使用下面的探索性代码,我注意到在merge.subscribe(pet -> System.out.println(pet))之前没有执行任何操作。 但令我着迷的是,当我订阅第二个订阅者merge.subscribe(pet -> System.out.println("Feed " + pet)) ,它再次触发了“迭代”。

我想要了解的是迭代的行为。 它似乎不像只能使用一次的Java 8 stream 。 它是否逐字逐句地遍历每个String并将其作为该时刻的值发布? 任何以前被解雇的用户跟随任何新用户接收这些项目是否像新用户一样?

 public class RxTest { public static void main(String[] args) { Observable dogs = Observable.from(ImmutableList.of("Dasher", "Rex")) .filter(dog -> dog.matches("D.*")); Observable cats = Observable.from(ImmutableList.of("Tabby", "Grumpy Cat", "Meowmers", "Peanut")); Observable ferrets = Observable.from(CompletableFuture.supplyAsync(() -> "Harvey")); Observable merge = dogs.mergeWith(cats).mergeWith(ferrets); merge.subscribe(pet -> System.out.println(pet)); merge.subscribe(pet -> System.out.println("Feed " + pet)); } } 

Observable表示monad,链接操作,而不是操作本身的执行。 它是描述性语言,而不是您习惯的命令。 要执行操作,请使用.subscribe() 。 每次订阅时,都会从头开始创建新的执行流。 不要将流与线程混淆,因为订阅是同步执行的,除非您使用.subscribeOn().observeOn() 指定线程更改 。 您将新元素链接到任何现有操作/ monad / Observable以添加新行为 ,例如更改线程,过滤,累积,转换等。如果您的observable是一项昂贵的操作,您不希望重复每次订阅,您可以使用.cache()阻止娱乐。

要将任何异步/同步Observable操作转换为同步内联操作,请使用.toBlocking()将其类型更改为BlockingObservable 。 而不是.subscribe()它包含用.forEach()对每个结果执行操作的新方法,或强制使用.forEach() .first()

Observables是一个很好的工具,因为它们主要是*确定性的(相同的输入总是产生相同的输出,除非你做错了),可重用(你可以将它们作为命令/策略模式的一部分发送)并且大多数情况下忽略同意,因为他们不应该依赖共享状态(也就是说做错了)。 如果你试图将一个基于可观察的库带入命令式语言,或者只是在一个Observable上执行一个你有100%信心并且管理得当的操作,那么BlockingObservables是很好的。

围绕这些原则构建应用程序是一种范式的变化,我无法真正涵盖这个答案。

*像SubjectObservable.create()这样的漏洞需要与命令式框架集成。