如果消费者持有消息的时间超过自动提交间隔时间,kafka会丢失消息吗?

2023-12-27

假设自动提交间隔时间为 30 秒,消费者由于某种原因无法处理消息并保留消息超过 30 秒然后崩溃。自动提交偏移量机制是否会在消费者崩溃之前提交此偏移量?

如果我的假设是正确的,消息会因其偏移量提交而丢失,但消息本身尚未被处理?


让我们假设您的消费者组名称是 Test,并且您在该消费者组中有一个消费者。

启用自动提交后,仅在 poll() 调用期间和消费者关闭期间提交偏移量。

例如,auto.commit.interval.ms 为 5 秒,每次调用 poll() 需要 7 秒。每次调用 poll() 时,它都会检查自动提交间隔是否已过,如果已过,如上面的示例所示,它将提交偏移量。

在关闭消费者期间也会提交抵消。

从文档中 -

“关闭使用者,等待默认超时 30 秒以进行任何所需的清理。如果启用自动提交,则如果可能的话,这将在默认超时内提交当前偏移量”。

你可以在这里读更多关于它的内容 -

https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

现在,回到你的问题,如果 poll() 没有再次调用或者消费者没有关闭,它不会提交偏移量。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如果消费者持有消息的时间超过自动提交间隔时间,kafka会丢失消息吗? 的相关文章

  • 连接到 Apache Kafka 多节点集群中的 Zookeeper

    我按照以下说明设置了多节点 kafka 集群 现在 如何连接到zookeeper 是否可以从 JAVA 中的生产者 消费者端仅连接到一个 ZooKeeper 或者是否有一种方法可以连接所有 ZooKeeper 节点 设置多节点 Apache
  • Spark shell (spark 3.0.0) 添加包 confluence kafka 5.5.1 javax.ws.rs-api 问题

    我本地的win10 WSL回到ubuntu 在ubuntu上 我安装了spark3 0 0 confluence平台5 5 1 手动下载 当我尝试运行spark shell或spark submit时 下面是shell示例 spark sh
  • 如何使用 Kafka 发送大消息(超过 15MB)?

    我发送字符串消息到Kafka V 0 8使用 Java Producer API 如果消息大小约为 15 MB 我会得到MessageSizeTooLargeException 我尝试过设置message max bytes到 40 MB
  • 了解Kafka流groupBy和window

    我无法理解 kafka 流中的 groupBy groupById 和窗口的概念 我的目标是聚合一段时间内 例如 5 秒 的流数据 我的流数据看起来像 value 0 time 1533875665509 value 10 time 153
  • 当我重新运行 Flink 消费者时,Kafka 再次消费最新消息

    我在用 Scala 编写的 Apache Flink API 中创建了一个 Kafka 消费者 每当我从某个主题传递一些消息时 它就会及时接收它们 但是 当我重新启动使用者时 它不会接收新的或未使用的消息 而是使用发送到该主题的最新消息 这
  • 为什么我无法从外部连接到 Kafka?

    我在 ec2 实例上运行 kafka 所以amazon ec2实例有两个ip 一个是内部ip 第二个是外部使用的 我从本地计算机创建了生产者 但它重定向到内部 IP 并给我连接不成功的错误 任何人都可以帮助我在 ec2 实例上配置 kafk
  • kafka新版本2.1.0 Broker无故挂起

    起初 集群中的所有代理都可以启动并正常工作 但有时其中一个代理会遇到问题 并且会出现一些现象 整个集群挂了 生产者和消费者也不工作 因此从监视器来看网络流量降至零 使用kafka topic sh描述主题消息 每个副本都很好 即使是异常的b
  • 未能在kafka-storm中将偏移量数据写入zookeeper

    我正在设置一个风暴集群来计算实时趋势和其他统计数据 但是我在将 恢复 功能引入到这个项目中时遇到了一些问题 方法是允许上次读取的偏移量kafka spout 源代码为kafka spout来自https github com apache
  • 创建 Kafka 主题导致没有领导者

    我正在使用 Kafka v0 9 0 1 Scala v2 11 和com 101tec zkclientv0 7 我正在尝试使用AdminUtils创建一个kafka主题 我的代码如下 String zkServers node1 218
  • 如何检测java中的消费者是否无法使用kafka代理?

    我有一个简单的 Java Kafka 消费者 如果 Kafka 代理不可用 我试图捕获异常 我需要它来中断线程 我有这样的代码 KafkaConsumer
  • Kafka中如何同时实现分布式处理和高可用?

    我有一个由 n 个分区组成的主题 为了进行分布式处理 我创建了两个在不同机器上运行的进程 他们使用相同的 groupd id 订阅主题并分配 n 2 个线程 每个线程处理单个流 每个进程 n 2 个分区 这样我就可以实现负载分配 但现在如果
  • 带有 spring-kafka 的 Kafka 死信队列 (DLQ)

    最好的实施方式是什么死信队列 DLQ Spring Boot 2 0 应用程序中的概念 使用 spring kafka 2 1 x 来处理无法处理的所有消息 KafkaListener某些bean发送到某些预定义的Kafka DLQ主题的方
  • 删除主题级别配置

    为了删除主题中的所有数据 我将其retention ms配置设置为1000 bin kafka topics sh zookeeper KAFKAZKHOSTS alter topic
  • Mesos DCOS 未安装 Kafka

    我正在尝试在 Mesos 上安装 Kafka 看来安装已经成功了 vagrant DevNode dcos dcos package install kafka This will install Apache Kafka DCOS Ser
  • Kafka JDBC Sink Connector,批量插入值

    我每秒收到很多消息 通过 http 协议 50000 100000 并希望将它们保存到 PostgreSql 我决定使用 Kafka JDBC Sink 来实现此目的 消息以一条记录保存到数据库 而不是批量保存 我想在 PostgreSQL
  • 将 Kafka 输入流动态连接到多个输出流

    Kafka Streams 中是否内置了允许将单个输入流动态连接到多个输出流的功能 KStream branch允许基于真 假谓词进行分支 但这并不是我想要的 我希望每个传入的日志都确定它将在运行时流式传输到的主题 例如日志 date 20
  • JDBC Kafka Connector 可以从多个数据库中提取数据吗?

    我想设置一个 JDBC Kafka 连接器集群 并将它们配置为从同一主机上运行的多个数据库中提取数据 我一直在查看 Kafka Connect 文档 似乎在配置 JDBC 连接器后 它只能从单个数据库中提取数据 谁能证实这一点吗 根据您启动
  • 提供了 kafka schema.registry.url 但不是已知的配置

    尝试使用架构注册表发布有关主题的 json 消息 但出现以下错误 以下Spring Boot方法 已提供配置 schema registry url 但不是已知配置 应用程序 yml 文件 server port 9080 spring k
  • 为什么kafka中的__consumer_offsets主题没有传播到所有经纪人?

    我有一个3 zk节点集群 和7 卡夫卡经纪人 nodes 因此 当我创建任何主题时 我可以使用命令行参数设置副本因子和分区数 这些分区分布到所有 7 个经纪商 但是有一个主题 即 consumer offsets 它是自动创建的 并且仅传播
  • 无法使用 jolokia 从 Kafka 提取 JMX 数据

    我已经在 centos 7 机器上安装了 Jolokia 并尝试使用 Jolokia 代理提取 Kafka 指标 并使用 Nagios 插件 check jmx4perl 与 Icinga 监控工具集成 以下是我遵循的配置步骤 步骤1 下载

