springBoot整合kafka配置

2023-11-06

pom.xml

<dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
     <version>2.3.6.RELEASE</version> 
 </dependency>

生产者config

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafaProducerConfig {

    public final static String  BOOTSTRAPSERVERS = "111.168.0.222:9092,111.168.0.222:9092,111.168.0.222:9092";
    public final static String  TOPIC = "topic_自定义主题";

//    /**
//     * 定义一个KafkaAdmin的bean,可以自动检测集群中是否存在topic,不存在则创建
//     */
//    @Bean
//    public KafkaAdmin kafkaAdmin() {
//        Map<String, Object> configs = new HashMap<>();
//        // 指定多个kafka集群多个地址,例如:192.168.2.11,9092,192.168.2.12:9092,192.168.2.13:9092
//        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAPSERVERS);
//        return new KafkaAdmin(configs);
//    }
//    /**
//     * 创建 Topic
//     */
//    @Bean
//    public NewTopic topicinfo() {
//        // 创建topic,需要指定创建的topic的"名称"、"分区数"、"副本数量(副本数数目设置要小于Broker数量)"
//        return new NewTopic(TOPIC, 8, (short) 2);
//    }



    /**
     * Producer Template 配置
     */
    @Bean(name="kafkaTemplate")
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Producer 工厂配置
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Producer 参数配置
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // 指定多个kafka集群多个地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAPSERVERS);

        // 重试次数,0为不启用重试机制
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        //同步到副本, 默认为1
        // acks=0 把消息发送到kafka就认为发送成功
        // acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
        // acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
        props.put(ProducerConfig.ACKS_CONFIG, "1");

        // 生产者空间不足时,send()被阻塞的时间,默认60s
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
        // 控制批处理大小,单位为字节
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
        // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        // 消息的最大大小限制,也就是说send的消息大小不能超过这个限制, 默认1048576(1MB)
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,1048576);
        // 键的序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 值的序列化方式
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 压缩消息,支持四种类型,分别为:none、lz4、gzip、snappy,默认为none。
        // 消费者默认支持解压,所以压缩设置在生产者,消费者无需设置。
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"none");
        return props;
    }

}



生产者发送消息
这里引用上一篇自定义aop发送消息代码

//初始化线程池
public static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors()*5,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(20), //缓存线程
            new ThreadPoolExecutor.AbortPolicy() //直接报异常不执行这个线程
    );

//发送消息方法
public void send(String message) throws Throwable {
        try{
            
            EXECUTOR.execute(()->{
                log.info("用户操作日志消息内容:"+message);
                kafkaTemplate.send("topic_自定义主题", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                    @Override
                    public void onFailure(Throwable ex) {
                        log.error("发送消息失败:"+ex.getMessage());
                    }
                    @Override
                    public void onSuccess(SendResult<String, Object> result) {
                        log.info("发送消息成功");
                    }
                });
            });
        }catch (Exception e){
            log.error(e.getMessage());
        }

    }

消费者config

package com.sf.gis.cbtruck.baseinfo.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * kafka消费者配置
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    public final static String BOOTSTRAPSERVERS = "111.168.0.222:9092,111.168.0.222:9092,111.168.0.222:9092";
    public final static String TOPIC = "topic_自定义主题";
    public final static String GOUPID = "消费者_consumer_group";

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String>
                factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 设置消费者工厂
        factory.setConsumerFactory(consumerFactory());
        // 消费者组中线程数量
        factory.setConcurrency(3);
        // 拉取超时时间
        factory.getContainerProperties().setPollTimeout(3000);

        // 当使用批量监听器时需要设置为true
        factory.setBatchListener(true);

        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // Kafka地址
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAPSERVERS);
        //配置默认分组,这里没有配置+在监听的地方没有设置groupId,多个服务会出现收到相同消息情况
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, GOUPID);
        // 是否自动提交offset偏移量(默认true)
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // 自动提交的频率(ms)
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        // Session超时设置
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        //请求超时时间
        propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "15000");
        // 键的反序列化方式
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 值的反序列化方式
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // offset偏移量规则设置:
        // (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        // (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
        // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return propsMap;
    }
}

消费者消费

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;


@Component
@Slf4j
public class KafkaConsumerListener {

    @Autowired
    private TSystemApiLogService systemApiLogService;

