Kafka Connect-在写入接收器之前修改记录

2023-11-30

我已经使用 confluence-4.0.0 安装了 Kafka connect 使用 hdfs 连接器,我可以将从 Kafka 主题收到的 Avro 记录保存到配置单元。 我想知道在写入hdfs接收器之前是否有任何方法可以修改记录。 我的要求是对记录的值进行小的修改。例如,对整数执行算术运算或对字符串进行操作等。 请建议是否有任何方法可以实现这一目标


您有多种选择。

  1. 单个消息转换,你可以看到它的实际效果here。非常适合消息通过 Connect 传递时的轻量级更改。基于配置文件,并且可使用提供API如果没有现有的转换可以满足您的要求。

    See the 在这里讨论关于 SMT 何时适合给定的要求。

  2. KSQL是 Kafka 的流式 SQL 引擎。您可以使用它来修改数据流,然后再将其发送到 HDFS。请参阅此处的示例.

  3. KSQL 构建于卡夫卡流 API,它是一个 Java 库,使您能够根据需要转换数据。这是一个例子.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka Connect-在写入接收器之前修改记录 的相关文章

  • Hive NVL 不适用于列的日期类型 - NullpointerException

    我正在使用 HDFS 上的 MapR Hive 发行版并面临以下问题 如果表的列类型是 日期 类型 则NVL https cwiki apache org confluence display Hive LanguageManual UDF
  • 如何强制 Spark 执行代码?

    我如何强制 Spark 执行对 map 的调用 即使它认为由于其惰性求值而不需要执行它 我试过把cache 与地图调用 但这仍然没有解决问题 我的地图方法实际上将结果上传到 HDFS 所以 它并非无用 但 Spark 认为它是无用的 简短回
  • 如何使用 Python 在 Kafka 中生成 Tombstone Avro 记录?

    我的水槽属性 name jdbc oracle config connector class io confluent connect jdbc JdbcSinkConnector tasks max 1 topics orders con
  • 如何检测java中的消费者是否无法使用kafka代理?

    我有一个简单的 Java Kafka 消费者 如果 Kafka 代理不可用 我试图捕获异常 我需要它来中断线程 我有这样的代码 KafkaConsumer
  • 带有 spring-kafka 的 Kafka 死信队列 (DLQ)

    最好的实施方式是什么死信队列 DLQ Spring Boot 2 0 应用程序中的概念 使用 spring kafka 2 1 x 来处理无法处理的所有消息 KafkaListener某些bean发送到某些预定义的Kafka DLQ主题的方
  • Kafka JDBC Sink Connector,批量插入值

    我每秒收到很多消息 通过 http 协议 50000 100000 并希望将它们保存到 PostgreSql 我决定使用 Kafka JDBC Sink 来实现此目的 消息以一条记录保存到数据库 而不是批量保存 我想在 PostgreSQL
  • Hive 聚集在多个列上

    据我所知 当配置单元表聚集在一列上时 它会执行该分桶列的哈希函数 然后将该行数据放入其中一个桶中 每个桶都有一个文件 即如果有 32 个桶 那么 hdfs 中就有 32 个文件 将 clustered by 放在多个列上意味着什么 例如 假
  • Kafka 0.10 Java 客户端超时异常:包含 1 条记录的批次已过期

    我有一个单节点 多 3 个代理 Zookeeper Kafka 设置 我正在使用 Kafka 0 10 Java 客户端 我编写了以下简单的远程 在与 Kafka 不同的服务器上 生产者 在代码中我用 MYIP 替换了我的公共 IP 地址
  • 如何使用PySpark结构流+Kafka

    我尝试将 Spark 结构流与 kafka 一起使用 并且在使用 Spark 提交时遇到问题 消费者仍然从生产中接收数据 但 Spark 结构出错 请帮我找到我的代码的问题 这是我在 test py 中的代码 from kafka impo
  • 即使在 Kafka 中进行轮询后,当前也不会发生分区分配

    我有 Java 8 应用程序与 Apache Kafka 2 11 0 10 1 0 一起使用 我需要使用seek特征为poll来自分区的旧消息 然而我遇到了一个例外No current assignment for partition每次
  • 将数据从 .txt 文件加载到 Hive 中以 ORC 形式存储的表

    我有一个数据文件位于 txt格式 我正在使用该文件将数据加载到 Hive 表中 当我将文件加载到类似表中时 CREATE TABLE test details txt visit id INT store id SMALLINT STORE
  • 获取:导入 Spark 模块时出错:没有名为“pyspark.streaming.kafka”的模块

    我需要将从 pyspark 脚本创建的日志推送到 kafka 我正在做 POC 所以在 Windows 机器上使用 Kafka 二进制文件 我的版本是 kafka 2 4 0 spark 3 0 和 python 3 8 1 我正在使用 p
  • Hadoop 超立方体

    嘿 我正在启动一个基于 hadoop 的超立方体 具有灵活的维度数 有人知道这方面现有的方法吗 我刚刚发现PigOLAP草图 http wiki apache org pig PigOLAPSketch 但没有代码可以使用它 另一种方法是Z
  • Kafka Consumer 如何(应该)应对有毒消息

    当 Kafka Consumer 无法反序列化消息时 客户端应用程序是否有责任处理有毒消息 Or Kafka是否会 增加 消息偏移并继续消费有效消息 是否有处理 Kafka 主题上的有毒消息的 最佳实践 当 Kafka 无法反序列化记录时
  • Hadoop 减速器数量配置选项优先级

    以下3个设置reduce数量的选项的优先级是什么 换句话说 如果三者都设置了 会考虑哪一个呢 Option1 setNumReduceTasks 2 within the application code Option2 D mapredu
  • “错误:无法找到或加载主类 org.apache.hadoop.util.RunJar”是什么意思?

    我正在尝试运行一个示例 因为它指出 Hadoop 实践 一书 http www manning com lam 第 15 页 这是需要运行的命令 bin hadoop jar hadoop examples jar 但我收到这个错误 Err
  • Spark中如何获取map任务的ID?

    Spark中有没有办法获取map任务的ID 例如 如果每个映射任务都调用用户定义的函数 我可以从该用户定义的函数中获取该映射任务的 ID 吗 我不确定您所说的地图任务 ID 是什么意思 但您可以使用以下方式访问任务信息TaskContext
  • 如何删除 Apache Kafka 中的多个主题

    假设我有许多具有相同前缀的主题 例如 giorgos topic1 giorgos topic2 giorgos topic3 用于删除单个主题的命令 例如giorgos topic1 如下 bin kafka topics sh zook
  • kafka中的Bootstrap服务器与zookeeper?

    为什么在 kafka consumer 中不推荐使用 Zookeeper 以及为什么建议使用 bootstrap 服务器 bootstrap server 有什么优点 Kafka消费者需要将偏移量提交给kafka并从kafka获取偏移量 由
  • HIVE - 使用WITH CLAUSE插入覆盖

    我有一个生成的查询以WITH子句开头 当我在控制台中运行它时 当我尝试使用INSERT OVERWRITE运行查询以将输出加载到单独的配置单元表中时 该查询工作正常 INSERT OVERWRITE TABLE proc db master

