KafkaTemplate是如何发送消息的?

2023-11-15

Kafka使用KafkaTemplate发送消息,需要先实例化bean.配置如下

<!-- 定义producer的参数 -->
	<bean id="producerProperties" class="java.util.HashMap">
		<constructor-arg>
			<map>
				<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
				<entry key="group.id" value="0" />
				<entry key="retries" value="2" />
				<entry key="batch.size" value="16384" />
				<entry key="linger.ms" value="1" />
				<entry key="buffer.memory" value="33554432" />
				<entry key="max.request.size" value="10000000"></entry>
				<entry key="send.buffer.bytes" value="10000000"></entry>
				<entry key="key.serializer"
					value="org.apache.kafka.common.serialization.StringSerializer" />
				<entry key="value.serializer"
					value="org.apache.kafka.common.serialization.StringSerializer" />
			</map>
		</constructor-arg>
	</bean>

	<!-- 创建kafkatemplate需要使用的producerfactory bean -->
	<bean id="producerFactory"
		class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
		<constructor-arg>
			<ref bean="producerProperties" />
		</constructor-arg>
	</bean>

	<!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
	<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
		<constructor-arg ref="producerFactory" />
		<constructor-arg name="autoFlush" value="true" />
		<property name="defaultTopic" value="mhb-test" />
	</bean>

使用时直接注入就可以使用了.

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic,key, JSON.toJSONString(obj));
future.get();

 这个是通过Spring包装后的用法.Spring增加了ProducerFactory创建Producer对象,并且给Producer增加了事务功能,把参数包装成ProducerRecord对象,调用Kafka-client包中Producer类的send方法.

protected ListenableFuture<SendResult<K, V>> doSend(ProducerRecord<K, V> producerRecord) {
        if (this.transactional) {
            Assert.state(this.inTransaction(), "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");
        }
//增加事务功能,使用factory创建producer
        Producer<K, V> producer = this.getTheProducer();
        this.logger.trace(() -> {
            return "Sending: " + producerRecord;
        });
        SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture();
//发送Kafka,包装返回结果  
producer.send(producerRecord, this.buildCallback(producerRecord, producer, future));
        if (this.autoFlush) {
            this.flush();
        }

        this.logger.trace(() -> {
            return "Sent: " + producerRecord;
        });
        return future;
}

接下来都是kafka-client包内的内容了.KafkaProducer实现了Producer接口,在发送前还调用了拦截器ProducerInterceptor,这个拦截器能拦截甚至更改record数据.官方介绍如下.

A plugin interface that allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster.

拦截器处理完后就是真正发送到Kafka了.调用了org.apache.kafka.clients.producer.KafkaProducer#doSend方法.源码如下:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;

        try {
//0.检查和一些参数的初始化,计算等待时间,因为Kafka是批量发送.
            this.throwIfProducerClosed();

            KafkaProducer.ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), this.maxBlockTimeMs);
            } catch (KafkaException var19) {
                if (this.metadata.isClosed()) {
                    throw new KafkaException("Producer closed while send in progress", var19);
                }

                throw var19;
            }

            long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;

            byte[] serializedKey;
            try {//1.序列化key
                serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException var18) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var18);
            }

            byte[] serializedValue;
            try {//2.序列化value
                serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException var17) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var17);
            }
//3,计算根据key,计算所在分区
            int partition = this.partition(record, serializedKey, serializedValue, cluster);
//组装TopicPartition对象
            tp = new TopicPartition(record.topic(), partition);
            this.setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();
            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
            this.ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? this.time.milliseconds() : record.timestamp();
            this.log.trace("Sending record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
//组装Callback 对象
            Callback interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
            if (this.transactionManager != null && this.transactionManager.isTransactional()) {
                this.transactionManager.maybeAddPartitionToTransaction(tp);
            }
//3.添加到RecordAccumulator中等待发送
            RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);
            if (result.batchIsFull || result.newBatchCreated) {
                this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
//4.返回结果
            return result.future;
        } catch (ApiException var20) {
            this.log.debug("Exception occurred during message send:", var20);
            if (callback != null) {
                callback.onCompletion((RecordMetadata)null, var20);
            }

            this.errors.record();
            this.interceptors.onSendError(record, tp, var20);
            return new KafkaProducer.FutureFailure(var20);
        } catch (InterruptedException var21) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var21);
            throw new InterruptException(var21);
        } catch (BufferExhaustedException var22) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            this.interceptors.onSendError(record, tp, var22);
            throw var22;
        } catch (KafkaException var23) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var23);
            throw var23;
        } catch (Exception var24) {
            this.interceptors.onSendError(record, tp, var24);
            throw var24;
        }
 }

 这里是发送Kafka消息的核心逻辑了,这段代码非常重要,尤其是计算分区逻辑和Kafka批量发送逻辑.

