ack造成的风暴延迟

2024-05-22

我正在使用 kafka-storm 连接 kafka 和 Storm。我有3台服务器运行zookeeper、kafka和storm。 kafka中有一个主题“test”,有9个分区。

在storm拓扑中,KafkaSpout执行器的数量为9,默认情况下任务的数量也应为9。 “extract”bolt 是唯一连接到 KafkaSpout(“log”spout)的bolt。

从UI来看,spout的故障率很高。但是,bolt 中执行的消息数 = 发出的消息数 - Bolt 中失败的消息数。当失败消息一开始为空时,这个方程几乎匹配。

根据我的理解,这意味着 Bolt 确实收到了来自 spout 的消息,但 ack 信号在飞行中暂停。这就是为什么 spout 中的 ack 数量如此之少的原因。

这个问题可以通过增加超时秒数和 spout 挂起消息数来解决。但这会导致更多的内存使用,我无法将其增加到无限。

我在想是否有一种方法可以强制storm忽略某些spout/bolt中的ack,这样它就不会等待该信号直到超时。这应该会显着增加吞吐量,但不能保证消息处理。


如果你将acks的数量设置为0,那么storm将自动ack每个样本。

config.setNumAckers(0);

请注意,UI 仅测量和显示 5% 的数据流。 除非你设置

config.setStatsSampleRate(1.0d);

尝试增加螺栓的超时时间并减少topology.max.spout.pending.

另外,请确保 spout 的 nextTuple() 方法是非阻塞且经过优化的。

我还建议对代码进行分析,也许您的风暴队列已满,您需要增加它们的大小。

    config.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE,32);
    config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE,16384);
    config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE,16384);
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

