spark学习7:RDD编程

2023-11-06

1.目录

2.创建RDD

两种方式

2.1从文件系统加载

sc.textFile() 方法来加载文件数据,并将文件数据转换为RDD

2.1.1 从本地文件加载数据

val rdd1 = sc.textFile("file:///home/hzp/Documents/input.txt")

ps: 1.如果文件只存在master节点上,那么可能会报 fileNotFoundException,需要把数据文件传给 worker 节点

https://blog.csdn.net/hzp666/article/details/117112227

2.因为spark-shell 的惰性机制,所以即使这里文件路径写错了,也不会立马报错,而是等遇到action 算子才会去真正执行,才会报错。

另外,当输入路径写的是 文件夹/* 时,系统会把当前文件夹中所有文件都加载进来

 eg: sc.textFile("file:///D:/doc/spark/input/dir1/*")

3.当将计算结果保存到本地文件时,

保存本地文件

使用saveAsTextFile() 来保存

avgRDD.saveAsTextFile("D:\\doc\\spark\\out\\result1")

avgRDD.saveAsTextFile("D:/doc/spark/out/result1")

保存效果:

ps: 1. saveAsTextFile()  里边写的是 文件保存的路径,不是指定文件名,即使说写的路径为 avgRDD.saveAsTextFile("D:/doc/spark/out/result1.txt") 指定文件名,那么也是创建一个 名为result1.txt的文件夹。  

2.文件夹内有2种文件success文件和part-00000文件, success代表本次代码执行成功;

part-00000是数据文件,这里只有一个文件因为程序没设置分区数,数据文件数和分区数对应。

3.当读取本地文件时,只用写到文件夹名称,系统会自动把文件夹内的所有part 文件都读取

2.1.2加载hdfs文件 

val rdd2 = sc.textFile("hdfs:/user/root/source.txt")
 

scala> rdd2.flatMap(_.split(" ")).filter(s => s.contains("hadoop")).saveAsTextFile("hadoopWord.txt")
                                                                                
scala> 
 

# 保存计算结果到hdfs

.saveAsTextFile("writeback")

会保存到 hdfs文件中,默认保存在 /usr/对应用户下

比如登录用户名为hadoop,那么将保存在/user/hadoop/writeback下

hdfs 查看:http://master:50070/explorer.html#/

2.1.3 加载JSON数据文件

https://blog.csdn.net/hzp666/article/details/117752252

2.2使用列表List、集合Set,生成RDD

parallelize 方法

eg:

//create spark conf
val conf = new SparkConf().setAppName("sparkTemp").setMaster("local")

//create sc
val sc = new SparkContext(conf)

//create a list for make RDD
val list1 = List("mz 麻子","ww 王五","ls 李四","zs 张三")

//create rdd
val rdd1 = sc.parallelize(list1)

//rdd1.foreach(println(_))

//flatmap
val flatMapedRDD2 = rdd1.flatMap(s=>s.split(" "))

//flatMapedRDD2.foreach(println(_))
val filteredRDD3 = flatMapedRDD2.filter(s => s.contains("z"))
val filteredRDD4 = flatMapedRDD2.filter(s => s.startsWith("z"))

filteredRDD3.foreach(println(_))
filteredRDD4.foreach(println("444",_))

输出:

3.RDD的操作

对RDD的操作总的分为2种,transformation  和 action

两个的区别,

transformation 只是记录要对rdd进行的操作动作,而不会真正的执行,只有遇到action算子 才会开始真正的从头到尾的计算。

Transformation

常见的算子

1.  filter

过滤RDD中的元素

filter是一个高阶函数,参数中传递函数

eg1 过滤取到z 开头的元素:

val list1 = List("mz 麻子","ww 王五","ls 李四","zs 张三")
//create rdd
val rdd1 = sc.parallelize(list1)
//filter
val filteredRDD = rdd1.filter(s => s.startsWith("z"))
//输出
filteredRDD.foreach(println(_))

输出:

eg2 取出包含w 字符的元素:

//create a list for make RDD
val list1 = List("mz 麻子","ww 王五","ls 李四","zs 张三")

//create rdd
val rdd1 = sc.parallelize(list1)

//rdd1.foreach(println(_))
val filteredRDD2 = rdd1.filter(s=>s.contains("w"))
filteredRDD2.foreach(println(_))

输出:

2. map

对RDD中每个元素进行操作

eg1:

val arr1 = Array(1,2,3,4,5)
val rdd2 = sc.parallelize(arr1)
val mapedRDD2 = rdd2.map(s => s+10)
mapedRDD2.foreach(println(_))

输出:

eg2:

//create a list for make RDD
val list1 = List("mz 麻子","ww 王五","ls 李四","zs 张三")

//create rdd
val rdd1 = sc.parallelize(list1)
//map
val mapedRDD2 = rdd1.map(s => s+"-add")
mapedRDD2.foreach(println(_))

输出:

eg3:

3.flatmap

切分压平

eg1

val list1 = List("mz 麻子","ww 王五","ls 李四","zs 张三")
val rdd1 = sc.parallelize(list1)
//flatmap
val flatMapedRDD2 = rdd1.flatMap(s=>s.split(" "))
flatMapedRDD2.foreach(println(_))

输出:

eg2:

4.groupByKey

应用于(key, value)的数据集,返回(key, Interable)形式的数据集

val list1 = List("mz 麻子","ww 王五","ls 李四","zs 张三","mz 马志","ww 王武")
val flatMappedRDD = rdd1.flatMap(_.split(" "))
val mapedRDD = flatMappedRDD.map(s => (s,1))
val rdd3 = mapedRDD.groupByKey()
rdd3.foreach(println(_))

输出:

5.reduceByKey

应用于(key, value)的数据集,返回(key, value),其中 返回的value 是根据传入 reduceByKey的函数方法,计算得到的

ps:1.相当于是把 groupByKey返回的 (key,Interable)中的 Interable 多了一步操作

2,reduceByKey中传入的方法 其实是对 Interable中的元素的操作,eg: mapedRDD.reduceByKey((a,b)=> a+b)  其实是对 每个key中的Interable中的元素 组内求和

reduceByKey 就相当于 groupByKey().map(t => (t._1, t._2.sum))

eg

val list1 = List("mz 麻子","ww 王五","ls 李四","zs 张三","mz 马志","ww 王武")
val flatMappedRDD = rdd1.flatMap(_.split(" "))
val mapedRDD = flatMappedRDD.map(s => (s,1))

val rdd3 = mapedRDD.reduceByKey((a,b)=> a+b)


rdd3.foreach(println(_))

输出:

eg2:

Action操作

动作类型算子Action和Transformation转换算子的区别,就是基于惰性机制, 转换并不会真的执行,Action动作算子相当于点火装置,引燃整个计算。

1.count()

计算当前RDD的元素个数

val arr1 = Array(1,2,3,4,5)
val rdd2 = sc.parallelize(arr1)
//filter elements >3 
val more2RDD = rdd2.filter(_ > 3)

println(more2RDD.count())

输出:

2. collect

RDD把分布在各个节点的数据都回收回来,返回array 数组

eg:

val arr1 = Array(1,2,3,4,5)
println(rdd2.collect().mkString("Array(", ", ", ")"))

输出:

3.first

取到RDD中第一个元素

eg:

val arr1 = Array(1,2,3,4,5)

//create rdd
val rdd2 = sc.parallelize(arr1)
val rdd2 = sc.parallelize(arr1)
println(rdd2.first())

输出:

4.top

获取RDD的前几个元素

eg;

val arr1 = Array(1,2,3,4,5)

//create rdd

val rdd2 = sc.parallelize(arr1)

println(rdd2.top(3).mkString("Array(", ", ", ")"))

输出:

5.reduce

高阶函数, 传入一个函数,来聚合RDD中的元素

eg:

val arr1 = Array(1,2,3,4,5)

//create rdd

val rdd2 = sc.parallelize(arr1)
val result = rdd2.reduce((a,b)=> a+b)
println(result)

输出:

4.持久化

将RDD的计算结果保存到内存中去

有两种方法

rdd2.persist(MEMORY_ONLY)   //仅保存在内存,当内存不够时,之前的老数据会被新数据替换掉
rdd2.persist(MEMORY_AND_DISK) //优先保存在内存中,内存不够时,写到磁盘

rdd2.cache()   //等同于 rdd2.persist(MEMORY_ONLY)

取消内存持久化:

unpersist() 方法可以取消内存中的  数据持久化

5.RDD分区

1.分区定义:

RDD中的数据被分区存储在多台机器上。

2.分区的优势:

主要有两点,增加并行计算能力 和 减少网络传输通信开销

2.1并行计算

RDD中的数据被分布在多台机器,多台机器可以并行计算,增加计算效率。

2.2减少通信开销

首先,先讲下数据分区 和 分块的区别:

分块:只是把数据按照一定的大小进行打包,不考虑数据的分类,只是分为一定数据大小的block 块,存储在不同机器上。

分区:是把数据按照一定的规则,进行分类存储在不同的机器上。

那么分区是怎么减少通信开销的,

如果数据不进行分区,那么在做数据关联join时 会出现一个情况,就是要把散布在各个机器上的 同类型数据进行 归类聚合,然后再去关联。

比如,有两张表,一个学生表,一个考场表

学生表
student_name class_name
张三

一班 

李四 一班
王五 二班
麻子 三班
马六 一班
考场表
class_name exam_name
一班

A考场

二班 B考场
三班 C考场

那如果想要把学生表和考场表关联,得到每个学生的考场

student_name class_name exam_name
张三 一班 A考场
李四 一班 A考场
王五 二班 B考场
麻子 三班 C考场

那么如果数据没有分区存储,那就需要把散落在各个机器上的一班学生数据 都拉到机器A上, 二班的学生数据都拉到机器B上。。。。,就会产生大量的网络传输通信开销。

如果是数据分区存储的,比如按照班级分区,  一班的学生数据都存在A机器上, 二班的学生数据都存在B机器。。。那么在做join关联时 就可以直接把一班的数据放到A机器上关联计算,二班数据都在B机器上计算, 不需要数据的传输。

3.分区的原则

尽量让分区的数量,等于CPU的数量,来实现并行计算。

可以通过设置spark.dafault.parallelise 来设置分区数,

不同模式下的默认分区数

1.local模式 默认是机器最大CPU数,如果设置了启动时候 设置了  local[N]  那么会启动N个 分区数

2.Mesos默认分区数是 8 

3.standlone 和 yarn :会把集群中总CPU数  和 2进行比较,取最大值,来作为分区数。

4.设置分区的方法

4.1.设置分区数量

eg:

val rddT = sc.textFile("file:///D:/doc/spark/input/dir1/input.txt",2)
rddT.foreach(println(_))

val list1 = List("mz 麻子","ww 王五","ls 李四","zs 张三","mz 马志","ww 王武")
//create rdd
val rdd1 = sc.parallelize(list1,2)

界面会有 分区提示

查看分区数:

rddT.partitions.length

4.2 重分区

repartition 方法

eg:

val rddT = sc.textFile("file:///D:/doc/spark/input/dir1/input.txt",3)
rddT.foreach(println(_))
println("before",rddT.partitions.length)

val rddt2 = rddT.repartition(1)    //注意 重分区后,返回值是一个新RDD
rddt2.foreach(println("repartition: --",_))
println("repartition 's num",rddt2.partitions.length)

输出:

4.3 自定义分区

spark有自带的分区 哈希分区、区域分区等

自定义分区需要做的几步

getPartition()  这个方法是 真正计算 数据存储在哪个分区的地方, 计算后返回对应数据 应存储的分区编号

eg:

class Mypartitioner(numParts:Int) extends Partitioner {

  override def numPartitions = numParts

  override def getPartition(key: Any): Int = {

    //judge the number is ODD or EVEN
    return key.toString.toInt % 2

  }
}

//to create a rdd
val list = 1 to 10
val rdd = sc.parallelize(list,2)

// map((_,1)) because partitionBy only support (key,value) sytle 
//map(_._1) because to remove the added value
 rdd.map((_,1)).partitionBy(new Mypartitioner(2)).map(_._1).saveAsTextFile("D:/doc/spark/out/t3")

ps:1. 写完MyPartition 方法 extents Partition后,在MyPartition 上alt + enter, 选择override 选择全部重写即可

2. 在编写 getPartition 返回分区编号时,注意分区编号默认是从0 开始的

输出:

两个分区

第一个分区

第二个

自定义分区 eg2:

根据首字母来区分,分区

class MyPartition(partitionNum:Int) extends Partitioner{
  override def numPartitions: Int = partitionNum

  override def getPartition(key: Any): Int = {
    val firstStr = key.toString.substring(0, 1)
    
// if the word startwith h then save to partition 0, else save to partition 1
    firstStr match {
      case "h" =>{
        return 0
      }
      case _ =>{
        return 1
      }
    }

  }
}

val rddT = sc.textFile("D:/doc/spark/input/dir1/input.txt",2)
val rddfix = rddT.flatMap(s => s.split(" ")).map((_,1)).partitionBy(new MyPartition(2)).map(_._1)
rddfix.saveAsTextFile("D:/doc/spark/out/t5")

输出:

第一个分区:

第二个分区:

RDD写个wordCount

//读取数据生成RDD

val sourceText = sc.textFile("D:/doc/spark/input/dir1/input.txt")

//flatMap

val flatMapedRDD = sourceText.flatMap(_.split(" "))

//转换为 键值对,            组内字频求和 ,                     保存

flatMapedRDD.map((_,1)).reduceByKey((a,b) => a+b).saveAsTextFile("D:/doc/spark/out/t6")

6.键值对RDD

6.1 创建键值对RDD

把普通RDD通过map((_,1))转换

6.2键值对RDD的操作

reduceByKey  和 groupByKey之前已经讲过了

keys

是把键值对RDD中的所有key都放到一个list中返回

values

是返回值的list

sortByKey()

按照key进行排序,默认参数是true升序,  sortByKey(false) 降序

如果想根据 value值进行排序,eg: (a,10)    (b, 20)     (c,5)

想要根据 value进行排序 得到   (b, 20) (a, 10)   (c, 5)

 sortBy(_._2,false)   这种写法可以实现

sortBy()

eg:

val sourceText = sc.textFile("D:/doc/spark/input/dir1/input.txt")
val flatMapedRDD = sourceText.flatMap(_.split(" "))
val reduceedRDD = flatMapedRDD.map((_,1)).reduceByKey((a,b) => a+b)

//sortByKey
val sortedRDD = reduceedRDD.sortByKey()
sortedRDD.foreach(println(_))

//sortBy
val sortedByRDD = reduceedRDD.sortBy(_._2, false)
sortedByRDD.foreach(println("sortBy-----",_))

输出:

mapValues()

对键值对RDD中的value进行 操作,不影响key, 

mapValues(_+1) 等同于.map(item => (item._1, item._2 +1 ))

eg:

val sourceText = sc.textFile("D:/doc/spark/input/dir1/input.txt", 1)
    val mapedRDD = sourceText.flatMap(_.split(" ")).map((_, 1))
    val mapValueRDD = mapedRDD.mapValues(s=> s+1)
    mapValueRDD.foreach(println(_))

    val mapedRDD2 = mapedRDD.map(item => (item._1,item._2+1))
    mapedRDD2.foreach(println("maped2----",_))

输出:

反转键值对key value

eg:

val sourceText = sc.textFile("D:/doc/spark/input/dir1/input.txt", 1)
val mapedRDD = sourceText.flatMap(_.split(" ")).map((_, 1))
val mapValueRDD = mapedRDD.mapValues(s=> s+1)
mapValueRDD.foreach(println(_))

//reverse map
val reversedRDD = mapedRDD2.map(item => (item._2, item._1))
reversedRDD.foreach(println("reverse ----",_))

输出:

join()

把两个RDD 根据 key 进行关联,相同的key 对应的value 合并

eg:

val RDD1 = sc.parallelize(Array(("张三", 3), ("李四", 4), ("王五", 5), ("麻子六", 6), ("李四", "山西人")))
val RDD2 = sc.parallelize(Array(("张三", "广东人"), ("李四", "男")))
val RDD3 = RDD2.join(RDD1)
RDD3.foreach(println(_))

输出:

一个求平均值的案例:

有一堆数据 ("sparkBOOK", 5), ("hadoopBOOK", 4), ("hadoopBOOK", 2), ("hudiBOOK", 3), ("sparkBOOK", 8),

("sparkBOOK", 5) 代表一次卖出5本书,   

要计算出 每个技术栈数据平均每次卖几本书

eg:

val sourceRDD = sc.parallelize(Array(("spark", 5), ("hadoop", 4), ("hadoop", 2), ("hudi", 3), ("spark", 8)))

//to create rdd like this ("spark",(5,1))   ("hadoop", (4,1)) ...
// simple just for denominator
val addOneRDD = sourceRDD.map((item => (item._1, (item._2, 1))))

//to sum Numerator and denominator 
val reducedRDD = addOneRDD.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))

//numerator / denominator = avg
val avgRDD = reducedRDD.map(item => (item._1, item._2._1 / item._2._2))
avgRDD.foreach(println(_))

输出:

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

spark学习7:RDD编程 的相关文章

随机推荐

  • DNS解析与CDN加速

    DNS解析与CDN加速 一 DNS解析 1 域名系统DNS 2 DNS解析 二 CDN加速 1 什么是CDN 2 静态加速 3 动态加速 一 DNS解析 1 域名系统DNS 域名系统的前世今生 域名系统的产生的原因是用户通过形如198 26
  • Web自动化测试流程:从入门到精通,帮你成为测试专家!

    摘要 Web应用程序在今天的软件开发中占据着越来越重要的地位 保证Web应用程序的质量和稳定性是非常必要的 而自动化测试是一种有效的方法 本文将介绍Web自动化测试流程 并提供代码示例 步骤一 选取测试工具 选择适合自己团队的自动化测试工具
  • Linux下安装Mysql5.7,超详细完整教程,以及云mysql连接

    安装前环境检查 1 首先检查自己电脑有没有安装过mysql 输入如下 rpm qa grep mysql 如果有则清理干净在安装 输入 whereis mysql 找到文件夹目录 再把它删除 rpm e nodeps mysql xxxx
  • 华为手机微信与电脑连接到服务器失败怎么办,华为微信到电脑上找不到了怎么办...

    1 华为手机连接电脑后找不到微信保存的视频 应该是在微信专用的文件夹里 文件夹的名字是英文的腾信 如果视频不多可以登陆电脑版微信 然后用文件助手传到电脑再保存 2 华为荣耀10微信存储图片连接电脑找不到 查找微信保存图片的文件信息 打开 文
  • React组件化一

    29 9React课程 第02节 react组件化 第2节 02课react组件化 02课react组件化 02课react组件化 初始化显示constructor构造函数 要使用super 否则没法在内部使用this 2与3之间要对组件进
  • Centos7本地yum安装FTP和HTTP服务

    Centos镜像包下载 http mirror centos org altarch 7 isos 32位 i386 64位 带64的 1 将Vmware中的光驱设置为镜像包 在虚拟机关闭时设置 1 打开虚拟机设置 2 选择CD DVD 3
  • clang简介

    文章目录 clang编译器 clang选项 阶段选择选项 语言选择和模式选项 目标选择选项 代码生成选项 O0 O1 O2 O3 Ofast Os Oz Og O O4 g gline tables only gmodules fstand
  • 适用于 24 V 电源系统的车载网络 ESD 保护

    Nexperia 安世半导体 近日推出符合 AEC Q101 标准的产品组合 其中包含六个 ESD 保护器件 PESD2CANFD36XX Q 旨在保护 LIN CAN CAN FD FlexRay 和SENT 等车载网络 IVN 中的总线
  • 日期补0位

    function getNowFormatDate var day new Date var Year 0 var Month 0 var Day 0 var CurrentDate 初始化时间 Year day getYear 有火狐下2
  • Red Hat Enterprise Linux 8 配置yum源

    Red Hat Enterprise Linux 8 配置YUM源的两种方式 一 本地YUM源 1 备份源文件 cd etc yum repos d mkdir bak mv repo bak 2 挂载镜像 mount t iso9660
  • 面试官都在问

    面试官都在问 Linux命令mpstat详解 1 mpstat的基本用法 mpstat的全称为Multiprocessor Statistics 是一款常用的多核CPU性能分析工具 用来实时查询每个CPU的性能指标 以及所有CPU的平均指标
  • 用qsetting实现文件保存和读取

    QSettings 是 Qt 库中的一个类 可以用来读取和保存应用程序的配置数据 使用 QSettings 可以方便地保存和读取配置信息 比如窗口的大小和位置 最近打开的文件列表等 实现保存文件的步骤如下 创建 QSettings 对象 并
  • OpenCV 3.3.1及Contrib附加库安装教程及问题Undefined reference to cv::xfeatures2d

    INSTALL OPENCV ON UBUNTU OR DEBIAN 1 KEEP UBUNTU OR DEBIAN UP TO DATE sudo apt get y update sudo apt get y upgrade sudo
  • cpp课程设计实验题:编写程序,定义抽象基类Shape(形状),由它派生出3个派生类: Circle(圆形)、Rectangle(矩形)和Square 正方形),用函数函数ShowArea()分别显

    编写程序 定义抽象基类Shape 形状 由它派生出3个派生类 Circle 圆形 Rectangle 矩形 和Square 正方形 用函数函数ShowArea 分别显示各种图形的面积 最后还要显示所有图形的总面积 要求用基类指针数组 使它的
  • adb 指令

    1 基本指令 指令 adb version 显示 adb 版本 指令 adb help 帮助信息 查看 adb 所支持的所有命令 指令 adb start server 启动 adb 服务 指令 adb kill server 关闭 adb
  • Unity 分帧加载和分块加载

    分帧加载和分块加载 在我们实际做项目的时候 往往会遇见需要创建大量数据的时候 这时如果在一帧里面大量创建数据 那我们的游戏就会发生卡顿从而降低了用户的体验 为了解决这种情况 可以使用使用分帧加载使得每帧只加载固定数量的数据来解决 也可以使用
  • 经纬度坐标与距离的相互转换及其实现

    经纬度坐标与距离的相互转换 1 经纬度与距离角度的换算关系 2 Python代码实现 1 经纬度与距离角度的换算关系 a 在纬度相等的情况下 经度每隔0 00001度 距离相差约1米 每隔0 0001度 距离相差约10米 每隔0 001度
  • 【ElementUI组件优化】自定义icon图标的使用

    风雨里做个大人 阳光下做个小孩 前端经常会用到UI提供的各种图表 推荐阿里的图标库 如果UI要求不是很严格 我们可以自己在图标库中找到想要的图标 搜索之后可以点击下载 在ElementUI中使用Icon图标组件使用非常简单 同时 在图标按钮
  • 微信小程序如何实现(点击发送弹幕)

    扫一扫以上小程序 许愿灯池 可以查看具体点击发送弹幕功能 效果图 点击 祝福一下吧 即可弹出弹幕 直接上代码 index wxml
  • spark学习7:RDD编程

    1 目录 2 创建RDD 两种方式 2 1从文件系统加载 sc textFile 方法来加载文件数据 并将文件数据转换为RDD 2 1 1 从本地文件加载数据 val rdd1 sc textFile file home hzp Docum