【注意】Kafka生产者异步发送消息仍有可能阻塞

2023-11-04

问题描述

Kafka是常用的消息中间件。在Spring Boot项目中,使用KafkaTemplate作为生产者发送消息。有时,为了不影响主业务流程,会采用异步发送的方式,如下所示。

@Slf4j
@Component
public class KafkaSender {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendAsync(String topic, String message) {
        kafkaTemplate.send(topic, message)
                .addCallback(
                        sendResult -> log.info("Send success"),
                        e -> log.error("Send failed", e));
    }
}

本以为采用异步发送,必然不会影响到主业务流程。但实际使用时发现,在第一次发送消息时,如果Kafka Broker连接失败,调用sendAsync()方法的主线程会长时间阻塞。这点是出乎意料的。

原因分析

跟踪源码可知,Kafka生产者在第一次发送消息时,会尝试从Broker获取元数据Metadata(见KafkaProducerwaitOnMetadata()方法),如果Broker连接失败,则会一直阻塞于此,循环尝试获取,直至超时(超时时间由max.block.ms定义)。

    /**
     * Wait for cluster metadata including partitions for the given topic to be available.
     * @param topic The topic we want metadata for
     * @param partition A specific partition expected to exist in metadata, or null if there's no preference
     * @param nowMs The current time in ms
     * @param maxWaitMs The maximum time in ms for waiting on the metadata
     * @return The cluster containing topic metadata and the amount of time we waited in ms
     * @throws TimeoutException if metadata could not be refreshed within {@code max.block.ms}
     * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
     */
    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already and reset expiry
        Cluster cluster = metadata.fetch();

        if (cluster.invalidTopics().contains(topic))
            throw new InvalidTopicException(topic);

        metadata.add(topic, nowMs);

        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // Return cached metadata if we have it, and if the record's partition is either undefined
        // or within the known partition range
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);

        long remainingWaitMs = maxWaitMs;
        long elapsed = 0;
        // Issue metadata requests until we have metadata for the topic and the requested partition,
        // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
        // is stale and the number of partitions for this topic has increased in the meantime.
        do {
            if (partition != null) {
                log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
            } else {
                log.trace("Requesting metadata update for topic {}.", topic);
            }
            metadata.add(topic, nowMs + elapsed);
            int version = metadata.requestUpdateForTopic(topic);
            sender.wakeup();
            try {
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException(
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs));
            }
            cluster = metadata.fetch();
            elapsed = time.milliseconds() - nowMs;
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException(partitionsCount == null ?
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs) :
                        String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                                partition, topic, partitionsCount, maxWaitMs));
            }
            metadata.maybeThrowExceptionForTopic(topic);
            remainingWaitMs = maxWaitMs - elapsed;
            partitionsCount = cluster.partitionCountForTopic(topic);
        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

        return new ClusterAndWaitTime(cluster, elapsed);
    }

也就是说,Kafka生产者在发送消息前,要先获取到Metadata。对于异步发送,虽然消息发送的过程是非阻塞的,但获取Metadata的过程是阻塞的。如果因为Broker连接失败、Topic未创建等原因而一直获取不到Metadata,主线程将长时间阻塞。

解决办法

解决办法也很简单。如果Kafka发送消息并非关键业务,为了不影响主业务流程的进行,可以创建线程池来专门执行消息发送工作,保证sendAsync()方法一定是异步执行的。注意,线程池大小和工作队列长度需要合理限定,避免因阻塞任务过多而OOM;拒绝策略可以视情况选择DiscardPolicy。

另外,还可以考虑指定max.block.ms,来限制获取Metadata的最大阻塞时间(默认60000ms):

spring:
  kafka:
    producer:
      properties:
        max.block.ms: 1000

实际上,在异步发送消息的过程中,除了因为获取不到Metadata而阻塞外,还可能因为消息缓冲池已满而阻塞(参考:Kafka Producer 异步发送消息居然也会阻塞?)。这2种阻塞的超时时间均由max.block.ms定义。

总结

Kafka生产者异步发送消息的方法(如Spring Boot中的kafkaTemplate.send()),看似异步,实则可能阻塞。由于发送消息前需要获取元数据Metadata,如果一直获取失败(可能原因包括Broker连接失败、Topic未创建等),将导致长时间阻塞。这点与我们的一般理解不符,需要特别注意。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【注意】Kafka生产者异步发送消息仍有可能阻塞 的相关文章