随机推荐

  • ui-router 可选参数不带尾部斜杠

    所以这似乎是一个常见问题 但我还没有找到任何明确的答案 基本上我有一个状态 state users url example id templateUrl angular views example html controller Examp
  • 异步加载js但同步执行

    场景是我有很多 js 文件 根据不同的平台调用不同的文件 所以我有一个问题 因为我想异步加载文件 但这些文件的执行应该同步完成 目前我正在执行这个函数调用 function loadScriptJs src Console log Call
  • 如何在 Mac 上卸载 Composer?

    我已经安装了Composer https getcomposer org用这个命令 php r copy https getcomposer org installer composer setup php php r if hash fi
  • Android 2.1:单个 Activity 中的多个处理程序

    我有不止一个Handlers在活动中 我在中创建所有处理程序onCreate 的主要活动 我的理解是handleMessage 每个处理程序的方法永远不会同时被调用 因为所有消息都放在同一个队列 Activity 线程 MessageQue
  • Azure 持久功能和数据保留

    我可以看到 azure 持久函数使用存储帐户来管理状态和检测 当在具有大量数据的环境中运行持久函数时 表和队列会变得越来越大 并且速度会越来越慢 持久功能清洁是否会自行记录它们 或者这是您需要自己完成的任务吗 经过研究后 开发人员似乎需要为
  • Mongoid分页

    I tried posts Post page params page per page 10 and posts Post paginate page gt 1 per page gt 10 但这两种方法都行不通 undefined me
  • 如何在模板助手中使用 Meteor 方法

    如何定义一个可以在模板助手中调用的 Meteor 方法 我有这两个文件 文件 lib test js Meteor methods viewTest function str return str 文件 客户端 myView js Temp
  • 未找到代理 JAR 或没有代理类属性

    已修复 这不是代码导致的错误 这是因为 IDE 我只是尝试为一款名为 Minecraft 的游戏进行注入 但我有一个问题 无法加载代理 这是例外情况 Exception in thread main com sun tools attach
  • 检测父进程何时退出

    我将有一个用于处理网络服务器重新启动的父进程 它将向子级发出信号以停止侦听新请求 子级将向父级发出信号以表明其已停止侦听 然后父级将向新子级发出信号以表明其可以开始侦听 通过这种方式 我们可以实现该级别重新启动的停机时间少于 100 毫秒
  • 文件夹中的新文件事件

    有人可以帮助我了解如何构建一个 24 7 运行的软件来侦听特定文件夹 例如 C Actions 并且每次我在该文件夹中放置一个新文件时 该软件都需要读取和处理它 如果文件夹中没有文件 软件不应只等待下一个文件的到来而什么也不做 文件 act
  • ASP.NET 动态控件计数(随时创建控件)

    我正在尝试创建一个复合 ASP NET 控件 让您可以构建可编辑的控件集合 我的问题是 当我按下添加或回发按钮 除了回发表单之外什么也不做 时 在文本框中输入的任何值都会丢失 当控件数量在回发之间发生变化时 我无法让它工作 我基本上需要能够
  • 理论计算机科学主题是否具有“现实世界”的开发应用?

    我所说的 理论计算机科学主题 指的是常规语言与非常规语言 泵引理和语法等内容 我熟悉有限自动机和正则表达式的现实世界应用 但诸如此类的其他主题给我带来了更多问题 因为我没有看到任何现实世界的应用 如果您想知道尝试使用正则表达式做某事是否徒劳
  • Ruby on Rails - 在包含 I18n 的 link_to 调用中嵌入额外的 HTML

    我正在尝试在 link to 调用中嵌入额外的 HTML 如本线程中所示在 link to 调用中嵌入额外的 HTML https stackoverflow com questions 9403256 embed additional h
  • “依赖系统的时区设置并不安全”

    我有一个一周前完成的脚本 没有任何问题或错误 今天 我再次测试 收到以下消息 严格标准 date function date 依赖并不安全 系统的时区设置 请使用 date timezone 设置 TZ 环境变量或 date default
  • Android 如何获取所有浏览器应用中的浏览历史记录?

    我想获取android手机中不同浏览器的所有浏览历史记录 也许您知道 一部手机中通常有多个浏览器应用程序 假设您的所有浏览器都使用此 API 来保存历史记录 这应该对您有用 http developer android com refere
  • Node.js sqlite3 IN 运算符

    所以我目前正在尝试在 Node js 中进行查询 friends is an array object db all SELECT email FROM users WHERE email in friends function err r
  • 颤振:缺少 google_app_id。 Firebase 分析已禁用

    我想从一个设备向另一个设备发送通知 但是当我发送它时 我进入了接收器设备 D FLTFireMsgReceiver 8876 broadcast received for message E FA 8876 Missing google a
  • 如何组合多个grep命令?

    我有一个很长的 txt 文件 LONG txt 在该 txt 文件中 我想搜索 3 种类型的模式 然后我想将 grep 结果捕获到一个新的 txt 文件中 SHORT txt 图案 AAAAA BBBBB CCCCC NOTE 当图案AAA
  • Vim 代码补全

    是否可以让 Vim 为您完成代码 就像在任何其他 IDE 中一样 不仅仅是单词搜索 还包括类成员 方法等 可用的最佳选项 插件有哪些 铿锵完成 http www vim org scripts script php script id 33
  • 如果消费者持有消息的时间超过自动提交间隔时间,kafka会丢失消息吗?

    假设自动提交间隔时间为 30 秒 消费者由于某种原因无法处理消息并保留消息超过 30 秒然后崩溃 自动提交偏移量机制是否会在消费者崩溃之前提交此偏移量 如果我的假设是正确的 消息会因其偏移量提交而丢失 但消息本身尚未被处理 让我们假设您的消