datax 数据写入oracle报错缺失表达式_实战再次升级:流批一体处理百亿级别数据...

2023-11-05

需求背景

该篇内容基于之前写过的一篇<>,上一篇文章其实主要重点是结合logstash的实际应用。近期业务方提出了新的需求,增加了些业务逻辑,同时数据量也成倍增加,要求每日产出指标结果,这里再回顾下上篇的数据情况和技术方案同时对比下新调整后的数据量

调整前数据量 调整后数据量
生产端 每日20亿左右 每日500亿左右
输出端 减半,10亿左右 每日50亿+

考虑到投入产出比,该需求仍然采用来原来老的技术方案设计,只是做了些优化手段。具体使用到的技术:Java,Kafka,MLSQL,Logstash,Ruby,Hive,ES,SparkSQL,Datax「注意:这里均是实际的业务场景和实际的数据量,本文以分享为目的,如果读者有更好的方案,欢迎一起交流」

方案设计

「1.数据流向」3d6d635c70052797ad87d7648b400d48.png流程:
  1.业务方将数据推送至MQ,并将消息进行序列化处理
  2.通过流平台接入消费消息,并进行一部分逻辑处理,再次回转到MQ中
  3.使用logstash消费消息,编写ruby进行逻辑处理,将数据写入hdfs
  4.数仓对hdfs文件进行加载入表,进行建模处理
  5.最后将指标结果写入到业务方,以es存储
阅读过上篇文章的读者相信对这部分的流程比较清晰,其实这就属于数仓开发流程和分层处理。基本流程梳理完成后,如果直接按照该种模式每日处理如此大的数据量,必然会占用大量的资源,对其他的调度任务产生影响。接下来就要结合实际的需求进行详细的设计。「2.技术方案」ff886c82a24d3551709cb313b22127bb.png流程:
  1.业务侧通过protobuf序列化将数据推送至kafka
  2.通过MLSQL开发udf,消费kafka数据反序列化,并进行去重处理(「这里设置的是每3秒一个批次,这里做了一次批内去重」)并将数据再次写回kafka中
  3.logstash端消费kafka数据,并编写filter逻辑,这里涉及到外部接口调用的逻辑,将调用结果保存至系统环境变量中(「考虑数据接口更新频率和数据量,这里每小时调用一次,另存入环境变量其实就是一个缓存,之所以没有存入文件,是因为logstash每次处理event都要对文件进行操作,效率较低」),然后进行获取,最后otut端将数据写入hdfs中
  4.数据写入hdfs后,加载到hive表中,进行建模开发
  5.最后使用datax将指标统计结果写入到es中供业务方使用「3.问题引入」
虽然上述的方案能够实现实时消息上百亿的数据量,但未考虑到数仓侧的计算压力,目前生产上每日跑批任务有6000+个任务,如果夜间去计算50亿+的数据量,将会把所有的资源全部占用,导致其他任务一直阻塞,最后会影响重要任务产出(「别问我怎么知道的,因为这是踩过的坑」)。所以接下来需要对该部分任务进行优化,尽量做到占用最小的资源以最快的时间产出。

性能优化

「1.消费优化,其目标:即做到消费不延迟」
  消费端的优化仍然沿用上篇的优化手段,唯一不同的地方在于logstash端涉及到的缓存问题,刚才上面也提及过需要调用外部接口来过滤一部分数据,但接口不可能是一直调用的,否则会对接口造成压力,结合接口更新和数据特性,这里每小时调用一次即可,那么问题来了,调用的结果存储到哪里呢?首先不能存储到外部系统,一方面增加了强依赖,另一方面对外部系统也会有压力负载,因此需要落入本地,之前采用过落入文件的方式,然后ruby中对文件进行操作,但是效果不佳(「这里涉及到频繁的文件打开关闭和同步问题」),后采用环境变量的方式进行存储,每次处理event的时候读取环境变量字段即可。具体使用方式如下:

if not ENV["app_list"].nil?
   event["arrays"] = ENV["app_list"].split(" ")
   event.cancel if event["arrays"].include? event["service_name"]
   
   # 这里的ENV["app_list"]就是读取的环境变量app_list字段
