Kafka指南

2023-10-26

Kafka入门、API、Spring集成

入门

下载代码

https://kafka.apache.org/downloads

kafka_2.13-2.6.0.tgz

启动服务器

创建一个单节点ZooKeeper实例

bin/zookeeper-server-start.sh config/zookeeper.properties

启动Kafka服务器

bin/kafka-server-start.sh config/server.properties

创建一个 topic

创建一个名为“test”的topic,它有一个分区和一个副本

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

运行list(列表)命令来查看这个topic

发送一些消息

运行 producer,然后在控制台输入一些消息以发送到服务器

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message

This is another message

启动一个 consumer

将消息转储到标准输出

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

设置多代理集群

cp config/server.properties config/server-1.properties

cp config/server.properties config/server-2.properties

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

已经建立Zookeeper和一个单节点了,现在我们只需要启动两个新的节点

bin/kafka-server-start.sh config/server-1.properties &

bin/kafka-server-start.sh config/server-2.properties &

bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

创建一个副本为3的新topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

运行"describe topics"命令来查看代理

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

示例中,节点1是该主题中唯一分区的领导者

发表一些信息给我们的新topic

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

my test message 1

my test message 2

消费这些消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

测试一下容错性。Broker 1 现在是 leader,让我们来杀了它

ps aux | grep server-1.properties

kill -9 6369

领导权已经切换到一个从属节点,而且节点1也不在同步副本集中了

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

API

<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>2.6.0</version>
		</dependency>

生产者

public class KafkaProducerTest {
    public static void main(String[] args) throws Exception {
        /**
         * 1、准备配置信息
         */
        Properties props = new Properties();
        //定义Kafka服务器地址列表,不需要指定所有的broker
        props.put("bootstrap.servers", "localhost:9092");
        //生产者需要leader确认请求完成之前接收的应答数
        props.put("acks", "-1");
        //客户端失败重试次数
        props.put("retries", 1);
        //生产者打包消息的批量大小,以字节为单位,此次是16k
        props.put("batch.size", 16384);
        //生产者延迟1ms发送消息
        props.put("linger.ms", 1);
        //生产者缓存内存的大小,以字节为单位,此处是32m
        props.put("buffer.memory", 33554432);
        // key 序列化类
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value 序列化类
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /**
         * 2、创建生产者对象
         */
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        /**
         * 3、通过生产者发送消息
         *
         */
        /**
         * 发送消息的三种方式:
         *
         * - 同步阻塞发送
         * 适用场景:发送消息不能出错,消息的顺序不能乱,不关心高吞吐量。
         *
         * - 异步发送(发送并忘记)
         * 适用场景:发送消息不管会不会出错,消息的顺序乱了没有关系,关心高吞吐量。
         *
         * - 异步发送(进行回调处理)
         * 适用场景:发送消息不能出错,但我不关心消息的具体顺序。
         */
        // 1、同步阻塞发送
//        sync(producer);
        // 2、异步发送(发送并忘记)
        //async1(producer);
        // 3、异步发送(回调函数)
        async2(producer);
    }

    /**
     * 同步阻塞发送
     *
     * @param producer
     * @throws Exception
     */
    private static void sync(KafkaProducer producer) throws Exception {
        System.out.println("同步发送消息start......");
        ProducerRecord<String, String> record = new ProducerRecord<>("alanchen_topic", 0, "key-sync", "同步发送消息");
        Future<RecordMetadata> send = producer.send(record);
        RecordMetadata recordMetadata = send.get();
        producer.flush();
        System.out.println(recordMetadata);
        System.out.println("同步发送消息end......");
    }

    /**
     * 异步发送(发送并忘记)
     *
     * @param producer
     * @throws Exception
     */
    private static void async1(KafkaProducer producer) {
        System.out.println("异步发送(发送并忘记)start......");
        ProducerRecord<String, String> record = new ProducerRecord<>("alanchen_topic", 0, "key-async1", "异步发送(发送并忘记)");
        producer.send(record);
        producer.flush();
        System.out.println("异步发送(发送并忘记)end......");
    }

    /**
     * 异步发送(回调函数)
     *
     * @param producer
     * @throws Exception
     */
    private static void async2(KafkaProducer producer) {
        System.out.println("异步发送(回调函数)start......");
        ProducerRecord<String, String> record = new ProducerRecord<>("alanchen_topic", 0, "key-async2", "异步发送(回调函数)");
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                System.out.println("异步发送消息成功:" + metadata);
                if (exception != null) {
                    exception.printStackTrace();
                }
            }
        });
        producer.flush();
        System.out.println("异步发送(回调函数)end......");
    }
}

