RxJava和观察者代码的并行执行

我使用RxJava Observable api获得以下代码:

Observable observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath()); observable .buffer(10000) .observeOn(Schedulers.computation()) .subscribe(recordInfo -> { _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId()); for(Info info : recordInfo) { // some I/O operation logic } }, exception -> { }, () -> { }); 

我的期望是,在指定了计算调度程序之后,观察代码即subscribe()方法中的代码将并行执行。 相反,代码仍然在单个线程上顺序执行。 如何使用RxJava api使代码并行运行。

当涉及到异步/multithreading方面时,RxJava经常被误解。 multithreading操作的编码很简单,但理解抽象是另一回事。

关于RxJava的一个常见问题是如何从Observable实现并行化或同时发出多个项目。 当然,这个定义违反了Observable Contract,它声明onNext()必须按顺序调用,而且一次不能同时调用多个线程。

要实现并行性,您需要多个Observable。

这在一个线程中运行:

 Observable vals = Observable.range(1,10); vals.subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) .subscribe(val -> System.out.println("Subscriber received " + val + " on " + Thread.currentThread().getName())); 

这在多个线程中运行:

 Observable vals = Observable.range(1,10); vals.flatMap(val -> Observable.just(val) .subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) ).subscribe(val -> System.out.println(val)); 

代码和文字来自这篇博文。

您必须为此目的指定subscribeOn(Schedulers.computation())而不是observeOn(Schedulers.computation()) 。 在subscribeOn您声明要在哪个线程中发出您的值。 在observeOn您声明要处理的线程并观察它们。

使用flatMap并指定在Schedulers.computation()上进行订阅将实现并发。

这是一个使用Callable的更实用的例子,从输出中我们可以看到完成所有任务需要大约2000毫秒。

 static class MyCallable implements Callable { private static final Object CALLABLE_COUNT_LOCK = new Object(); private static int callableCount; @Override public Integer call() throws Exception { Thread.sleep(2000); synchronized (CALLABLE_COUNT_LOCK) { return callableCount++; } } public static int getCallableCount() { synchronized (CALLABLE_COUNT_LOCK) { return callableCount; } } } private static void runMyCallableConcurrentlyWithRxJava() { long startTimeMillis = System.currentTimeMillis(); final Semaphore semaphore = new Semaphore(1); try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } Observable.just(new MyCallable(), new MyCallable(), new MyCallable(), new MyCallable()) .flatMap(new Function>() { @Override public ObservableSource apply(@NonNull MyCallable myCallable) throws Exception { return Observable.fromCallable(myCallable).subscribeOn(Schedulers.computation()); } }) .subscribeOn(Schedulers.computation()) .subscribe(new Observer() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Object o) { System.out.println("onNext " + o); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { if (MyCallable.getCallableCount() >= 4) { semaphore.release(); } } }); try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } System.out.println("durationMillis " + (System.currentTimeMillis()-startTimeMillis)); } 

RxJava 2.0.5引入了并行流和ParallelFlowable ,这使得并行执行更简单,更具说明性。

您不再需要在flatMap创建Observable / flatMap ,您只需在flatMap调用parallel()并返回ParallelFlowable

它并不像普通的Flowable那样function丰富,因为并发会引发Rx契约的许多问题,但是你有基本的map()filter()等等,在大多数情况下应该足够了。

所以代替来自@LordRaydenMK的这个流程回答

 Observable vals = Observable.range(1,10); vals.flatMap(val -> Observable.just(val) .subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) ).subscribe(val -> System.out.println(val)); 

现在你可以这样做:

 Flowable vals = Flowable.range(1, 10); vals.parallel() .runOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) .sequential() .subscribe(val -> System.out.println(val)); 

这仍然是相同的序列。 即使在新线程上

可观察的ob3 = Observable.range(1,5);

  ob3.flatMap(new Func1>() { @Override public Observable call(Integer pArg0) { return Observable.just(pArg0); } }).subscribeOn(Schedulers.newThread()).map(new Func1() { @Override public Integer call(Integer pArg0) { try { Thread.sleep(1000 - (pArg0 * 100)); System.out.println(pArg0 + " ccc " + Thread.currentThread().getName()); } catch (Exception e) { e.printStackTrace(); } return pArg0; } }).subscribe(); 

输出1 ccc RxNewThreadScheduler-1 2 ccc RxNewThreadScheduler-1 3 ccc RxNewThreadScheduler-1 4 ccc RxNewThreadScheduler-1 5 ccc RxNewThreadScheduler-1