如何处理RxJava中观察者的onNext引发的exception?

请考虑以下示例:

Observable.range(1, 10).subscribe(i -> { System.out.println(i); if (i == 5) { throw new RuntimeException("oops!"); } }, Throwable::printStackTrace); 

这将输出1到5之间的数字,然后打印exception。

我想要实现的是让观察者保持订阅并在抛出exception后继续运行,即打印从1到10的所有数字。

我尝试过使用retry()和其他各种error handling操作符 ,但是,如文档中所述,它们的目的是处理observable本身发出的错误。

最简单的解决方案就是将整个onNext成try-catch块,但这对我来说听起来不是一个好方法。 在类似的Rx.NET问题中 ,建议的解决方案是创建一个扩展方法,通过创建代理可观察来进行包装。 我试图重拍它:

 Observable origin = Observable.range(1, 10); Observable proxy = Observable.create((Observable.OnSubscribe) s -> origin.subscribe(i -> {try { s.onNext(i); } catch (Exception ignored) {}}, s::onError, s::onCompleted)); proxy.subscribe(i -> { System.out.println(i); if (i == 5) { throw new RuntimeException("oops!"); } }, Throwable::printStackTrace); 

这不会改变任何东西,因为RxJava本身将订阅者包装到SafeSubscriber 。 使用unsafeSubscribe来解决它似乎也不是一个好的解决方案。

我该怎么做才能解决这个问题?

这是学习Rx时出现的常见问题。

TL; DR

您将exception处理逻辑放在订阅者中的建议优于创建通用可观察包装器。

说明

请记住,Rx是将事件推送给订阅者。

从可观察的界面可以清楚地看出,除了处理事件所花费的时间或任何抛出exception中包含的信息之外,没有任何观察者可以知道它的订阅者。

处理订户exception并继续向该订户发送事件的通用包装器是一个坏主意。

为什么? 那么observable应该只知道订户现在处于未知的故障状态。 在这种情况下继续发送事件是不明智的 – 例如,订阅者处于这样一种情况,即从这一点开始的每个事件都将抛出exception并花费一些时间来完成它。

一旦订阅者抛出exception,对于可观察者,只有两个可行的行动方案:

  • 重新抛出exception
  • 实施通用处理以记录故障并停止向其发送事件(任何类型)并清理由该订户引起的任何资源并继续进行任何剩余订阅。

对用户exception的具体处理将是一个糟糕的设计选择; 它会在订阅者和可观察者之间产生不适当的行为耦合。 因此,如果您想要对不良订阅者具有弹性,那么上述两种选择实际上是观察者本身的合理责任限制。

如果您希望您的订户具有弹性并继续运行,那么您应该将其包装在exception处理逻辑中,该逻辑旨在处理您知道如何从中恢复的特定exception (并且可能处理瞬态exception,记录,重试逻辑,断路等) )。

只有订户本身才具有上下文,以了解在面对失败时是否适合接收更多事件。

如果您的情况需要开发可重用的error handling逻辑,请将自己置于包装观察者事件处理程序而不是可观察事件的思维模式中 – 并且注意不要盲目地在失败时继续传输事件。 发布吧! 虽然没有写关于Rx,但是有趣的软件工程经典在这最后一点上有很多话要说。 如果你还没看过,我强烈建议。