如何在可观察流中处理前n个项并保持不同的项

例如,

给定一定数量(m)的数字流(m1,m2,m3,m4,m5,m6 ……),并对前n项应用变换(2 * i)(n可以更小,相等或者大于m),对其余项目应用另一个转换(3 * i)。 和

返回结果:m1 * 2,m2 * 2,m3 * 3,m4 * 3,m5 * 3,m6 * 3 …(假设这里n = 2)。

我试图使用take(n)和skip(n)然后concatwith,但看起来take(n)将删除序列中的剩余项目,并在之后返回任何内容后跳过(n)。

您可以共享m的流,然后将take()skip()流合并回来,如下所示:

  int m = 10; int n = 8; Observable numbersStream = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) .publish(); Observable firstNItemsStream = numbersStream.take(n) .map(i -> i * 2); Observable remainingItemsStream = numbersStream.skip(n) .map(i -> i * 3); Observable.merge(firstNItemsStream, remainingItemsStream) .subscribe(integer -> System.out.println("result = " + integer)); numbersStream.connect(); 

编辑:
正如@AE Daphne指出的那样, share()将开始使用第一个订阅者进行发送,因此如果Observable已经开始发出item / s,则第二个订阅者可能会错过notification / s,因此在这种情况下还有其他可能性:
cache() – 将回复所有缓存发出的项目并将其回复给每个新订阅者,但会牺牲取消订阅能力,因此需要谨慎使用。
reply().refCount() – 将创建Observable ,它reply()所有以前的每个新订阅者的项目(类似于缓存),但是当最后一个订阅者取消订阅时将取消订阅。

在这两种情况下,都应考虑内存,因为Observable会将所有发出的项缓存在内存中。

publish() – 在不缓存所有先前项目的情况下,使用publish()创建ConnectableObservable ,并在所有订阅者订阅后调用它的connect()方法开始排放,从而获得同步并且所有订阅者将获得所有通知正确。