Kafka如何获取topic最近n条消息

2023-11-09

问题来源

项目运行中我们经常需要诊断个个环节是否正确,其中到kafka就需要查看最新的消息到达kafka没有,达到的内容是什么,这就需要查看kafka指定topic的最近的n条消息(将kakfa消息全部打印出来非常耗时而且不必要)。当然我们可以使用第三方提供的kafka工具查看,可是使用这些工具耗时费力而且不能很好集成到项目中。

备注:第三方工具包括kakfa命令行工具一起其他第三方的工具。
注意事项:本博客所有代码是为了介绍相关内容而编写或者引用的,示例代码并非可直接用于生产的代码。仅供参考而已。

大体思路

我们知道如果使用KafkaConsumer类,要么从topic最老的消息开始,要么从最新的消息开始。下面是官方文档中关于auto.offset.reset 的解释。

The default is “latest,” which means that lacking a valid offset,
the consumer will start reading from the newest records (records that were written after the consumer started running).
The alternative is “earliest,” which means that lacking a valid offset, the consumer will read all the data in the partition,
starting from the very beginning.

因此直接使用KafkaConsumer就行不通了。 但是我们可以先通过给指定coisumerGroup设置offset,然后在让KafkaConsumer以该coisumerGroup读取数据。这样就能从指定offset开始读取消息了。

具体代码

完整的代码在这里,欢迎加星和fork。 谢谢!

package com.yq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * 本例子试着读取当前topic每个分区内最新的30条消息(如果topic额分区内有没有30条,就获取实际消息)
 * className: ReceiveLatestMessageMain
 *
 * @author EricYang
 * @version 2019/01/10 11:30
 */
@Slf4j
public class ReceiveLatestMessageMain {
    private static final int COUNT = 30;
    public static void main(String... args) throws Exception {
        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ubuntu:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put(ConsumerConfig.GROUP_ID_CONFIG, "yq-consumer12");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");


        System.out.println("create KafkaConsumer");
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        AdminClient adminClient = AdminClient.create(props);
        String topic = "topic01";
        adminClient.describeTopics(Arrays.asList(topic));
        try {
            DescribeTopicsResult topicResult = adminClient.describeTopics(Arrays.asList(topic));
            Map<String, KafkaFuture<TopicDescription>> descMap = topicResult.values();
            Iterator<Map.Entry<String, KafkaFuture<TopicDescription>>> itr = descMap.entrySet().iterator();
            while(itr.hasNext()) {
                Map.Entry<String, KafkaFuture<TopicDescription>> entry = itr.next();
                System.out.println("key: " + entry.getKey());
                List<TopicPartitionInfo> topicPartitionInfoList = entry.getValue().get().partitions();
                topicPartitionInfoList.forEach((e) -> {
                    int partitionId = e.partition();
                    Node node  = e.leader();
                    TopicPartition topicPartition = new TopicPartition(topic, partitionId);
                    Map<TopicPartition, Long> mapBeginning = consumer.beginningOffsets(Arrays.asList(topicPartition));
                    Iterator<Map.Entry<TopicPartition, Long>> itr2 = mapBeginning.entrySet().iterator();
                    long beginOffset = 0;
                    //mapBeginning只有一个元素,因为Arrays.asList(topicPartition)只有一个topicPartition
                    while(itr2.hasNext()) {
                        Map.Entry<TopicPartition, Long> tmpEntry = itr2.next();
                        beginOffset =  tmpEntry.getValue();
                    }
                    Map<TopicPartition, Long> mapEnd = consumer.endOffsets(Arrays.asList(topicPartition));
                    Iterator<Map.Entry<TopicPartition, Long>> itr3 = mapEnd.entrySet().iterator();
                    long lastOffset = 0;
                    while(itr3.hasNext()) {
                        Map.Entry<TopicPartition, Long> tmpEntry2 = itr3.next();
                        lastOffset = tmpEntry2.getValue();
                    }
                    long expectedOffSet = lastOffset - COUNT;
                    expectedOffSet = expectedOffSet > 0? expectedOffSet : 1;
                    System.out.println("Leader of partitionId: " + partitionId + "  is " + node + ".  expectedOffSet:"+ expectedOffSet
                    + ",  beginOffset:" + beginOffset + ", lastOffset:" + lastOffset);
                    consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(expectedOffSet -1 )));
                });
            }

