是否可以使用 Kafka Streams 访问消息头?

2024-05-07

随着添加Headers http://apache.spinellicreations.com/kafka/0.11.0.0/javadoc/org/apache/kafka/common/header/Header.html到记录(生产者记录 https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html & 消费者记录 http://mirror.reverse.net/pub/apache/kafka/0.11.0.0/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html)在 Kafka 0.11 中,使用 Kafka Streams 处理主题时是否可以获取这些标头?当调用类似方法时map on a KStream它提供了以下论据keyvalue的记录,但我无法看到访问headers。如果我们能的话那就太好了map超过ConsumerRecords.

ex.

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((key, value) ->  ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
    ... 

像这样的东西会起作用:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((record) -> {
        record.headers();
        record.key();
        record.value();
    })
    ...

从 2.0.0 版本开始可以访问记录标题(参见KIP-244 https://cwiki.apache.org/confluence/display/KAFKA/KIP-244%3A+Add+Record+Header+support+to+Kafka+Streams+Processor+API了解详情)。

您可以通过处理器 API 访问记录元数据(即通过transform(), transformValues(), or process()),通过给定的“上下文”对象(参见https://docs.confluence.io/current/streams/developer-guide/processor-api.html#accessing-processor-context https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context).

Update

从 2.7.0 版本开始,处理器 API 得到了改进(参见KIP-478 https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API),添加一个新的类型安全api.Processor与 一起上课process(Record)代替process(K, V)方法。对于这种情况,标头(和记录元数据)可以通过Record class).

这个新功能是尚不可用在“DSL 的 PAPI 方法”中(例如KStream#process(), KStream#transform()和兄弟姐妹)。

+++++

在 2.0 之前,上下文仅公开主题、分区、偏移量和时间戳,但不公开在旧版本中读取时实际上被 Streams 删除的标头。

但元数据在 DSL 级别不可用。然而,通过以下方式扩展 DSL 的工作也在进行中KIP-159 https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

是否可以使用 Kafka Streams 访问消息头? 的相关文章

随机推荐

  • AngularJS:在任何部分页面控制器之前调用特定函数

    我想调用一个特定的函数 GetSession 在我的应用程序加载开始时 这个函数使 http调用并获取会话令牌 GlobalSessionToken从服务器 然后 该会话令牌用于其他控制器逻辑并从服务器获取数据 我已经打电话给这个GetSe
  • 无法在 Google Cloud Function 中加载 node_modules(index.js,不在项目根目录上)

    因此 我需要部署 Google Cloud Function 以在 PostgreSQL 数据库 Cloud SQL 上发出选择请求 我需要使用 pg 模块 然后使用以下命令安装它 npm i pg 你需要知道我的项目目录是这样的 proj
  • 添加到 std::vector 的中间

    有没有办法将值添加到 a 的中间vector在 C 中 假设我有 vector
  • 在react中自定义useAxios钩子

    我正在使用 axios 和 React 所以我想为此编写一个自定义钩子 我这样做了 它工作正常 如下所示 const useAxios gt const response setResponse useState const error s
  • 弹性图表隐藏数据提示

    我们从多个源获取数据 并且某个日期的数据可能存在也可能不存在 因此 对于没有数据的点 我们发送 NaN 问题 在下面的代码中 有没有办法不显示那些为空的数据提示 我添加了一个数据提示功能 但它确实显示了一个小的空方块 是否有可能甚至不显示
  • VSCode 扩展的安全性和隐私性

    我发现 VSCode 有很多不错的扩展 然而 我担心这些扩展是否将我的代码发送到他们的任何服务器 有什么办法可以查到吗 我可以使用 fiddler 并隔离插件中可能发生的调用 但不想对我安装的每个扩展都这样做 VScode 团队对此有一些指
  • VS2015 nuget包管理器找不到包

    我安装了 VS2015 Update 2 现在 nuget 包管理器找不到 Microsoft 和 net 包源之外的任何包 看起来 nuget 包源已被删除 当我将其添加回 http www nuget org http www nuge
  • 外部“C”声明如何工作?

    我正在学习编程语言课程 我们正在讨论extern C 宣言 除了 它与 C 和 C 接口 之外 此声明如何在更深层次上工作 这对程序中发生的绑定有何影响 extern C 用于确保后面的符号不是mangled http en wikiped
  • 如何测量异步发电机所花费的时间?

    我想测量生成器花费的时间 阻塞主循环的时间 假设我有以下两个生成器 async def run for i in range 5 await asyncio sleep 0 2 yield i return async def walk f
  • 预注册 ATL 窗口类

    我在一个项目中使用了 ATL 和 WTL 的组合 并从中派生了我自己的类CWindowImpl 看起来像这样 class CMyControl public CWindowImpl
  • 浮点型、双精度型和十进制最大值与大小的关系[重复]

    这个问题在这里已经有答案了 我在 C 中遇到了这些数据类型的大小和最大值的令人困惑的模式 在使用 Marshal SizeOf 比较这些大小时 我发现了以下结果 Float 4 bytes Double 8 bytes Decimal 16
  • Symfony2 -> Twig -> 表单 -> 字段 -> 设置渲染 = true

    我有一个简单的问题 我有一个带有字段的表单 例如 builder gt add x gt add y gt add z 在我的树枝文件中 我使用了多个块 并且我想停止渲染字段 我查看了 b html twig 文件 a html twig
  • LibGDX dispose() 方法应该如何使用?

    我很不清楚如何dispose LibGDX 框架中的方法有效并且应该使用 据我所知 当你不需要某种资源后 你必须进行处置以确保你的程序运行最佳 我正在开发一个移动应用程序 并且我有一个AssetManager在启动时在特殊指定的加载屏幕中加
  • 消息:Hive 架构版本 1.2.0 与 Metastore 的架构版本 2.1.0 不匹配 Metastore 未升级或损坏

    环境 spark2 11 hive2 2 hadoop2 8 2 hive shell 运行成功 并且没有错误或警告 但是当运行application sh时 启动失败 usr local spark bin spark submit cl
  • Android 中如何获取帧

    实际上 我需要从视频中获取所有帧 但在使用 Mediametadataretriever 缩略图 时间戳获取帧时 我经常重复获取第一帧 然后获取特定时间帧 我通过更改所有 GetFrameAtTime options 尝试了很多修复 但仍然
  • 如何避免javascript中for循环内的for循环

    我已经编写了一段运行良好的代码 我想要一个新数组 其中包含 myArr 中的元素 按照 orderArr 中指定的顺序 但是 它在另一个 for 循环中使用 for 循环来匹配数组元素 var myArr a b c d e var ord
  • 从所有通讯组中删除所有前雇员

    因此 今天我被分配的任务是从所有 DL 中删除域中的所有前员工 他们在 AD 中拥有自己的文件夹 有没有什么方法可以快速做到这一点 或者至少比单独检查每个并转到 gt 的成员删除所有更快 Thanks 编辑以添加更多信息 有 822 个用户
  • API 错误 (500):清单未知:清单未知

    it failes to pull the image with SHA256 digest identifier 不幸的是 这是 DockerHub 消除 Docker 1 9 守护进程的向后兼容性的副作用 当使用 Docker 1 10
  • PHP 命名空间和 use 即使我已经使用 use 指定了类,也找不到致命错误类

    我在 PHP 中的名称空间方面遇到了麻烦 例如我有一个这样的文件 namespace App Models Abstracts abstract class Country 然后是另一个像这样的文件 namespace App Models
  • 是否可以使用 Kafka Streams 访问消息头?

    随着添加Headers http apache spinellicreations com kafka 0 11 0 0 javadoc org apache kafka common header Header html到记录 生产者记录