Kafka Streams - 减少大型状态存储的内存占用

2024-04-25

我有一个拓扑(见下文),可以读取一个非常大的主题(每天超过十亿条消息)。这个 Kafka Streams 应用程序的内存使用量相当高,我正在寻找一些关于如何减少状态存储占用空间的建议(更多详细信息如下)。Note:我并不是想逃避国有商店,我只是认为可能有一种方法可以改善我的拓扑 - 见下文。

// stream receives 1 billion+ messages per day
stream
    .flatMap((key, msg) -> rekeyMessages(msg))
    .groupBy((key, value) -> key)
    .reduce(new MyReducer(), MY_REDUCED_STORE)
    .toStream()
    .to(OUTPUT_TOPIC);

// stream the compacted topic as a KTable
KTable<String, String> rekeyedTable = builder.table(OUTPUT_TOPIC, REKEYED_STORE);


// aggregation 1
rekeyedTable.groupBy(...).aggregate(...)

// aggreation 2
rekeyedTable.groupBy(...).aggregate(...)

// etc

更具体地说,我想知道是否流式传输OUTPUT_TOPIC因为 KTable 导致状态存储(REKEYED_STORE)比本地需要的要大。对于具有大量唯一键的变更日志主题,将它们作为流式传输会更好吗?KStream并进行窗口聚合?或者这不会像我想象的那样减少占用空间(例如,只有记录的子集 - 窗口中的记录,会存在于本地状态存储中)。

不管怎样,我总是可以启动这个应用程序的更多实例,但我想让每个实例尽可能高效。这是我的问题:

  • 对于具有这种吞吐量级别的 Kafka Streams 应用程序,是否应该考虑任何配置选项、一般策略等?
  • 是否有关于单个实例的内存密集程度的指导原则?即使您有一个有点武断的指导方针,与他人分享也可能会有所帮助。我的一个实例当前使用 15GB 内存 - 我不知道这是否好/坏/无关紧要。

任何帮助将不胜感激!


以你目前的模式

stream.....reduce().toStream().to(OUTPUT_TOPIC);
builder.table(OUTPUT_TOPIC, REKEYED_STORE)

您会得到两家内容相同的商店。一个为reduce()运算符和一个用于读取table()-- 不过,这可以减少到一个商店:

KTable rekeyedTable  = stream.....reduce(.);
rekeyedTable.toStream().to(OUTPUT_TOPIC); // in case you need this output topic; otherwise you can also omit it completely

这应该会显着减少你的内存使用量。

关于窗口化与非窗口化:

  1. 这是你所需的语义问题;如此简单地从非窗口化到窗口化缩减似乎是有问题的。

  2. 即使您也可以使用窗口语义,也不一定会减少内存。请注意,在聚合情况下,Streams 不存储原始记录,而仅存储当前聚合结果(即 key + currentAgg)。因此,对于单个密钥,两种情况的存储要求是相同的(单个窗口具有相同的存储要求)。同时,如果您使用 Windows,当您获得聚合专业密钥专业窗口时,您实际上可能需要更多内存(而在非窗口情况下您仅获得单个聚合专业密钥)。唯一可以节省内存的情况是“密钥空间”分布在很长一段时间内的情况。例如,您可能很长时间无法获取某些按键的输入记录。在非窗口情况下,这些记录的聚合将始终被存储,而对于窗口情况,键/聚合记录将被删除,并且如果稍后出现具有此键的记录,则将重新创建新条目再次打开(但请记住,在这种情况下您丢失了之前的聚合门 - 参见 (1))

最后但并非最不重要的一点是,您可能需要查看调整应用程序大小的指南:http://docs.confluence.io/current/streams/sizing.html http://docs.confluent.io/current/streams/sizing.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka Streams - 减少大型状态存储的内存占用 的相关文章