end

「2.批处理优化,其目标:占用最少的资源花最小的时间产出」
1.mapper和reduce数量调整
  对于批处理的优化无非是对mapreduce的优化,由于logstash写入的hdfs文件是可切分的,所以产生的mapper数跟块的个数有关系,但由于mapred.min.split.size参数在服务端已经固定配置了(固定256M),如果大于该值则会直接报错。因此如果文件越大,那么切分的mapper数也就越多,申请的资源也就越多,所以对于占用小量资源的优化,从源头上就不可行。
2.存储优化
  由于logstash输出的格式是textfile的,如果直接对该种格式进行处理,将会占用大量的网络资源,因此需要进行格式转换和压缩存储。这里优化的手段是采用orc存储,snappy格式压缩(「相对于lz4压缩,snappy是属于不可切分的,那么这也对mapper数量进行了控制」)。

create table if not exists tableA(
  id string,
  field1 string,
  field2 string
)
partitioned by (date_id string,hour string)
stored as orc
tblproperties ("orc.compress" = "SNAPPY");

3.改用执行引擎
基于第一点的优化思路,受限于源头的文件切分和可调参数导致无法控制资源,因此这里需要借用于分而治之的思想,即将每日的处理调整为每小时处理(「这里只是过滤对于该次需求无用的数据,不做每小时聚合的操作,即业务场景需要对全天的数据进行聚合处理」),为了尽可能使用较少时间执行,需要将小时处理的任务执行引擎由hive调整为sparksql执行,同时调整了以下几个参数

--开启动态分区
set spark.sql.auto.repartition=true;
set spark.shuffle.service.enabled=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;
--开启sparksql自适应,即不采用固定的最小分区,避免产生小文件
set spark.sql.adaptive.enabled=true;
set spark.sql.autoBroadcastJoinThreshold=209715200;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=1024000000;

效果产出

基于以上的几种优化手段,目前已经解决了前面提到的占用资源过多和产出时间过长的问题。

资源使用(队列最大资源使用量) 产出时间
优化前 1200Cores+4293189M+Fair Scheduler 14803秒
优化后 100Cores+92752M+Fair Scheduler 3470秒
7575f5b9e49ea88e71fdae8d7668dbb6.png
yarn队列资源配置
3e3cc6a16760056d39efb59bd9823414.png
调度任务执行情况
78fbe31cbf5d19c0d791fe5f512f72f6.png
最终要聚合统计的数据量

待解决问题

虽然已经解决掉了占用资源多,产出时间长的问题,但是稳定性也是亟需要解决的问题。由于logstash消费端部署的机器配置各有差异,所以在写入hdfs的时候及其不稳定,也容易导致延迟产生。后续消费这块逻辑可能会迁移至flink来改造实现,目前这块仍在集成开发阶段中,待完善后会再次分享给读者们。另如果读者们如果有更好的解决方案,欢迎一起沟通讨论

往期推荐

Flink从入门到放弃之-入门篇(一)

教你如何使用正确姿势关闭SparkStreaming

数据开发必经之路-数据倾斜

元数据管理-技术元数据解决方案

2020年大厂面试题-数据仓库篇

一万字完整总结Flume

SparkStreaming完整学习教程

数据同步神器-Datax源码重构

zookeeper源码解读之-源码编译

zookeeper源码解读之-服务端启动流程

zookeeper源码解读之-DataTree模型构建+Leader选举

实战:如何实时采集上亿级别数据?

Spark数据倾斜之骚操作解决方案

f0c0d1f2ca1ead62fafe603b8a61260a.png

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

