尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,map、mapPartitions、mapPartitionsWithIndex、flatMap、glom、groupBy)】

2023-05-16

视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili

  1. 尚硅谷大数据技术Spark教程-笔记01【Spark(概述、快速上手、运行环境、运行架构)】
  2. 尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,map、mapPartitions、mapPartitionsWithIndex、flatMap、glom、groupBy、filter、sample、distinct、coalesce、repartition、sortBy、intersection、union、subtract、zip)】
  3. 尚硅谷大数据技术Spark教程-笔记03【SparkCore(核心编程,partitionBy、reduceByKey、groupByKey、aggregateByKey、foldByKey、combineByKey、sortByKey、join、leftOuterJoin、cogroup)】

目录

01_尚硅谷大数据技术之SparkCore

第05章-Spark核心编程

P022【022.尚硅谷_SparkCore - 分布式计算模拟 - 搭建基础的架子】12:48

P023【023.尚硅谷_SparkCore - 分布式计算模拟 - 客户端向服务器发送计算任务】10:50

P024【024.尚硅谷_SparkCore - 分布式计算模拟 - 数据结构和分布式计算】11:39

P025【025.尚硅谷_SparkCore - 核心编程 - RDD - 概念介绍】05:31

P026【026.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 1】10:11

P027【027.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 2】08:49

P028【028.尚硅谷_SparkCore - 核心编程 - RDD - RDD和IO之间的关系】12:24

P029【029.尚硅谷_SparkCore - 核心编程 - RDD - 特点】13:34

P030【030.尚硅谷_SparkCore - 核心编程 - RDD - 五大主要配置】11:19

P031【031.尚硅谷_SparkCore - 核心编程 - RDD - 执行原理】03:05

P032【032.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 内存】11:02

P033【033.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件】06:28

P034【034.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件1】04:42

P035【035.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区的设定】11:41

P036【036.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区数据的分配】13:54

P037【037.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区的设定】11:33

P038【038.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配】08:21

P039【039.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配 - 案例分析】06:13

P040【040.尚硅谷_SparkCore - 核心编程 - RDD - 算子介绍】07:49

P041【041.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map】07:46

P042【042.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 小功能】05:12

P043【043.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 并行计算效果演示】08:54

P044【044.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions】06:12

P045【045.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions - 小练习】03:49

P046【046.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions & map的区别 - 完成比完美更重要】02:21

P047【047.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitionsWithIndex】06:30

P048【048.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap】05:07

P049【049.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap - 小练习】02:41

P050【050.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - glom】06:33

P051【051.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 理解分区不变的含义】06:48

P052【052.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy】05:25

P053【053.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - shuffle来袭】06:01

P054【054.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - 小练习】07:51

P055【055.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - filter - 数据倾斜】07:11

P056【056.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sample - 抽奖喽】16:11

P057【057.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - distinct】06:13

P058【058.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - coalesce】11:11

P059【059.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - repartition】07:28

P060【060.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sortBy】06:31

P061【061.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链】08:19

P062【062.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链 - 注意事项】08:10


01_尚硅谷大数据技术之SparkCore

第05章-Spark核心编程

P022【022.尚硅谷_SparkCore - 分布式计算模拟 - 搭建基础的架子】12:48

第5章 Spark核心编程

Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于处理不同的应用场景。三大数据结构分别是:

➢ RDD : 弹性分布式数据集

➢ 累加器:分布式共享只写变量

➢ 广播变量:分布式共享只读变量

5.1 RDD

5.1.1 什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

 

package com.atguigu.bigdata.spark.core.test

import java.io.OutputStream
import java.net.Socket

object Driver {
  def main(args: Array[String]): Unit = {
    //连接服务器
    val client = new Socket("localhost", 9999)
    val out: OutputStream = client.getOutputStream
    out.write(2) //发送数据
    out.flush()
    out.close()

    client.close()
  }
}
package com.atguigu.bigdata.spark.core.test

import java.io.InputStream
import java.net.{ServerSocket, Socket}

object Executor {
  def main(args: Array[String]): Unit = {
    //启动服务器,接收数据
    val server = new ServerSocket(9999)
    println("服务器启动,等待接收数据...")

    //等待客户端的连接
    val client: Socket = server.accept()
    val in: InputStream = client.getInputStream

    val i: Int = in.read()
    println("接收到客户端发送的数据:" + i)
    in.close()
    client.close()
    server.close()
  }
}

P023【023.尚硅谷_SparkCore - 分布式计算模拟 - 客户端向服务器发送计算任务】10:50

 

package com.atguigu.bigdata.spark.core.test

class Task extends Serializable { //最基本的计算任务
  val datas = List(1, 2, 3, 4)

  //val logic = (num: Int) => { num * 2 }
  val logic: (Int) => Int = _ * 2

  //计算
  def compute() = {
    datas.map(logic)
  }
}
package com.atguigu.bigdata.spark.core.test

