Kafka 生产者超时异常

2024-02-23

我正在运行 Samza 流作业,将数据写入 Kafka 主题。 Kafka 正在运行一个 3 节点集群。 Samza 作业部署在纱线上。我们在容器日志中看到很多这样的异常:

 INFO [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.container.ContainerHeartbeatMonitor:[ContainerHeartbeatMonitor:stop:61] - [main] - Stopping ContainerHeartbeatMonitor
ERROR [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.runtime.LocalContainerRunner:[LocalContainerRunner:run:107] - [main] - Container stopped with Exception. Exiting process now.
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
        at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:147)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:694)
        at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:104)
        at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:149)
Caused by: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:181)
        at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
        at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:160)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for Topic3-16 due to 30332 ms has passed since last attempt plus backoff time

这 3 种类型的异常出现很多。

59088 org.apache.kafka.common.errors.TimeoutException: Expiring 115 record(s) for Topic3-1 due to 30028 ms has passed since last attempt plus backoff time

61015 org.apache.kafka.common.errors.TimeoutException: Expiring 60 record(s) for Topic3-1 due to 74949 ms has passed since batch creation plus linger time

62275 org.apache.kafka.common.errors.TimeoutException: Expiring 176 record(s) for Topic3-4 due to 74917 ms has passed since last append

请帮助我了解这里的问题是什么。每当发生这种情况时,Samza 容器都会重新启动。


该错误表明某些记录放入队列的速度比从客户端发送的速度快。

当您的生产者发送消息时,它们会存储在缓冲区中(在将消息发送到目标代理之前),并且记录会分组在一起以提高吞吐量。当新记录添加到批次中时,必须在可配置的时间窗口内发送,该时间窗口由request.timeout.ms(默认设置为 30 秒)。如果该批次在队列中的时间较长,则会出现TimeoutException被抛出,然后批处理记录将从队列中删除,并且不会传递给代理。

增加价值request.timeout.ms应该可以帮到你。

如果这不起作用,您也可以尝试减少batch.size以便更频繁地发送批次(但这次将包含更少的消息)并确保linger.ms设置为 0(这是默认值)。

请注意,更改任何配置参数后,您需要重新启动 kafka 代理。

