Tag: rx java

使用RXJava 2对Realm进行异步读/写

我第一次使用RXJava 2实现异步操作 目标: 使用库Retrofit2从服务器获取json数据。 如果成功,则将数据写入Realm并在记录后立即获取数据并发送到RecyclerView的适配器。 所以,我用这种方式意识到了这一切: private void fetchChatsFromNetwork(int count, AccessDataModel accessDataModel) { String accessToken = accessDataModel.getAccessToken(); MyApplication.getRestApi().getChats(count, accessToken, Constants.api_version) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableSubscriber() { @Override public void onNext(ChatsModel chatsModel) { if (chatsRepository.hasData()) { chatsRepository.updateChatsData(chatsModel) .subscribe(new DisposableObserver() { @Override public void onNext(ChatsModel localChatsModel) { Log.d(TAG, “DO, onSuccess updated!”); iGetChatsCallback.onGetChatsSuccess(localChatsModel); } @Override public void onError(Throwable e) […]

使用Realm和RxJava 2

我在我的Android应用程序中使用RxJava 2,并且正在集成Realm。 据我所知,Realm默认只支持RxJava 1,并且在查询RealmResults时允许返回Observable ,如下所示: Realm.getDefaultInstance() .where(VideoBundle.class) .findAll() .asObservable() .first() 返回的Observable来自RxJava 1.我如何一起使用Realm和RxJava 2? 我在这里和这里找到了 2个相关问题,但没有找到简洁的答案。 此外,文档(在此处: https : RxObservableFactory )提到创建自定义RxObservableFactory ,但没有提供有关如何执行此操作的资源。 Realm如何与已经使用RxJava 2的项目一起使用?

RxJava和RxAndroid的组合?

我的场景与此图像非常相似: 应用程序的流程将如下所示: 查看需要更新。 使用RxAndroid创建一个observable来从缓存/本地文件中获取数据。 更新视图。 使用Retrofit和RxJava进行另一次网络呼叫,使用来自Web服务的新数据再次更新视图。 使用新数据更新本地文件。 所以,我正在两次更新视图(一个来自本地文件,之后就是通过webservices) 如何使用RxJava和RxAndroid实现结果? 我在想的是 创建observable1以从本地文件系统获取数据。 在observable1的onNext方法中,我可以创建另一个observable2 。 observable2.onNext()我可以更新本地文件。 现在,我将如何使用更新的数据更新view (加载到文件中)? 什么是好方法?

为什么我的RxJava Observable仅向第一个消费者发出?

有人可以解释为什么以下测试失败? public class ObservableTest { @Test public void badObservableUsedTwiceDoesNotEmitToSecondConsumer() { // Any simpler observable makes the test pass Observable badObservable = Observable.just(1) .zipWith(Observable.just(2), (one, two) -> Observable.just(3)) .flatMap(observable -> observable); ObservableCalculator calc1 = new ObservableCalculator(badObservable); ObservableCalculator calc2 = new ObservableCalculator(badObservable); // zipping causes the failure // Calling calculate().toBlocking().subscribe() on each calc passes // Observable.from(listOfCalcs).flatMap(calc -> […]

如何使用Retrofit和RxJava / RxAndroid处理响应错误?

我无法通过改造和RxAndroid来解决如何处理响应错误的问题。 如果出现网络错误等,则调用onError(),但我需要能够获得响应以检查是否存在身份validation错误。 相反,我得到的是一个带有空字符串的令牌,我找不到原因。 最好的方法是什么? 这是我目前的RxAndroid电话。 Client.getInstance().getService() .getToken(usernameET.getText().toString(), passwordET.getText().toString()) .subscribe(new Subscriber() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(SiteInfo siteInfo) { Log.d(TAG, “onNext “+ token.toString()); } }); 这是我的改造服务 @GET(“my_url_here”) Observable getToken( @Query(“username”) String username, @Query(“password”) String password ); 这是我当前的restadapter RestAdapter restAdapter = new RestAdapter.Builder() .setLogLevel(RestAdapter.LogLevel.FULL) […]

重复调用API,直到整个数据被下载数据

我有一个用例,我想反复为每个用户调用一个Web API,直到下载整个数据。 我拥有的Web API允许每个API请求为用户提取最多100条记录。 我可以指定要为该用户下载的记录的startTime和end startTime戳: void downloadRecord(String userId, recordStartTime, recordEndTime, int countOfRecord, ResultCallBack) 如果recordStartTime和recordEndTime之间的记录数大于100,则API响应仅返回100条记录。 然后我必须通过新的开始时间(刚刚下载的第100条记录的时间)循环调用此apis,直到下载所有记录。 final long recordStartTime = timeStampFrom2DaysAgo//; Observable.from(arrayListOfUserIds).flatMap(new Func1<String, Observable>() { @Override public Observable call(String userId) { long recordEndTime = getCurrentTimeMS(); //keep downloading records downloadRecord(userId, recordStartTime, recordEndTime, 100, new ResultCallBack() { //if records are < 100 then stop //else keep downloading […]

做运营商而不是整个订户

当你只需要OnNext()仅仅因为它更具可读性时,使用Action而不是整个Subscriber是非常有吸引力的。 但是,当然会发生错误,如果您只使用Action1 ,您的应用程序中会出现Exception 。 在这里, 运营商可以提供帮助。 我只关心这两种方法是完全一样的,请确认或否认。 任何陷阱? 第一种方法: Observable .just(readFromDatabase()) .doOnError(new Action1() { @Override public void call(Throwable throwable) { // handle error } }).subscribe(new Action1() { @Override public void call(SomeData someData) { // react! } }); 第二种方法: Observable .just(readFromDatabase()) .subscribe(new Subscriber() { @Override public void onCompleted() { // do nothing } @Override public void […]

在RxJava中,如何重试/恢复错误,而不是完成observable

我想要实现的是: 监控某个变化的偏好 检测到更改时,使用新值启动新的网络呼叫 转换结果 在UI中显示结果 我知道变化何时发生,现在我假设我需要在主题上调用onNext。 然后应该触发一个Rx链,最后我可以更新UI。 mViewPeriodSubject = PublishSubject.create(); mAdapterObservable = mViewPeriodSubject .flatMap(period -> MyRetrofitAPI.getService().fetchData(period)) // this might fail .flatMap(Observable::from) .map(MyItem::modifyItem) .toList() .map(obj -> new MyAdapter(obj)); mViewPeriodSubject.onNext(“week”); // this one starts the chain mViewPeriodSubject.onNext(“year”); // this one does not 但是在网络调用失败的情况下,可观察的错误和调用onNext()不会导致另一个网络调用。 所以我的问题是,我该如何处理? 我怎样才能保持Observable完好无损,这样我就可以抛出另一个值了? 我可以想象一个简单的重试按钮,它只是想忽略发生错误的事实。

RxJava – Observable的zip列表

我有一个Observables列表(RxJava 1)。 List observableList = new ArrayList(); 它可以包含至少1个Observable。 每个都有相同类型的结果。 如何压缩所有Observable的结果? 我想到了zip-operator但它不支持List,我不知道可观察量的数量(可以是1,2,3,4 ……)

RxJava在列表上执行操作并返回一个observable

我是RxJava的新手(特别是RxJava2)而且我遇到了一些看似相对简单的操作的问题。 我需要从db获取一些数据,遍历数据(它表示为列表),对每个项执行操作,将数据包装在另一个对象中并返回。 这是我到目前为止: mDataManager .getStuffList(id) .flatMapIterable(listOfStuff -> listOfStuff) .flatMap(item -> mDataManager .performCount(id, item.getTitle()) .doOnNext(item::setCounter) .takeLast(1) .map(counter -> item)) .toList() .toObservable() .flatMap(listOfStuff -> Observable.just(new StuffWrapper(listOfStuff)); 我遇到的问题是我的数据管理器调用从未完成。 这个想法是,每当底层数据发生变化时,UI也会发生变化。 但是,如果不完成这些调用,toList()将不会发出该事件。