为什么 Sinks.many().multicast().onBackPressureBuffer() 在订阅者之一取消订阅后完成以及如何避免它

2024-02-01

我在使用时遇到了我不明白的行为Sinks.Many<String>向多个订阅者通知某些事件:

fun main() {

    val sink : Sinks.Many<String>  = Sinks.many().multicast().onBackpressureBuffer()
    val flux = sink.asFlux().log()

    val d = flux.subscribe {
        println("--> $it")
    }

    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)

    val d2 = flux.subscribe {
        println("--2> $it")
    }

    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)
}

此代码显示第一个订阅者获取值 1 和 2,第二个订阅者获取值 2。到目前为止一切顺利:

11:49:06.936 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.938 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:49:06.942 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--> 2
11:49:06.943 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(2)
--2> 2

现在,假设第一个订阅者在第一次发射后处置(取消)其订阅,我期望第一个订阅者获得 1,第二个订阅者获得 2:


    val sink : Sinks.Many<String>  = Sinks.many().multicast().onBackpressureBuffer()
    val flux = sink.asFlux().log()

    val d = flux.subscribe {
        println("--> $it")
    }

    sink.emitNext("1", Sinks.EmitFailureHandler.FAIL_FAST)

    d.dispose()

    val d2 = flux.subscribe {
        println("--2> $it")
    }

    sink.emitNext("2", Sinks.EmitFailureHandler.FAIL_FAST)

}
11:51:48.684 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.685 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onNext(1)
--> 1
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - cancel()
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - onSubscribe(EmitterProcessor.EmitterInner)
11:51:48.689 [main] INFO reactor.Flux.EmitterProcessor.1 - request(unbounded)
11:51:48.690 [main] INFO reactor.Flux.EmitterProcessor.1 - onComplete()

然而,当第二个订阅者尝试订阅时,通量被视为已完成。为什么会发生这种情况?我需要 Sinks.Many 可以随时订阅和取消订阅,而无需取消。


我刚刚遇到了同样的问题。

这是由 autoCancel 默认为 true 引起的。不幸的是onBackPressureBuffer javadoc https://projectreactor.io/docs/core/3.4.0-SNAPSHOT/api/reactor/core/publisher/Sinks.MulticastSpec.html#onBackpressureBuffer--没有提及它。

这种行为继承自EmitterProcessor.create https://projectreactor.io/docs/core/3.4.0-SNAPSHOT/api/reactor/core/publisher/EmitterProcessor.html#create--它被记录在哪里。

要将 autoCancel 标志设置为 false,需要使用替代方法背压缓冲区 https://projectreactor.io/docs/core/3.4.0-SNAPSHOT/api/reactor/core/publisher/Sinks.MulticastSpec.html#onBackpressureBuffer-int-boolean-

Many<String> sink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为什么 Sinks.many().multicast().onBackPressureBuffer() 在订阅者之一取消订阅后完成以及如何避免它 的相关文章

