How do I make this RxJava zip run in parallel?

I have a sleep method that simulates a long-running process.

    private void sleep() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

Then I have a method that returns an Observable containing a list of the two strings passed in as parameters. It calls sleep before returning the strings.

    private Observable<List<String>> getStrings(final String str1, final String str2) {
        return Observable.fromCallable(new Callable<List<String>>() {
            @Override
            public List<String> call() {
                // Simulate the long-running work before producing the result.
                sleep();
                List<String> strings = new ArrayList<String>();
                strings.add(str1);
                strings.add(str2);
                return strings;
            }
        });
    }

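Then I call getStrings three times inside Observable.zip. I expect the three calls to run in parallel, so the total execution time should be around 2 to 3 seconds, since each sleep is only 2 seconds. However, it takes considerably longer than that in total. How can I make the calls run in parallel so that everything finishes within 2 seconds?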

    Observable
        .zip(getStrings("One", "Two"),
             getStrings("Three", "Four"),
             getStrings("Five", "Six"),
             mergeStringLists())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<List<String>>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(List<String> strings) {
                // Display the strings
            }
        });

The mergeStringLists method:

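    private Func3<List<String>, List<String>, List<String>, List<String>> mergeStringLists() {
        return new Func3<List<String>, List<String>, List<String>, List<String>>() {
            @Override
            public List<String> call(List<String> strings, List<String> strings2, List<String> strings3) {
                Log.d(TAG, "...");
                // Append the second and third lists to the first one and return it.
                for (String s : strings2) {
                    strings.add(s);
                }
                for (String s : strings3) {
                    strings.add(s);
                }
                return strings;
            }
        };
    }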

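This happens because the subscription to your zipped Observable takes place on a single io thread. zip subscribes to its three sources from that one thread, and fromCallable does its work at subscription time, so the three sleeps run one after another.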

Why not try this instead:

    Observable
        .zip(getStrings("One", "Two")
                 .subscribeOn(Schedulers.newThread()),
             getStrings("Three", "Four")
                 .subscribeOn(Schedulers.newThread()),
             getStrings("Five", "Six")
                 .subscribeOn(Schedulers.newThread()),
             mergeStringLists())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<List<String>>() {
            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(List<String> strings) {
                // Display the strings
            }
        });

Let me know if that helps.
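To see the effect without RxJava or Android on the classpath, here is a minimal plain-Java analogy. It is not RxJava code: `CompletableFuture` and an `ExecutorService` stand in for the Observables and schedulers, and all names (`ZipTimingSketch`, `slowStrings`, `zipWithThreads`, `timeMillis`, `WORK_MILLIS`) are made up for illustration. The delay is shortened to 300 ms as an assumption to keep the demo quick. One worker thread plays the role of a single subscribeOn on the whole zip; three worker threads play the role of one subscribeOn per source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ZipTimingSketch {

    // Hypothetical stand-in for the 2000 ms sleep in the question.
    static final long WORK_MILLIS = 300;

    // Stand-in for getStrings(): blocks, then returns both strings in a list.
    static List<String> slowStrings(String str1, String str2) {
        try {
            Thread.sleep(WORK_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> strings = new ArrayList<>();
        strings.add(str1);
        strings.add(str2);
        return strings;
    }

    // Same idea as mergeStringLists(), but without mutating its inputs.
    static List<String> merge(List<String> a, List<String> b, List<String> c) {
        List<String> all = new ArrayList<>(a);
        all.addAll(b);
        all.addAll(c);
        return all;
    }

    // Runs the three slow calls on a pool with the given number of threads
    // and merges their results once all three are available.
    static List<String> zipWithThreads(int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            CompletableFuture<List<String>> f1 =
                    CompletableFuture.supplyAsync(() -> slowStrings("One", "Two"), executor);
            CompletableFuture<List<String>> f2 =
                    CompletableFuture.supplyAsync(() -> slowStrings("Three", "Four"), executor);
            CompletableFuture<List<String>> f3 =
                    CompletableFuture.supplyAsync(() -> slowStrings("Five", "Six"), executor);
            return merge(f1.join(), f2.join(), f3.join());
        } finally {
            executor.shutdown();
        }
    }

    // Wall-clock time of one zipWithThreads run, in milliseconds.
    static long timeMillis(int threads) {
        long start = System.nanoTime();
        zipWithThreads(threads);
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("one thread:    ~" + timeMillis(1) + " ms");
        System.out.println("three threads: ~" + timeMillis(3) + " ms");
    }
}
```

With one thread the three calls queue up behind each other, so the total should come out near three times `WORK_MILLIS`. With three threads it should stay near a single `WORK_MILLIS`, which is the behaviour the per-source subscribeOn is meant to give you.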

Here is an example where I use zip asynchronously, in case you are curious:

    // The field declarations were not part of the original snippet; rx.Scheduler is assumed.
    private Scheduler scheduler;
    private Scheduler scheduler1;
    private Scheduler scheduler2;

    /**
     * Since every Observable passed to zip is moved onto a different thread
     * (here via observeOn), all of them run in parallel.
     * By default Rx is not asynchronous; it only becomes so when you explicitly
     * use a scheduler, e.g. through subscribeOn or observeOn.
     */
    @Test
    public void testAsyncZip() {
        scheduler = Schedulers.newThread();
        scheduler1 = Schedulers.newThread();
        scheduler2 = Schedulers.newThread();
        long start = System.currentTimeMillis();
        // showResult is a helper that is not shown in this snippet.
        Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(),
                       (s, s2, s3) -> s.concat(s2).concat(s3))
                  .subscribe(result -> showResult("Async in:", start, result));
    }

    public Observable<String> obAsyncString() {
        return Observable.just("")
                         .observeOn(scheduler)
                         .doOnNext(val -> System.out.println("Thread " + Thread.currentThread().getName()))
                         .map(val -> "Hello");
    }

    public Observable<String> obAsyncString1() {
        return Observable.just("")
                         .observeOn(scheduler1)
                         .doOnNext(val -> System.out.println("Thread " + Thread.currentThread().getName()))
                         .map(val -> " World");
    }

    public Observable<String> obAsyncString2() {
        return Observable.just("")
                         .observeOn(scheduler2)
                         .doOnNext(val -> System.out.println("Thread " + Thread.currentThread().getName()))
                         .map(val -> "!");
    }
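The combining step in the example above can also be sketched in plain Java, for readers who want to run something without the RxJava dependency. This is an analogy under assumptions, not RxJava's implementation: each `CompletableFuture` stands in for a single-item Observable, `thenCombine` stands in for the zip function, and the names `AsyncCombineSketch`, `asyncString` and `combine` are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncCombineSketch {

    // Produces one string on a pool thread and logs which thread ran it.
    static CompletableFuture<String> asyncString(String value, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("Thread " + Thread.currentThread().getName());
            return value;
        }, executor);
    }

    // Starts three async producers and concatenates their results in a fixed
    // order, regardless of which producer finishes first.
    static String combine() {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            CompletableFuture<String> hello = asyncString("Hello", executor);
            CompletableFuture<String> world = asyncString(" World", executor);
            CompletableFuture<String> bang = asyncString("!", executor);
            return hello
                    .thenCombine(world, String::concat)
                    .thenCombine(bang, String::concat)
                    .join(); // block the caller until all three have completed
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(combine()); // prints "Hello World!"
    }
}
```

Because the work happens on other threads, a caller that needs the combined value has to wait for it; here `join()` does that.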

You can see more examples here: https://github.com/politrons/reactive