Kafka集群的搭建以及java生产消费代码测试

2023-11-04

1、什么是Kafka

官网上:Kafka®用于构建实时数据管道和流式应用程序。它具有横向可扩展性、容错性、速度极快,在数千家公司的生产中运行。

2、集群搭建准备

JDK
Zookeeper集群(https://blog.csdn.net/qq_32695789/article/details/86435349)
防火墙的关闭(很重要不然在启动的时候会一直报连接错误)
服务器之间的互信配置(.ssh目录下的操作见https://blog.csdn.net/qq_32695789/article/details/83477825)
下载安装包:http://kafka.apache.org/downloads

3、安装

1、解压:tar -zxvf xxx.jar
2、修改名字 mv xxx kafka (这一步可省。我就是习惯性操作)
3、在安装目录下新建logs文件夹 mkdir logs
在这里插入图片描述
4、修改配置文件 kafka/config/server.properties
broker.id=0 每台机器不一样不能重复
delete.topic.enable=true 允许删除topic
num.network.threads=3 处理网络请求的线程数量
num.io.threads=8 处理磁盘IO的线程数量
log.dirs=/home/soft/kafka/logs 存放运行日志的目录 默认在/tmp zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181 连接ZK的配置
5、复制到各个服务器上
6、复制完成之后修改每个服务器里的配置文件中的broker.id
7、安装完成启动:(启动前记得把zk启动)
bin/kafka-server-start.sh config/server.properties

4、基本命令

启动 后台启动使用nohup关键字(略)
bin/kafka-server-start.sh config/server.properties
关闭
bin/kafka-server-stop.sh stop
创建topic
bin/kafka-topics.sh --zookeeper hadoop1:2181 --create --replication-factor 3 --partitions 1 --topic first
查看topic列表
bin/kafka-topics.sh --zookeeper hadoop1:2181 --list
删除topic
bin/kafka-topics.sh --zookeeper hadoop1:2181 --delete --topic second
生产消息(9092是kafka的默认端口)
bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic first
消费消息
bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --from-beginning --topic first
控制台测试生产消费时记得开两个窗口测试

5、java代码测试

1、引入pom依赖
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
	    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.2</version>
        </dependency>

2、编写生产者代码

    @Test
    public void producer(){
        Properties p = new Properties();
        //kafka地址,多个地址用逗号分割
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(p);
        String topic = "first";
        try {
            while (true) {
                String msg = "Hello," + new Random().nextInt(100);
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
                kafkaProducer.send(record);
                System.out.println("消息发送成功:" + msg);
                Thread.sleep(10000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            kafkaProducer.close();
        }
    }

然后再控制台里面进行消费查看
在这里插入图片描述
3、编写消费代码

    @Test
    public void consumer(){
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p);
        String topic = "first";
        kafkaConsumer.subscribe(Collections.singletonList(topic));// 订阅消息

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("topic:%s,offset:%d,消息:%s", //
                        record.topic(), record.offset(), record.value()));
            }
        }
    }

启动之后控制台打印如下
在这里插入图片描述

其他

