Paginate Observable results without recursion – RxJava

I have a fairly standard API pagination problem, the kind you can handle with some simple recursion. Here is a contrived example:

    public Observable<List> scan() {
        return scanPage(Optional.empty(), ImmutableList.of());
    }

    private Observable scanPage(Optional startKey, List results) {
        return this.scanner.scan(startKey, LIMIT)
                .flatMap(page -> {
                    if (!page.getLastKey().isPresent()) {
                        return Observable.just(results);
                    }
                    return scanPage(page.getLastKey(), ImmutableList.builder()
                            .addAll(results)
                            .addAll(page.getResults())
                            .build()
                    );
                });
    }

But this can obviously create a huge call stack. How can I do this imperatively while keeping the Observable stream?

Here is an imperative, blocking example:

    public List scan() {
        Optional startKey = Optional.empty();
        final ImmutableList.Builder results = ImmutableList.builder();

        do {
            final Page page = this.scanner.scan(startKey);
            startKey = page.getLastKey();
            results.addAll(page.getResults());
        } while (startKey.isPresent());

        return results.build();
    }

It is not the most elegant solution, but you can use a subject and side effects. See the toy example below.

    import rx.Observable;
    import rx.Subscriber;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.HashMap;
    import rx.subjects.*;

    public class Pagination {
        // Each page is a list whose first element is the key of the next page
        // (an empty string marks the last page); the rest are the page's items.
        static HashMap<String, ArrayList<String>> pages = new HashMap<String, ArrayList<String>>();

        public static void main(String[] args) throws InterruptedException {
            pages.put("default", new ArrayList<String>());
            pages.put("2", new ArrayList<String>());
            pages.put("3", new ArrayList<String>());
            pages.put("4", new ArrayList<String>());

            pages.get("default").add("2");
            pages.get("default").add("Maths");
            pages.get("default").add("Chemistry");

            pages.get("2").add("3");
            pages.get("2").add("Physics");
            pages.get("2").add("Biology");

            pages.get("3").add("4");
            pages.get("3").add("Art");

            pages.get("4").add("");
            pages.get("4").add("Geography");

            Observable<List<String>> ret = Observable.defer(() -> {
                System.out.println("Building Observable");
                // The subject carries page keys; emitting a key triggers the fetch of that page.
                ReplaySubject<String> pagecontrol = ReplaySubject.create(1);
                Observable<List<String>> ret2 = pagecontrol.asObservable().concatMap(aKey -> {
                    if (!aKey.equals("")) {
                        // Side effect: feed the next page key back into the subject.
                        return Observable.just(pages.get(aKey))
                                .doOnNext(page -> pagecontrol.onNext(page.get(0)));
                    } else {
                        // No more pages: complete the subject, which ends the stream.
                        return Observable.<List<String>>empty()
                                .doOnCompleted(() -> pagecontrol.onCompleted());
                    }
                });
                pagecontrol.onNext("default");
                return ret2;
            });

            // Use this if you want to ensure work isn't done again
            ret = ret.cache();
            ret.subscribe(l -> System.out.println("Sub 1 : " + l));
            ret.subscribe(l -> System.out.println("Sub 2 : " + l));
            Thread.sleep(2000L);
        }
    }

Edited with improvements.

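JohnWowUs's answer is great and helped me understand how to avoid the recursion effectively, but some points still confused me, so I am posting my tweaked version.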

Summary:

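  • Individual pages are returned as a Single.
  • A Flowable streams each of the items contained in the pages. This means callers of our function do not need to know about the individual pages and can simply collect the contained items.
  • A BehaviorProcessor starts with the first page and, once we have checked in the current page that a next page is available, fetches each subsequent page.
  • The key is that the call to processor.onNext(int) starts the next iteration.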

This code depends on rxjava and reactive-streams.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Optional;
    import java.util.function.Function;

    import io.reactivex.Flowable;
    import io.reactivex.Single;
    import io.reactivex.processors.BehaviorProcessor;

    public class Pagination {

        // Fetch all pages and return the items contained in those pages,
        // using the provided page fetcher function
        public static <T> Flowable<T> fetchItems(Function<Integer, Single<Page<T>>> fetchPage) {
            // Processor issues page indices
            BehaviorProcessor<Integer> processor = BehaviorProcessor.createDefault(0);

            // When an index number is issued, fetch the corresponding page
            return processor.concatMap(index -> fetchPage.apply(index).toFlowable())
                    // when returning the page, update the processor to get the next page (or stop)
                    .doOnNext(page -> {
                        if (page.hasNext()) {
                            processor.onNext(page.getNextPageIndex());
                        } else {
                            processor.onComplete();
                        }
                    })
                    .concatMapIterable(Page::getElements);
        }

        public static void main(String[] args) {
            fetchItems(Pagination::examplePageFetcher).subscribe(System.out::println);
        }

        // A function to fetch a page of our paged data
        private static Single<Page<String>> examplePageFetcher(int index) {
            return Single.just(pages.get(index));
        }

        // Create some paged data
        private static ArrayList<Page<String>> pages = new ArrayList<>(3);

        static {
            pages.add(new Page<>(Arrays.asList("one", "two"), Optional.of(1)));
            pages.add(new Page<>(Arrays.asList("three", "four"), Optional.of(2)));
            pages.add(new Page<>(Arrays.asList("five"), Optional.empty()));
        }

        static class Page<T> {
            private List<T> elements;
            private Optional<Integer> nextPageIndex;

            public Page(List<T> elements, Optional<Integer> nextPageIndex) {
                this.elements = elements;
                this.nextPageIndex = nextPageIndex;
            }

            public List<T> getElements() {
                return elements;
            }

            public int getNextPageIndex() {
                return nextPageIndex.get();
            }

            public boolean hasNext() {
                return nextPageIndex.isPresent();
            }
        }
    }

Output:

    one
    two
    three
    four
    five

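Another approach is to use a token stream: fetch the data for the initial token, push the next token into the stream once the new remote data arrives, then resubscribe until the token is empty.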

    public Observable<Window> paging() {
        Subject<Token, Token> tokenStream = BehaviorSubject.<Token>create().toSerialized();
        tokenStream.onNext(Token.startToken());
        Observable<Window> dataStream =
                Observable.defer(() -> tokenStream.first().flatMap(this::remoteData))
                        .doOnNext(window -> tokenStream.onNext(window.getToken()))
                        .repeatWhen(completed -> completed.flatMap(__ -> tokenStream).takeWhile(Token::hasMore));
        return dataStream;
    }

The result is:

    Window{next token=Token{key='1'}, data='data for token: Token{key=''}'}
    Window{next token=Token{key='2'}, data='data for token: Token{key='1'}'}
    Window{next token=Token{key='3'}, data='data for token: Token{key='2'}'}
    Window{next token=Token{key='4'}, data='data for token: Token{key='3'}'}
    Window{next token=Token{key='5'}, data='data for token: Token{key='4'}'}
    Window{next token=Token{key='6'}, data='data for token: Token{key='5'}'}
    Window{next token=Token{key='7'}, data='data for token: Token{key='6'}'}
    Window{next token=Token{key='8'}, data='data for token: Token{key='7'}'}
    Window{next token=Token{key='9'}, data='data for token: Token{key='8'}'}
    Window{next token=Token{key='10'}, data='data for token: Token{key='9'}'}

Copy-pasteable sample:

    import java.util.concurrent.TimeUnit;

    import org.junit.Test;

    import rx.Observable;
    import rx.subjects.BehaviorSubject;
    import rx.subjects.Subject;

    public class RxPaging {

        public Observable<Window> paging() {
            Subject<Token, Token> tokenStream = BehaviorSubject.<Token>create().toSerialized();
            tokenStream.onNext(Token.startToken());
            Observable<Window> dataStream =
                    Observable.defer(() -> tokenStream.first().flatMap(this::remoteData))
                            .doOnNext(window -> tokenStream.onNext(window.getToken()))
                            .repeatWhen(completed -> completed.flatMap(__ -> tokenStream).takeWhile(Token::hasMore));
            return dataStream;
        }

        private Observable<Window> remoteData(Token token) {
            /*limit number of pages*/
            int page = page(token);
            Token nextToken = page < 10 ? nextPageToken(token) : Token.endToken();
            return Observable
                    .just(new Window(nextToken, "data for token: " + token))
                    .delay(100, TimeUnit.MILLISECONDS);
        }

        private int page(Token token) {
            String key = token.getKey();
            return key.isEmpty() ? 0 : Integer.parseInt(key);
        }

        private Token nextPageToken(Token token) {
            String tokenKey = token.getKey();
            return tokenKey.isEmpty() ? new Token("1") : nextToken(tokenKey);
        }

        private Token nextToken(String tokenKey) {
            return new Token(String.valueOf(Integer.parseInt(tokenKey) + 1));
        }

        public static class Token {
            private final String key;

            private Token(String key) {
                this.key = key;
            }

            public static Token endToken() {
                return startToken();
            }

            public static Token startToken() {
                return new Token("");
            }

            public String getKey() {
                return key;
            }

            public boolean hasMore() {
                return !key.isEmpty();
            }

            @Override
            public String toString() {
                return "Token{" + "key='" + key + '\'' + '}';
            }
        }

        public static class Window {
            private final Token token;
            private final String data;

            public Window(Token token, String data) {
                this.token = token;
                this.data = data;
            }

            public Token getToken() {
                return token;
            }

            public String getData() {
                return data;
            }

            @Override
            public String toString() {
                return "Window{" + "next token=" + token + ", data='" + data + '\'' + '}';
            }
        }

        @Test
        public void testPaging() throws Exception {
            paging().toBlocking().subscribe(System.out::println);
        }
    }

Just an idea: why not implement your own Iterable that iterates over your pages, and then make an Observable from it?

Example:

    Observable.from(new Iterable<T>() {
        @Override
        public Iterator<T> iterator() {
            return new Iterator<T>() {
                @Override
                public boolean hasNext() {
                    return hasNextPage(currentPageKey);
                }

                @Override
                public T next() {
                    page = getNextPage(currentPageKey);
                    currentPageKey = page.getKey();
                    return page;
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException();
                }
            };
        }
    });

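A more elegant approach would be to have your page manager (I believe that is the scanner variable in your code sample) implement Iterable and write the iteration logic there.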
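To make that last suggestion concrete, here is a minimal sketch of a scanner that implements Iterable itself. Everything in it is an assumption for illustration: the class name PagedScanner, the Page type with getResults() and getLastKey(), the integer keys, and the hard-coded backend list standing in for the remote API. It uses only the JDK, so the iteration logic can be checked on its own before wrapping it in an Observable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;

// Hypothetical scanner that exposes its pages through Iterable.
public class PagedScanner implements Iterable<PagedScanner.Page> {

    // One page of results plus the key of the next page, if any (hypothetical type).
    public static final class Page {
        private final List<String> results;
        private final Optional<Integer> lastKey;

        Page(List<String> results, Optional<Integer> lastKey) {
            this.results = results;
            this.lastKey = lastKey;
        }

        public List<String> getResults() { return results; }
        public Optional<Integer> getLastKey() { return lastKey; }
    }

    // Stand-in for the remote API: three hard-coded pages.
    private final List<Page> backend = Arrays.asList(
            new Page(Arrays.asList("a", "b"), Optional.of(1)),
            new Page(Arrays.asList("c", "d"), Optional.of(2)),
            new Page(Arrays.asList("e"), Optional.empty()));

    // Fetches a single page; an empty start key means "first page".
    public Page scan(Optional<Integer> startKey) {
        return backend.get(startKey.orElse(0));
    }

    @Override
    public Iterator<Page> iterator() {
        return new Iterator<Page>() {
            private Optional<Integer> nextKey = Optional.empty();
            private boolean done = false;

            @Override
            public boolean hasNext() {
                return !done;
            }

            @Override
            public Page next() {
                if (done) {
                    throw new NoSuchElementException();
                }
                // Each call fetches exactly one page, so no call stack builds up.
                Page page = scan(nextKey);
                nextKey = page.getLastKey();
                done = !nextKey.isPresent();
                return page;
            }
        };
    }

    // Flattens all pages into one list by plain iteration.
    public static List<String> collectAll(PagedScanner scanner) {
        List<String> all = new ArrayList<>();
        for (Page page : scanner) {
            all.addAll(page.getResults());
        }
        return all;
    }

    public static void main(String[] args) {
        System.out.println(collectAll(new PagedScanner())); // prints [a, b, c, d, e]
    }
}
```

With a scanner shaped like this, Observable.from(scanner) in RxJava 1 (or Observable.fromIterable(scanner) in RxJava 2) should yield a stream of pages. Note that each next() call blocks while its page is fetched, so this fits best when the underlying fetch is synchronous or is moved off the caller's thread with subscribeOn.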