RDD计算时是把数据全部加载至内存么

2023-05-16

 

RDD的本质

RDD的本质是一个函数,而RDD的变换不过是函数的嵌套.RDD有两类:

  1. 输入的RDD: 典型如KafkaRDD,JdbcRDD
  2. 转换的RDD: 如MapPartitionsRDD

 

RDD的处理流程:

以如下代码为例:

sc.textFile("abc.log").map().saveAsTextFile("")

1. textFile 会构建出一个NewHadoopRDD,

2. map函数运行后会构建出一个MapPartitionsRDD

3. saveAsTextFile触发了实际流程代码的执行

所以RDD不过是对一个函数的封装,当一个函数对数据处理完成后,我们就得到一个RDD的数据集(是一个虚拟的,后续会解释)。

 

NewHadoopRDD是数据来源,每个parition(分布式并行执行)负责获取数据,获得过程是通过iterator.next 获得一条一条记录的。假设某个时刻拿到了一条数据A,这个A会立刻被map里的函数处理得到B(完成了转换),然后开始写入到HDFS(一条一条写入)上。其他数据重复如此。所以整个过程:

  1. 理论上某个MapPartitionsRDD里实际在内存里的数据等于其Partition的数目,是个非常小的数值。
  2. NewHadoopRDD则会略多些,因为属于数据源,读取文件,假设读取文件的buffer是1M,那么最多也就是partitionNum*1M 数据在内存里
  3. saveAsTextFile也是一样的,往HDFS写文件,需要buffer,最多数据量为 buffer* partitionNum(可以汇聚到Driver端写,也可以各个Executor直接写入到HDFS)

 

所以整个过程其实是流式(一般是一条一条或者一批一批)的过程,一条数据被各个RDD所包裹的函数处理。

(Ps: 如果是mapPartition的话,那就是把整个partition的数据一起加载过来了,所以使用mapPartition函数比起map会容易造成内存溢出)

 

.map(...).map(...)..........map(...).map(...) 是嵌套调用, RDD compute 方法会调用上一个RDDcompute方法, 现在的rdd记住了自己的parent 然他们自己记住调用关系,带来的问题自然是不能嵌套太深

   对应的代码是RDD.scala中的iterator方法:

 

按上面的逻辑,内存使用其实是非常小的,10G内存跑100T数据也不是难事。但是为什么Spark常常因为内存问题挂掉呢? 我们接着往下看。

 

Shuffle的本质:

Stage是以shuffle作为分界的! Shuffle不过是偷偷的帮你加上了个类似saveAsLocalDiskFile的动作。

如果是M/R的话:

每个Stage其实就是上面说的那样,一套数据被N个嵌套的函数处理(也就是你的transform动作)。遇到了Shuffle,就被切开来。Shuffle本质上是把数据按规则临时都落到磁盘上,相当于完成了一个saveAsTextFile的动作,不过是存本地磁盘。然后被切开的下一个Stage则以本地磁盘的这些数据作为数据源,重新走上面描述的流程。

 

所以Spark的操作为:

前面我们提到,Shuffle不过是偷偷的帮你加上了个类似saveAsLocalDiskFile的动作。然而,写磁盘是一个高昂的动作。所以我们尽可能的把数据先放到内存,再批量写到文件里,还有读磁盘文件也是给费内存的动作。把数据放内存,就遇到个问题,比如10000条数据,到底会占用多少内存?这个其实很难预估的。所以一不小心,就容易导致内存溢出了。这其实也是一个很无奈的事情。

(还是说,Spark的shuffle的中间结果也是要写到本地磁盘的,只是顺序落盘和顺序读盘的话性能会快很多 -- 只写一次磁盘,而且是顺序写,那么也是非常快的。所以Spark其实是激进的使用内存)

(即进行第一次shuffle之后,数据就全部都放在内存中了? 还是都会写入到本地目录? 多次shuffle之间是如何操作的???

 对于以上的问题,后续会单独写一篇博客,理解下shuffle的逻辑)

 

