将Observable收集到List中似乎不会立即发出集合

我正在使用RxJava来实质上收集单独发出的Observable的列表,并将它们组合成一个Observable列表(基本上与flatMap相反)。 这是我的代码:

// myEvent.findMemberships() returns an Observable<List> myEvent.findMemberships() .flatMap(new Func1<List, Observable>() { @Override public Observable call(List memberships) { List users = new ArrayList(); for (Membership membership : memberships) { users.add(membership.getUser()); } return Observable.from(users); } }) .toList() .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<List>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Timber.e(e, "Error when trying to get memberships"); } @Override public void onNext(List users) { Timber.d("%d users emitted", users.size()); } }) 

我注意到我的onNext从未被调用过。 我似乎无法理解这一点。 如果我删除.toList调用并基本输出各个用户(如下所示),它通过发出每个项目来工作。

 subscriptions // .add(currentEvent.findMemberships() .flatMap(new Func1<List, Observable>() { @Override public Observable call(List memberships) { List users = new ArrayList(); for (Membership membership : memberships) { users.add(membership.getUser()); } return Observable.from(users); } }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Timber.e(e, "Error when trying to get memberships"); } @Override public void onNext(User user) { Timber.d("%d users emitted", user.getName()); } })); 

Q1。 我对.toList的理解不正确吗?

Q2。 如何将单独发出的Observable流组合成一个Observable<List>

**编辑

@kjones完全解决了这个问题。 我没有通过我的findMemberships电话呼叫onComplete。 我在下面添加了代码段。 我真正的用例有一些更多的转换,这就是为什么我需要使用.toList调用。 正如@zsxwing也正确地指出的那样,对于这个用例,一个简单的地图就足够了。

 public Observable<List> findMemberships() { return Observable.create(new Observable.OnSubscribe<List>() { @Override public void call(Subscriber<? super List> subscriber) { try { // ..... List memberships = queryMyDb(); subscriber.onNext(memberships); // BELOW STATEMENT FIXES THE PROBLEM >< // subscriber.onCompleted(); } catch (SQLException e) { // ... } } }); 

}

看起来myEvent.findMemberships()调用返回的Observable永远不会调用onComplete。 你能展示这段代码吗?

如果是这种情况,它会解释您所看到的行为。 在发出所有项目之前, .toList()不会发出列表(由onComplete发出信号)。

没有.toList()第二个版本.toList()如下方式进行:

 .findMemberships() emits a single List .flatMap() transforms List into a single List Observable.from(users) creates an observable that emits each user .subscribe() onNext() is called for each user onCompleted() is never called. 

你的原始版本:

 .findMemberships() emits a single List .flatMap() transforms List into a single List Observable.from(users) creates an observable that emits each user .toList() buffers each User waiting for onCompleted() to be called onCompleted is never called because the .findMemberships Observable never completes 

有几种解决方案:

1)使findMemberShips()可观察调用onComplete。如果findMemberShips()返回的Observable是Rx主题(PublishSubject,BehaviorSubject等),则可能不需要这样做。

2)使用Observable.just()而不是Observable.from() 。 你已经在.flatMap()中有一个List ,只需返回它。 使用Observable.from(users)创建一个发出每个用户的Observable。 Observable.just(users)将创建一个Observable,它发出一个List 。 不需要.toList()

3)使用.map()代替.flatMap() 。 同样,不需要.toList() 。 由于每个List都被转换为List您只需要使用.map()

 myEvent .findMemberships() .map(new Func1, List>() { @Override public List call(List memberships) { List users = new ArrayList(); for (Membership membership : memberships) { users.add(membership.getUser()); } return users; } }) 
Interesting Posts