这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党
官方文档
https://kafka.apache.org/quickstart
版本
安装
这里我们提供两种安装方式,一种是编译好的源码包安装,一种是傻瓜式的Docker compose
方式安装
本次提供的两种安装方式都是快速体验安装方式,线上不要使用这种方式。Docker compose
安装方式未验证,但是应该也没很大问题
安装包部署
这里我们的安装方式还是选择依赖zookeeper
,实际kafka也可以使用他们自研的注册中心KRaft
- 下载安装包
weget https://dlcdn.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz
- 解压
tar -xzf kafka_2.13-3.5.0.tgz
- 进入目录
cd kafka_2.13-3.5.0
- 启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
- 打开另一个终端启动kafka
bin/kafka-server-start.sh config/server.properties
所有服务启动完成我们就拥有一个基本的Kafka环境运行并准备使用。
Docker compose安装
注意该Docker compose方式我未验证,官方也没有提供Docker compose的部署方式,如果有问题可以使用下面的安装包方式部署
Docker compose
安装是最简单最省心的
比如我们编写一个docker compose
vim docker-compose.yml
copy如下代码
version: '3.5'
services:
zookeeper:
image: bitnami/zookeeper ## 镜像
container_name: zookeeper-1
hostname: zookeeper-1
user: root
ports:
- "2181:2181"
- "2888:2888"
- "3888:3888"
volumes:
- ./data/zookeeper-1:/bitnami/zookeeper
environment:
- ZOO_SERVER_ID=1
# - ZOO_SERVERS=0.0.0.0:2888:3888,zookeeper-2:2888:3888,zookeeper-3:2888:3888
- ALLOW_ANONYMOUS_LOGIN=yes
- ZOO_AUTOPURGE_INTERVAL=1
kafka:
image: 'bitnami/kafka:2.8.0'
ports:
- '9092:9092'
- '9999:9999'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
# 客户端访问地址,更换成自己的
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
# 允许使用PLAINTEXT协议(镜像中默认为关闭,需要手动开启)
- ALLOW_PLAINTEXT_LISTENER=yes
# 关闭自动创建 topic 功能
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
# 全局消息过期时间 6 小时(测试时可以设置短一点)
- KAFKA_CFG_LOG_RETENTION_HOURS=6
# 开启JMX监控
- JMX_PORT=9999
#volumes:
#- ./kafka:/bitnami/kafka
depends_on:
- zookeeper
# docker-compose up 旧版本的docker 使用这个命令
docker compose up
测试
创建一个topic
cd kafka_2.13-3.5.0
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
新建一个生产者
在一个新终端执行如下命令
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
这样我们就启动了一个生产者,我们可以随意输入我们要发送的消息
This is my first event
This is my second event
新建一个消费者
在一个新终端执行如下命令
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
这样消费者就会自动消费我们刚才创建的topic:quickstart-events的消息了
java sdk接入
源码已上传至github地址
-
github:https://github.com/weihubeats/weihubeats_demos/blob/master
public class KafkaExample {
private static final String TOPIC = "quickstart-events";
private static final String BOOTSTRAP_SERVERS = "127.0.0.1:9092";
private static final String GROUP_ID = "testGid";
public static void main(String[] args) {
new Thread(KafkaExample::consumeMessage).start();
// 生产消息
produceMessage();
// 消费消息
// consumeMessage();
}
private static void produceMessage() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
Producer<String, String> producer = new KafkaProducer<>(props);
try {
for (int i = 0; i < 10; i++) {
String message = "Message " + i;
System.out.println("开始发送消息");
Future<RecordMetadata> send = producer.send(new ProducerRecord<>(TOPIC, message));
RecordMetadata recordMetadata = send.get();
System.out.println("Produced message: " + message);
}
}
catch (Exception e) {
e.printStackTrace();
}
finally {
producer.close();
}
}
private static void consumeMessage() {
System.out.println("消费消息开始");
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put("group.id", GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
try {
while (true) {
ConsumerRecords<String, String> poll = consumer.poll(Duration.ofSeconds(1));
poll.forEach(record -> System.out.println("Consumed message: " + record.value()));
}
}
catch (Exception e) {
e.printStackTrace();
}
finally {
consumer.close();
}
}
}