无需递归即可对可观察结果进行分页 - RxJava

2024-05-14

我有一个非常标准的 API 分页问题,​​您可以通过一些简单的递归来处理。这是一个捏造的例子:

public Observable<List<Result>> scan() {
    return scanPage(Optional.empty(), ImmutableList.of());
}

private Observable<?> scanPage(Optional<KEY> startKey, List<Result> results) {
    return this.scanner.scan(startKey, LIMIT)
            .flatMap(page -> {
                if (!page.getLastKey().isPresent()) {
                    return Observable.just(results);
                }
                return scanPage(page.getLastKey(), ImmutableList.<Result>builder()
                        .addAll(results)
                        .addAll(page.getResults())
                        .build()
                );
            });
}

但这显然会创建一个巨大的调用堆栈。我怎样才能强制执行此操作但又维护可观察流?

这是一个命令式阻塞示例:

public List<Result> scan() {
    Optional<String> startKey = Optional.empty();
    final ImmutableList.Builder<Result> results = ImmutableList.builder();

    do {
        final Page page = this.scanner.scan(startKey);
        startKey = page.getLastKey();
        results.addAll(page.getResults());
    } while (startKey.isPresent());

    return results.build();
}

JohnWowUs 的回答很好,帮助我理解了如何有效地避免递归,但有一些点我仍然感到困惑,所以我发布了我的调整版本。

Summary:

  • 各个页面作为Single.
  • Use a Flowable流式传输页面中包含的每个项目。这意味着我们函数的调用者不需要了解各个页面,只需收集所包含的项目即可。
  • Use a BehaviorProcessor从第一页开始,一旦我们检查了当前页面是否有下一页可用,就获取每个后续​​页面。
  • 关键是调用processor.onNext(int)开始下一次迭代。

该代码取决于rxjava https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava/2.1.9 and 反应流 https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams/1.0.2.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.processors.BehaviorProcessor;

public class Pagination {

    // Fetch all pages and return the items contained in those pages, using the provided page fetcher function
    public static <T> Flowable<T> fetchItems(Function<Integer, Single<Page<T>>> fetchPage) {
        // Processor issues page indices
        BehaviorProcessor<Integer> processor = BehaviorProcessor.createDefault(0);
        // When an index number is issued, fetch the corresponding page
        return processor.concatMap(index -> fetchPage.apply(index).toFlowable())
                        // when returning the page, update the processor to get the next page (or stop)
                        .doOnNext(page -> {
                            if (page.hasNext()) {
                                processor.onNext(page.getNextPageIndex());
                            } else {
                                processor.onComplete();
                            }
                        })
                        .concatMapIterable(Page::getElements);
    }

    public static void main(String[] args) {
        fetchItems(Pagination::examplePageFetcher).subscribe(System.out::println);
    }

    // A function to fetch a page of our paged data
    private static Single<Page<String>> examplePageFetcher(int index) {
        return Single.just(pages.get(index));
    }

    // Create some paged data
    private static ArrayList<Page<String>> pages = new ArrayList<>(3);

    static {
        pages.add(new Page<>(Arrays.asList("one", "two"), Optional.of(1)));
        pages.add(new Page<>(Arrays.asList("three", "four"), Optional.of(2)));
        pages.add(new Page<>(Arrays.asList("five"), Optional.empty()));
    }

    static class Page<T> {
        private List<T> elements;
        private Optional<Integer> nextPageIndex;

        public Page(List<T> elements, Optional<Integer> nextPageIndex) {
            this.elements = elements;
            this.nextPageIndex = nextPageIndex;
        }

        public List<T> getElements() {
            return elements;
        }

        public int getNextPageIndex() {
            return nextPageIndex.get();
        }

        public boolean hasNext() {
            return nextPageIndex.isPresent();
        }
    }
}

Output:

one
two
three
four
five
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

无需递归即可对可观察结果进行分页 - RxJava 的相关文章

