如何并行处理 Flux 事件?

2023-11-25

我有需要丰富的传入事件流,然后在它们到达时并行处理。

我以为 Project Reactor 是为这项工作定制的,但在我的测试中,所有处理似乎都是串行完成的。

这是一些测试代码:

ExecutorService executor = Executors.newFixedThreadPool(10);
System.out.println("Main thread: " + Thread.currentThread());
Flux<String> tick = Flux.interval(Duration.of(10, ChronoUnit.MILLIS))
        .map(i-> {
            System.out.println("ReactorTests.test " + Thread.currentThread());
            sleep(1000L); // simulate IO delay
            return String.format("String %d", i);
        })
        .take(3)
//    .subscribeOn(Schedulers.elastic());
//    .subscribeOn(Schedulers.newParallel("test"));
//    .subscribeOn(Schedulers.fromExecutor(executor));
;
tick.subscribe(x ->System.out.println("Subscribe thread: " + Thread.currentThread()), 
               System.out::println, 
               ()-> System.out.println("Done"));
System.out.println("DONE AND DONE");

我尝试取消注释每个注释行,但是在每种情况下,输出都表明使用同一线程来处理所有事件

Main thread: Thread[main,5,main]
[DEBUG] (main) Using Console logging
DONE AND DONE
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
Done

(唯一的区别是,如果没有调度程序,它们在订阅线程上运行,而对于任何执行程序,它们都在同一个线程中运行,该线程不是订阅线程。)

我缺少什么?

仅供参考,有一种“睡眠”方法:

public static void sleep(long time) {
    try {
        Thread.sleep(time);
    } catch (InterruptedException e) {
        System.out.println("Exiting");
    }
}

并行处理项目的一种方法是使用.parallel / .runOn

flux
    .parallel(10)
    .runOn(scheduler)
    //
    // Work to be performed in parallel goes here.  (e.g. .map, .flatMap, etc)
    //
    // Then, if/when you're ready to go back to sequential, call .sequential()
    .sequential()

阻塞操作(例如阻塞 IO,或Thread.sleep)将阻塞执行它们的线程。反应式流无法神奇地将阻塞方法转变为非阻塞方法。因此,您需要确保阻塞方法在Scheduler适合阻塞操作(例如Schedulers.boundedElastic()).

在上面的示例中,由于您知道正在调用阻塞操作,因此可以使用.runOn(Schedulers.boundedElastic()).

根据用例,您还可以使用异步运算符,例如.flatMap结合.subscribeOn or .publishOn将特定的阻塞操作委托给另一个人Scheduler, as 项目反应器文档中描述。例如:

flux
    .flatMap(i -> Mono.fromCallable(() -> {
            System.out.println("ReactorTests.test " + Thread.currentThread());
            sleep(1000L); // simulate IO delay
            return String.format("String %d", i);
        })
        .subscribeOn(Schedulers.boundedElastic()))

实际上,.flatMap还有一个重载变体,需要一个concurrency参数,您可以在其中限制运行中的内部序列的最大数量。这可以用来代替.parallel在某些用例中。它会not一般为Flux.interval不过,自从Flux.interval不支持补充速度慢于刻度的下游请求。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何并行处理 Flux 事件? 的相关文章

