RxJava是否适合分支工作流程?

我正在使用RxJava来处理我们从队列中提取的一些通知。

RxJava似乎在一个简单的工作流程中运行良好,现在有了新的要求,流程越来越复杂,分支越多(请参见下图作为参考) 工作流程 我尝试通过一个小unit testing来举例说明流程:

@Test public void test() { Observable.range(1, 100) .groupBy(n -> n % 3) .toMap(GroupedObservable::getKey) .flatMap(m1 -> { Observable ones1 = m1.get(0); Observable twos1 = m1.get(1).map(n -> n - 10); Observable threes = m1.get(2).map(n -> n + 100); Observable onesAndTwos = Observable.merge(ones1, twos1) .map(n -> n * 3) .groupBy(n -> n % 2) .toMap(GroupedObservable::getKey) .flatMap(m2 -> { Observable ones2 = m2.get(0).map(n -> n * 10); Observable twos2 = m2.get(1).map(n -> n * 100); return Observable.merge(ones2, twos2); }); return Observable.merge(onesAndTwos, threes).map(n -> n +1); }) .subscribe(System.out::println); } 

虽然使用RxJava在技术上仍然可以实现,但我现在想知道它是否是一个不错的选择,为了正式化分支我必须在主flatMap内进行2级嵌套,这看起来并不是很整洁。

这是描述上述工作流程的正确方法吗? 或者RxJava不适合分支工作流程?

感谢您的帮助!

分组观察是走AFAIK的正确方法。 就个人而言,如果您的图片中的“按类型拆分”和“合并所有内容”之间的任何内容都是异步的,那么在RX中执行此操作肯定有很多优点,如重试逻辑,缓冲,error handling,背压等。如果它是常规的非同步代码我猜是个人偏好。 您可以使用RX执行此操作,但您也可以使用常规同步代码执行“按类型拆分”和“合并所有内容”之间的所有操作。

无论你选择何种方式,分割代码以使其更具可读性始终是一个好主意,因此您可以“阅读流程”,就像我们可以阅读您附加的图像一样简单。

只是想出另一种可能适合您的方法:您可以组播源并单独处理分支,而不是分组/ toMap。

例:

 @Test public void multicastingShare() { final Observable sharedSource = Observable.range(1, 10) .doOnSubscribe(dummy -> System.out.println("subscribed")) .share(); // split by some criteria final Observable oddItems = sharedSource .filter(n -> n % 2 == 1) .map(odd -> "odd: " + odd) .doOnNext(System.out::println); final Observable evenItems = sharedSource .filter(n -> n % 2 == 0) .map(even -> "even: " + even) .doOnNext(System.out::println); // recombine the individual streams at some point Observable.concat(oddItems, evenItems) .subscribe(result -> System.out.println("result: " + result)); } 

此video可能会有所帮助(至少前15分钟)