Tag: rx java2

使用Realm和RxJava 2

我在我的Android应用程序中使用RxJava 2,并且正在集成Realm。 据我所知,Realm默认只支持RxJava 1,并且在查询RealmResults时允许返回Observable ,如下所示: Realm.getDefaultInstance() .where(VideoBundle.class) .findAll() .asObservable() .first() 返回的Observable来自RxJava 1.我如何一起使用Realm和RxJava 2? 我在这里和这里找到了 2个相关问题,但没有找到简洁的答案。 此外,文档(在此处: https : RxObservableFactory )提到创建自定义RxObservableFactory ,但没有提供有关如何执行此操作的资源。 Realm如何与已经使用RxJava 2的项目一起使用?

如何在RxJava 2中发生错误后继续处理?

我有一个PublishSubject和一个Subscriber ,我用它来处理(可能)无限的预处理数据流。 问题是某些元素可能包含一些错误。 我想忽略它们并继续处理。 我怎么能这样做? 我尝试过这样的事情: val subject = PublishSubject.create() subject.retry().subscribe({ println(“next: $it”) }, { println(“error”) }, { println(“complete”) }) subject.onNext(“foo”) subject.onNext(“bar”) subject.onError(RuntimeException()) subject.onNext(“wom”) subject.onComplete() 我的问题是没有任何error handling方法可以帮助我: onErrorResumeNext() – 指示Observable在遇到错误时发出一系列项 onErrorReturn( ) – 指示Observable在遇到错误时发出特定项 onExceptionResumeNext( ) – 指示Observable在遇到exception后继续发出项目(但不是另一种throwable) retry( ) – 如果源Observable发出错误,重新订阅它,希望它能完成而不会出错 retryWhen( ) – 如果源Observable发出错误,则将该错误传递给另一个Observable以确定是否重新订阅源 我试过retry()例如但是它无限期地在错误之后挂起我的进程。 我也试过onErrorResumeNext()但它没有按预期工作: val backupSubject = PublishSubject.create() val subject = […]

RxJava在列表上执行操作并返回一个observable

我是RxJava的新手(特别是RxJava2)而且我遇到了一些看似相对简单的操作的问题。 我需要从db获取一些数据,遍历数据(它表示为列表),对每个项执行操作,将数据包装在另一个对象中并返回。 这是我到目前为止: mDataManager .getStuffList(id) .flatMapIterable(listOfStuff -> listOfStuff) .flatMap(item -> mDataManager .performCount(id, item.getTitle()) .doOnNext(item::setCounter) .takeLast(1) .map(counter -> item)) .toList() .toObservable() .flatMap(listOfStuff -> Observable.just(new StuffWrapper(listOfStuff)); 我遇到的问题是我的数据管理器调用从未完成。 这个想法是,每当底层数据发生变化时,UI也会发生变化。 但是,如果不完成这些调用,toList()将不会发出该事件。

如何在RxJava2中链接两个Completable

我有两个可完成的。 我想做以下场景:如果第一个Completable到达onComplete,继续第二个Completable。 最终结果将是第二次完成的完成。 当我有单个getUserIdAlreadySavedInDevice()和Completable login()时,我就是这样做的: @Override public Completable loginUserThatIsAlreadySavedInDevice(String password) { return getUserIdAlreadySavedInDevice() .flatMapCompletable(s -> login(password, s)) }

反应式编程优点/缺点

我一直在研究并尝试使用Reactor和RxJava进行编码的Reactive Style。 我确实理解,与单线程执行相比,反应式编码可以更好地利用CPU。 在基于Web的应用程序中,反应式编程与命令式编程之间是否有任何具体比较? 通过对非反应式编程使用反应式编程,我实现了多少性能提升和吞吐量? 还原反应编程有哪些优点和缺点? 有没有统计基准?

RxJava2 observable抛出UndeliverableException

据我所知,RxJava2 values.take(1)创建了另一个Observable,它只包含原始Observable中的一个元素。 哪个不能抛出exception,因为它被take(1)的效果过滤掉,因为它发生在第二个。 如下面的代码片段所示 Observable values = Observable.create(o -> { o.onNext(1); o.onError(new Exception(“Oops”)); }); values.take(1) .subscribe( System.out::println, e -> System.out.println(“Error: ” + e.getMessage()), () -> System.out.println(“Completed”) ); 产量 1 Completed io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366) at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83) at ch02.lambda$main$0(ch02.java:28) at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40) at io.reactivex.Observable.subscribe(Observable.java:10841) at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30) at io.reactivex.Observable.subscribe(Observable.java:10841) at io.reactivex.Observable.subscribe(Observable.java:10827) at io.reactivex.Observable.subscribe(Observable.java:10787) at ch02.main(ch02.java:32) Caused by: […]

RxJava是否适合分支工作流程?

我正在使用RxJava来处理我们从队列中提取的一些通知。 RxJava似乎在一个简单的工作流程中运行良好,现在有了新的要求,流程越来越复杂,分支越多(请参见下图作为参考) 我尝试通过一个小unit testing来举例说明流程: @Test public void test() { Observable.range(1, 100) .groupBy(n -> n % 3) .toMap(GroupedObservable::getKey) .flatMap(m1 -> { Observable ones1 = m1.get(0); Observable twos1 = m1.get(1).map(n -> n – 10); Observable threes = m1.get(2).map(n -> n + 100); Observable onesAndTwos = Observable.merge(ones1, twos1) .map(n -> n * 3) .groupBy(n -> n % 2) […]