datax 数据写入oracle报错缺失表达式_实战再次升级:流批一体处理百亿级别数据... 的相关文章

  • 时间的计算方法 (根据两个时间段判断是否为年,月)

    1 根据两个时间段判断是否为一年 格式 20230206 20240205 校验年 格式 20230206 20240205 public Boolean checkYear Date date1 Date date2 SimpleDate
  • Go使用Redis 发布和订阅消息

    发布消息 在Go中 Redis客户端库可以提供一个Publish方法来实现消息的发布 不同的Redis客户端库可能有不同的API和方法命名 此处以 v8 为例 v8 版本以下不需要context 下面是一个示例使用go redis库进行Pu
  • 中国佛学66句震撼世界的禅语

    author skate time 2010 06 10 中国佛学66句震撼世界的禅语 1 人之所以痛苦 在于追求错误的东西 2 如果你不给自己烦恼 别人也永远不可能给你烦恼 因为你自己的内心 你放不下 3 你永远要感谢给你逆境的众生 4
  • VM虚拟机桥接模式无法联网解决办法

    1 背景介绍 桥接模式 使虚拟机客户机可以和主机在同一网段 这样 和主机同局域网内的其他主机就也可以ping到虚拟机了 因此 虚拟机设置为桥接模式 且设为静态IP 这样以后就可以方便的使用虚拟机了 2 问题描述 桥接模式之前是好用的 但是主
  • 大话linux运用层访问到硬件驱动层过程

    维哥简述 面试就这么答就OK 今天面试居然忘记了 老司机也翻车 唉 概论流程图 如下图所示 1 当运用程序调用open dev path mode 时 2 虚拟文件系统 vfs 的sys open 函数就会被调用 该函数会在dev path
  • SQLServer中使用加密函数,解密函数

    加密函数 加密 password 要加密的字段 Salt 盐值 pwd 加密后的数据 Create FUNCTION dbo EncryptByPassPhrasePwd password nvarchar 200 Salt nvarcha
  • Unity5.4 Assetbundles官方说明七(在AssetBundle中存储和加载二进制数据)

    转 https blog csdn net u010377179 article details 52922717 第一步是用 bytes 的扩展名保存二进制文件 Unity将把这个文件作为一个TextAsset文本资源 这样就可以打包成A
  • Linux中chown与chmod两个命令的区别详解

    今天小编就为大家分享一篇关于Linux中chown与chmod两个命令的区别详解 小编觉得内容挺不错的 现在分享给大家 具有很好的参考价值 需要的朋友一起跟随小编来看看吧 在linux系统中 chmod和chown命令都可以来设置权限 但他
  • vue项目Error:Cannot find module ‘xxx’ 类报错的解决方法

    现发现只要是报错Error Cannot find module xxx 例如 Error Cannot find module webpack 这类的问题都可以用下面的方法解决 报错内容如下 运行cnpm install没问题 运行cnp
  • 高通平台 Display 杂记

    一 代码位置 User space SurfaceTexture frameworks native libs gui SurfaceFlinger frameworks native services surfaceflinger Gra
  • 【python之argparse模块学习】简单入门

    目录 0 前言 1 入门程序 2 参数 2 1 位置参数 2 2 可选参数 2 3 矛盾选项 3 总结 4 参考文献 0 前言 该模块地位 Python 标准库中推荐的命令行解析模块 类比linux命令ls来理解该模块功能 与该命令类似的
  • 杂音 & pop 音的解决方法

    杂音 pop 音的解决方法 1 喇叭有严重的 吱吱 破音 绝大多数的原因有可能在于V out 电压不稳定 所以最好测一下无负载时的输出电压 同时也可以测量 VCC 即boost 的输出 输入电压 正常的VCC 可以通过客户的 反馈电阻和 V
  • 解决Error:Kotlin: Module was compiled with an incompatible version of Kotlin. The binary version of ..

    文章目录 1 问题 2 分析问题 3 升级kotlin插件版本 3 1 升级方法1 3 1 升级方法2 4 其他问题 4 1 方法1中的Cannot download Read timed out问题 4 2 方法2中的Plugin Kot
  • 23种常用设计模式(C++)

    Part One Methods for constrcting a new object 1 Factory method 我们把简单工厂方法归类到工厂方法中 工厂方法的目的是用来解决具有同一接口 基类 派生类对象的生成问题 尽管可以通过
  • vray渲染白屏卡死_3DMAX使用VRay渲染的时候,在building embree static这一步最后一点卡死...

    DMAX使用vray渲染的时候 在building embree static这一步最后一点卡死是参数设置错误 解决方法如下 1 在我们为场景打好vray灯光以后 要设置vray渲染的参数 按 F10 打开vray渲染编辑器 在 公用 下栏
  • react-Suspense工作原理分析

    Suspense 基本应用 Suspense 目前在 react 中一般配合 lazy 使用 当有一些组件需要动态加载 例如各种插件 时可以利用 lazy 方法来完成 其中 lazy 接受类型为 Promise lt gt default
  • 大数据从入门到精通文章体系

    大数据知识可谓是多而杂 大数据相关的组件更是数不胜数 但是我们每一次的感受到学习的累 就足以证明我们在认真的学习 每感到到一次累的同时 就应该感受到一次进步 所以不要让自己停下来 各位小伙伴冲冲冲 大数据系列资源链接 名称 链接 提取码 H
  • PS笔记2

    第01堂课 出识Ps 图像处理软件 学习ps要做到三点 了解基本概念 掌握操作规律 开发扩展思维 第02堂课 软件安装 百度搜索ps 下载 会发现两种下载情况 第一种不需要安装 解压后在文件里直接找到PS图标 打开就可以了 第二种安装包 安
  • 织梦网站调用变量失败_织梦dedecms无法调用新添加变量的解决办法

    织梦dedecms无法调用新添加变量 在项目中使用了几次织梦cms程序 感觉越来越好用 以前刚接触dedecms时一看后台界面 如此之乱 使我心乱如麻 不知从何下手 后来因为工作逐渐就熟悉了它的后台 特别是一些客户的特殊要求 靠dede自带

