RxJava与Java 8并行流

它们之间有什么相似之处和不同之处,看起来Java Parallel Stream有一些RXJava中可用的元素,是吗?

Rx是用于创建和处理可观察序列的API。 Streams API用于处理可迭代序列。 Rx序列是基于推的 ; 当元素可用时,您会收到通知。 Stream是基于拉的 ; 它“询问”要处理的项目。 它们可能看似相似,因为它们都支持类似的运算符/变换,但机制基本上是彼此相反的。

流是基于拉力的。 我个人认为这是Oracle对C#IEnumerable <>,LINQ及其相关扩展方法的回答。

RxJava是基于推送的,我不确定它是首先发布的.NET的反应式扩展还是Rx项目首先上线。

从概念上讲,它们完全不同,它们的应用也不同。

如果您在一个文本文件上实现文本搜索程序,该文件太大而无法加载所有内容并且适合内存,那么您可能希望使用Stream,因为您可以通过跟踪您的内容轻松确定是否有下一行可用迭代器,逐行扫描。

Stream的另一个应用是对数据集合的并行计算。 现在每台机器都有多个内核,但您不会轻易准确地知道客户机可用的内核数量。 预先配置要操作的线程数是很困难的。 所以我们使用并行流并让JVM为我们确定(应该更优化)。

另一方面,如果您正在实现一个带有用户输入字符串并在Web上搜索可用video的程序,您将使用RX,因为您甚至不知道程序何时开始获得任何结果(或收到错误)网络超时)。 为了使您的程序响应,您必须让程序“订阅”网络更新并完成信号。

Rx的另一个常见应用是在GUI上“检测用户完成输入”而无需用户单击按钮进行确认。 例如,您希望每当用户停止输入时都有一个文本字段,您可以在不等待“搜索按钮”单击的情况下开始搜索。 在这种情况下,您使用Rx在“KeyEvent”和“throttle”(例如500ms)上创建一个observable,这样每当他停止输入500ms时,你会收到一个onNext()来“开始搜索”。

线程也有区别。

Stream#parallel将序列拆分为多个部分,每个部分在单独的线程中处理。

Observable#subscribeOn和Observable#observeOn都是’move’执行到另一个线程,但不拆分序列。

换句话说,对于任何特定的处理阶段:

  • parallel Stream可以处理不同线程上的不同元素
  • Observable将为舞台使用一个线程

E. g。 我们有许多元素的Observable / Stream和两个处理阶段:

 Observable.create(...) .observeOn(Schedulers.io()) .map(x -> stage1(x)) .observeOn(Schedulers.io()) .map(y -> stage2(y)) .forEach(...); Stream.generate(...) .parallel() .map(x -> stage1(x)) .map(y -> stage2(y)) .forEach(...); 

Observable将使用不超过2个额外的线程(每个阶段一个),因此不同的线程不会访问两个x’es或y。 在计数器上,流可以跨越多个线程跨越每个阶段。