import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket

object Driver {
  def main(args: Array[String]): Unit = {
    //连接服务器
    val client = new Socket("localhost", 9999)

    val out: OutputStream = client.getOutputStream
    val objOut = new ObjectOutputStream(out)

    val task = new Task()
    objOut.writeObject(task)
    objOut.flush()
    objOut.close()
    client.close()
    println("客户端数据发送完毕。")
  }
}
package com.atguigu.bigdata.spark.core.test

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

object Executor {
  def main(args: Array[String]): Unit = {
    //启动服务器,接收数据
    val server = new ServerSocket(9999)
    println("服务器启动,等待接收数据...")

    //等待客户端的连接
    val client: Socket = server.accept()
    val in: InputStream = client.getInputStream

    val objIn = new ObjectInputStream(in)
    val task: Task = objIn.readObject().asInstanceOf[Task]
    val ints: List[Int] = task.compute()

    println("节点计算任务的节点为:" + ints)//计算节点计算的结果为
    objIn.close()
    client.close()
    server.close()
  }
}

P024【024.尚硅谷_SparkCore - 分布式计算模拟 - 数据结构和分布式计算】11:39

Task与SubTask需要具有相同的逻辑。

   

 

package com.atguigu.bigdata.spark.core.test

class Task extends Serializable { //最基本的计算任务
  val datas = List(1, 2, 3, 4)

  //val logic = (num: Int) => { num * 2 }
  val logic: (Int) => Int = _ * 2

  //计算
  def compute() = {
    datas.map(logic)
  }
}
package com.atguigu.bigdata.spark.core.test

class SubTask extends Serializable {
  var datas: List[Int] = _
  var logic: (Int) => Int = _

  //计算
  def compute() = {
    datas.map(logic)
  }
}
package com.atguigu.bigdata.spark.core.test

import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket

object Driver {
  def main(args: Array[String]): Unit = {
    //连接服务器
    val client1 = new Socket("localhost", 9999)
    val client2 = new Socket("localhost", 8888)

    val task = new Task()

    val out1: OutputStream = client1.getOutputStream
    val objOut1 = new ObjectOutputStream(out1)

    val subTask = new SubTask()
    subTask.logic = task.logic
    subTask.datas = task.datas.take(2)

    objOut1.writeObject(subTask)
    objOut1.flush()
    objOut1.close()
    client1.close()

    val out2: OutputStream = client2.getOutputStream
    val objOut2 = new ObjectOutputStream(out2)

    val subTask1 = new SubTask()
    subTask1.logic = task.logic
    subTask1.datas = task.datas.takeRight(2)
    objOut2.writeObject(subTask1)
    objOut2.flush()
    objOut2.close()
    client2.close()
    println("客户端数据发送完毕...")
  }
}
package com.atguigu.bigdata.spark.core.test

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

object Executor {
  def main(args: Array[String]): Unit = {
    //启动服务器,接收数据
    val server = new ServerSocket(9999)
    println("服务器启动,等待接收数据...")

    //等待客户端的连接
    val client: Socket = server.accept()
    val in: InputStream = client.getInputStream
    val objIn = new ObjectInputStream(in)
    val task: SubTask = objIn.readObject().asInstanceOf[SubTask]
    val ints: List[Int] = task.compute()
    println("计算节点[9999]计算的结果为:" + ints)
    objIn.close()
    client.close()
    server.close()
  }
}
package com.atguigu.bigdata.spark.core.test

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

object Executor2 {
  def main(args: Array[String]): Unit = {
    //启动服务器,接收数据
    val server = new ServerSocket(8888)
    println("服务器启动,等待接收数据...")

    //等待客户端的连接
    val client: Socket = server.accept()
    val in: InputStream = client.getInputStream
    val objIn = new ObjectInputStream(in)
    val task: SubTask = objIn.readObject().asInstanceOf[SubTask]
    val ints: List[Int] = task.compute()
    println("计算节点[8888]计算的结果为:" + ints)
    objIn.close()
    client.close()
    server.close()
  }
}

P025【025.尚硅谷_SparkCore - 核心编程 - RDD - 概念介绍】05:31

5.1 RDD

5.1.1 什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

画图工具:Balsamiq Mockups 3

P026【026.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 1】10:11

 

P027【027.尚硅谷_SparkCore - 核心编程 - RDD - IO基本实现原理 - 2】08:49

P028【028.尚硅谷_SparkCore - 核心编程 - RDD - RDD和IO之间的关系】12:24

 

P029【029.尚硅谷_SparkCore - 核心编程 - RDD - 特点】13:34

