RxJava Observable.fromEmitter 奇怪的背压行为

2024-03-02

我一直在利用Observable.fromEmitter()作为一个绝佳的替代品Observable.create()。我最近遇到了一些奇怪的行为,但我不太明白为什么会出现这种情况。我真的很感谢对背压和调度程序有一定了解的人来看看这个。

public final class EmitterTest {
  public static void main(String[] args) {
    Observable<Integer> obs = Observable.fromEmitter(emitter -> {
      for (int i = 1; i < 1000; i++) {
        if (i % 5 == 0) {
          sleep(300L);
        }

        emitter.onNext(i);
      }

      emitter.onCompleted();
    }, Emitter.BackpressureMode.LATEST);

    obs.subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.computation())
        .subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128"

    sleep(10000L);
  }

  private static void sleep(Long duration) {
    try {
      Thread.sleep(duration);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}

该应用程序的输出是

Received 1
Received 2
...
Received 128

然后它仍然停留在 128(大概是因为这是 RxJava 的默认缓冲区大小)。

如果我更改指定的模式fromEmitter() to BackpressureMode.NONE,那么代码将按预期工作。如果我删除对observeOn(),它也按预期工作。有谁能够解释为什么会出现这种情况吗?


这是同池死锁的情况。subscribeOn安排下游request在它正在使用的同一线程上,但如果该线程正忙于睡眠/发射循环,则请求永远不会传递到fromEmitter因此一段时间后LATEST如果主源等待足够长的时间,就会开始删除元素,直到最后一个值 (999) 被传递为止。 (这与以下情况类似onBackpressureBlock我们删除了。)

If subscribeOn如果没有执行此请求调度,该示例将正常工作。

我已经打开了an issue https://github.com/ReactiveX/RxJava/issues/4735制定解决方案。

目前的解决方法是使用更大的缓冲区大小observeOn(有过载)或使用fromEmitter(f, NONE).subscribeOn().onBackpressureLatest().observeOn()

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RxJava Observable.fromEmitter 奇怪的背压行为 的相关文章

  • Java 中等效的并行扩展

    我在 Net 开发中使用并行扩展有一些经验 但我正在考虑在 Java 中做一些工作 这些工作将受益于易于使用的并行库 JVM 是否提供任何与并行扩展类似的工具 您应该熟悉java util concurrent http java sun
  • 在 Java 中连接和使用 Cassandra

    我已经阅读了一些关于 Cassandra 是什么以及它可以做什么的教程 但我的问题是如何在 Java 中与 Cassandra 交互 教程会很好 如果可能的话 有人可以告诉我是否应该使用 Thrift 还是 Hector 哪一个更好以及为什
  • 如何使用 Java 和 Selenium WebDriver 在 C 目录中创建文件夹并需要将屏幕截图保存在该目录中?

    目前正在与硒网络驱动程序和代码Java 我有一种情况 我需要在 C 目录中创建一个文件夹 并在该文件夹中创建我通过 selenium Web 驱动程序代码拍摄的屏幕截图 它需要存储在带有时间戳的文件夹中 如果我每天按计划运行脚本 所有屏幕截
  • Java - 将节点添加到列表的末尾?

    这是我所拥有的 public class Node Object data Node next Node Object data Node next this data data this next next public Object g
  • 使用 Android 发送 HTTP Post 请求

    我一直在尝试从 SO 和其他网站上的大量示例中学习 但我无法弄清楚为什么我编写的示例不起作用 我正在构建一个小型概念验证应用程序 它可以识别语音并将其 文本 作为 POST 请求发送到 node js 服务器 我已确认语音识别有效 并且服务
  • Spark 1.3.1 上的 Apache Phoenix(4.3.1 和 4.4.0-HBase-0.98)ClassNotFoundException

    我正在尝试通过 Spark 连接到 Phoenix 并且在通过 JDBC 驱动程序打开连接时不断收到以下异常 为简洁起见 下面是完整的堆栈跟踪 Caused by java lang ClassNotFoundException org a
  • 斯坦福 NLP - 处理文件列表时 OpenIE 内存不足

    我正在尝试使用斯坦福 CoreNLP 中的 OpenIE 工具从多个文件中提取信息 当多个文件 而不是一个 传递到输入时 它会给出内存不足错误 All files have been queued awaiting termination
  • 禁止的软件包名称:java

    我尝试从数据库名称为 jaane 用户名 Hello 和密码 hello 获取数据 错误 java lang SecurityException Prohibited package name java at java lang Class
  • Java TestNG 与跨多个测试的数据驱动测试

    我正在电子商务平台中测试一系列商店 每个商店都有一系列属性 我正在考虑对其进行自动化测试 是否有可能有一个数据提供者在整个测试套件中提供数据 而不仅仅是 TestNG 中的测试 我尝试不使用 testNG xml 文件作为机制 因为这些属性
  • 在两个活动之间传输数据[重复]

    这个问题在这里已经有答案了 我正在尝试在两个不同的活动之间发送和接收数据 我在这个网站上看到了一些其他问题 但没有任何问题涉及保留头等舱的状态 例如 如果我想从 A 类发送一个整数 X 到 B 类 然后对整数 X 进行一些操作 然后将其发送
  • JRE 系统库 [WebSphere v6.1 JRE](未绑定)

    将项目导入 Eclipse 后 我的构建路径中出现以下错误 JRE System Library WebSphere v6 1 JRE unbound 谁知道怎么修它 右键单击项目 特性 gt Java 构建路径 gt 图书馆 gt JRE
  • 使用Caliper时如何指定命令行?

    我发现 Google 的微型基准测试项目 Caliper 非常有趣 但文档仍然 除了一些示例 完全不存在 我有两种不同的情况 需要影响 JVM Caliper 启动的命令行 我需要设置一些固定 最好在几个固定值之间交替 D 参数 我需要指定
  • 总是使用 Final?

    我读过 将某些东西做成最终的 然后在循环中使用它会带来更好的性能 但这对一切都有好处吗 我有很多地方没有循环 但我将 Final 添加到局部变量中 它会使速度变慢还是仍然很好 还有一些地方我有一个全局变量final 例如android Pa
  • 加密 JBoss 配置中的敏感信息

    JBoss 中的标准数据源配置要求数据库用户的用户名和密码位于 xxx ds xml 文件中 如果我将数据源定义为 c3p0 mbean 我会遇到同样的问题 是否有标准方法来加密用户和密码 保存密钥的好地方是什么 这当然也与 tomcat
  • 如何从终端运行处理应用程序

    我目前正在使用加工 http processing org对于一个小项目 但是我不喜欢它附带的文本编辑器 我使用 vim 编写所有代码 我找到了 pde 文件的位置 并且我一直在从 vim 中编辑它们 然后重新打开它们并运行它们 重新加载脚
  • Android 中麦克风的后台访问

    是否可以通过 Android 手机上的后台应用程序 服务 持续监控麦克风 我想做的一些想法 不断聆听背景中的声音信号 收到 有趣的 音频信号后 执行一些网络操作 如果前台应用程序需要的话 后台应用程序必须能够智能地放弃对麦克风的访问 除非可
  • 声明的包“”与预期的包不匹配

    我可以编译并运行我的代码 但 VSCode 中始终显示错误 早些时候有一个弹出窗口 我不记得是什么了 我点击了 全局应用 从那以后一直是这样 Output is there but so is the error The declared
  • 使用 JMF 创建 RTP 流时出现问题

    我正处于一个项目的早期阶段 需要使用 RTP 广播DataStream创建自MediaLocation 我正在遵循一些示例代码 该代码目前在rptManager initalize localAddress 出现错误 无法打开本地数据端口
  • 按日期对 RecyclerView 进行排序

    我正在尝试按日期对 RecyclerView 进行排序 但我尝试了太多的事情 我不知道现在该尝试什么 问题就出在这条线上适配器 notifyDataSetChanged 因为如果我不放 不会显示错误 但也不会更新 recyclerview
  • 使用 xpath 和 vtd-xml 以字符串形式获取元素的子节点和文本

    这是我的 XML 的一部分

随机推荐

  • CSS 位置元素“固定”在滚动容器内

    我想知道是否有人找到了解决方案 我正在寻找一种将元素附加到滚动容器顶部的解决方案 HTML div class container div class header title div div class element div about
  • CSS响应中心部门

    我想将一些有背景图像的 div 居中 该 div 的响应存在问题 因为如果我将宽度设置为 80 高度设置为 80 则背景图像不会位于中心 我尝试了一切 但图片不能只站在中心 如果浏览器更小或更大 这是一个非常大的问题 所以如果你看图片 我想
  • R Shiny:从 Excel 复制单元格并将其粘贴到 Shiny 应用程序中,然后使用它们创建数据表

    我正在开发一个 R Shiny 应用程序 我需要开发以下功能 我需要从 Excel 中复制单元格行 开始时一次一列 然后使用 selectizeInput textInput 或 textAreaInput 将它们粘贴到 Shiny 中 数
  • 将 STL 容器 转换为容器

    我正在寻找一种方法来制定具有以下内容的课程 使用具有最大 常量 的指针的 STL 容器的接口 但它会在内部改变所指向的对象 与非常量模拟相比 没有额外的运行时开销 理想情况下 与非常量版本相比 该解决方案不会编译为额外的代码 因为常量 非常
  • 有条件禁用/重新启用 jQuery 单击事件

    我在禁用和重新启用链接上的点击事件时遇到问题 设置为一行 4 列 每列包含一个链接和隐藏内容框 单击链接时 它会展开该行并显示特定于该列的内容框 单击链接并展开行后 所有其他链接都会淡出 然后 您可以重新单击打开的链接以关闭该行并取消淡入淡
  • 如何使用 NDK 17 为 64 位 Android 构建 OpenSSL 1.1.1

    无法为 64 位 Android 构建 OpenSSL 以下是我已采取的步骤 下载了setenv android sh from https wiki openssl org images 7 70 Setenv android sh ht
  • 使用 lubridate 进行矢量化时区转换

    我有一个数据框 其中包含一列日期时间字符串 library tidyverse library lubridate testdf data frame mytz c Australia Sydney Australia Adelaide A
  • 从文本文件中读取并将其加载到 matlab 中的矩阵中[重复]

    这个问题在这里已经有答案了 我有一个名为坐标 txt 的文本文件 格式如下 0 0 0 0 95 0 32 0 02 1 02 0 26 0 96 0 73 0 6 0 52 0 77 0 6 0 71 0 28 0 0 95 0 14 0
  • N个矩形的并集周长

    我想知道解决这个问题的有效方法 给定N个矩形 并给出左上角和右下角 请求N个矩形的并集周长 我只有O N 2 算法太慢 所以请寻找更高效的算法 您可以假设坐标值为正整数且小于 100000 EDIT For example in this
  • simpleXML 根据属性获取节点子节点[重复]

    这个问题在这里已经有答案了 我正在尝试解析我通过其属性之一引用的节点的值 但我不确定语法 XML
  • Netty如何使用线程池?

    您能解释一下 Netty 如何使用线程池来工作吗 我是否理解正确 有两种线程池 老板和工人 Boss 用来做 I O worker 用来调用用户回调 messageReceived 来处理数据 这是来自 NioServerSocketCha
  • 使用什么工具来解析Python中的编程语言?

    您可以推荐哪种 Python 工具来解析编程语言 它应该允许源代码中语言语法的可读表示 并且应该能够扩展到复杂的语言 语法像 Python 本身一样复杂的语言 当我搜索时 我主要找到 pyparsing 我将对其进行评估 但当然我对其他替代
  • 用于单元/集成测试的嵌入式动物园管理员

    是否有嵌入式动物园管理员以便我们可以在单元测试中使用它 它可以与测试一起发货并开箱即用 也许我们可以模拟一些服务并注册到嵌入式动物园管理员 The Curator https github com Netflix curator wiki框
  • Mac 版 Github:缺少拉取请求按钮

    周六 我在 Mac 上安装了 Github Desktop 并尝试了 Github Workflow 创建一个分支 提交更改并执行拉取请求 一切顺利 今天我在工作中安装了 Mac 版 Github 但找不到 Pull Request 按钮
  • Typescript 在 vs 2015 ctp 6 中禁用保存时编译

    我需要知道如何禁用打字稿文件保存时编译 默认情况下启用 Typescript 编译 您可以执行下一步来禁用它 选择并单击 卸载项目 菜单项 选择已卸载的项目并单击 编辑 kproj 将新的 PropertyGroup 节点添加到项目根节点
  • 捆绑链接的 JavaScript 文件

    我正在使用 Visual Studio 2012 和 MVC4 我已将链接文件 来自另一个项目 添加到我的 MVC4 应用程序中 以下是该文件的属性 构建操作 内容 复制到输出目录 不复制 这是我的捆绑包的示例 bundles Add ne
  • WPF DataGrid AlternatingRowBackground 和 RowStyle 优先级

    我该如何做我的RowStyle后申请AlternatingRowBackground 我想要物品 有IsOrange as true具有Orange背景 无论交替的行背景如何 目前情况并非如此 XAML
  • 当选择文本时,如何用我自己的视图替换 UIMenuController?

    当选择文本时 默认情况下会弹出一个 UIMenuController 其中包含剪切 复制 粘贴等功能 我想用我自己的自定义视图替换它 外观相似 但高两倍 以便我可以有两行按钮 自定义视图 我怎样才能做到这一点 我知道没有简单的方法 我预计即
  • Mongodb动态like运算符

    在 mongodb 中相当于 sql like 运算符是 db users find shows m 使用 nodejs javascript 我想根据 url 参数动态更改字母 我努力了 letter req params letter
  • RxJava Observable.fromEmitter 奇怪的背压行为

    我一直在利用Observable fromEmitter 作为一个绝佳的替代品Observable create 我最近遇到了一些奇怪的行为 但我不太明白为什么会出现这种情况 我真的很感谢对背压和调度程序有一定了解的人来看看这个 publi