Linux Kafka 3.5 KRaft模式集群部署

2023-10-31

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

背景

kafkaKIP-500引入了KRaft替代Zookeeper来实现自我管理元数据

详细信息可以看原文链接

KRaft简介

KRaft是kafka用来取代zookeeper的分布式协调管理组件。

架构改变

原先依赖于Zookeeper选举出一个controller
现在由KRaft集群中自己选举,产生一个controller

优点

  • Kafka不用再依赖外部框架,能够做到独立运行
  • Kafka集群扩展时不用再受到Zookeeper读写能力的限制

更多优点和缺点这里暂时不太多讨论主要以部署为主

部署3节点kafaka集群

KRaft部署方式支持controllerbroker在同一进程。也支持分开部署
线上推荐分开部署。这里由于是测试集群,打算controllerbroker在同一进程部署

记得所有机器90929093端口打开

下载Kafka

wget https://dlcdn.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz

这里第一次下载报错说证书已过期,添加证书忽略下载

wget --no-check-certificate https://dlcdn.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz

发现国内服务器下载国外软件还是非常慢。最终决定找国内镜像。

  • 阿里云Kafka镜像:http://mirrors.aliyun.com/apache/kafka/3.5.0/?spm=a2c6h.25603864.0.0.3c7d126emg02YS

使用国内镜像下载

wget http://mirrors.aliyun.com/apache/kafka/3.5.0/kafka_2.13-3.5.0.tgz

三台机器都执行

解压

tar -xzf kafka_2.13-3.5.0.tgz

三台机器都执行

给集群生成一个UUID

我们进入到解压的bin目录,我这里是/data/kafka_2.13-3.5.0/bin
然后执行如下命令

kafka_2.13-3.0.0/bin/kafka-storage.sh random-uuid

单台机器生成即可

执行完会生产一个字符串,类似这样xgK3spReSO7ijVK4rEbbbQ

格式化存储路径

sh kafka-storage.sh format -t xgK3spReSO7ijVK4rEbbbQ  -c ../config/kraft/server.properties

三台机器都执行

修改配置

我们这里要修改三台机器的server.properties配置
我这里的路径是在/data/kafka_2.13-3.5.0/config/kraft/server.properties

  • node1
node.id = 1
controller.quorum.voters = 1@192.168.1.1:9093,2@92.168.1.2:9093,3@92.168.1.3:9093
process.roles = broker,controller
listeners=PLAINTEXT://192.168.1.1:9092,CONTROLLER://92.168.1.1:9093
log.dirs=/data/kakfa01/logs
  • node2
node.id = 2
controller.quorum.voters = 1@192.168.1.1:9093,2@92.168.1.2:9093,3@92.168.1.3:9093
process.roles = broker,controller
listeners=PLAINTEXT://192.168.1.2:9092,CONTROLLER://92.168.1.2:9093
log.dirs=/data/kakfa02/logs
  • node3
node.id = 3
controller.quorum.voters = 1@192.168.1.1:9093,2@92.168.1.2:9093,3@92.168.1.3:9093
process.roles = broker,controller
listeners=PLAINTEXT://192.168.1.3:9092,CONTROLLER://92.168.1.3:9093
log.dirs=/data/kakfa03/logs

启动集群

export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"&&nohup sh /data/kafka_2.13-3.5.0/bin/kafka-server-start.sh /data/kafka_2.13-3.5.0/config/kraft/server.properties &

三台机器都执行

启动完我们就有了一个三节点的kafka集群

测试

创建topic

sh kafka-topics.sh --create --topic xiaozou --partitions 1 --replication-factor 1 --bootstrap-server 192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092

查看topic

sh kafka-topics.sh --list --bootstrap-server 192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092

代码测试

  • 生产消息
public class KafkaProducer {

    private static final String TOPIC = "xiaozou";
    private static final String BOOTSTRAP_SERVERS = "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092";

    public static void main(String[] args) {
        // 生产消息
        produceMessage();
    }

    private static void produceMessage() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);

        try {
            for (int i = 0; i < 10; i++) {
                String message = "小奏message " + i;
                System.out.println("开始发送消息");
                Future<RecordMetadata> send = producer.send(new ProducerRecord<>(TOPIC, message));
                RecordMetadata recordMetadata = send.get();
                System.out.println("Produced message: " + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }

}

  • 消费消息
public class KafkaConsumerExample {

    private static final String TOPIC_NAME = "xiaozou";

    private static final String GROUP_ID = "xiaozou_gid";

    private static final String BOOTSTRAP_SERVERS = "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092";


    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);



        // 创建消费者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 订阅主题
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        // 消费消息
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("接收到消息:key = " + record.key() + ", value = " + record.value() +
                        ", partition = " + record.partition() + ", offset = " + record.offset());
                }
                consumer.commitSync(); // 手动提交偏移量
            }
        } finally {
            consumer.close();
        }
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Linux Kafka 3.5 KRaft模式集群部署 的相关文章