分区默认是采用hash算法计算key,转32位后与总分区取余.

发送消息是批量发送,先把数据在client中存下来,等队列满了或者等待时间到了就发送给Kafka服务器.重点关注org.apache.kafka.clients.producer.internals.RecordAccumulator#append方法,代码如下,具体逻辑将在下一篇中补充.

public RecordAccumulator.RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException {
        this.appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) {
            headers = Record.EMPTY_HEADERS;
        }

        RecordAccumulator.RecordAppendResult var16;
        try {
//1.检查是否有包含该主题分区的批处理对象的双端队列,如果没有则新建
            Deque<ProducerBatch> dq = this.getOrCreateDeque(tp);
            synchronized(dq) {
                if (this.closed) {
                    throw new KafkaException("Producer closed while send in progress");
                }
//尝试向批处理对象追加消息,并返回追加结果,如果队列里没有批处理对象,则返回空
                RecordAccumulator.RecordAppendResult appendResult = this.tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    RecordAccumulator.RecordAppendResult var14 = appendResult;
                    return var14;
                }
            }

            byte maxUsableMagic = this.apiVersions.maxUsableProduceMagic();
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, this.compression, key, value, headers));
            this.log.trace("Allocating a new {} byte message buffer for topic {} partition {}", new Object[]{size, tp.topic(), tp.partition()});
            buffer = this.free.allocate(size, maxTimeToBlock);
            synchronized(dq) {
                if (this.closed) {
                    throw new KafkaException("Producer closed while send in progress");
                }

                RecordAccumulator.RecordAppendResult appendResult = this.tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult == null) {
//2. 将消息写入内存中,封装成一个内存消息对象
                    MemoryRecordsBuilder recordsBuilder = this.recordsBuilder(buffer, maxUsableMagic);
//根据内存消息对象新建一个批处理对象
                    ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, this.time.milliseconds());
//批量处理
                    FutureRecordMetadata future = (FutureRecordMetadata)Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, this.time.milliseconds()));
//将批处理对象添加到双端队列中
                    dq.addLast(batch);
                    this.incomplete.add(batch);
                    buffer = null;
                    RecordAccumulator.RecordAppendResult var19 = new RecordAccumulator.RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
                    return var19;
                }

                var16 = appendResult;
            }
        } finally {
            if (buffer != null) {
                this.free.deallocate(buffer);
            }

            this.appendsInProgress.decrementAndGet();
        }

        return var16;
    }

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

KafkaTemplate是如何发送消息的? 的相关文章

  • node基础之三:http 模块

    1 导入模块 const http require http 2 创建服务 const server http createServer request response gt 获取请求方法 request method 获取请求 url