本人QQ:806751350
github地址:https://github.com/linminlm

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka集群的搭建以及java生产消费代码测试 的相关文章

  • kafka如何避免消费组重平衡

    目录 前言 协调者 重平衡的影响 避免重平衡 重平衡发生的场景 参考资料 前言 Rebalance 就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程 在 Rebalance
  • kafka知识 --kafka权威指南

    我想既然Kafka是为了写数据而产生的 那么用作家的名字来命名会显得更有意义 我在大学时期上过很多文学课程 很喜欢Franz Kafka 况且 对于开源项目来说 这个名字听起来很酷 因此 名字和应用本身基本没有太多联系 Jay Kreps
  • kafka处理快速的原因

    生产者分析 生产者 producer 是负责向Kafka提交数据的 我们先分析这一部分 Kafka会把收到的消息都写入到硬盘中 它绝对不会丢失数据 为了优化写入速度Kafka采用了两个技术 顺序写入 和 MMFile 顺序写入 因为硬盘是机
  • Kafka之基础笔记

    1 kafka offset 存储 1 1 去zookeeper依赖 比较广为人知的Kafka offset存储方式为zookeeper 在0 8版本时 默认依然是zk 但是此时其实已经出现另外一种offset存储方式了 Kafka以 co
  • Kafka/Spark消费topic到写出到topic

    1 Kafka的工具类 1 1 从kafka消费数据的方法 消费者代码 def getKafkaDStream ssc StreamingContext topic String groupId String consumerConfigs
  • Kafka消息阻塞

    转自 http jis117 iteye com blog 2279519 hi all 大家都很关心kafka消息阻塞的情况 感谢RoctetMQ给我们的教训 Kafka上线也有一段时间了 确实有出现过消息阻塞的情况 虽然不影响业务而且用
  • kafka中partition数量与消费者对应关系

    kafka是由Apache软件基金会开发的一个开源流处理平台 kafka是一种高吞吐量的分布式发布订阅消息系统 它可以处理消费者在网站中的所有动作流数据 kafka中partition类似数据库中的分表数据 可以起到水平扩展数据的目的 比如
  • springboot本机启动elasticjob抛出异常HostException(ip is null)

    1 使用的elasticjob版本为3 0 1 2 本机的IPV4在校验isReachable 返回false 可能是使用无线网 导致ip验证问题 3 最后引入Groovy解决 引入包
  • Linux 下搭建 Kafka 环境

    安装步骤 准备软件目录 mkdir datalake 上传之前下载好的安装包到 datalake 目录下 jdk 8u181 linux x64 gz kafka 2 11 2 1 0 tgz zookeeper 3 4 5 tar gz
  • 大数据技术之Kafka——Kafka入门

    目录 一 概述 1 1 为什么要有Kafka 1 2 定义 1 3 消息队列 1 消息队列的应用场景 2 消息队列的两种模式 1 4 基础架构 二 Producer生产者 2 1 生产者消息发送流程 2 1 1 发送原理 2 2 异步发送A
  • Kafka——集群

    文章目录 集群 1 搭建个集群 2 集群发送消息 3 集群消费 3 1 Procuder 3 2 Consumer 4 消费顺序 集群 对于kafka来说 一个单独的broker意味着kafka集群中只有一个节点 要想增加kafka集群中的
  • Flink消费kafka出现空指针异常

    文章目录 出现场景 表现 问题 解决 tombstone Kafka中提供了一个墓碑消息 tombstone 的概念 如果一条消息的key不为null 但是其value为null 那么此消息就是墓碑消息 出现场景 双流join时 采用的是l
  • kafka + zookeeper下载/安装/使用(超详细)

    kafka是需要zk来支持 所以先下载zk 1 下载安装zookeeper 下载地址 选择不带source的 下载下来解压2次 进入到 D zookeeper apache zookeeper 3 6 1 bin conf 目录下 把zoo
  • win10系统下安装Kafka 的详细步骤

    Win10 系统下要使用Kafka需要经过以下三个步骤 1 安装JDK 需要安装依赖java JDK 2 安装zookeeper 资源协调 分配管理 3 安装Kafka 一 安装 Java SE Development Kit 13 0 1
  • kafka配置内外网访问

    listeners 学名叫监听器 其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务 advertised listeners 和 listeners 相比多了个 advertised Advertised 的
  • [分布式] zookeeper集群与kafka集群

    目录 一 Zookeeper 概述 1 1 Zookeeper定义 1 2 Zookeeper 工作机制 1 3 Zookeeper 特点 1 4 Zookeeper 数据结构 1 5 Zookeeper 应用场景 1 6 Zookeepe
  • Kafka 监控系统Eagle 使用教程 V1.4.0

    1 下载安装zookeeper 2 下载安装kafka 3 下载安装kafka eagle http download kafka eagle org tar zvxf kafka eagle bin 1 4 0 tar gz 4 配置JA
  • kafka的新API 得到最新一条数据

    业务的需要 需要得到最新的一条消息从kafka中 但是发现ConsumerRecords 这个对象并没有 get index 这种方式的获取并且只能 iterator 或者增强for 循环这种方式来循环 记录 但是有一个count 可以得到
  • MQ - KAFKA 高级篇

    kafak是一个分布式流处理平台 提供消息持久化 基于发布 订阅的方式的消息中间件 同时通过消费端配置相同的groupId支持点对点通信 适用场景 构造实时流数据管道 用于系统或应用之间可靠的消息传输 数据采集及处理 例如连接到一个数据库系
  • 一文弄懂事件Event与Kafka的区别

    事件 Event 和 Apache Kafka 是两个概念层面上有所不同的东西 它们在应用程序中的作用和使用场景也有很大的差异 1 概念和定义 事件 Event 事件是 系统内发生 的特定事情或状态变化的表示 在编程和软件设计中 事件通常被

