spring boot配置双Kafka方法

2023-11-09

第一步:application.yml的配置

server:
  port: 8080
spring:
  application:
    name: demo 
  kafka:
    one:
      bootstrap-servers: xxx.xxx.xxx.xxx
      consumer:
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
        group-id: xxxx
        enable-auto-commit: true
    two:
      bootstrap-servers: xxx.xxx.xxx.xxx
      consumer:
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        group-id: xxxx
        enable-auto-commit: true

第二步:配置config

@EnableKafka
@Configuration
public class xxxxConfig {
    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.one.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean
    public KafkaTemplate<String, String> xxxxTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> xxxxxxContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(20);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // 不能写成 1
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//        value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
        return props;
    }
}
@EnableKafka
@Configuration
public class xxxxConfig {
    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.two.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean
    public KafkaTemplate<String, String> xxxxxxxTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> xxxxxxContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(6);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // 不能写成 1
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        //        value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
        return props;
    }
}

注意!注意!注意!!!代码中的一些字段名自己改一下。xxxx之类的换成自己的就行

第三步:

@Resource
private KafkaTemplate<String, String> xxxOneTemplate;
@Resource
private KafkaTemplate<String, String> xxxxTwoTemplate;

直接在你要用到的类中直接引用就行。

跟着以上三步走就可以简单的配置两个Kafka了,还有跟高级的spring切面切点的方法作者还没有研究成功就不献丑了。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

spring boot配置双Kafka方法 的相关文章

随机推荐

  • 华为OD机试 - 选修课(Java & JS & Python)

    题目描述 给定一个元素类型为小写字符串的数组 请计算两个没有相同字符的元素长度乘积的最大值 如果没有符合条件的两个元素 返回0 输入描述 第一行为第一门选修课学生的成绩 第二行为第二门选修课学生的成绩 每行数据中学生之间以英文分号分隔 每个
  • 汇编笔记

    更新于20190929 1 Intel和AT T汇编 参数是反的 AT T寄存器前加 常量前加 Intel mov rax rcx rcx gt rax mov cl 2 对应AT T movq rcx rax rcx gt rax mov
  • RHEL/centos8.0离线安装n卡驱动,cuda10.1,cudnn7.5,anaconda3,pycharm以及mmdeection和simpledet的搭建

    我最近在两台RHEL8 0的服务器装了这些玩意 特此记录一下 1 离线安装nvidia driver cuda10 1 cudnn7 5 关键因素 显卡型号 Quadro P4000 系统 RHEL 8 0 用 cat etc redhat
  • IPv4与ipv6联系

    IPv4又称互联网通信协议第四版 是网际协议开发过程中的第四个修订版本 也是此协议第一个被广泛部署的版本 但是2019年11月26日 全球所有43亿个IPv4地址已分配完毕 IPV6是互联网工程任务组设计的用于替代IPv4的下一代IP协议
  • Java高级程序设计_JAVA高级程序设计

    恢复内容开始 import java awt import java awt event ActionEvent import java awt event ActionListener import java awt event Mous
  • 12个C语言必背实例

    C语言实例第01期 十进制数转换二进制数 实例代码 include stdio h int main int m n k 定义变量 int a 16 0 printf 请输入一个0 32767之间的数字 n scanf d n printf
  • ImageNet零样本准确率首次超过80%,地表最强开源CLIP模型更新

    关注公众号 发现CV技术之美 本文转自新智元 编辑LRS 开源模型OpenCLIP达成ImageNet里程碑成就 虽然ImageNet早已完成历史使命 但其在计算机视觉领域仍然是一个关键的数据集 2016年 在ImageNet上训练后的分类
  • STM32--STM32CubeMX的Timer3定时1ms功能HAL库操作

    一 STM32CubeMX的设置 时钟源的选择 Crystal Ceramic Resonator 调试方法选择 Serial Wire 时钟输入为40MHz Timer3的参数设置 使能Timer3的中断 点击 Generate Code
  • unigui中的unidbgrid单元格内容太长自动回行

    1 servermodule中customcss中加入
  • Android 使用ffmpeg软编码 将摄像头采集视频编码成视频文件

    Android 使用ffmpeg软编码 将摄像头采集视频编码成视频文件 这次代码实现的是视频采集的功能 Android 通过jni 调用ffmpeg 编码yuv数据变成视频文件 先上代码 编码器上下文保存的实体 struct EnCodeB
  • R语言课程论文

    本文是自己在学习R统计分析课程后的课程小论文 对详细详细的文档及实现的R代码感兴趣者 可见文末获取方式 若转载请注明出处 欢迎大家交流学习 不足之处请多指教 Word版全文以及相应的r代码获取方式 资源下载链接 https download
  • 学一点Wi-Fi:WPA3 BP/OCV/SCV/PK/H2E/TD

    WFA在2020年底发布了WPA3标准的第三版 其中又提出了一些新的feature 这里结合之前的版本简单总结一下 1 BP BP是Beacon Protection的缩写 问 Beacon中的信息都是未加密的 所以可能存在攻击者会对AP发
  • 课设:影院管理系统

    影院管理系统 导言 知识点总结 课设介绍 导言 从3月份开始到现在 大概两周多的时间 写了一个影院管理系统 功能有待改善 有的功能还有点bug需要该 现在总结一下 影院管理系统告一段落 接下来要学习算法和数据结构 知识点总结 一 三层架构
  • 一文看懂npm、yarn、pnpm之间的区别

    本文作者对比了当前主流的包管理工具npm yarn pnpm之间的区别 并提出了合适的使用建议 以下为译文 NPM npm是Node js能够如此成功的主要原因之一 npm团队做了很多的工作 以确保npm保持向后兼容 并在不同的环境中保持一
  • 大数据毕设选题 - 深度学习口罩佩戴检测系统(python OpenCV YOLO)

    文章目录 0 前言 1 课题介绍 2 算法原理 2 1 算法简介 2 2 网络架构 3 关键代码 4 数据集 4 1 安装 4 2 打开 4 3 选择yolo标注格式 4 4 打标签 4 5 保存 5 训练 6 实现效果 6 1 pyqt实
  • Linux(ubuntu)上安装RDP Server(Xrdp)使用的注意事项

    ubuntu上的基本安装方法 1 apt get install xrdp 基本上就已经安装完成了 但是此时连接会出现异常 类似黑屏的情况 原因 1 Xrdp不支持unity 3D的图形 解决方法 1 使用xfce或者gnome 2d等 如
  • C#小知识

    项目编译后复制文件到生成目录 方法1 对于单个文件 可以点击属性 输出目录里选择始终复制 方法2 把项目中的ServerScripts复制到输出目录 在项目设置中 生成事件里添加批处理 xcopy ProjectDir ServerScri
  • anaconda用法

    查看已经安装的包 pip list 或者 conda list 安装和更新 pip install requests pip install requests upgrade 或者 conda install requests conda
  • LINUX权限-bash: ./startup.sh: Permission denied

    LINUX权限 bash startup sh Permission denied 执行 startup sh 或者 shutdown sh的时候 报 Permission denied 需要用命令 chmod 修改一下bin目录下的 sh
  • spring boot配置双Kafka方法

    第一步 application yml的配置 server port 8080 spring application name demo kafka one bootstrap servers xxx xxx xxx xxx consume