随机推荐

  • Ubuntu配置国内源

    Ubuntu配置国内源 Ubuntu源 使用配置生成器 安装依赖包 打开apt源配置文件 国内apt源 中科大源 阿里源 Ubuntu源 每个 Ubuntu 版本都有自己的一组四个官方存储库 Main Canonical 支持的自由开源软件
  • Qt4_写FTP客户端

    写FTP客户端 QFtp类在Qt中实现了FTP协议的客户端程序 它提供了非常多的函数来执行多数常见的FTP操作 同时还可以执行任意的FTP指令 QFtp类是异步工作的 若调用一个像get 或者put 这样的函数 它会立即返回并且仅在控制权回
  • 创建 VirtualBoxClient COM对象失败

    错误问题描述 安装VirtualBox程序 鼠标右键点击VirtualBox属性 在兼容性选项中 勾选兼容性模式运行这个程序 下拉选择 Windows Server 2008 Service Pack 1 勾选以管理员身份运行此程序 3 点
  • [django项目] 用户注册功能 之 注册用户到数据库

    VIIII 注册功能 谋定而后动 先做分析在写代码 1 gt 业务流程分析 对参数进行校验 判断用户名是否为空 是否已注册 判断密码是否为空 是否一致 格式是否正确 判断手机号码是否为空 格式是否正确 判断短信验证码是否为空 格式是否正确
  • 创米云入驻集简云平台,实现无代码集成数百款应用

    PART 1 创米云介绍 创米云科技 是一家专注于小程序开发 同时也是 微信 阿里本地生活客如云 支付宝 的优质IT技术服务商 国内领先自主研发的小程序开发工具 制作过程无需代码 拖拽可视化组件即可完成 拥有海量小程序行业模板 帮助千万商户
  • python 饼图、直方图、散点图和盒图基本绘制

    文章目录 python 饼图 直方图 散点图和盒图基本绘制 饼图 直方图 使用pygal 使用pyplot 散点图 例1 例2 例3 盒图 python 饼图 直方图 散点图和盒图基本绘制 饼图 import matplotlib pypl
  • 将STM32工程下载到GD32中

    在已经写好stm32f103工程的情况下 将stm的工程直接下载到GD32的开发板中 1 导入GD32的pack包 选择使用的芯片型号 并将flash download换为GD32的 2 修改工程参考 STM32移植到GD32 3 如果代码
  • JAVA static修饰符

    static修饰的属于类本身 没有static修饰的属于实例 创建类时 static修饰的成员存放在堆内存中的permanent代 permanent代存放类的信息 如类中的static的成员 class a static int n 4
  • JDBC连接数据库工具类

    最近有人问加哥怎么连接数据库 和数据库建立连接 下面给大家分享一下JDBC连接数据库的模版方法 加哥是以mysql为例 若是其他数据库替换层其他的即可 步骤 1 引入数据库架包 2 加载驱动 3 创建连接 4 进行相应数据库操作 5 关闭数
  • fastreport在delphi中的使用

    文章目录 前言 一 发布后的设计 二 小计 三 分组 1 分组时 MasterData选择保持一致性 2 分组设置 四 页码 五 透视表 前言 记录fastreport在delphi中的使用方法 一 发布后的设计 程序发布后 需要在客户那边
  • 链表的定义和基本操作

    文章目录 单链表 定义 插入操作 删除操作 查找操作 单链表的建立 双链表 初始化 插入 删除 遍历 循环链表 初始化 单链表 定义 要表示一个单链表时 只需要声明一个头指针L 指向单链表的第一个节点 LNode L 或者 LinkList
  • C++基础——this指针

    目录 一 this指针 1 this指针定义 2 this指针的特性 3 this指针存在的位置 一 this指针 例 class Date public void Print int year int month int day cout
  • WDK李宏毅学习笔记重点知识复习总结

    李宏毅学习笔记复习总结 文章目录 李宏毅学习笔记复习总结 1 Regression 回归 和 Classification 分类 1 1 是什么 1 2 怎么做 2 Deep Learning 2 1 是什么 2 2 怎么做 3 CNN 卷
  • 什么是dns流量?如何监控dns流量

    DNS是重要的基础设施 用于域名服务 在负载均衡 移动IP等方面也有着重要的应用 DNS流量激增对互联网的正常运作的影响 并提出了恶意DNS流量攻击 蜂窝效应概念 什么是DNS流量 监控它的方法有哪些 一起来看看吧 什么是DNS流量 dns
  • 基于互联网的信号传输系统

    参加完电子设计竞赛不知不觉快过去两个月了 今天小刚写一下当时我们这道题的方案 本系统基于互联网的信号传输系统采用STM32F407ZGT6和FPGA Cyclone IV作为主控芯片 对信号进行采样和处理 系统由幅度测量电路 频率测量电路
  • Flask框架之视图高级技巧

    文章目录 4 1 app route与app url rule简介 4 1 1 app route的使用 4 1 2 add url rule的使用 4 2 Flask类视图 4 2 1 标准类视图 4 2 2 基于方法的类视图 4 3 F
  • win10c语言文件不运行,教你解决win10系统无法打开EXE文件的问题

    win10电脑上的程序一般都是exe格式的 鼠标双击一下即可运行 最近 有Win10系统的用户抱怨说竟然打不开Exe的文件 双击好多次了 甚至关机重启还是一样打不开Exe的文件 真是让人感到疑惑 其实 遇到此故障问题不必慌张 大家可以尝试按
  • 拜耳再投4亿元提升在华处方药产能;阿斯利康进博会公布新冠疫苗最新进展

    进博会看点 拜耳围绕 共享健康 消除饥饿 的全新企业愿景 携一系列亮点展品和精彩活动亮相进博会 进博会上 拜耳与北京经济技术开发区签署合作意向书 拜耳今年在公共卫生防疫专区设置独立的展台 并呈现覆盖预防 诊断 缓解及治疗于一体的医疗解决方案
  • vue3+ts 使用qrcode(解决了找不到qrcode类型声明文件问题)

    1 安装 全局安装 npm install g qrcode 安装类型声明 npm i save dev types qrcode 2 在所需处引用 import QRcode from qrcode 3 配合canvas 生成二维码
  • datax 数据写入oracle报错缺失表达式_实战再次升级:流批一体处理百亿级别数据...

    需求背景 该篇内容基于之前写过的一篇 lt gt 上一篇文章其实主要重点是结合logstash的实际应用 近期业务方提出了新的需求 增加了些业务逻辑 同时数据量也成倍增加 要求每日产出指标结果 这里再回顾下上篇的数据情况和技术方案同时对比下