随机推荐

  • 从 .NET 应用程序捕获控制台输出 (C#)

    如何从 NET 应用程序调用控制台应用程序并捕获控制台中生成的所有输出 请记住 我不想先将信息保存在文件中 然后重新列出 因为我希望实时接收它 使用以下命令可以很容易地实现这一点ProcessStartInfo RedirectStanda
  • SCRIPT、STYLE 和 LINK 元素上的类型属性是否仍然需要?

    您将看到许多网站具有以下类型的代码 脚本元素 链接元素 风格元素 我的问题是这样的 Are the type当今流行的浏览器需要哪些属性 通俗指 IE 8 Firefox Webkit Opera 和 Chrome 如果你会发生什么do n
  • 在 Android Service 类中哪里停止/销毁线程?

    我通过以下方式创建了线程服务 public class TCPClientService extends Service Override public void onCreate Measurements new LinkedList
  • psycopg2 的 AWS Lambda 层

    我正在尝试创建一个新的 lambda 层来使用 psycopg2 导入 zip 文件 因为该库使我的部署包超过 3MB 并且我再也看不到 lambda 函数中的内联代码 我使用 Python 3 7 为以下 2 种情况创建了 lambda
  • 导入模块只是为了运行它

    我有一个 JavaScript 文件 它为我正在使用的表单验证库注册验证器 这些验证器可以通过该库访问 因此我不需要将其导入到任何地方 我只需要确保它运行一次 如何在 es6 中以这种方式导入模块 项目中执行此操作的最佳位置是什么 我目前将
  • 如何在firebase云函数中从通配符获取数据

    假设我触发以下事件ref users userId items newItem 我想从通配符内部获取一个值userId 我努力了var token event params userId token但它返回未定义 有什么建议么 在 fire
  • Gradle:如何从 JScience jar 依赖项中排除 javax.realtime 包(多个 dex 定义)

    我在 Android 应用程序中使用 Gradle 我想使用 JScience 库依赖项 我以这种方式添加了库 dependencies compile fileTree dir libs include jar compile org j
  • 自定义弹出编辑器中的 kendoui 验证工具提示未正确定位

    请参见jsfiddle例如 空白 名字 字段以显示验证工具提示 在正常形式中 验证工具提示正确地位于每个元素的右侧 但在网格的弹出编辑器中 它仍然尝试将工具提示放置在元素下方 就像它在内联编辑一样 我努力了 span class k inv
  • 在 Perl 中覆盖区分大小写的正则表达式

    是否可以覆盖 Perl 中先前定义的正则表达式的区分大小写 例如 如果我有以下内容 my upper qr BLAH x my lower qr upper xi warn blah lower 我希望第三行打印出正匹配 您可以添加 i正则
  • 如何让 32 位 Perl 读取 64 位 Windows 注册表?

    我有一个 32 位 Perl 安装程序 使用它我需要能够安装和卸载 32 位和 64 位应用程序 安装 32 位和 64 位就可以了 卸载32位也可以 但是 我在卸载 64 位应用程序时遇到问题 应用程序只知道应用程序的名称 如控制面板中的
  • React 路由器重定向条件

    我正在尝试制作一个按钮 仅在验证正确完成后将用户重定向到新页面 有没有办法做这样的事情 如何在类方法内激活路由 import validator from validator class Example constructor props
  • 使用 microsoft.web.helpers 后登录重定向发生变化

    在 asp net mvc3 网站中 我导入了 microsoft web helpers webmatrix data 和 webmatrix webdata 之后 我发现当我在控制器中的某些 ActionResults 上使用 Auth
  • Rails 3 引擎和静态资源

    我正在构建一个捆绑为 gem 的引擎 gmaps4rails 我将引擎的 public 复制到了 Rails 应用程序的 public 中 在开发中一切正常 但在生产中无法正常工作 似乎找不到静态资产 我的引擎和我的主应用程序 日志讲述了以
  • Bash 脚本将日期和时间列转换为 .csv 中的 unix 时间戳

    我正在尝试创建一个脚本来将 csv 文件中的两列 日期和时间 转换为 unix 时间戳 因此 我需要从每一行获取日期和时间列 将其转换并将其插入到末尾包含时间戳的附加列中 有人可以帮助我吗 到目前为止 我已经发现了将任何给定时间和日期转换为
  • WaitHandle.WaitAny 和 Semaphore 类

    Edit 我想说这个问题只是暂时的精神错乱 但当时这是有道理的 见下面的编辑2 对于 NET 3 5 项目 我有两种类型的资源 R1 and R2 我需要检查其可用性 每种资源类型在任何时候都可以有 比如说 10 个实例 当任一类型的资源可
  • 如何访问 MIPS 中字的各个位的状态?

    我正在编写一个程序 我需要确定是否设置了位 3 和 6 我知道我可以旋转一个单词或左 右移动它 但如何访问各个位的状态呢 我是否使用像 and xor 这样的按位运算符 您可以使用 0x08 和 0x40 进行按位与运算 假设位 0 是最低
  • 如何枚举私有 JavaScript 类字段

    我们如何通过私有类字段进行枚举 class Person isFoo true isBar false constructor first last this firstName first this lastName last enume
  • Fetch Api 无法从 PHP 服务器获取会话

    我在我的应用程序中使用 Fetch Api 我有一个 PHP 服务器页面来获取之前已经定义的会话数据 看起来像这样
  • NHibernate中的inverse和cascade是什么意思

    我正在学习 Fluent Nhibernate 我的问题是 什么是Inverse意思是 我读到这意味着关系的另一方负责储蓄 也是如此Cascade 有人可以解释一下它们之间有什么区别吗 请详细解释一下 因为我是NH的新手 看一下本文 链接已
  • 如何并行处理 Flux 事件?

    我有需要丰富的传入事件流 然后在它们到达时并行处理 我以为 Project Reactor 是为这项工作定制的 但在我的测试中 所有处理似乎都是串行完成的 这是一些测试代码 ExecutorService executor Executor