5.1.1 什么是RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

  • ➢ 弹性
    • ⚫ 存储的弹性:内存与磁盘的自动切换;
    • ⚫ 容错的弹性:数据丢失可以自动恢复;
    • ⚫ 计算的弹性:计算出错重试机制;
    • ⚫ 分片的弹性:可根据需要重新分片。
  • ➢ 分布式:数据存储在大数据集群不同节点上
  • ➢ 数据集:RDD封装了计算逻辑,并不保存数据
  • ➢ 数据抽象:RDD是一个抽象类,需要子类具体实现
  • ➢ 不可变:RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑
  • ➢ 可分区、并行计算

P030【030.尚硅谷_SparkCore - 核心编程 - RDD - 五大主要配置】11:19

5.1.2 核心属性

RDD数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。

P031【031.尚硅谷_SparkCore - 核心编程 - RDD - 执行原理】03:05

5.1.3 执行原理

从计算的角度来讲,数据处理过程中需要计算资源(内存 & CPU)计算模型(逻辑)。执行时,需要将计算资源和计算模型进行协调和整合。

Spark框架在执行时,先申请资源,然后将应用程序的数据处理逻辑分解成一个一个的计算任务;然后将任务发到已经分配资源的计算节点上,按照指定的计算模型进行数据计算;最后得到计算结果。

P032【032.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 内存】11:02

5.1.4 基础编程

5.1.4.1 RDD创建

在Spark中创建RDD的创建方式可以分为四种:

  • 1) 从集合(内存)中创建 RDD
  • 2) 从外部存储(文件)创建 RDD
  • 3) 从其他 RDD 创建
  • 4) 直接创建 RDD(new)

ctrl+p:快捷键,提示函数参数列表。

 

package com.atguigu.bigdata.spark.core.rdd.builder

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Memory {
  def main(args: Array[String]): Unit = {
    //TODO 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD") //*表示当前系统的最大可用核数
    val sc = new SparkContext(sparkConf)

    //TODO 创建RDD
    //从内存中创建RDD,将内存中集合的数据作为处理的数据源
    val seq = Seq[Int](1, 2, 3, 4)

    //parallelize:并行
    //val rdd: RDD[Int] = sc.parallelize(seq)

    //makeRDD方法在底层实现时其实就是调用了rdd对象的parallelize方法。
    val rdd: RDD[Int] = sc.makeRDD(seq)

    rdd.collect().foreach(println)

    //TODO 关闭环境
    sc.stop()
  }
}

P033【033.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件】06:28

 

package com.atguigu.bigdata.spark.core.rdd.builder

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_File {
  def main(args: Array[String]): Unit = {
    //TODO 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    //TODO 创建RDD
    //从文件中创建RDD,将文件中的数据作为处理的数据源
    //path路径默认以当前环境的根路径为基准,可以写绝对路径,也可以写相对路径
    //sc.textFile("D:\\allCode\\JetBrains\\IdeaProjects\\atguigu-classes\\datas\\1.txt")
    //val rdd: RDD[String] = sc.textFile("datas/1.txt")

    //path路径可以是文件的具体路径,也可以目录名称
    //val rdd = sc.textFile("datas")

    //path路径还可以使用通配符 *
    val rdd = sc.textFile("datas/1*.txt")

    //path还可以是分布式存储系统路径:HDFS
    //val rdd = sc.textFile("hdfs://node1:8020/test.txt")

    rdd.collect().foreach(println)

    //TODO 关闭环境
    sc.stop()
  }
}

P034【034.尚硅谷_SparkCore - 核心编程 - RDD - 创建 - 文件1】04:42

package com.atguigu.bigdata.spark.core.rdd.builder

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_File1 {
  def main(args: Array[String]): Unit = {
    //TODO 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    //TODO 创建RDD
    //从文件中创建RDD,将文件中的数据作为处理的数据源

    //textFile:以行为单位来读取数据,读取的数据都是字符串
    //wholeTextFiles:以文件为单位读取数据
    //读取的结果表示为元组,第一个元素表示文件路径,第二个元素表示文件内容
    val rdd = sc.wholeTextFiles("datas")

    rdd.collect().foreach(println)

    //TODO 关闭环境
    sc.stop()
  }
}

P035【035.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区的设定】11:41

5.1.4.2 RDD 并行度与分区

package com.atguigu.bigdata.spark.core.rdd.builder

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Memory_Par {
  def main(args: Array[String]): Unit = {
    //TODO 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    sparkConf.set("spark.default.parallelism", "5")//5个分区
    val sc = new SparkContext(sparkConf)

    //TODO 创建RDD
    //RDD的并行度 & 分区
    //makeRDD方法可以传递第二个参数,这个参数表示分区的数量
    //第二个参数可以不传递的,那么makeRDD方法会使用默认值:defaultParallelism(默认并行度)
    //  scheduler.conf.getInt("spark.default.parallelism", totalCores)
    //  spark在默认情况下,从配置对象中获取配置参数:spark.default.parallelism
    //  如果获取不到,那么使用totalCores属性,这个属性取值为当前运行环境的最大可用核数

    //val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    //将处理的数据保存成分区文件
    rdd.saveAsTextFile("output")

    //TODO 关闭环境
    sc.stop()
  }
}