消费者

public class KafkaConsumerTest {
    public static void main(String[] args) {
        /**
         * 1、准备配置信息
         */
        Properties props = new Properties();
        //定义Kafka服务器地址列表,不需要指定所有的broker
        props.put("bootstrap.servers", "localhost:9092");
        //消费者组ID
        props.put("group.id", "ac");
        //是否自动确认offset
        props.put("enable.auto.commit", "true");
        //自动确认offset时间间隔
        props.put("auto.commit.interval.ms", 1000);
        //生产者延迟1ms发送消息
        props.put("linger.ms", 1);
        //生产者缓存内存的大小,以字节为单位,此处是32m
        props.put("buffer.memory", 33554432);
        // key 序列化类
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value 序列化类
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        /**
         * 2、创建一个消费者
         */
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        /**
         * 3、指定消费哪一个topic的数据
         */
        //指定分区消费
        TopicPartition partition = new TopicPartition("alanchen_topic", 0);
        //获取已经提交的偏移量
        long offset = 0L;
        OffsetAndMetadata offsetAndMetadata = consumer.committed(partition);
        if (offsetAndMetadata != null) {
            offset = offsetAndMetadata.offset();
        }
        System.out.println("当前消费的偏移量:" + offset);
        // 指定偏移量消费
        consumer.assign(Arrays.asList(partition));
        consumer.seek(partition, offset);
        //循环拉取数据
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("消费的数据为:" + record.value());
            }
        }
    }
}

Spring集成

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

生产者

@Component
@Slf4j
public class KafkaProvider {
    /**
     * TOPIC
     */
    private static final String TOPIC = "xiaoha";
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
        Order order = Order.builder()
                .orderId(orderId)
                .orderNum(orderNum)
                .createTime(createTime)
                .build();
        // 发送消息
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order));
        // 监听回调
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("## Send message fail ...");
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("## Send message success ...");
            }
        });
    }
}

消费者

@Component
@Slf4j
public class KafkaConsumer {
    @KafkaListener(topics = "xiaoha", groupId = "group_id")
    public void consume(String message) {
        log.info("## consume message: {}", message);
    }
}

测试

@RunWith(SpringRunner.class)
@ComponentScan(basePackages = {"com.moses.microcloud.platform"})
@SpringBootTest(classes = SpringBootKafkaApplicationTests.class)
public class SpringBootKafkaApplicationTests {
    @Autowired
    private KafkaProvider kafkaProvider;

    @Test
    public void sendMessage() throws InterruptedException {
        // 发送 1000 个消息
        for (int i = 0; i < 10; i++) {
            long orderId = i + 1;
            String orderNum = UUID.randomUUID().toString();
            kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
        }
        TimeUnit.MINUTES.sleep(1);
    }
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka指南 的相关文章

  • 101.对称二叉树

    给定一个二叉树 检查它是否是镜像对称的 例如 二叉树 1 2 2 3 4 4 3 是对称的 1 2 2 3 4 4 3 但是下面这个 1 2 2 null 3 null 3 则不是镜像对称的 1 2 2 3 3 方法1 根左右遍历一次树得到

随机推荐

  • 微信小程序个人简历界面(编辑版)

    微信小程序个人简历界面 可编辑修改 包含全部源码 1 微信小程序实现简单的个人简历界面 包含基本信息 教育背景 获奖证书 兴趣爱好等 简历信息支持修改编辑内容 2 通过此文章 希望能带给更多学习微信小程序的伙伴们一点点经验 示例简洁 布局简
  • Linux系统编程之pthread多线程与互斥编程

    Linux系统编程之pthread多线程与互斥编程 include
  • tcp三次握手、四次挥手

    原文 https www cnblogs com qq78292959 p 3922231 html https blog csdn net qq 38950316 article details 81087809 经典的四次握手关闭图 T
  • 几款常用压测工具介绍与使用

    现在市面上的压测工具数不胜数 挑几款常用的做个简单的介绍 1 Apache ab ab是apache自带的压力测试工具 使用起来非常方便 安装 1 ab运行需要依赖apr util包 安装命令为 yum install apr util 2
  • 腾讯云短信Java调用示例(SDK3.0)

