使用Realm和RxJava 2

我在我的Android应用程序中使用RxJava 2,并且正在集成Realm。 据我所知,Realm默认只支持RxJava 1,并且在查询RealmResults时允许返回Observable ,如下所示:

 Realm.getDefaultInstance() .where(VideoBundle.class) .findAll() .asObservable() .first() 

返回的Observable来自RxJava 1.我如何一起使用Realm和RxJava 2? 我在这里和这里找到了 2个相关问题,但没有找到简洁的答案。 此外,文档(在此处: https : RxObservableFactory )提到创建自定义RxObservableFactory ,但没有提供有关如何执行此操作的资源。

Realm如何与已经使用RxJava 2的项目一起使用?

解决方案是使用最新的背压策略将RealmResults与Flowable包装在一起。

 private io.reactivex.Flowable> getSomeItems() { return io.reactivex.Flowable.create(new FlowableOnSubscribe>() { @Override public void subscribe(FlowableEmitter<__>> emitter) throws Exception { Realm realm = Realm.getDefaultInstance(); RealmResults<__> results = realm.where(__.class).findAllSortedAsync("__"); final RealmChangeListener<__>> listener = _realm -> { if(!emitter.isUnsubscribed() && results.isLoaded()) { emitter.onNext(results); } }; emitter.setDisposable(Disposables.fromRunnable(() -> { results.removeChangeListener(listener); realm.close(); })); results.addChangeListener(listener); } }, BackpressureStrategy.LATEST) .subscribeOn(AndroidSchedulers.mainThread()) .unsubscribeOn(AndroidSchedulers.mainThread()); 

从Realm 4.0.0-RC1及更高版本开始,我在上面展示的这种行为是使用realmResults.asFlowable()

 Disposable subscription = realm.where(__.class) .findAllSortedAsync("__") .asFlowable() .filter(RealmResults::isLoaded) .subscribe(...);