基于Springboot实现Kafka消费数据

2023-11-14

本文介绍使用Kafka监听和订阅两种不同方式进行数据消费

1、配置文件

spring:
     kafka:
        bootstrap-servers: 192.168.1.16:9092
        #消费者
        consumer:
          group-id: alarmService
          max-poll-records: 10 # 一次 poll 最多返回的记录数
            enable-auto-commit: false
            auto-commit-interval: 1000ms
            properties:
                max.poll.interval.ms: 360000
                session.timeout.ms: 150000
            #以下为kafka用户名密码的配置,不开启sasl时将以下配置删除
            # SASL鉴权方式
            sasl.mechanism: PLAIN
            # 加密协议
            security.protocol: SASL_PLAINTEXT
            # 设置jaas帐号和密码
            sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          auto-offset-reset: earliest
        #    listener:
        #      type: batch
        #      concurrency: 6
        #生产者
        producer:
          retries: 0 #若设置大于0的值,客户端会将发送失败的记录重新发送
          batch-size: 16384 #当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
          buffer-memory: 33554432 #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
          key-serializer: org.apache.kafka.common.serialization.StringSerializer #关键字的序列化类
          value-serializer: org.apache.kafka.common.serialization.StringSerializer #值的序列化类
          properties:
            session.timeout.ms: 15000
            sasl.mechanism: PLAIN
            security.protocol: SASL_PLAINTEXT
            sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";

2、订阅模式

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String autoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String interval;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String key;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String value;

    @Value("${spring.kafka.consumer.properties.security.protocol}")
    private String securityProtocol;

    @Value("${spring.kafka.consumer.properties.sasl.mechanism}")
    private String SASLMechanism;

    @Value("${spring.kafka.consumer.properties.sasl.jaas.config}")
    private String SASLJaasConfig;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String offsetReset;
    
    @Value("${spring.kafka.consumer.max-poll-records}")
    private Integer records;

    @Value("${spring.kafka.consumer.properties.session.timeout.ms}")
    private Integer timeout;

    @Value("${spring.kafka.consumer.properties.max.poll.interval.ms}")
    private Integer pollInterval;

    /**
     * 消费者
     * @param topic 主题
     * @param groupId group.id
     */
    public void kafkaConsumer(String topic, String groupId) {
        Properties props = new Properties();
       //Kafka集群
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        //消费者组,只要group.id相同,就属于同一个消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, key);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, value);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, timeout);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,60000);

        //用户密码认证参数
        props.put("security.protocol", securityProtocol);
        props.put("sasl.mechanism", SASLMechanism);
        props.put("sasl.jaas.config", SASLJaasConfig);
        
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //消费者订阅主题
        consumer.subscribe(Arrays.asList(topic));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
                    if (kafkaMessage.isPresent()) {
                        Object message = kafkaMessage.get();
                        JSONObject json = JSON.parseObject(message.toString());
                        //处理逻辑
                        
                        //同步提交,当前线程会阻塞直到offset提交成功
                        consumer.commitSync();
                    }
                }
            }
        } finally {
            consumer.close();
        }

    }

3、监听模式

@KafkaListener(topicPattern="#{'${spring.kafka.consumer.topics}'}",groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void kafkaConsumer(ConsumerRecord<?,?> record) {
    System.out.println("--------------kafka----------------");
    //获取小区id
    List<String> communityIds = communityBaseinfoMapper.getCommunityBaseinfoCommunityId();
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
        Object message = kafkaMessage.get();
        JSONObject json = JSON.parseObject(message.toString());
        ApplyAccess papplyAccess = json.toJavaObject(ApplyAccess.class);
        String communityId = papplyAccess.getCommunityId();
        if (communityIds.contains(communityId)){
            //数据存储
            String idCard = papplyAccess.getIdCard().replace("*", "");
            peopleBaseinfoService.savePapplyAccess(papplyAccess,idCard);
        }
    }
}

4、kafka配置从头消费历史数据

消费者要从头开始消费某个topic的全量数据,需要满足2个条件(spring-kafka):

(1)使用一个全新的"group.id"(就是之前没有被任何消费者使用过);

(2)指定"auto.offset.reset"参数的值为earliest;

5、auto.offset.reset设置说明

auto.offset.reset具体含义:(注意版本不同,配置参数会有所不一致,具体参考官网)

  • earliest
    当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  • latest
    当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  • none
    topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

基于Springboot实现Kafka消费数据 的相关文章

