Spark-scala-API

2023-05-16

1、sc.version
2、集群对象:SparkContext;获得Spark集群的SparkContext对象,是构造Spark应用的第一步!
SparkContext对象代表 整个 Spark集群,是Spark框架 功能的入口 ,可以用来在集群中创建RDD、累加器变量和广播变量。
SparkContext对象创建时可以指明连接到哪个集群管理器上,在Spark-Shell启动时,默认 连接到本地的集群管理器。
使用SparkContext对象(在Shell里,就是sc变量)的master方法,可以查看当前连接的集群管理器:sc.master
3、分布数据集:RDD;使用SparkContext对象创建RDD数据集,然后,才能干点有意义的事情!
Spark的核心抽象是一个分布式数据集,被称为弹性分布数据集(RDD) ,代表一个不可变的、可分区、可被并行处理 的成员集合。
RDD对象需要利用SparkContext对象的方法创建,Spark支持从多种来源创建RDD对象,比如:从本地文本文件创建、从Hadoop 的HDFS文件创建、或者通过对其他RDD进行变换获得新的RDD。
下面的示例使用本地Spark目录下的README.md文件创建一个新的RDD:
    scala> val textFile = sc.textFile("README.md")
    textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
我们看到,执行的结果是,返回了一个Spark.RDD类型的变量textFile,RDD是一个模板类,方括号里的String代表 这个RDD对象成员的类型。由于是一个对象,因此值用地址表示:spark.MappedRDD@2ee9b7e3 。
SparkContext对象的textFile方法创建的RDD中,一个成员对应原始文件的一行。我们看到在执行的结果中可以看到返回一个 RDD,成员类型为String,我们将这个对象保存在变量textFile中。
使用README.md文件,创建一个RDD,保存到变量 textFile中。
4、操作数据集:RDD可以执行两种操作:变换与动作
RDD的内部实现了分布计算的功能,我们在RDD上执行的操作,是透明地在整个集群上执行的。也就是说,当RDD建立 后,这个RDD就不属于本地了,它在整个集群中有效。当在RDD上执行一个操作,RDD内部需要和集群管理器进行沟通协商。
对一个RDD可以进行两种操作:动作(action)和变换(transformation)。动作总是从集群中取回数据,变换总是获得一个新的RDD,这是两种操作的字面上的差异。
事实上,当在RDD上执行一个变换时,RDD仅仅记录要做的变换,只有当RDD上需要执行一个动作时,RDD才 通过集群管理器启动实质分布计算。
这有点像拍电影,变换操作只是剧本,只有导演喊Action的时候,真正的电影才开始制作。
5、感受动作和变换的区别;RDD操作分为两种:动作和变换,只有动作才会触发计算!
下面的例子首先做一个映射变换,然后返回新纪录的条数。map是一个变换,负责将原RDD的每个记录变换到新的RDD,count是一个动作,负责获取这个RDD的记录总数。
先执行map,你应该看到很迅速干净地返回:
    scala> val rdd2=textFile.map(line=>line.length)
    rdd2: org.apache.spark.rdd.RDD[Int] = MappedRDD[52] ...
再执行count,这会有些不一样:
    scala> rdd2.count()
    ......
    res10: Long = 141
    .....
当执行map时,我们看到结果很快返回了。但当执行count时,我们可以看到一堆的提示信息,大概的意思就是 和调度器进行了若干沟通才把数据拉回来。
看起来确实这样,变换操作就只是写写剧本,Action才真正开始执行计算任务。
6、RDD动作:获取数据的控制权;RDD动作将数据集返回本地
对一个RDD执行动作指示集群将指定数据返回本地,返回的数据可能是一个具体的值、一个数组或一个HASH表。
让我们先执行几个动作:
    scala> textFile.count() // 这个动作返回RDD中的记录数
    res0: Long = 126
   
    scala> textFile.first() // 这个动作返回RDD中的第一个记录