随机推荐

  • shell编写yum安装监控zabbix脚本

    bin bash zabbix 5 0 安装zabbix源 修改为aliyun源 cd etc yum repos d rpm Uvh https repo zabbix com zabbix 5 0 rhel 7 x86 64 zabbi
  • 【hive】grouping sets函数 多维度数据生成

    我们经常会遇到这样的分析需求 要求按时间 日 月 季 半年 年 地域 大区 分公司 多维度组合来汇总分析结果 通常情况下需要编写繁琐的sql来实现 比如之前我们是这样做的 insert into table tmp tmp t1 selec
  • OpenCV代码提取 warpPerspective函数的实现

    For perspective transformation you need a 3x3 transformation matrix Straight lines will remain straight even after the t
  • 万众瞩目,谷歌的反击来了!全新PaLM 2反超GPT-4,办公全家桶炸裂升级,Bard史诗进化...

    Datawhale干货 最新 谷歌 PaLM 2 来源 量子位 万众瞩目 谷歌的反击来了 现在 谷歌搜索终于要加入AI对话功能了 排队通道已经开放 当然这还只是第一步 大的还在后面 全新大语言模型PaLM 2正式亮相 谷歌声称它在部分任务超
  • python 贪心算法解决找零钱问题

    target 99 找零钱目标数 money 5 29 10 5 2 1 纸币种类 number 0 0 0 0 0 0 纸币种类 for i in range 6 排循环 从最大面值开始考虑 number i target money i
  • 利用nodemcu和mqtt协议让嵌入式设备接入互联网(三.实现数据交互)

    文章目录 前言 KOA2框架 koa2初步使用 koa示例代码解释 koa static中间件 Aedes mqtt协议 用aedes创建broker并测试 MQTT server over WebSocket http协议和websock
  • 从开源组件安全看SCA软件成分分析技术

    1 基本概念 软件成分分析 SCA Software Composition Analysis 是一种对二进制软件的组成部分进行识别 分析和追踪的技术 专门用于分析开发人员使用的各种源码 模块 框架和库 以识别和清点开源软件 OSS 的组件
  • 2020美赛F奖论文(三):足球团队指标和基于机器学习的球队表现预测

    上接 2020美赛F奖论文 二 传球网络模型 PNM 的建立和影响因子分析 全文 2020美赛F奖论文 一 摘要 绪论和模型准备 2020美赛F奖论文 二 传球网络模型 PNM 的建立和影响因子分析 2020美赛F奖论文 三 足球团队指标和
  • 全面总结sizeof的用法(定义、语法、指针变量、数组、结构体、类、联合体、位域位段)

    一 前言 编译环境是vs2010 32位 span style font size 18px include span
  • 微信小程序图片自适应大小(炒鸡详细)

    开发小程序图片是非常重要的部分 小程序开发想要提升用户的体验感图片是必不可少的部分 因此对图片的各种操作也是咱不可或缺的技能 我遇到的坑 本来想放入一张好看的图片
  • 设计模式对程序员的必要性

    其实设计模式的应用基础是面向对象的程序设计方法 没有面向对象的理论 设计模式的实现就没有了基础 可能从方法论的角度说 设计模式是一种思想和具体的程序设计语言没有必然的联系 可软件最终还是要通过代码来实现 不同的语言体现了对面向对象理论的不同
  • Mybatis读取和存储json类型的数据

    目录 一 测试使用JSONObject来获取json 二 设置 TableName的autoResultMap为true TableField的typeHandler为JacksonTypeHandler class 三 设置xml当中的r
  • 3个权威免费资源下载网站!

    hello大家好 这里是预计今天可以到家的老Y工作室 因为这几天在出差 也没花太多时间帮大家搜罗一些好玩有趣的网站 等回家后 会把补一些软件和教程 有朋友之前问老Y有没有免费的标准下载网站或者查询 于是老Y找了3个给有需要的朋友分享一下 0
  • Redis初级篇

    Redis 视频地址 https www bilibili com video BV1Rv41177Af p 38 资料地址 https pan baidu com s 1GxYRq5UkZHKhk3KB0nOioQ q7vj 概述 Red
  • Windows C++多线程:生产者消费者模型编程

    Windows C 多线程 生产者消费者模型编程 生产者消费者模型是一种常见的并发编程模型 用于解决生产者和消费者之间的数据交互问题 在这个模型中 生产者负责生成数据并将其放入共享的缓冲区 而消费者则从缓冲区中获取数据进行处理 在Windo
  • 软件项目管理

    一 填空题 1 项目是为创造独特的产品 服务或成果而进行的临时性的工作 2 PMBOK 2016 将项目管理分为五个过程组 即启动 计划 执行 控制和收尾 与十大知识领域 整合管理 范围管理 时间管理 成本管理 质量管理 人力资源管理 沟通
  • 如何申请国内博士

    博士申请过程还算平坦 在申请过程中得到了很多学长学姐的帮助 为了将这份帮助传递下去 我便将我的经历写下来 希望可以帮到一些同学 先介绍一下自己的情况 专业是计算机 研究方向是深度学习 机器视觉 硕士是一所排名较低的211 博士最终申请的学校
  • Lodash 总结

    数组 数组创建 随机创建数组 range range start 0 end step 1 let arr range 24 console log arr 0 1 2 3 23 创建相同元素数组 fill fill array value
  • 数据通信-路由基础

    1 IP路由选择原理 路由器的工作内容 路由器知道目标地址 发现到达目标地址的可能的路由 选择最佳路径 路由表 维护路由信息 转发IP数据 IP路由表 初始化情况下 路由器所知的网段 只有其直连接口所在网段 路由器自动将接口所在网段的路由写
  • 【注意】Kafka生产者异步发送消息仍有可能阻塞

    文章目录 问题描述 原因分析 解决办法 总结 问题描述 Kafka是常用的消息中间件 在Spring Boot项目中 使用KafkaTemplate作为生产者发送消息 有时 为了不影响主业务流程 会采用异步发送的方式 如下所示 Slf4j