随机推荐

  • 惠普笔记本的Windows10和Ubuntu20.04双系统安装教程

    Windows10和Ubuntu20 04双系统安装教程 1 下载Ubuntu系统镜像到D盘 2 下载安装镜像工具到D盘 3 制作Ubuntu的系统安装盘 4 磁盘分区 5 GPT分区安装Ubuntu 6 结语 7 参考资料 惠普暗影精灵使
  • Python+selenium模块爬虫实战---拉勾网

    Python selenium模块爬虫实战 拉勾网 一 项目需求 二 selenium概述 三 爬虫思路 四 代码实现 五 完整代码 一 项目需求 项目需求 实现一个可以自动获取拉钩网 自定义搜索 的岗位招聘信息的爬虫程序 实现工具 Pyc
  • 【Unity InputSystem】基础教程(保姆级超详细超基础!!!)

    InputSystem基础教程 1 基础概念 1 1 前言 2 基础操作 2 1插件安装 2 2 如何创建InputActions 2 3 InputActions概念及结构关系 ActionMaps Actions ActionPrope
  • Disentangled Representation:IVIF

    DRF Disentangled Representation for Visible and Infrared Image Fusion DRF 可见光和红外图像融合的解纠缠表示 在本文中 我们通过将 解纠缠表示 应用于可见光和红外图像融
  • ssh登录一直提示修改密码解决

    用SSH登录就会出现用户登录失败 提示诸如 WARNING Your password has expired You must change your password now and login again 等等 在开发板手工修改 大致
  • java调用c++文件

    网上有很多例子 参照着执行有时候相对路径问题经常卡住了 实战了一把通过后 特重新整理一份 供小白参考 首先需要有一个含有native方法的java类 vi Users mac work git javademo src main java
  • 一点整理

    1 美国在2010年以后开始流行数字化转型的 在2010年以前 2006年社交网络FB YOU 在2004 2006 Web2 0热之前 企业是无法直接触达到每个消费者的 2006年Amazon电子商务 这个是我瞎凑的 但因为是在线交易所以
  • 入门Python必备100道练习题

    给大家整理了这份今天给大家分享100道Python练习题 在此之前 先给大家推荐一个工具 是一个对 Python 运行原理进行可视化分析的工具 Python Tutor 点击 Next 按钮就会根据执行步骤显示原理 对新手理解代码运行原理有
  • C++day01

    一 C 简介 本贾尼 斯特劳斯特卢普 于1979年4月在贝尔实验室负责分析UNIX系统的内核的流量情况 希望有一款更加模块化的工具 于1979年10月开始着手开发一种新的编程语言 在C语言的基础上增加了面向对象机制 这就是C 的来历 在19
  • 从量子物理到AI医疗,这位清华博士后想用十年弥补病理医生的"百年缺口"

    大数据文摘出品 作者 易琬玉 根据WHO下属 国际癌症研究机构 公布的最新全球癌症数据报告 2018年全球新增癌症诊断病例约1910万 死亡病例约960万 约1 5男性和1 6女性在一生中会罹患癌症 1 8男性和1 11女性因癌症而死亡 对
  • 安全应急响应案例

    1 产生背景 1988年11月发生的莫里斯蠕虫病毒事件 Morris Worm Incident 致使当时的互联网络超过10 的系统不能工作 该案件轰动了全世界 并且在计算机科学界引起了强烈的反响 为此 1989年 美国国防部高级研究计划署
  • linux系统ipcclean命令,Linux学习笔记29——IPC状态命令

    一 IPC IPC是进程间通讯 在前面 我们相继学习了进程间通讯机制有信号量 内存共享 消息队列 状态命令 ipcs 和删除命令 ipcrm 提供了一种检查和清理IPC机制的方法 二 状态命令 1 显示信号量状态用ipcs s 2 显示共享
  • 华为OD机试真题- 狼羊过河【2023Q2】【JAVA、Python、C++】

    题目描述 一农夫带着m只羊 n只狼过河 农夫有一条可载x只狼 羊的船 农夫在时或者羊的数量大于狼时 狼不会攻击羊 农夫在不损失羊的情况下 运输几次可以完成运输 返程不计入次数 输入描述 输入参数为 m n x m 为羊的数量 n为狼的数量
  • nodeJS入门(四)之身份验证

    身份验证 一 bcrypt模块 1 1 简单介绍一下 1 2 安装 1 3 加密 二 身份验证 2 1 session 会话 2 1 1 session实现身份验证的思路 2 1 2 session的业务流程 2 1 3 express s
  • QT学习总结

    转眼间工作已经三个月了 实习期也已经满了 在这三个月中 对qt的学习遇到了很多问题 也解决了很多问题 也留下了一些现在的未解之谜 在工作中关于qt的一些东西也需要进行一些记录 QT Creator对大于带有两个空格的目录和中文命名的目录不支
  • cv2.error: OpenCV(4.6.0) /io/opencv/modules/imgcodecs/src/grfmt_exr.cpp:103: error

    问题描述 cv2 error OpenCV 4 6 0 io opencv modules imgcodecs src grfmt exr cpp 103 error 213 The function feature is not impl
  • 数据库服务器配置参数修改,数据库服务器参数配置

    数据库服务器参数配置 内容精选 换一换 云搜索服务 Cloud Search Service 为用户提供结构化 非结构化文本的多条件检索 统计 报表 本章节介绍如何通过CDM将本地Elasticsearch整库迁移到云搜索服务中 流程如下
  • 如何在GitHub的repository中建立文件夹

    GitHub的repository无法直接建立文件夹 需要曲线建立 第一步 创建新的repository 若已有repository可直接将其打开 第二步 在repository界面右上角选择创建新文件 create new files 不
  • 费曼技巧学习笔记

    博主狂言 技巧正文 技巧的详细步骤 技巧步骤一 技巧步骤二 技巧步骤三 技巧的提纲总结 费曼技巧可以解决的问题 费曼技巧的另一种描述四步学习法 步骤一 步骤二 步骤三 步骤四 博主读后感 博主狂言 初识费曼技巧 甚是熟悉 博主自认有那么一点
  • 基于Springboot实现Kafka消费数据

    本文介绍使用Kafka监听和订阅两种不同方式进行数据消费 1 配置文件 spring kafka bootstrap servers 192 168 1 16 9092 消费者 consumer group id alarmService