RxJava和观察者代码的并行执行
我使用RxJava Observable api获得以下代码:
Observable observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath()); observable .buffer(10000) .observeOn(Schedulers.computation()) .subscribe(recordInfo -> { _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId()); for(Info info : recordInfo) { // some I/O operation logic } }, exception -> { }, () -> { });
我的期望是,在指定了计算调度程序之后,观察代码即subscribe()方法中的代码将并行执行。 相反,代码仍然在单个线程上顺序执行。 如何使用RxJava api使代码并行运行。
当涉及到异步/multithreading方面时,RxJava经常被误解。 multithreading操作的编码很简单,但理解抽象是另一回事。
关于RxJava的一个常见问题是如何从Observable实现并行化或同时发出多个项目。 当然,这个定义违反了Observable Contract,它声明onNext()必须按顺序调用,而且一次不能同时调用多个线程。
要实现并行性,您需要多个Observable。
这在一个线程中运行:
Observable vals = Observable.range(1,10); vals.subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) .subscribe(val -> System.out.println("Subscriber received " + val + " on " + Thread.currentThread().getName()));
这在多个线程中运行:
Observable vals = Observable.range(1,10); vals.flatMap(val -> Observable.just(val) .subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) ).subscribe(val -> System.out.println(val));
代码和文字来自这篇博文。
您必须为此目的指定subscribeOn(Schedulers.computation())
而不是observeOn(Schedulers.computation())
。 在subscribeOn
您声明要在哪个线程中发出您的值。 在observeOn
您声明要处理的线程并观察它们。
使用flatMap
并指定在Schedulers.computation()
上进行订阅将实现并发。
这是一个使用Callable
的更实用的例子,从输出中我们可以看到完成所有任务需要大约2000毫秒。
static class MyCallable implements Callable { private static final Object CALLABLE_COUNT_LOCK = new Object(); private static int callableCount; @Override public Integer call() throws Exception { Thread.sleep(2000); synchronized (CALLABLE_COUNT_LOCK) { return callableCount++; } } public static int getCallableCount() { synchronized (CALLABLE_COUNT_LOCK) { return callableCount; } } } private static void runMyCallableConcurrentlyWithRxJava() { long startTimeMillis = System.currentTimeMillis(); final Semaphore semaphore = new Semaphore(1); try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } Observable.just(new MyCallable(), new MyCallable(), new MyCallable(), new MyCallable()) .flatMap(new Function>() { @Override public ObservableSource> apply(@NonNull MyCallable myCallable) throws Exception { return Observable.fromCallable(myCallable).subscribeOn(Schedulers.computation()); } }) .subscribeOn(Schedulers.computation()) .subscribe(new Observer
RxJava 2.0.5引入了并行流和ParallelFlowable ,这使得并行执行更简单,更具说明性。
您不再需要在flatMap
创建Observable
/ flatMap
,您只需在flatMap
调用parallel()
并返回ParallelFlowable
。
它并不像普通的Flowable
那样function丰富,因为并发会引发Rx契约的许多问题,但是你有基本的map()
, filter()
等等,在大多数情况下应该足够了。
所以代替来自@LordRaydenMK的这个流程回答
Observable vals = Observable.range(1,10); vals.flatMap(val -> Observable.just(val) .subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) ).subscribe(val -> System.out.println(val));
现在你可以这样做:
Flowable vals = Flowable.range(1, 10); vals.parallel() .runOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) .sequential() .subscribe(val -> System.out.println(val));
这仍然是相同的序列。 即使在新线程上
可观察的ob3 = Observable.range(1,5);
ob3.flatMap(new Func1>() { @Override public Observable call(Integer pArg0) { return Observable.just(pArg0); } }).subscribeOn(Schedulers.newThread()).map(new Func1() { @Override public Integer call(Integer pArg0) { try { Thread.sleep(1000 - (pArg0 * 100)); System.out.println(pArg0 + " ccc " + Thread.currentThread().getName()); } catch (Exception e) { e.printStackTrace(); } return pArg0; } }).subscribe();
输出1 ccc RxNewThreadScheduler-1 2 ccc RxNewThreadScheduler-1 3 ccc RxNewThreadScheduler-1 4 ccc RxNewThreadScheduler-1 5 ccc RxNewThreadScheduler-1