count是一个动作,负责获取这个RDD的记录总数。first也是一个动作,负责返回RDD中的第一条记录。
在使用Spark时,最好在脑海中明确地区隔出两个区域:本地域和集群域。RDD属于集群域,那是Spark管辖的地带;RDD的动作结果属于本地域,这是我们的地盘。
只有当RDD的数据返回本地域,我们才能进行再加工,比如打印等等。
7、RDD变换:数据的滤镜;RDD变换总是返回RDD,这让我们可以把变换串起来!
RDD变换将产生一个新的RDD。下面的例子中,我们执行一个过滤(Filter)变换,将获得一个新的RDD,由原 RDD中符合过滤条件(即:包含单词Spark)的记录成员构成:
    scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
    linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09
变量lineWithSpark现在是一个RDD,由变量textFile这个RDD中所有包含"Spakr"单词的行构成。
由于一个RDD变换总是返回一个新的RDD,因此我们可以将变换和动作使用链式语法串起来。下面的 例子使用了链式语法解决一个具体问题:在文件中有多少行包含单词“Spark”?
    scala> textFile.filter(line => line.contains("Spark")).count()
    res3: Long = 15
这等同于:
    scala> val rdd1 = textFile.filter(line => line.contains("Spark"))
    ...
    scala> rdd1.count()
    res12: Long = 15
用链式语法写起来更流畅一些,不过这只是一种口味的倾向而已。
8、RDD操作组合;RDD的变换有点像PS的滤镜,有时要用好几个滤镜,才能把脸修好。
RDD的诸多动作和变换,经过组合也可以实现复杂的计算,满足相当多现实的数据计算需求。
假设我们需要找出文件中单词数量最多的行,做个map/reduce就可以了:
    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
    res4: Long = 15
上面语句首先使用map变换,将每一行(成员)映射为一个整数值(单词数量),这获得了一个新的RDD。然后在 这个新的RDD上执行reduce动作,找到(返回)了单词数量最多的行。
9、count :计数
使用count成员函数获得RDD对象的成员总数,返回值为长整型
10、top :前N个记录
使用top成员函数获得RDD中的前N个记录,可以指定一个排序函数进行排序比较。 如果不指定排序函数,那么使用默认的Ascii码序进行记录排序。
返回值包含前N个记录的数组,记录类型为T。
11、take:无序采样
使用take成员函数获得指定数量的记录,返回一个数组。与top不同,take在提取记录 前不进行排序,它仅仅逐分区地提取够指定数量的记录就返回结果。可以将take方法 视为对RDD对象的无序采样。
返回值包含指定数量记录的数组,记录类型为T。
12、first : 取第一个记录;使用first成员函数获得RDD中的第一个记录。
使用RDD的first方法获得第一条记录。不过,没有last方法!
13、max : 取值最大的记录
使用max成员函数获得值最大的记录,可以指定一个排序函数进行排序比较。默认使用 Ascii码序进行排序。
14、min : 取值最小的记录
使用min成员函数获得值最小的记录,可以指定一个排序函数进行排序比较。默认使用 Ascii码序进行排序。
15、reduce : 规约RDD;使用RDD的reduce方法进行聚合!
使用reduce成员函数对RDD进行规约操作,必须指定一个函数指定规约行为。
语法
    def reduce(f: (T, T) => T): T
参数 f : 规约函数 , 两个参数分别代表RDD中的两个记录,返回值被RDD用来进行递归计算。
示例
下面的示例使用匿名函数,将所有的记录连接起来构成一个字符串:
    scala> textFile.reduce((a,b)=>a+b)
    res60:String = #Apache SparkSpake is a fast...
16、collect : 收集全部记录
使用collect成员函数获得RDD中的所有记录,返回一个数组。collect方法 可以视为对RDD对象的一个全采样。
17、map : 映射
映射变换使用一个映射函数对RDD中的每个记录进行变换,每个记录变换后的新值集合构成一个新的RDD。
语法
    def map[U](f: (T) => U)(implicit arg0: ClassTag[U]): RDD[U]
参数
    f : 映射函数 , 输入参数为原RDD中的一个记录,返回值构成新RDD中的一个记录。
    下面的示例将textFile的每个记录(字符串)变换为其长度值,获得一个新的RDD,然后取回第一个记录查看:
    scala> textFile.map(line=>line.length).first()
    res13:Int = 14
