Tag: reactive programming

为什么我的RxJava Observable仅向第一个消费者发出?

有人可以解释为什么以下测试失败? public class ObservableTest { @Test public void badObservableUsedTwiceDoesNotEmitToSecondConsumer() { // Any simpler observable makes the test pass Observable badObservable = Observable.just(1) .zipWith(Observable.just(2), (one, two) -> Observable.just(3)) .flatMap(observable -> observable); ObservableCalculator calc1 = new ObservableCalculator(badObservable); ObservableCalculator calc2 = new ObservableCalculator(badObservable); // zipping causes the failure // Calling calculate().toBlocking().subscribe() on each calc passes // Observable.from(listOfCalcs).flatMap(calc -> […]

做运营商而不是整个订户

当你只需要OnNext()仅仅因为它更具可读性时,使用Action而不是整个Subscriber是非常有吸引力的。 但是,当然会发生错误,如果您只使用Action1 ,您的应用程序中会出现Exception 。 在这里, 运营商可以提供帮助。 我只关心这两种方法是完全一样的,请确认或否认。 任何陷阱? 第一种方法: Observable .just(readFromDatabase()) .doOnError(new Action1() { @Override public void call(Throwable throwable) { // handle error } }).subscribe(new Action1() { @Override public void call(SomeData someData) { // react! } }); 第二种方法: Observable .just(readFromDatabase()) .subscribe(new Subscriber() { @Override public void onCompleted() { // do nothing } @Override public void […]

RxJava在列表上执行操作并返回一个observable

我是RxJava的新手(特别是RxJava2)而且我遇到了一些看似相对简单的操作的问题。 我需要从db获取一些数据,遍历数据(它表示为列表),对每个项执行操作,将数据包装在另一个对象中并返回。 这是我到目前为止: mDataManager .getStuffList(id) .flatMapIterable(listOfStuff -> listOfStuff) .flatMap(item -> mDataManager .performCount(id, item.getTitle()) .doOnNext(item::setCounter) .takeLast(1) .map(counter -> item)) .toList() .toObservable() .flatMap(listOfStuff -> Observable.just(new StuffWrapper(listOfStuff)); 我遇到的问题是我的数据管理器调用从未完成。 这个想法是,每当底层数据发生变化时,UI也会发生变化。 但是,如果不完成这些调用,toList()将不会发出该事件。

使用RXJava进行缓存处理

我正在尝试用rxJava实现这个工作流程,但我确定我是在滥用还是做错了。 用户要求登录 如果loginResult在缓存中可用,则“发出”缓存的LoginResult 实际上,如果一切都成功,则实际执行对Web服务的请求并缓存结果 如果发生错误,则最多重试3次,如果有第4次,则清除缓存。 这是我完整的代码片段。 public class LoginTask extends BaseBackground { private static CachedLoginResult cachedLoginResult = new CachedLoginResult(); private XMLRPCClient xmlrpcClient; private UserCredentialsHolder userCredentialsHolder; @Inject public LoginTask(XMLRPCClient client, UserCredentialsHolder userCredentialsHolder) { this.xmlrpcClient = client; this.userCredentialsHolder = userCredentialsHolder; } @Override public LoginResult performRequest() throws Exception { return UserApi.login( xmlrpcClient, userCredentialsHolder.getUserName(), userCredentialsHolder.getPlainPassword()); } @Override public Observable […]

在Web应用程序中使用RxJava Observables无法解决性能提升

我正在执行一些测试来评估使用基于Observables的反应API是否真的有优势,而不是阻止传统的API。 整个例子可以在Githug上找到 令人惊讶的是,结果表明,输出的结果是: 最好的 :返回包含阻塞操作的Callable / DeferredResult REST服务。 同样糟糕 :阻止REST服务。 最糟糕的 :返回DeferredResult的REST服务,其结果由RxJava Observable设置。 这是我的Spring WebApp: 应用 : @SpringBootApplication public class SpringNioRestApplication { @Bean public ThreadPoolTaskExecutor executor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); return executor; } public static void main(String[] args) { SpringApplication.run(SpringNioRestApplication.class, args); } } SyncController : @RestController(“SyncRestController”) @Api(value=””, description=”Synchronous data controller”) public class […]

使用RxJava / Jersey2的异步RestAPI。 线程问题?

我们正在使用反应式编程对REST API进行原型设计。 如图所示,我们保留3层与我们在上一代同步API设计中使用的相同; http://oi59.tinypic.com/339hhki.jpg 使用Jersey2实现的API层将处理请求/反序列化JSON并切换到服务层。 服务层实现业务逻辑。使用反应式编程实现(RxJava) Dao Layer用于Service Layer的持久化操作。因为我们使用CouchBase,所以这将使用CouchBase RxClient。 根据我的理解,流程如下: a) HTTP请求到来,Jersery将从“容器线程池”处理RequestThread内的请求/解析JSON /反序列化请求模型。 b)使用Jersey2异步支持,RequestThread将返回到Container Thread Pool,服务层将在Schedulers.computation()调度程序中执行。 @Path(“/resource”) public class AsyncUserResource { @GET public void asyncGet(@Suspended final AsyncResponse asyncResponse) { Observable user = userService.getUser(…); //this is executed using Schedulers.computation() inside Service implementation user.subscribe(new Observer() { @Override public void onCompleted() { } @Override public void onError(Throwable […]

RxJava:如何用依赖关系组合多个Observable并在最后收集所有结果?

我正在学习RxJava,并且,作为我的第一个实验,试图在这段代码中重写代码 (在Netflix的博客上引用作为RxJava可以帮助解决的问题),以使用RxJava改进其异步性,即所以它在继续执行其余代码之前,不会等待第一个Future( f1.get() )的结果。 f3取决于f1 。 我看到如何处理这个问题, flatMap似乎可以解决这个问题: Observable f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA())) .flatMap(new Func1<String, Observable>() { @Override public Observable call(String s) { return Observable.from(executor.submit(new CallToRemoteServiceC(s))); } }); 接下来, f4和f5取决于f2 。 我有这个: final Observable f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB())) .flatMap(new Func1<Integer, Observable>() { @Override public Observable call(Integer i) { Observable f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i))); Observable f5Observable […]

RxJava- cache()与replay()相同吗?

我想知道是否有一个cache()运算符可以缓存x个数量的排放,但也会在指定的时间间隔(例如1分钟)后使它们到期。 我在寻找像…… Observable<ImmutableList> cachedList = otherObservable .cache(1, 1, TimeUnit.MINUTES); 这会缓存一个项目,但会在一分钟后过期并清除缓存。 我做了一些研究,找到了重播算子。 看起来它可以满足这种需求,但我有一些问题。 为什么它很热并且需要连接? 这是否与cache()运算符不同? 我知道cache()模仿主题,但它不需要连接。

反应式编程优点/缺点

我一直在研究并尝试使用Reactor和RxJava进行编码的Reactive Style。 我确实理解,与单线程执行相比,反应式编码可以更好地利用CPU。 在基于Web的应用程序中,反应式编程与命令式编程之间是否有任何具体比较? 通过对非反应式编程使用反应式编程,我实现了多少性能提升和吞吐量? 还原反应编程有哪些优点和缺点? 有没有统计基准?

RxJava – 获取列表中的每个项目

我有一个返回Observable<ArrayList> ,它是一些Items的id。 我想通过这个列表并使用另一个返回Observable方法下载每个Item。 我如何使用RxJava运算符执行此操作?