RxJava – 调度程序与ExecutorService?

我有一种预感,对于RxJava中的高度计算,并行化的任务,传统的ExecutorService会比Scheduler更快。

我有一个理论认为这个代码

 Observable source = ... source.flatMap(myItem -> myItem.process().subscribeOn(Schedulers.computation())) .subscribe(); 

会跑得比这慢

 final ExecutorService svc = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1); Observable source = ... source.flatMap(myItem -> myItem.process().subscribeOn(Schedulers.from(svc))) .finallyDo(svc::shutdown) .subscribe(); 

我将这两种方法与我在工作中进行的典型并行处理进行了比较,得到了以下结果。

 EXECUTOR START: 2016-01-25T09:47:04.350 END: 2016-01-25T09:48:37.181 TOTAL TIME (SEC): 92 COMPUTATION SCHEDULER START: 2016-01-25T09:50:37.799 END: 2016-01-25T09:54:23.674 TOTAL TIME (SEC): 225 

所以我的粗略测试表明,传统的ExecutorServiceScheduler快得多。

这些结果有原因吗? RxJava调度程序是否未针对并行化进行优化? 我的印象是计算调度程序使用的线程比执行程序少。

我做了几次测试,发现创建自己的ExecutorService实际上可以提高并行化性能。 我在这里写了一篇博文 。

使用Schedulers.computation() ,所有事件都在同一个线程中处理。 您可以参考源代码CachedThreadScheduler.javaNewThreadWorker.java 。 此实现的好处是,如果在eventB之后发出eventA,则eventA将在eventB之后处理。

使用Schedulers.from() ,事件将在不同的线程中处理。