Kafka入门、API、Spring集成
入门
下载代码
https://kafka.apache.org/downloads
kafka_2.13-2.6.0.tgz
启动服务器
创建一个单节点ZooKeeper实例
bin
/zookeeper-server-start
.sh config
/zookeeper
.properties
启动Kafka服务器
bin/kafka-server-start.sh config/server.properties
创建一个 topic
创建一个名为“test”的topic,它有一个分区和一个副本
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
运行list(列表)命令来查看这个topic
发送一些消息
运行 producer,然后在控制台输入一些消息以发送到服务器
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
启动一个 consumer
将消息转储到标准输出
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
设置多代理集群
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
已经建立Zookeeper和一个单节点了,现在我们只需要启动两个新的节点
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
创建一个副本为3的新topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
运行"describe topics"命令来查看代理
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
示例中,节点1是该主题中唯一分区的领导者
发表一些信息给我们的新topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
my test message 1
my test message 2
消费这些消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
测试一下容错性。Broker 1 现在是 leader,让我们来杀了它
ps aux | grep server-1.properties
kill -9 6369
领导权已经切换到一个从属节点,而且节点1也不在同步副本集中了
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
API
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
生产者
public class KafkaProducerTest {
public static void main(String[] args) throws Exception {
/**
* 1、准备配置信息
*/
Properties props = new Properties();
//定义Kafka服务器地址列表,不需要指定所有的broker
props.put("bootstrap.servers", "localhost:9092");
//生产者需要leader确认请求完成之前接收的应答数
props.put("acks", "-1");
//客户端失败重试次数
props.put("retries", 1);
//生产者打包消息的批量大小,以字节为单位,此次是16k
props.put("batch.size", 16384);
//生产者延迟1ms发送消息
props.put("linger.ms", 1);
//生产者缓存内存的大小,以字节为单位,此处是32m
props.put("buffer.memory", 33554432);
// key 序列化类
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value 序列化类
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
/**
* 2、创建生产者对象
*/
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
/**
* 3、通过生产者发送消息
*
*/
/**
* 发送消息的三种方式:
*
* - 同步阻塞发送
* 适用场景:发送消息不能出错,消息的顺序不能乱,不关心高吞吐量。
*
* - 异步发送(发送并忘记)
* 适用场景:发送消息不管会不会出错,消息的顺序乱了没有关系,关心高吞吐量。
*
* - 异步发送(进行回调处理)
* 适用场景:发送消息不能出错,但我不关心消息的具体顺序。
*/
// 1、同步阻塞发送
// sync(producer);
// 2、异步发送(发送并忘记)
//async1(producer);
// 3、异步发送(回调函数)
async2(producer);
}
/**
* 同步阻塞发送
*
* @param producer
* @throws Exception
*/
private static void sync(KafkaProducer producer) throws Exception {
System.out.println("同步发送消息start......");
ProducerRecord<String, String> record = new ProducerRecord<>("alanchen_topic", 0, "key-sync", "同步发送消息");
Future<RecordMetadata> send = producer.send(record);
RecordMetadata recordMetadata = send.get();
producer.flush();
System.out.println(recordMetadata);
System.out.println("同步发送消息end......");
}
/**
* 异步发送(发送并忘记)
*
* @param producer
* @throws Exception
*/
private static void async1(KafkaProducer producer) {
System.out.println("异步发送(发送并忘记)start......");
ProducerRecord<String, String> record = new ProducerRecord<>("alanchen_topic", 0, "key-async1", "异步发送(发送并忘记)");
producer.send(record);
producer.flush();
System.out.println("异步发送(发送并忘记)end......");
}
/**
* 异步发送(回调函数)
*
* @param producer
* @throws Exception
*/
private static void async2(KafkaProducer producer) {
System.out.println("异步发送(回调函数)start......");
ProducerRecord<String, String> record = new ProducerRecord<>("alanchen_topic", 0, "key-async2", "异步发送(回调函数)");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("异步发送消息成功:" + metadata);
if (exception != null) {
exception.printStackTrace();
}
}
});
producer.flush();
System.out.println("异步发送(回调函数)end......");
}
}
消费者
public class KafkaConsumerTest {
public static void main(String[] args) {
/**
* 1、准备配置信息
*/
Properties props = new Properties();
//定义Kafka服务器地址列表,不需要指定所有的broker
props.put("bootstrap.servers", "localhost:9092");
//消费者组ID
props.put("group.id", "ac");
//是否自动确认offset
props.put("enable.auto.commit", "true");
//自动确认offset时间间隔
props.put("auto.commit.interval.ms", 1000);
//生产者延迟1ms发送消息
props.put("linger.ms", 1);
//生产者缓存内存的大小,以字节为单位,此处是32m
props.put("buffer.memory", 33554432);
// key 序列化类
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value 序列化类
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
/**
* 2、创建一个消费者
*/
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
/**
* 3、指定消费哪一个topic的数据
*/
//指定分区消费
TopicPartition partition = new TopicPartition("alanchen_topic", 0);
//获取已经提交的偏移量
long offset = 0L;
OffsetAndMetadata offsetAndMetadata = consumer.committed(partition);
if (offsetAndMetadata != null) {
offset = offsetAndMetadata.offset();
}
System.out.println("当前消费的偏移量:" + offset);
// 指定偏移量消费
consumer.assign(Arrays.asList(partition));
consumer.seek(partition, offset);
//循环拉取数据
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("消费的数据为:" + record.value());
}
}
}
}
Spring集成
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
生产者
@Component
@Slf4j
public class KafkaProvider {
/**
* TOPIC
*/
private static final String TOPIC = "xiaoha";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
Order order = Order.builder()
.orderId(orderId)
.orderNum(orderNum)
.createTime(createTime)
.build();
// 发送消息
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order));
// 监听回调
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
log.info("## Send message fail ...");
}
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("## Send message success ...");
}
});
}
}
消费者
@Component
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "xiaoha", groupId = "group_id")
public void consume(String message) {
log.info("## consume message: {}", message);
}
}
测试
@RunWith(SpringRunner.class)
@ComponentScan(basePackages = {"com.moses.microcloud.platform"})
@SpringBootTest(classes = SpringBootKafkaApplicationTests.class)
public class SpringBootKafkaApplicationTests {
@Autowired
private KafkaProvider kafkaProvider;
@Test
public void sendMessage() throws InterruptedException {
// 发送 1000 个消息
for (int i = 0; i < 10; i++) {
long orderId = i + 1;
String orderNum = UUID.randomUUID().toString();
kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
}
TimeUnit.MINUTES.sleep(1);
}
}