如果第一个经纪人宕机,Kafka 消费者将无法消费

2023-12-04

我正在使用最新版本的kafka(kafka_2.12-1.0.0.tgz)。我已经设置了带有 3 个代理的简单集群(只是在每个实例的属性文件中更改了broker.id=1 和listeners=PLAINTEXT://:9092)。集群启动后,我使用以下命令创建了主题

./kafka-topics.sh --create    --zookeeper localhost:2181  --replication-factor 3     --partitions 13    --topic demo

然后使用以下命令启动kafka消费者和生产者

./kafka-console-producer.sh --topic  demo  --broker-list localhost:9094,localhost:9093,localhost:9092

./kafka-console-consumer.sh --group test --bootstrap-server localhost:9094,localhost:9093,localhost:9092  --topic demo

当所有经纪人都起来时,一切都好。但是,如果我先杀死(按启动顺序)代理消息将发送到代理,但消费者无法接收任何消息。消息不会丢失。启动该代理消费者后立即收到消息。

关闭broker实例后consumer的日志:

[2018-01-09 13:39:31,130] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 2147483646 的连接。 经纪人可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,132] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 1 的连接。经纪人 可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,344] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 2147483646 的连接。 经纪人可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,451] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 1 的连接。经纪人 可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,848] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 2147483646 的连接。 经纪人可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:31,950] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 1 的连接。经纪人 可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:32,363] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 2147483646 的连接。 经纪人可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:33,092] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 2147483646 的连接。 经纪人可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:34,216] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 2147483646 的连接。 经纪人可能不可用。 (org.apache.kafka.clients.NetworkClient) [2018-01-09 13:39:34,218] 警告 [消费者 clientId=consumer-1, groupId=test] 异步自动提交偏移量 {demo-0=OffsetAndMetadata{偏移=3,元数据=''}, demo-1=OffsetAndMetadata{偏移=3, 元数据=''}, demo-2=OffsetAndMetadata{offset=2, 元数据=''}, demo-3=OffsetAndMetadata{offset=2, 元数据=''}, demo-4=OffsetAndMetadata{offset=1, 元数据=''}, demo-5=OffsetAndMetadata{offset=1, 元数据=''}, demo-6=OffsetAndMetadata{offset=3, 元数据=''}, demo-7=OffsetAndMetadata{offset=2, 元数据=''}, demo-8=OffsetAndMetadata{offset=3, 元数据=''}, demo-9=OffsetAndMetadata{offset=2, 元数据=''}, demo-10=OffsetAndMetadata{偏移=3, 元数据=''}, demo-11=OffsetAndMetadata{偏移=2, 元数据=''}, demo-12=OffsetAndMetadata{offset=2,metadata=''}} 失败:偏移 提交失败并出现可重试异常。您应该重试提交 偏移量。根本错误是:协调器不可用。 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-01-09 13:39:34,219] 警告 [消费者 clientId=consumer-1, groupId=test] 无法建立与节点 1 的连接。经纪人 可能不可用。 (org.apache.kafka.clients.NetworkClient)

再次启动缺少broker后的consumer日志:

[2018-01-09 13:41:21,739] 错误 [消费者 clientId=consumer-1, groupId=test] 分区 demo-0 上偏移量 3 处的偏移量提交失败: 这不是正确的协调员。 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-01-09 13:41:21,739] 警告 [消费者 clientId=consumer-1, groupId=test] 异步自动提交偏移量 {demo-0=OffsetAndMetadata{偏移=3,元数据=''}, demo-1=OffsetAndMetadata{偏移=3, 元数据=''}, demo-2=OffsetAndMetadata{offset=2, 元数据=''}, demo-3=OffsetAndMetadata{offset=2, 元数据=''}, demo-4=OffsetAndMetadata{offset=1, 元数据=''}, demo-5=OffsetAndMetadata{offset=1, 元数据=''}, demo-6=OffsetAndMetadata{offset=3, 元数据=''}, demo-7=OffsetAndMetadata{offset=2, 元数据=''}, demo-8=OffsetAndMetadata{offset=3, 元数据=''}, demo-9=OffsetAndMetadata{offset=2, 元数据=''}, demo-10=OffsetAndMetadata{偏移=3, 元数据=''}, demo-11=OffsetAndMetadata{偏移=2, 元数据=''}, demo-12=OffsetAndMetadata{offset=2,metadata=''}} 失败:偏移 提交失败并出现可重试异常。您应该重试提交 偏移量。根本错误是:这不是正确的 协调员。 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-01-09 13:41:22,353] 错误 [消费者 clientId=consumer-1, groupId=test] 分区 demo-0 上偏移量 3 处的偏移量提交失败: 这不是正确的协调员。 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-01-09 13:41:22,354] 警告 [消费者 clientId=consumer-1, groupId=test] 异步自动提交偏移量 {demo-0=OffsetAndMetadata{偏移=3,元数据=''}, demo-1=OffsetAndMetadata{偏移=3, 元数据=''}, demo-2=OffsetAndMetadata{offset=2, 元数据=''}, demo-3=OffsetAndMetadata{offset=2, 元数据=''}, demo-4=OffsetAndMetadata{offset=1, 元数据=''}, demo-5=OffsetAndMetadata{offset=1, 元数据=''}, demo-6=OffsetAndMetadata{offset=3, 元数据=''}, demo-7=OffsetAndMetadata{offset=2, 元数据=''}, demo-8=OffsetAndMetadata{offset=3, 元数据=''}, demo-9=OffsetAndMetadata{offset=2, 元数据=''}, demo-10=OffsetAndMetadata{偏移=3, 元数据=''}, demo-11=OffsetAndMetadata{偏移=3, 元数据=''}, demo-12=OffsetAndMetadata{offset=2,metadata=''}} 失败:偏移 提交失败并出现可重试异常。您应该重试提交 偏移量。根本错误是:这不是正确的 协调员。 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

Thanks


尝试检查 server-*.properties 文件中的“offsets.topic.replication.factor”

例如:

############################# Internal Topic Settings       
# The replication factor for the group metadata internal topics    
# For anything other than development testing, a value greater than 1 is  recommended for to ensure availability such as 3.
offsets.topic.replication.factor=3

