如何使用 Kafka 发送大消息(超过 15MB)?

2024-05-27

我发送字符串消息到Kafka V. 0.8使用 Java Producer API。 如果消息大小约为 15 MB,我会得到MessageSizeTooLargeException。 我尝试过设置message.max.bytes到 40 MB,但我仍然遇到异常。小消息没有问题。

异常出现在生产者中,我在这个应用程序中没有消费者。

我怎样才能摆脱这个异常?

我的示例生产者配置

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

错误日志:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)

您需要调整三个(或四个)属性:

  • 消费端:fetch.message.max.bytes- 这将确定消费者可以获取的消息的最大大小。
  • 经纪商方面:replica.fetch.max.bytes- 这将允许代理中的副本在集群内发送消息并确保消息正确复制。如果这个值太小,那么消息将永远不会被复制,因此,消费者将永远不会看到该消息,因为该消息永远不会被提交(完全复制)。
  • 经纪商方面:message.max.bytes- 这是代理可以从生产者接收的消息的最大大小。
  • 经纪人方(每个主题):max.message.bytes- 这是代理允许附加到主题的最大消息大小。该尺寸经过预压缩验证。 (默认为经纪商message.max.bytes.)

我发现了关于第二点的困难方法 - 你不会从 Kafka 收到任何异常、消息或警告,所以当你发送大消息时一定要考虑这一点。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用 Kafka 发送大消息(超过 15MB)? 的相关文章

  • Spring Batch 多线程 - 如何使每个线程读取唯一的记录?

    这个问题在很多论坛上都被问过很多次了 但我没有看到适合我的答案 我正在尝试在我的 Spring Batch 实现中实现多线程步骤 有一个包含 100k 条记录的临时表 想要在 10 个线程中处理它 每个线程的提交间隔为 300 因此在任何时
  • Java - 将节点添加到列表的末尾?

    这是我所拥有的 public class Node Object data Node next Node Object data Node next this data data this next next public Object g
  • 如何获取 Kafka 偏移量以进行结构化查询以进行手动且可靠的偏移量管理?

    Spark 2 2引入了Kafka的结构化流源 据我了解 它依赖 HDFS 检查点目录来存储偏移量并保证 恰好一次 消息传递 但是旧码头 比如https blog cloudera com blog 2017 06 offset manag
  • Spark 1.3.1 上的 Apache Phoenix(4.3.1 和 4.4.0-HBase-0.98)ClassNotFoundException

    我正在尝试通过 Spark 连接到 Phoenix 并且在通过 JDBC 驱动程序打开连接时不断收到以下异常 为简洁起见 下面是完整的堆栈跟踪 Caused by java lang ClassNotFoundException org a
  • 反射找不到对象子类型

    我试图通过使用反射来获取包中的所有类 当我使用具体类的代码 本例中为 A 时 它可以工作并打印子类信息 B 扩展 A 因此它打印 B 信息 但是当我将它与对象类一起使用时 它不起作用 我该如何修复它 这段代码的工作原理 Reflection
  • JavaMail 只获取新邮件

    我想知道是否有一种方法可以在javamail中只获取新消息 例如 在初始加载时 获取收件箱中的所有消息并存储它们 然后 每当应用程序再次加载时 仅获取新消息 而不是再次重新加载它们 javamail 可以做到这一点吗 它是如何工作的 一些背
  • Spring Data JPA 应用排序、分页以及 where 子句

    我目前正在使用 Spring JPA 并利用此处所述的排序和分页 如何通过Spring data JPA通过排序和可分页查询数据 https stackoverflow com questions 10527124 how to query
  • 磁模拟

    假设我在 n m 像素的 2D 表面上有 p 个节点 我希望这些节点相互吸引 使得它们相距越远吸引力就越强 但是 如果两个节点之间的距离 比如 d A B 小于某个阈值 比如 k 那么它们就会开始排斥 谁能让我开始编写一些关于如何随时间更新
  • Mockito when().thenReturn 不必要地调用该方法

    我正在研究继承的代码 我编写了一个应该捕获 NullPointerException 的测试 因为它试图从 null 对象调用方法 Test expected NullPointerException class public void c
  • AWS 无法从 START_OBJECT 中反序列化 java.lang.String 实例

    我创建了一个 Lambda 函数 我想在 API 网关的帮助下通过 URL 访问它 我已经把一切都设置好了 我还创建了一个application jsonAPI Gateway 中的正文映射模板如下所示 input input params
  • 在 Mac 上正确运行基于 SWT 的跨平台 jar

    我一直致力于一个基于 SWT 的项目 该项目旨在部署为 Java Web Start 从而可以在多个平台上使用 到目前为止 我已经成功解决了由于 SWT 依赖的系统特定库而出现的导出问题 请参阅相关thread https stackove
  • Eclipse Java 远程调试器通过 VPN 速度极慢

    我有时被迫离开办公室工作 这意味着我需要通过 VPN 进入我的实验室 我注意到在这种情况下使用 Eclipse 进行远程调试速度非常慢 速度慢到调试器需要 5 7 分钟才能连接到远程 jvm 连接后 每次单步执行断点 行可能需要 20 30
  • 如何从终端运行处理应用程序

    我目前正在使用加工 http processing org对于一个小项目 但是我不喜欢它附带的文本编辑器 我使用 vim 编写所有代码 我找到了 pde 文件的位置 并且我一直在从 vim 中编辑它们 然后重新打开它们并运行它们 重新加载脚
  • Android 中麦克风的后台访问

    是否可以通过 Android 手机上的后台应用程序 服务 持续监控麦克风 我想做的一些想法 不断聆听背景中的声音信号 收到 有趣的 音频信号后 执行一些网络操作 如果前台应用程序需要的话 后台应用程序必须能够智能地放弃对麦克风的访问 除非可
  • 静态变量的线程安全

    class ABC implements Runnable private static int a private static int b public void run 我有一个如上所述的 Java 类 我有这个类的多个线程 在里面r
  • 在 Maven 依赖项中指定 jar 和 test-jar 类型

    我有一个名为 commons 的项目 其中包含运行时和测试的常见内容 在主项目中 我添加了公共资源的依赖项
  • 当我从 Netbeans 创建 Derby 数据库时,它存储在哪里?

    当我从 netbeans 创建 Derby 数据库时 它存储在哪里 如何将它与项目的其余部分合并到一个文件夹中 右键单击Databases gt JavaDB in the Service查看并选择Properties This will
  • 按日期对 RecyclerView 进行排序

    我正在尝试按日期对 RecyclerView 进行排序 但我尝试了太多的事情 我不知道现在该尝试什么 问题就出在这条线上适配器 notifyDataSetChanged 因为如果我不放 不会显示错误 但也不会更新 recyclerview
  • Spring Boot @ConfigurationProperties 不从环境中检索属性

    我正在使用 Spring Boot 1 2 1 并尝试创建一个 ConfigurationProperties带有验证的bean 如下所示 package com sampleapp import java net URL import j
  • 使用 xpath 和 vtd-xml 以字符串形式获取元素的子节点和文本

    这是我的 XML 的一部分