随机推荐

  • 用于传输命名参数和正文的云端点资源属性不起作用

    我正在尝试通过gapi client rpc调用实现对谷歌云端点的调用 如文档中所述 和Google Cloud Endpoints 使用 JS 客户端进行调用 传递参数和 JSON 正文 https stackoverflow com q
  • 提交表单时获取查询字符串值...MVC3

    在我的 MVC3 应用程序中 如果我在 url 中输入查询字符串值并按 Enter 键 我可以获得输入的值 localhost 34556 db test 我将触发的默认操作 public ActionResult Index string
  • 如果一个多维数组中的子数组与另一个多维数组不同,则覆盖该子数组

    我坚持这个问题 真的不知道如何解决 我有两个多维数组 需要将第二个数组中的每个 entry id 与第一个数组进行匹配 然后需要检查第二个数组中的每个 file no 是否在数据库 第一个数组 中 并且 status 是否与第一个数组匹配
  • Mysql获取特定表的最后一个id

    我必须从特定的插入表中获取最后的插入 ID 可以说我有这个代码 INSERT INTO blahblah test1 test 2 VALUES test1 test2 INSERT INTO blahblah2 test1 test 2
  • GoogleDrive + Alamofire:上传具有属性的文件

    我正在尝试通过 Swift 2 Alamofire 将文件 参数上传到 Google Drive 在下面的代码中 我更改了以下行 https www googleapis com upload drive v3 files uploadTy
  • Mongo按动态字段排序

    所以我传入了一个动态变量 它是我想要排序的字段的名称 假设下面的 sortVariable 可能等于 price createdAt name 等 这不起作用 我该怎么做 function findStuff sortVariable va
  • SQL Server 不使用索引将日期时间与非空进行比较

    我有一个与其他任何表都不相关的简单表 它有一个非 PK 列 它是一个日期 我已经为该列创建了一个非聚集索引 如果我提出这个查询 select from table where datecolumn is not null 但如果我删除 no
  • Android 认为我没有关闭数据库!为什么?

    我有一个 SQLiteDatabase 数据成员 我在 onCreate 中初始化它 并在 onPause onStop 和 onDestroy 中调用 close 它在 onResume 中重新初始化 它似乎运行得很好 但当我查看调试器时
  • 在 Scala 中将元素追加到列表末尾

    我无法添加 type 元素T到一个列表中List T 我尝试过myList myElement但它似乎创建了一个奇怪的对象并访问myList last始终返回放入列表中的第一个元素 我怎么解决这个问题 List 1 2 3 4 Result
  • 无需构建树即可预测霍夫曼压缩比

    我有一个二进制文件 我知道其中每个符号出现的次数 如果我要使用霍夫曼算法压缩它 我需要预测压缩文件的长度 我只对假设的输出长度感兴趣 而不对单个符号的代码感兴趣 因此构建霍夫曼树似乎是多余的 作为一个例子 我需要得到类似的东西 包含 4 个
  • 在 pygame 中,我如何创建一个数据结构来跟踪调整大小事件和对象的坐标?

    我希望在调整屏幕大小后使鼠标事件与对象保持同步 有人告诉我需要创建一个数据结构来跟踪 调整事件大小 新坐标以匹配调整大小 如何使用简单的代数方程来完成此操作并将其集成到调整大小事件中以进行准确更新 反过来做 创建一个虚拟游戏地图 在绘制场景
  • 子类 B 继承自模板类 A [重复]

    这个问题在这里已经有答案了 我最近偶然发现了如下代码 但我无法理解它 template
  • Winform 上的 C# ComboBox 是否有 BeforeUpdate

    我来自 VBA 世界 记得有一个BeforeUpdate我可以在组合框上进行调用 现在我使用 C 并且喜欢它 我想知道是否有一个BeforeUpdate呼吁ComboBox在 Winform 上 我可以制作一个不可见的文本框并在那里存储我需
  • 生成一个新终端并写入其标准输出

    我有一个应用程序 它使用 GUI 来完成与用户的大部分界面 不过 我想要一个单独的终端窗口 我可以写入一些错误检查 原始值等 我知道我可以使用以下命令生成一个新终端system 命令 但我不知道是否可以进行交互 在最好的情况下 我希望有一个
  • 定时任务应该放在哪一层?

    我正在尝试使用分层架构来实现 DDD 应用程序 我有 基础设施层 实现应用程序的技术特定部分的层 领域层 包含领域模型的层 应用层 包含与领域模型交互的干扰的层 接口层 从外部接收事件的层 经典的 3 层 基础设施 架构非常清晰 但我的应用
  • 对 URL 进行编码 C#

    所以我有一个看起来像这样的 URL http www test com folder1 id 3 但基本上 当他们单击按钮时 我想在 URL 栏中显示与该 id 关联的值的名称 例如 id 3 是名为 Rollex 的手表 所以我想要读取
  • scala中的反引号有什么用[重复]

    这个问题在这里已经有答案了 我在一本书上找到了以下代码 val list List 5 4 3 2 1 val result 0 list running total next element running total next elem
  • CouchDB 中的分页?

    我将如何实现分页所需的查询 基本上 当请求第 1 页时 获取前 5 个条目 对于第 2 页 获取接下来的 5 页 依此类推 我计划通过 couchdb python 模块使用它 但这不会对实现产生任何影响 The CouchDB 指南 ht
  • 如何解构 React props 并仍然访问其他 props?

    我很好奇如果我想要所有的 props 但也想要解构单个属性 那么组件的参数 props 是否可以像导入一样解构 我想这更像是一个 JavaScript 问题 而不是一个 React 问题 但是举个例子 import React useEff
  • 无需递归即可对可观察结果进行分页 - RxJava

    我有一个非常标准的 API 分页问题 您可以通过一些简单的递归来处理 这是一个捏造的例子 public Observable