RxJava2 observable抛出UndeliverableException

据我所知,RxJava2 values.take(1)创建了另一个Observable,它只包含原始Observable中的一个元素。 哪个不能抛出exception,因为它被take(1)的效果过滤掉,因为它发生在第二个。

下面的代码片段所示

  Observable values = Observable.create(o -> { o.onNext(1); o.onError(new Exception("Oops")); }); values.take(1) .subscribe( System.out::println, e -> System.out.println("Error: " + e.getMessage()), () -> System.out.println("Completed") ); 

产量

 1 Completed io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366) at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83) at ch02.lambda$main$0(ch02.java:28) at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40) at io.reactivex.Observable.subscribe(Observable.java:10841) at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30) at io.reactivex.Observable.subscribe(Observable.java:10841) at io.reactivex.Observable.subscribe(Observable.java:10827) at io.reactivex.Observable.subscribe(Observable.java:10787) at ch02.main(ch02.java:32) Caused by: java.lang.Exception: Oops ... 8 more Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366) at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83) at ch02.lambda$main$0(ch02.java:28) at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40) at io.reactivex.Observable.subscribe(Observable.java:10841) at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30) at io.reactivex.Observable.subscribe(Observable.java:10841) at io.reactivex.Observable.subscribe(Observable.java:10827) at io.reactivex.Observable.subscribe(Observable.java:10787) at ch02.main(ch02.java:32) Caused by: java.lang.Exception: Oops ... 8 more 

我的问题:

  1. 我理解正确吗?
  2. 导致exception的真正原因是什么。
  3. 如何从消费者那里解决这个问题?

  1. 是的,但是因为可观察的“结束”并不意味着停止在create(...)运行的代码。 要在这种情况下完全安全,您需要使用o.isDisposed()来查看observable是否已经在下游结束。
  2. 例外是因为RxJava 2的策略是永远不允许onError调用丢失。 如果observable已经终止,它将作为全局UndeliverableException传递到下游或抛出。 由Observable的创建者“正确”处理observable已经结束并发生exception的情况。
  3. 问题是生产者( Observable )和消费者( Subscriber )在流结束时不一致。 由于生产者在这种情况下比消费者寿命长,所以问题只能在生产者中解决。

@Kiskae在之前的评论中正确回答了可能发生此类exception的原因。

这里是关于这个主题的官方文档的链接: RxJava2-wiki 。

有时您无法更改此行为,因此有一种方法可以处理此UndeliverableException 。 以下是如何避免崩溃和错误行为的代码片段:

 RxJavaPlugins.setErrorHandler(e -> { if (e instanceof UndeliverableException) { e = e.getCause(); } if ((e instanceof IOException) || (e instanceof SocketException)) { // fine, irrelevant network problem or API that throws on cancellation return; } if (e instanceof InterruptedException) { // fine, some blocking code was interrupted by a dispose call return; } if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) { // that's likely a bug in the application Thread.currentThread().getUncaughtExceptionHandler() .handleException(Thread.currentThread(), e); return; } if (e instanceof IllegalStateException) { // that's a bug in RxJava or in a custom operator Thread.currentThread().getUncaughtExceptionHandler() .handleException(Thread.currentThread(), e); return; } Log.warning("Undeliverable exception received, not sure what to do", e); }); 

此代码取自上面的链接。

重要的提示。 这种方法将全局error handling程序设置为RxJava,因此如果您可以摆脱这些exception – 那将是更好的选择。