FlinkCDC-自定义序列化器

2023-11-13

package com.lcy.app.customer;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomerBinlogDeserialization implements DebeziumDeserializationSchema<String> {
    /**
     * BINLOG格式:SourceRecord{
     *      sourcePartition={server=mysql_binlog_source},
     *      sourceOffset={file=mysql-bin.000003, pos=154, row=1, snapshot=true}}
     *      ConnectRecord{topic='mysql_binlog_source.gmall-210325-flink.user_info', kafkaPartition=null, key=Struct{id=3982},
     *      keySchema=Schema{mysql_binlog_source.gmall_210325_flink.user_info.Key:STRUCT},
     *      value=Struct{after=Struct{id=3982,login_name=g959roj95n,nick_name=冠策,name=司空冠策,phone_num=13339656498,email=g959roj95n@msn.com,user_level=2,birthday=1982-12-04,gender=M,create_time=2020-12-04 23:28:45},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=gmall-210325-flink,table=user_info,server_id=0,file=mysql-bin.000003,pos=154,row=0},op=c,ts_ms=1639523578556},
     *      valueSchema=Schema{mysql_binlog_source.gmall_210325_flink.user_info.Envelope:STRUCT},
     *      timestamp=null,
     *      headers=ConnectHeaders(headers=)
     * }
     * @param sourceRecord
     * @param collector
     * @throws Exception
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        String topic = sourceRecord.topic();
        String[] tableInfos = topic.split("\\.");
        String tableName = tableInfos[2];
        String dbName = tableInfos[1];

        Struct value = (Struct)sourceRecord.value();
        Struct before = value.getStruct("before");
        List<Field> fields = null;
        JSONObject beforeJson = new JSONObject();
        if (before == null) {
            fields = before.schema().fields();
            fields.forEach(field -> {
                Object v = before.get(field.name());
                beforeJson.put(field.name(), v);
            });
        }

        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            fields = after.schema().fields();
            fields.forEach(field -> {
                Object v = after.get(field.name());
                afterJson.put(field.name(), v);
            });
        }

        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        JSONObject result = new JSONObject();
        result.put("dbName",dbName);
        result.put("tableName",tableName);
        result.put("type",type);
        result.put("before",beforeJson);
        result.put("after",afterJson);

        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

FlinkCDC-自定义序列化器 的相关文章

  • apache_request_headers() 与 $_SERVER

    据我所知 apache request headers 提供与以下相同的信息 SERVER 但按键略有不同 为什么有人应该使用apache request headers 而不仅仅是从那里获取这些信息 SERVER 我在 Centos 上使
  • 将古吉拉特语文本插入 MySQL 表会产生垃圾字符和不可读的文本

    我有三个 MySQL 表 我正在向其中插入古吉拉特语内容 当我插入两个表时 它们插入得很好并且可读 但在一个表中 它显示垃圾字符 不可读的文本 我怎样才能解决这个问题 MySQL 有每个表的字符集设置 http dev mysql com
  • Laravel leftJoin 仅右表的最后一条记录

    我是 Laravel 的新手 我有两张桌子 1 产品 2 价格 products id product int p key name varchar prices id price int p key id product int
  • MySQL Connector C/C API - 使用特殊字符进行查询

    我是一个 C 程序 我有一个接受域名参数的函数 void db domains query char name 使用 mysql query 我测试数据库中是否存在域名 如果不是这种情况 我插入新域名 char query 400 spri
  • 比特纳米。重置mysql根密码

    我如何重置 MySQL 中的 root 密码和帐户 因为我按照如何为其他服务器授予权限的说明操作 并且意外地将 root 用户 Mysql 绑定到其他 IP 地址 现在看来我无法在 localhost 上以管理员身份登录 Thanks 您有
  • 如何使用 Perl 更改 mysql 密码

    我需要使用 Perl 脚本更改一些 mysql 密码 以下内容在更改数据库条目时有效 但是当我针对 mysql 用户更改修改它时 它将它们重置为空白密码 最后 刷新权限 也很好 但我还没有找到方法 usr bin perl use DBI
  • 在 PHP 字符串中格式化 MySQL 代码

    是否有任何程序 IDE 可以在 PHP 字符串中格式化 MySQL 代码 例如 我使用 PHPStorm IDE 但它无法做到这一点 它对 PHP 和 MYSQL 执行此操作 但不适用于 php 字符串内的 MYSQL 我已准备好使用新的
  • 通过互联网IP地址从一台计算机访问xampp到另一台计算机

    我试图从另一台计算机访问我的 xampp 它显示为禁止错误 然后我在 google 上搜索答案 因为他们告诉在 apache 文件夹中的 httpd conf 文件中更改一些设置 如下所示 Order Deny Allow Deny fro
  • 如何从shell脚本自动登录MySQL?

    我有一个 MySQL 服务器 其中有一个用户和密码 我想在 shell 脚本中执行一些 SQL 查询而不指定密码 如下所示 config sh MYSQL ROOT root MYSQL PASS password mysql sh sou
  • 一次从多个表中删除行

    我正在尝试将 2 个查询合并为一个这样的查询 result db gt query DELETE FROM menu WHERE name new or die db gt error result db gt query DELETE F
  • 在 MySQL 中存储表情符号的编码问题:如何使用 Prisma ORM 在 NodeJS 中定义字符排序规则?

    亲爱的 Nodejs 专家和数据库专家 我们在 MySQL 数据库中存储表情符号和其他特殊字符时遇到问题 我们使用 Prisma 得到一个错误 这是我们使用的 ORM 参数无法从排序规则 utf8 general ci 转换为 utf8mb
  • Mysql 时间匹配连接

    我有两个表cpuinfo和jobinfo 我想使用这两种数据创建报告 tabes CREATE TABLE cpuinfo id int 11 NOT NULL AUTO INCREMENT usagetime datetime DEFAU
  • 无法分配内存:fork:无法创建新进程?

    我们的托管在aws 最近 我们的博客从wordpress to aws 我们遇到服务器响应时间明显延迟的情况 主要是在访问博客时 以下是来自error log file Wed Feb 25 06 10 10 2015 error 12 C
  • 如何让 mod_perl 在更改时重新加载源文件?

    我正在开发一个带有 mod 的应用程序 perl 并在每次更改代码时重新启动服务器是一个巨大的阻力 我还是想用mod perl 用于开发 因为我计划将其用于实时服务器 我在文档中没有看到有关如何执行此操作的任何内容 想法 我认为 Apach
  • covertJSONtoSQL 在 NiFi 中返回空值

    我正在设计一项工作 使用以下命令将数据从 MySQL 中的数据库转移到另一个数据库 MySQL 执行SQL处理器随后将Avro转换为Json then 将Json转换为SQL then PutSQL如下流程图所示 将JSON转换为SQL返回
  • 在 MySQL 中对整数字段运行带引号的数字(字符串)查询时会发生哪些复杂情况

    在 SQL 中 不应引用整数 因为如果引用 它将是一个字符串 但我很好奇如果我这样做会出现什么问题 并发症 例如 SELECT FROM table WHERE id 1 正确的 vs SELECT FROM table WHERE id
  • CNAME 速度慢吗?

    我将 CNAME 与 S3 CloudFront 一起使用来提供一些静态文件 例如 js css 图像等 我这样做是为了使存储桶的 URL 更漂亮 因为我认为最好将所有内容都定位到我的网站 以防万一将来我想移动这些文件 更改应该是透明的 今
  • mod_rewrite 将 example.com/page.php?v1=abc&v2=def 重写为 example.com/abc/def

    Using Apache s htaccess文件 我正在尝试rewrite网址http example com page php v1 abc v2 def to http example com abc def 到目前为止我有 Opti
  • db:schema:load 与 db:migrate 使用 capistrano

    我有一个 Rails 应用程序 我正在将其移动到另一台服务器 我认为我应该使用 db schema load 来创建 mysql 数据库 因为这是推荐的 我的问题是我正在使用 capistrano 进行部署 并且它似乎默认为 rake db
  • Google Cloud SQL 在重新启动时卡住

    我的云 sql 实例长时间处于重新启动状态 在操作窗格中 重新启动的状态显示为待处理 并且还发生了导出 其状态仍为Running 有没有办法可以强制重新启动或取消重新启动或从常规备份中恢复数据 不 没有办法 如果您向 Google 支付高级

随机推荐

  • 【Spark NLP】第 6 章:信息检索

    大家好 我是Sonhhxg 柒 希望你看完之后 能对你有所帮助 不足请指正 共同学习交流 个人主页 Sonhhxg 柒的博客 CSDN博客 欢迎各位 点赞 收藏 留言 系列专栏 机器学习 ML 自然语言处理 NLP 深度学习 DL fore
  • 忘记文档密码,教你破解WORD/EXECL/PPT文件加密密码

    大家办公时 有设置密码习惯 并且容易忘记密码 今天给大家提供一款超好用得小工具 不定时更新软件 高效率工具小福利 软件 Advanced OfficePassword Recovery 今天给大家带来一款破解Excel密码的神器 涉及到重要
  • VLC解码播放H264文件

    转自 http www cnblogs com ImageVision p 4744391 html utm source tuicool utm medium referral 昨天收到几个文件名是 xxx 264的文件 这种文件属于视频
  • web开发技术总结

    web开发可以理解为动态网站的开发 以java语言为例 就是基于java动态网站的开发 前台框架 jQuery Mvc框架 Struts spring Mvc 核心框架 Spring orm框架 Hibernate Spring JDBC
  • 复数乘法是什么?

    逛木虫的时候看到一个很旧的数学帖子被人挖了坟 这个帖子大概是讨论如果把复数看作是向量 那么复数乘法应该怎么看待 向量之间有乘法 例如复数 1 i 和复数 i 其对应的向量分别是 left begin array 20 c 1 1 end a
  • git没有冲突 但是提示有_git 处理冲突步骤

    背景 工程中有一块功能是在别的远程分支上的 然后自己的分支也是一直在更新的 现在要将该分支上的信功能合到自己的分支上 于是采用了git cherry pick的方法 但是出现了报错 查了许多网上的资料最后总结出处理冲突的步骤 具体实现 输入
  • [实习]Skywalking

    SkyWalking 1 是什么 skywalking是一个包含监控 追踪 并拥有故障诊断能力的分布式系统 它主要的作用是全链路监控 收集数据 分析处理数据 然后可视化呈现 这么说有点抽象 接下来画图来说 这是skywalking的架构 它
  • VscodeSSH免密远程登录服务器

    1 windows下cmd或git bash 或powershell等输入 ssh keygen 指令输入后一直回车 在C Users user name ssh路径下生成如下文件 2 linux服务器Terminal输入 ssh keyg
  • 【SVN命令】之 revert

    名称 子命令Svn revert 取消所有的本地编辑 概要 子命令Svn revert PATH 描述 Reverts any local changes to a file or directory and resolves any co
  • nodejs HelloWorld

    nodejs 服务器端 HelloWorld 程序 a hello js d02 hollo js var http require http http createServer function request response 请求对象
  • C++&Qt 各种数据类型转换

    1 uint64转QString QString strfilerename QString 1 arg nFileID nFileID为uint64类型 QString number nFileID 2 QString转超长数字串 QSt
  • 计算机图形学GAMES101(三)变换(模型、视图、投影)

    补充内容 R 是逆时针方向旋转的矩阵 R 是顺时针方向旋转的矩阵 可以发现R T R 1 像这样的矩阵叫做正交矩阵 以后如果要求往相反的方向旋转相同角度的变换 R 只需要求正向旋转的矩阵然后转置就可以了 本节涉及内容 仿射变换 线性变换 平
  • LeetCode-Python-389. 找不同

    给定两个字符串 s 和 t 它们只包含小写字母 字符串 t 由字符串 s 随机重排 然后在随机位置添加一个字母 请找出在 t 中被添加的字母 示例 输入 s abcd t abcde 输出 e 解释 e 是那个被添加的字母 第一种思路 转成
  • Java笔记:泛型、限定通配符与非限定通配符

    目录 1 泛型 2 限定通配符与非限定通配符 2 1 限定通配符 2 2 非限定通配符 3 PECS Producer Extends Consumer Super 原则 3 1 Producer Extends 3 2 Consumer
  • jar文件怎么打开 查看jar文件内容操作方法

    jar文件怎么打开 查看jar文件内容操作方法 jar文件是java项目生成的一个小的文件项目 也可以描述为一个java压缩包 里面封装了 许多java类以及方法 变量 很多用户想要查看jar文件内容 可是却不知道jar文件怎么打开 下面小
  • TorchServe环境构建+模型更新+新模型注册

    目录 1 背景 2 torchserve环境搭建 2 1jdk环境搭建 2 2 python 环境搭建 2 3 启动服务 2 3 1 注册模型 2 3 2 模型查看 2 3 3 接口调用 3 进阶功能 3 1 模型多版本管理 3 2 新模型
  • NLP神器Gensim库(一):入门操作

    Gensim是一款开源的第三方Python工具包 用于从原始的非结构化的文本中 无监督地学习到文本隐层的主题向量表达 它支持包括TF IDF LSA LDA 和word2vec在内的多种主题模型算法 支持流式训练 并提供了诸如相似度计算 信
  • 【值得收藏的种子搜索引擎】

    种子搜索引擎和磁力搜索引擎是用于搜索和下载种子文件和磁力链接的工具 本文将介绍五个值得收藏的子搜索引擎和磁力搜索引擎 并提供两个示例说明 BT Kitty BT Kitty是一个功能强大的子搜索引 可以搜索各种类型的种子文件和磁力链接 它的
  • nextjs开发 + vercel 部署 ssr ssg

    前言 最近想实践下ssr 就打算用nextjs 做一个人博客 vercel 部署 提供免费域名 来学习实践下ssr ssg nextjs 一个轻量级的react服务端渲染框架 vercel 由 Next js 的创建者制作 支持nextjs
  • FlinkCDC-自定义序列化器

    package com lcy app customer import com alibaba fastjson JSONObject import com alibaba ververica cdc debezium DebeziumDe