Cache和Persist的含义:

    其实就是给某个Stage加上了一个saveAsMemoryBlockFile的动作,然后下次再要数据的时候,就不用算了。这些存在内存的数据就表示了某个RDD处理后的结果。这个才是说为啥Spark是内存计算引擎的地方。在MR里,你是要放到HDFS里的,但Spark允许你把中间结果放内存里(如果不加这个的话,中间结果还是要落到磁盘上的?!)

    如果前一次stage操作完成之后,RDD没有进行cache之类的操作的话,那前一次的中间结果就会删除!

 

遗留的问题是:

  1. shuffle默认是要将数据落到磁盘的么?
  2. 落的时候是一条条落? – 还是说会合并?
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RDD计算时是把数据全部加载至内存么 的相关文章

  • Spark:reduce和reduceByKey之间的语义差异

    在 Spark 的文档中 它说 RDDs 方法reduce http spark apache org docs latest api scala index html org apache spark rdd RDD需要一个结合与交换的二
  • 缓存和持久化有什么区别?

    按照RDD坚持 两者有什么区别cache and persist 在火花 With cache 您仅使用默认存储级别 MEMORY ONLY for RDD MEMORY AND DISK for Dataset With persist
  • 坏元素的映射

    我正在实施k means我想创建新的质心 但映射遗漏了一个元素 然而 当K值较小 例如 15 效果会很好 基于此code http www cs berkeley edu rxin ampcamp ecnu machine learning
  • Spark:测试 RDD 是否为空的有效方法

    没有一个isEmptyRDD 上的方法 那么测试 RDD 是否为空的最有效方法是什么 RDD isEmpty https issues apache org jira browse SPARK 5270将成为 Spark 1 3 0 的一部
  • Pyspark RDD 收集前 163 行

    有没有办法在不转换为 df 的情况下获取 rdd 的前 163 行 我尝试过类似的东西newrdd rdd take 163 但这会返回一个列表 并且rdd collect 返回整个rdd 有没有办法做到这一点 或者如果没有 有没有办法将列
  • Spark:将 RDD 结果写入文件系统很慢

    我正在使用 Scala 开发 Spark 应用程序 我的应用程序仅包含一项需要改组的操作 即cogroup 它在合理的时间完美运行 我面临的问题是当我想将结果写回文件系统时 由于某种原因 它比运行实际程序花费的时间更长 起初 我尝试在不重新
  • 将 Spark 数据加载到 Mongo / Memcached 中以供 Web 服务使用

    我对 Spark 非常陌生 并且有一个特定的工作流程相关问题 虽然它并不是真正与编码相关的问题 但它更像是与 Spark 功能相关的问题 我认为它在这里是合适的 如果您认为这个问题不适合 请随时将我重定向到正确的网站 所以这里是 1 我计划
  • Apache Spark 中的递归方法调用

    我正在从 Apache Spark 上的数据库构建一个家谱 使用递归搜索来查找数据库中每个人的最终父级 即家谱顶部的人 假设搜索 id 时返回的第一个人是正确的父母 val peopleById peopleRDD keyBy f gt f
  • 不使用过滤函数删除 RDD 中的第一个元素

    我从一个文件构建了一个 RDD 其中 RDD 中的每个元素都是文件中由分隔符分隔的部分 val inputRDD1 RDD String Long myUtilities paragraphFile spark path1 coalesce
  • Spark groupByKey 替代方案

    根据 Databricks 最佳实践 SparkgroupByKey应该像 Spark 一样避免使用groupByKey处理的工作方式是 信息首先在工作人员之间进行洗牌 然后进行处理 解释 http databricks gitbooks
  • 在pyspark中合并两个RDD

    假设我有以下 RDD a sc parallelize 1 2 5 3 b sc parallelize a c d e 如何将这 2 个 RDD 合并为一个 RDD 如下所示 a 1 c 2 d 5 e 3 Using a union b
  • 在 Apache Spark 中,如何使 RDD/DataFrame 操作变得惰性?

    假设我想编写一个函数 foo 来转换 DataFrame object Foo def foo source DataFrame DataFrame complex iterative algorithm with a stopping c
  • Spark:按元组/列中的多个值对 RDD 进行排序

    所以我有一个RDD如下 RDD String Int String 举个例子 b 1 a a 1 b a 0 b a 0 a 最终结果应该类似于 a 0 a a 0 b a 1 b b 1 a 我该怎么做这样的事情 尝试这个 rdd sor
  • 如何将 RDD 保存到 HDFS 中并稍后将其读回?

    我有一个 RDD 其元素类型为 Long String 由于某种原因 我想将整个 RDD 保存到 HDFS 中 然后在 Spark 程序中读回该 RDD 可以这样做吗 如果是这样 怎么办 有可能的 在RDD中你有saveAsObjectFi
  • Spark中saveAsTextFile时如何命名文件?

    在 Spark 版本 1 5 1 中另存为文本文件时 我使用 rdd saveAsTextFile
  • 如何使用RDD API反转reduceByKey的结果?

    我有一个 key value 的 RDD 我将其转换为 key List value1 value2 value3 的 RDD 如下所示 val rddInit sc parallelize List 1 2 1 3 2 5 2 7 3 1
  • 当 Spark 主内存无法容纳文件时,Spark 如何读取大文件(PB)

    在这些情况下大文件会发生什么 1 Spark从NameNode获取数据的位置 Spark 是否会同时停止 因为根据 NameNode 的信息 数据大小太长 2 Spark按照datanode块大小对数据进行分区 但所有数据不能存储到主内存中
  • Spark parquet 分区:大量文件

    我正在尝试利用 Spark 分区 我试图做类似的事情 data write partitionBy key parquet location 这里的问题是每个分区都会创建大量镶木地板文件 如果我尝试从根目录读取 则会导致读取速度变慢 为了避
  • Spark JSON 文本字段到 RDD

    我有一个 cassandra 表 其中有一个名为 snapshot 的文本类型字段 其中包含 JSON 对象 identifier timestamp snapshot 我了解到 为了能够使用 Spark 对该字段进行转换 我需要将该 RD
  • Spark:Shuffle Write、Shuffle 溢出(内存)、Shuffle 溢出(磁盘)之间的区别?

    我有以下 Spark 工作 试图将所有内容保留在内存中 val myOutRDD myInRDD flatMap fp gt val tuple2List ListBuffer String myClass ListBuffer tuple

