Kafka 消费者通过 JMX 滞后

2024-04-25

我正在尝试监控 Kafka 0.10 中消费者组的滞后情况。

我们的消费者在 Kafka 而不是 ZooKeper 中跟踪他们的偏移量。这意味着我可以使用以下方式获取数据:

bin/kafka-consumer-groups.sh --bootstrap-server <broker> --describe --group <group-name>

这很好用。但是,我的经纪人已经使用了普罗米修斯 JMX 导出器 https://github.com/prometheus/jmx_exporter用于收集一些统计数据。我已将 JConsole 连接到代理,但看不到 JMX 中报告的相同数据kafka-consumer-groups.sh.

有没有办法使用 JMX 从 Kafka 获取这些信息without需要任何额外的工具吗?


您可以检索属性{topic}-{partition}.records-lag公制的kafka.consumer:type=consumer-fetch-manager-metrics,client-id={client-id}对于所有分区。这应该相当于输出consumer-groups.sh

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 消费者通过 JMX 滞后 的相关文章

  • 使用表白名单选项更新 Debezium MySQL 连接器

    我正在使用 Debezium 0 7 5 MySQL 连接器 并且我试图了解如果我想使用以下选项更新此配置 最好的方法是什么table whitelist 假设我创建了一个连接器 如下所示 curl i X POST H Accept ap
  • 将数据从 Kafka 存储传输到 Kafka 主题

    我想在卡夫卡做这样的事情 继续将数据存储在 KStream Ktable Kafka store 中 当我的应用程序收到特定事件 数据时 仅将上述存储中的特定数据集发送到主题 我们可以在卡夫卡中做到这一点吗 我认为单独使用 Kafka 消费
  • 如何在 Python 中以编程方式检查 Kafka Broker 是否已启动并运行

    我正在尝试使用来自 Kafka 主题的消息 我正在使用包装器confluent kafka消费者 我需要在开始使用消息之前检查连接是否已建立 我读到消费者很懒 所以我需要执行一些操作才能建立连接 但我想检查连接建立而不执行consume o
  • Kafka Streams - 如何扩展 Kafka 存储生成的变更日志主题

    我有多个冗余应用程序实例 它们想要使用主题的所有事件并独立存储它们以进行磁盘查找 通过rocksdb 为了便于论证 我们假设这些冗余消费者正在服务无状态 http 请求 因此 负载不是使用 kafka 共享的 而是使用 kafka 将数据从
  • 为每个键使用主题中的最新值

    我有一个 Kafka 生产者 它正在以高速率生成消息 消息键是用户名 值是他在游戏中的当前分数 Kafka消费者处理消费消息的速度相对较慢 在这里 我的要求是显示最新的分数并避免显示陈旧的数据 但代价是某些分数可能永远不会显示 本质上 对于
  • 从 Apache Kafka 中的主题删除消息

    所以我是 Apache Kafka 的新手 我正在尝试创建一个简单的应用程序 以便我可以更好地理解 API 我知道这个问题在这里被问了很多 但是如何清除存储在主题上的消息 记录 我看到的大多数答案都说要更改消息保留时间或删除并重新创建主题
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • 我的 Kafka 流应用程序刚刚退出,代码为 0,什么也不做

    为了尝试 Kafka 流 我这样做了 public static void main String args final StreamsBuilder builder new StreamsBuilder final Properties
  • Kafka:如何获取主题的最后修改时间,即添加到主题的任何分区的最后一条消息

    我们的用例是从 kafka 中删除陈旧 未使用的主题 即如果某个主题 在所有分区上 在过去 7 天内没有任何新消息 那么我们会将其视为陈旧 未使用并删除它 许多谷歌结果建议向消息添加时间戳 然后解析它 对于新主题和消息 灵魂可以工作 但我们
  • 使用 Spring Embedded Kafka 测试 @KafkaListener

    我正在尝试为我正在使用 Spring Boot 2 x 开发的 Kafka 侦听器编写单元测试 作为一个单元测试 我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的实例 所以 我决定使用 Spring Embedded K
  • 频繁出现“offset out of range”消息,分区被消费者抛弃

    我们正在运行 3 节点 Kafka 0 10 0 1 集群 我们有一个消费者应用程序 它有一个连接到多个主题的消费者组 我们在消费者日志中看到奇怪的行为 有了这些线 Fetch offset 1109143 is out of range
  • 配置jmxremote时无法正常停止tomcat

    我添加了一个jmxremotecatalina bat中的配置 set JAVA OPTS Dcom sun management jmxremote port 9004 Dcom sun management jmxremote ssl
  • 未能在kafka-storm中将偏移量数据写入zookeeper

    我正在设置一个风暴集群来计算实时趋势和其他统计数据 但是我在将 恢复 功能引入到这个项目中时遇到了一些问题 方法是允许上次读取的偏移量kafka spout 源代码为kafka spout来自https github com apache
  • Kafka 中的内部和外部通信

    流动 本地 gt 代理 gt Kafka advertised listeners PLAINTEXT proxyhostname 8080 for external communication listeners PLAINTEXT 90
  • 创建 Kafka 主题导致没有领导者

    我正在使用 Kafka v0 9 0 1 Scala v2 11 和com 101tec zkclientv0 7 我正在尝试使用AdminUtils创建一个kafka主题 我的代码如下 String zkServers node1 218
  • Kafka 是否保证具有任何配置参数值的单个分区内的消息排序?

    如果我在 Producer 中将 Kafka 配置参数设置为 1 retries 3 2 max in flight requests per connection 5 那么一个分区内的消息很可能不按 send order 排列 Kafka
  • Kafka JDBC Sink Connector,批量插入值

    我每秒收到很多消息 通过 http 协议 50000 100000 并希望将它们保存到 PostgreSql 我决定使用 Kafka JDBC Sink 来实现此目的 消息以一条记录保存到数据库 而不是批量保存 我想在 PostgreSQL
  • 将 Kafka 输入流动态连接到多个输出流

    Kafka Streams 中是否内置了允许将单个输入流动态连接到多个输出流的功能 KStream branch允许基于真 假谓词进行分支 但这并不是我想要的 我希望每个传入的日志都确定它将在运行时流式传输到的主题 例如日志 date 20
  • Windows下Kafka托管在Docker中删除主题时出现异常

    我在 Windows 的 Docker 中托管 Kafka 威斯迈斯特 卡夫卡 https hub docker com r wurstmeister kafka 使用 docker 镜像 Kafka 数据存储在本地 Windows 文件夹
  • Apache Kafka 中消费者消费消息的延迟

    我正在使用 Kafka 0 8 0 并尝试实现下面提到的场景 JCA API 充当生产者并将数据发送到 gt 消费者 gt HBase 一旦我使用 JCA 客户端获取数据 我就会将每条消息发送给消费者 例如 一旦生产者发送消息 no 1 我