            consumer.subscribe(Arrays.asList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("read offset =%d, key=%s , value= %s, partition=%s\n",
                            record.offset(), record.key(), record.value(), record.partition());
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("when calling kafka output error." + ex.getMessage());
        } finally {
            adminClient.close();
            consumer.close();
        }
    }
}

运行效果

效果图解释:
我们只读取topic01每个分区最近的30条消息,如果该partition没有30条,那就读取全部的。 可以看到partition 0有39条消息,因此从第8条开始读取。

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka如何获取topic最近n条消息 的相关文章

  • 通过纹理偏移滚动 2D/3D 背景

    我一直在尝试在 Unity 中使用四边形来显示纹理来制作无限滚动的 2D 背景 我的想法是根据玩家的位置改变四边形的偏移 由于某种原因 当我更改偏移量时 我的图像无法正确重复 并且一旦达到偏移量 2 图像就会完全消失 纹理上 3 个不同 x
  • 在 dplyr 中使用动态位置数创建滞后/超前变量

    我正在寻找一种方法来生成从具有动态位置数的滞后列派生的列 参数n 这意味着这个新列应该作为参数n存储在另一列中的值 参见lag文档功能 样本数据 set seed 42 df lt as tibble data frame id c rep
  • R:如何按组计算数据表的多列滞后

    我想计算数据表中按 id 分组的变量的差异 这是一些示例数据 数据以 1 Hz 的采样率记录 我想估计一阶和二阶导数 速度 加速度 df lt read table text x y id 1 2 1 2 4 1 3 5 1 1 8 2 5
  • VBA/Excel 中行和列范围偏移的最大值是多少?

    我正在使用 microsoft excel 2003 执行以下 If 语句时收到 应用程序定义或对象定义错误 如果 Range MyData CurrentRegion Offset i 0 Resize 1 1 Value Range M
  • Kafka基础—3、Kafka 消费者API

    一 Kafka消费者API 1 消息消费 当我们谈论 Kafka 消费者 API 中的消息消费时 我们指的是消费者如何从 Kafka 主题中拉取消息 并对这些消息进行处理的过程 消费者是 Kafka 中的消息接收端 它从指定的主题中获取消息
  • 从 MySQL 到 DolphinDB,Debezium + Kafka 数据同步实战

    Debezium 是一个开源的分布式平台 用于实时捕获和发布数据库更改事件 它可以将关系型数据库 如 MySQL PostgreSQL Oracle 等 的变更事件转化为可观察的流数据 以供其他应用程序实时消费和处理 本文中我们将采用 De
  • 从 MySQL 到 DolphinDB,Debezium + Kafka 数据同步实战

    Debezium 是一个开源的分布式平台 用于实时捕获和发布数据库更改事件 它可以将关系型数据库 如 MySQL PostgreSQL Oracle 等 的变更事件转化为可观察的流数据 以供其他应用程序实时消费和处理 本文中我们将采用 De
  • prevObject 是什么?为什么我的选择器返回它?

    我试图从元素中获取顶部 但收到此错误 这是什么意思以及如何摆脱它 hover offset top gt Uncaught TypeError Cannot read property top of undefined hover div
  • 如何使用 ARRAYFORMULA 与上一行的偏移量而不会出现循环引用错误

    示例表 https docs google com spreadsheets d 14ma y3esh1S EkzHpFBvLb0GzDZZiDsSVXFktH3Rr E edit usp sharing https docs google
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版

    Flink 系列文章 一 Flink 专栏 Flink 专栏 系统介绍某一知识点 并辅以具体的示例进行说明 1 Flink 部署系列 本部分介绍Flink的部署 配置相关基础内容 2 Flink基础系列 本部分介绍Flink 的基础部分 比
  • 如何在编译时计算类成员的偏移量?

    给定 C 中的类定义 class A public methods definition private int i char str 是否可以使用 C 模板元编程在编译时计算类成员的偏移量 该类不是 POD 并且可以具有虚拟方法 基元和对
  • 如何获取 MySQL 中特定行的偏移量?

    我正在尝试创建一个图像数据库 该数据库不保存一致的 ID 记录 例如 它可能是 1 2 6 7 12 但正如您所看到的 只有 5 行 在表中我有 fileid 和 filename 我创建了一个 PHP 脚本 当我给出文件 ID 时 它会显
  • offsetLeftAndRight() 到底做了什么?

    offsetLeftAndRight 到底做了什么 文档说 将此视图的水平位置偏移指定的像素量 那么 这是否意味着如果视图的左侧位置假设为 50 并且在其上调用 offsetLeftAndRight 20 那么视图将移动 20 像素并且其左
  • 如何使用element.offsetBottom?

    因此 当你滚动经过英雄时 我正在努力制作一个粘性导航 我使用时没有遇到任何问题element offsetTop前 但我正在尝试找到一种方法来对元素的底部执行此操作 element offsetBottom似乎没什么 那么有没有办法在JS中
  • 如何在 Pygame 表面中实现洪水填充

    我想知道填充 Pygame 表面部分的好方法 我想要的最好的例子是 MS Paint 中油漆桶的工作方式 例如 如果在白色表面上用黑色绘制一个圆圈 我想填充圆圈内的白色 或任何形状 为了让您了解我正在做什么 我正在制作一个像素艺术工具 并且
  • Android - GC 滞后于列表视图滚动“更大”的图像

    在列表视图中 我想在列表条目上绘制一个图像 这 20 张图像必须缩放以填充垂直模式的宽度 手机分辨率为 480 x 800 像素 SGS2 图像分辨率为 400x400 大小约为 100KB 我已将图像放在可绘制文件夹中 当我滚动列表时 它
  • 获取参考成员(非 POD)的偏移量

    这是代码片段 include
  • C 代码获取相对于 UTC 的本地时间偏移(以分钟为单位)?

    我没有找到一种简单的方法来获取本地时间和 UTC 时间之间的时间偏移 以分钟为单位 起初我打算使用tzset 但它不提供夏令时 根据手册页 如果夏令时有效 它只是一个不为零的整数 虽然通常是一个小时 但在某些国家 地区可能是半小时 我宁愿避
  • SplFileObject + LimitIterator + 偏移量

    我有两行数据文件 两行仅用于我的示例 实际上 该文件可以包含数百万行 并且我使用 SplFileObject 和 LimitIterator 进行偏移 但这种组合在某些情况下会有奇怪的行为 offset 0 file new SplFile
  • ViewPager 在三星 S4 上非常慢

    解决了 显然 您需要将片段可绘制对象放在相应的可绘制对象 xhdpi 等中 而不是可绘制对象中 我用三个片段编写了一个 ViewPager 可以在 LG G2 Sony Xperis S Nexus4 和 5 上完美运行 但是在 Samsu