P036【036.尚硅谷_SparkCore - 核心编程 - RDD - 集合数据源 - 分区数据的分配】13:54

package com.atguigu.bigdata.spark.core.rdd.builder

import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Memory_Par1 {
  def main(args: Array[String]): Unit = {
    //TODO 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    //TODO 创建RDD

    //【1,2】,【3,4】
    //val rdd = sc.makeRDD(List(1,2,3,4), 2)

    //【1】,【2】,【3,4】
    //val rdd = sc.makeRDD(List(1,2,3,4), 3)

    //【1】,【2,3】,【4,5】
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5), 3)

    //将处理的数据保存成分区文件
    rdd.saveAsTextFile("output")

    //TODO 关闭环境
    sc.stop()
  }
}

P037【037.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区的设定】11:33

package com.atguigu.bigdata.spark.core.rdd.builder

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_File_Par {
  def main(args: Array[String]): Unit = {
    // TODO 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    // TODO 创建RDD
    // textFile可以将文件作为数据处理的数据源,默认也可以设定分区。
    //     minPartitions : 最小分区数量
    //     math.min(defaultParallelism, 2)

    //val rdd = sc.textFile("datas/1.txt")
    //如果不想使用默认的分区数量,可以通过第二个参数指定分区数
    //Spark读取文件,底层其实使用的就是Hadoop的读取方式
    //分区数量的计算方式:
    //    totalSize = 7
    //    goalSize =  7 / 2 = 3(byte)

    //7 / 3 = 2...1 (1.1) + 1 = 3(分区)

    val rdd = sc.textFile("datas/1.txt", 2)

    rdd.saveAsTextFile("output")

    // TODO 关闭环境
    sc.stop()
  }
}

P038【038.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配】08:21

package com.atguigu.bigdata.spark.core.rdd.builder

import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_File_Par1 {
  def main(args: Array[String]): Unit = {
    //TODO 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    //TODO 创建RDD
    //TODO 数据分区的分配
    //1. 数据以行为单位进行读取
    //    spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,和字节数没有关系
    //2. 数据读取时以偏移量为单位,偏移量不会被重复读取
    /*
       偏移量
       1@@  => 012
       2@@  => 345
       3    => 6
     */
    //3. 数据分区的偏移量范围的计算
    // 0 => [0, 3]  => 12
    // 1 => [3, 6]  => 3
    // 2 => [6, 7]  =>

    //【1,2】,【3】,【】
    val rdd = sc.textFile("datas/1.txt", 2)

    rdd.saveAsTextFile("output")

    //TODO 关闭环境
    sc.stop()
  }
}

P039【039.尚硅谷_SparkCore - 核心编程 - RDD - 文件数据源 - 分区数据的分配 - 案例分析】06:13

package com.atguigu.bigdata.spark.core.rdd.builder

import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_File_Par2 {
  def main(args: Array[String]): Unit = {
    // TODO 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)

    // TODO 创建RDD

    // 14byte / 2 = 7byte
    // 14 / 7 = 2(分区)

    /*
    1234567@@  => 012345678
    89@@       => 9101112
    0          => 13

    [0, 7]   => 1234567
    [7, 14]  => 890
    */

    // 如果数据源为多个文件,那么计算分区时以文件为单位进行分区
    val rdd = sc.textFile("datas/word.txt", 2)

    rdd.saveAsTextFile("output003")

    // TODO 关闭环境
    sc.stop()
  }
}

P040【040.尚硅谷_SparkCore - 核心编程 - RDD - 算子介绍】07:49

 

P041【041.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map】07:46

5.1.4.3 RDD转换算子

1) map

RDD根据数据处理方式的不同算子整体上分为Value类型、双Value类型Key-Value类型

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - map

    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    // 1,2,3,4
    // 2,4,6,8

    //转换函数
    def mapFunction(num: Int): Int = {
      num * 2
    }

    //val mapRDD: RDD[Int] = rdd.map(mapFunction)
    //val mapRDD: RDD[Int] = rdd.map((num: Int) => { num * 2 })
    //val mapRDD: RDD[Int] = rdd.map((num: Int) => num * 2)
    //val mapRDD: RDD[Int] = rdd.map((num) => num * 2)
    //val mapRDD: RDD[Int] = rdd.map(num => num * 2)
    val mapRDD: RDD[Int] = rdd.map(_ * 2)

    mapRDD.collect().foreach(println)

    sc.stop()
  }
}

P042【042.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 小功能】05:12

小功能:从服务器日志数据apache.log中获取用户请求URL资源路径。

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Transform_Test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - map
    val rdd = sc.textFile("datas/apache.log")

    // 长的字符串
    // 短的字符串
    val mapRDD: RDD[String] = rdd.map(
      line => {
        val datas = line.split(" ")
        datas(6)
      }
    )
    mapRDD.collect().foreach(println)

    sc.stop()
  }
}

