kafka入门安装及消息发送接受初体验(附源码)

2023-10-27

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

官方文档

https://kafka.apache.org/quickstart

版本

  • 3.5.0

安装

这里我们提供两种安装方式,一种是编译好的源码包安装,一种是傻瓜式的Docker compose方式安装

本次提供的两种安装方式都是快速体验安装方式,线上不要使用这种方式。Docker compose安装方式未验证,但是应该也没很大问题

安装包部署

这里我们的安装方式还是选择依赖zookeeper,实际kafka也可以使用他们自研的注册中心KRaft

  1. 下载安装包
weget https://dlcdn.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz
  1. 解压
tar -xzf kafka_2.13-3.5.0.tgz
  1. 进入目录
cd kafka_2.13-3.5.0
  1. 启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
  1. 打开另一个终端启动kafka
bin/kafka-server-start.sh config/server.properties

所有服务启动完成我们就拥有一个基本的Kafka环境运行并准备使用。

Docker compose安装

注意该Docker compose方式我未验证,官方也没有提供Docker compose的部署方式,如果有问题可以使用下面的安装包方式部署

Docker compose安装是最简单最省心的
比如我们编写一个docker compose

vim docker-compose.yml

copy如下代码

version: '3.5'
services:
  zookeeper:
    image: bitnami/zookeeper   ## 镜像
    container_name: zookeeper-1
    hostname: zookeeper-1
    user: root
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
    volumes:
      - ./data/zookeeper-1:/bitnami/zookeeper
    environment:
      - ZOO_SERVER_ID=1
      # - ZOO_SERVERS=0.0.0.0:2888:3888,zookeeper-2:2888:3888,zookeeper-3:2888:3888
      - ALLOW_ANONYMOUS_LOGIN=yes
      - ZOO_AUTOPURGE_INTERVAL=1
  kafka:
    image: 'bitnami/kafka:2.8.0'
    ports:
      - '9092:9092'
      - '9999:9999'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      # 客户端访问地址,更换成自己的
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      # 允许使用PLAINTEXT协议(镜像中默认为关闭,需要手动开启)
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 关闭自动创建 topic 功能
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      # 全局消息过期时间 6 小时(测试时可以设置短一点)
      - KAFKA_CFG_LOG_RETENTION_HOURS=6
      # 开启JMX监控
      - JMX_PORT=9999
      #volumes:
      #- ./kafka:/bitnami/kafka
    depends_on:
      - zookeeper
  • 启动:
# docker-compose up 旧版本的docker 使用这个命令
docker compose up

测试

创建一个topic

cd kafka_2.13-3.5.0
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

新建一个生产者

在一个新终端执行如下命令

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

这样我们就启动了一个生产者,我们可以随意输入我们要发送的消息

This is my first event
This is my second event

新建一个消费者

在一个新终端执行如下命令

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

这样消费者就会自动消费我们刚才创建的topic:quickstart-events的消息了

java sdk接入

源码已上传至github地址

  • github:https://github.com/weihubeats/weihubeats_demos/blob/master
public class KafkaExample {

	private static final String TOPIC = "quickstart-events";
	private static final String BOOTSTRAP_SERVERS = "127.0.0.1:9092";

	private static final String GROUP_ID = "testGid";

	public static void main(String[] args) {
		new Thread(KafkaExample::consumeMessage).start();
		// 生产消息
		produceMessage();

		// 消费消息
//		consumeMessage(); 
	}

	private static void produceMessage() {
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

		Producer<String, String> producer = new KafkaProducer<>(props);

		try {
			for (int i = 0; i < 10; i++) {
				String message = "Message " + i;
				System.out.println("开始发送消息");
				Future<RecordMetadata> send = producer.send(new ProducerRecord<>(TOPIC, message));
				RecordMetadata recordMetadata = send.get();
				System.out.println("Produced message: " + message);
			}
		}
		catch (Exception e) {
			e.printStackTrace();
		}
		finally {
			producer.close();
		}
	}

