如何在RxJava 2中发生错误后继续处理?
我有一个PublishSubject
和一个Subscriber
,我用它来处理(可能)无限的预处理数据流。 问题是某些元素可能包含一些错误。 我想忽略它们并继续处理。 我怎么能这样做? 我尝试过这样的事情:
val subject = PublishSubject.create() subject.retry().subscribe({ println("next: $it") }, { println("error") }, { println("complete") }) subject.onNext("foo") subject.onNext("bar") subject.onError(RuntimeException()) subject.onNext("wom") subject.onComplete()
我的问题是没有任何error handling方法可以帮助我:
onErrorResumeNext()
– 指示Observable在遇到错误时发出一系列项
onErrorReturn( )
– 指示Observable在遇到错误时发出特定项
onExceptionResumeNext( )
– 指示Observable在遇到exception后继续发出项目(但不是另一种throwable)
retry( )
– 如果源Observable发出错误,重新订阅它,希望它能完成而不会出错
retryWhen( )
– 如果源Observable发出错误,则将该错误传递给另一个Observable以确定是否重新订阅源
我试过retry()
例如但是它无限期地在错误之后挂起我的进程。
我也试过onErrorResumeNext()
但它没有按预期工作:
val backupSubject = PublishSubject.create() val subject = PublishSubject.create() var currentSubject = subject subject.onErrorResumeNext(backupSubject).subscribe({ println("next: $it") }, { println("error") currentSubject = backupSubject }, { println("complete") }) backupSubject.subscribe({ println("backup") }, { println("backup error") }) currentSubject.onNext("foo") currentSubject.onNext("bar") currentSubject.onError(RuntimeException()) currentSubject.onNext("wom") currentSubject.onComplete()
这只打印foo
和bar
。
如果你想在错误后继续处理,这意味着你的错误就像你的String
一样,并且应该通过onNext
。 为确保在这种情况下的类型安全,您应该使用某种forms的包装器,它可以采用常规值或错误; 例如, io.reactivex.Notification
在RxJava 2中可用:
PublishSubject> subject = PublishSubject.create(); subject.subscribe(System.out::println); subject.onNext(Notification.createOnNext("Hello")); subject.onNext(Notification.createOnError(new RuntimeException("oops"))); subject.onNext(Notification.createOnNext("World"));