如果您仍然收到错误消息,我认为您的网络出现问题。您启用了 SSL 吗?

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 生产者超时异常 的相关文章

  • 如何使用rest api设置kafka连接auto.offset.reset

    我创建了一个接收器 kafka 连接 将数据转换为其他存储 我想设置auto offset reset as latest当新连接器创建时kafka connect rest api 我已经设定consumer auto offset re
  • 通过SOCKS代理连接Kafka

    我有一个在 AWS 上运行的 Kafka 集群 我想用标准连接到集群卡夫卡控制台消费者从我的应用程序服务器 应用程序服务器可以通过 SOCKS 代理访问互联网 无需身份验证 如何告诉 Kafka 客户端通过代理进行连接 我尝试了很多事情 包
  • Kafka Consumer 无法加载任何密钥库类型和路径的 SSL 密钥库(Logstash ArcSight 模块)

    我需要为 Kafka Consumer 提供客户端身份验证证书 但是 它总是失败并出现以下异常 无法加载 SSL 密钥库 ssl cipher suites null ssl enabled protocols TLSv1 2 TLSv1
  • 使用表白名单选项更新 Debezium MySQL 连接器

    我正在使用 Debezium 0 7 5 MySQL 连接器 并且我试图了解如果我想使用以下选项更新此配置 最好的方法是什么table whitelist 假设我创建了一个连接器 如下所示 curl i X POST H Accept ap
  • 使用 kafka java api 的 Avro 序列化器和反序列化器

    Kafka Avro 序列化器和反序列化器无法工作 我尝试使用 kafka 控制台消费者消费消息 我可以看到发布的消息 public class AvroProducer
  • Kafka Producer配置重试策略

    需要更改 Kafka Producer 配置的哪些参数 以便生产者应该 1 重试n次 2 n个间隔后 如果代理关闭 也会收到相同的消息 我需要处理与此相关的情况 https github com rsyslog rsyslog issues
  • 卡夫卡主题查看器? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我想调试一些 Kafka 主题 这样我就知道消费者或生产者是否有问题 Kafka 是否有一个 UI 我
  • 为每个键使用主题中的最新值

    我有一个 Kafka 生产者 它正在以高速率生成消息 消息键是用户名 值是他在游戏中的当前分数 Kafka消费者处理消费消息的速度相对较慢 在这里 我的要求是显示最新的分数并避免显示陈旧的数据 但代价是某些分数可能永远不会显示 本质上 对于
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • 使用 Spring Embedded Kafka 测试 @KafkaListener

    我正在尝试为我正在使用 Spring Boot 2 x 开发的 Kafka 侦听器编写单元测试 作为一个单元测试 我不想启动一个完整的 Kafka 服务器作为 Zookeeper 的实例 所以 我决定使用 Spring Embedded K
  • 如何检测 KTable 连接的哪一侧触发了更新?

    当您在 Kafka 中连接两个表时 每次更新两个 KTable 之一时 您的输出 Ktable 也会更新 想象一下你正在加入Customers与一个列表Orders你已经适当减少了 再次想象一下 您使用此连接的结果来为最终客户提供特别优惠和
  • Kafka不启动空白输出

    我正在努力安装 Kafka 和 Zookeeper 我已经运行了 Zookeeper 并且它当前正在运行 我将所有内容设置为 https dzone com articles running apache kafka on windows
  • Confluence 平台与 apache kafka [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我是 kafka 的新手 对 Confluence 平台很好奇 看来Confluence平台上的用户故事并不多 Confluence平台和Apa
  • 创建 Kafka 主题导致没有领导者

    我正在使用 Kafka v0 9 0 1 Scala v2 11 和com 101tec zkclientv0 7 我正在尝试使用AdminUtils创建一个kafka主题 我的代码如下 String zkServers node1 218
  • Zookeeper + Kafka - 无法创建数据目录

    我在单节点中使用zookeeper 3 4 8并尝试使用kafka 当我运行这个命令时 zookeeper server start sh usr local kafka 2 9 2 0 8 2 2 config zookeeper pro
  • Kafka中如何同时实现分布式处理和高可用?

    我有一个由 n 个分区组成的主题 为了进行分布式处理 我创建了两个在不同机器上运行的进程 他们使用相同的 groupd id 订阅主题并分配 n 2 个线程 每个线程处理单个流 每个进程 n 2 个分区 这样我就可以实现负载分配 但现在如果
  • Kafka 是否保证具有任何配置参数值的单个分区内的消息排序?

    如果我在 Producer 中将 Kafka 配置参数设置为 1 retries 3 2 max in flight requests per connection 5 那么一个分区内的消息很可能不按 send order 排列 Kafka
  • Kafka Streams 反序列化处理程序

    我正在尝试在反序列化中使用 LogAndContinueExceptionHandler 当发生错误时 通过成功记录错误并继续 它可以正常工作 但是 假设我的传入消息有连续的错误流 我停止并重新启动 kafka 流应用程序 然后我看到失败并
  • Kafka 0.10 Java 客户端超时异常:包含 1 条记录的批次已过期

    我有一个单节点 多 3 个代理 Zookeeper Kafka 设置 我正在使用 Kafka 0 10 Java 客户端 我编写了以下简单的远程 在与 Kafka 不同的服务器上 生产者 在代码中我用 MYIP 替换了我的公共 IP 地址
  • Apache Kafka 中消费者消费消息的延迟

    我正在使用 Kafka 0 8 0 并尝试实现下面提到的场景 JCA API 充当生产者并将数据发送到 gt 消费者 gt HBase 一旦我使用 JCA 客户端获取数据 我就会将每条消息发送给消费者 例如 一旦生产者发送消息 no 1 我

随机推荐

  • 如何设置shell脚本的进程组

    如何设置shell脚本的进程组 我还希望所有子进程都位于同一个进程组中 我期望类似的东西setpgid in C As 普斯科西克指出 https stackoverflow com a 45112755在大多数 shell 中 通过激活作
  • 使用 WebFlux 的 Spring Boot 在测试中总是抛出 403 状态

    非常感谢您查看我的问题 我有一些奇怪的主题 我的 Spring Boot 测试不起作用 它们启动成功 但在向任何控制器发出请求时总是抛出 403 HTTP 状态 我有一些具有下一个依赖项的项目 buildscript ext kotlin
  • 多索引数据框 pandas 中的操作

    我需要处理大数据 csv 中的地理和统计数据 它包含来自地理行政和地统计的数据 城市 区位 地统计基本区划和区块构成层次指标 我必须为地理索引中数据的最大值的每个元素创建一个新列 data2 并将每个块值除以该值 对于每个索引级别 索引级别
  • 如何在不使用库(Metrics)的情况下制作MAE和RAE的函数?

    我的目标是创建平均绝对误差 MAE 和相对绝对误差 RAE 的函数 而不使用任何类型的库 例如库 Metrics 我尝试在 MAE 和 RAE 的函数内输入公式 mae lt function a b mean abs a b rae lt
  • Android WebView HTTP Cookie 在 API 21 中不起作用

    我有一个使用 WebView 和 HTTP cookie 的 Android 应用程序 此应用程序适用于运行 API 19 或更低版本的 Android 设备 API 21 不会保存 http cookie 以供以后参考 Android W
  • ffmpeg 连接并保留元数据流

    我正在尝试连接 GoPro Hero6 分割电影的多个文件以避免 FAT 4GB 限制 ffmpeg 非常适合此目的 但我需要在元数据流中编码的遥测数据 而 ffmpeg 默认情况下似乎不保留此数据 使用ffprobe命令你可以看到源视频有
  • 如何在 MDriven 中设置日期和时间选择器?

    我试图在 MDriven 中捕获日期和时间 但数据类型 DateTime 的默认值仅显示日期选择器 在 Web 中 但时间存储在持久层中 我又如何捕捉时间 我在wiki mdriven net https wiki mdriven net
  • 在xml中定义没有class属性的bean

    我是 Spring 世界的新手 在一次采访中 有人问我们是否可以在 XML 中创建一个 bean 而不指定class 也就是说 bean 只会有一个id属性 我对此没有答案 请告知我们是否可以在 Spring 中以 XML 形式创建一个 b
  • Android NDK - 在配置更改时强制库重建

    在 Eclipse 中更改构建配置时 有没有办法强制 Android NDK 重建特定库 我正在使用 Android NDK 构建一个 Android 项目来构建 C 库 我正在使用带有 Sequoyah 插件的 Eclipse 一切都已设
  • 是否有现有的 gem 或脚本可以将数字转换为 comp-3/压缩十进制格式?

    继续我将 COBOL 转换为 Ruby 程序的冒险 我必须将十进制数字转换为 comp 3 压缩十进制格式 有人知道一个简单的 Ruby 脚本或 gem 可以做到这一点吗 伯恩斯 Ruby 知道如何打包半字节 因此结果非常简单 def pa
  • fparsec 解析字符串序列

    我有一个用户输入文本 例如 abc def ghi 我想解析它以获取字符串列表 abc def I tried let str Parser lt gt many1Chars noneOf let listParser Parser lt
  • 如何识别访客用户的时间比会话通常存在的时间长

    我知道 我可以使用 Session getId 但它会随着时间的推移而改变 也许我不明白这些会议 据我所知 它在 php 运行时启动 并在 php 代码完成时删除 另一方面 我读到会话 ID 存储在 cookie 中 当用户再次打开您的网站
  • 使用类实例作为 Typescript 映射中的键

    当获取和设置映射值时 映射必须以某种方式知道键是否等于另一个已设置的键 如何在 Typescript 中实现复杂数据类型 自定义类 的相等性 在Java中我会重写equals方法 打字稿中有等价的方法吗 就我而言 我有以下课程 export
  • cplex boolVarArray 给出双精度值

    我一直在尝试使用 CPLEX Java 实现 ILP 并且长期以来一直被一个问题困扰 以下是 ILP 的几个变量 IloIntVar above new IloIntVar numRect IloIntVar below new IloIn
  • 酿造安装 nvm。 nvm:找不到命令

    使用brew安装nvm并运行后nvm 它说nvm command not found 我怎样才能得到要执行的命令 使用brew 安装nvm 有两个步骤 首先使用brew安装应用程序 brew install nvm 然后查看brew 信息的
  • 我可以使用ASP.Net Core 3.0中的IEmailSender接口向多个接收者发送电子邮件吗

    我是 ASP Net core 的初学者 实际上我正在使用 ASP Net Core 3 0 我想向多个收件人发送电子邮件 我可以使用IEmailSender接口吗 或者有什么建议吗 我的 IEmailSender 实现是这样的 publi
  • 根据太阳位置(方位角和仰角)以及纬度和经度计算日期和时间

    与此相关非常有帮助question https stackoverflow com questions 8708048 position of the sun given time of day latitude and longitude
  • 应包含哪个 aSmack jar 文件(android-14、android-15...)以支持 SDK 版本 14-19?

    我陷入了两个不同的错误之间 无法实例化活动 ComponentInfo https stackoverflow com questions 16610296 android unable to instantiate activity cl
  • 为什么rvm需要登录shell?

    据我所知 rvm是一组bash脚本 为什么需要登录 shell 哪些仅存在于登录 shell 中的属性对于 rvm 是必需的 相关帖子 rvm 安装无法正常工作 RVM 不是一项功能 https stackoverflow com ques
  • Kafka 生产者超时异常

    我正在运行 Samza 流作业 将数据写入 Kafka 主题 Kafka 正在运行一个 3 节点集群 Samza 作业部署在纱线上 我们在容器日志中看到很多这样的异常 INFO 2018 10 16 11 14 19 410 U 2 151