    腾讯云短信Java调用示例 SDK3 0 1 pom xml 添加以下依赖 2 需要引入的包 3 程序实例 1 pom xml 添加以下依赖
  • IDEA控制台乱码问题 maven-javadoc-plugin

    接手老项目 上来就是编译报错 一看是控制台还打印一堆乱码 所以上来百度搜到一篇不错的文章DEA控制台乱码问题 原因 解决方式 解决不了算我输 一顿操作仍然无效 不过学会了不少 后来控制台报错中发现了 maven javadoc plugin
  • SpringAOP的5种增强类型应用讲解

    SpringAOP的5种增强类型应用讲解 一 前言 spring框架中为我们提供的增强包括针对切面的增强和针对切入点的增强 对一个方法的增强底层使用的是动态代理 所以在学习springAop增强之前大家有必要先了解一下动态代理相关内容 本文
  • linux系统配置文件

    1 etc sysconfig i18n 语言配置文件 2 etc sysconfig network scripts ifcfg eth0 eth0配置文件 3 boot grub grub conf grup配置文件 或 boot gr
  • 记一次线上BUG排查过程

    1 线上遇到一个非常奇怪的bug 为一个用户分配业务线类型后 该用户登录时 提示502 但其它的用户登录完全是正常的 2 问题现象 3 排查思路 先去看线上日志 看是否有error 但日志里边这个接口200正常返回 本地debug 也复现一
  • 快速入门ASP.NET Core

    本来这篇只是想简单介绍下ASP NET Core MVC项目的 毕竟要照顾到很多新手朋友 但是转念一想不如来点猛的 考虑到急性子的朋友 让你通过本文的学习就能快速的入门ASP NET Core 既然是快速入门所以过多过深的内容我这里就一笔带
  • mybatis调用oracle视图

    多数据源切换调用oralce里的某个视图 后台报错 表或视图不存在 select from table 最后在select 语句中加上前缀即可 select from zzzz table
  • SM4算法原理

    前面的文章介绍了SM4算法的C语言实现 源码可见文章 SM4国密对称算法源码解析 10点43的博客 CSDN博客 sm4代码 本文将会介绍SM4算法原理 这部分可能会比较枯燥 但数学要求也不是太高 目录 1 概述 2 参数产生 3 轮函数
  • 【EI检索】2022年第四届大数据、物联网与计算国际会议(ICBICC 2022)

    2022年第四届大数据 物联网与计算国际会议 ICBICC 2022 重要信息 会议网址 www icbicc org 会议时间 2022年11月11 13日 召开地点 中国北京 截稿时间 2022年10月11日 录用通知 投稿后2周内 收
  • React国际化——多语言切换

    1 安装react intl universal npm install react intl universal save 2 配置语言包 json文件根据需要支持几种语言决定 将新建的语言包json文件放置于项目根目录的 public
  • Qt信号与槽的五种连接方式

    qt信号与槽的五种连接方式 1 默认连接 如果是在同一线程等价于直连 在不同线程等价于队列连接 2 直连 信号在哪 在哪个线程执行 最好只在同一线程中用 3 队列连接 槽在哪就在哪个线程执行 槽函数不会立刻执行 等到接受者的当前执行的函数执
  • 【Bootstrap作业】flex布局实现可伸缩菜单

    flex布局实现可伸缩菜单
  • C#中DataSet类的使用

    C 中DataSet类的使用 DataSet类是ADO NET中最核心的成员之一 也是各种开发基于 Net平台程序语言开发数据库应用程序最常接触的类 每一个DataSet都有很多个DataTables和Relationships Relat
  • 共享内存是最快的一种IPC方式

    在linux进程间通信的方式中 共享内存是一种最快的IPC方式 因此 共享内存用于实现进程间大量的数据传输 共享内存的话 会在内存中单独开辟一段内存空间 这段内存空间有自己特有的数据结构 包括访问权限 大小和最近访问的时间等 为什么说共享内
  • 【编译原理】语义分析

    第四章 语义分析 该章节建议观看国防科技大学MOOC 讲解的太棒了 本章节所提到的LL分析都可以理解为自上而下的分析 LR分析都可以理解为自下而上的分析 语法制导定义 语法制导定义是带属性和语义规则的上下文无关文法 其中每个文法符号都有一组
  • Kafka指南

    Kafka入门 API Spring集成 入门 下载代码 https kafka apache org downloads kafka 2 13 2 6 0 tgz 启动服务器 创建一个单节点ZooKeeper实例 bin zookeepe