P043【043.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - map - 并行计算效果演示】08:54

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Transform_Par {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - map

    // 1. rdd的计算一个分区内的数据是一个一个地执行逻辑
    //    只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据。
    //    分区内数据的执行是有序的。
    // 2. 不同分区数据计算是无序的。
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)//2个分区

    val mapRDD = rdd.map(
      num => {
        println(">>>>>>>> " + num)
        num
      }
    )
    val mapRDD1 = mapRDD.map(
      num => {
        println("######" + num)
        num
      }
    )

    mapRDD1.collect()

    sc.stop()
  }
}

P044【044.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions】06:12

2) mapPartitions

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - mapPartitions
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    // mapPartitions:可以以分区为单位进行数据转换操作
    //   但是会将整个分区的数据加载到内存进行引用
    //   如果处理完的数据是不会被释放掉,存在对象的引用。
    //   在内存较小,数据量较大的场合下,容易出现内存溢出。
    val mpRDD: RDD[Int] = rdd.mapPartitions(
      iter => {
        println(">>>>>>>>>>")
        iter.map(_ * 2)
      }
    )
    mpRDD.collect().foreach(println)

    sc.stop()
  }
}

P045【045.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions - 小练习】03:49

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_Operator_Transform_Test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - mapPartitions
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

    // 【1,2】,【3,4】
    // 【2】,【4】
    val mpRDD = rdd.mapPartitions(
      iter => {
        List(iter.max).iterator
      }
    )
    mpRDD.collect().foreach(println)

    sc.stop()
  }
}

P046【046.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitions & map的区别 - 完成比完美更重要】02:21

思考一个问题:mapmapPartitions的区别

  • 数据处理角度
    • Map 算子是分区内一个数据一个数据的执行,类似于串行操作。
    • 而 mapPartitions 算子是以分区为单位进行批处理操作。
  • 功能的角度
    • Map 算子主要目的将数据源中的数据进行转换和改变,但是不会减少或增多数据。
    • MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变, 所以可以增加或减少数据。
  • 性能的角度
    • Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处理,所以性能较高。
    • 但是mapPartitions 算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。

完成比完美更重要。

P047【047.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - mapPartitionsWithIndex】06:30

3) mapPartitionsWithIndex

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.{SparkConf, SparkContext}

object Spark03_RDD_Operator_Transform1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - mapPartitions
    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    val mpiRDD = rdd.mapPartitionsWithIndex(
      (index, iter) => {
        // 1,   2,    3,   4
        //(0,1)(2,2),(4,3),(6,4)
        iter.map(
          num => {
            (index, num)
          }
        )
      }
    )

    mpiRDD.collect().foreach(println)

    sc.stop()
  }
}

P048【048.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap】05:07

4) flatMap

 

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - flatMap
    val rdd: RDD[List[Int]] = sc.makeRDD(List(
      List(1, 2), List(3, 4)
    ))
    val flatRDD: RDD[Int] = rdd.flatMap(
      list => {
        list
      }
    )
    flatRDD.collect().foreach(println)

    sc.stop()
  }
}
package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_Operator_Transform1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - flatMap
    val rdd: RDD[String] = sc.makeRDD(List(
      "Hello Scala", "Hello Spark"
    ))

    val flatRDD: RDD[String] = rdd.flatMap(
      s => {
        s.split(" ")
      }
    )
    flatRDD.collect().foreach(println)

    sc.stop()
  }
}

P049【049.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - flatMap - 小练习】02:41

小功能:将 List(List(1,2),3,List(4,5))进行扁平化操作。

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark04_RDD_Operator_Transform2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - flatMap
    val rdd = sc.makeRDD(List(List(1, 2), 3, List(4, 5)))

    val flatRDD = rdd.flatMap(
      data => {
        data match {//模式匹配
          case list: List[_] => list
          case dat => List(dat)
        }
      }
    )

    flatRDD.collect().foreach(println)

    sc.stop()
  }
}

P050【050.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - glom】06:33

5) glom

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark05_RDD_Operator_Transform_Test {
    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)

        // TODO 算子 - glom
        val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4), 2)

        // 【1,2】,【3,4】
        // 【2】,【4】
        // 【6】
        val glomRDD: RDD[Array[Int]] = rdd.glom()

        val maxRDD: RDD[Int] = glomRDD.map(
            array => {
                array.max
            }
        )
        println(maxRDD.collect().sum)

        sc.stop()
    }
}

P051【051.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 理解分区不变的含义】06:48

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Transform_Part {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - map
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    // 【1,2】,【3,4】
    rdd.saveAsTextFile("output1")
    val mapRDD = rdd.map(_ * 2)
    // 【2,4】,【6,8】
    mapRDD.saveAsTextFile("output2")

    sc.stop()
  }
}

P052【052.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy】05:25