随机推荐

  • 阿里云1核1G内存1M宽带可以支持多少IP访问量?

    阿里云1核CPU 1G内存 1M公网宽带云服务器够用吗 1M宽带可以支持多少IP的访问量 来说说1M宽带可以跑多少流量及1核1G服务器配置性能 1核 1G 1M宽带配置能跑多少IP 一般来讲 如果图片不多 每天3000PV是没问题的 如果将
  • 优秀的CobaltStrike插件推荐:编程

    优秀的CobaltStrike插件推荐 编程 CobaltStrike是一款功能强大的渗透测试工具 广泛应用于红队行动和网络安全评估 它的灵活性和可扩展性使得开发者可以编写自己的插件来增强其功能 在这篇文章中 我将向你推荐一些好用的Coba
  • java并发包:重入锁与Condition条件

    本文转载至 http blog csdn net a910626 article details 51900941 重入锁 这里介绍一下synchronized wait notify方法的替代品 或者说是增强版 重入锁 重入锁是可以完全替
  • redis--13--Jedis使用

    redis 13 Jedis使用 代码位置 https gitee com DanShenGuiZu learnDemo tree master redis learn jedis 1 redis conf 修改 允许远程连接 bind 1
  • Java异常-Exception

    一 异常介绍 基本概念 Java语言中 将程序执行中发生的不正常情况称为 异常 注 开发过程中的语法错误和逻辑错误不是异常 执行过程中所发生的异常事件可分为两大类 Error 错误 Java虚拟机无法解决的严重问题 如 JVM系统内部错误
  • 2023-05-18 题目

    2023 05 18 题目 1 String 字符串 String 不是基本数据类型 且是不能被继承的 因为string类被final修饰 源码 public final class String implements java io Se
  • [FreeRTOS入门学习笔记]定时器

    定时器的使用步骤 1 定义一个handle xTimerCreate创建 2 启动定时器 在Task1中调用 通过队列通知守护任务来执行定时器任务 要再config头文件中定义守护任务相关配置 虽然定时器是在task1中启动 但是定时器的任
  • qt实现opengl播放yuv视频

    qt使用opengl播放yuv视频 文章目录 qt使用opengl播放yuv视频 toc 1 实现效果 2 pro文件 3 xvideowidget h 4 xvideowidget cpp 更多精彩内容 个人内容分类汇总 1 实现效果 2
  • VS2022编译GDAL库报错: fatal error U1050: PROJ_INCLUDE should be defined. PROJ >= 6 is a required depende

    目录 场景复现 定位问题 解决方案 踩过的坑 场景复现 使用VS2022的Native Tools command prompt for 2022工具编译GDAL库时 报 fatal error U1050 PROJ INCLUDE sho
  • RTSP视频边缘计算网关EasyNVR在5G时代有什么运用价值?

    5G和互联网的发展在近几年一直被按下了加速键 物联网正在成为主流 毋庸置疑 云计算为越来越多智能设备的连接提供了基础 给我们生活带来了极大便利 而边缘计算是云计算物联当中的一个关键应用 当我们在考虑云计算带来的数据过度集中 信息传输堵塞问题
  • 2018年最好用的5个python网站开发框架

    python作为解释型脚本语言 是一种通用的编程语言 由于python社区拥有大量的库文件 框架和其他的一些实用工具 我们可以用python完成各种各样的任务 另外 由于python的代码构成和结构就像英语句子一样自然 这种语言的学习曲线也
  • Spring(三)-IOC使用

    目录 基于XML管理bean 入门案例 引入依赖 创建类HelloWorld 创建Spring的配置文件 在Spring的配置文件中配置bean 创建测试类测试 思路 获取bean 方式一 根据id获取 方式二 根据类型获取 方式三 根据i
  • 延迟渲染到最终结果------1,2,分配渲染目标和初始化窗口(大象无形11.3.1)

    版本不同 我这里延迟渲染是FDeferredShadingSceneRenderer类 即函数 void FDeferredShadingSceneRenderer Render FRHICommandListImmediate RHICm
  • 经过两年努力,我终于进入腾讯(PCG事业群4面总结)

    前言 为什么要尽量让自己进大厂 如果毕业就进了大厂 那你将得到业内大牛的指导 以及随处可见的技术碰撞 新技术的跟进也是非常快的 在这样的环境中 你的技术成长自然是非常快的 如果自己足够努力 用不了三年 你可能也将会跟他们水平差不多 所以 明
  • c语言编译过程

    C语言的编译过程一般分为四个步骤 预处理 编译 汇编和链接 预处理 Preprocessing 预处理器会处理源代码中以 开头的预处理指令 例如 include和 define等 将它们替换为相应的内容 同时 还会删除注释和空格 将多行代码
  • qt-事件循环系统

    Qt中 如果创建的console程序 使用的是QCoreApplication对象 如果创建的是GUI程序 使用的是QApplication对象 而QApplication 继承自 QGUIApplication 最终继承QCoreAppl
  • golang的cms

    golang的cms 2019 03 06 12 53 by 轩脉刃 阅读 评论 收藏 编辑 golang的cms 说说cms cms 内容管理系统 是建站利器 它的本质是为了快速建站 cms本质是一个后台服务站 使用这个后台 能很快搭建一
  • 做区块链卡牌游戏有什么好处?

    区块链卡牌游戏是一种基于区块链技术的创新性游戏形式 它将传统的卡牌游戏与区块链技术相结合 实现了去中心化 数字化资产的交易和收集 这种新型游戏形式正逐渐在游戏行业引起了广泛的关注和热潮 本文将深入探讨区块链卡牌游戏的定义 特点以及其在未来的
  • 自己撸一个阅读类休闲app

    其实自己早就想撸一个app 因为自己一直没什么机会可以做那种好看的app 对我而言好看就是能安装在手机上 然后看着舒服的 所以也对自己所学进行一次整合 然后再次扬帆 感谢那些贡献开源api的大神 也感谢gank 主要使用的开眼的api ga
  • KafkaTemplate是如何发送消息的?

    Kafka使用KafkaTemplate发送消息 需要先实例化bean 配置如下