为什么camel kafka Producer很慢?

2024-01-12

我使用 apache camel kafka 作为生成消息的客户端,我观察到 kafka 生产者需要 1 毫秒才能推送一条消息,如果我使用骆驼聚合将消息合并到批处理中,那么推送一条消息需要 100 毫秒。

安装简述 3 kafka 集群 16 核 32GB RAM

示例代码

    String endpoint="kafka:test?topic=test&brokers=nodekfa:9092,nodekfb:9092,nodekfc:9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536";      
    Message message = new Message();
    String payload = new ObjectMapper().writeValueAsString(message);
    StopWatch stopWatch = new StopWatch();
    stopWatch.watch();
    for (int i=0;i<size;i++)
    {
        producerTemplate.sendBody(endpoint,ExchangePattern.InOnly, payload);
    }
    logger.info("Time taken to push {} message is {}",size,stopWatch.getElasedTime());

骆驼生产者端点

kafka:[topic]?topic=[topic]&brokers=[brokers]&maxInFlightRequest=1

尽管 kafka 文档吹嘘生产者 tps 约为 100,000,但我得到的吞吐量为 1000/s。

如果camel-kafka或kafka本身有任何错误,请告诉我。

生产者配置

     acks = 1
        batch.size = 65536
        bootstrap.servers = [nodekfa:9092, nodekfb:9092, nodekfc:9092]
        buffer.memory = 33554432
        client.id = 
        compression.type = none
        connections.max.idle.ms = 540000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 1
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retries = 0
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class  org.apache.kafka.common.serialization.StringSerializer


测试日志

DEBUG [2019-06-02 17:30:46,781]  c.g.p.f.u.AuditEventNotifier: >>> Took 3 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,781]  c.g.p.f.u.AuditEventNotifier: >>> Took 3 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,783]  c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,783]  c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,784]  c.g.p.f.u.AuditEventNotifier: >>> Took 1 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,785]  c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,786]  c.g.p.f.u.AuditEventNotifier: >>> Took 1 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,786]  c.g.p.f.u.AuditEventNotifier: >>> Took 1 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
DEBUG [2019-06-02 17:30:46,788]  c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis for the exchange on the route : null
DEBUG [2019-06-02 17:30:46,788]  c.g.p.f.u.AuditEventNotifier: >>> Took 2 millis to send to external system : kafka://test?brokers=nodekfa%3A9092%2Cnodekfb%3A9092%2Cnodekfc%3A9092&lingerMs=0&maxInFlightRequest=1&producerBatchSize=65536&topic=test by thead http-nio-8551-exec-6
INFO  [2019-06-02 17:30:46,788]  c.g.p.f.a.MessageApiController: Time taken to push 5 message is 10ms


显然,消息至少需要 1 毫秒,默认工作池最大大小为 20 ,如果我将压缩编解码器设置为 snappy 这将使性能最差。

让我知道我错过了什么!


我遇到了同样的问题,来自这封电子邮件https://camel.465427.n5.nabble.com/Kafka-Producer-Performance-tp5785767p5785860.html https://camel.465427.n5.nabble.com/Kafka-Producer-Performance-tp5785767p5785860.html I used https://camel.apache.org/manual/latest/aggregate-eip.html https://camel.apache.org/manual/latest/aggregate-eip.html创建批次并获得更好的性能

from("direct:dp.events")
.aggregate(constant(true), new ArrayListAggregationStrategy())
.completionSize(3)
.to(kafkaUri)
.to("log:out?groupInterval=1000&groupDelay=500")
.end();

I get :

 INFO  Received: 1670 new messages, with total 13949 so far. Last group took: 998 millis which is: 1,673.347 messages per second. average: 1,262.696

这是使用 1 个 Azure 事件中心,使用 Kafka 协议和一个分区。奇怪的是,当我使用另一个带有 5 个分区的 EH 时,与 1 个分区的示例相比,我的性能很差......

多个分区(更新)

通过增加workerPoolCoreSize和workerPoolMaxSize,除了向消息添加分区键并在发送到kafka端点之前添加聚合之外,我还能够每秒获取3K消息

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为什么camel kafka Producer很慢? 的相关文章