随机推荐

  • 无法在后台任务中调用 Task.Run()

    我想在后台任务的线程中做一些事情 所以我尝试使用 Task Run 但它不起作用 任何人都可以向我展示另一种在后台任务中创建线程的方法 这是我的代码 public sealed class KatzBackgroundTask IBackg
  • 无法将属性与数字进行比较。错误:“‘AnsibleUnsafeText’和‘int’实例之间不支持”

    getent database passwd debug var getent passwd dict2items selectattr value 1 gt 1000 map attribute key list 输出是 TASK deb
  • Fortran 03/08(gfortran 编译器)中使用无限多态类型进行数组操作

    我想通过以下方式实现有用的数组操作 添加元素 删除元素 通过可分配 指针 二叉树结构实现不同的实现 class 特征 无限多态性 我使用 gfortran 5 0 应该可以处理这样的功能 我需要它 以免为我使用的每种类型重复相同的代码 这应
  • 如何在 Django 中创建 unique_for_field slug?

    姜戈有一个日期唯一 http docs djangoproject com en dev ref models fields unique for date您可以在将 SlugField 添加到模型时设置的属性 这会导致 slug 仅对于您
  • 像在eclipse中一样关闭intellij idea中未使用的模块

    据我所知 目前 intellij idea 中没有任何功能可以做到这一点 我不知道为什么 但他们不支持这样做 至少这是我通过所有研究发现的结果 也许我们中的一些人用不同的方式来解决这个问题 如何在 intellij 中使用多个模块 在处理多
  • 如何从 USB 加载 LUKS 密码,然后返回键盘?

    我想设置一台具有全磁盘加密功能的无头 Linux Debian Wheezy PC 能够使用 USB 驱动器或通过键盘输入密码来解锁磁盘 我的起点是使用 Debian 安装程序中基本的整个磁盘加密选项进行全新安装 该安装程序将 boot 之
  • 如何在 Square MockWebServer 中使用 SSL?

    我尝试启用 SSLSquare 的 MockWebServer https github com square okhttp tree master mockwebserver在测试下模拟我的 Android 应用程序中的所有 Web 服务
  • 如何使用 PowerShell 递归合并/“展平”文件夹结构

    我正在寻求帮助来重组许多子文件夹中的大量文件 示例来源 folderX aaa txt bbb txt folderY ccc txt folderZ ddd txt eee txt 理想结果 folderX aaa txt folderX
  • 自上一步以来进程或线程已更改

    我正在 Visual Studio 上调试一些代码 此代码属于我创建的自定义会话提供程序 我正在 Web 应用程序启动时对其进行调试 它开始初始化我的提供程序 并且在该函数上我有一个第一次成功命中的断点 但是 同一断点再次被击中 但它有一个
  • 带有自定义离线页面的 Angular PWA

    在 Angular 8 应用程序中 我想添加一个自定义离线页面 只是一个简单的 html 文件 我已将我的应用程序设置为 PWA 使用 angular pwa并配置了一切 以便它至少在在线时顺利工作 然而 我很难为 PWA 用户提供更新 因
  • unsafePerformIO 和 FFI 库初始化

    我正在为 C 中的库创建一个 FFI 模块 该模块希望在执行其他操作之前调用一个一次性 不可重入的函数 这个调用是幂等的 但是有状态的 所以我可以在每个 Haskell 调用中调用它 但它很慢 并且由于不可重入 可能会导致冲突 那么现在是使
  • 允许用户在 Android 应用程序中插入图像

    我的问题是 如何创建 imageButton 允许用户从手机上传图像并将其作为图片配置文件插入应用程序中 例如 像 Whatsapp 一样 它允许用户从手机中选择图像并将其设置为图片配置文件 Thanks 我的 XML 文件
  • 为什么 Func 与 Func> 不明确?

    这个问题让我很困惑 所以我想我会在这里问 希望 C 大师可以向我解释一下 为什么这段代码会产生错误 class Program static void Main string args Foo X the error is on this
  • Laravel 5.3 存储和读取文件目录

    目前正在尝试处理文件 但很难弄清楚将它们放在哪里以及如何在列表中读回它们 我尝试过将一些测试文件放入 files array dir opendir asset files open the cwd also do an err check
  • 如何使用 pyspark 从 s3 存储桶读取 csv 文件

    我正在使用 Apache Spark 3 1 0 和 Python 3 9 6 我正在尝试从 AWS S3 存储桶读取 csv 文件 如下所示 spark SparkSession builder getOrCreate file s3 b
  • 不获取AudioListenerInterruptionEnd触发器

    我对 OpenAl 和 MPMoviePlayerController 的组合有疑问 我在 OpenAl 设置过程中注册了 AudioInterruptionLister 当我开始播放视频时 侦听器会收到 AudioListenerInte
  • 离子 3 角度 4 动画不起作用

    我有一个组件 我正在尝试为手风琴列表设置动画 我已经进行了所有更改 例如包括import BrowserModule from angular platform browser and import BrowserAnimationsMod
  • std::unordered_set 迭代器遍历的复杂性

    我最近玩了一个std unordered set http en cppreference com w cpp container unordered set 我怀疑我的 STL 版本会跟踪某些 FILO 数据结构 看起来像列表 中的非空存
  • Android JSON解析并存储到数据库

    我正在制作一个具有数据库的应用程序 现在我正在尝试从中解析数据值
  • Kafka Streams - 减少大型状态存储的内存占用

    我有一个拓扑 见下文 可以读取一个非常大的主题 每天超过十亿条消息 这个 Kafka Streams 应用程序的内存使用量相当高 我正在寻找一些关于如何减少状态存储占用空间的建议 更多详细信息如下 Note 我并不是想逃避国有商店 我只是认