如何删除消费者已经消费过的数据?卡夫卡

2024-01-09

我正在kafka中进行数据复制。但是,kafka 日志文件的大小增长得非常快。一天的大小达到 5 GB。作为这个问题的解决方案,我想立即删除已处理的数据。我正在 AdminClient 中使用删除记录方法来删除偏移量。但是当我查看日志文件时,与该偏移量对应的数据并未被删除。

RecordsToDelete recordsToDelete = RedcordsToDelete.beforeOffset(offset);
TopicPartition topicPartition = new TopicPartition(topicName,partition);
Map<TopicPartition,RecordsToDelete> deleteConf = new HashMap<>();
deleteConf.put(topicPartition,recordsToDelete);
adminClient.deleteRecords(deleteConf);

我不需要像这样的建议(log.retention.hours、log.retention.bytes、log.segment.bytes、log.cleanup.policy=删除)

因为我只想删除消费者消费的数据。在这个解决方案中,我还删除了没有消耗的数据。

您有什么建议?


你没有做错任何事。您提供的代码有效并且我已经测试过。以防万一我忽略了您代码中的某些内容,我的代码是:

public void deleteMessages(String topicName, int partitionIndex, int beforeIndex) {
    TopicPartition topicPartition = new TopicPartition(topicName, partitionIndex);
    Map<TopicPartition, RecordsToDelete> deleteMap = new HashMap<>();
    deleteMap.put(topicPartition, RecordsToDelete.beforeOffset(beforeIndex));
    kafkaAdminClient.deleteRecords(deleteMap);
}

我使用过组:'org.apache.kafka',名称:'kafka-clients',版本:'2.0.0'

因此,请检查您的目标分区是否正确(第一个分区为 0)

检查您的经纪人版本:https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html says:

0.11.0.0版本的经纪商支持此操作

从同一应用程序生成消息,以确保连接正确。