18、filter : 过滤
过滤变换使用一个筛选函数对RDD中的每个记录进行筛选,只有筛选函数返回真值的记录,才 被选中用来构造新的RDD。
语法
    def filter(f: (T) => Boolean): RDD[T]
参数
    f : 筛选函数,输入参数为原RDD中的一个元素,返回值为True或False 。
    下面的示例仅保留原RDD中字符数多于20个的记录(行),获得一个新的RDD,然后取回第一个 记录查看:
    scala> textFile.filter(line=>line.length>20).first()
    res20: String = Spark is a fast and generic .
19、sample : 采样;使用RDD的sample方法获得一个采样RDD!
采样变换根据给定的随机种子,从RDD中随机地按指定比例选一部分记录,创建新的RDD。采样变换 在机器学习中可用于进行交叉验证。
语法
    def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]
参数
    withReplacement : Boolean , True表示进行替换采样,False表示进行非替换采样
    fraction : Double, 在0~1之间的一个浮点值,表示要采样的记录在全体记录中的比例
    seed :随机种子
示例
下面的示例从原RDD中随机选择20%的记录,构造一个新的RDD,然后返回新RDD的记录数:
    scala> textFile.sample(true,0.2).count()
    res12: Long = 26

20、union : 合并;使用RDD的union方法,可以获得两个RDD的并集!
合并变换将两个RDD合并为一个新的RDD,重复的记录不会被剔除。
语法
    def union(other: RDD[T]): RDD[T]
参数
    other : 第二个RDD
示例
下面的示例,首先对textFile这个RDD进行一个每行反转的映射变换,获得一个新的RDD,再 将这个新的RDD和原来的RDD:textFile进行合并,最后我们使用count查看一下总记录数:
    scala> textFile.map(line=>line.reverse).union(textFile).count()
    res13: Long = 282    
可以看到,合并后的总记录数是原来的2倍。

21、intersection : 相交;使用RDD的intersection方法,可以获得两个RDD的交集!
相交变换仅取两个RDD共同的记录,构造一个新的RDD。
语法
    def intersection(other: RDD[T]): RDD[T]
参数
    other : 第二个RDD
示例
下面的示例将每个记录进行逆转后的RDD与原RDD相交,获得一个新的RDD,我们使用collect回收全部 数据以便显示:
    scala> textFile.map(line=>line.reverse).intersection(textFile).collect()
    res27: Array[String] =Array("   ","")

可以看到,只有空行被保留下来,因为空行的逆序保持不变。

22、distinct : 剔重;使用RDD的distinct方法,可以进行记录剔重!
剔重变换剔除RDD中的重复记录,返回一个新的RDD。
语法
    def distinct(): RDD[T]
示例
下面的示例将RDD中重复的行剔除,并返回新RDD中的记录数:
    sala> textFile.distinct().count()
    res20: Long =91