6) groupBy

小功能:将 List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组。

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - groupBy
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

    // groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组
    // 相同的key值的数据会放置在一个组中
    def groupFunction(num: Int) = {
      num % 2
    }

    val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)

    groupRDD.collect().foreach(println)
    sc.stop()
  }
}
package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_Operator_Transform1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - groupBy
    val rdd = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2)

    // 分组和分区没有必然的关系
    val groupRDD = rdd.groupBy(_.charAt(0))

    groupRDD.collect().foreach(println)
    sc.stop()
  }
}

P053【053.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - shuffle来袭】06:01

 

P054【054.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - groupBy - 小练习】07:51

6) groupBy

小功能:从服务器日志数据apache.log中获取每个时间段访问量。

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark06_RDD_Operator_Transform_Test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - groupBy
    val rdd = sc.textFile("datas/apache.log")

    val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
      line => {
        val datas = line.split(" ")
        val time = datas(3)
        //time.substring(0, )
        val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
        val date: Date = sdf.parse(time)
        val sdf1 = new SimpleDateFormat("HH")
        val hour: String = sdf1.format(date)
        (hour, 1)
      }
    ).groupBy(_._1)

    timeRDD.map {
      case (hour, iter) => {
        (hour, iter.size)
      }
    }.collect.foreach(println)

    sc.stop()
  }
}

P055【055.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - filter - 数据倾斜】07:11

7) filter

小功能:从服务器日志数据 apache.log 中获取 2015 年 5 月 17 日的请求路径。

 

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark07_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - filter
    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    val filterRDD: RDD[Int] = rdd.filter(num => num % 2 != 0)

    filterRDD.collect().foreach(println)

    sc.stop()
  }
}
package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark07_RDD_Operator_Transform_Test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - filter
    val rdd = sc.textFile("datas/apache.log")

    rdd.filter(
      line => {
        val datas = line.split(" ")
        val time = datas(3)
        time.startsWith("17/05/2015")
      }
    ).collect().foreach(println)

    sc.stop()
  }
}

P056【056.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sample - 抽奖喽】16:11

8) sample

思考一个问题:有啥用,抽奖吗?使用场景:数据倾斜,分区:均衡、shuffle。

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.{SparkConf, SparkContext}

object Spark08_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - sample
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

    // sample算子需要传递三个参数
    // 1. 第一个参数表示,抽取数据后是否将数据返回,true(放回)、false(丢弃)
    // 2. 第二个参数表示,
    //         如果抽取不放回的场合:数据源中每条数据被抽取的概率,基准值的概念
    //         如果抽取放回的场合:表示数据源中的每条数据被抽取的可能次数
    // 3. 第三个参数表示,抽取数据时随机算法的种子
    //         如果不传递第三个参数,那么使用的是当前系统时间

    //  println(rdd.sample(
    //    false,
    //    0.4,
    //    1
    //  ).collect().mkString(",")
    //  )

    println(rdd.sample(
      true,
      2,
      1
    ).collect().mkString(","))

    sc.stop()
  }
}

P057【057.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - distinct】06:13

9) distinct

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark09_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - distinct
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))

    // map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)

    // (1, null),(2, null),(3, null),(4, null),(1, null),(2, null),(3, null),(4, null)
    // (1, null)(1, null)(1, null)
    // (null, null) => null
    // (1, null) => 1
    val rdd1: RDD[Int] = rdd.distinct()

    rdd1.collect().foreach(println)

    sc.stop()
  }
}

P058【058.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - coalesce】11:11

10) coalesce

 

 

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark10_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - coalesce
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)

    // coalesce方法默认情况下不会将分区的数据打乱重新组合
    // 这种情况下的缩减分区可能会导致数据不均衡,出现数据倾斜
    // 如果想要让数据均衡,可以进行shuffle处理
    // val newRDD: RDD[Int] = rdd.coalesce(2)
    val newRDD: RDD[Int] = rdd.coalesce(2, true)

    newRDD.saveAsTextFile("output004")

    sc.stop()
  }
}

P059【059.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - repartition】07:28

11) repartition

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark11_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - repartition
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)

    // coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义,不起作用。
    // 所以如果想要实现扩大分区的效果,需要使用shuffle操作
    // spark提供了一个简化的操作
    // 缩减分区:coalesce,如果想要数据均衡,可以采用shuffle
    // 扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle

    // val newRDD: RDD[Int] = rdd.coalesce(3, true)
    val newRDD: RDD[Int] = rdd.repartition(3)

    newRDD.saveAsTextFile("output005")

    sc.stop()
  }
}

P060【060.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - sortBy】06:31

12) sortBy

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark12_RDD_Operator_Transform1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - sortBy
    val rdd = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2)

    // sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,第二个参数可以改变排序的方式
    // sortBy默认情况下,不会改变分区,但是中间存在shuffle操作。
    val newRDD = rdd.sortBy(t => t._1.toInt, false)

    newRDD.collect().foreach(println)

    sc.stop()
  }
}