随机推荐

  • 在 iOS 上使用 OpenGL ES 2.0 进行实例化绘制

    简而言之 谁能确认是否可以使用内置变量gl InstanceID or gl InstanceIDEXT 在 iOS 上使用 OpenGL ES 2 0 的顶点着色器中GL EXT draw instanced启用 Longer 我想使用绘
  • 检查数组中是否有 3 个连续值高于某个阈值

    假设我有一个像这样的 np array a 1 3 4 5 60 43 53 4 46 54 56 78 有没有一种快速方法来获取 3 个连续数字都高于某个阈值的所有位置的索引 也就是说 对于某个阈值th 得到所有x其中 a x gt th
  • 让 Google 地图在刷新后保留缩放和居中?

    如何让 Google 地图保留用户的视图 缩放级别和 HTTP 刷新后 现在 它会在每次刷新后重置视图 我可以调整代码吗 下面说 zoom 当前缩放级别 和 center 当前中心 位置 以某种方式 function initialize
  • 如何在猫鼬中使用聚合

    如何在 mongoose 中定义以下 MongoDB 聚合查询 db contacts aggregate group id code Code name Name 查询的目的是获取不同代码和名称的列表 我当前的模型代码是 use stri
  • 域名 foo.bar 指向 127.0.53.53 ——为什么?

    我今天刚刚注意到域名 foo bar 解析为 127 0 53 53 http foo bar http foo bar http whois domaintools com foo bar http whois domaintools c
  • 打开我网站上的链接不起作用

    在我的网站上 我有一个我正在尝试获取工作的链接 我有一个遵循正常格式的链接 即 href 以 www youtube com 作为目标链接 并在末尾添加 target blank 当我单击该链接时 托管我的网站的网站会显示一条错误消息 当我
  • Python - 使用 BeautifulSoup 从 URL 列表中抓取文本的最简单方法

    使用 BeautifulSoup 从几个网页 使用 URL 列表 中抓取文本的最简单方法是什么 有可能吗 最好的 乔治娜 import urllib2 import BeautifulSoup import re Newlines re c
  • MATLAB - 从目录读取文件?

    我希望从目录中读取文件并对每个文件迭代执行操作 此操作不需要更改文件 我知道我应该为此使用 for 循环 到目前为止我已经尝试过 FILES ls path to folder for i 1 size FILES 1 STRU pdbre
  • 连接到 Docker Postgres 容器连接超时

    所以我所做的是 docker run d e POSTGRES USER user e POSTGRES PASSWORD 456789 name admin service p 5432 5432 postgres 当我检查时docker
  • pm2 start app.js 在 15 次重启后退出

    npm start 会很好地启动我的应用程序 但是当我这样做时 pm2 start app js I get PM2 Spawning PM2 daemon PM2 PM2 Successfully daemonized PM2 Proce
  • JavaFX 中的 MVC 模式与场景生成器

    我是 JavaFX 新手 根据我当前的设置 正在努力创建合适的 MVC 架构 我使用 Scene Builder 单击了一个 UI 并指定了一个 Controller 类 Startup public class Portal extend
  • GitHub Action 工作流程未运行

    我有一个 GitHub 操作工作流程文件 myrepo github workflows Build Webpage yml 它包含以下内容 name Webpage Build on push branches webpage jobs
  • 从 pandas 值序列创建 pandas 区间序列

    我能找到的最接近的答案似乎太复杂 如何在 pandas 中创建间隔列 https stackoverflow com a 47396828 575530 如果我有一个看起来像这样的 pandas 数据框 Value 6 12 56 60 1
  • 从多个 .csv 文件创建混淆矩阵

    我有很多具有以下格式的 csv 文件 338 800 338 550 339 670 340 600 327 500 301 430 299 350 284 339 284 338 283 335 283 330 283 310 282 3
  • exec()、shell_exec()、curl_exec() 的安全漏洞

    有时 我会使用 exec shell exec 和curl exec 以下是典型用途 假设其中有 PHP 变量 即第一个变量中的 html 用户有可能修改其内容 从安全漏洞的角度来看 我应该关注什么 escapeshellcmd 和 esc
  • 计算级别内的值

    我在 R 中生成了一组级别cut 例如假设 0 到 1 之间的小数值 分为 0 1 个区间 gt frac lt cut c 0 1 breaks 10 gt levels frac 1 0 001 0 1 0 1 0 2 0 2 0 3
  • 无法让我的脚本自动生成一些值以在有效负载中使用

    我创建了一个脚本 通过随后发送两个 https 请求来从目标页面获取 html 元素 我的脚本可以完美地完成这件事 但是 我必须从 chrome 开发工具复制四个值来填充其中的四个键payload为了发送最终的http请求到达目标页面 这是
  • 获取Android联系人排序首选项

    在 Android 的 联系人 gt 设置 中 我们有 列表排序依据 和 查看联系人姓名 选项 有什么方法可以在另一个应用程序中获取这些首选项吗 这意味着我的应用程序有我自己版本的联系人列表 我需要这些才能对其进行相应的排序 int sor
  • 如何删除 Google 地图自动完成下拉列表中的“Powered by Google”徽标?

    文档表明 如果我们要使用 Google 地图的自动完成功能 我们可以删除底部的 Powered by Google 徽标 知道该怎么做吗 阿尼梅什 南迪的答案有点旧了 实际上 要删除 由谷歌提供支持 徽标我必须使用以下代码 pac cont
  • 如何使用 Kafka 发送大消息(超过 15MB)?

    我发送字符串消息到Kafka V 0 8使用 Java Producer API 如果消息大小约为 15 MB 我会得到MessageSizeTooLargeException 我尝试过设置message max bytes到 40 MB