随机推荐

  • ubuntu远程桌面连接windows系统

    转载自 https www cnblogs com brainworld p 7755779 html ubuntu端 xff1a sudo apt get install rdesktop windows端 xff1a 需要允许此wind
  • Linux挂载硬盘

    原文链接 我要挂载的硬盘为sda1 xff0c 首先将硬盘插上 1 查看硬盘 使用检测硬盘命令 xff1a lsblk 看到 sda1 存在 然后使用查看硬盘命令 xff1a span class token function df spa
  • ModuleNotFoundError No module named ‘PIL‘问题解决

    原文链接 Python使用时出现报错 xff1a ModuleNotFoundError No module named PIL 该提示表示缺少pillow模块 xff0c 可以用以下命令安装 xff1a pip span class to
  • SpringBoot实现多线程

    原文链接 代码地址 xff1a https github com Snowstorm0 learn async 1 线程同步和异步 线程同步 xff1a A线程要请求某个资源 xff0c 但是此资源正在被B线程使用中 xff0c 因为同步机
  • pip安装报错:UnicodeDecodeError ‘utf-8‘ codec can‘t decode byte 0xc3 in position 4

    原文链接 使用pip命令安装模块时 xff0c 若出现报错 xff1a UnicodeDecodeError utf 8 codec can t decode byte 0xc3 in position 4 invalid continua
  • Prometheus的使用

    原文链接 Prometheus 是一个开放性的监控解决方案 xff0c 用户可以非常方便的安装和使用 Prometheus 并且能够非常方便的对其进行扩展 在Prometheus的架构设计中 xff0c Prometheus Server
  • Java中restTemplate携带Header请求

    原文链接 RestTemplate是Spring提供的用于发送HTTP请求的客户端工具 现在我们要在Java中使restTemplate携带Header请求 创建请求 创建请求头 xff1a span class token class n
  • SpringBoot整合ElasticSearch

    原文链接 ElasticSearch是个开源分布式搜索引擎 xff0c 提供搜集 分析 存储数据三大功能 它的特点有 xff1a 分布式 xff0c 零配置 xff0c 自动发现 xff0c 索引自动分片 xff0c 索引副本机制 xff0
  • Python将二维数组输出为图片

    原文链接 使用Python读取二维数组 xff0c 将二维数组输出为图片 xff0c 并保存在本地 代码如下 xff1a span class token comment coding 61 utf8 span span class tok
  • 如何使用Tin快速搭建Gitlab-ce?(史上最简单方法)

    GitLab是Git代码版本管理平台 xff0c 相比于GitHub xff0c GitLab还免费支持私人仓库 GitLab ce是gitlab的开源版本 目前网上Gitlab ce的安装方法有很多 xff0c 但大同小异都需要安装各种依
  • 堆叠降噪自动编码器 Stacked Denoising Auto Encoder(SDAE)

    原文链接 自动编码器 xff08 Auto Encoder xff0c AE xff09 自动编码器 xff08 Auto Encoder xff0c AE xff09 自编码器 xff08 autoencoder xff09 是神经网络的
  • PyTorch中 nn.Conv2d与nn.ConvTranspose2d函数的用法

    原文链接 1 通道数问题 xff1a 描述一个像素点 xff0c 如果是灰度 xff0c 那么只需要一个数值来描述它 xff0c 就是单通道 如果有RGB三种颜色来描述它 xff0c 就是三通道 最初输入的图片样本的 channels xf
  • Python中LSTM回归神经网络的时间序列预测

    原文链接 这个问题是国际航空乘客预测问题 xff0c 数据是1949年1月到1960年12月国际航空公司每个月的乘客数量 xff08 单位 xff1a 千人 xff09 xff0c 共有12年144个月的数据 网盘链接 提取码 xff1a
  • pip安装时 fatal error C1083: 无法打开包括文件: “io.h”: No such file or directory

    原文链接 使用pip安装模块 xff0c 出现错误 xff1a c users anaconda3 include pyconfig h 68 fatal error C1083 无法打开包括文件 io h No such file or
  • linux 程序被Killed,查看原因

    原文链接 1 查看信息 xff1a 输入以下程序 xff1a dmesg egrep i B100 39 killed process 39 可以输出最近killed的信息 2 设定kill优先度 xff1a xff08 1 xff09 完
  • Kafka的幂等性与事务性理解

    最近在深入理解Flink的Exactly Once xff0c 发现Flink Checkpoint只能保障Flink程序内部的一致性 xff0c 无法保证Sink到外部系统的Exactly Once语义 但是Sink到外部如果实现了Two
  • 为什么不直接操作State,而是要额外定义一个变量

    最近浏览Flink文章的时候发现一个现象 xff0c 就是在操作State的时候 xff0c 很多文章里面并不会直接操作State xff0c 而是会定义一个相似的变量去操作 xff0c 在 snapshot 和 recover 的时候讲变
  • 了解下SparkSQL中的笛卡尔积

    虽然应该尽量避免使用笛卡尔积 xff0c 因为要全量匹配 xff0c 所以运算的效率十分低下 xff0c 但是有些业务有必须得用 xff0c 所以在此了解下SparkSQL中的笛卡尔积 SparkSQL中计算笛卡尔积时有两种Join方式 x
  • 在Redis集群模式下使用pipeline进行批量操作

    最近开始又接触到了Redis xff0c 之前在工作中使用Redis的时候 xff0c 由于QPS不高 xff0c 都是直接get set搞定了 这次遇到的业务数据量比较大 xff0c 更新也很频繁 xff0c Redis使用是集群模式 x
  • RDD计算时是把数据全部加载至内存么

    RDD的本质 RDD的本质是一个函数 而RDD的变换不过是函数的嵌套 RDD有两类 输入的RDD 典型如KafkaRDD JdbcRDD转换的RDD 如MapPartitionsRDD RDD的处理流程 以如下代码为例 sc textFil