ack造成的风暴延迟 的相关文章

  • 如何将两个不同Spout的输出发送到同一个Bolt?

    我有两个 Kafka Spout 我想将它们的值发送到同一个 Bolt 是否可以 对的 这是可能的 TopologyBuilder b new TopologyBuilder b setSpout topic 1 new KafkaSpou
  • Eclipse 中 Storm 集群关闭

    我有一个问题 我知道有麻烦 但找不到解决方案 集群没有关闭 抛出 IOException I O 方法不起作用 因为 Windows 不允许临时文件夹 我的意思是这个路径 C Users Mert AppData Local Temp 8b
  • 测试java HBase连接

    我正在尝试使用 HBase Java API 将数据写入 HBase 我通过 Ambari 安装了 Hadoop HBase 以下是当前设置配置的方式 final Configuration CONFIGURATION HBaseConfi
  • 在 Apache Storm Bolt 中使用 Apache Camel ProducerTemplate

    我正在尝试编写简单的 Storm Camel 项目 我的 Storm 拓扑分析推文 一个 Bolt 应该将推文文本发送到 apache 骆驼路由 而该路由又使用 websocket 通知某些 Web 应用程序 由于尝试使用一次构建 Came
  • 重新平衡 Apache Storm 中的执行器

    我正在尝试重新平衡正在运行的 Apache Storm 0 9 5 拓扑中的 Bolt 的执行器数量 当我对 Nimbus 节点执行命令时 storm rebalance MyTopology n 2 e GreenBolt 4它接受命令行
  • java.lang.ClassNotFoundException:kafka.api.OffsetRequest

    我在尝试将 Kafka 集成到我们的 Storm 拓扑时收到错误 java lang ClassNotFoundException kafka api OffsetRequest 您正在运行什么版本并且它正在运行 我的 pom xml
  • 将一个项目导入到另一个导入的项目中

    我在一个项目的帮助中找到了这个声明 我想将其导入名为 storm 选举 这是一个基于storm starter 项目的简单演示应用程序 https github com nathanmarz storm starter https gith
  • 使用storm时如何将拓扑上下文中的对象访问到bolt中?

    我们在创建拓扑时需要传递一个对象 以便 Bolt 可以访问该对象并基于该对象进行一些进一步的处理 是否可以通过传递对象TopplogyContext如果是 怎么办 或者是否有其他方法可以在提交拓扑时传递对象 然后再提交 以便 Bolt 可以
  • NotSerializedException org.neo4j.kernel.EmbeddedGraphDatabase

    我正在使用 neo4j 创建图表 将 mongodb 中的数据作为文档 独立代码运行良好 没有风暴 但是 在将其与 Storm 集成时 我得到了 java io NotSerializedException org neo4j kernel
  • 风暴集群重复元组

    目前我正在开展一个项目 在该项目中我在四台 Unix 主机上设置了一个 Storm 集群 拓扑本身如下 JMS Spout 侦听 MQ 以获取新消息 JMS Spout 解析然后将结果发送到 Esper Bolt 然后 Esper Bolt
  • 使用 setState SampleRate/topology.stats.sample.rate 的性能影响

    在yaml中设置topology stats sample rate 1 0对性能有什么影响 这是如何运作的 topology stats sample rate配置计算 Storm 拓扑统计数据的速率 默认值在默认值 yaml https
  • 以集群模式在同一物理节点上运行 Storm nimbus 和supervisor

    我现在有一个包含 2 个物理节点的 Storm 集群 我在跑storm nimbus在节点 1 上和storm supervisor在节点 2 上 看起来我的所有拓扑都仅在节点 2 管理节点 上运行 我也应该在节点 1 上运行主管吗 Tha
  • 如何在cloudfoundry上使用kafka和storm?

    我想知道是否可以将 kafka 作为云原生应用程序运行 以及我是否可以在 Pivotal Web Services 上创建一个 kafka 集群作为服务 我不仅仅想要客户端集成 我想运行 kafka 集群 服务本身 谢谢 阿尼尔 我可以向您
  • Storm程序的执行流程

    我是 Storm 的新手 试图了解不同方法的执行流程spout to bolt 就像spout有不同的方法一样 下一个元组 open 声明输出字段 启用 停用 Bolt 有类似的方法 准备 执行 清理 声明输出字段 那么谁能告诉我这些方法的
  • Storm动态拓扑

    Storm 支持动态拓扑吗 我想要的功能是在 Storm 拓扑运行时根据用户要求动态更改拓扑 例如 当用户想知道流的前 10 个单词时 我使用前 10 个 Bolt 来处理它 当用户想知道其他内容时 我使用另一个 Bolt 来处理流并 拔掉
  • Apache Kafka 与 Apache Storm

    Apache Kafka 分布式消息系统Apache Storm 实时消息处理 我们如何在实时数据管道中使用这两种技术来处理事件数据 就实时数据管道而言 在我看来 两者的工作都是相同的 我们如何在数据管道上使用这两种技术 您可以使用 Apa
  • 将数据从 oracle 移动到 HDFS,处理并从 HDFS 移动到 Teradata

    我的要求是 将数据从 Oracle 移至 HDFS 处理HDFS上的数据 将处理后的数据移至 Teradata 还需要每 15 分钟执行一次整个处理 源数据量可能接近50GB 处理后的数据也可能相同 在网上搜索了很多之后 我发现 PRARO
  • ack造成的风暴延迟

    我正在使用 kafka storm 连接 kafka 和 Storm 我有3台服务器运行zookeeper kafka和storm kafka中有一个主题 test 有9个分区 在storm拓扑中 KafkaSpout执行器的数量为9 默认
  • Storm Spout 未收到 Ack

    我已经开始使用storm 所以我使用创建简单的拓扑本教程 https github com nathanmarz storm wiki Tutorial 当我运行我的拓扑时LocalCluster一切看起来都很好 我的问题是我没有得到元组的
  • Storm 中的连接被拒绝错误

    我是 Storm 的新手 我遇到了以下错误 java net ConnectException Connection refused at sun nio ch SocketChannelImpl checkConnect Native M