本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark-scala-API 的相关文章

  • 对 Scala Not Null 特征的库支持

    Notice 从 Scala 2 11 开始 NotNull已弃用 据我了解 如果您希望引用类型不可为空 则必须混合魔法NotNull特征 编译器会自动阻止你输入null 可以值在里面 看到这个邮件列表线程 http www nabble
  • 使用 scala 集合 - CanBuildFrom 麻烦

    我正在尝试编写一个接受任何类型集合的方法CC 并将其映射到一个新的集合 相同的集合类型但不同的元素类型 我正在挣扎 基本上我正在尝试实施map but 不在集合本身上 问题 我正在尝试实现一个带有签名的方法 它看起来有点像 def map
  • Java 中的“Lambdifying”scala 函数

    使用Java和Apache Spark 已用Scala重写 面对旧的API方法 org apache spark rdd JdbcRDD构造函数 其参数为 AbstractFunction1 abstract class AbstractF
  • 类型级编程有哪些示例? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我不明白 类型级编程 是什么意思 也无法使用Google找到合适的解释 有人可以提供一个演示类型级编程的示例吗 范式的解释和 或定义将
  • 有没有办法通过API调用访问私有数据集

    我正在使用 CKAN 2 8 运行 Mirth 3 6 1 作为新手 我遇到了一个问题 有没有办法通过 API 请求访问 CKAN 中私有数据集中的资源 我好像做不到 我有一个拥有公共数据集的组织 我可以通过 API 路由器通过 Mirth
  • SoftLayer_Account::getOperatingSystemReloadImages

    我想在 OSReload 期间使用 API 获取可用操作系统列表 我发现提到了 SoftLayer Account getOperatingSystemReloadImages 方法 但找不到该方法的用法 谁能帮我解决这个问题 谢谢 我找不
  • 玩:将表单字段绑定到双精度型?

    也许我只是忽略了一些明显的事情 但我无法弄清楚如何将表单字段绑定到 Play 控制器中的双精度型 例如 假设这是我的模型 case class SavingsGoal timeframeInMonths Option Int amount
  • Scala 解析器组合器的运算符优先级

    我正在研究需要考虑运算符优先级的解析逻辑 我的需求并不太复杂 首先 我需要乘法和除法比加法和减法具有更高的优先级 例如 1 2 3 应视为 1 2 3 这是一个简单的例子 但你明白了 我需要将更多自定义标记添加到优先级逻辑中 我可以根据此处
  • Spring @RequestMapping 带有可选参数

    我的控制器在请求映射中存在可选参数的问题 请查看下面的控制器 GetMapping produces MediaType APPLICATION JSON VALUE public ResponseEntity
  • scala play框架如何对异步控制器进行单元测试

    使用 Scala play 2 5 版并尝试遵循以下文档中的单元测试控制器指南 https www playframework com documentation 2 5 x ScalaTestingWithScalaTest https
  • 为什么在 Scala 中函数类型需要以单独的参数组传递到函数中

    我是 scala 新手 我用两种方式编写了相同的代码 但我对两种方式有点困惑 在第二种方式中 f 的参数类型是自动派生的 但在 type1 中 scala 编译器无法执行相同的操作 我只是想了解这背后的想法是什么 Type1 给出编译错误
  • 如何在 apache Spark 作业中执行阻塞 IO?

    如果当我遍历 RDD 时 我需要通过调用外部 阻塞 服务来计算数据集中的值怎么办 您认为如何才能实现这一目标 值 Future RDD Double Future sequence tasks 我尝试创建一个 Futures 列表 但由于
  • Scalatest PlusPlay Selenium 无法调整窗口大小

    对此已经研究了一段时间 我似乎找不到使用 scalatest plus 调整窗口大小的方法 我发现在线搜索或文档的唯一方法http doc scalatest org 2 1 5 index html org scalatest selen
  • 使用 Apache Spark 读取 JSON - `corrupt_record`

    我有一个json file nodes看起来像这样 toid osgb4000000031043205 point 508180 748 195333 973 index 1 toid osgb4000000031043206 point
  • 关于 scala.math.Integral 的问题

    有什么方法mkNumericOps andmkOrderingOps of scala math Integral http www scala lang org api current scala math Integral html我们
  • 将字符串转换为枚举值的 Scala 安全方法

    假设我有枚举 object WeekDay extends Enumeration type WeekDay Value val Mon Tue Wed Thu Fri Sat Sun Value 我希望能够将 String 转换为 Wee
  • Spark Scala 相当于 SKEW 连接提示

    Spark SQL 有一个可用的倾斜提示 请参阅here https docs databricks com spark latest spark sql skew join html relation columns and skew v
  • 为什么 Scala 选项的 foreach 比 get 更好?

    为什么使用foreach map flatMap等被认为比使用更好get对于 Scala 选项 如果我使用isEmpty我可以打电话get安全 好吧 这又回到了 告诉 不要问 考虑这两行 if opt isDefined println o
  • 对象内的类中的 Scala 抽象类型

    如果我这样做 object Parent class Inner extends Testable type Self lt Inner def inner new Inner object Child class Inner extend
  • Scala:如何获取数据框中的行范围

    我有一个DataFrame通过运行创建sqlContext readParquet 文件的一个 The DataFrame由 300 M 行组成 我需要使用这些行作为另一个函数的输入 但我想以较小的批次进行操作 以防止 OOM 错误 目前

随机推荐