P061【061.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链】08:19

13) intersection

14) union

15) subtract

16) zip

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark13_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - 双Value类型

    // 交集,并集和差集要求两个数据源数据类型保持一致
    // 拉链操作两个数据源的类型可以不一致

    val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
    val rdd2 = sc.makeRDD(List(3, 4, 5, 6))

    // 交集 : 【3,4】
    val rdd3: RDD[Int] = rdd1.intersection(rdd2)
    println(rdd3.collect().mkString(","))

    // 并集 : 【1,2,3,4,3,4,5,6】
    val rdd4: RDD[Int] = rdd1.union(rdd2)
    println(rdd4.collect().mkString(","))

    // 差集 : 【1,2】
    val rdd5: RDD[Int] = rdd1.subtract(rdd2)
    println(rdd5.collect().mkString(","))

    // 拉链 : 【1-3,2-4,3-5,4-6】
    val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
    println(rdd6.collect().mkString(","))

    sc.stop()
  }
}

P062【062.尚硅谷_SparkCore - 核心编程 - RDD - 转换算子 - 交集&并集&差集&拉链 - 注意事项】08:10

13) intersection

14) union

15) subtract

16) zip

package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark13_RDD_Operator_Transform {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - 双Value类型

    // 交集,并集和差集要求两个数据源数据类型保持一致
    // 拉链操作两个数据源的类型可以不一致

    val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
    val rdd2 = sc.makeRDD(List(3, 4, 5, 6))
    val rdd7 = sc.makeRDD(List("3", "4", "5", "6"))

    // 交集 : 【3,4】
    val rdd3: RDD[Int] = rdd1.intersection(rdd2)
    //val rdd8 = rdd1.intersection(rdd7)
    println(rdd3.collect().mkString(","))

    // 并集 : 【1,2,3,4,3,4,5,6】
    val rdd4: RDD[Int] = rdd1.union(rdd2)
    println(rdd4.collect().mkString(","))

    // 差集 : 【1,2】
    val rdd5: RDD[Int] = rdd1.subtract(rdd2)
    println(rdd5.collect().mkString(","))

    // 拉链 : 【1-3,2-4,3-5,4-6】
    val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
    val rdd8 = rdd1.zip(rdd7)
    println(rdd6.collect().mkString(","))

    sc.stop()
  }
}
package com.atguigu.bigdata.spark.core.rdd.operator.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark13_RDD_Operator_Transform1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    // TODO 算子 - 双Value类型

    // Can't zip RDDs with unequal numbers of partitions: List(2, 4)
    // 两个数据源要求分区数量要保持一致
    // Can only zip RDDs with same number of elements in each partition
    // 两个数据源要求分区中数据数量保持一致
    val rdd1 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    val rdd2 = sc.makeRDD(List(3, 4, 5, 6), 2)

    val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
    println(rdd6.collect().mkString(","))

    sc.stop()
  }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