您还可以考虑另一种选择。使用cleanup.policy=紧凑如果您的消息键重复,您可能会从中受益。不仅因为该键的较旧消息将被自动删除,而且您可以利用具有空负载的消息删除该键的所有消息这一事实。只是不要忘记设置删除.retention.ms and 最小压缩延迟毫秒到足够小的值。在这种情况下,您可以使用消息,然后为同一密钥生成空有效负载(但要谨慎使用此方法,因为这样您可以删除未使用的消息(使用该密钥))

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何删除消费者已经消费过的数据?卡夫卡 的相关文章

  • Java Swing 应用程序消息对话框帮助

    我正在开发 Java Swing 应用程序 我需要创建一个如图所示的对话框 我不知道这个的名字 我无法解释 所以我附上一张照片 请告诉我这叫什么以及如何在我的 GUI 应用程序中创建它 给猫剥皮的方法不止一种 public final cl
  • 我在使用 JavaFX 绘制十字时遇到问题

    我正在尝试编写代码 在网格上对角绘制 3 个形状 前两个形状是正方形和圆形 我能做到 然而 第三种形状让我有些悲伤 我应该画一个十字 T 版本 而不是 X 每次我写出代码时 它看起来就像一个侧面 我知道我只是错过了一些简单的东西 但我真的很
  • 像 Google Play 商店一样在垂直 RecyclerView 中动态不同图像水平 RecyclerView

    我一直在关注这个教程 http android pratap blogspot co za 2015 12 horizo ntal recyclerview in vertical html http android pratap blog
  • 查找所有数组的长度多维数组,Java

    我想使用多维数组来存储数据网格 但是 我还没有找到一种简单的方法来查找长度2nd数组的一部分 例如 boolean array new boolean 3 5 System out println array length 只会输出3 是否
  • 模式更新后 jOOQ 生成的类的运行时验证?

    我用org jooq util DefaultGenerator在构建过程中生成 jOOQ 类来表示我的数据库模式 当应用程序运行时 架构预计会在应用程序不知情的情况下发生更改 此类更改可能与已生成的代码兼容 也可能不兼容 如何在运行时检测
  • 有没有办法让Maven自动下载快照版本?

    所以我有一个项目依赖于另一个项目的快照版本 依赖关系是
  • Eclipse Oxygen - 该项目未构建,因为其构建路径不完整

    我刚刚安装了 Eclipse Oxygen 并尝试在工作台中打开现有项目 但收到此错误 该项目未构建 因为其构建路径不完整 不能 找到 java lang Object 的类文件 修复构建路径然后尝试 建设这个项目 我尝试右键单击该项目 转
  • 在 libgdx 中渲染 box2d

    我有一个使用 FitViewport 的大小为 800x480 的游戏世界 并且最初使用像素渲染 box2d 实体 固定装置 因此所有物理效果都显得浮动且缓慢 查看文档后 我意识到 box2d 使用度量单位 因此我将 box2d 位置和大小
  • 如何在 Java 中使用 HTML 解析器和 Apache Tika 来提取所有 HTML 标签?

    我下载了 tika core 和 tika parser 库 但找不到将 HTML 文档解析为字符串的示例代码 我必须删除网页源的所有 html 标签 我能做些什么 如何使用 Apache Tika 进行编码 您想要 html 文件的纯文本
  • Java 8 Stream - 并行执行 - 不同的结果 - 为什么?

    假设我有一个List
  • 在 Apache Servicemix 4 中的 OSGi 包之间共享配置文件?

    有人能够在 SMX4 中的两个或多个捆绑包之间成功共享配置吗 我正在寻找的是这样的 有一个文件 SMX HOME etc myconfiguration cfg 使此配置 可用 以便使用 Spring dm 通过 OSGi 配置管理将其注入
  • 带等待/通知的同步块与不带等待/通知的同步块之间的区别?

    如果我只是使用synchronized 不是wait notify方法 它仍然是线程安全的吗 有什么不同 Using synchronized使方法 块一次只能由一个线程访问 所以 是的 它是线程安全的 这两个概念是结合在一起的 而不是相互
  • “强制更新快照/版本” - 这是什么意思

    在 Maven 项目中 选择 更新项目 时 有一个名为 强制更新快照 版本 的选项 它有什么作用 强制更新快照 版本 就像运行以下命令 mvn U install U 也可以用作 update snapshot 看here http boo
  • 短 2 个字节

    我正在从串行端口读取一个长度为 133 字节的数据包 最后 2 个字节包含 CRC 值 我使用 Java 将 2 个字节值制成单个 我认为很短 这就是我所做的 short high 48 0x00ff short low 80 short
  • Web 服务客户端的 AXIS 与 JAX-WS

    我决定用Java 实现Web 服务客户端 我已经在 Eclipse 中生成了 Axis 客户端 并使用 wsimport 生成了 JAS WS 客户端 两种解决方案都有效 现在我必须选择一种来继续 在选择其中之一之前我应该 考虑什么 JAX
  • 按钮悬停和按下效果 CSS Javafx

    我是 CSS 新手 为按钮定义了以下 CSS 样式 其中id并且应用了自定义样式 但不应用悬停和按下效果 bevel grey fx background color linear gradient f2f2f2 d6d6d6 linear
  • 在Java内存管理中,“PS”代表什么?

    每当我看到 Java 中对内存的引用时 各种空格总是以 PS 为前缀 PS 是什么意思 它开始困扰我 到目前为止我唯一的猜测是 泳池空间 但这将是多余的 例子 PS伊甸园空间 PS 幸存者空间 PS 终身空间 老一代 PS Perm Gen
  • Android - 从渲染线程内结束活动

    下午好 我不熟悉 android 中的活动生命周期 并且一直在尽可能地阅读 但我不知道如何以良好的方式解决以下问题 我有一个使用 GLSurfaceView 的活动来在屏幕上绘制各种内容 在这个 GLSurfaceView 的渲染线程中 我
  • java中什么是静态接口?

    我正在阅读Map Entry界面 当我注意到它是一个static界面 我不太明白什么是静态接口 它与常规接口有什么不同 public static interface Map Entry
  • removeall 和removeif 的用例

    我找到了这个 fun main val list MutableList

随机推荐