Kafka connect(单消息转换)行过滤

2023-12-10

我读到了 kafka 0.10.2.1 中引入的 Kafka 连接转换https://kafka.apache.org/documentation/#connect_transforms

我注意到所有转换都是基于列的转换。 我有一个需要基于值的过滤的用例。例如:

考虑以下一组人的数据集:

{"firstName": "FirstName1", "lastName": "LastName1", "age": 30}
{"firstName": "FirstName2", "lastName": "LastName2", "age": 30}
{"firstName": "FirstName3", "lastName": "LastName1", "age": 60}
{"firstName": "FirstName4", "lastName": "LastName2", "age": 60}

我希望我的工作人员过滤姓氏为 LastName2 的所有记录

是否可以使用 kafka-connect 或者我需要为此用例编写一个单独的程序。

Thanks


您没有理由不能使用单一消息转换来解决这个问题 - 但您需要编写一个自定义消息转换,因为您所描述的内容无法通过当前提供的转换来获得。

这是关于何时使用和不使用 SMT 的有用演讲:2017 年纽约卡夫卡峰会:单一消息转换并不是您正在寻找的转换(Ewen Cheslack-Postava,Confluence)


2020年4月编辑: 现在有一个过滤器表面贴装作为 Confluence 平台的一部分提供

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka connect(单消息转换)行过滤 的相关文章

  • 有没有办法使用 .NET 中的 Kafka Ksql Push 查询

    我目前正在 NET 中使用 Kafka 消费者处理大量 Kafka 消息 我的处理过程的第一步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息 我不想首先处理 特别是不下载 那些不需要的消息 看起来 kSql 查询 写为推送查
  • 如何通过字符串匹配加速 pandas 行过滤?

    我经常需要过滤 pandas 数据框df by df df col name string value 并且我想加快行选择操作 有没有快速的方法可以做到这一点 例如 In 1 df mul df 3000 2000 3 reset inde
  • 如何过滤反应中的状态数组?

    我有阶数减速器 它有很多状态 const initialState channel null order fetching true menu categories subcategories currentCategoryId 1 cur
  • Twitter Bootstrap 行过滤器/搜索框

    我无法找到有关如何为 Twitter Bootstrap 创建简单搜索查询或行过滤器的教程 我已经尝试了很多 我不确定是否我做错了什么或者插件与 Bootstrap 不兼容 如果可以的话请帮忙 我试过了 document ready fun
  • 带有 spring-kafka 的 Kafka 死信队列 (DLQ)

    最好的实施方式是什么死信队列 DLQ Spring Boot 2 0 应用程序中的概念 使用 spring kafka 2 1 x 来处理无法处理的所有消息 KafkaListener某些bean发送到某些预定义的Kafka DLQ主题的方
  • 删除主题级别配置

    为了删除主题中的所有数据 我将其retention ms配置设置为1000 bin kafka topics sh zookeeper KAFKAZKHOSTS alter topic
  • 如何删除 Apache Kafka 中的主题? [复制]

    这个问题在这里已经有答案了 我需要删除 Kafka 0 8 2 2 3 中的一个主题 我使用以下命令删除主题 bin kafka topics sh zookeeper localhost 2181 delete topic DummyTo
  • 如何在 C# 中将位图图像转换为黑白图像? [复制]

    这个问题在这里已经有答案了 可能的重复 在 c 中将图像转换为黑白或棕褐色 https stackoverflow com questions 4624998 convert image to black white or sepia in
  • jQuery 选择和过滤 div 内的元素

    我在选择和过滤 div 内的元素时遇到问题 HTML div div
  • Flink Kafka - 如何使应用程序并行运行?

    我正在 Flink 中创建一个应用程序 读取某个主题的消息 对其进行一些简单的处理 将结果写入不同的主题 我的代码确实有效 然而它不并行运行我怎么做 看来我的代码只在一个线程 块上运行 在 Flink Web 仪表板上 应用程序进入运行状态
  • 在 mule 中使用 groovy 表达式来限制 IP

    我在 mule 中使用 cxf 创建了一个代理服务 我的 mule 版本是 3 3 0 CE 现在 我想在使用代理服务创建的 wsdl 中添加限制 我的限制不应允许他们看到我的 wsdl 的每个 IP 为此 我找到了 Groovy 表达式和
  • JDBC Kafka Connector 可以从多个数据库中提取数据吗?

    我想设置一个 JDBC Kafka 连接器集群 并将它们配置为从同一主机上运行的多个数据库中提取数据 我一直在查看 Kafka Connect 文档 似乎在配置 JDBC 连接器后 它只能从单个数据库中提取数据 谁能证实这一点吗 根据您启动
  • Scipy max_filter 太疯狂了

    我对 scipy 的 Maximum filter 函数有一个小问题 但没有得到解决方案 我有三个不同的 numpy 数组 a np array 152 nan 30 nan nan nan nan nan nan nan nan nan
  • 如何组合过滤条件

    过滤器类函数接受一个条件 a gt Bool 并在过滤时应用它 当您有多个条件时 使用过滤器的最佳方法是什么 使用了应用函数 liftA2 而不是 liftM2 因为出于某种原因我不明白 liftM2 在纯代码中如何工作 liftM2 组合
  • 如何过滤视图两列 OR 而不是 AND?

    在 Google 表格中 我想过滤查看结果 以便仅显示 D 列和 或 E 列中带有 x 的行 如果我过滤以显示其中包含 x 的列 则它将仅显示 D 列和 E 列中包含 x 的行 我如何让它做 和 或 操作 当我单击列中的过滤器按钮时 它只会
  • 如何创建一个多重过滤函数来过滤掉多个属性?

    我有一个要过滤的对象数组 name Apple age 24 model Android status Under development name Roboto age 24 model Apple status Running 我需要使
  • Windows 上的 Apache Kafka 错误 - 无法找到或加载主类 QuorumPeerMain

    我刚刚从 Apache 网站下载了 Kafka 2 8 0 我正在尝试使用网站上给出的说明进行设置 但是当我尝试启动 Zookeper 服务器时 出现以下错误 错误 无法找到或加载主类 org apache zookeeper server
  • Kafka Consumer 如何(应该)应对有毒消息

    当 Kafka Consumer 无法反序列化消息时 客户端应用程序是否有责任处理有毒消息 Or Kafka是否会 增加 消息偏移并继续消费有效消息 是否有处理 Kafka 主题上的有毒消息的 最佳实践 当 Kafka 无法反序列化记录时
  • 从流中过滤/删除无效的 xml 字符

    首先 我无法更改 xml 的输出 它是由第三方生成的 他们在 xml 中插入无效字符 我得到了 xml 字节流表示形式的 InputStream 除了将流消耗到字符串中并对其进行处理之外 是否有一种更干净的方法来过滤掉有问题的字符 我找到了
  • Javascript正则表达式用于字母字符和空格? [关闭]

    这个问题不太可能对任何未来的访客有帮助 它只与一个较小的地理区域 一个特定的时间点或一个非常狭窄的情况相关 通常不适用于全世界的互联网受众 为了帮助使这个问题更广泛地适用 访问帮助中心 help reopen questions 我需要一个

随机推荐