我有一个非常标准的 API 分页问题,您可以通过一些简单的递归来处理。这是一个捏造的例子:
public Observable<List<Result>> scan() {
return scanPage(Optional.empty(), ImmutableList.of());
}
private Observable<?> scanPage(Optional<KEY> startKey, List<Result> results) {
return this.scanner.scan(startKey, LIMIT)
.flatMap(page -> {
if (!page.getLastKey().isPresent()) {
return Observable.just(results);
}
return scanPage(page.getLastKey(), ImmutableList.<Result>builder()
.addAll(results)
.addAll(page.getResults())
.build()
);
});
}
但这显然会创建一个巨大的调用堆栈。我怎样才能强制执行此操作但又维护可观察流?
这是一个命令式阻塞示例:
public List<Result> scan() {
Optional<String> startKey = Optional.empty();
final ImmutableList.Builder<Result> results = ImmutableList.builder();
do {
final Page page = this.scanner.scan(startKey);
startKey = page.getLastKey();
results.addAll(page.getResults());
} while (startKey.isPresent());
return results.build();
}
JohnWowUs 的回答很好,帮助我理解了如何有效地避免递归,但有一些点我仍然感到困惑,所以我发布了我的调整版本。
Summary:
- 各个页面作为
Single
.
- Use a
Flowable
流式传输页面中包含的每个项目。这意味着我们函数的调用者不需要了解各个页面,只需收集所包含的项目即可。
- Use a
BehaviorProcessor
从第一页开始,一旦我们检查了当前页面是否有下一页可用,就获取每个后续页面。
- 关键是调用
processor.onNext(int)
开始下一次迭代。
该代码取决于rxjava https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava/2.1.9 and 反应流 https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams/1.0.2.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.processors.BehaviorProcessor;
public class Pagination {
// Fetch all pages and return the items contained in those pages, using the provided page fetcher function
public static <T> Flowable<T> fetchItems(Function<Integer, Single<Page<T>>> fetchPage) {
// Processor issues page indices
BehaviorProcessor<Integer> processor = BehaviorProcessor.createDefault(0);
// When an index number is issued, fetch the corresponding page
return processor.concatMap(index -> fetchPage.apply(index).toFlowable())
// when returning the page, update the processor to get the next page (or stop)
.doOnNext(page -> {
if (page.hasNext()) {
processor.onNext(page.getNextPageIndex());
} else {
processor.onComplete();
}
})
.concatMapIterable(Page::getElements);
}
public static void main(String[] args) {
fetchItems(Pagination::examplePageFetcher).subscribe(System.out::println);
}
// A function to fetch a page of our paged data
private static Single<Page<String>> examplePageFetcher(int index) {
return Single.just(pages.get(index));
}
// Create some paged data
private static ArrayList<Page<String>> pages = new ArrayList<>(3);
static {
pages.add(new Page<>(Arrays.asList("one", "two"), Optional.of(1)));
pages.add(new Page<>(Arrays.asList("three", "four"), Optional.of(2)));
pages.add(new Page<>(Arrays.asList("five"), Optional.empty()));
}
static class Page<T> {
private List<T> elements;
private Optional<Integer> nextPageIndex;
public Page(List<T> elements, Optional<Integer> nextPageIndex) {
this.elements = elements;
this.nextPageIndex = nextPageIndex;
}
public List<T> getElements() {
return elements;
}
public int getNextPageIndex() {
return nextPageIndex.get();
}
public boolean hasNext() {
return nextPageIndex.isPresent();
}
}
}
Output:
one
two
three
four
five
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)