Project Reactor:如何控制通量排放

2024-02-22

我有一个能发出一些光的通量Date. This Date映射到我在某些设备上运行的 1024 个模拟 HTTP 请求Executer.

我想做的是等待所有 1024 个 HTTP 请求,然后再发出下一个请求Date.

目前运行时,onNext()被调用多次,然后稳定在某个稳定的速率上。

我怎样才能改变这种行为?

附:如果需要的话,我愿意转向架构。

private void run() throws Exception {

    Executor executor = Executors.newFixedThreadPool(2);

    Flux<Date> source = Flux.generate(emitter ->
        emitter.next(new Date())
    );

    source
            .log()
            .limitRate(1)
            .doOnNext(date -> System.out.println("on next: " + date))
            .map(date -> Flux.range(0, 1024))
            .flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
                    .subscribeOn(Schedulers.fromExecutor(executor)))
            .subscribe(s -> System.out.println(s));

    Thread.currentThread().join();
}

HTTP请求模拟:

private static String simulateHttp() {
    try {
        System.out.println("start http call");
        Thread.sleep(3_000);
    } catch (Exception e) {}

    return "HTML content";
}

编辑:改编自答案的代码:

  • 首先,我的代码中有一个错误(另一个flatMap需要)
  • 其次,我补充说concurrency的参数1二者皆是flatMap(貌似两者都需要)

    Executor executor = Executors.newSingleThreadExecutor();
    
    Flux<Date> source = Flux.generate(emitter -> {
        System.out.println("emitter called!");
        emitter.next(new Date());
    });
    
    source
            .limitRate(1)
            .map(date -> Flux.range(0, 16))
            .flatMap(Function.identity(), 1) # concurrency = 1
            .flatMap(i -> Mono.fromCallable(Pipeline::simulateHttp)
                    .subscribeOn(Schedulers.fromExecutor(executor)), 1) # concurrency = 1
            .subscribe(s -> System.out.println(s));
    
    Thread.currentThread().join();
    

您应该看看这些方法:

  • Flux.flatMap(Function, int, int) https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#flatMap-java.util.function.Function-int-int-
  • Flux.concatMap(Function, int) https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#concatMap-java.util.function.Function-int-.

concatMap确保在算子内按顺序处理通量上的元素:

内部消息的生成和订阅:该操作员正在等待一个 在生成下一个之前完成内部并订阅 它。

flatMap让你通过暴露来做同样的事情concurrency and prefetch参数可以让您更好地控制此行为:

并发参数允许控制可以有多少个发布者 并行订阅和合并。反过来,该论点表明 向上游发出的第一个 Subscription.request(long) 的大小。这 prefetch 参数允许给定任意预取大小 合并的发布者(换句话说,预取大小意味着 第一个 Subscription.request(long) 到合并的发布者)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Project Reactor:如何控制通量排放 的相关文章

  • Java中字符串中特殊字符的替换

    Java中如何替换字符串 E g String a adf sdf 如何替换和避免特殊字符 您可以删除除此之外的所有字符可打印的 ASCII 范围 http en wikipedia org wiki ASCII ASCII printab
  • Java - 为什么不允许 Enum 作为注释成员?

    It says 原始 String Class an Enum 另一个注释 上述任何一个的数组 只有这些类型才是合法的 Annotation 成员 为什么泛型 Enum 不能成为 Annotation 的成员 例如 Retention Re
  • Hibernate注解放置问题

    我有一个我认为很简单的问题 我见过两种方式的例子 问题是 为什么我不能将注释放在字段上 让我举一个例子 Entity Table name widget public class Widget private Integer id Id G
  • 插入最大日期(独立于数据库)

    在我的本地设置中 我使用一个简单的 H2 数据库 托管 解决方案将有另一个 类似但不相同 数据库 我需要将最大可能日期插入到日期时间列中 我尝试使用 Instant MAX 但是 这会导致列中出现 169104626 12 11 20 08
  • org.apache.sling.api.resource,version=[2.3,3) -- 无法解析

    您好 我无法访问我的项目内容 我已经上传了从 CQ 访问内容所需的所有包 我唯一能看到的是 org apache sling api resource version 2 3 3 无法解析 这是否是异常的原因 如果是 请告诉我如何解决 中Q
  • 比较两个文本文件的最快方法是什么,不将移动的行视为不同

    我有两个文件非常大 每个文件有 50000 行 我需要比较这两个文件并识别更改 然而 问题是如果一条线出现在不同的位置 它不应该显示为不同的 例如 考虑这个文件A txt xxxxx yyyyy zzzzz 文件B txt zzzzz xx
  • 使用 AES SecretKey 的 Java KeyStore setEntry()

    我目前正在 Java 中开发一个密钥处理类 特别是使用 KeyStore 我正在尝试使用 AES 实例生成 SecretKey 然后使用 setEntry 方法将其放入 KeyStore 中 我已经包含了代码的相关部分 The KS Obj
  • Java 文件上传速度非常慢

    我构建了一个小型服务 它从 Android 设备接收图像并将其保存到 Amazon S3 存储桶中 代码非常简单 但是速度非常慢 事情是这样的 public synchronized static Response postCommentP
  • 在 S3 中迭代对象时出现“ConnectionPoolTimeoutException”

    我已经使用 aws java API 一段时间了 没有遇到太多问题 目前我使用的是库 1 5 2 版本 当我使用以下代码迭代文件夹内的对象时 AmazonS3 s3 new AmazonS3Client new PropertiesCred
  • 在游戏视图下添加 admob

    我一直试图将 admob 放在我的游戏视图下 这是我的代码 public class HoodStarGame extends AndroidApplication Override public void onCreate Bundle
  • 如何在 Java 中测试一个类是否正确实现了 Serialized(不仅仅是 Serialized 的实例)

    我正在实现一个可序列化的类 因此它是一个与 RMI 一起使用的值对象 但我需要测试一下 有没有办法轻松做到这一点 澄清 我正在实现该类 因此在类定义中添加 Serialized 很简单 我需要手动序列化 反序列化它以查看它是否有效 我找到了
  • 编辑文件名在 JComboBox 中的显示方式,同时保持对文件的访问

    我对 Java 很陌生 对堆栈溢出也很陌生 我正在尝试利用 JMF API 创建一个用 Java 编码的简单媒体播放器 到目前为止 我已经能够设置一个简单的队列 播放列表来使用JComboBox called playListHolder
  • 如何在selenium服务器上提供自定义功能?

    我知道可以通过某种方法获得一些硒功能 其中之一如下 driver getCapabilities getBrowserName 它返回浏览器名称的值 但如果它指的是一个可用的方法 如果我没有误解的话 这似乎与自定义功能有关 就像我的意思是
  • Javafx过滤表视图

    我正在尝试使用文本字段来过滤表视图 我想要一个文本字段 txtSearch 来搜索 nhs 号码 名字 姓氏 和 分类类别 我尝试过在线实施各种解决方案 但没有运气 我对这一切仍然很陌生 所以如果问得不好 我深表歉意 任何帮助将不胜感激 我
  • IntelliJ - 调试模式 - 在程序内存中搜索文本

    我正在与无证的第三方库合作 我知道有一定的String存储在库深处的某个字段中的某处 我可以预测的动态值 但我想从库的 API 中获取它 有没有一种方法可以通过以下方式进行搜索 类似于全文搜索 full程序内存处于调试模式并在某个断点处停止
  • Jersey 客户端请求中未设置 Content-Length-Header

    我正在使用 Jersey Client 访问网络服务 如下所示 response r accept MediaType TEXT PLAIN TYPE header content length 0 post String class 其中
  • java.lang.NumberFormatException: Invalid int: "3546504756",这个错误是什么意思?

    我正在创建一个 Android 应用程序 并且正在从文本文件中读取一些坐标 我在用着Integer parseInt xCoordinateStringFromFile 将 X 坐标转换为整数 Y 坐标的转换方法相同 当我运行该应用程序时
  • HQL Hibernate 内连接

    我怎样才能在 Hibernate 中编写这个 SQL 查询 我想使用 Hibernate 来创建查询 而不是创建数据库 SELECT FROM Employee e INNER JOIN Team t ON e Id team t Id t
  • 将 Azure AD 高级自定义角色与 Spring Security 结合使用以进行基于角色的访问

    我创建了一个演示 Spring Boot 应用程序 我想在其中使用 AD 身份验证和授权 并使用 AD 和 Spring Security 查看 Azure 文档 我执行了以下操作 package com myapp contactdb c
  • 调整添加的绘制组件的大小和奇怪的摆动行为

    这个问题困扰了我好几天 我正在制作一个特殊的绘画程序 我制作了一个 JPanel 并添加了使用 Paint 方法绘制的自定义 jComponent 问题是 每当我调整窗口大小时 所有添加的组件都会 消失 或者只是不绘制 因此我最终会得到一个