随机推荐

  • Angular-Dart DI 库中的工厂注入

    在我的 Dart 应用程序中 我使用 MVP 模式和 Angular dart 依赖注入库 Angular di 在上面的示例中 我无法注入 MyView 或 MyPresenter 因为这是循环依赖项 class MyView MyPre
  • 术语:前向声明与函数原型

    对我来说 使用 C 编程语言时这些术语本质上是同义词 在实践中 我可能更喜欢文件内原型的 前向声明 而不是通过头文件包含的原型的 函数原型 但当你考虑预处理后会发生什么时 即使这也是人为的区别 也许我错过了一些东西 对于何时使用一个术语与另
  • 解构 Open Layers 3 地图

    所以 我使用 Open Layers 3 和 Ember js 来制作仪表板 并且我已经动态加载地图 但我希望它在我离开路线时被销毁 我发现的唯一东西是 map destroy 但它是针对旧版本的API 新版本中似乎没有 进入地图页面几次后
  • 取消设置 git 配置

    我在 Mac 上使用 FileMerge 来查看差异 并设置为 git config global diff external bin git diff cmd sh 现在我不想再使用 FileMerge 如何恢复到之前的默认设置 Use
  • Zsh 无法正确自动完成我的 ssh 命令

    我在 ssh 自动完成方面遇到一些问题 我希望我的 zsh 在我的 ssh config 文件上自动完成 但到目前为止它只对 etc hosts 文件执行此操作 我发现如何通过添加此配置来不使用主机文件 zstyle completion
  • valgrind - 地址 ---- 是分配大小为 8 的块后的 0 字节

    首先 我知道similar已提出问题 但是 我想问一个关于真正原始 C 数据类型的更一般的简单问题 所以就是这样 In main c我调用一个函数来填充这些字符串 int main int argc char argv char host
  • 有没有API可以从wiki页面获取图像

    我想从维基百科页面获取主图像 我有所有维基百科实体名称 我从中创建维基链接并从该页面获取主图像 我尝试过 https github com richardasaurus wiki api https github com richardas
  • 嵌套的纯函数仍然是纯函数吗?

    根据定义 一个纯函数是纯的 如果 给定相同的输入 将始终返回相同的输出 不产生任何副作用 不依赖于外部状态 所以这是一个纯函数 function foo x return x 2 foo 1 2 foo 2 4 foo 3 6 这也将是一个
  • Angular 6 - 材质文本框浮动占位符不起作用

    我想使用 Angular 6 Material UI 组件来提供更高级的外观和感觉 我在测试程序下运行 但 mat 输入没有提供那种外观和感觉 参考 https material angular io components input ov
  • 有没有办法在 iOS 12.2 的 PWA 中使用 mailto: 或 message: 方案?

    我使用 Ionic 4 构建了一个 PWA 它有一个 联系 按钮 其中包含使用 mailto 方案的简单 href a href Contact a 当从主屏幕启动 PWA 时 这用于打开 iOS 12 1 中的本机邮件应用程序 自从我更新
  • 如何获取和/或覆盖 Mac OSX 中窗口的最小尺寸

    我想调整我的机器上的任何窗口的大小 这是我使用 AppleScript 完成的 使用图形用户界面脚本 http www macosxautomation com applescript uiscripting index html 该脚本类
  • 如何计算 JSON 数据中变量的总和?

    我编写了一个项目 其中字符串以相反的方式返回 PostMapping reverse public String reverseList RequestBody String string List
  • 关于Android NDK libc++ libc++_shared、libstdc++的困惑

    我在尝试使用 Android NDK 23 23 1 7779620 构建一个简单的 C 库时感到非常困惑 我正在使用 CMake 这是一个非常简单的程序 CMakeLists txt cmake minimum required VERS
  • Flex:如何创建一个全新的组件?

    我想为 Flex 开发一个网络图形应用程序 想象一下将节点放置在 Canvas 上并用链接将它们连接起来 节点应具有可编辑文本和其他 UI 组件 我试图找到从头开始创建全新 UI 组件的示例 但我所能找到的只是扩展现有组件的琐碎示例 例如
  • 将数字排列成最大数 - 算法证明

    有众所周知的算法问题 http www programcreek com 2014 02 leetcode largest number java 给定数字数组 例如 1 20 3 14 在这种情况下 以尽可能形成最大数字的方式排列数字32
  • 有 ASP.Net 基准测试工具吗? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我想测量 Net 应用程序的性能 特别是在服务器中运行时 ASP Net 中的 Web 应用程序 我需
  • 日期选择器给出的时间不是异常之间的时间

    我正在使用日期选择器 想要将最小日期设置为今天 将最大日期设置为一年后的今天 我这样做是这样的 datePickerDialog getDatePicker setMinDate System currentTimeMillis 1000
  • 无法在 React 输入文本字段中输入内容

    我正在尝试我的第一个 React js 但很早就被难住了 我有下面的代码 它将搜索表单呈现为 div div 但在搜索框中输入内容却没有任何作用 大概是在传递 props 和 state 的过程中缺少了一些东西 这似乎是一个常见的问题 但我
  • 如何计算图像中“绿点”的数量?

    你好 我有一堆图像 让我们假设它们都具有相同的大小 图像有黑色背景和一些准圆形绿点 代表荧光 我必须计算金额 百分比 每个图像的荧光 IE 绿点的面积 知道如何做到这一点 例如在 Java 中吗 这是图像处理中的一个标准问题 称为图像分割
  • 为什么 Sinks.many().multicast().onBackPressureBuffer() 在订阅者之一取消订阅后完成以及如何避免它

    我在使用时遇到了我不明白的行为Sinks Many