SpringBoot3集成Kafka

2023-11-01

标签:Kafka3.Kafka-eagle3;

一、简介

Kafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、数据集成和关键任务应用,基于Zookeeper协调的处理平台,也是一种消息系统,具有更好的吞吐量、内置分区、复制和容错,这使得它成为大规模消息处理应用程序的一个很好的解决方案;

二、环境搭建

1、Kafka部署

1、下载安装包:kafka_2.13-3.5.0.tgz

2、配置环境变量

open -e ~/.bash_profile

export KAFKA_HOME=/本地路径/kafka3.5
export PATH=$PATH:$KAFKA_HOME/bin

source ~/.bash_profile

3、该目录【kafka3.5/bin】启动zookeeper
zookeeper-server-start.sh ../config/zookeeper.properties

4、该目录【kafka3.5/bin】启动kafka
kafka-server-start.sh ../config/server.properties

2、Kafka测试

1、生产者
kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>id-1-message
>id-2-message

2、消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
id-1-message
id-2-message

3、查看topic列表
kafka-topics.sh --bootstrap-server localhost:9092 --list
test-topic

4、查看消息列表
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --partition 0
id-1-message
id-2-message

3、可视化工具

配置和部署

1、下载安装包:kafka-eagle-bin-3.0.2.tar.gz

2、配置环境变量

open -e ~/.bash_profile

export KE_HOME=/本地路径/efak-web-3.0.2
export PATH=$PATH:$KE_HOME/bin

source ~/.bash_profile

3、修改配置文件:system-config.properties

efak.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
efak.url=jdbc:mysql://127.0.0.1:3306/kafka-eagle

4、本地新建数据库:kafka-eagle,注意用户名和密码是否一致

5、启动命令
efak-web-3.0.2/bin/ke.sh start
命令语法: ./ke.sh {start|stop|restart|status|stats|find|gc|jdk|version|sdate|cluster}

6、本地访问【localhost:8048】 username:admin password:123456

KSQL语句测试

select * from `test-topic` where `partition` in (0)  order by `date` desc limit 5

select * from `test-topic` where `partition` in (0) and msg like '%5%' order by `date` desc limit 3

三、工程搭建

1、工程结构

2、依赖管理

这里关于依赖的管理就比较复杂了,首先spring-kafka组件选择与boot框架中spring相同的依赖,即6.0.10版本,在spring-kafka最近的版本中3.0.8符合;

但是该版本使用的是kafka-clients组件的3.3.2版本,在Spring文档的kafka模块中,明确说明spring-boot:3.1要使用kafka-clients:3.4,所以从spring-kafka组件中排除掉,重新依赖kafka-clients组件;

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring-kafka.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka-clients.version}</version>
</dependency>

3、配置文件

配置kafka连接地址,监听器的消息应答机制,消费者的基础模式;

spring:
  # kafka配置
  kafka:
    bootstrap-servers: localhost:9092
    listener:
      missing-topics-fatal: false
      ack-mode: manual_immediate
    consumer:
      group-id: boot-kafka-group
      enable-auto-commit: false
      max-poll-records: 10
      properties:
        max.poll.interval.ms: 3600000

四、基础用法

1、消息生产

模板类KafkaTemplate用于执行高级的操作,封装各种消息发送的方法,在该方法中,通过topickey以及消息主体,实现消息的生产;

@RestController
public class ProducerWeb {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send/msg")
    public String sendMsg (){
        try {
            // 构建消息主体
            JsonMapper jsonMapper = new JsonMapper();
            String msgBody = jsonMapper.writeValueAsString(new MqMsg(7,"boot-kafka-msg"));
            // 发送消息
            kafkaTemplate.send("boot-kafka-topic","boot-kafka-key",msgBody);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return "OK" ;
    }
}

2、消息消费

编写消息监听类,通过KafkaListener注解控制监听的具体信息,在实现消息生产和消费的方法测试后,使用可视化工具kafka-eagle查看topic和消息列表;

@Component
public class ConsumerListener {

    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);

    @KafkaListener(topics = "boot-kafka-topic")
    public void listenUser (ConsumerRecord<?,String> record, Acknowledgment acknowledgment) {
        try {
            String key =  String.valueOf(record.key());
            String body = record.value();
            log.info("\n=====\ntopic:boot-kafka-topic,key{},body:{}\n=====\n",key,body);
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            acknowledgment.acknowledge();
        }
    }
}

五、参考源码

文档仓库:
https://gitee.com/cicadasmile/butte-java-note

源码仓库:
https://gitee.com/cicadasmile/butte-spring-parent
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SpringBoot3集成Kafka 的相关文章

