使用RXJava 2对Realm进行异步读/写

我第一次使用RXJava 2实现异步操作

目标:

使用库Retrofit2从服务器获取json数据。 如果成功,则将数据写入Realm并在记录后立即获取数据并发送到RecyclerView的适配器。

所以,我用这种方式意识到了这一切:

 private void fetchChatsFromNetwork(int count, AccessDataModel accessDataModel) { String accessToken = accessDataModel.getAccessToken(); MyApplication.getRestApi().getChats(count, accessToken, Constants.api_version) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableSubscriber() { @Override public void onNext(ChatsModel chatsModel) { if (chatsRepository.hasData()) { chatsRepository.updateChatsData(chatsModel) .subscribe(new DisposableObserver() { @Override public void onNext(ChatsModel localChatsModel) { Log.d(TAG, "DO, onSuccess updated!"); iGetChatsCallback.onGetChatsSuccess(localChatsModel); } @Override public void onError(Throwable e) { Log.d(TAG, "DO, onError when update!"); iGetChatsCallback.onGetChatsError(e.getMessage()); } @Override public void onComplete() { dispose(); Log.d(TAG, "DO, onComplete!"); } }); } else { chatsRepository.insertChatsData(chatsModel) .subscribe(new DisposableObserver() { @Override public void onNext(ChatsModel localChatsModel) { iGetChatsCallback.onGetChatsSuccess(localChatsModel); Log.d(TAG, "DO, onSuccess inserted!"); } @Override public void onError(Throwable e) { iGetChatsCallback.onGetChatsError(e.getMessage()); Log.d(TAG, "DO, onError when inserting!"); } @Override public void onComplete() { dispose(); Log.d(TAG, "DO, onComplete!"); } }); } } @Override public void onError(Throwable t) { Log.d(TAG, "onError" + t.getMessage()); } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); } 

我在用户MyApplication.getRestApi().GetChats()onNext()方法中在Realm中写入数据MyApplication.getRestApi().GetChats()

这是输入代码:

 public Observable updateChatsData(final ChatsModel chatsModel) { return Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { if (chatsModel != null) { realm.executeTransactionAsync( realm -> realm.copyToRealmOrUpdate(chatsModel), () -> { Log.d(LOG_TAG, "Data success updated!"); ChatsModel localChatsModel = getAllChatsData(); e.onNext(localChatsModel); e.onComplete(); }, error -> { Log.d(LOG_TAG, "Update data failed!"); e.onError(error); }); } } }); } 

updateChatsData()异步方式写入并在另一个类中声明。

正如您所看到的, fetchChatsFromNetwork()方法写得很麻烦,或者在我看来

题:

无论我做得对不对,如果没有,那怎么回事?

 private void fetchChatsFromNetwork(int count, AccessDataModel accessDataModel) { String accessToken = accessDataModel.getAccessToken(); Single chats = MyApplication.getRestApi().getChats(count, accessToken, Constants.api_version); chats.doOnNext((chats) -> { chatsRepository.insertOrUpdate(chats); }).subscribeOn(Schedulers.io()) .subscribe(); } 

 public void updateChatsData(final ChatsModel chatsModel) { try(Realm realm = Realm.getDefaultInstance()) { realm.executeTransaction(r -> { r.insertOrUpdate(chatsModel); }); } } 

 public Flowable> getAllChatsData(Realm realm) { RealmQuery query = realm.where(ChatsModel.class); if(realm.isAutoRefresh()) { return query.findAllAsync().asFlowable().filter(RealmResults::isLoaded); } else { return Flowable.just(query.findAll()); } }