Spark Job写文件个数的控制以及小文件合并的一个优化

2023-11-18

背景说明

在大数据领域,平台小文件治理一直是一个非常重要的问题。我司大佬在Spark平台里,在向目标表中增加一个Shuffle,然后在Reduce端合并数据,以实现将小文件合并成大文件,来减少平台中的小文件。
我司还对单个任务写HDFS文件个数做了限制,同时限制了单个Task 和 单次Job 可写的HDFS个数限制。

通过引入额外Shuffle对写入数据进行合并

最终实现效果如下

== Optimized Logical Plan ==
CreateDataSourceTableAsSelectCommand `P_WAKUN_T`.`user_part_2`, ErrorIfExists, [id, name, dt]
+- Relation p_wakun_t.user_part[id#18274940,name#18274941,dt#18274942] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- Execute CreateDataSourceTableAsSelectCommand `P_WAKUN_T`.`user_part_2`, ErrorIfExists, [id, name, dt]
   +- CustomShuffleReader coalesced
      +- ShuffleQueryStage 0
         +- Exchange RoundRobinPartitioning(10000), REPARTITION_BY_NONE, [id=#30214771]
            +- *(1) ColumnarToRow
               +- FileScan parquet p_wakun_t.user_part[id#18274940,name#18274941,dt#18274942] Batched: true, DataFilters: [], Format: Parquet, Location: xx

EnsureRepartitionForWriting Rule

EnsureRepartitionForWriting 中对 DataWritingCommandExec 进行数据写入之前,增加一个Shuffle。
当然 Rule 中还要考虑Partition table, Bucket table 的Shuffle 方式,不能把数据给搞混了。

CoalesceShufflePartitions Rule

CoalesceShufflePartitions Rule 会根据Shuffle结果,coalesce 数据到合适的 Partition 个数。

OptimizeShuffleWithLocalRead Rule

Local Shuffle Read 是Spark新增的对Spark Shuffle 过程进行优化的Rule,当Shuffle required distribution 不需要按照Hash分布的约束,以及满足其他的一些条件时,Reduce 端修改为连续读某一个Map 的Shuffle Output,这样会有更好的数据本地性,Shuffle 性能也会有提升。
这个Rule 之前叫 OptimizeLocalShuffleReader Rule。
其他应用条件:

  • 如果是 DataWritingCommandExec, 只能优化它的Child 节点
  • 如果是 Shuffle Query Stage, Shuffle 类型只能是 ENSURE_REQUIREMENTSREPARTITION_BY_NONE

分布式数据写控制

在 Hadoop 的 MapReduce 中,通过 FileOutputCommitter 控制分布式数据写Job setup,Task commit, 以及Job commit.
FileOutputCommitter 的 v1 算法对task 输出做两次rename 控制, v2算法对task输出做一次rename控制。

在Spark中有一套新的 FileCommitProtocol, 组合使用了 Hadoop 的 FileOutputCommitter 来控制Job 的写过程。上面要实现的控制单 Task 和 Job 输出文件个数的实现也就是在这里实现的。
通过下面的时序图可以看到,Task端可以通过创建新的文件 newTaskTempFile() 时check task file number; SparkContext.runJob() 方法有一个参数 resultHandler 用于处理Task 执行完成后 result 的回调。写数据的Task 最终返回的结果就是 WriteTaskResult (内部包含写的文件个数),在 resultHandler 中对所有Tasks 的写文件个数进行累加。当超过 maxCreatedFilesInDynamicPartition 报错。

在这里插入图片描述

FileFormatWriter SparkContext Task SQLHadoopMapReduceCommitProtocol OutputCommitter DynamicPartitionDataWriter createCommitter() setupJob(jobContext) setupCommitter() setupJob() runJob() executeTask() setupTask() setupTask() Execute Task write() newTaskTempFile() create new file && fileCounter += 1 loop [call DynamicPartitionDataWriter to write data] commitTask() commitTask() abortTask() abortTask() alt [task success] [task fail] WriteTaskResult onTaskCommit() check file numbers commitJob() commitJob() move all stage files to final directory abort() abortJob() alt [job success] [job fail] FileFormatWriter SparkContext Task SQLHadoopMapReduceCommitProtocol OutputCommitter DynamicPartitionDataWriter
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Job写文件个数的控制以及小文件合并的一个优化 的相关文章

  • HDFS的block和切片(split)的联系和区别

    lt 1 gt 联系 HDFS的block和切片 split 的大小相等 lt 2 gt 区别 1 HDFS存储数据在数据节点上 block是数据节点储存数据的一个个单位 2 split是把block切分而成的虚拟定义 3 split是Ma
  • Flink checkPoint和SavePoint

    savepoint和checkpoint都是flink为容错提供的强大功能特性 能够自动或手动保存job的运行状态 两者区别 checkpoint 应用定时触发 用户保存状态 会过期 内部应用失败重启的时候启用 但是手动cancel时 会删
  • 电商零售行业1--用户消费行为分析

    用户消费行为分析 项目背景 数据处理 导入数据 数据探索可视化 数据预处理 用户整体消费分析 用户个体消费分析 用户消费金额 消费次数 描述性统计 用户消费分布图 用户累计消费金额占比分析 贡献度 用户消费行为 首购时间 最后一次购买时间
  • 记一次Spark打包错误:object java.lang.Object in compiler mirror

    使用maven compile和package 一直报错scala reflect internal MissingRequirementError object scala runtime in compiler mirror not f
  • Spark大数据分析与实战笔记(第一章 Scala语言基础-3)

    文章目录 1 3 Scala的数据结构 1 3 1 数组 数组的遍历 数组转换 1 3 2 元组 创建元组 获取元组中的值 拉链操作 1 3 3 集合 List Set Map 1 3 Scala的数据结构 对于每一门编程语言来说 数组 A
  • 大数据入门 - 基础概念

    文章目录 1 发展历史 2 分布式系统 可靠性 可扩展性 可维护性 4 单机引擎的问题 事务 写入和存储 数据的序列化 3 GFS Google File System master 的快速恢复性和可用性保障 数据写入的优化 4 MapRe
  • 全球及中国冷链物流产业需求前景与投资竞争力研究报告2022版

    全球及中国冷链物流产业需求前景与投资竞争力研究报告2022版 HS HS HS HS HS HS HS HS HS HS HS HS 修订日期 2021年11月 搜索鸿晟信合研究院查看官网更多内容 第一章 冷链物流相关概述 1 1 冷链物流
  • 大数据三道习题

    Lambda 架构设计图 Lambda体系架构的优点 鲁棒性和容错能力 由于批处理层被设计为追加式 即包含了自开始以来的整体数据集 因此该系统具有一定的容错能力 如果任何数据被损坏 该架构则可以删除从损坏点以来的所有数据 并替换为正确的数据
  • 【精】彻底吃透HDFS写流程(5)-- DataStreamer线程类run方法分析以及如何构建pipeline?

    有关HDFS写流程的系列文章 精 彻底吃透HDFS写流程 1 BlockConstructionStage 精 彻底吃透HDFS写流程 2 Namenode侧create文件 精 彻底吃透HDFS写流程 3 DataStreamer线程和输
  • 大数据—— Flink 的优化

    目录 一 Flink内存优化 1 1 Flink 内存配置 二 配置进程参数 2 1 场景 2 2 操作步骤 三 解决数据倾斜 3 1 场景描述 3 2 解决方式 3 2 1 数据源的消费不均匀 调整并发度 3 2 2 数据分布不均匀 四
  • spark内存模型

    Spark 1 6 开始使用了统一内存管理模块 UnifiedMemoryManager 并引入了堆外内存 Off heap memory 1 6之前的内存管理就不进行介绍了 spark堆内和堆外内存模型的示意图 注意 堆外内存是依赖于wo
  • 学习笔记-Spark环境搭建与使用

    一 20 04 Ubuntu安装 清华源ISO源 https mirrors tuna tsinghua edu cn ubuntu releases 20 04 下载链接 https mirrors tuna tsinghua edu c
  • Hbase Sehll基本命令

    进入hbase shell命令 hbase shell 1 status 查看hbase运行状态 2 version 查看hbase版本 3 list 列出hbase所有的 表 4 创建表 create info member member
  • 大数据手册(Spark)--Spark基本概念

    文章目录 Spark 基本概念 Hadoop 生态 Spark 生态 Spark 基本架构 Spark运行基本流程 弹性分布式数据集 RDD Spark安装配置 Spark基本概念 Spark基础知识 PySpark版 Spark机器学习
  • 【硬刚大数据之学习路线篇】2021年从零到大数据专家的学习指南(全面升级版)

    欢迎关注博客主页 https blog csdn net u013411339 本文由 王知无 原创 首发于 CSDN博客 本文首发CSDN论坛 未经过官方和本人允许 严禁转载 欢迎点赞 收藏 留言 欢迎留言交流 声明 本篇博客在我之前发表
  • Spark的常用概念总结

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 基本概念 1 RDD的生成 2 RDD的存储 3 Dependency 4 Transformation和Action 4 1 Transformatio
  • sparkstreamming 消费kafka(2)

    spark streaming提供了两种获取方式 一种是同storm一样 实时读取缓存到内存中 另一种是定时批量读取 这两种方式分别是 Receiver base Direct 一 Receiver base Spark官方最先提供了基于R
  • 物联网产业到2023年连接数将突破20亿

    导读 随着经济社会数字化转型和智能升级步伐加快 物联网逐渐成为新型基础设施的重要组成部分 近日 工信部等8部门联合印发 物联网新型基础设施建设三年行动计划 2021 2023年 下称 行动计划 明确到2023年底 在国内主要城市初步建成物联
  • JAVA 安装与简单使用

    JAVA简易安装 下载安装 环境变量 进入变量界面 设置变量 验证JAVA环境 运行Java程序 个人站 ghzzz cn 还在备案 很快就能访问了 下载安装 第一步当然是从官网下载安装java了 网上有很多的教程 这里简单的写一下 在这里
  • spark相关

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 pandas是什么 二 使用步骤 1 引入库 2 读入数据 总结 前言 提示 这里可以添加本文要记录的大概内容 例如 随着人工智能的不断发展 机器学习这门

随机推荐

  • 视频无损放大修复工具:Topaz Video AI对Mac和Windows的系统要求

    Topaz Video AI是一款基于人工智能技术的视频增强软件 旨在提供高质量的视频修复 增强和转换功能 它可以通过智能算法和图像处理技术 改善视频的清晰度 稳定性 降噪效果 还能进行视频转码和格式转换 Mac Topaz Video A
  • 奇迹服务器维护,奇迹MU 3月31日服务器维护更新公告

    尊敬的用户 为了奇迹 mu 服务器能够始终保持高效和稳定的运行 使玩家能够在更好的游戏网络环境中享受游戏的乐趣 我们将于2021年3月30日 周二 9 00开始进行游戏服务器的维护工作 维护时间为大约为6个小时 维护期间官方主页的充值 登陆
  • Acwing2554. 排列数

    在一个排列中 一个折点是指排列中的一个元素 它同时小于两边的元素 或者同时大于两边的元素 对于一个 1 n 的排列 如果可以将这个排列中包含 t 个折点 则它称为一个 t 1 单调序列 例如 排列 1 4 2 3 是一个 3 单调序列 其中
  • SDN/NFV标准组织&SDN架构

    标准组织 1 ONF 开放网络基金会 2 ODL OpenDayLight 3 ETSI 欧洲电信标准协会 作为标准制定的依据 2012年成立 由运营商主导 通信设备 信息设备等厂家共同参与 推动NFV标准研究和产业进程的临时性组织 4 I
  • ERP的灵魂

    ERP应该是有灵魂的 这个灵魂就是规划 开发和完善时的理念 用土话说 上ERP到底是为了啥 有了灵魂 ERP的开发和实施就不会摇摆不定 灵魂源于初心 要回归本源 注意这个本源不是数字化 也不是上档升级 这些只是手段 结果或目的 而不是本源
  • 内容多,鼠标略过显示内容

    格式化单元格提示信息 function formatCellTooltip value return span title value span th 异常类型 th
  • knime工具介绍(1)

    本文旨在介绍knime在数据分析中可具体扮演的角色 安利给大家这个超好用数据分析工具 截图部分转自亚洲数析协会公开课截图 如有侵权请及时私信处理 因为内容比较多 先慢慢更新 未完待续9 14 一 数据分析的全流程均可以用到这个工具 台湾数析
  • 解决:修改JAVA_HOME后,Java版本无法正常切换

    经验总结 步骤1 检查路径是否正确 步骤2 将JAVA HOME配置到path最前面 步骤3 删除 C ProgramData Oracle Java javapath 目录下三个 exe 文件 步骤4 重新测试是否 可正常切换Java 版
  • 软件测试从自学到工作,软件测试学习到底要怎样进行?

    前言 首先 请不要奢望有多么简单的办法 学习没有捷径 这里只是让你明白这一点 顺便根据个人经验帮你理一下学习的过程 其实有文章是说怎么学习以及学习什么的 但是可能还是有些抽象 或者内容有点多 有点杂 以至于不少朋友仍然觉得不知道如何下手 大
  • R语言描述性统计

    使用Hmisc这个包 只需要调用 my data read csv test csv Hmisc describe my data 可以打印出各个变量的均值方差等信息
  • mysql远程连接权限grant all privileges on *.* to ‘root‘@‘%‘ identified by ‘123456‘ with grant option语句报错

    mysql远程连接权限grant all privileges on to root identified by 123456 with grant option语句报错 记录一下自己安装mysql遇到的小坑 grant all privi
  • Integer中缓存池讲解

    文章目录 一 简介 二 实现原理 三 修改缓存范围 一 简介 Integer缓存池是一种优化技术 用于提高整数对象的重用和性能 在Java中 对于整数值在 128 到 127 之间的整数对象 会被放入缓存池中 以便重复使用 这是因为在这个范
  • Centos7操作系统服务器优化流程(关闭防火墙、关闭selinux、更换yum源、安装Docker和docker-compose)

    Centos7 测试环境服务器优化流程 本文讲解内容 将Centos7操作系统作为公司开发环境或者自学者搭建DevOps流程而优化的几项内容 生产环境慎用 防止被网络攻击 纯干货教程 已在本地操作多次 请放心使用 推荐一个笔者长期使用的ss
  • 卡西欧casio手表质量怎么样

    Casio的仿货 淘宝在300以上的质量都还可以 500以上手感就挺好了 我买了一个4折的 没问题 绝对真货 有真货单的 带激光防伪标 好像是广东出的 就是没发票 不过店家保一年 但我觉得casio的质量还是可以的 一年内不会有问题 1年后
  • Jupyter 配置默认工作目录(起始位置)

    没有配置文件 1 安装了 Anaconda 在Anaconda prompt中输入以下命令 也可以用来查找已有配置文件路径 jupyter lab jupyter lab generate config jupyter notebook j
  • OVP保护芯片首选ETA7008,耐压36V,过压保护点可调

    产品描述主要特点 低成本 过压保护点可调 高耐压 低内阻 快速响应ETA7008是一款低侧过压保护 OVP IC 仅具有34mohm开关电阻 确保非常低的导通电阻和高保护电压 负端保护 耐压36V 过压保护点可设 导通内阻小 可蕞大过4A电
  • clang-format configurator - 交互式创建 clang-format 格式配置文件

    clang format configurator 交互式创建 clang format 格式配置文件 clang format configurator https zed0 co uk clang format configurator
  • Apache APISIX 默认密钥漏洞(CVE-2020-13945)

    Vulhub Apache APISIX 默认密钥漏洞 CVE 2020 13945 文章目录 Vulhub Apache APISIX 默认密钥漏洞 CVE 2020 13945 APISIX简介 漏洞复现 payload分析 APISI
  • PCB板框文件丢失的问题

    问题 PCB 板框文件丢失的问题 在制作好PCB并导出Gerber文件后 送厂制板的时候审查被提醒说没有边框文件 缺少 GM1 层 解决办法 经过反复检查 确定添加了边框文件 BOARD GEOMETRY CUT Design outlin
  • Spark Job写文件个数的控制以及小文件合并的一个优化

    文章目录 背景说明 通过引入额外Shuffle对写入数据进行合并 EnsureRepartitionForWriting Rule CoalesceShufflePartitions Rule OptimizeShuffleWithLoca