	//监听kafka消费
    @KafkaListener(topics = "topic_cb_truck_operate_log", containerFactory = "kafkaListenerContainerFactory")
    public void test(String message) {
        try{
            log.info("消费消息:"+message);
            //转换为自己的log实体类
            ApiLog apiLog = JSONObject.parseObject(message, TSystemApiLog.class);
            apiLogService.insert(tSystemApiLog);
        }catch (Exception e){
            log.error("日志插入失败");
            e.printStackTrace();
        }
    }
}

不用config配置,另外一种yml文件配置

spring:
  kafka:
    ###########【Kafka集群】###########
    bootstrap-servers: 192.168.0.196:9092,192.168.0.58:9092,192.168.0.51:9092
    ###########【初始化生产者配置】###########
    producer:
      retries: 0 #重试次数
      # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      acks: all
      # 批量大小
      batch-size: 16384
      # 提交延时
      properties:
        linger:
          ms: 0
      # 生产端缓冲区大小
      buffer-memory: 33554432
      # Kafka提供的序列化和反序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    ###########【初始化消费者配置】###########
    consumer:
      group-id: operate_log_consumer_group
      # 当kafka中没有初始offset或offset超出范围时将自动重置offset
      # earliest:重置为分区中最小的offset;
      # latest:重置为分区中最新的offset(消费分区中新产生的数据);
      # none:只要有一个分区不存在已提交的offset,就抛出异常;
      auto-offset-reset: latest
      # 是否自动提交offset
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 批量消费每次最多消费多少条消息
      max-poll-records: 50
      # 提交offset延时(接收到消息后多久提交offset)
      auto:
        commit:
          interval:
            ms: 1000
      # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
      properties:
        session:
          timeout:
            ms: 120000
        # 消费请求超时时间
        request:
          timeout:
            ms: 180000
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      missing-topics-fatal: false
#      concurrency: 3
#      ack-mode: MANUAL
      type: batch
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

