【SpringBoot】整合Kafka集群

2023-10-29

一、环境

使用Kafka3.0.0

master slave1 slave2
ip 193.168.3.34 193.168.3.35 193.168.3.36

二、maven引入

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

三、application配置

spring:
  kafka:
    bootstrap-servers: 192.168.3.34:9092,192.168.3.35:9092,192.168.3.36:9092 # 指定 kafka 的地址
    producer: #生产者
      retries: 0  #重复次数 ,失败不重发
      batch-size: 16384 #每次批量发送消息的数量
      buffer-memory: 33554432 #缓存大小达到buffer.memory就发送数据
      acks: 1  # 0=生产者将不会等待来自服务器的任何确认  1=leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应  -1 =leader将等待完整的同步副本集以确认记录      
      key-serializer: org.apache.kafka.common.serialization.StringSerializer #指定 key 的序列化器
      value-serializer: org.apache.kafka.common.serialization.StringSerializer #指定 value 的序列化器
    consumer:
      group-id: nacl #指定消费者组的 group_id
      auto-offset-reset: earliest   #latest 最新的位置 , earliest最早的位置
      auto-commit-interval: 100  #自动提交offset频率 100毫秒      
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #指定 key 的反序列化器
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #指定 value 的反序列化器
    listener:
      concurrency: 3  #3个并行监听

四、SpringBoot-生产者

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@CrossOrigin
@RestController
public class ProducerController {

    // Kafka 模板用来向 kafka 发送数据
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @RequestMapping("/kf")
    public String data() {
        kafkaTemplate.send("first", "hello");
        return "ok";
    }
}

五、SpringBoot-消费者

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

@Configuration
public class KafkaConsumer {
    // 指定要监听的 topic
    @KafkaListener(topics = "first")
    public void consumeTopic(String msg) { // 参数: 收到的 value
        System.out.println("收到的信息: " + msg);
    }

}

