kafka 应用实战

2023-11-12

一、Java 中使用 kafka 进行通信

依赖

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.0.0</version>
</dependency>

发送端代码

public class MyKafkaProducer extends Thread{

    //producer api
    KafkaProducer<Integer,String> producer;
    String topic;  //主题

    public MyKafkaProducer(String topic) {
        Properties properties=new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.150:9092,192.168.10.151:9092,192.168.10.152:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG,"my-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //连接的字符串
        //通过工厂
        //new
        producer=new KafkaProducer<Integer, String>(properties);
        this.topic = topic;
    }
    @Override
    public void run() {
        int num=0;
        while(num<20) {
			//get 会拿到发送的结果
            //同步 get() -> Future()
            String msg="pratice test message:"+num;
            try {
                producer.send(new ProducerRecord<Integer, String>
                        (topic,msg)).get();
                TimeUnit.SECONDS.sleep(2);
                num++;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
			
            /*try {
                String msg="my kafka practice msg:"+num;
                //回调通知
				//异步
                producer.send(new ProducerRecord<>(topic, msg), (metadata, exception) -> {

                    System.out.println(metadata.offset()+"->"+metadata.partition()+"->"+metadata.topic());
                });
                TimeUnit.SECONDS.sleep(2);
                ++num;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }*/

        }
    }

    public static void main(String[] args) {
        new MyKafkaProducer("test_partition").start();
    }
}

消费端代码

public class MyKafkaConsumer extends Thread{

    KafkaConsumer<Integer,String> consumer;
    String topic;

    public MyKafkaConsumer(String topic) {
        Properties properties=new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.150:9092,192.168.10.151:9092,192.168.10.152:9092");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"my-consumer");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"my-gid3");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"5000"); //自动提交(批量确认)
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //一个新的group的消费者去消费一个topic
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //从头开始消费
        consumer=new KafkaConsumer<Integer, String>(properties);
        this.topic = topic;
    }

    @Override
    public void run() {
        consumer.subscribe(Collections.singleton(this.topic));
        while(true){
            ConsumerRecords<Integer,String> consumerRecords=consumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(record->{
                //null->my kafka practice msg:0->63
                System.out.println(record.key()+"->"+record.value()+"->"+record.offset());
            });
        }
    }