随机推荐

  • 自动化测试优势&劣势

    目录 一 自动化测试概述 二 自动化测试优势 劣势 优势 劣势 三 自动化测试常用工具 代码级别 接口 协议级别 界面级别 一 自动化测试概述 软件自动化测试是相对手工测试而存在的 由测试人员根据测试用例中描述的规程一步步执行测试 得到实际
  • 创建chrome右键菜单划词搜索扩展

    转载请注明出处 http blog csdn net zhymax article details 8552830 上网时经常在多个搜索引擎间切换 但使用chrome自带的搜索引擎切换比较麻烦 换一个引擎就需要设置一次配置 因此也在chro
  • Working routine【Codeforces 706 E】【二维链表】

    Codeforces Round 367 Div 2 E 可以说是一道模拟题了 写了有些时候 可能是太菜了吧 题意 给出一个原始矩阵 之后有Q次操作 我们将两个矩阵交换位置 题目中保证两个矩阵不相交 给出的是两个矩阵的左上方的端点 以及它们
  • 机器学习sklearn-特征工程

    数据挖掘的5大流程 1 获取数据 2 数据预处理 3 特征工程 将原始数据转换为更能代表预测模型的潜在问题的特征的过程 可以通过挑选最相关的特征 提取特征以及创建特征来是想 4建模 测试模型并预测结果 5 上线模型 特征工程 sklearn
  • Qt也有垃圾回收(通过QScopedPointer实现),下决心在项目里使用QScopedPointer,省了太多事情了,而且更安全!!...

    也谈Qt的垃圾回收 前几天在做代码审核的时候 Kai Uwe Broulik建议使用QScopedPointer来替代手工内存管理 使用后发觉确实节约了不少代码量 我的CHERRY可以延长寿命了 但是通过简单地阅读代码 发现和Python等
  • Ubuntu18.04 ROS Melodic的cv_bridge指向问题(四种方式,包括opencv4)

    备注 2023 7 4修改 如果是ros空间 可以在工作空间中使用单独cv bridge的方式 比较简单 是我目前常用的方式 放在文章最后 由于ROS Melodic自带的是Opencv3 2 0 而我自己下载的是opencv3 4 5 所
  • AndroidFFmpeg

    https github com appunite AndroidFFmpeg git 本地路径 这个比较全 四种播放方式都有 问题流媒体播放无控制 太快太慢都报错 lbg android ffmpeg AndroidFFmpeg http
  • 机器学习葡萄酒质量_通过数据和机器学习制作出更好的啤酒和葡萄酒

    机器学习葡萄酒质量 带GPS的狗 电子鼻和可倾倒完美啤酒的机器人 GPS Wearing Dogs an Electronic Nose and a Robot That Pours the Perfect Beer Bushfires i
  • Hal库自动生成Freertos时出现osSemaphoreCreate和osSemaphoreWait报错

    由于freertos和Hal版本问题 生成的函数会生成老版本的 所以不兼容 需要改掉 将osSemaphoreCreate osSemaphore SEM 1 改成 osSemaphoreNew 1 1 osSemaphore SEM in
  • TimeGAN学习记录

    一 学习TimeGAN主要参考的链接如下 1 知乎上的TimeGAN论文研读 2 csdn上的一篇博客 论文阅读 Time Series Generative Adversrial Networks TimeGAN 时间序列GAN 3 时间
  • 使用神经网络对黄金期货交割价格进行预测-4 MATLAB

    上一篇文章讲述了如何对预测的结果进行合理化修正 本文主要讲述的是对神经网络本身的学习算法进行优化 一般优化神经网络有三种模式 一种为优化神经网络的连接结构 一种为优化神经网络的学习算法 一种为既优化连接结构 又优化学习算法 由于笔者的知识水
  • JetBrains下载历史版本

    https www jetbrains com clion download other html 在上方的链接中将clion改为idea phpstrom webstrom等等 转载于 https www cnblogs com yang
  • WuThreat身份安全云-TVD每日漏洞情报-2023-10-08

    漏洞名称 Glibc ld so本地权限提升漏洞 漏洞级别 高危 漏洞编号 CVE 2023 4911 相关涉及 系统 ubuntu 22 04 glibc Up to excluding 2 35 0ubuntu3 4 漏洞状态 POC
  • java调用.so文件

    第一步 public class JavaToCTest private native void sayHello 声明本地方法 static System loadLibrary JavaToCTest 需要加载的so库文件的名称 在li
  • iptables的CONNMARK与MARK

    iptables的CONNMARK与MARK Posted on January 24 2012 iptables的CONNMARK与MARK是用于给数据连接和数据包打标记的两个target 一直没搞明白二者的区别 直到昨天花了不少时间解决
  • 【zookeeper】fsync-ing the write ahead log in took which will adversely effect operation latency

    1 概述 原文 zookeeper fsync ing the write ahead log in took which will adversely effect operation latency 在解决上一个问题的时候遇到这个问题的
  • windows应用商店或者其他微软应用打不开的修复办法

    最近突然发现微软自带的那个便笺打不开了 想在应用商店里面重装一下 又发现windows store也变成灰色的 并且有个小叹号 打不开了 右键这个store 设置里面重置也没用 最后在某乎上面看了个回答 照着试了试 发现可以了 这里把步骤记
  • Vue3 如何实现一个全局搜索框

    效果 点击搜索或者按下ctrl k键 页面就会出现搜索框 搜索框页面 SearchBar vue 搜索框的ts SearchBar ts 封装的搜索框hook useSearch ts 在app vue中出现搜索框 App vue 1 搜索
  • 使用 Go API 快速下载 excel 文件

    我们有几个 Golang API 可以为 csvfiles 提供服务 但在提供以编程方式生成的 excel 文件方面没有任何帮助 为了避免重新编写 我们可以借助此服务器开始 main go 这使我们能够服务于路由 和 excel downl
  • Linux Kafka 3.5 KRaft模式集群部署

    这里是weihubeats 觉得文章不错可以关注公众号小奏技术 文章首发 拒绝营销号 拒绝标题党 背景 kafka在KIP 500引入了KRaft替代Zookeeper来实现自我管理元数据 详细信息可以看原文链接 KIP 500 KRaft