springBoot整合kafka配置 的相关文章

  • flink连接kafka报:org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic

    报错信息 Caused by org apache flink runtime JobException Recovery is suppressed by NoRestartBackoffTimeStrategy at org apach
  • gitLens插件简单使用(默认上传github)

    1 安装 在vscode中的插件管理输入如下后下载 GitLens Git supercharged 2 配置 点击文件 首选项 设置 点击右上角设置小图标 3 github使用 首先仓库文件一定是要git init是git所管理的 1 在
  • Java写入txt文件内容

    Java写入数据进txt文件 需求 多条数据追加进文件 且需要处理中文编码问题 以下代码只能处理向文件添加数据的功能 但是会覆盖掉之前的数据 import java io File import java io FileOutputStre
  • 《消息队列高手课》缓存策略:如何使用缓存来减少磁盘IO?

    现代的消息队列 都使用磁盘文件来存储消息 因为磁盘是一个持久化的存储 即使服务器掉电也不会丢失数据 绝大多数用于生产系统的服务器 都会使用多块儿磁盘组成磁盘阵列 这样不仅服务器掉电不会丢失数据 即使其中的一块儿磁盘发生故障 也可以把数据从其
  • python 自建kafka消息生成和消费小工具

    要将 Kafka 的消息生产和消费转换为 API 接口 我们可以使用 Python 的 Web 框架 其中 Flask 是一个轻量级且易于使用的选择 下面是一个简单的例子 使用 Flask 创建 API 来生成和消费 Kafka 消息 1
  • Kafka原理分析

    在基础篇中我们介绍MQ的一些基础原理 这篇文章 我们针对kafka进行较深入的分析 上篇文章中我们提到了kafka中一个名词broker 其实broker可以理解成为一台kafa服务器 kafka的特性和功能 在kafka设计之初是为了实时
  • Kafka:主题创建、分区修改查看、生产者、消费者

    文章目录 Kafka后台操作 1 主题 2 分区 3 生产者 4 消费者组 Kafka后台操作 1 主题 1 创建主题 bin kafka topics sh create bootstrap server hadoop102 9092 r
  • VS2019安装Qt插件教程,发现下载不了问题解决

    1 打开VS 最上方工具栏中点击扩展窗口 选择管理扩展 2 在右边搜索中搜索qt出现以下界面 这时可能出现问题 再点击下载发现迟迟下载不了 或者是下载到一定地步后无法下载 再或者是下载完成后安装无反应 解决办法 点击有点的详细信息或者进入如
  • Flink消费kafka出现空指针异常

    文章目录 出现场景 表现 问题 解决 tombstone Kafka中提供了一个墓碑消息 tombstone 的概念 如果一条消息的key不为null 但是其value为null 那么此消息就是墓碑消息 出现场景 双流join时 采用的是l
  • win10系统下安装Kafka 的详细步骤

    Win10 系统下要使用Kafka需要经过以下三个步骤 1 安装JDK 需要安装依赖java JDK 2 安装zookeeper 资源协调 分配管理 3 安装Kafka 一 安装 Java SE Development Kit 13 0 1
  • [Docker]使用Docker部署Kafka

    Kafka 是一个分布式流处理平台 它依赖于 ZooKeeper 作为其协调服务 在 Kafka 集群中 ZooKeeper 负责管理和协调 Kafka 的各个节点 因此 要在 Docker 容器中启动 Kafka 通常需要同时启动一个 Z
  • Java 正则表达式工具类大全

    import java util regex Matcher import java util regex Pattern author nql Description 验证工具类 date now public class Validat
  • 公司实战 ElasticSearch+Kafka+Redis+MySQL

    一 需求 前一段时间公司要进行数据转移 将我们ES数据库中的数据转移到客户的服务器上 并且使用定时将新增的数据同步 在这过程中学到了很多 在此记录一下 二 技术栈 Mysql Redis ElasticSearch Kafka 三 方案 为
  • Kafka 架构及原理分析

    Kafka 架构及原理分析 文章目录 Kafka 架构及原理分析 简介 使用场景 架构 Broker Topic 副本机制 存储 消费分组 消费编号 数据多写支持 基于 binlog 实现主从复制 Kafka 的进阶功能 消息幂等性 事务
  • WSL无法访问网络的解决办法

    今天在用WSL的时候突然网络抽风 域名解析出了问题 apt update都用不了 网上查了很多方法 什么vEthernet的IP啊 ifconfigip啊 ip route add default啥的 都不管用 最后还是看了一下 etc r
  • Kafka——Mac搭建kafka环境

    1 下载Kafka安装包 下载地址 将压缩包移动到 usr local mv kafka 2 12 3 1 0 tgz usr local 解压 tar zxvf kafka 2 12 3 1 0 tgz 2 启动 启动zookeeper
  • kafka的新API 得到最新一条数据

    业务的需要 需要得到最新的一条消息从kafka中 但是发现ConsumerRecords 这个对象并没有 get index 这种方式的获取并且只能 iterator 或者增强for 循环这种方式来循环 记录 但是有一个count 可以得到
  • 消息队列选型:Kafka 如何实现高性能?

    在分布式消息模块中 我将对消息队列中应用最广泛的 Kafka 和 RocketMQ 进行梳理 以便于你在应用中可以更好地进行消息队列选型 另外 这两款消息队列也是面试的高频考点 所以 本文我们就一起来看一下 Kafka 是如何实现高性能的
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版

    Flink 系列文章 一 Flink 专栏 Flink 专栏 系统介绍某一知识点 并辅以具体的示例进行说明 1 Flink 部署系列 本部分介绍Flink的部署 配置相关基础内容 2 Flink基础系列 本部分介绍Flink 的基础部分 比
  • Kafka速度之谜:高性能的幕后秘密大揭秘

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 kafka高性能的原因 Page Cache ZeroCopy 零拷贝 前言 Kafka的介绍 kafka是linkedIn开源的分布式消息系统 归给Ap

