Tag: rx android

使用RXJava 2对Realm进行异步读/写

我第一次使用RXJava 2实现异步操作 目标: 使用库Retrofit2从服务器获取json数据。 如果成功,则将数据写入Realm并在记录后立即获取数据并发送到RecyclerView的适配器。 所以,我用这种方式意识到了这一切: private void fetchChatsFromNetwork(int count, AccessDataModel accessDataModel) { String accessToken = accessDataModel.getAccessToken(); MyApplication.getRestApi().getChats(count, accessToken, Constants.api_version) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(new DisposableSubscriber() { @Override public void onNext(ChatsModel chatsModel) { if (chatsRepository.hasData()) { chatsRepository.updateChatsData(chatsModel) .subscribe(new DisposableObserver() { @Override public void onNext(ChatsModel localChatsModel) { Log.d(TAG, “DO, onSuccess updated!”); iGetChatsCallback.onGetChatsSuccess(localChatsModel); } @Override public void onError(Throwable e) […]

RxJava和RxAndroid的组合?

我的场景与此图像非常相似: 应用程序的流程将如下所示: 查看需要更新。 使用RxAndroid创建一个observable来从缓存/本地文件中获取数据。 更新视图。 使用Retrofit和RxJava进行另一次网络呼叫,使用来自Web服务的新数据再次更新视图。 使用新数据更新本地文件。 所以,我正在两次更新视图(一个来自本地文件,之后就是通过webservices) 如何使用RxJava和RxAndroid实现结果? 我在想的是 创建observable1以从本地文件系统获取数据。 在observable1的onNext方法中,我可以创建另一个observable2 。 observable2.onNext()我可以更新本地文件。 现在,我将如何使用更新的数据更新view (加载到文件中)? 什么是好方法?

如何使用Retrofit和RxJava / RxAndroid处理响应错误?

我无法通过改造和RxAndroid来解决如何处理响应错误的问题。 如果出现网络错误等,则调用onError(),但我需要能够获得响应以检查是否存在身份validation错误。 相反,我得到的是一个带有空字符串的令牌,我找不到原因。 最好的方法是什么? 这是我目前的RxAndroid电话。 Client.getInstance().getService() .getToken(usernameET.getText().toString(), passwordET.getText().toString()) .subscribe(new Subscriber() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onNext(SiteInfo siteInfo) { Log.d(TAG, “onNext “+ token.toString()); } }); 这是我的改造服务 @GET(“my_url_here”) Observable getToken( @Query(“username”) String username, @Query(“password”) String password ); 这是我当前的restadapter RestAdapter restAdapter = new RestAdapter.Builder() .setLogLevel(RestAdapter.LogLevel.FULL) […]

在RxJava中,如何重试/恢复错误,而不是完成observable

我想要实现的是: 监控某个变化的偏好 检测到更改时,使用新值启动新的网络呼叫 转换结果 在UI中显示结果 我知道变化何时发生,现在我假设我需要在主题上调用onNext。 然后应该触发一个Rx链,最后我可以更新UI。 mViewPeriodSubject = PublishSubject.create(); mAdapterObservable = mViewPeriodSubject .flatMap(period -> MyRetrofitAPI.getService().fetchData(period)) // this might fail .flatMap(Observable::from) .map(MyItem::modifyItem) .toList() .map(obj -> new MyAdapter(obj)); mViewPeriodSubject.onNext(“week”); // this one starts the chain mViewPeriodSubject.onNext(“year”); // this one does not 但是在网络调用失败的情况下,可观察的错误和调用onNext()不会导致另一个网络调用。 所以我的问题是,我该如何处理? 我怎样才能保持Observable完好无损,这样我就可以抛出另一个值了? 我可以想象一个简单的重试按钮,它只是想忽略发生错误的事实。

RxAndroid:在Schedulers.io()线程上更改UI

我在IO线程上有简单的工作,它正在改变主屏幕壁纸,之后我试图在UI线程上运行一些动画: AppObservable.bindFragment(this, Observable.just(0)) .observeOn(Schedulers.io()) .subscribe(v -> setWallpaperOnSeparateThread()); private void setWallpaperOnSeparateThread() { WallpaperHelper.setBitmapAsWallpaper(photoViewAttacher.getVisibleRectangleBitmap(), getBaseActivity()); AppObservable.bindFragment(this, Observable.just(0)) .delay(500, TimeUnit.MILLISECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribe(integer -> loadFinishAnimationAfterSetWallpaper()); } 但是这种方法导致错误: java.lang.IllegalStateException: Observers must subscribe from the main UI thread, but was Thread[RxCachedThreadScheduler-1,5,main] 我试图将第二个Observable改为: AppObservable.bindFragment(this, Observable.just(0)) .delay(2000, TimeUnit.MILLISECONDS) .observeOn(Schedulers.io()) .subscribeOn(AndroidSchedulers.mainThread()) .subscribe(integer -> loadFinishAnimationAfterSetWallpaper()); 但它没有帮助。

如何终止Observable?

我有一个Observable,如果不满足某个条件,我想终止(即如果某个网站的响应不成功),这样我就可以重新查询网站,并再次调用observable。 我该怎么做呢? 这就是我想要做的: Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { //Perform network actions here if (!response.isSuccessful()) { //terminate this Observable so I can retrieve the token and call this observable again } } });

如何使用RXJava Observables处理用于填充模型的多个请求?

我们在网络堆栈中使用ReactiveX和Retrofit以异步方式处理所有API请求。 我们的目标是创建一个返回完全填充的User模型集合的方法。 每个User模型都有一个Pet对象列表。 我们可以通过一个请求获取所有User模型。 但是,每个User需要请求Pet模型。 获得用户很简单: // Service.java @GET(“users/?locationId={id}”) Observable<List> getUsersForLocation(@Path(“id”) int locationId); @GET(“pets/?userId={id}”) Observable<List> getPetsForUser(@Path(“id”) int userId); // DataManager.java public Observable<List> getUsersForLocation(int locationId) { return api.getUsersForLocation(locationId); } public Observable<List> getPetsForUser(int userId) { return api.getPetsForUser(userId); } 我们希望找到一些方便的(RX风格)循环User列表的方法,为每个用户获取Pet ,将它们分配给User并最终返回Observable<List> 。 我对RX很新。 我查看了文档,并尝试使用各种方法,如flatMap()和zip ,但是,我还没有找到变换或组合的确切组合来实现它。

如何让这个rxjava zip并行运行?

我有一个睡眠方法来模拟一个长时间运行的过程。 private void sleep() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } 然后我有一个方法返回一个Observable,其中包含参数中给出的2个字符串的列表。 它在返回字符串之前调用sleep。 private Observable<List> getStrings(final String str1, final String str2) { return Observable.fromCallable(new Callable<List>() { @Override public List call() { sleep(); List strings = new ArrayList(); strings.add(str1); strings.add(str2); return strings; } }); } 然后我在Observalb.zip中调用了三次getStrings,我希望这三个调用并行运行,所以执行的总时间应该在2秒或3秒之内,因为睡眠只有2秒。 但是,它总共花了六秒钟。 如何让它并行运行,以便在2秒内完成? Observable .zip(getStrings(“One”, “Two”), […]

当我在FlowableOnSubscribe类中调用onNext时,我的订阅者的onNext和onComplete函数不会运行

在使用RxJava 2的Android项目中,我在我的初始活动的onCreate中创建了这样的Flowable: Flowable.create(new MyFlowableOnSubscribe1(), BackpressureStrategy.BUFFER) .doOnComplete(new MyAction()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new MySubscriber()); FlowableOnSubscribe的实现是: public class MyFlowableOnSubscribe1 implements FlowableOnSubscribe { public static final String TAG = “XX MyFlOnSub1”; @Override public void subscribe(FlowableEmitter emitter) { Log.i(TAG, “subscribe”); emitter.onNext(“hello”); emitter.onComplete(); } } 这是订户实施: public class MySubscriber implements Subscriber { public static final String TAG = “XX MySubscriber”; @Override public […]