随机推荐

  • 为什么我的节点服务器处理请求两次?

    我有以下简单的节点服务器 const http require http http createServer function req resp console log request arrived resp writeHead 200
  • 分离度

    有没有一种方法 有效或无效 来找到给定 facebook 上的两个人 X 和 Y 像这样定义的数字 如果 X 和 Y 是朋友 则 1 否则 如果 X 有一个朋友 而该朋友也是 Y 的朋友 则 2 否则 如果 X 是朋友 Y 的朋友的朋友 则
  • Angular 2 中的 XML 数据解析

    我是 angularjs2 和 typescript 的新手 在我的项目中 我有一个包含 xml 作为字符串的字符串变量 我需要处理该字符串并根据 XML 中的节点访问字符串中的数据 我在谷歌搜索方面遇到了困难 请帮助我出去
  • 在 Spring Boot 中动态更改 application.properties 值

    目前我正在 Spring Boot 中开发一个基于 REST 的项目 我已在 application properties 文件中添加了 api url i e 应用程序属性 api base url http localhost 8080
  • 等k子集算法

    有谁知道相等 k 子集算法的良好且有效的算法吗 最好是 c 或 c 可以处理 100 个元素向量 可能具有复杂性和时间估计 前任 9元向量 x 2 4 5 6 8 9 11 13 14 我需要生成所有 k 3 不相交子集 总和 24 该算法
  • toString() Java 中的泛型类型

    如何打印通用 java 类型的类型 反射 有什么技巧吗 public class Foo
  • OpenSSL 和 OpenMP 的多线程程序段错误

    我在 C 多线程程序中使用 OpenSSL 并遇到问题 所以我写了一个小程序来尝试缩小问题的范围 除了主函数之外的函数都是从https github com plenluno openssl blob master openssl cryp
  • Linux 下 C++ Promise.set_value 失败并出现未知错误

    我正在尝试让我的模拟在我们的高性能服务器上运行 它 不幸的是 使用 CentOS Linux 版本 7 7 1908 核心 而不是我正在开发程序的 Win10 随之而来的是大量错误 其中一个我无法修复 include
  • 如何使用 Python 写入 Excel 电子表格?

    我需要将程序中的一些数据写入 Excel 电子表格 我在网上搜索过 似乎有很多可用的软件包 xlwt XlsXcessive openpyxl 其他人建议写入 csv 文件 从未使用过 CSV 也不太明白它是什么 该程序非常简单 我有两个列
  • 使用 grep 提取 html 文件的标题

    cat 1 html grep
  • 如何在 Android Studio 中覆盖/更新 Jama Matrix 而不会出现任何错误?

    Jama 矩阵在我的代码中定义 矩阵计算类 如下 private Matrix A private Matrix B private Matrix C 矩阵A初始化如下 A new Matrix 2 2 A set 0 0 1 5 A se
  • 使用部分显式排序然后再进行另一个排序?

    我需要的是以自定义方式订购列表 我正在研究正确的方式并找到了 guava 的订购 api 但问题是我订购的列表并不总是相同的 我只需要2 个字段位于列表顶部 例如我有这个 List
  • 如何绕过 gitlab-runner 要求输入 sudo 命令的密码或 gitlab-runners 的默认密码是什么

    我是 gitlab runner 的新手 并尝试自动化我的项目 以便每当发布新标签时 它都应该构建一个新的 deb 包 PS 我用的是mac 下列的thisgitlab 的官方链接来完成我的任务 我的第一个 gitlab ci yml 文件
  • jquery连续运动动画

    连续运动 我想在上面的网站中重新创建卡车时刻 这是在 mootools 中完成的 我该如何编码 是否有 jQuery 插件可以做到这一点 因此 从屏幕的开始到结束为对象设置动画 然后重新开始 我该如何做这个 jQuery 任何帮助将不胜感激
  • Sparkr 将 DF 写入文件 csv/txt

    你好 我正在纱线模式下开发 SparkR 我需要将 Sparkr df 写入 csv txt 文件 我看到有write df但它会写入镶木地板文件 我尝试做这些事情 RdataFrame lt collect SparkRDF write
  • CircleCI 中的 Terraform 销毁失败

    我目前使用 CircleCI 作为我的 CI 工具来使用 Terraform 构建 AWS 基础设施 我的流程是 使用 Terraform 创建 AWS 实例 安装 Docker 并在其上运行 Nginx 镜像 破坏基础设施 我的 Circ
  • '' 的错误冲突类型是什么意思?

    我收到一条错误消息 错误 的类型冲突 这是什么意思 快速解决 确保您的函数在调用之前声明一次且仅声明一次 例如 更改 main myfun 3 4 double myfun double x return x To double myfun
  • 使用 netcat 和 grep 有条件地运行命令

    我需要 netcat 来侦听传入的 HTTP 请求 并且根据请求 我需要运行一个脚本 到目前为止我有这个 netcat lk 12345 grep Keep Alive 因此 每次 netcat 收到包含 keep alive 的包时 我都
  • 在 Delphi XE5/Android 平台上播放声音警报/蜂鸣声

    有没有办法在Delphi XE5 Android平台上播放声音警报 蜂鸣声 我想要实现的是如何使用 Beep 功能像 Windows 应用程序一样播放系统警报 蜂鸣声 或者至少找到系统音频文件的路径 以便我可以根据事件运行特定的音频文件 我
  • Kafka Connect-在写入接收器之前修改记录

    我已经使用 confluence 4 0 0 安装了 Kafka connect 使用 hdfs 连接器 我可以将从 Kafka 主题收到的 Avro 记录保存到配置单元 我想知道在写入hdfs接收器之前是否有任何方法可以修改记录 我的要求