随机推荐

  • 在开发过程中自动允许访问 MacOS 辅助功能 API

    我正在开发一个使用辅助功能 API 的应用程序 每次我进行更改和重建时 我都必须删除该应用程序并将其重新添加到下面的 安全和隐私 可访问性 视图中 一段时间后这会变得非常烦人 有没有什么方法可以在开发时禁用此安全检查 或者即使应用程序本身正
  • D3 围绕一组圆圈绘制船体

    我想用 d3 围绕分组力定向图构建绘制一个船体 我已经用圆圈构建了图表 但我现在想将圆的交点与路径 船体 连接起来 如果不连接交叉点 画一个围绕这组圆的船体就足够了 我尝试过具有凸包的力导向布局 http bl ocks org 29205
  • 使用 jQuery 插件及其依赖项的指南 [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 jQuery 插件通常依赖于外部文件 jQuery 库 样式表 CSS 图像 其他插件等 使用 和编写 解决依赖关系放置问题的 jQuery 插
  • 模拟 Spock 中的超类

    如何对 Spock 中具有超类的类进行单元测试 该超类调用来自其超类的方法调用 或者如何模拟 Spock 中的超类 Ex class Bar def method1 parm1 Method actions class Foo extend
  • 有没有一种简单的方法可以在 R 的 igraph 中按度数对网络节点进行着色?

    使用igraphR 包 我想按度数对网络节点进行着色 颜色应代表渐变 例如从蓝色到红色 或从黄色到红色 从网络中观察到的最低程度到最高程度 我找到了一个可行的解决方案 https stackoverflow com questions 40
  • 阅读 Google 文档电子表格

    是否可以使用 PHP 从 Google Docs 电子表格中提取行 在 SQL 中我会使用类似的东西 SELECT FROM table WHERE field value LIMIT 1 有没有办法做到这一点 我听说你应该使用 Zend
  • SQL Server 中的每个实体自动增量字段?

    我的数据库中有一个稳定的 食物 主键为 fooD 我有第二个表 foo Attributes 其外键引用 foo fooD 我想在 fooAttributes 表上有一个组合键 fooID attributeNumber 当我插入新属性时
  • Java如何避免在循环中使用Thread.sleep()

    从我的主线程开始 我启动了两个线程 称为生产者和消费者 两者都包含while true 环形 生产者循环是 UDP 服务器 因此不需要睡眠 我的问题出在消费者循环中 消费者循环从链接队 列中删除对象并将其传递给函数以进行进一步处理 根据研究
  • 在 Bootstrap 4 中创建水平表单

    我是初学者引导程序我正在使用 bootstrap 来设计表单 我正在尝试使用创建水平形式form horizontal引导类BUT标签和文本字段未显示在同一水平线上 我看过有关它的教程 并且我从该教程中复制了相同的代码 但它对我不起作用 而
  • 布局引擎和javascript引擎的区别

    经过大量阅读 似乎当人们说浏览器引擎时 他们指的是诸如 gecko 或 webkit 之类的布局引擎 我还知道布局引擎基本上负责 绘制 屏幕 而javascript引擎则用于解释 但问题是 对于现代网络应用程序来说 哪一个对性能影响更大 这
  • Material-UI 4.8.1 API 更改 - 指定“组件”属性的新方法?

    Edit 这是 4 8 x 的打字错误 升级到 4 9 0 即可解决该问题 上版本4 8 0 以下代码编译并运行良好
  • Android 中读取未提交的事务

    我正在进行大量数据库操作 这会向我的数据库添加大约 10 000 条记录 由于这可能需要很长时间 因此最好使用事务 db startTransaction do write operations db setTransactionSucce
  • FirebirdSql 中参数的正确使用

    我想知道是否有人可以提供以下帮助 using FbConnection conn new FbConnection ConnectionString conn Open FbCommand command1 new FbCommand SE
  • Angular 4+为ngComponentOutlet动态创建的组件分配@Input

    在 Angular 4 中动态创建一个可以使用的组件ngComponentOutlet指示 https angular io docs ts latest api common index NgComponentOutlet directi
  • dayname(curdate()) 不适用于 codeigniter php

    此 sql 在 phpmyadmin 中有效 但在 codeigniter php 中无效 function getProgramsHomepage data array this gt db gt select p name p star
  • Android 中的短信编码

    我的问题是我想发送特定类别和特定编码的短信 0 类和 7 位编码 当检查 Android Telephony SmsManager 和 SmsMessage 时 您无能为力 SmsManager 提供两个功能 发送文本消息和发送数据消息 如
  • 为什么不允许 System.out.println(super) ?

    Why is System out println super 不允许 System out println this 这没问题并且this toString 自动调用并打印 当然 用实例变量代替也可以this 然而 this and su
  • 如何使用Jquery使用javascript运算符符号搜索表单列?

    实际上 我试图在搜索后获取单个表列值 如果我选择 运算符并在列中键入任何输入值 然后在搜索后 输入值将类似于匹配相似值的一行或多行 同样 如果我选择 运算符和此列 则将获得小于输入值的任何输入值 如果我选择 运算符 则小于或等于输入值将获得
  • Jquery 表单验证 - 电话号码

    我已经在表单上设置了 jQuery 验证 该验证当前测试电话号码字段不为空并且是一个数字 但我希望它能够处理用户在手机 区号后放置空格的情况 谁能建议我需要做什么才能允许这样做 这是我当前的代码 if phone length 0 name
  • ack造成的风暴延迟

    我正在使用 kafka storm 连接 kafka 和 Storm 我有3台服务器运行zookeeper kafka和storm kafka中有一个主题 test 有9个分区 在storm拓扑中 KafkaSpout执行器的数量为9 默认