六、SpringBoot-主题分区

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopic {
    @Bean
    public NewTopic batchTopic() {
        //项目启动时,自动创建topic,指定分区和副本数量
        return new NewTopic("first", 3, (short) 1);
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【SpringBoot】整合Kafka集群 的相关文章

随机推荐

  • 为什么下载pytorch时,总是下载cpu版本,而不是gpu版本?

    首先 查看一下自己cuda与cudnn版本 创建的虚拟环境python版本 我的python3 10 cuda 11 2 cudnn8 0 因为我们下载都是通过清华源下载的 所以 当清华源里面 没有我们指定python3 10 cuda 1
  • video 标签设置样式

    设置video标签的默认样式 在这里插入图片描述 全屏按钮 video webkit media controls fullscreen button display none 播放按钮 video webkit media control
  • 算法导论

    好像在豆瓣上看到一句话 算法导论之所以经典 是因为它选取的算法每个都是常用的 是精中选精 于是我就有了重读算法导论的冲动 自己非计算机科班出身 所以对于算法这个基础真的比较薄弱 虽然学过算法 但是基础真的太差了 再说学习算法完全是为了锻炼思
  • Java卸载删除(2023最强版)

    Java卸载删除 2023最强版 卸载程序 删除相关环境变量 删除编辑注册表相关 检查C盘相关文件是否删除 收尾 看了网上很多教程 结果还是提示没卸载干净 做了以下整理 希望对大家有所帮助 卸载程序 在应用和功能中 或控制面板的卸载程序中卸
  • Openssl-AES加密

    AES加密算法 此次介绍AES两种加密算法 其他的暂不使用 1 ECB模式 按照块密码的块大小被分为数个块 并对每个块进行独立加密 优点 1 简单 2 有利于并行计算 3 误差不会被传送 缺点 1 不能隐藏明文的模式 2 可能对明文进行主动
  • 解析和校验Flink SQL语句

    Flink对SQL的支持是基于Apache Calcite实现的 且Flink包中集成了Apache Calcite 所以我们可以直接调用Flink包中的SQL解析类 来解析和校验我们的Flink SQL语句 import org apac
  • kettle--数据库间的数据迁移工具

    一 kettle介绍 kettle 是纯 java 开发 开源的 ETL工具 用于数据库间的数据迁移 可以在 Linux windows unix 中运行 有图形界面 通过图形化界面的配置 可以实现数据迁移 并不用开发代码 也有命令脚本还可
  • CTFShow web入门题刷题记录

    CTFShow web入门题刷题记录 信息搜集 web1 提示 开发注释未及时删除 打开网页查看源代码发现 flag flag 2b2cf8e3 f880 41e1 a8ff 02601b3d998f web2 提示 js前台拦截 无效操作
  • Linux安全--iptables详解

    目录 1 iptables介绍 2 iptables四表五链详解 3 iptables基本语法 4 实际操作 4 1 增加规则 4 2 删除规则 4 3 修改规则 5 命令语法总结 6 基本匹配条件 7 iptables进阶用法 7 1 i
  • 一个Debug版本不崩而Release版本可能崩的问题

    引子 今天一个朋友在QQ上向我求助 说他的一个MFC程序用VS2013编译生成的Debug版本运行正常 而编译生成的Release版本却在启动后还没出现界面便崩溃了 经过一番折腾之后 通过调试找到了崩溃点 但却根本不像是崩溃在这儿 因为崩溃
  • Struts标签基本知识

    Struts标签基本知识 1 普通标签 控制执行的流程 1 1 控制标签 实现分支 循环等流程控制 if elseif else append generator iterator merge sort subset 1 2 数据标签 输出
  • 区块链-密码学与安全技术

    密码学与安全技术 工程领域从来没有黑科技 密码学不仅是工程 密码学相关的安全技术在整个信息技术领域的重要地位无需多言 如果没有现代密码学和信息安全的研究成果 人类社会根本无法进入信息时代 区块链技术大量依赖了密码学和安全技术的研究成果 实际
  • C++基础知识 - 函数返回引用深度解析

    函数返回引用深度解析 C 引用使用时的难点 当函数返回值为引用时 若返回局部变量 不能成为其它引用的初始值 不能作为左值使用 返回静态变量 或 全局变量的引用 可成为其他引用的初始值 也可以作为左值 也可作为右值 返回函数的普通形参作为引用
  • Basic Level 1019 数字黑洞 (20分)

    题目 给定任一个各位数字不完全相同的 4 位正整数 如果我们先把 4 个数字按非递增排序 再按非递减排序 然后用第 1 个数字减第 2 个数字 将得到一个新的数字 一直重复这样做 我们很快会停在有 数字黑洞 之称的 6174 这个神奇的数字
  • Java实现微信小程序获取unionID

    前言 微信开发平台为开发者提供openId用来区分用户的唯一性 但是openId只是在独立的应用内是唯一的 如果开发者拥有多个移动应用 网站应用 和公众帐号 包括小程序 可通过 UnionID 来区分用户的唯一性 因为只要是同一个微信开放平
  • linux监控php脚本执行时间,在LINUX环境下定时执行php脚本

    1 使用Crontab定时执行linux环境下的php脚本文件 Cron 它是一个linux下的定时执行工具 根用户以外的用户可以使用 crontab 工具来配置 cron 任务 所有用户定义的 crontab 都被保存在 var spoo
  • springboot+vue实现ChatGPT逐字输出打字效果

    文章目录 前言 一 效果 二 Springboot后端 1 封装请求OpenAI接口的客户端 2 对话处理 3 对话请求接口 二 Vue前端 前言 在调用OpenAI GPT接口时 如果不使用流式 stream true 参数 接口会等待所
  • Centos7安装mysql遇到的问题

    使用yum y install mysql community server安装mysql时候提示 The GPG keys listed for the MySQL 5 7 Community Server repository are
  • AIX 下磁盘 I/O 性能分析

    转自 http www ibm com developerworks cn aix library 1203 weixy aixio 磁盘 I O 的概念 I O 的概念 从字义来理解就是输入输出 操作系统从上层到底层 各个层次之间均存在
  • 【SpringBoot】整合Kafka集群

    学习笔记 一 环境 二 maven引入 三 application配置 四 SpringBoot 生产者 五 SpringBoot 消费者 六 SpringBoot 主题分区 一 环境 使用Kafka3 0 0 master slave1