尚硅谷大数据技术Spark教程-笔记02【SparkCore(核心编程,map、mapPartitions、mapPartitionsWithIndex、flatMap、glom、groupBy)】 的相关文章

  • myCobot pro 机械臂(5)Robotics Toolbox for MATLA(开发环境:matlab)

    感谢 机器人工程师进阶之路 xff1a matlab robotics toolbox安装方法 目录 一 matlab robotics toolbox安装方法 二 改进DH法对myCobot进行仿真 一 matlab robotics t
  • myCobot pro 机械臂(6)逆向运动学

    机械臂逆运动学求解常用的方法有几何法 解析法 数值法 从求解的方式和计算的效率上来看 xff0c 几何法和解析法会考虑机械臂结构不同而造成的差异 xff0c 因此对于不同结构的机械臂会有特定的求解方式 通常来说 xff0c 这两种方法具有速
  • STM32开发基础知识入门

    C语言基础 位操作 对基本类型变量可以在位级别进行操作 1 不改变其他位的值的状况下 xff0c 对某几个位进行设值 先对需要设置的位用 amp 操作符进行清零操作 xff0c 然后用 操作符设值 2 移位操作提高代码的可读性 3 取反操作
  • 蜂鸣器实验

    蜂鸣器介绍 这里采用的是有源蜂鸣器 xff0c 有源蜂鸣器自带了振荡电路 xff0c 一通电就会发声 xff1b 无源蜂鸣器则没有自带震荡电路 xff0c 必须外部提供 2 5Khz 左右的方波驱动 xff0c 才能发声 STM32的单个
  • CodeBlocks 20.03版的若干已知问题及其解决方法

    CodeBlocks 20 03在Win10上运行时会出现一些问题 xff0c 我通过搜索网络找到了解决办法 xff0c 下面分享给大家 一 改变编辑器的字体后引发 wxWidgets debug alert A debugging che
  • sumo的简单使用

    sumo简单教程 安装配置环境变量文件配置运行python文件生成真实路网python调用 如果你也对交通仿真感兴趣 xff0c 且是一个小白 xff0c 但是有python基础 xff0c 我想我可以帮到你 安装 首先安装为我们入门的第一
  • Kube-Prometheus Stack监控mysql

    准备环境 mysql 环境可以搭建2个或者一个来进行监控 我这个是搭建了2个一个使用容器启动 xff0c 一个二进制安装如图所示 配置mysql exporter进行采集数据 apiVersion apps v1 span class to
  • Promise

    一 定义 xff1a 所谓Promise xff0c 简单说就是一个容器 xff0c 里面保存着某个未来才会结束的事件 xff08 通常是一个异步操作 xff09 的结果 从语法上说 xff0c Promise 是一个对象 xff0c 从它
  • 全国大学生智能汽车竞赛硬件篇(二)—电磁信号采集部分

    对于智能车硬件的整体框架主要由5部分组成 xff1a 电磁信号采集与处理部分 电机驱动部分 电源管理部分 主控部分 其他部分 xff08 停车模块等 xff09 1 电磁信号采集部分 这一部分对于电磁组别至关重要 xff0c 对与摄像头组别
  • 嵌入式笔试(1)—海康威视试题

    单选 xff08 15题 xff09 第一题 栈简介 栈由操作系统 xff08 编译器 xff09 自动分配释放 xff0c 用于存放函数的参数值 局部变量等 xff0c 其操作方式类似于数据结构中的栈 堆简介 堆由开发人员分配和释放 xf
  • 常用通信协议——IIC详解(全网最全)

    一 IIC 简介 I2C xff08 Inter Integrated Circuit xff09 是内部整合电路的称呼 xff0c 是一种串行通讯总线 xff0c 使用多主从架构 xff0c 由飞利浦公司在1980年为了让主板 嵌入式系统
  • 常用通信协议——IIC协议编程实现

    一 IIC连接实物示意图 二 IIC协议程序编写的要点 xff1a 1 空闲状态 2 开始信号 3 停止信号 4 应答信号 5 数据的有效位 6 数据传输 三 IIC驱动编写 1 硬件准备 此处使用正点原子Mini板STM32F103 xf
  • Linux驱动编程篇(四)——LED驱动(二)LED驱动框架

    为应对多种芯片或开发板适配的LED驱动程序 xff0c 同时减少开发流程 xff0c 故需要在APP 驱动程序 硬件三个部分中添加一个部分 xff0c 用于放置各单板LED驱动程序的公共部分 一 LED驱动程序框架的流程图 二 对于公共部分
  • Linux驱动编程篇(五)——驱动设计的思想(面向对象)(分层)(分离)

    Liunx驱动 61 驱动框架 43 硬件操作 61 61 61 Liunx驱动 61 驱动框架 43 单片机 对于驱动的的框架 xff0c 大体有三种设计思想 xff1a 1 面向对象 xff1b 2 分层 xff1b 3 分离 xff0
  • 嵌入式Android底层开发(一)——安卓开发的整体框架与简述

    一 Android简介 Android是一种基于Linux内核 xff08 不包含GUN组件 xff09 的自由及开放源代码的操作系统 xff0c 主要使用于移动设备 xff0c 如智能手机和平板电脑 xff0c 由美国Google公司和开
  • 旋转矩阵

    关注下方公众号 xff0c 分享硬核知识 作者 小K 出品 公众号 xff1a 小K算法 xff08 ID xff1a xiaok365 xff09 01 故事起源 有这样的一种矩阵 xff0c 从左上角开始 xff0c 顺时针从外向里旋转
  • 【转】 linux port scan

    https www binarytides com tcp syn portscan in c with linux sockets Port Scanning searches for open ports on a remote sys
  • 嵌入式Android音频系统(一)Android音频系统专栏开篇

    对于安卓音频系统 xff0c 在前面我们已经讲了大致框架 xff0c 嵌入式Android底层开发 xff08 六 xff09 Android音频系统 总体框架 本专栏将对Android音频系统进行更细致的阐述 xff0c 同时这些也是我的
  • 嵌入式Android音频系统(二)Android音频系统的专用名词解释

    对于Android音频系统学习时可能会出现许多名词 xff0c 这些词光看字面意思可能无法直接理解 xff0c 为了更好的对Android系统进行学习 xff0c 特开一章对Android音频系统会出现的名词进行解释 xff0c 欢迎大家补
  • 嵌入式Android音频系统(三)Android音频系统涉及的文件

    本专栏将对Android音频系统进行更细致的阐述 xff0c 同时这些也是我的学习笔记 xff0c 如有错误欢迎大家在评论区批评指正 xff0c 谢谢大家 本专栏的学习资料来源 xff1a 本专栏的学习资料主要来自韦东山老师的Android

随机推荐