随机推荐

  • Spring MVC:@ResponseBody,415 不支持的媒体类型

    我在将 JSON Post 映射到特定 Java 对象以通过以下方式保存时遇到问题休眠 Ajax 调用的标头设置正确 Accept application json Content Type application json charset
  • 在 shell 脚本中禁止输出到屏幕

    你好 我写了一个小脚本 usr bin ksh for i in DAT do awk BEGIN OFS FS 3 353 3 353861958962 print i gt gt i changed awk 3 353 i change
  • 选项卡视觉选择

    In many GUIs when I select a section of text and then hit the Tab or Shift Tab button the selected section will indent i
  • XML-RPC Odoo - C# 多个搜索条件

    当使用 CookComputing XML RPC net 尝试仅使用一个条件搜索 mail notification 模型时 这相当简单 因为您只需调用 object args new object 1 object subargs ne
  • 从 SQLDataReader 读取结果时出现无效转换异常

    我的存储过程 UserName nvarchar 64 AS BEGIN SELECT MPU UserName SUM TS Monday as Monday TS Monday contains float value FROM dbo
  • 如何更改 vuetify v2 中 scss 中的断点?

    我正在使用 scss 文件 我想更改 vuetify v2 中 css 端的断点 我在 vuetify 升级指南中找不到任何参考 在 1 5 版本中我做了 style x styl grid breakpoints xs 0 sm 476p
  • Laravel - 获取客户端 IP 地址 - 始终获取 127.0.0.1 结果

    我无法获取客户端的 IP 地址 我需要该地址来确定他的当前位置 我使用了 request gt ip SERVER REMOTE ADDR 并且总是得到 127 0 0 1 结果 这不是我想要的 我究竟做错了什么 有时您的客户端通过代理使用
  • pyQt4 - 如何选择表格行并禁用编辑单元格

    我使用以下命令创建 QTableWidget self table QtGui QTableWidget self table setObjectName table self table setSelectionBehavior QtGu
  • 如何使用实体框架和成员资格表初始化数据库

    我有一个使用 Entity Framework 5 0 Code First 的 MVC4 Web 应用程序 在 Global asax cs 中 我有一个引导程序 用于初始化 Entity Database 强制初始化数据库并初始化成员资
  • 如何将谓词与包含的属性一起包含在内?

    目前 我有一个函数 允许我查询数据库 同时在结果中包含一些其他相关表 以防止返回数据库 否则如果我不这样做 我知道会发生这种情况 public class EntityRepository
  • 如何拦截来自 iframe 的 http 请求?

    我以编程方式在网页中设置 iframe 的 URL 我必须知道此 URL 更改会触发哪些 http 请求 CSS 脚本 图像等加载的 URL 我拦截了 XMLHttpRequest 但这个对象从未被调用过 如何拦截这些http请求 是否有另
  • Node.js (Express) 带有路由器的错误处理中间件

    这是我的应用程序结构 app js routes index js The ExpressJS应用程序创建错误处理程序development and production环境 这是来自的代码片段app js app use routes r
  • 物化滑动选项卡在模态中不起作用

    我遇到了一个奇怪的问题 我在物化选项卡上使用滑动 当我在没有模式的情况下滑动时 它工作正常 但是当我将它们包含在模式中时 滑动功能不再工作 document ready function modal modal tabs tabs swip
  • 如何获取可公开访问的 crashlytics 报告 url?

    我想与一些第三方分享我的 crashlytics 崩溃报告 如何获得如下所示的可公开访问的网址 http crashes to s 419b5b28766 http crashes to s 419b5b28766 我是新来的 这是一个旧的
  • SchedPolicy:set_timerslack_ns 写入失败:不允许操作

    当我运行 Android 应用程序时 我在 Logcat 中遇到了这个问题 有谁知道这个问题以及如何解决它 依赖项是 implementation com android support appcompat v7 25 3 0 implem
  • 32x8 寄存器文件 VHDL 测试台

    我已经用 vhdl 编写了该电路的汇编代码 我想用测试台来模拟它 RegWrite 1 位输入 时钟 写寄存器个数 3位输入 写地址 写入数据 32 位输入 数据输入 读取 寄存器编号 A 3 位输入 读取地址 读取寄存器编号 B 3 位输
  • Spring Cloud 配置服务器无法使用本地属性文件

    我一直在玩弄位于此处的 github 上的 Spring Cloud 项目 https github com spring cloud spring cloud config https github com spring cloud sp
  • redis-cli 重定向到 127.0.0.1

    我在PC1上启动Redis集群 然后在PC2上连接它 当需要重定向到另一个集群节点时 它会显示Redirected to slot 7785 located at 127 0 0 1 但应该显示Redirected to slot 7785
  • 壁纸更换事件

    我可以在广播接收器中获取壁纸更改事件吗 我需要检测用户是否更改了壁纸 我怎样才能做到呢 我正在做的是这样的 我有一个可以自动更改壁纸的应用程序 如果用户使用不同的应用程序手动更改它 我想注意到它并询问用户是否要将新壁纸添加到我的应用程序的列
  • Kafka 消费者通过 JMX 滞后

    我正在尝试监控 Kafka 0 10 中消费者组的滞后情况 我们的消费者在 Kafka 而不是 ZooKeper 中跟踪他们的偏移量 这意味着我可以使用以下方式获取数据 bin kafka consumer groups sh bootst