随机推荐

  • 31岁零基础转行软件测试,现已成功入职月薪14K+

    二黑 华测在线上期学员 31岁 坐标上海 专科 石油与天然气地质勘探 学习3个月 从石油行业到IT行业 薪资 8K 14K 二黑同学通过三个月的学习 顺利入职成为一名软件测试工程师 成功从石油行业转行到IT行业 这篇文章分为三个部分 1 我
  • ubuntu latex 编译报错记录

    1 cls缺失 sudo apt get install texlive publishers 2 File algorithmic sty not found sudo apt install texlive science 这里推荐一个
  • 机器学习笔记(7)— 学习率、特征工程、多项式回归

    目录 判断梯度下降是否收敛 如何设置学习率 特征工程 多项式回归 判断梯度下降是否收敛 梯度下降的任务是找到能够使代价函数J最小的参数w和b 通常做法是绘制代价函数图 通过训练集计算出的 并且标出梯度下降每次迭代时J的值 此图中的横轴是梯度
  • 阅读Spring in action 实现书中实例时遇到的问题(三)之解决 JSR303校验获取ValidationMessage.properties错误信息文件的中文乱码问题

    使用 ValidationMessage properties配置错误信息 前端jsp页面回显错误提示信息时总是乱码 网上查了好久 最后还是下面的代码靠谱 在springmvc config xml中添加以下配置
  • 视图和内置函数的理解(很奈斯...)

    文章目录 一 视图介绍 二 视图作用 三 mysql内置函数 四 自主练习 五 mysql图形界面软件 六 数据备份 七 自主练习 一 视图介绍 视图 就是一张虚拟表 临时表 因为视图中表的结构和内容不是通过建表语句创建的 而是根据查询确定
  • es6-箭头函数

    代码
  • webpack 自动引入常用模块

    webpack 自动引入常用模块 描述 比如 lodash 这种库 要用时每个 js 文件都需要如下引入 import from lodash console log join 1 2 怎么才能不引入 lodash 也能直接 join 等方
  • OneDrive同步角标消失 - 解决方案

    问题 在电脑端使用OneDrive时 文件管理器OneDrive文件夹内的文件会在左下角显示同步状态 如下图 若没有显示同步角标 则此功能出现异常 下文介绍如何显示同步角标 值得一提的是 同步角标只起到显示作用 没有同步角标并不影响OneD
  • NCP1342芯片替代料PN8213 65W氮化镓充电器方案

    氮化镓快充已然成为了当下一个非常高频的词汇 在氮化镓快充市场迅速增长之际 65W这个功率段恰到好处的解决了大部分用户的使用痛点 从而率先成为了各大品牌的必争之地 ncp1342替代料PN8213氮化镓充电器主控芯片 适用于65w氮化镓充电器
  • 大数据课程K2——Spark的RDD弹性分布式数据集

    文章作者邮箱 yugongshiye sina cn 地址 广东惠州 本章节目的 了解Spark的RDD结构 掌握Spark的RDD操作方法 掌握Spark的RDD常用变换方法 常用执行方法 一 Spark最核心的数据结构 RDD弹性分布式
  • asp.net实现验证码程序

    1 可以实现验证码的自动更新 2 验证码html代码 li class mjiao2 span 验 证 码 span li
  • 出现Uncaught ReferenceError: $ is not defined错误

    今天在写ajax请求的时候 出现了Uncaught ReferenceError is not defined报错 未定义是为什么呢 后来才知道 原因一 你未引用jquery库jquery min js文件 或者说路径错误 原因二 忽略了H
  • 脚本语言与编译语言的区别

    文章目录 一 语法差异 二 执行方式差异 三 应用领域差异 四 总结 一 语法差异 脚本语言 脚本语言通常使用解释器逐行执行 不需要事先编译 它的语法相对简单 易于学习和使用 常见的脚本语言有Python JavaScript和Ruby等
  • 机器学习笔记(4)— 多特征变量

    1 多特征变量 本文主要介绍多特征变量的梯度下降法和特征缩放内容 2 多特征 多变量 多特征变量的目标函数为 假设x0 1 则目标函数为 把特征量x看作是一个向量 把特征量的参数也看做一个向量 所以目标函数可以表示为 多特征量的目标函数 又
  • linux 下 npm安装依赖报:stack Error: `gyp` failed with exit code: 1

    解决办法 切换到当前项目下即可 gt rm rf node gyp gt npm install g node gyp gt rm rf 项目 node modules 再次 npm install即可
  • Matlab学习入门篇(五)—— 数据可视化

    文章目录 一 离散数据图 1 1散点图 1 2 条形图 二 线图 2 1 二维线图 2 2 三维参数化曲线绘图函数 三 曲面 体积和多边形 数据可视化是指运用 计算机图形学和 图像处理技术 将数据转化为图形或图像并在屏幕上显示出来 以进行交
  • 合成数据加速高质量数据供给,AIGC的救星来了!

    毫无疑问 人工智能发展离不开海量数据 但真实世界的数据是有限的 且数据获取 数据标注等面临诸多难题 数据成为AI行业的 掣肘 如何打破 半个月来 数据要素频繁获得政策部门发声 中国证监会科技监管局局长姚前日前发文建议重点发展基于AIGC技术
  • MYSQL常见的4种数据类型

    转自 微点阅读 https www weidianyuedu com 一 数据类型是什么 数据类型是指列 存储过程参数 表达式和局部变量的数据特征 它决定了数据的存储格式 代表了不同的信息类型 有一些数据是要存储为数字的 数字当中有些是要存
  • Rust- 迭代器

    In Rust an iterator is a pattern that allows you to perform some task on a sequence of items in turn An iterator is resp
  • springBoot整合kafka配置

    pom xml