http://kafka.apache.org/documentation/#brokerconfigs

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如果第一个经纪人宕机,Kafka 消费者将无法消费 的相关文章

  • 在spark-kafka中使用schema将ConsumerRecord值转换为Dataframe

    我正在使用 Spark 2 0 2 和 Kafka 0 11 0 并且 我正在尝试在火花流中使用来自卡夫卡的消息 以下是代码 val topics notes val kafkaParams Map String Object bootst
  • Kafka Producer配置重试策略

    需要更改 Kafka Producer 配置的哪些参数 以便生产者应该 1 重试n次 2 n个间隔后 如果代理关闭 也会收到相同的消息 我需要处理与此相关的情况 https github com rsyslog rsyslog issues
  • 将数据从 Kafka 存储传输到 Kafka 主题

    我想在卡夫卡做这样的事情 继续将数据存储在 KStream Ktable Kafka store 中 当我的应用程序收到特定事件 数据时 仅将上述存储中的特定数据集发送到主题 我们可以在卡夫卡中做到这一点吗 我认为单独使用 Kafka 消费
  • 卡夫卡主题查看器? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我想调试一些 Kafka 主题 这样我就知道消费者或生产者是否有问题 Kafka 是否有一个 UI 我
  • 如何使用 Kafka 发送大消息(超过 15MB)?

    我发送字符串消息到Kafka V 0 8使用 Java Producer API 如果消息大小约为 15 MB 我会得到MessageSizeTooLargeException 我尝试过设置message max bytes到 40 MB
  • 频繁出现“offset out of range”消息,分区被消费者抛弃

    我们正在运行 3 节点 Kafka 0 10 0 1 集群 我们有一个消费者应用程序 它有一个连接到多个主题的消费者组 我们在消费者日志中看到奇怪的行为 有了这些线 Fetch offset 1109143 is out of range
  • 有没有办法使用 .NET 中的 Kafka Ksql Push 查询

    我目前正在 NET 中使用 Kafka 消费者处理大量 Kafka 消息 我的处理过程的第一步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息 我不想首先处理 特别是不下载 那些不需要的消息 看起来 kSql 查询 写为推送查
  • Kafka 中的内部和外部通信

    流动 本地 gt 代理 gt Kafka advertised listeners PLAINTEXT proxyhostname 8080 for external communication listeners PLAINTEXT 90
  • 嵌入式 Kafka 测试随机失败

    我使用 EmbededKafka 实现了一系列集成测试 以测试使用 spring kafka 框架运行的一个 Kafka 流应用程序 流应用程序正在从 Kafka 主题读取消息 将其存储到内部状态存储中 进行一些转换并将其发送到另一个微服务
  • 如何使用 Python 在 Kafka 中生成 Tombstone Avro 记录?

    我的水槽属性 name jdbc oracle config connector class io confluent connect jdbc JdbcSinkConnector tasks max 1 topics orders con
  • Kafka Streams 反序列化处理程序

    我正在尝试在反序列化中使用 LogAndContinueExceptionHandler 当发生错误时 通过成功记录错误并继续 它可以正常工作 但是 假设我的传入消息有连续的错误流 我停止并重新启动 kafka 流应用程序 然后我看到失败并
  • 如何删除 Apache Kafka 中的主题? [复制]

    这个问题在这里已经有答案了 我需要删除 Kafka 0 8 2 2 3 中的一个主题 我使用以下命令删除主题 bin kafka topics sh zookeeper localhost 2181 delete topic DummyTo
  • Kafka JDBC Sink Connector,批量插入值

    我每秒收到很多消息 通过 http 协议 50000 100000 并希望将它们保存到 PostgreSql 我决定使用 Kafka JDBC Sink 来实现此目的 消息以一条记录保存到数据库 而不是批量保存 我想在 PostgreSQL
  • 将 Kafka 输入流动态连接到多个输出流

    Kafka Streams 中是否内置了允许将单个输入流动态连接到多个输出流的功能 KStream branch允许基于真 假谓词进行分支 但这并不是我想要的 我希望每个传入的日志都确定它将在运行时流式传输到的主题 例如日志 date 20
  • 为什么我的 Kafka Streams 拓扑无法正确重放/重新处理?

    我有一个如下所示的拓扑 KTable
  • Flink Kafka - 如何使应用程序并行运行?

    我正在 Flink 中创建一个应用程序 读取某个主题的消息 对其进行一些简单的处理 将结果写入不同的主题 我的代码确实有效 然而它不并行运行我怎么做 看来我的代码只在一个线程 块上运行 在 Flink Web 仪表板上 应用程序进入运行状态
  • JDBC Kafka Connector 可以从多个数据库中提取数据吗?

    我想设置一个 JDBC Kafka 连接器集群 并将它们配置为从同一主机上运行的多个数据库中提取数据 我一直在查看 Kafka Connect 文档 似乎在配置 JDBC 连接器后 它只能从单个数据库中提取数据 谁能证实这一点吗 根据您启动
  • 如何使用PySpark结构流+Kafka

    我尝试将 Spark 结构流与 kafka 一起使用 并且在使用 Spark 提交时遇到问题 消费者仍然从生产中接收数据 但 Spark 结构出错 请帮我找到我的代码的问题 这是我在 test py 中的代码 from kafka impo
  • mysql故障转移:如何选择slave作为新的master?

    我是 mysql 新手 当涉及到故障转移时 哪个从机应该晋升为新的主机 例如 A是master B和C是slave A对B和C进行异步复制 在某个时间点 B 从 A 接收的数据多于 C A 崩溃 如果我们将C提升为新的master 并将B的
  • 为什么kafka中的__consumer_offsets主题没有传播到所有经纪人?

    我有一个3 zk节点集群 和7 卡夫卡经纪人 nodes 因此 当我创建任何主题时 我可以使用命令行参数设置副本因子和分区数 这些分区分布到所有 7 个经纪商 但是有一个主题 即 consumer offsets 它是自动创建的 并且仅传播

随机推荐