随机推荐

  • Runtime.getRuntime().exec(String[]) 安全性

    我正在使用 Runtime getRuntime exec String 来运行进程 其中 String 数组的某些元素是由用户定义的 这安全吗 或者它允许将代码注入终端吗 如果不安全 我该怎么做才能避免代码注入 它必须是平台独立的 正如我
  • 如何在迭代时向列表添加值[重复]

    这个问题在这里已经有答案了 我有一个这样的场景 List
  • Android-R.java 文件未找到

    在处理 android 项目时 我被 R java 文件困住了 即使我清理项目 项目 gt clean 也找不到该文件 但该文件仍然找不到 即使我创建一个相同的新项目出现问题 我需要做什么 我期待有价值的答复 以便我可以克服这个问题 R j
  • 不知道如何导出 Objective-C 类。 i386 体系结构的未定义符号

    我正在尝试在 OSX 上的 GTK 上做一些工作 但遇到了一些麻烦 因为说实话 我对 Objective C 不太熟悉 我有足够的编程经验 可以很快掌握基本语法 并且可以在文档中查找我需要的内容 但我遇到的问题与链接库并将类暴露给我链接的程
  • 使用 Linkify.addLinks 与 Html.fromHtml 结合使用

    我有一个TextView通过调用以下命令获取其数据集 tv setText Html fromHtml myText 字符串myText包含部分格式化的 html 数据 例如 它可能有字体标签 但没有任何使用格式设置的 url 链接 a h
  • 通过 MsBuildProj 文件转换多个项目的多个配置文件

    我正在尝试根据模式 所有形式的文件 在文件列表上运行多个命令 config在给定目录的子目录下 如下所示
  • Node.js docker 容器未更新以适应卷的变化

    我正在尝试在我的 Windows 计算机上托管一个开发环境 该计算机托管前端和后端容器 到目前为止 我只在后端工作 所有文件都位于 C 盘上 通过 Docker Desktop 共享 我有以下 docker compose 文件和 Dock
  • 通过 Response.ContentType、Response.End 输出文件时如何显示进度状态/旋转器?

    我有一个网络表单下载链接按钮 在按钮的点击事件上我正在获取数据 然后生成 XLSX 文件供下载 在文件生成过程中 响应 Clear 叫做 响应内容类型被设定并最终响应 End 叫做 我需要显示微调器 gif在那次操作期间 文件生成并弹出文件
  • 角度区域

    什么是区域 Angular ngZone 与 zone js 有何不同 什么时候应该使用它们 有人可以帮助提供使用 ngZone 的实际示例吗 我在这里浏览了角度文档 但是我无法完全理解 https angular io api core
  • Systemd http 健康检查

    我在 Redhat 7 1 上有一个服务 我使用 systemctl 启动 停止 重新启动和状态来控制 有一次 systemctl 状态返回 active 但服务 背后 的应用程序响应的 http 代码与 200 不同 我知道我可以使用 M
  • 每次插入数据库时​​如何找到数据字段(例如电子邮件)的唯一性?

    我正在开发一个 Android 应用程序 用户在其中输入姓名 电子邮件和密码进行注册 这个输入过程工作得很好 现在我想在每次用户输入他 她的电子邮件时检查输入的电子邮件是否已存在于我的数据库中 为此 我在 DBHelper 类中尝试了以下方
  • 4.1 android模拟器未检测到sd卡

    我曾经使用 4 1 kitkat x86 android 模拟器和 SD 卡进行测试 将 Android Studio 升级到 2 3 后 我无法再访问 android 中提供的 SD 卡 这使得我无法进行测试 谷歌还没有对此的答案 我也没
  • 等待所有 pid 在 php 中退出

    我的问题是这样的 我正在分叉一个进程 以便可以加快磁盘上文件的访问时间 我将这些文件中的所有数据存储在本地桌面上的 tmp 文件中 理想情况下 在所有进程完成后 我需要访问该 tmp 文件并将该数据放入数组中 然后我取消链接 tmp 文件
  • 自动化 sftp 上传过程

    我正在寻找一种将文件 目录结构从一台服务器上传到另一台服务器的方法 在我的情况下 唯一可能的方法是 SFTP 上传 有没有简单的方法来上传它 使用脚本或其他东西 而不需要对文件 目录进行存档 我想在远程服务器上重新创建 谢谢你 也许可以使用
  • 如何获取不带参数的文件名?

    我需要找到我包含的不带 GET 参数的文件的文件名 例如 如果当前 URL 是 我想要返回 file php 我发现了什么 basename SERVER REQUEST URI 返回 file php a b c d 就我而言 我在购物车
  • DataAnnotations:递归验证整个对象图

    我有一个对象图 上面散布着 DataAnnotation 属性 其中对象的某些属性是本身具有验证属性的类 等等 在以下场景中 public class Employee Required public string Name get set
  • 什么东西永远不等于自己?

    Prolog 中是否存在不等于其自身的价值 我写的answer https stackoverflow com a 53404595 10631003对某些人关于树的最小值的问题 https stackoverflow com q 5339
  • PHP如何检索数组值

    我有以下数组 我想检索name comment and each of the tags 插入数据库 我如何检索数组值 另外 我可以仅过滤大于 3 个字符且仅包含 a Z0 9 值的标签值吗 非常感谢 Array folder gt tes
  • Facebook Like 按钮以 0 宽度和 0 高度呈现?

    我是 facebook api 的新手 所以我不知道这是否是一个新手问题 我所做的是我遵循快速开始 https developers facebook com docs javascript quickstart v2 3 我将以下代码片段
  • Project Reactor:如何控制通量排放

    我有一个能发出一些光的通量Date This Date映射到我在某些设备上运行的 1024 个模拟 HTTP 请求Executer 我想做的是等待所有 1024 个 HTTP 请求 然后再发出下一个请求Date 目前运行时 onNext 被