Tag: rx java

如何等待异步Observable完成

我正在尝试使用rxjava构建一个示例。 该示例应协调ReactiveWareService和ReactiveReviewService重新运行WareAndReview组合。 ReactiveWareService public Observable findWares() { return Observable.from(wareService.findWares()); } ReactiveReviewService: reviewService.findReviewsByItem does a ThreadSleep to simulate a latency! public Observable findReviewsByItem(final String item) { return Observable.create((Observable.OnSubscribe) observer -> executor.execute(() -> { try { List reviews = reviewService.findReviewsByItem(item); reviews.forEach(observer::onNext); observer.onCompleted(); } catch (Exception e) { observer.onError(e); } })); } public List findWaresWithReviews() throws RuntimeException { […]

RxJava Observable“Iteration”如何工作?

我开始玩RxJava和ReactFX,我对此非常着迷。 但是当我在试验时,我有几十个问题而且我一直在研究答案。 我正在观察的一件事(没有双关语)当然是懒惰的执行。 使用下面的探索性代码,我注意到在merge.subscribe(pet -> System.out.println(pet))之前没有执行任何操作。 但令我着迷的是,当我订阅第二个订阅者merge.subscribe(pet -> System.out.println(“Feed ” + pet)) ,它再次触发了“迭代”。 我想要了解的是迭代的行为。 它似乎不像只能使用一次的Java 8 stream 。 它是否逐字逐句地遍历每个String并将其作为该时刻的值发布? 任何以前被解雇的用户跟随任何新用户接收这些项目是否像新用户一样? public class RxTest { public static void main(String[] args) { Observable dogs = Observable.from(ImmutableList.of(“Dasher”, “Rex”)) .filter(dog -> dog.matches(“D.*”)); Observable cats = Observable.from(ImmutableList.of(“Tabby”, “Grumpy Cat”, “Meowmers”, “Peanut”)); Observable ferrets = Observable.from(CompletableFuture.supplyAsync(() -> “Harvey”)); Observable merge = […]

如何在RxJava2中链接两个Completable

我有两个可完成的。 我想做以下场景:如果第一个Completable到达onComplete,继续第二个Completable。 最终结果将是第二次完成的完成。 当我有单个getUserIdAlreadySavedInDevice()和Completable login()时,我就是这样做的: @Override public Completable loginUserThatIsAlreadySavedInDevice(String password) { return getUserIdAlreadySavedInDevice() .flatMapCompletable(s -> login(password, s)) }

RxAndroid:在Schedulers.io()线程上更改UI

我在IO线程上有简单的工作,它正在改变主屏幕壁纸,之后我试图在UI线程上运行一些动画: AppObservable.bindFragment(this, Observable.just(0)) .observeOn(Schedulers.io()) .subscribe(v -> setWallpaperOnSeparateThread()); private void setWallpaperOnSeparateThread() { WallpaperHelper.setBitmapAsWallpaper(photoViewAttacher.getVisibleRectangleBitmap(), getBaseActivity()); AppObservable.bindFragment(this, Observable.just(0)) .delay(500, TimeUnit.MILLISECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribe(integer -> loadFinishAnimationAfterSetWallpaper()); } 但是这种方法导致错误: java.lang.IllegalStateException: Observers must subscribe from the main UI thread, but was Thread[RxCachedThreadScheduler-1,5,main] 我试图将第二个Observable改为: AppObservable.bindFragment(this, Observable.just(0)) .delay(2000, TimeUnit.MILLISECONDS) .observeOn(Schedulers.io()) .subscribeOn(AndroidSchedulers.mainThread()) .subscribe(integer -> loadFinishAnimationAfterSetWallpaper()); 但它没有帮助。

如何终止Observable?

我有一个Observable,如果不满足某个条件,我想终止(即如果某个网站的响应不成功),这样我就可以重新查询网站,并再次调用observable。 我该怎么做呢? 这就是我想要做的: Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { //Perform network actions here if (!response.isSuccessful()) { //terminate this Observable so I can retrieve the token and call this observable again } } });

使用RXJava进行缓存处理

我正在尝试用rxJava实现这个工作流程,但我确定我是在滥用还是做错了。 用户要求登录 如果loginResult在缓存中可用,则“发出”缓存的LoginResult 实际上,如果一切都成功,则实际执行对Web服务的请求并缓存结果 如果发生错误,则最多重试3次,如果有第4次,则清除缓存。 这是我完整的代码片段。 public class LoginTask extends BaseBackground { private static CachedLoginResult cachedLoginResult = new CachedLoginResult(); private XMLRPCClient xmlrpcClient; private UserCredentialsHolder userCredentialsHolder; @Inject public LoginTask(XMLRPCClient client, UserCredentialsHolder userCredentialsHolder) { this.xmlrpcClient = client; this.userCredentialsHolder = userCredentialsHolder; } @Override public LoginResult performRequest() throws Exception { return UserApi.login( xmlrpcClient, userCredentialsHolder.getUserName(), userCredentialsHolder.getPlainPassword()); } @Override public Observable […]

RxJava – 观察可能始终发生变化的数据

我正试图跳上反应潮流,但在阅读并经历了很多例子后,我仍然没有找到我正在寻找的东西。 我有一个模型对象,可能会在应用程序的生命周期中的任何时间发生变化。 更改可能来自更新它的特定请求(来自服务器,数据库等),也可能因应用程序中触发的事件而更新。 我的问题是如何创建这样一个对象的Observable ,以及如何在发生变化时不断更新订阅者? 从我到目前为止看到的,我可以像这样创建observable: Observable.create(new Observable.OnSubscribe() { @Override public void call(final Subscriber subscriber) { subscriber.onNext(MyModel instance); } }); 我在这里缺少的是: 我不想发出不同的值( MyModel实例),但只是想让订阅者知道相同的实例(他们订阅的实例)已经改变。 如果我理解正确,那么每当新订户注册时call方法,但这不是我需要的,我需要在有更新时才采取行动然后我想通知所有订户。 我很有可能把这一切都搞错了,这就是为什么我很高兴能够理解如何用RxJava来满足我的需求。 谢谢。

在RxJava中线程安全是否必需SerializedSubject

我在RxJava中创建了一个Subject实例,并从多个线程调用它的onNext() : PublishSubject subject = PublishSubject.create(); //… subject.onNext(“A”); //thread A subject.onNext(“B”); //thread B RxJava文档说: 注意不要从多个线程调用其onNext( )方法(或其他方法),因为这可能导致非序列化调用,这违反了Observable合约并在生成的Subject产生歧义。 我是否必须在这样的Subject上调用toSerialized() ,假设我不关心”A”是在”B”之前还是之后? 序列化将如何帮助? 无论如何都是Subject线程安全还是我会在没有toSerialized()情况下中断RxJava? 文件提到的“ 可观察合同 ”是什么?

RxJava – 调度程序与ExecutorService?

我有一种预感,对于RxJava中的高度计算,并行化的任务,传统的ExecutorService会比Scheduler更快。 我有一个理论认为这个代码 Observable source = … source.flatMap(myItem -> myItem.process().subscribeOn(Schedulers.computation())) .subscribe(); 会跑得比这慢 final ExecutorService svc = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1); Observable source = … source.flatMap(myItem -> myItem.process().subscribeOn(Schedulers.from(svc))) .finallyDo(svc::shutdown) .subscribe(); 我将这两种方法与我在工作中进行的典型并行处理进行了比较,得到了以下结果。 EXECUTOR START: 2016-01-25T09:47:04.350 END: 2016-01-25T09:48:37.181 TOTAL TIME (SEC): 92 COMPUTATION SCHEDULER START: 2016-01-25T09:50:37.799 END: 2016-01-25T09:54:23.674 TOTAL TIME (SEC): 225 所以我的粗略测试表明,传统的ExecutorService比Scheduler快得多。 这些结果有原因吗? RxJava调度程序是否未针对并行化进行优化? 我的印象是计算调度程序使用的线程比执行程序少。

当回复有时是一个对象而有时是一个数组时,如何在使用改进时解析JSON回复?

我正在使用Retrofit来获取JSON回复。 以下是我实施的部分内容 – @GET(“/api/report/list”) Observable listBill(@Query(“employee_id”) String employeeID); 和class级比尔是 – public static class Bills { @SerializedName(“report”) public ArrayList billItems; } BillItem类如下 – public static class BillItem { @SerializedName(“id”) Integer entryID; @SerializedName(“employee_id”) Integer employeeDBID; @SerializedName(“type”) String type; @SerializedName(“subtype”) String subtype; @SerializedName(“date”) String date; @SerializedName(“to”) String to; @SerializedName(“from”) String from; @SerializedName(“amount”) Double amount; @SerializedName(“location”) String location; @SerializedName(“remark”) […]