随机推荐

  • urldecode 报错 Malformed UTF-8 characters, possibly incorrectly encoded

    使用urlencode 编码了一段字符串写入数据库 读取的时候使用urldecode 解码报错 Malformed UTF 8 characters possibly incorrectly encoded 解决方案 检查一下是否保存到数据
  • ajax不弹出新页面问题

    ajax默认是异步请求 做局部刷新的 指的是当前页数据渲染的 如果后台是转发或者重定向了 如果用ajax的话是不会弹出新的页面的 from提交的话 如果后台是转发或者重定向了 是可以打开新的页面的
  • 【人脸识别】【python】Object arrays cannot be loaded when allow_pickle=False解决方案

    2020年2月11日 0次阅读 共1625个字 0条评论 0人点赞 QueenDekimZ mtcnn debug 用mtcnn对LFW人脸数据集进行人脸检测与关键点对齐 并裁剪到160 160维 为后续facenet训练作training
  • wx.login wx.getUserProfile 获取登录凭证

    wx login 调用接口获取登录凭证 code 通过凭证进而换取用户登录态信息 包括用户在当前小程序的唯一标识 openid 微信开放平台帐号下的唯一标识 unionid 若当前小程序已绑定到微信开放平台帐号 及本次登录的会话密钥 ses
  • 通过hexo快速搭建个人博客

    个人博客预览点击这里 菜卷的博客 快速搭建一个博客 一 需要安装的工具 二 开始安装Hexo 三 安装完成后 初始化项目 四 在项目根目录下执行命令 五 启动项目 六 部署到github 七 配置文件 八 安装next主题 九 优化next
  • C语言程序实训--实验设备管理系统

    之前学校c语言程序实训课要求写的 如果程序有错误或可以改进的地方 希望各位指出 开发环境 IDE Visual Studio Code Dev C 处理器 AMD Ryzen 7 PRO 6850HS with Radeon Graphic
  • 73家!华为鸿蒙OS合作伙伴汇总

    6月2日 华为发布了最新版的鸿蒙操作系统 HarmonyOS 2 0 以及一系列搭载鸿蒙的硬件产品 比如手机 手表 平板 耳机 显示器等等 如今的智能终端越来越多 厂商不可能为每个设备单独准备一个系统 因为这不仅让开发者工作量倍增 消费者用
  • Flask网站中使用Keras时报错“Tensor Tensor(*) is not an element of this graph”

    HyperLPR车牌识别程序本地中能进行正常识别 但将其放到flask搭建的网站中进行识别 不能运行 并报错 Tensor Tensor is not an element of this graph HyperLPR中的识别模型采用的是K
  • Mask掩码

    Python中Mask的用法 引例 Numpy的MaskedArray模块 小于 或小于等于 给定数值 大于 或大于等于 给定数值 在给定范围内 超出给定范围 在算术运算期间忽略NaN和 或infinite值 All men are scu
  • Count Color

    http poj org problem id 2777 Description Chosen Problem Solving and Program design as an optional course you are require
  • 【QT】——布局

    目录 1 在UI窗口中布局 2 API设置布局 2 1 QLayout 2 2 QHBoxLayout 2 3 QVBoxLayout 2 4 QGirdLayout 注意 示例 Qt 窗口布局是指将多个子窗口按照某种排列方式将其全部展示到
  • Apifox—诠释国产接口管理工具新高度

    揭开Apifox的神秘面纱 曾经在对于接口管理和调试工作上 大量的开发者往往会选择使用Swagger做接口文档管理 用Postman做接口调试工具 然而这样使用的痛处其实也不言而喻 原本同一类型的工作却被放置在不同的软件工具上 并且对于接口
  • 图像二值化方法--OTSU(最大类间方差法)

    前面学习了直方图双峰法 图像二值化方法中的阈值法 最大类间方差法 OTSU 是找到自适应阈值的常用方法 原理参考了冈萨雷斯的 数字图像处理 以下是自己写的函数 获取灰度图in的OTSU阈值 int Segment otsuMat Mat i
  • [译] Scratch 平台的神经网络实现(R 语言)

    原文地址 Neural Networks from Scratch in R 原文作者 Ilia Karmanov 译文出自 掘金翻译计划 本文永久链接 github com xitu gold m 译者 CACppuccino 校对者 I
  • 【通信协议】笔记之Redis协议抓取分析

    RESP Redis序列化协议 概念 Redis底层使用的通信协议是RESP Redis Serialization Protocol的缩写 RESP协议可以序列化多种类型 比如Simple Strings 简单字符串 Errors 错误类
  • FreeRTOS记录(九、一个裸机工程转FreeRTOS的实例)

    记录一下一个实际项目由裸机程序改成FreeRTOS 以前产品的平台还是C8051单片机上面的程序 硬件平台改成了STM32L051 同时使用STM32CubeMX生成的工程 使用FreeRTOS系统 EEPROM数据存储读取函数修改更新 2
  • 数学建模第二天:数学建模工具课之MATLAB绘图操作

    目录 一 前言 二 二维绘图 1 曲线图 散点图plot 2 隐函数 显函数与参数方程的绘图 ezplot fplot 三 三维绘图 1 单曲线plot3 2 多曲线plot3 3 曲面 实曲面surf 网格曲面mesh 四 特殊的二维 三
  • 9.Linux虚拟机下Hive的安装配置

    hadoop 3 1 3 jdk 8u162 linux x64 apache hive 3 1 2 bin 本案例软件包 链接 https pan baidu com s 1ighxbTNAWqobGpsX0qkD8w 提取码 lkjh
  • 基于Python机器学习算法小分子药性预测(岭回归+随机森林回归+极端森林回归+加权平均融合模型)

    目录 前言 总体设计 系统整体结构图 系统流程图 运行环境 Python 环境 配置工具包 模块实现 1 数据预处理 2 创建模型并编译 3 模型训练 系统测试 工程源代码下载 其它资料下载 前言 麻省理工科技评论 于2020年发布了 十大
  • Kafka如何获取topic最近n条消息

    问题来源 项目运行中我们经常需要诊断个个环节是否正确 其中到kafka就需要查看最新的消息到达kafka没有 达到的内容是什么 这就需要查看kafka指定topic的最近的n条消息 将kakfa消息全部打印出来非常耗时而且不必要 当然我们可