如何在 kafka-console- Producer 中启用幂等性?

2024-02-09

我正在尝试在 kafka-console- Producer 上启用“幂等”选项。 参考以下链接:

  • https://gerardnico.com/dit/kafka/ Producer#idempot https://gerardnico.com/dit/kafka/producer
  • https://gerardnico.com/dit/kafka/kafka-console- Producer https://gerardnico.com/dit/kafka/kafka-console-producer

使用的命令:

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list node1.com:6667 --topic my_topic --security-protocol SASL_PLAINTEXT --producer-property acks=all --producer-property retries=Integer.MAX_VALUE --producer-property enable.idempotence=true

观察到以下异常:

org.apache.kafka.common.KafkaException:构建kafka失败 制片人 在org.apache.kafka.clients. Producer.KafkaProducer。(KafkaProducer.java:433) 在org.apache.kafka.clients. Producer.KafkaProducer。(KafkaProducer.java:291) 在 kafka.生产者.NewShinyProducer。(BaseProducer.scala:40) 在 kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:50) 在 kafka.tools.ConsoleProducer.main(ConsoleProducer.scala) 引起:org.apache.kafka.common.config.ConfigException:必须设置 确认所有内容以便使用幂等生产者。否则我们 不能保证幂等性。 在 org.apache.kafka.clients. Producer.KafkaProducer.configureAcks(KafkaProducer.java:510) 在org.apache.kafka.clients. Producer.KafkaProducer。(KafkaProducer.java:375)

尽管 acks 已设置为“all”,但我们观察到此异常。 我缺少什么?

以下是使用的版本:

  • 经纪人 - 1.0.0
  • client - 与broker 1.0.0捆绑的控制台生产者

Update

我可以使用以下命令在控制台生成器上启用幂等性--request-required-acks -1回复中建议的选项。

但是,我收到 ClusterAuthorizationException。

bash$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list borker1:6667 --topic my_topic --producer-property enable.idempotence=true  --request-required-acks -1  --security-protocol SASL_PLAINTEXT --property "parse.key=true" --property "key.separator=:"
>key1:value1
>[2018-12-26 04:00:56,074] ERROR [Producer clientId=console-producer] Aborting producer batches due to fatal error (org.apache.kafka.clients.producer.internals.Sender)
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
[2018-12-26 04:00:56,080] ERROR Error when sending message to topic orm_c1_prv_non_sepa_ci with key: 4 bytes, value: 6 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.

仅当启用幂等选项时才会发生此异常。没有此选项也可以生成消息。

bash$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list broker1:6667 --topic my_topic --security-protocol SASL_PLAINTEXT --property "parse.key=true" --property "key.separator=:"
>key1:value1
>key2:value2

我缺少什么?


您无法设置acks通过producer-property对于 ConsoleProducer。使用request-required-acks相反,如下所示:

bin/kafka-console- Producer.sh --broker-list localhost:9092 --topic test --生产者属性enable.idempotence = true --request-required-acks -1

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 kafka-console- Producer 中启用幂等性? 的相关文章

  • 通过SOCKS代理连接Kafka

    我有一个在 AWS 上运行的 Kafka 集群 我想用标准连接到集群卡夫卡控制台消费者从我的应用程序服务器 应用程序服务器可以通过 SOCKS 代理访问互联网 无需身份验证 如何告诉 Kafka 客户端通过代理进行连接 我尝试了很多事情 包
  • 为什么卡夫卡这么快[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 如果我有相同的硬件 请使用 Kafka 或我们当前的解决方案 ServiceMix Camel 有什么区别吗 Kafka 能处理比它
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • 如何使用 Kafka 发送大消息(超过 15MB)?

    我发送字符串消息到Kafka V 0 8使用 Java Producer API 如果消息大小约为 15 MB 我会得到MessageSizeTooLargeException 我尝试过设置message max bytes到 40 MB
  • 使用 Spring Embedded Kafka 测试 @KafkaListener

    我正在尝试为我正在使用 Spring Boot 2 x 开发的 Kafka 侦听器编写单元测试 作为一个单元测试 我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的实例 所以 我决定使用 Spring Embedded K
  • 频繁出现“offset out of range”消息,分区被消费者抛弃

    我们正在运行 3 节点 Kafka 0 10 0 1 集群 我们有一个消费者应用程序 它有一个连接到多个主题的消费者组 我们在消费者日志中看到奇怪的行为 有了这些线 Fetch offset 1109143 is out of range
  • Spring Kafka - 为任何主题的分区消耗最后 N 条消息

    我正在尝试读取请求的卡夫卡消息数 对于非事务性消息 我们将从 endoffset N 对于 M 个分区 开始轮询并收集当前偏移量小于每个分区的结束偏移量的消息 对于幂等 事务消息 我们必须考虑事务标记 重复消息 这意味着偏移量将不连续 在这
  • 为什么我无法从外部连接到 Kafka?

    我在 ec2 实例上运行 kafka 所以amazon ec2实例有两个ip 一个是内部ip 第二个是外部使用的 我从本地计算机创建了生产者 但它重定向到内部 IP 并给我连接不成功的错误 任何人都可以帮助我在 ec2 实例上配置 kafk
  • kafka新版本2.1.0 Broker无故挂起

    起初 集群中的所有代理都可以启动并正常工作 但有时其中一个代理会遇到问题 并且会出现一些现象 整个集群挂了 生产者和消费者也不工作 因此从监视器来看网络流量降至零 使用kafka topic sh描述主题消息 每个副本都很好 即使是异常的b
  • 如何有效地将数据从 Kafka 移动到 Impala 表?

    以下是当前流程的步骤 Flafka http blog cloudera com blog 2014 11 flafka apache flume meets apache kafka for event processing 将日志写入
  • 未能在kafka-storm中将偏移量数据写入zookeeper

    我正在设置一个风暴集群来计算实时趋势和其他统计数据 但是我在将 恢复 功能引入到这个项目中时遇到了一些问题 方法是允许上次读取的偏移量kafka spout 源代码为kafka spout来自https github com apache
  • 如何在 PySpark 中使用 foreach 或 foreachBatch 写入数据库?

    我想使用 Python PySpark 从 Kafka 源到 MariaDB 进行 Spark 结构化流处理 Spark 2 4 x 我想使用流式 Spark 数据帧 而不是静态数据帧或 Pandas 数据帧 看来必须要用foreach o
  • Kafka Streams 反序列化处理程序

    我正在尝试在反序列化中使用 LogAndContinueExceptionHandler 当发生错误时 通过成功记录错误并继续 它可以正常工作 但是 假设我的传入消息有连续的错误流 我停止并重新启动 kafka 流应用程序 然后我看到失败并
  • 从kafka获取特定时间段的结果

    这是我的代码 它使用kafka python now datetime now month ago now relativedelta month 1 topic some topic name consumer KafkaConsumer
  • Windows下Kafka托管在Docker中删除主题时出现异常

    我在 Windows 的 Docker 中托管 Kafka 威斯迈斯特 卡夫卡 https hub docker com r wurstmeister kafka 使用 docker 镜像 Kafka 数据存储在本地 Windows 文件夹
  • 卡夫卡监听器中的钩子

    kafka 监听消息之前 之后是否有任何类型的钩子可用 使用案例 必须设置MDC关联id才能进行日志溯源 我在寻找什么 之前 之后回调方法 以便可以在进入时设置 MDC 关联 ID 并最终在退出时清除 MDC 编辑后的场景 我将关联 id
  • 是否有任何模拟器/工具可以生成流式传输消息?

    出于测试目的 我需要模拟客户端每秒生成 100 000 条消息并将它们发送到 kafka 主题 有没有任何工具或方法可以帮助我生成这些随机消息 有一个用于生成虚拟负载的内置工具 位于bin kafka producer perf test
  • Kafka Consumer 如何(应该)应对有毒消息

    当 Kafka Consumer 无法反序列化消息时 客户端应用程序是否有责任处理有毒消息 Or Kafka是否会 增加 消息偏移并继续消费有效消息 是否有处理 Kafka 主题上的有毒消息的 最佳实践 当 Kafka 无法反序列化记录时
  • 为什么kafka中的__consumer_offsets主题没有传播到所有经纪人?

    我有一个3 zk节点集群 和7 卡夫卡经纪人 nodes 因此 当我创建任何主题时 我可以使用命令行参数设置副本因子和分区数 这些分区分布到所有 7 个经纪商 但是有一个主题 即 consumer offsets 它是自动创建的 并且仅传播
  • kafka中的Bootstrap服务器与zookeeper?

    为什么在 kafka consumer 中不推荐使用 Zookeeper 以及为什么建议使用 bootstrap 服务器 bootstrap server 有什么优点 Kafka消费者需要将偏移量提交给kafka并从kafka获取偏移量 由

随机推荐

  • 如何使浮动内部div与最高div的高度相同

    在下面的代码中 我希望带有 y 的 div 与带有 3 个 x 的 div 的高度相匹配 div style border 0px solid red margin 0px 0px 5px div style border 1px soli
  • 如何使用 Eclipse 运行 testng 工厂?

    我正在使用 eclipse 2018 09 4 9 0 和 testng 插件 版本 6 14 0 201802161500 我创建了一个 Maven 项目来从教程中学习 testng 我想在 Eclipse 中运行 testng 工厂方法
  • 适用于 Mac App Store 的应用程序在签名后无法运行

    我是 Mac App Store 开发的新手 我正在尝试在开发环境中测试我的第一个签名应用程序 就在进入混乱的收据验证之前 我经历了以下步骤 1 我创建了3个证书 a Mac 应用程序 b Mac 安装程序 c 开发 如果我不创建此证书 则
  • xamarin 形成 DatePicker 取消/确定事件

    我找到并尝试了一个针对 Android Xamarin Forms 的自定义渲染 DatePicker 示例 并且不显示在 UnFocus 中单击了哪个按钮 至少不适合我 它来自 stackoverflow Xamarin Forms An
  • 使用 sFTP 服务面向 Azure 存储 blob

    我们需要创建大型 1G 16G 行数据报告并对其进行压缩和加密 我们的客户将通过 sFTP 使用这些报告 我们正在替换现有的实施 因此我们的客户应该透明地获得此更改 Azure Blob 服务不公开 sFTP 服务 因此我们需要某种方法来使
  • 何时在 Dialogflow 中使用用户实体?

    In 对话流 API ai 中的数据上下文在哪里 https stackoverflow com questions 47591980 where is the data context in dialog flow api ai我问如何保
  • CKEditor:删除“链接类型”选项,但将 URL 设置为默认链接类型?

    我正在使用 CKEditor 的链接插件 并且尝试删除 链接类型 选项 以便用户可以在 URL 字段中输入地址 而不必设置 链接类型 选项 当我使用下面的代码时 它会删除 链接类型 选项 但是当您尝试单击它创建的链接时 它不会按预期打开链接
  • Matplotlib:调整图例位置/位置

    我正在创建一个具有多个子图的图形 这些子图之一给我带来了一些麻烦 因为轴角或中心都没有空闲 或可以释放 来放置图例 我想做的是将图例放置在 左上 和 左中 位置之间的某个位置 同时保持它和 y 轴之间的填充等于其他子图中的图例 即使用预定义
  • 从 Java 调用 clojure (Clojure Interop)

    从 Clojoure 调用 Java 非常简单明了 但事实证明反过来是不可预测的 他们似乎有两种方法 1 以下课程 i import clojure java api Clojure ii import clojure lang IFn 2
  • MySQL 说:文档 #1045 - 用户“root”@“localhost”访问被拒绝(使用密码:NO)

    我安装了 xampp 但是当我尝试运行它时出现错误 如下所示 Error MySQL 说 文档 1045 用户 root localhost 的访问被拒绝 使用密码 NO 配置中定义的 controluser 连接失败 phpMyAdmin
  • 是否可以等到 toast 完成后再恢复该方法?

    在我的一种方法中 我有一个toast如果用户给出正确的输入 就会出现 但是 我不希望在吐司完成之前显示下一张图像 如果我使用Thread sleep 3000 如果不允许toast当 UI 活动处于睡眠状态时显示 我正在尝试做的一个例子 p
  • 如何定义稍后在 Julia 中共享的全局变量

    我的文件中有一个模块全球 jl它定义了一个名为 data 的全局多维数组 module Global export data GLOBAL DATA ARRAY data zeros Int32 20 12 31 24 60 5 end 我
  • ASP.NET - 在 Page_Preinit() 或 Page_Init() 与 Page_Load() 中创建的动态控件

    在 ASP NET 中创建动态控件的最佳位置在哪里 MSDN http msdn microsoft com en us library ms178472 aspx另一个说 Pre initMSDN 文章 http msdn microso
  • .sendkeys 方法无法使用 Python Selenium 上传文件

    我正在尝试自动化 Facebook 市场帖子 但我很难上传图片 我已经找到了该元素 当我单击该元素时 它将显示显示文件管理器的 框 以便我可以单击文件夹 然后单击所需的图像 ele wait until EC element to be c
  • Jquery返回值

    我用了一段代码 jQuery fn MyFunction function return this each function attributes test return attributes 但当我打电话时 var1 this MyFu
  • 如何在表格布局中应用行跨度?

    如何将 row span 应用于 TableLayout 我想做一些类似于这张图片的东西 但我不知道该怎么做 制作这样的东西的正确方法是什么 TableLayout不支持行跨度 仅支持列跨度 GridLayout支持行跨度和列跨度 图片来自
  • Google 附近地点搜索

    我想知道我们是否可以通过给出固定点周围的半径参数来获取附近的位置 假设我只想获取某个特定位置直径 10 公里内的附近位置 我可以使用谷歌API来做到这一点吗 或者 我必须为此使用其他东西吗 from http code google com
  • Firebase 将数据保存为 List,有时保存为 Map 对象

    我将键保存为数值的数据 其中键是用户尝试的问题 如果用户尝试了所有问题 则它可能是连续的 例如 键可能是 0 1 2 3 这由 Firebase 保存为数组 如图所示 JSON is parthgupta48 gmail com attem
  • Angular onPush 不会从父级更新子属性

    我有一个如下所示的子组件 Component selector app child changeDetection ChangeDetectionStrategy OnPush template text export class Chil
  • 如何在 kafka-console- Producer 中启用幂等性?

    我正在尝试在 kafka console Producer 上启用 幂等 选项 参考以下链接 https gerardnico com dit kafka Producer idempot https gerardnico com dit