Tag: rx java

将Observable收集到List中似乎不会立即发出集合

我正在使用RxJava来实质上收集单独发出的Observable的列表,并将它们组合成一个Observable列表(基本上与flatMap相反)。 这是我的代码: // myEvent.findMemberships() returns an Observable<List> myEvent.findMemberships() .flatMap(new Func1<List, Observable>() { @Override public Observable call(List memberships) { List users = new ArrayList(); for (Membership membership : memberships) { users.add(membership.getUser()); } return Observable.from(users); } }) .toList() .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<List>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Timber.e(e, “Error […]

在Web应用程序中使用RxJava Observables无法解决性能提升

我正在执行一些测试来评估使用基于Observables的反应API是否真的有优势,而不是阻止传统的API。 整个例子可以在Githug上找到 令人惊讶的是,结果表明,输出的结果是: 最好的 :返回包含阻塞操作的Callable / DeferredResult REST服务。 同样糟糕 :阻止REST服务。 最糟糕的 :返回DeferredResult的REST服务,其结果由RxJava Observable设置。 这是我的Spring WebApp: 应用 : @SpringBootApplication public class SpringNioRestApplication { @Bean public ThreadPoolTaskExecutor executor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); return executor; } public static void main(String[] args) { SpringApplication.run(SpringNioRestApplication.class, args); } } SyncController : @RestController(“SyncRestController”) @Api(value=””, description=”Synchronous data controller”) public class […]

RxJava:d​​oOnNext和doOnEach之间的区别

在哪些情况下我应该使用doOnNext,在哪些情况下doOnEach? .doOnEach(new Action1<Notification>() { @Override public void call(Notification notification) { } }) .doOnNext(new Action1() { @Override public void call(MessageProfile profile) { messageProfileDao.save(profile); } }) 这看起来两个运营商都有相同的效果。

使用RxJava / Jersey2的异步RestAPI。 线程问题?

我们正在使用反应式编程对REST API进行原型设计。 如图所示,我们保留3层与我们在上一代同步API设计中使用的相同; http://oi59.tinypic.com/339hhki.jpg 使用Jersey2实现的API层将处理请求/反序列化JSON并切换到服务层。 服务层实现业务逻辑。使用反应式编程实现(RxJava) Dao Layer用于Service Layer的持久化操作。因为我们使用CouchBase,所以这将使用CouchBase RxClient。 根据我的理解,流程如下: a) HTTP请求到来,Jersery将从“容器线程池”处理RequestThread内的请求/解析JSON /反序列化请求模型。 b)使用Jersey2异步支持,RequestThread将返回到Container Thread Pool,服务层将在Schedulers.computation()调度程序中执行。 @Path(“/resource”) public class AsyncUserResource { @GET public void asyncGet(@Suspended final AsyncResponse asyncResponse) { Observable user = userService.getUser(…); //this is executed using Schedulers.computation() inside Service implementation user.subscribe(new Observer() { @Override public void onCompleted() { } @Override public void onError(Throwable […]

RxJava:如何用依赖关系组合多个Observable并在最后收集所有结果?

我正在学习RxJava,并且,作为我的第一个实验,试图在这段代码中重写代码 (在Netflix的博客上引用作为RxJava可以帮助解决的问题),以使用RxJava改进其异步性,即所以它在继续执行其余代码之前,不会等待第一个Future( f1.get() )的结果。 f3取决于f1 。 我看到如何处理这个问题, flatMap似乎可以解决这个问题: Observable f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA())) .flatMap(new Func1<String, Observable>() { @Override public Observable call(String s) { return Observable.from(executor.submit(new CallToRemoteServiceC(s))); } }); 接下来, f4和f5取决于f2 。 我有这个: final Observable f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB())) .flatMap(new Func1<Integer, Observable>() { @Override public Observable call(Integer i) { Observable f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i))); Observable f5Observable […]

RxJava- cache()与replay()相同吗?

我想知道是否有一个cache()运算符可以缓存x个数量的排放,但也会在指定的时间间隔(例如1分钟)后使它们到期。 我在寻找像…… Observable<ImmutableList> cachedList = otherObservable .cache(1, 1, TimeUnit.MINUTES); 这会缓存一个项目,但会在一分钟后过期并清除缓存。 我做了一些研究,找到了重播算子。 看起来它可以满足这种需求,但我有一些问题。 为什么它很热并且需要连接? 这是否与cache()运算符不同? 我知道cache()模仿主题,但它不需要连接。

RxJava – 获取列表中的每个项目

我有一个返回Observable<ArrayList> ,它是一些Items的id。 我想通过这个列表并使用另一个返回Observable方法下载每个Item。 我如何使用RxJava运算符执行此操作?

RxJava与Java 8并行流

它们之间有什么相似之处和不同之处,看起来Java Parallel Stream有一些RXJava中可用的元素,是吗?

RxJava是否适合分支工作流程?

我正在使用RxJava来处理我们从队列中提取的一些通知。 RxJava似乎在一个简单的工作流程中运行良好,现在有了新的要求,流程越来越复杂,分支越多(请参见下图作为参考) 我尝试通过一个小unit testing来举例说明流程: @Test public void test() { Observable.range(1, 100) .groupBy(n -> n % 3) .toMap(GroupedObservable::getKey) .flatMap(m1 -> { Observable ones1 = m1.get(0); Observable twos1 = m1.get(1).map(n -> n – 10); Observable threes = m1.get(2).map(n -> n + 100); Observable onesAndTwos = Observable.merge(ones1, twos1) .map(n -> n * 3) .groupBy(n -> n % 2) […]

屏幕旋转后恢复可流动转换为实时数据

说我有这样的活动: public class TestActivity extends AppCompatActivity { @Override public void onCreate(@Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); final TextView countdown = new TextView(this); setContentView(countdown); ViewModelProviders.of(this) .get(TestViewModel.class) .getCountdown() .observe(this, countdown::setText); } } 视图模型是: class TestViewModel extends ViewModel { private final LiveData countdown = LiveDataReactiveStreams.fromPublisher( Flowable.concat( Flowable.just(“Falcon Heavy rocket will launch in…”), Flowable.intervalRange(0, 10, 3, 1, TimeUnit.SECONDS) .map(x -> […]