RxJava:缓冲项,直到当前项的某些条件为真

这是我想弄清楚的一个片段:

class RaceCondition { Subject subject = PublishSubject.create(); public void entryPoint(Integer data) { subject.onNext(data); } public void client() { subject /*some operations*/ .buffer(getClosingSelector()) .subscribe(/*handle results*/); } private Observable getClosingSelector() { return subject /* some filtering */; } } 

有一个Subject接受来自外部的事件。 有一个客户订阅了这个主题,它处理事件并buffer它们。 这里的主要思想是每次都应该根据使用流中的项计算的某些条件来发出缓冲的项。

为此,缓冲区边界本身监听主题。

一个重要的期望行为:每当边界发出项目时,它也应该包含在buffer的以下发射中。 当前配置不是这样的项目(至少我认为)是在关闭选择器到达buffer 之前从关闭选择器发出的,因此它不包含在当前发射中,而是留在等待下一个。

有没有办法基本上使关闭选择器等待项目首先被缓冲? 如果没有,是否有另一种方法来缓冲和释放基于下一个传入项目的项目?

如果我理解正确,你想要缓冲,直到一些谓词允许它基于项目。 您可以使用一组复杂的运算符来完成此操作,但也许更容易编写自定义运算符:

 public final class BufferUntil implements Operator, T>{ final Func1 boundaryPredicate; public BufferUntil(Func1 boundaryPredicate) { this.boundaryPredicate = boundaryPredicate; } @Override public Subscriber call( Subscriber> child) { BufferWhileSubscriber parent = new BufferWhileSubscriber(child); child.add(parent); return parent; } final class BufferWhileSubscriber extends Subscriber { final Subscriber> actual; List buffer = new ArrayList<>(); /** * @param actual */ public BufferWhileSubscriber( Subscriber> actual) { this.actual = actual; } @Override public void onNext(T t) { buffer.add(t); if (boundaryPredicate.call(t)) { actual.onNext(buffer); buffer = new ArrayList<>(); } } @Override public void onError(Throwable e) { buffer = null; actual.onError(e); } @Override public void onCompleted() { List b = buffer; buffer = null; if (!b.isEmpty()) { actual.onNext(b); } actual.onCompleted(); } } }