	private static void consumeMessage() {
		System.out.println("消费消息开始");
		Properties props = new Properties();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
		props.put("group.id", GROUP_ID);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

		Consumer<String, String> consumer = new KafkaConsumer<>(props);
		consumer.subscribe(Collections.singletonList(TOPIC));

		try {
			while (true) {
				ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(1));
				poll.forEach(record -> System.out.println("Consumed message: " + record.value()));
			}
		}
		catch (Exception e) {
			e.printStackTrace();
		}
		finally {
			consumer.close();
		}
	}

}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka入门安装及消息发送接受初体验(附源码) 的相关文章

  • Linq 连接两个值

    假设我有一个列表 City State 它最初来自数据库 我有LocationID 但现在我将它加载到内存中 假设我还有一张快餐店表 其中记录了城市和州 我需要获取与城市和州相匹配的机构列表 注意 我尝试描述一个简化的场景 我的业务领域完全
  • “ToListAsync()”和“AsAsyncEnumerable().ToList()”之间的区别

    函数需要返回Task
  • LINQ 如何强制查询具体化?

    假设我有简单的 LINQ 扩展 var filtered data Where i gt i Count gt 0 我知道除非我开始使用 否则不会对其进行评估filtered i e foreach DataItem i in filter
  • Linq 的 let 语句将引用设置为 null

    某个天体星座包含SortedList将对这些列表的引用设置为null当使用一个let在 Linq 查询中 这是一个最小的工作示例 其中大部分只是一些设置 以确保它反映我正在工作的环境 重要的部分是在最后一行 它会抛出一个NullRefere
  • 如果数组为空,LINQ 返回 null

    public class Stuff public int x other stuff 我有一个IEnumerable
  • C# 动态 Linq 变量Where 子句

    我正在按照 Scott Gu 的文章创建动态 LINQhttp weblogs asp net scottgu archive 2008 01 07 dynamic linq part 1 using the linq dynamic qu
  • 自动加载 linq2entities 中的关系

    当我的模型中的两个实体之间存在关系时 组成员 1 用户 并尝试使用 LINQ 从该关系中选择项目 从 user GroupMember 中的实体选择实体 除非我首先使用以下语句加载关系 否则我总是得到空结果 user GroupMember
  • 在 LINQ 查询中进行转换

    是否可以在 LINQ 查询中进行强制转换 为了编译器的缘故 下面的代码并不糟糕 但最好将其放入一个查询中 Content content dataStore RootControl as Controls Content List
  • 在一个解决方案中调用不同项目的方法

    1 个解决方案中有 3 个项目 我对第一个项目中的主文件进行的主要操作 但是我需要调用第三个项目中的方法并使用类 例如 第三个项目有 public DataClasses1DataContext base global WindowsFor
  • 如何使用“Linq to Objects”将一组连续的日期放入一个组中?

    我有一个麻烦的查询要写 我目前正在编写一些令人讨厌的 for 循环来解决这个问题 但我很想知道 Linq 是否可以为我做到这一点 I have struct TheStruct public DateTime date get set ti
  • C#/Linq 获取相邻的集合

    我有一个有序列表 例如 0 1 2 6 7 10 我想要得到数字加 1 的集合 我想要第一个数字和计数或系列 所以我会得到开始 0 计数 3开始 6 计数 2开始 10 计数 1 我怎样才能在 C 中做到这一点 答案是我认为最好的方式 对我
  • 左外连接 - LINQ to Datatable

    我正在尝试使用 LINQ 在两个数据表上应用左外连接 当我尝试调试和查看结果变量中包含的数据时 我收到下面列出的异常 System ArgumentException 值不能为空 参数名称 行 Code private DataTable
  • 实体框架在不同的工作站上生成不同的查询

    我们在单个开发人员机器和一些客户端上遇到问题 单个 Linq 查询 生成两个不同的 SQL 查询 问题实际上是第二个查询有 firebird 不支持的 OUTER APPLY 语句 我们认为这不是代码问题 而是环境问题 但我会粘贴代码 li
  • 使用 MongoDB 和 ASP.NET MVC 进行分页的有效方法

    我们正在创建一个应用程序 MongoDB 作为数据库 我们正在使用MongoDB 的官方 C 驱动程序 http docs mongodb org ecosystem drivers csharp 我们有一个包含数千条记录的集合 我们想要创
  • 使用 LINQ 和 C# 的随机数组

    我在 MSDN 杂志上读到一篇关于使用LINQ 中的枚举类 http msdn microsoft com en us magazine cc700332 aspx生成随机数组 本文使用 VB NET 我不能立即确定 C 中的等效项是什么
  • 如何按双精度值对 List 进行排序?

    这听起来很简单 但其实没那么简单 我想根据 T 的一个属性 double 类型 来排序 List 如果您在编译前知道属性名称 myList myList OrderBy a gt a propertyName ToList or myLis
  • LINQ 的 Expression.Quote 方法的用途是什么?

    MSDN 文档指出 表达 引用 方法创建一个 代表一个的 UnaryExpression 具有常量值的表达式 类型为表达式 我已经能够通过使用 Expression 类手动构建谓词表达式来构建用于 LINQ 查询的谓词表达式 但从未遇到过需
  • 在List中找到对应的项目

    为了清楚地显示 我有以下列表项 我可以想象下面的小列表 可能有数百行 CourseId ClassName StartDate 12321 Math 08 25 2017 32342 Physics 08 25 2017 34345 Che
  • int -> int list 与类型 int -> IEnumerable<'a> 不兼容

    Given open System Linq 这是一个可以接受的表达方式 2 3 4 SelectMany fun n gt 1 n 但这不是 2 3 4 SelectMany fun n gt 1 n 错误消息显示 int gt int
  • Java 表达式树 [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 是否有相当于 net的 LINQ 下的表达式树JVM 我想实现一些类似 LINQ 的代码结构Scala

随机推荐

  • 通向Golang的捷径【7. 数组和 slice】

    从本章开始 将学习一些可包含一组元素的数据结构 也被称为数据集合 比如数组 切片 slice 和 map 这很显然是受到 Python 语言的影响 数组类型会使用 符号 这也是大多数编程语言的基本类型 Go 语言的数组与其他语言基本类似 但
  • android常见的monkey命令

    adb shell monkey p 包名 v 9000000 adb shell ps grep monkey 查找到monkey pid adb shell kill 刚才查到的进程号 指定一个包 adb shell monkey p
  • PySpark 连接Hive

    文章目录 简介 环境搭建与效果演示 更细节的搭建方法 搭建HDFS Spark或hive的前提 已经有了远程可访问的测试集群 搭建hadoop2 7 2 修改hadoop配置 格式化hdfs 测试 搭建spark 2 4 5 解压hive
  • C++成员函数末尾const关键字…

    原文地址 C 成员函数末尾const关键字的作用 作者 olym 1 gt 尽管函数名和参数列表都相同 void foo const成员函数是可以与void foo 并存的 可以形成重载 我们假设调用语句为obj foo 如果obj为non
  • git常用命令与常见面试题总结

    目录 1 git框架介绍 2 列举工作中常用的几个git命令 3 提交时发生冲突 如何解决 4 新建git功能分支的步骤 5 说明GIT合并的两种方法以及区别 6 Git提交代码的步骤 7 idea集成git 7 1 File gt set
  • IDEA开发及运行第一个Android项目

    IDEA自动下载SDK Gradle 保证能访问网络 原来eclipse能使用的sdk 配到idea报错 就换成自动下载最新的了 之前没成功可能是我防火墙禁用了上网 新建项目 提示安装SDK 等待下载完成 继续建项目 选择手机或平板及目标设
  • JVM实战:JVM内存分配策略

    JVM运行时数据区 Java虚拟机在执行Java程序的过程中会把它所管理的内存划分为若干个不同的数据区域 这些区域都有各自的用途 以及创建和销毁的时间 有的区域随着虚拟机进程的启动而存在 有些区域则依赖用户线程的启动和结束而建立和销毁 Ja
  • JAVA学习笔记——内部类

    目录 基本概念 使用内部类访问对象状态 内部类的特殊语法规则 局部内部类 访问外部变量 匿名内部类 静态内部类 基本概念 内部类 inner class 是定义在另一个类中的类 主要特点有 内部类方法可以访问该类定义所在的作用域中的数据 包
  • QString转化成其他字符串

    1 QChar 转char char xxx QChar unicode 2 QString 转 String String xxx QString toStdStrng 3 QString转 char 先将 QString 转为标准库中的
  • 【手把手教你写服务器】客户端程序和服务器程序的简单实现

    文章目录 1 基本TCP客户 服务器程序的套接字函数 2 server c 3 client c 1 基本TCP客户 服务器程序的套接字函数 下图中各个函数的功能 参数及返回值自行查阅 UNIX网络编程卷1 套接字联网API 第4章 2 s
  • Python字符串与Bytes之间的互相转换

    Python字符串与Bytes之间的互相转换 byte转字符串 方式一 data b x31 x32 x33 print data b 123 strdata data decode gbk print strdata 123 strdat
  • .Net core基于xUnit的单元测试查看测试覆盖率

    写代码如何保证代码质量 基本大家都知道要做单元测试 那如何知道你单元测试是不是测试到了所有代码场景呢 这就要通过测试覆盖率来体现了 测试覆盖率 一般来说主要是Line代码行数覆盖率 同样还会有Branch分支覆盖率 Method方法覆盖率等
  • 使用QFrame类实现界面美化

    使用QFrame类实现界面美化 QFrame类是Qt框架中用于创建和显示矩形框架的基本组件 它可以用于美化界面 分割界面等多种场景 在Qt中使用QFrame类非常简单 我们只需要在ui文件中拖动一个QFrame控件并在代码中设置它的属性即可
  • 一刷总结!

    前面都还算顺利 走到贪心和动态规划的时候就感觉比较吃力了 就是那种怎样都感觉自己想不出来的 还需要多多练习和多多理解 有了这个监督之后 已经养成了每天要写算法的习惯 hh不错 希望能继续坚持下去 秋招能有一个好结果
  • Java-1.5

    题目描述 编写程序 计算 9 5 4 5 2 5 3 45 5 3 5 代码 法1 public class Calculate public static void main String args final double a 9 5
  • linux网络编程(三) TCP通信时序与多进程/线程并发服务器的编写

    文章目录 1 TCP通信时序 2 滑动窗口 TCP流量控制 3 出错处理封装函数 4 多进程并发服务器编写 5 多进程并发服务器编写 4 TCP状态转换 5 半关闭 6 2MSL 6 1 2MSL 6 2 端口复用 1 TCP通信时序 下图
  • 用python写注册登录界面web_用Python实现web端用户登录和注册功能

    这篇文章主要介绍了用Python实现web端用户登录和注册功能的教程 需要的朋友可以参考下 用户管理是绝大部分Web网站都需要解决的问题 用户管理涉及到用户注册和登录 用户注册相对简单 我们可以先通过API把用户注册这个功能实现了 RE M
  • GD32F303调试小记(零)之工程创建与编译

    前言 干这行的朋友都知道 真正拿单片机做项目时 作为软件编写人员 你所掌握的肯定不止一款单片机 又或者说你必须有能独立上手新单片机的能力 这里的新指的是对你个人来说是从未接触过的或者不熟悉的 而不一定是说这个单片机有多新 而调试一款新的单片
  • 二分类模型评价指标

    二分类模型指标 混淆矩阵 TP 实际为正预测为正 FP 实际为负但预测为正 TN 实际为负预测为负 FN 实际为正但预测为负 准确率 A c c u r a
  • kafka入门安装及消息发送接受初体验(附源码)

    这里是weihubeats 觉得文章不错可以关注公众号小奏技术 文章首发 拒绝营销号 拒绝标题党 官方文档 https kafka apache org quickstart 版本 3 5 0 安装 这里我们提供两种安装方式 一种是编译好的