如何从 Kafka 主题获取最近的消息

2024-01-20

我们是否有任何选项,例如从 Kafka 主题获取最近 10/20 等消息。我可以看到 --from-beginning 选项从主题中获取所有消息,但如果我只想获取第一个、最后一个、中间或最新的几条消息 10. 我们有一些选择吗?


前 N 条消息

您可以使用--max-messages N为了获取第一个N某个主题的消息。

例如,要获取前 10 条消息,请运行

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning  --max-messages 10

下 N 条消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --max-messages 10

最后 N 条消息

要获取最后 N 条消息,您需要定义特定的分区和偏移量:

bin/kafka-simple-consumer-shell.sh --bootstrap-server localhost:9092 --topic test--partition testPartition --offset yourOffset

M 到 N 条消息

同样,对于这种情况,您必须定义分区和偏移量。 例如,您可以运行以下命令来获取从您选择的偏移量开始的 N 条消息:

bin/kafka-simple-consumer-shell.sh --bootstrap-server localhost:9092 --topic test--partition testPartition --offset yourOffset --max-messages 10

如果您不想坚持使用二进制文件,我建议您使用kt https://github.com/fgeller/kt这是一个具有更多选项和功能的 Kafka 命令行工具。


更详细的内容可以参考文章如何在 Apache Kafka 中获取特定消息 https://betterprogramming.pub/how-to-fetch-specific-messages-in-apache-kafka-4133dad0b4b8

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从 Kafka 主题获取最近的消息 的相关文章

  • kafka 连接 s3 源无法与 Minio 一起使用

    我已经验证了与 minio 的连接 确保凭据工作正常并且可以访问 minio 另外 如果我尝试任何其他值store url http minio 9000我无法保存配置 所以我猜想在可见性方面不存在问题卡夫卡连接容器和minio容器 我不确
  • 调试自定义 Kafka 连接器的简单有效的方法是什么?

    我正在使用几个 Kafka 连接器 在控制台输出中没有看到它们的创建 部署有任何错误 但是我没有得到我正在寻找的结果 没有任何结果 无论是期望的还是否则 我基于 Kafka 的示例 FileStream 连接器制作了这些连接器 因此我的调试
  • 为什么卡夫卡这么快[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 如果我有相同的硬件 请使用 Kafka 或我们当前的解决方案 ServiceMix Camel 有什么区别吗 Kafka 能处理比它
  • 连接到 Apache Kafka 多节点集群中的 Zookeeper

    我按照以下说明设置了多节点 kafka 集群 现在 如何连接到zookeeper 是否可以从 JAVA 中的生产者 消费者端仅连接到一个 ZooKeeper 或者是否有一种方法可以连接所有 ZooKeeper 节点 设置多节点 Apache
  • 如何在 Python 中以编程方式检查 Kafka Broker 是否已启动并运行

    我正在尝试使用来自 Kafka 主题的消息 我正在使用包装器confluent kafka消费者 我需要在开始使用消息之前检查连接是否已建立 我读到消费者很懒 所以我需要执行一些操作才能建立连接 但我想检查连接建立而不执行consume o
  • 从 Apache Kafka 中的主题删除消息

    所以我是 Apache Kafka 的新手 我正在尝试创建一个简单的应用程序 以便我可以更好地理解 API 我知道这个问题在这里被问了很多 但是如何清除存储在主题上的消息 记录 我看到的大多数答案都说要更改消息保留时间或删除并重新创建主题
  • kafka消费者群体正在重新平衡

    我正在使用 Kafka 9 和新的 java 消费者 我正在循环内进行轮询 当代码尝试执行 Consumer commitSycn 时 由于组重新平衡 我收到 commitfailedexcption 请注意 我将 session time
  • 了解Kafka流groupBy和window

    我无法理解 kafka 流中的 groupBy groupById 和窗口的概念 我的目标是聚合一段时间内 例如 5 秒 的流数据 我的流数据看起来像 value 0 time 1533875665509 value 10 time 153
  • 如何检测 KTable 连接的哪一侧触发了更新?

    当您在 Kafka 中连接两个表时 每次更新两个 KTable 之一时 您的输出 Ktable 也会更新 想象一下你正在加入Customers与一个列表Orders你已经适当减少了 再次想象一下 您使用此连接的结果来为最终客户提供特别优惠和
  • Kafka 中的内部和外部通信

    流动 本地 gt 代理 gt Kafka advertised listeners PLAINTEXT proxyhostname 8080 for external communication listeners PLAINTEXT 90
  • 如何避免连续“重置偏移量”和“寻找最新偏移量”?

    我正在尝试遵循本指南 https spark apache org docs latest structed streaming kafka integration html https spark apache org docs late
  • 嵌入式 Kafka 测试随机失败

    我使用 EmbededKafka 实现了一系列集成测试 以测试使用 spring kafka 框架运行的一个 Kafka 流应用程序 流应用程序正在从 Kafka 主题读取消息 将其存储到内部状态存储中 进行一些转换并将其发送到另一个微服务
  • Zookeeper + Kafka - 无法创建数据目录

    我在单节点中使用zookeeper 3 4 8并尝试使用kafka 当我运行这个命令时 zookeeper server start sh usr local kafka 2 9 2 0 8 2 2 config zookeeper pro
  • Kafka Streams 反序列化处理程序

    我正在尝试在反序列化中使用 LogAndContinueExceptionHandler 当发生错误时 通过成功记录错误并继续 它可以正常工作 但是 假设我的传入消息有连续的错误流 我停止并重新启动 kafka 流应用程序 然后我看到失败并
  • 删除主题级别配置

    为了删除主题中的所有数据 我将其retention ms配置设置为1000 bin kafka topics sh zookeeper KAFKAZKHOSTS alter topic
  • 为什么我的 Kafka Streams 拓扑无法正确重放/重新处理?

    我有一个如下所示的拓扑 KTable
  • Flink Kafka - 如何使应用程序并行运行?

    我正在 Flink 中创建一个应用程序 读取某个主题的消息 对其进行一些简单的处理 将结果写入不同的主题 我的代码确实有效 然而它不并行运行我怎么做 看来我的代码只在一个线程 块上运行 在 Flink Web 仪表板上 应用程序进入运行状态
  • 获取:导入 Spark 模块时出错:没有名为“pyspark.streaming.kafka”的模块

    我需要将从 pyspark 脚本创建的日志推送到 kafka 我正在做 POC 所以在 Windows 机器上使用 Kafka 二进制文件 我的版本是 kafka 2 4 0 spark 3 0 和 python 3 8 1 我正在使用 p
  • 如何在kafka中定义多个序列化器?

    比如说 我发布和使用不同类型的 java 对象 对于每个对象 我必须定义自己的序列化器实现 我们如何在 serializer class 属性下提供kafka消费者 生产者属性文件中的所有实现 我们有一个类似的设置 不同主题中的不同对象 但
  • Windows 上的 Apache Kafka 错误 - 无法找到或加载主类 QuorumPeerMain

    我刚刚从 Apache 网站下载了 Kafka 2 8 0 我正在尝试使用网站上给出的说明进行设置 但是当我尝试启动 Zookeper 服务器时 出现以下错误 错误 无法找到或加载主类 org apache zookeeper server

随机推荐

  • GitHub SSH 密钥声称未使用

    为什么 在我的 GitHub 帐户上的 设置 gt SSH 密钥 下 它显示 由 GitHub for Mac 于 2014 年 10 月 24 日添加 从未使用过 没用过 我用过 我的个人资料中显示了很多贡献 从that机器 我还有另一把
  • Android:使用 onTouchListener() 循环执行线程

    您好 我的应用程序中有 8 个按钮 每个按钮都配置为 onclickListener 当单击该按钮时 字符串将写入套接字 现在我希望当我按住按钮时 字符串必须循环写入 这就是我正在尝试做的事情 bLeft setOnTouchListene
  • 在 Ubuntu 上安装 Java 7

    Note 这个问题是在 Oracle 将 OpenJDK 作为 Oracle JDK 的免费版本之前提出的 历史答案反映了这一点 从 2022 年起 您不应使用 Java 7 除非您必须使用无法在 OpenJDK 8 上运行的项目 为了安装
  • ELK 未将元数据从 filebeat 传递到 Logstash

    通过以下方式安装 ELK 服务器 https www digitalocean com community tutorials how to install elasticsearch logstash and kibana elk sta
  • R 编程:从数据框中查找所有因子

    我正在尝试获取数据框列的类类型 我正在做的是 sapply mydata class 但现在 我只想找到那些作为因素的列名 我尝试了以下方法 sapply data is factor 但它给了我 ResponseFlag Gender M
  • ANTLR 隐式乘法

    我是 ANTLR 的新手 我正在尝试扩展所提供的简单计算器的示例here https stackoverflow com a 1932664 具体来说 我尝试添加一些简单的函数 负数等 以熟悉 ANTLR 然而 我在尝试实现 隐式 乘法时遇
  • 如何收集与输入函数匹配通配符的Snakemake输入文件?

    我有一组使用 BWA MEM 生成并使用 GATK IndelRealigner 等进一步处理的 BAM 文件 我正在以较小的块对 BAM 文件进行预处理 以加快处理速度 然而 我必须在变体调用之前将这些单独的文件合并到一个 BAM 文件中
  • 为什么我不能从互斥锁中可变地借用单独的字段? [复制]

    这个问题在这里已经有答案了 尝试通过以下方式获取对单独字段的可变引用MutexGuard struct MyObject pub a i32 pub b i32 fn func 1 mtx Mutex
  • x86_64 执行 Shellcode 失败:

    我在 64 位 Linux 上使用 Python 2 7 我有以下 Python 脚本 应该执行一个简单的 Hello World shellcode import urllib2 import ctypes shellcode xb8 x
  • Dynamic_cast 不适用于非多态类型的原因

    有课B和派生类D class B int b class D public B int d D d new D B b dynamic cast
  • python 组合数据框中的行并将值相加

    我有一个数据框 Type Volume Q 10 Q 20 T 10 Q 10 T 20 T 20 Q 10 我想将类型 T 合并到一行中 并且仅当两个 或更多 T 连续时才添加音量 即 Q 10 Q 20 T 10 Q 10 T 20 2
  • 我如何近似“你的意思是?”不使用谷歌?

    我知道这个问题重复 谷歌 你是说吗 是怎么回事 算法工作 https stackoverflow com questions 307291 how does the google did you mean algorithm work 如何
  • 错误“virtualenv:找不到命令”,但安装位置位于 PYTHONPATH 中

    在过去的两天里 这让我发疯 我在 Macbook 上安装了 virtualenvpip install virtualenv 但是当我尝试使用创建一个新的 virtualenv 时virtualenv venv 我收到错误消息 virtua
  • 如何使用 Java/Swing 旋转图像,然后将其原点设置为 0,0?

    我能够旋转已添加到 JLabel 的图像 唯一的问题是 如果高度和宽度不相等 旋转后的图像将不再出现在 JLabel 的原点 0 0 处 这就是我正在做的事情 我还尝试使用 AffineTransform 并旋转图像本身 但结果相同 Gra
  • 在 WPF DataGrid 中使用 Enter 键作为 Tab

    我有一个DataGrid in WPF I want to move to the NextCell when i hit Enter and when the LastColumn is reached it should have th
  • Android Studio - 恐慌:无法打开 AVD

    经过几个小时修复 Gradle 问题后 我能够在 Android Studio 中构建我的测试应用程序 但是当我尝试在 AVD 中运行它时 它就是打不开 这是日志 Waiting for device C Users Rahaman App
  • 如何在不使用 len 的情况下知道列表是否仅包含 1 个元素

    我想知道列表是否只包含一个元素 而不使用len 在这两种解决方案之间 最Pythonic的方法是什么 或者也许这些都不是Pythonic的 如果是的话那又是什么 解决方案a 删除位置1处的项目 除了IndexError所以我知道只有 1 件
  • Python 求解一个变量的方程

    我正在尝试使用 SymPy 求解 python 中的方程 我有一个生成的方程 类似于function y 8 0 y 3 0 我将其与 SymPy 一起使用来创建一个如下所示的新方程 eq sympy Eq function 2 哪个输出y
  • 如何反序列化动态Json对象?

    我目前从我的 api 收到以下 JSON 响应 Lastname ERRLASTNAMEEMPTY Firstname ERRFIRSTNAMEEMPTY 请注意 上述响应是动态的 即有时我可以有名字 有时可以有姓氏 有时两者都有 此响应基
  • 如何从 Kafka 主题获取最近的消息

    我们是否有任何选项 例如从 Kafka 主题获取最近 10 20 等消息 我可以看到 from beginning 选项从主题中获取所有消息 但如果我只想获取第一个 最后一个 中间或最新的几条消息 10 我们有一些选择吗 前 N 条消息 您