随机推荐

  • Kubernetes 部署与 StatefulSet

    我对 Kubernetes 做了很多挖掘 我很喜欢我所看到的 我一直无法清楚地了解的一件事是 Deployment 和 StatefulSet 资源之间的确切区别是什么 以及您将在哪些场景中使用它们 或者通常更喜欢其中一种 Deployme
  • 顺序对象是否由指定的 jQuery 选择器返回?

    所有 jQuery 选择器都会返回一个对象数组 这些对象的顺序是否始终与 HTML 中的顺序相同 我可以依靠这个吗 Yes The jQuery 1 3 2 发行说明 https blog jquery com 2009 02 20 jqu
  • 如何获得两个元素之间的距离(中点)?

    我需要你的帮助 我在它们之间放置了随机数量的 div div div class item Item description div div class item Item description div div class item It
  • Vim 突出显示 FORTRAN 中奇怪的部分

    我正在使用 VIM 主题 molokai 如果这有什么区别的话 我最近一直在学习 FORTRAN 当我使用 VIM 编写 FORTRAN 程序时 根据我的空白 我有奇怪的颜色 例如 如果我按原样 没有缩进 进行制表符 我只会在单词的一部分上
  • Enumerable.Range - 什么时候使用才有意义?

    编程时 几乎本能地决定何时使用 for 循环或 foreach 但是选择使用 Enumerable Range 的决定因素或问题空间是什么 A For Loop当我们想要迭代一定次数 通过简单的数据类型 来计算 执行重复任务时选择 A Fo
  • 在Java编译器中,哪种类型可以定义为标识符(ID)或关键字(保留字)?

    我有一个简单的问题 在Java编译器中 哪些类型的方法或变量可以被定义为标识符 ID 或关键字 保留字 对于以下示例 ID 应为 add main a b c Test1 关于什么print is printID 或关键字 Example
  • Postgres - 返回 2 个数组的交集的函数?

    在 postgresql 中 如果两个数组具有公共成员 即它们重叠 则可以使用 运算符返回 t true 是否有一个函数 运算符可以返回这些常见成员的内容 即像这样 select arrray intersection ARRAY 1 4
  • 如何从自定义发行版中采样?

    我有一个发行版的pdf 该分布不是标准分布 R 中不存在可从中采样的函数 如何使用 R 从此 pdf 中进行采样 这更多的是一个统计问题 因为它需要采样 但一般来说 您可以采用这种方法来解决问题 查找发行版f 其 pdf 当乘以任何给定常数
  • Hibernate 中的通用 DAO 模式

    在处理 Hibernate 时 我们遵循 Hibernate Doc 中提到的通用 Hibernate DAO 模式 因此 我们目前正在维护两个并行的层次结构 1 对于接口 2 实施 因此 如果我们以这种方式工作 即使除了标准持久性方法之外
  • 如何在 xampp windows [php 7.2] 中安装/启用 GD?

    我不知道如何为 PHP7 2 安装 php gd 有没有办法在 xampp windows 中安装 启用 GD 扩展 我检查了 php ini 文件php gd2 dll但我找不到那条线 PHP7 2 中似乎缺少 GD 有什么建议么 转到
  • 是否可以使用指向参数数量未知的函数的指针?

    我正在编写一个简单的类来衡量函数在时间方面的性能 用户应该能够发送指向他的函数的指针 函数的参数 调用该函数的时间以及我将调用该函数的时间 返回经过的时间 我的问题是我不知道用户的函数需要多少个参数 我想使用可变参数函数来获取未知数量的参数
  • 运行 tf.estimator.train 100 步时,在张量板上仅看到一个步骤

    我有一个通过我自己的自定义构建的自定义估算器model fn 我想跑train并在张量板上查看每个步骤的数据点 但是 无论步骤数如何 每次调用我都只能看到一个数据点 以下是我构建和训练估算器的方法 estimator tf estimato
  • 以给定概率获取伪随机项

    我想在用户登录时给他一个奖品 但它需要有一些稀有的奖品 所以我想使用百分比以不同的机会出现奖品 我想显示其中之一 50 flower 30 book 20 mobile 使用他们拥有的百分比 如果有任何方法使用 Node js 或只是 ja
  • 我应该如何在 ECS 上设置 Traefik?

    简而言之 我已经成功跑了Traefik本地及上AWS ECS但现在我想知道应该如何设置某种负载平衡 以使我的两个具有随机 IP 的服务可供公众使用 我当前在 ECS 上的设置 Internet Load balancer on port 4
  • gcloud 未添加用于连接 GKE 集群的访问令牌

    我创建了一个 GKE 集群并使用以下命令连接到它kubectl运行针对我的集群单击 连接 按钮时出现的命令 gcloud container clusters get credentials cluster name zone us cen
  • SpringServletContainerInitializer 无法转换为 javax.servlet.ServletContainerInitializer

    我正在尝试将基于 xml 的 Spring MVC 应用程序移动到基于 Java 配置的应用程序 似乎与 maven 中可用的各种 java servlet 类不匹配 例如 有些提供 addServlet 方法 有些则不提供 这是我的配置类
  • VS 测试在管道中失败,缺少“Microsoft.NET.Test.Sdk”

    由于以下原因 我的构建失败了视觉工作室测试我的构建管道中的步骤失败 我有一个简单的 NET Core v2 1 类库和关联的 MS 测试库 我的管道有两个步骤 NET Core 构建步骤 以及 Visual Studio 测试步骤 这两个项
  • 将字符串移动到向量中

    有没有办法move将 std string 的内容转换为 std vector 我认为现在语言中有右值引用 这个操作有时会非常有用 It is 理论上可以从一种对象类型移动到另一种对象类型 然而 这些对象类型的设计必须允许这样做 vecto
  • 调整闪亮控件的标签位置

    令我惊讶的是 StackOverflow 上以前没有出现过这个问题 但无论如何 问题是 目前 标签文本 年龄范围 在此处指定 sliderInput inputId age Age Range min 32 max 99 value c 3
  • 为什么camel kafka Producer很慢?

    我使用 apache camel kafka 作为生成消息的客户端 我观察到 kafka 生产者需要 1 毫秒才能推送一条消息 如果我使用骆驼聚合将消息合并到批处理中 那么推送一条消息需要 100 毫秒 安装简述 3 kafka 集群 16