    public static void main(String[] args) {
        new MyKafkaConsumer("test_partition").start();
    }
}

二、异步发送

kafka对于消息的发送,可以支持同步和异步,前面演示的案例中,我们是基于同步发送消息。

同步会 需要阻塞,而异步不需要等待阻塞的过程。 从本质上来说,kafka 都是采用异步的方式来发送消息到 broker,但是 kafka 并不是每次发送消息都会直 接发送到 broker上,而是把消息放到了一个发送队列中,然后通过一个后台线程不断从队列取出消息进 行发送,发送成功后会触发 callback。kafka 客户端会积累一定量的消息统一组装成一个批量消息发送出去,触发条件是前面提到的batch.sizelinger.ms

而同步发送的方法,无非就是通过future.get()来等待消息的发送返回结果,但是这种方法会严重影响消 息发送的性能。

//发送端中run方法修改
@Override
public void run() {
        int num=0;
        while(num<20) {
            try {
                String msg="my kafka practice msg:"+num;
                //get 会拿到发送的结果
                //同步 get() -> Future()
                //回调通知

                producer.send(new ProducerRecord<>(topic, msg), (metadata, exception) -> {

                    System.out.println(metadata.offset()+"->"+metadata.partition()+"->"+metadata.topic());
                });
                TimeUnit.SECONDS.sleep(2);
                ++num;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }

batch.size

生产者发送多个消息到 broker上 的同一个分区时,为了减少网络请求带来的性能开销,通过批量的方式 来提交消息,可以通过这个参数来控制批量提交的字节数大小,默认大小是16384byte,也就是16kb, 意味着当一批消息大小达到指定的 batch.size 的时候会统一发送

linger.ms

Producer 默认会把两次发送时间间隔内收集到的所有 Requests 进行一次聚合然后再发送,以此提高吞 吐量,而 linger.ms 就是为每次发送到 broker 的请求增加一些 delay,以此来聚合更多的 Message 请求。 这个有点像 TCP 里面的 Nagle 算法,在 TCP 协议的传输中,为了减少大量小数据包的发送,采用了 Nagle 算法,也就是基于小包的等-停协议。

batch.size 和 linger.ms 这两个参数是 kafka 性能优化的关键参数,我们会发现 batch.size 和 linger.ms 这两者的作用是一样的,如果两个都配置了,那么怎么工作的呢?实际上,当二者都配 置的时候,只要满足其中一个要求,就会发送请求到broker上

三、基础配置分析

group.id

consumer group 是 kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以 有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有 消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由 同一个消费组内的一个 consumer 来消费。

如下图所示,分别有三个消费者,属于两个不同的 group,那 么对于 firstTopic 这个 topic 来说,这两个组的消费者都能同时消费这个 topic 中的消息,对于此时的架构来说,这个 firstTopic 就类似于 ActiveMQ 中的 topic 概念。
在这里插入图片描述

如下图所示,如果3个消费者都属于同一个 group,那么此时 firstTopic 就是一个 Queue 的概念

在这里插入图片描述

enable.auto.commit

消费者消费消息以后自动提交,只有当消息提交以后,该消息才不会被再次接收到,还可以配合 auto.commit.interval.ms 控制自动提交的频率。

当然,我们也可以通过 consumer.commitSync() 的方式实现手动提交

auto.offset.reset

这个参数是针对新的 groupid 中的消费者而言的,当有新 groupid 的消费者来消费指定的 topic 时,对于该参数的配置,会有不同的语义

  • auto.offset.reset=latest情况下,新的消费者将会从其他消费者最后消费的 offset 处开始消费 Topic 下的消息
  • auto.offset.reset= earliest情况下,新的消费者会从该topic最早的消息开始消费
  • auto.offset.reset=none情况下,新的消费者加入以后,由于之前不存在offset,则会直接抛出异常。

max.poll.records

此设置限制每次调用 poll 返回的消息数,这样可以更容易的预测每次 poll 间隔要处理的最大值。通过调 整此值,可以减少 poll 间隔

四、Springboot+kafka

springboot 的版本和 kafka 的版本,有一个对照表格,如果没有按照正确的版本来引入,那么会存在版 本问题导致 ClassNotFound 的问题,具体请参考

https://spring.io/projects/spring-kafka

jar包依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.4</version>
</dependency>

KafkaProducer

@Component
public class KafkaProducer {
	@Autowired
	private KafkaTemplate<String,String> kafkaTemplate;
	public void send(){
		kafkaTemplate.send("test","msgKey","msgData");
	}
}

KafkaConsumer

@Component
public class KafkaConsumer {
    @KafkaListener(topics = {"test"})
    public void listener(ConsumerRecord record){
    	Optional<?> msg=Optional.ofNullable(record.value());
    	if(msg.isPresent()){
    		System.out.println(msg.get());
    	}
    }
}

application配置

spring.kafka.bootstrap.servers=192.168.10.150:9092,192.168.10.151:9092,192.168.10.152:9092
spring.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true

spring.kafka.consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

测试

public static void main(String[] args) {
    ConfigurableApplicationContext context=SpringApplication.run
    	(KafkaDemoApplication.class, args);
    KafkaProducer kafkaProducer=context.getBean(KafkaProducer.class);
    for(int i=0;i<3;i++){
        kafkaProducer.send();
        try {
        	Thread.sleep(3000);
        } catch (InterruptedException e) {
        	e.printStackTrace();
        }
    }
}

五、自定义分区(Partitioner)

MyPartition

public class MyPartition implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //这里可以根据 key 来返回分区,实现某类消息统一进入哪个分区,key发送消息时可以指定
        // if (key == X) {...}
        System.out.println("enter");
        // 根据topic获取全部的分区
        List<PartitionInfo> list=cluster.partitionsForTopic(topic);
        int leng=list.size();
        // 为空随机分配消息到分区
        if(key==null){
            Random random=new Random();
            return random.nextInt(leng);
        }
        // 不为空取模分配消息
        return Math.abs(key.hashCode())%leng;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }

    public static void main(String[] args) {
        System.out.println(Math.abs("my-gid1".hashCode())%50);
    }
}

发送端代码添加自定义分区

public MyKafkaProducer(String topic) {
    Properties properties=new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.150:9092,192.168.10.151:9092,192.168.10.152:9092");
    properties.put(ProducerConfig.CLIENT_ID_CONFIG,"my-producer");
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.kafka.MyPartition");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    //连接的字符串
    //通过工厂
    //new
    producer=new KafkaProducer<Integer, String>(properties);
    this.topic = topic;
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka 应用实战 的相关文章

  • flink连接kafka报:org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic

    报错信息 Caused by org apache flink runtime JobException Recovery is suppressed by NoRestartBackoffTimeStrategy at org apach
  • Kafka3.0.0版本——消费者(消费者组案例)

    目录 一 消费者组案例 1 1 案例需求 1 2 案例代码 1 2 1 消费者1代码 1 2 2 消费者2代码 1 2 3 消费者3代码 1 2 4 生产者代码 1 3 测试 一 消费者组案例 1 1 案例需求 测试同一个主题的分区数据 只
  • 《消息队列高手课》缓存策略:如何使用缓存来减少磁盘IO?

    现代的消息队列 都使用磁盘文件来存储消息 因为磁盘是一个持久化的存储 即使服务器掉电也不会丢失数据 绝大多数用于生产系统的服务器 都会使用多块儿磁盘组成磁盘阵列 这样不仅服务器掉电不会丢失数据 即使其中的一块儿磁盘发生故障 也可以把数据从其
  • kafka中partition数量与消费者对应关系

    kafka是由Apache软件基金会开发的一个开源流处理平台 kafka是一种高吞吐量的分布式发布订阅消息系统 它可以处理消费者在网站中的所有动作流数据 kafka中partition类似数据库中的分表数据 可以起到水平扩展数据的目的 比如
  • Kafka原理分析

    在基础篇中我们介绍MQ的一些基础原理 这篇文章 我们针对kafka进行较深入的分析 上篇文章中我们提到了kafka中一个名词broker 其实broker可以理解成为一台kafa服务器 kafka的特性和功能 在kafka设计之初是为了实时
  • 基于Spark的电商用户行为实时分析可视化系统(Flask-SocketIO)

    基于Spark的电商用户行为实时分析可视化系统 Flask SocketIO 项目简介 该项目已上线蓝桥课程 有需要的可凭邀请码 UB5mdLbl 学习哦 有优惠 课程地址 https www lanqiao cn courses 2629
  • 大数据技术之Kafka——Kafka入门

    目录 一 概述 1 1 为什么要有Kafka 1 2 定义 1 3 消息队列 1 消息队列的应用场景 2 消息队列的两种模式 1 4 基础架构 二 Producer生产者 2 1 生产者消息发送流程 2 1 1 发送原理 2 2 异步发送A
  • 黑马头条 热点文章实时计算、kafkaStream

    热点文章 实时计算 1 今日内容 1 1 定时计算与实时计算 1 2 今日内容 kafkaStream 什么是流式计算 kafkaStream概述 kafkaStream入门案例 Springboot集成kafkaStream 实时计算 用
  • windows python kafka 初级使用

    今天花了点时间在这个kafka上 因为我们工作中也用到了kafka 我这边对于kafka的理解是能用或者知道基本原理就行 实现在自己的windows环境搭建一次kafka 然后使用python进行数据的生产和消费 如果之后工作中对于kafk
  • 仿kafka实现java版时间轮

    系统定时 超时 在我们平时的项目开发中 会设置系统的超时时间 比如在http接口中设置超时时间 在定时调度中也会用到 在jdk的开发的实现Timer和ScheduledThreadPoolExecutor DelayQueue定时调度中使用
  • kafka + zookeeper下载/安装/使用(超详细)

    kafka是需要zk来支持 所以先下载zk 1 下载安装zookeeper 下载地址 选择不带source的 下载下来解压2次 进入到 D zookeeper apache zookeeper 3 6 1 bin conf 目录下 把zoo
  • kafka问题解决:org.apache.kafka.common.errors.TimeoutException

    记录使用kafka遇到的问题 1 Caused by java nio channels UnresolvedAddressException null 2 org apache kafka common errors TimeoutExc
  • kafka 监控工具--CMAK

    CMAK previously known as Kafka Manager is a tool for managing Apache Kafka clusters See below for details about the name
  • [Docker]使用Docker部署Kafka

    Kafka 是一个分布式流处理平台 它依赖于 ZooKeeper 作为其协调服务 在 Kafka 集群中 ZooKeeper 负责管理和协调 Kafka 的各个节点 因此 要在 Docker 容器中启动 Kafka 通常需要同时启动一个 Z
  • kafka(三)重平衡

    历史文章 kafka 一 kafka的基础与常用配置 文章目录 一 kafka消费者组 二 重平衡 Rebalance 2 1 重平衡触发条件 2 2 重平衡策略 2 2 1 Range 平均分配 2 2 2 RoundRobin 轮询分配
  • Kafka性能保证和延时队列实现原理

    数据不丢不漏 不重不错 一 不丢 生产写入消息不丢失 数据组织形式 首先 从数据组织形式来说 kafka有三层形式 kafka有多个主题 topic 每个主题有多个分区 分区分为主分区和副本分区 每个分区又有多条消息 而每个分区可以分布到不
  • kafka的新API 得到最新一条数据

    业务的需要 需要得到最新的一条消息从kafka中 但是发现ConsumerRecords 这个对象并没有 get index 这种方式的获取并且只能 iterator 或者增强for 循环这种方式来循环 记录 但是有一个count 可以得到
  • 从 MySQL 到 DolphinDB,Debezium + Kafka 数据同步实战

    Debezium 是一个开源的分布式平台 用于实时捕获和发布数据库更改事件 它可以将关系型数据库 如 MySQL PostgreSQL Oracle 等 的变更事件转化为可观察的流数据 以供其他应用程序实时消费和处理 本文中我们将采用 De
  • 如何清除 MassTransit 队列?

    我想在集成测试设置例程中删除队列中的所有消息 如何实现 谷歌搜索 智能感知暴力没有运气 如果重要的话 我使用 RabbitMq 作为传输 无法从 MassTransit 内的队列中 删除 对于测试 您可以通过使用临时的随机队列 URIrab
  • Java:在后台更新有大量行的 JTable

    我正在编写一个简单的 Java Swing 实用程序 它将从 MQ JMS 服务器读取消息并将它们显示在 JTable 中 private void getMessages try if null Queue Queue close Clo

随机推荐

  • 传递点的大小

    仍然用attribute 因为是顶点着色器 11
  • 博客要写得好,Emoji要用对!

    EMOJI 01 表情与心情 02 动作与身份 03 动物与植物 04 食品与饮料 05 旅行和地点 06 娱乐与活动 07 物品与工具 08 符号与标识 01 表情与心情 02 动作与身份
  • 华为OD机试 - 最差产品奖( Python)

    题目描述 A公司准备对他下面的N个产品评选最差奖 评选的方式是首先对每个产品进行评分 然后根据评分区间计算相邻几个产品中最差的产品 评选的标准是依次找到从当前产品开始前M个产品中最差的产品 请给出最差产品的评分序列 输入描述 第一行 数字M
  • 手把手教你,把3D模型从stl格式导出iges格式的方法

    工具 Hypermesh 注意 下载和安装视频在我的上传资源里面 记得安装路径不能有中文 自己的操作账户名也不能是中文的 方法 第一 按照如下步骤 导入stl模型 第二步 点击Shaded 按钮 显示实体网格 第三步 点击Geom下的sur
  • 自己搭的12V 电机驱动电路设计

    1 单MOS管驱动电路 采用P75NF75为主角构成 2 双MOS管电机驱动电路设计 采用D4184管组合 3 半桥驱动电路设计 采用BTS7960芯片 两路半桥构成全桥驱动电路
  • s3c6410 android 移植Step by step

    Report 2009 03 05 转载请说明出处 不得用于商业用途 Linux forum id hongjiujing Porting android on s3c6410 Environment ubuntu 8 10 Board X
  • KeilMDK编译错误Error: L6218E: Undefined symbol __aeabi_assert (referred from xxx.o).

    问题描述 AirPressure AirPressure axf Error L6218E Undefined symbol aeabi assert referred from mbrtu o 问题原因 Error L6218E Unde
  • 爬虫乱码UnicodeEncodeError: ‘gbk’ codec can’t encode character ‘\xa0’ in position

    在Python中将网址写入文件的时候 会碰到 UnicodeEncodeError gbk codec can t encode character xa0 in position 0这个问题 其实就是在windows中 新建的文本文件的默
  • Connecting the Dots: 基于图神经网络的多元时间序列预测——学习合集

    关于源代码和数据集的理解可以参考以下几篇文章 1 github源代码 数据集 安装环境要求 2 原论文百度云链接 提取码 1234 3 帮助理解论文的文章1 4 帮助理解论文的文章2 5 帮助理解论文的文章3 6 交通流量预测数据集解读 7
  • UnityVR--小程序8--激光门伤害

    本文使用Line Renderer组件 在门框中画出一道激光线 被线照射到的主角将会扣分 另外 激光仅检测ragCast层 所以主角必须添加到ragCast层中 与坦克对战小程序 UnityVR 小程序7 坦克对战 的设置相同 1 在场景中
  • MySQL多表查询 (超详细)

    一 多表关系 项目开发中 在进行数据库表结构设计时 会根据业务需求及业务模块之间的关系 分析并设计表结构 由于业务之间相互关联 所以各个表结构之间也存在着各种联系 基本上分为三种 一对多 多对一 多对多 一对一 1 1 一对多 案例 部门与
  • 在线算法(online algorithm)--竞争性分析

    文章目录 一 Problem Setting 1 1 competitve analysis 1 2 page replacement 二 Deterministic Online Algorithms 2 1 deterministic
  • 穆土 的学习和生活网站

    前端学习 哔哩哔哩 哔哩哔哩中有很多的 up 主 都有学习的资源 你可以学到等多东西 个人关注的 up 主 pink 老师 黑马培训的一位老师 他带领了很多人进入了前端的领域 我看他的 html css js 的入门商品 珠峰培训官方 周萧
  • 任意版本pytorch-gpu环境搭建方法

    任意版本pytorch gpu环境搭建方法 tortorish的博客 CSDN博客 网上关于pytorch gpu环境搭建的教程非常多 但若在国内 直接使用pytorch官网上的命令 经常会遇到下载过慢的情况 而若使用清华源 阿里源等网站
  • WindowBuilder for Eclipse 3.7

    http download eclipse org windowbuilder WB integration 3 7 MyEclipse10 7 1和MyEclipse10 7的下载地址 http downloads myeclipseid
  • 深聊自动化测试之:10年小鱼给你10条建议,让你在自动化界占据一个墙角

    10年小鱼的自动化建议 1 哪一刻 让你想起了自动化 1 1 执行回归测试 1 2 压测场景执行并发 1 3 UI稳定 接口不断升级 2 七问 是否了解自动化风险 2 1 团队成员的资历 2 2 自动化成本投入产出比 2 3 慎重对待UI级
  • Python中eval()函数的使用

    今天给大家分享一下Python中的eval 函数 如果感觉博主的文章还不错的话 希望大家点赞支持一下博主 文章目录 eval 函数 语法 实例 实例1 实例2 实例3 eval 函数 eval 函数用来执行一个字符串表达式 并返回表达式的值
  • 【Spring传播机制底层原理】

    一 Spring的事务传播机制 Spring的事务传播机制是Spring框架中最核心的机制之一 它能够灵活地控制多个事务方法的执行顺序 提交或回滚等行为 在Spring中 事务是通过TxManager来管理的 TxManager是一个接口
  • Hyper-V:无法打开虚拟机,因为虚拟机监控程序未运行

    管理员权打开 cmd 窗口 输入 bcdedit set hypervisorlaunchtype Auto 前提是机器已经开启了 虚拟化和开启了虚拟机监控程序
  • kafka 应用实战

    一 Java 中使用 kafka 进行通信 依赖