随机推荐

  • macbook pro 散热方案,温度仅29度

    结论 Macbook Pro 13 3 寸 2017 控制住温度 性能飞起 5年前散热不好时 容易触发 CPU 降频 一 需求 长时间满载运行不降频 控制住温度 控制住散热噪音 二 尝试过的散热方案 散热方案 说明 最低温度 满载温度 一
  • 华为云云耀云服务器L实例评测

    前言 在上篇文章 华为云云耀云服务器L实例评测 快速部署MySQL使用指南 中 我们已经用 华为云云耀云服务器L实例 在命令行窗口内完成了MySQL的部署并简单使用 但是后台有小伙伴跟我留言说 能不能用 华为云云耀云服务器L实例 来实现个简
  • 联盛德W800开发板

    目录 W800 芯片介绍 W800开发板 主要接口如下 1 概述 2 准备工作 3 SDK目录结构如下 4 W800编译固件编译 4 1 安装MSYS到本地 4 2增加国内软件更新源 编辑4 3下载工具链 4 5 make工具链配置 5 M
  • 浪潮服务器不显示光驱,电脑不从光驱启动怎么办?我是浪潮品牌的机子。

    在DOS下可以装系统的 WIN98启动软盘引导系统为例在DOS下安装XP 为提高安装速度 需要在启动盘中添加smartdrv exe磁盘高速缓存 cache 程序 并且在安装之前运行该程序 smartdrv是一个磁盘高速缓存程序 称之为sm
  • React事件处理方法

    一 注意事项 1 React元素的事件处理和Dom元素很相似 但是有一点语法的不同 2 React事件的命名采用小驼峰的命名方式 而不是纯小写 camelCase 3 使用JSX语法时你需要传入一个函数作为事件处理函数 而不是一个字符串 例
  • 【CVPR 2022 多模态融合(有3D检测)】Multimodal Token Fusion for Vision Transformers

    Multimodal Token Fusion for Vision Transformers 论文简介 具体实现 Alignment agnostic fusion Alignment aware fusion Multimodal To
  • 如何快速的只取出列表中的数字

    my list a a a 1 2 3 4 5 A B C 提取出 12345 方法一 使用try方法测试 isalnum 判断是否是字母 my list a a a 1 2 3 4 5 A B C str1 for i in my lis
  • Elasticsearch 在Windows上安装和启动

    1 安装JDK 至少1 8以上 2 下载和解压缩Elasticsearch安装包 下载地址 https www elastic co cn downloads 3 启动Elasticsearch bin elasticsearch bat
  • H5存储方案——cookie、session、SessionStorage和LocalStorage

    1 简述 浏览器端存储网页中的数据有三种存储方案 cookie SessionStorage和LocalStorage 其中 SessionStorage和LocalStorage是H5新增的存储方案 而cookie经常同session一并
  • 数据结构之链表详解(2)——双向链表

    目录 前言 一 双向链表 A 双向链表的含义 B 双向链表的实现 1 双向链表的结构 2 链表的初始化 初始化图解 函数代码 3 动态申请节点函数 函数代码 4 打印双向链表函数 函数代码 5 尾部插入节点 图解 函数代码 测试 6 头插函
  • 关于指针的面试题,指向字符串和字符数组的单指针,二级指针,三级指针的使用。

    int a 3 4 0 printf d n sizeof a 48 printf d n sizeof a 0 0 4 printf d n sizeof a 0 16 printf d n sizeof a 0 1 4 地址 print
  • tkinter运行时卡住,点击按钮运行任务时界面卡住

    在tkinter中添加按钮 点击按钮在程序运行过程中tkinter界面会卡住 当运行完按钮任务 就好了 懒得自己写 在百度一搜整整一页都是一样的答案 看着一点都不方便 还得是自己动手丰衣足食 这种情况下 应该将耗时操作放在一个独立的线程中进
  • Vue.js 2.0 教程

    Vue js 介绍 Vue js 读音 vju 类似于 view 是一套构建用户界面的渐进式框架 Vue js 安装 全局安装 vue cli npm install global vue cli 创建一个基于 webpack 模板的新项目
  • linux idea 快捷键,Linux 下 IDEA 的 Ctrl+Alt+S

    前言 这是个困扰我一年多的问题 今天终于解决了 起因 一年前将主系统换成 Arch Linux 后 其他一切正常就是 IDEA 的打开设置的快捷键 ctrl alt s 失效 让我很是头疼 虽然不是很重要 但是对于我这种强迫症来说别提多难受
  • 大数据与云计算的关系

    就目前而言 要想发展好大数据 就离不开云计算 我们在进行大数据的时候同样也是离不开云计算的 于是很多人觉得大数据与云计算都有一定的关系 那么大家知道不知道大数据的云计算有什么关系呢 我们在这篇文章中给大家带来这个问题的答案 首先我们说一下大
  • Unity 解决添加自定义宏不生效的问题

    Unity版本 2020 3 平台 Android 问题描述 执行代码添加 删除宏定义 或者直接在PlayerSetting面板里直接添加 删除宏 通过if判断 获取的还是之前的 新增的宏并没有生效 代码添加 删除宏定义 添加宏定义 pri
  • 代码审计作业-area39/pikachu

    1 问答题 1 使用 docker 构建 pikachu镜像 1 搜索pikachu docker search pikachu 2 拉取镜像 docker pull area39 pikachu 3 启动pikachu镜像 docker
  • PaddlePaddle(3)——深度学习模型训练和关键参数调优详解

    转载请注明作者和出处 https blog csdn net qq 28810395 运行平台 Windows 10 AIstudio官网 https aistudio baidu com 飞桨领航团AI达人创造营 前言 1 什么是人工智能
  • ftp下载出现空文件,需要修改编码

    ftp下载出现空文件 需要修改编码 ftpClient retrieveFile new String ff getName getBytes gbk ISO 8859 1 is
  • Kafka集群的搭建以及java生产消费代码测试

    1 什么是Kafka 官网上 Kafka 用于构建实时数据管道和流式应用程序 它具有横向可扩展性 容错性 速度极快 在数千家公司的生产中运行 2 集群搭建准备 JDK Zookeeper集群 https blog csdn net qq 3