随机推荐

  • Kotlin 协程(Coroutines)配合使用 Retrofit,网络请求

    第一步 添加所需依赖 管理生命周期 implementation androidx lifecycle lifecycle livedata ktx 2 2 0 implementation androidx lifecycle lifec
  • K-近邻法(KNN算法)

    1 kNN算法 K 最近邻 k Nearest Neighbors 描述 简单地说 k 近邻算法采用测量不同特征值之间的距离方法进行分类 k 近邻算法 是一种基本 分类与回归 方法 它是是 监督学习 中分类方法的一种 属于 懒散学习法 惰性
  • 【实验四】【使用Select 语句查询数据】

    文章目录 数据 一 简单查询 二 汇总查询 三 连接查询和子查询 数据 这里为了体现查询语句的效果 下面根据查询语句的要求设计数据 结果如下 KC表 XSQK表 XS KC表 打开 SQL Server Management Studio
  • 【数据结构与算法】--二叉树OJ题

    单值二叉树 如果二叉树每个节点都具有相同的值 那么该二叉树就是单值二叉树 只有给定的树是单值二叉树时 才返回 true 否则返回 false 示例 1 输入 1 1 1 1 1 null 1 输出 true 示例 2 输入 2 2 2 5
  • 【C语言技巧】滑动滤波算法滤除抖动

    简易滑动滤波算法 算法原理 将新数据放入到数组的最后 每次在得到数据之前先将数据左移一个元素 踢掉第一个元素最旧的数据 最后数组计算平均 include
  • 解决adb push时出现的“Read-only file system“问题

    出现Read only file system问题 不是因为文件或者文件夹的权限不对 而是要push的目录对应的分区是以只读方式挂载的 网上给出的解决办法是重新以读写方式挂载对应分区 以 system分区为例 使用命令 mount o re
  • 手写数字识别画板前后端实现

    1 系统概要 手写数字识别画板系统 按照MVC原则开发 主要由两部分组成 交互界面 视图View 部分是传统的HTML CSS JS网页 这同样也是一种遵循MVC开发方式 手写数字识别部分 模型Model 是使用Python开发的深度学习的
  • 【网络原理】传输层重点协议 TCP与UDP协议详解

    文章目录 一 UDP协议 1 UDP特点 2 UDP协议报文格式 3 基于UDP的应用层协议 4 关于UDP协议的一个拓展问题 经典面试题 二 TCP协议 1 TCP协议报文格式 2 TCP原理 1 确认应答机制 安全机制 2 超时重传机制
  • 【C++STL】快速排序算法(sort)的原理与使用

    一 sort算法原理 std sort 是 C 标准库中提供的排序算法 它使用的是一种经典的排序算法 快速排序 Quicksort 或者是其变种 快速排序是一种基于比较的排序算法 通过不断地选择一个基准值 pivot 将待排序序列分割为两个
  • 做好参加蓝桥杯省赛的准备

    1 选择方向 在除此选择要参加蓝桥杯的方向时是刚学完Java程序设计 对Java产生了比较大的兴趣 也觉得Java是一个特别灵活好用的语言 特别是eclipse的强大快捷键和找错功能使得编程快了很多 也有部分原因是因为C 好长时间没用 都快
  • android oaid

    Oaid获取接入流程 移动智能设备标识公共服务平台 AndroidID IMEI OAID获取 oaid sdk 1 1 0的aar 随着Google对隐私的重视以及Android10的逐渐普及 获取设备的唯一标识越来越来难 在Androi
  • python爬取豆瓣电影并分析_爬取豆瓣电影top250提取电影分类进行数据分析

    标签 空格分隔 python爬虫 一 爬取网页 获取需要内容 我们今天要爬取的是豆瓣电影top250 页面如下所示 我们需要的是里面的电影分类 通过查看源代码观察可以分析出我们需要的东西 直接进入主题吧 知道我们需要的内容在哪里了 接下来就
  • echarts实现横向柱图文字在柱图上面

    前言 echarts实现横向柱图文字在柱图上面 效果图 实现源代码 div style width 100 height 800px div
  • Immer编写简洁的更新state逻辑

    react官网推荐库use immer https www npmjs com package use immer 引入 import useImmer from use immer 优点 简化代码 只需要关注需要变动的部分 而 immer
  • python 使用sys.setdefaultencoding(‘utf-8‘) 显示中文(中文默认会乱码)

    正常情况下 我们在使用python做页面开发时 防止中文出现乱码问题 python2 情况下会使用 如下语句 import sys reload sys sys setdefaultencoding utf 8 但在python3下 报错
  • linux安装nodejs_Ubuntu 18.04 Linux上安装Etherpad,基于Web的实时协作编辑器

    介绍 Etherpad是一个开源的 基于Web的实时协作编辑器 它允许多个人使用他们的Web浏览器同时编辑文档 它还提供了一些很酷的功能 如富文本格式和即时消息 目标是在Ubuntu 18 04 Linux上安装Etherpad 约定 要求
  • vmwre15.5.1安装mac遇到的问题以及解决方法和相应工具

    虚拟键15 5 1 mac cdr文件 百度都是 注意版本 破解vmware没有apple操作系统选项 unlock工具 链接 https pan baidu com s 1tE91r3eCG3Swg3FDi VZ8A 提取码 d54g 放
  • ubuntu22安装和卸载nvidia驱动

    一 安装nvidia驱动 查看可以安装的版本 ubuntu drivers devices 选择安装nvidia driver 515 sudo apt install nvidia driver 515 重启 sudo reboot 验证
  • 服务器系统兼容性问题,微软表示因兼容性问题,部分用户无法升到Windows10最新版本...

    微软已警告Windows 10用户 由于英特尔Thunderbolt NVMe SSD的兼容性问题 他们可能被禁止升级到Windows 10版本2004或20H2 每当Microsoft发布新功能更新时 即使是次要功能更新 例如Window
  • SpringBoot3集成Kafka

    标签 Kafka3 Kafka eagle3 一 简介 Kafka是一个开源的分布式事件流平台 常被用于高性能数据管道 流分析 数据集成和关键任务应用 基于Zookeeper协调的处理平台 也是一种消息系统 具有更好的吞吐量 内置分区 复制