RxJava – 调度程序与ExecutorService?
我有一种预感,对于RxJava中的高度计算,并行化的任务,传统的ExecutorService
会比Scheduler
更快。
我有一个理论认为这个代码
Observable source = ... source.flatMap(myItem -> myItem.process().subscribeOn(Schedulers.computation())) .subscribe();
会跑得比这慢
final ExecutorService svc = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1); Observable source = ... source.flatMap(myItem -> myItem.process().subscribeOn(Schedulers.from(svc))) .finallyDo(svc::shutdown) .subscribe();
我将这两种方法与我在工作中进行的典型并行处理进行了比较,得到了以下结果。
EXECUTOR START: 2016-01-25T09:47:04.350 END: 2016-01-25T09:48:37.181 TOTAL TIME (SEC): 92 COMPUTATION SCHEDULER START: 2016-01-25T09:50:37.799 END: 2016-01-25T09:54:23.674 TOTAL TIME (SEC): 225
所以我的粗略测试表明,传统的ExecutorService
比Scheduler
快得多。
这些结果有原因吗? RxJava调度程序是否未针对并行化进行优化? 我的印象是计算调度程序使用的线程比执行程序少。
我做了几次测试,发现创建自己的ExecutorService
实际上可以提高并行化性能。 我在这里写了一篇博文 。
使用Schedulers.computation()
,所有事件都在同一个线程中处理。 您可以参考源代码CachedThreadScheduler.java
和NewThreadWorker.java
。 此实现的好处是,如果在eventB之后发出eventA,则eventA将在eventB之后处理。
使用Schedulers.from()
,事件将在不同的线程中处理。