如何在RxJava 2中发生错误后继续处理?

我有一个PublishSubject和一个Subscriber ,我用它来处理(可能)无限的预处理数据流。 问题是某些元素可能包含一些错误。 我想忽略它们并继续处理。 我怎么能这样做? 我尝试过这样的事情:

  val subject = PublishSubject.create() subject.retry().subscribe({ println("next: $it") }, { println("error") }, { println("complete") }) subject.onNext("foo") subject.onNext("bar") subject.onError(RuntimeException()) subject.onNext("wom") subject.onComplete() 

我的问题是没有任何error handling方法可以帮助我:

onErrorResumeNext() – 指示Observable在遇到错误时发出一系列项

onErrorReturn( ) – 指示Observable在遇到错误时发出特定项

onExceptionResumeNext( ) – 指示Observable在遇到exception后继续发出项目(但不是另一种throwable)

retry( ) – 如果源Observable发出错误,重新订阅它,希望它能完成而不会出错

retryWhen( ) – 如果源Observable发出错误,则将该错误传递给另一个Observable以确定是否重新订阅源

我试过retry()例如但是它无限期地在错误之后挂起我的进程。

我也试过onErrorResumeNext()但它没有按预期工作:

  val backupSubject = PublishSubject.create() val subject = PublishSubject.create() var currentSubject = subject subject.onErrorResumeNext(backupSubject).subscribe({ println("next: $it") }, { println("error") currentSubject = backupSubject }, { println("complete") }) backupSubject.subscribe({ println("backup") }, { println("backup error") }) currentSubject.onNext("foo") currentSubject.onNext("bar") currentSubject.onError(RuntimeException()) currentSubject.onNext("wom") currentSubject.onComplete() 

这只打印foobar

如果你想在错误后继续处理,这意味着你的错误就像你的String一样,并且应该通过onNext 。 为确保在这种情况下的类型安全,您应该使用某种forms的包装器,它可以采用常规值或错误; 例如, io.reactivex.Notification在RxJava 2中可用:

 PublishSubject> subject = PublishSubject.create(); subject.subscribe(System.out::println); subject.onNext(Notification.createOnNext("Hello")); subject.onNext(Notification.createOnError(new RuntimeException("oops"))); subject.onNext(Notification.createOnNext("World"));