Spark的RDD原理以及2.0特性的介绍

2023-11-14

      注:本文由王联辉在高可用架构群分享,本文转载自高可用架构「 ArchNotes 」

      王联辉,曾在腾讯,Intel 等公司从事大数据相关的工作。2013 年 – 2016 年先后负责腾讯 Yarn 集群和 Spark 平台的运营与研发。曾负责 Intel Hadoop 发行版的 Hive 及 HBase 版本研发。参与过百度用户行为数据仓库的建设和开发,以及淘宝数据魔方和淘宝指数的数据开发工作。给 Spark 社区贡献了 25+ 个 patch,接受的重要特性有 python on yarn-cluster,yarn-cluster 支持动态资源扩缩容以及 RDD Core 中 memory 管理等。

      Spark 是什么 

      Spark 是 Apache 顶级项目里面最火的大数据处理的计算引擎,它目前是负责大数据计算的工作。包括离线计算或交互式查询、数据挖掘算法、流式计算以及图计算等。全世界有许多公司和组织使用或给社区贡献代码,社区的活跃度见 www.github.com/apache/spark。

      2013 年开始 Spark开发团队成立 Databricks,来对 Spark 进行运作和管理,并提供 Cloud 服务。Spark 社区基本保持一个季度一个版本,不出意外的话 Spark 2.0 将在五月底发布。

      与 Mapreduce 相比,Spark 具备 DAG 执行引擎以及基于内存的多轮迭代计算等优势,在SQL 层面上,比 Hive/Pig 相比,引入关系数据库的许多特性,以及内存管理技术。另外在 Spark 上所有的计算模型最终都统一基于 RDD 之上运行执行,包括流式和离线计算。Spark 基于磁盘的性能是 MR 的 10 倍,基于内存的性能是 MR 的 100 倍 ❶ (见文后参考阅读❶ ,下同) 。

      Spark 提供 SQL、机器学习库 MLlib、流计算 Streaming 和图计算 Graphx,同时也支持 Scala、Java、Python 和 R 语言开发的基于 API 的应用程序。


      RDD,英文全称叫 Resilient Distributed Datasets。

      an RDD is a read-only, partitioned collection of records❸. 字面意思是只读的分布式数据集。

      但其实个人觉得可以把 RDD 理解为关系数据库 里的一个个操作,比如 map,filter,Join 等。在 Spark 里面实现了许多这样的 RDD 类,即可以看成是操作类。当我们调用一个 map 接口,底层实现是会生成一个 MapPartitionsRDD 对象,当 RDD 真正执行时,会调用 MapPartitionsRDD 对象里面的 compute 方法来执行这个操作的计算逻辑。但是不同的是,RDD 是 lazy 模式,只有像 count,saveasText 这种 action 动作被调用后再会去触发 runJob 动作。

      RDD 分为二类:transformation 和 action。

  • transformation 是从一个 RDD 转换为一个新的 RDD 或者从数据源生成一个新的 RDD;
  • action 是触发 job 的执行。所有的 transformation 都是 lazy 执行,只有在 action 被提交的时候才触发前面整个 RDD 的执行图。如下

      val file = sc.textFile(args(0))

      val words = file.flatMap(line => line.split(” “))

      val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _, 2) wordCounts.saveAsTextFile(args(1))

      这段代码生成的 RDD 的执行树是如下图所示:


      最终在 saveAsTextFile 方法时才会将整个 RDD 的执行图提交给 DAG 执行引擎,根据相关信息切分成一个一个 Stage,每个 Stage 去执行多个 task,最终完成整个 Job 的执行。

      还有一个区别就是,RDD 计算后的中间结果是可以被持久化,当下一次需要使用时,可以直接使用之前持久化好的结果,而不是重新计算,并且这些结果被存储在各个结点的 executor 上。下一次使用时,调度器可以直接把 task 分发到存储持久化数据的结点上,减少数据的网络传输开稍。这种场景在数据挖掘迭代计算是经常出现。如下代码

val links = spark.textFile(…).map(…).persist() var ranks = // RDD of (URL, rank) pairs
for (i <- 1 to ITERATIONS) {
// Build an RDD of (targetURL, float) pairs // with the contributions sent by each page val contribs = links.join(ranks).flatMap {
(url, (links, rank)) =>
links.map(dest => (dest, rank/links.size)) }
// Sum contributions by URL and get new ranks

ranks = contribs.reduceByKey((x,y) => x+y)
.mapValues(sum => a/N + (1-a)*sum) }

      以上代码生成的 RDD 执行树如下图所示:


      计算 contribs-0 时需要使用 links 的计算逻辑,当 links 每个分片计算完后,会将这个结果保存到本地内存或磁盘上,下一次 contribs-1 计算要使用 links 的数据时,直接从上一次保存的内存和磁盘上读取就可以了。这个持久化系统叫做blockManager,类似于在内部再构建了一个 KV 系统,K 表示每个分区 ID 号,V 表示这个分区计算后的结果。

      另外在 streaming 计算时,每个 batch 会去消息队列上拉取这个时间段的数据,每个 Recevier 接收过来数据形成 block 块并存放到 blockManager 上,为了可靠性,这个 block 块可以远程备份,后续的 batch 计算就直接在之前已读取的 block 块上进行计算,这样不断循环迭代来完成流处理。

      一个 RDD 一般会有以下四个函数组成。

      1. 操作算子的物理执行逻辑

      定义为:

      def compute(split: Partition, context: TaskContext): Iterator[T]

      如在 MapPartitionsRDD 里的实现是如下:

      override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context))

      函数定义

      f: (TaskContext, Int, Iterator[T]) => Iterator[U]

      2. 获取分片信息

      protected def getPartitions: Array[Partition] 

      即这个操作的数据划分为多少个分 区。跟 mapreduce 里的 map 上的 split 类似的。

      3. 获取父 RDD 的依赖关系

      protected def getDependencies: Seq[Dependency[_]] 

      依赖分二种:如果 RDD 的每个分区最多只能被一个 Child RDD 的一个分区使用,则称之为 narrow dependency;若依赖于多个 Child RDD 分区,则称之为 wide dependency。不同的操作根据其特性,可能会产生不同的依赖 ❹。如下图所示


      map 操作前后二个 RDD 操作之间的分区是一对一的关系,故产生 narrow dependency,而 join 操作的分区分别对应于它的二个子操作相对应的分区,故产生 wide dependency。当最后要生成具体的 task 运行时,就需要利用这个依赖关系也生成 Stage 的 DAG 图。

      4. 获取该操作对应数据的存放位置信息,主要是针对 HDFS 这类有数据源的 RDD。

      protected def getPreferredLocations(split: Partition): Seq[String]

      Spark 的执行模式

      Spark 的执行模式有 local、Yarn、Standalone、Mesos 四类。后面三个分别有 cluster 和 client 二种。client 和 cluster 的区别就是指 Driver 是在程序提交客户端还是在集群的 AM 上。 比如常见的 Yarn-cluster 模式如下图所示:


      一般来说,运行简单测试或 UT 用的是 local 模式运行,其实就是用多线程模似分布式执行。 如果业务部门较少且不需要对部门或组之间的资源做划分和优先级调度的话,可以使用 Standalone 模式来部署。

      当如果有多个部门或组,且希望每个组织可以限制固定运行的最大资源,另外组或者任务需要有优先级执行的话,可以选择 Yarn 或 Mesos。

      Spark 2.0 的特性

      Unifying DataFrames and Datasets in Scala/Java

      DataFrame❺ 和 Dataset❽ 的功能是什么?

      它们都是提供给用户使用,包括各类操作接口的 API。1.3 版本引入 DataFrame,1.6 版本引入 Dataset,2.0 提供的功能是将二者统一,即保留 Dataset,而把 DataFrame 定义为 Dataset[Row],即是 Dataset 里的元素对象为 Row 的一种(SPARK-13485)。

      在参考资料 中有介绍 DataFrame,它就是提供了一系列操作 API,与 RDD API 相比较,DataFrame 里操作的数据都是带有 Schema 信息,所以 DataFrame 里的所有操作是可以享受 Spark SQL Catalyst optimizer 带来的性能提升,比如 code generation 以及 Tungsten❼ 等。执行过程如下图所示


      但是 DataFrame 出来后发现有些情况下 RDD 可以表达的逻辑用 DataFrame 无法表达。比如 要对 group by 或 join 后的结果用自定义的函数,可能用 SQL 是无法表达的。如下代码:

      case class ClassData(a: String, b: Int)

      case class ClassNullableData(a: String, b: Integer)

      val ds = Seq(ClassData(“a”, 1), ClassData(“a”, 2)).toDS()

      val agged = ds.groupByKey(d => ClassNullableData(d.a, null))

      .mapGroups {

      case (key, values) => key.a + values.map(_.b).sum

      }

      中间处理过程的数据是自定义的类型,并且 groupby 后的聚合逻辑也是自定义的,故用 SQL 比较难以表达,所以提出了 Dataset API。Dataset API 扩展 DataFrame API 支持静态类型和运行已经存在的 Scala 或 Java 语言的用户自定义函数。同时 Dataset 也能享受 Spark SQL 里所有性能 带来的提升。

      那么后面发现 Dataset 是包含了 DataFrame 的功能,这样二者就出现了很大的冗余,故在 2.0 时将二者统一,保留 Dataset API,把 DataFrame 表示为 Dataset[Row],即 Dataset 的子集。

      因此我们在使用 API 时,优先选择 DataFrame & Dataset,因为它的性能很好,而且以后的优化它都可以享受到,但是为了兼容早期版本的程序,RDD API 也会一直保留着。后续 Spark 上层的库将全部会用 DataFrame,比如 MLlib、Streaming、Graphx 等。

      Whole-stage code generation

      在参考资料 9 中有几个例子的代码比较,我们看其中一个例子:

      select count(*) from store_sales where ss_item_sk = 1000

      那么在翻译成计算引擎的执行计划如下图:


      而通常物理计划的代码是这样实现的:

      class Filter {

      def next(): Boolean = {

      var found = false

      while (!found && child.next()) {

      found = predicate(child.fetch())

      }

      return found

      }

      def fetch(): InternalRow = {

      child.fetch()

      }…

      }

      但是真正如果我们用 hard code 写的话,代码是这样的:

      var count = 0

      for (ss_item_sk in store_sales) {

         if (ss_item_sk == 1000) {

         count += 1

        }

      }

      发现二者相关如下图所示:


      那么如何使得计算引擎的物理执行速度能达到 hard code 的性能呢?这就提出了 whole-stage code generation,即对物理执行的多次调用转换为代码 for 循环,类似 hard code 方式,减少中间执行的函数调用次数,当数据记录多时,这个调用次数是很大。 最后这个优化带来的性能提升如下图所示:


      从 benchmark 的结果可以看出,使用了该特性后各操作的性能都有很大的提升。

      Structured Streaming

      Spark Streaming 是把流式计算看成一个一个的离线计算来完成流式计算,提供了一套 Dstream 的流 API,相比于其他的流式计算,Spark Streaming 的优点是容错性和吞吐量上要有优势❿,关于 Spark Streaming 的详细设计思想和分析,可以到 https://github.com/lw-lin/CoolplaySpark 进行详细学习和了解。

      在 2.0 以前的版本,用户在使用时,如果有流计算,又有离线计算,就需要用二套 API 去编写程序,一套是 RDD API,一套是 Dstream API。而且 Dstream API 在易用性上远不如 SQL 或 DataFrame。

      为了真正将流式计算和离线计算在编程 API 上统一,同时也让 Streaming 作业能够享受 DataFrame/Dataset 上所带来的优势:性能提升和 API 易用,于是提出了 Structured Streaming。最后我们只需要基于 DataFrame/Dataset 可以开发离线计算和流式计算的程序,很容易使得 Spark 在 API 跟业界所说的 DataFlow 来统一离线计算和流式计算效果一样。

      比如在做 Batch Aggregation 时我们可以写成下面的代码


      那么对于流式计算时,我们仅仅是调用了 DataFrame/Dataset 的不同函数代码,如下:


      最后,在 DataFrame/Dataset 这个 API 上可以完成如下图所示的所有应用:


其他主要性能提升

  1. 采用 vectorized Parquet decoder 读取 parquet 上数据。以前是一行一行的读取,然后处理。现在改为一次读取 4096 行记录,不需要每处理一行记录去调用一次 Parquet 获取记录的方法,而是改为一批去调用一次(SPARK-12854)。加上 Parquet 本身是列式存储,这个优化使得 Parquet 读取速度提高 3 倍。
  2. 采有 radix sort 来提高 sort 的性能(SPARK-14724)。在某些情况下排序性能可以提高 10-20 倍。
  3. 使用 VectorizedHashmap 来代替 Java 的 HashMap 加速 group by 的执行(SPARK-14319)。
  4. 将 Hive 中的 Window 函数用 Native Spark Window 实现,因为 Native Spark Window 在内存管理上有优势(SPARK-8641)。
  5. 避免复杂语句中的逻辑相同部分在执行时重复计算(SPARK-13523)。
  6. 压缩算法默认使用 LZ4(SPARK-12388)。

语句的增强

  1. 建立新的语法解析(SPARK-12362)满足所有的 SQL 语法,这样即合并 Hive 和标准 SQL 的语句解析,同时不依赖 Hive 的语法解析 jar(SPARK-14776)。之前版本二者的语法解析是独立的,这样导致在标准 SQL 中无法使用窗口函数或者 Hive 的语法,而在使用 Hive 语法时无法使用标准 SQL 的语法,比如 In/Exists 子句等。在 SQL 编写时,没法在一个 Context 把二者的范围全部支持,然而有了这个特性后,使得 SQL 语句表达更强大,后续要增加任何语法,只需要维护这一个语法解析即可。当然缺点是后续 Hive 版本的新语法,需要手动添加进来。
  2. 支持 intersect/except(SPARK-12542)。如 select * from t1 except select * from t2 或者select * from t1 intersect select * from t2
  3. 支持 uncorrelated scalar subquery(SPARK-13417)。如 select (select min(value) from testData where key = (select max(key) from testData) – 1)
  4. 支持 DDL/DML(SPARK-14118)。之前 DDL/DML 语句是调用 Hive 的 DDL/DML 语句命令来完成,而现在是直接在 Spark SQL 上就可以完成。
  5. 支持 multi-insert(SPARK-13924)。
  6. 支持 exist(SPARK-12545)和 NOT EXISTS(SPARK-10600),如 select * from (select 1 as a union all select 2 as a) t where exists (select * from (select 1 as b) t2 where b = a and b < 2)
  7. 支持 subqueries 带有 In/Not In 子句(SPARK-4226),如 select * from (select 1 as a union all select 2 as a) t where a in (select b as a from t2 where b < 2)
  8. 支持 select/where/having 中使用 subquery(SPARK-12543),如 select * from t where a = (select max(b) from t2)select max(a) as ma from t having ma = (select max(b) from t2)
  9. 支持 LeftSemi/LeftAnti(SPARK-14853)。
  10. 支持在条件表达式 In/Not In 里使用子句(SPARK-14781),如 select * from l where l.a in (select c from r) orl.a in (select c from r where l.b < r.d)
  11. 支持所有的 TPCDS 语句(SPARK-12540)。

与以前版本兼容(SPARK-11806)

  1. 不支持运行在 Hadoop 版本 < 2.2 上(SPARK-11807)。
  2. 去掉 HTTPBroadcast(SPARK-12588)。
  3. 去掉 HashShuffleManager(SPARK-14667)。
  4. 去掉 Akka RPC。
  5. 简化与完善 accumulators and task metrics(SPARK-14626)。
  6. 将 Hive 语法解析以及语法移至 Core 里(SPARK-14825),在没有 Hive 元数据库和 Hive 依赖包时,我们可以像之前版本使用标准 SQL 一样去使用 HiveQL 语句。

1.6 版本严重问题的解决

在 http://geek.csdn.net/news/detail/70162 提到的 1.6 问题中 Spillable 集合内存溢出问题在 SPARK-4452 里已解决,BlockManager 死锁问题在 SPARK-12757 里已解决。

最后 2.0 版本还有一些其他的特性,如:

  1. 用 SparkSession 替换掉原来的 SQLContext and HiveContext。
  2. mllib 里的计算用 DataFrame-based API 代替以前的 RDD 计算逻辑。
  3. 提供更多的 R 语言算法。
  4. 默认使用 Scala 2.11 编译与运行。

参考资料

  1. http://spark.apache.org/
  2. https://databricks.com/blog/2016/05/11/spark-2-0-technical-preview-easier-faster-and-smarter.html
  3. http://www.eecs.berkeley.edu/Pubs/TechRpts/2014/EECS-2014-12.pdf
  4. http://www.infoq.com/cn/articles/spark-core-rdd
  5. https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html
  6. http://www.slideshare.net/databricks/spark-summit-eu-2015-spark-dataframes-simple-and-fast-analysis-of-structured-data
  7. https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
  8. https://databricks.com/blog/2016/01/04/introducing-spark-datasets.html
  9. https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/6122906529858466/293651311471490/5382278320999420/latest.html
  10. http://www.csdn.net/article/2014-01-28/2818282-Spark-Streaming-big-data
  11. http://www.slideshare.net/rxin/the-future-of-realtime-in-spark

Q&A

1.  Hive 迁移到 Spark SQL 怎么迁移?有哪些坑?

王联辉:一般 hive 执行有二种模式,一种是客户端,一种是 HiveServer 模式。

  • 对于客户端命令行的话,直接把以前的 Hive 命令,改成 spark-sql 命令行+SQL 语句即可。
  • 对于 HiveServer 的方式,Spark SQL 里提供 Hive thriftServer,可以使用这种方式。

2.  Spark 未来在内存管理上的优化应该是什么方向?有 RDD sharing on multiple application 计划吗?用 Spark 感觉内存调优需要花很大功夫。

王联辉:从 1.5 的 Tungsten 项目到 1.6,Spark 最大的一个亮点就是内存优化,所以 1.6 开始的版本不管数据量多大都完全没问题,而且性能也比 MR 要快几倍。

未来应该也不会在 Spark 内部去做 RDD sharing on multiple application,因为这其实是 Job server 去做的事情。1.6 的动态内存管理出来后,内存调优稍微简单化了,因为 executor 和 storage 的内存可以动态调整,当然如果用 RDD 的 API,可能内存占用会大一些,因此后续推荐 DataFrame > Dataset > RDD api。

3.  Spark Streaming 在 checkpoint 后自动恢复,怎样做到 exactly once 而且恢复速度快?

王联辉:在出现故障时,Spark Streaming 本身是没办法做到完全 exactly once,这是一个事务性的问题。说白了,就是跟 Storm 做比较,Streaming 的优势是吞吐量高,也就是一段间隔内执行时间快。

为什么?首先 Storm 如果要保证消息一定要被处理的话,每一条消息都需要有一个 ack 的附加消息,这个会导致网络压力大,而且消息之间的时序性是乱的。

而在 Spark Streaming,每次是小批量数据处理,中间过程不需要 ack,一旦失败,则重新执行这个小批量数据。

具体可以看一下这篇文章:http://www.csdn.net/article/2014-01-28/2818282-Spark-Streaming-big-data

4. Spark 的应用场景有哪些?可以举些实际应用场景例子吗?

王联辉:Spark 的理念是在一个 RDD 计算框架之上可以满足各种应用,比如 HiveQL,MapReduce。除此之外,还有机器学习挖掘,流计算和图计算。

其实应该是你有什么样的计算场景,然后首先看 Spark 可不可以满足,再来考虑别的计算模式。

5.  请问搭建 Spark+HDFS 这样的集群一般硬件怎么选择?CPU,内存,磁盘……

王联辉:Spark 其实跟 HDFS 没有啥依赖关系,所以逻辑是可以独立搭建,但是物理上是在一台机器,CPU 跟内存是按比例搭配的,一般比如一个核是 4 – 6G,一台机器有 12 虚拟核的话,内存就可以配 64G,当然配 128G 内存也行,这样一个核的内存用的更多,作业跑的就更快。

6:  RDD 在迭代的过程中,已经计算完成的 RDD 是何时释放?

王联辉:这个分二部分,一个是 RDD 的元信息,一个是 RDD 的数据。

  • 元信息的话,当没有人引用它了,在每个 job 执行完后就会自动释放。
  • RDD 的数据,非 shuffle 的数据如果被持久化了,是需要用户调用 unpersist 手动释放。
  • shuffle 数据在 executor 丢失时会自动册除,这个一般 Yarn/mesos 会去完成这个工作。

7.  Spark 2.0 中的新特性 Structured Streaming ,它的设计初衷是什么?使用场景是什么?

王联辉:设计初衷就是把 Spark SQL 跟 Streaming 统一,让 Streaming 任务可以用 Dataset 来表达,同时底下的执行可以用 Spark SQL 引擎。这样在易用性和性能上都有提升。

之前的 Dstreaming API 能做的事情,都可以用 Dataset 去写 Streaming 任务,最后离线计算和流式计算都用 Dataset 去编写,达到 dataflow 的效果。

8.   Spark Streaming 作业在运行的过程中出现错误(比如,集群挂掉半小时后恢复),在重试 N 次后不能自动恢复运行?

王联辉:首先可以需要解决的是集群为啥会挂掉半小时,是 HDFS 还是 yarn 的问题。假设不是 Spark Streaming 的问题,那么运行中出现错误,一般在二个地方会出错,一个是 Receiver 采集数据时,一个是每个小 batch 计算时。

  • 第一种情况,可能是你集群的 storage 不够了,把前面没有计算完的 block 删除了,这种一般建议将 block 存储加上磁盘,这样即时内存不够,可以刷磁盘。
  • 第二种情况的话,就跟离线计算一样,是任务执行出错了,是不是数据倾斜了导致内存占用太高了,那么建议 partition 数设大一些。

9. Hive on Spark 生产环境稳定性如何?Spark Streaming 滑动窗口大于 1 小时以上的发现性能很低。大神那边是怎么处理的或者有啥好的解决方案优化方案?

王联辉:我之前没有在生产使用 Hive on Spark,所以不做回答。

第二个问题,滑动窗口大于 1 小时以上,需要看具体的业务,因为你这个窗口时间比较大,可能数据也比较大,可能会导致集群的内存无法存放,建议的话,还不如先 5 分钟计算一次,然后到一个小时再前面 5 分钟的数据做个合并。这个在计算一个小时内 distinct 值时无法满足。

10. Spark job 调度管理有啥好的方案吗?azkaban 支持度如何?还有好的方式推荐吗?

王联辉:在上层作业调度来看,其实 Spark 作业跟 MR 作业没有太大的区别,调度一个 Spark 任务跟调度一个 MR 是一样的。因为据我所知,很多公司都会自已开发一套作业管理系统。所以我对这方面的开源系统也不是很了解。

11. Spark Streaming 和 Storm 比较,各自应用场景和优缺点是什么?

王联辉:应用场景我就不回答,我觉得应该反过来问,比如我有这样的应用场景,Spark Streaming 或 Storm 哪个更优?

优缺点的话,Spark Streaming 社区活跃度要高很多,遇到问题查找答案要容易,相反 Storm 这个优势会弱一点,特别是 clojure 语言。

当然 Storm 在实时性方面比 Spark Streaming 要好,比如你要求在 1 秒或者毫秒内计算出结果,Storm 可能会比较容易做到,而 Spark Streaming 各方面的开稍使得当前可能达不到这个要求。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark的RDD原理以及2.0特性的介绍 的相关文章

  • spark创建maven工程创建scala目录并编译

    背景 我创建spark的maven工程的时候 在java目录同级还创建了一个scala目录 这就得考虑编译相关的事了 解决 1 创建source folder 如下图所示 直接创建就好了 2 编译带来的问题 编译的时候发现一个问题 就是在s
  • 大数据--pyspark远程连接hive

    上一篇文章介绍了python连接hive的过程 通过地址 端口号访问到hive并对hive中的数据进行操作 这一篇文章介绍一下怎么通过windows本地pyspark 本地部署好的spark 远程虚拟机的hive 完成本地pyspark对h
  • spark-shell 加载本地文件报错 java.io.FileNotFoundException

    学习spark shell 时候发现一个问题 从本地文件加载数据生成RDD 报错 文件找不到 原因 spark shell 如果启动了集群模式 真正负责计算的executor会在 该executor所在的 worker节点上读取文件 并不是
  • pyspark 连接远程hive集群配置

    今天本地spark连接远程hive集群 直接把配置导入进去 本地直接应用远程环境 1 安装spark 设置spark环境变量 2 拿到远程集群配置文件 将配置文件放在spark conf 目录下 xml 一共五个文件 3 将mysql co
  • cdh下spark2-yarn运行sparkstreaming获取kafka数据使用spark-streaming-kafka-0-10_2.11报错解决

    报错问题 20 07 15 17 20 51 INFO utils AppInfoParser Kafka version 0 9 0 kafka 2 0 0 20 07 15 17 20 51 INFO utils AppInfoPars
  • 重新定义分析 - EventBridge 实时事件分析平台发布

    对于日志分析大家可能并不陌生 在分布式计算 大数据处理和 Spark 等开源分析框架的支持下 每天可以对潜在的数百万日志进行分析 事件分析则和日志分析是两个完全不同的领域 事件分析对实时性的要求更高 需要磨平事件领域中从半结构化到结构化的消
  • Spark on Kubernetes 与 Spark on Yarn 不完全对比分析

    前言 Apache Spark 是目前应用最广泛的大数据分析计算工具之一 它擅长于批处理和实时流处理 并支持机器学习 人工智能 自然语言处理和数据分析应用 随着 Spark 越来越受欢迎 使用量越来越大 狭义上的 Hadoop MR 技术栈
  • Kafka/Spark消费topic到写出到topic

    1 Kafka的工具类 1 1 从kafka消费数据的方法 消费者代码 def getKafkaDStream ssc StreamingContext topic String groupId String consumerConfigs
  • 大数据相关常用软件下载地址集锦

    文章目录 每日一句正能量 前言 一 软件下载地址如下 二 文档地址如下 结语 每日一句正能量 生命中有一些人与我们擦肩了 却来不及遇见 遇见了 却来不及相识 相识了 却来不及熟悉 熟悉了 却还是要说再见 前言 由于大数据开发中经常需要用到Z
  • 深入理解 SQL 中的 Grouping Sets 语句

    前言 SQL 中 Group By 语句大家都很熟悉 根据指定的规则对数据进行分组 常常和聚合函数一起使用 比如 考虑有表 dealer 表中数据如下 id Int city String car model String quantity
  • 基于Spark的电商用户行为实时分析可视化系统(Flask-SocketIO)

    基于Spark的电商用户行为实时分析可视化系统 Flask SocketIO 项目简介 该项目已上线蓝桥课程 有需要的可凭邀请码 UB5mdLbl 学习哦 有优惠 课程地址 https www lanqiao cn courses 2629
  • spark-submit 报错 Initial job has not accepted any resources

    spark submit 报这样的错误 WARN scheduler TaskSchedulerImpl Initial job has not accepted any resources check your cluster UI to
  • spark中repartition和coalesce的区别

    总的来讲 两者 对是否允许shuffle 不同 coalesce numPartitions shuffle false repartition numPartitions repartition 其实是 coalesce 中参数shuff
  • 使用Flink1.16.0的SQLGateway迁移Hive SQL任务

    使用Flink的SQL Gateway迁移Hive SQL任务 前言 我们有数万个离线任务 主要还是默认的DataPhin调度CDP集群的Hive On Tez这种低成本任务 当然也有PySpark 打Jar包的Spark和打Jar包的Fl
  • Flume之:二、企业开发案例

    Flume之 二 企业开发案例 文章目录 Flume之 二 企业开发案例 三 企业开发案例 1 监控端口数据官方案例 2 实时读取本地文件到HDFS案例 3 实时读取目录文件到HDFS案例 4 flume监控Kafka gt Spark知识
  • 11.Linux下Spark的安装配置以及spark-shell的启动和 Spark集群环境搭建

    本案例软件包 链接 https pan baidu com s 1zABhjj2umontXe2CYBW DQ 提取码 1123 若链接失效在下面评论 我会及时更新 目录 1 安装Spark 1 先用xftp将安装包传到home hadoo
  • Spark 【分区与并行度】

    RDD 并行度和分区 SparkConf setMaster local 我们在创建 SparkContext 对象时通常会指定 SparkConf 参数 它包含了我们运行时的配置信息 如果我们的 setMaster 中的参数是 local
  • Spark学习(文件读取路径)

    在不同的启动模式下 加载文件时的路径写法是不一样的 对于local模式下 默认就是读取本地文件 而在standlone或者yarn client 或者cluster模式下 默认读的都是hdfs文件系统 这几种模式下很难读取本地文件 这是很显
  • sparkstreamming 消费kafka(1)

    pom
  • JAVA 安装与简单使用

    JAVA简易安装 下载安装 环境变量 进入变量界面 设置变量 验证JAVA环境 运行Java程序 个人站 ghzzz cn 还在备案 很快就能访问了 下载安装 第一步当然是从官网下载安装java了 网上有很多的教程 这里简单的写一下 在这里

随机推荐

  • 常用数学函数

    转自 https zh cppreference com w cpp numeric math 函数 定义于头文件
  • 详解Transformer的自注意力机制、位置编码以及整体架构(内容生动易懂,并有完整transformer架构详解)

    文章目录 1 3 Transformer一些前置知识 1 3 1 自注意力机制 Self Attention very important 1 3 2 位置编码 Positional Encoding 1 3 3 Transformer模型
  • @RabbitListener和@RabbitHandler的使用

    1 RabbitListener 注解是指定某方法作为消息消费的方法 例如监听某 Queue 里面的消息 2 RabbitListener标注在方法上 直接监听指定的队列 此时接收的参数需要与发送市类型一致 Component public
  • 常见的Linux系统性能问题及其解决方法

    CPU负载高 如果CPU负载高 可能是由于CPU资源不足或进程 服务的异常活动所致 可以使用top或htop命令查看系统的CPU使用情况 并确定哪个进程或服务使用了大量的CPU资源 如果负载高 可以考虑增加CPU资源或优化进程 服务的代码或
  • 快手如何引流的话术?在快手引流精准粉丝的技巧

    互联网创业圈大部分的项目都离不开流量 更别说精准流量了 流量可以说是大部分互联网创业者心里的痛处 很多人都说 流量难求 其实 要么是不擅长引流 要么是方法出了问题 那么 要如何引流比较好呢 今天 主要讲讲快手这个渠道是如何做的 其实 点石汇
  • C++11-14 第6讲 explicit关键字

    explicit 用来针对构造函数有多个实参 include
  • 利用python进行数据分析——pandas

    import pandas as pd import numpy as np np random seed 12345 import matplotlib pyplot as plt 1 介绍Pandas 工具一 Series obj pd
  • 总结一下多益网络数据研发校招面试

    1 自我介绍 2 说一下你觉得最好的项目 3 数据仓库分层 4 说一下主键和唯一索引 5 说一下hive的group by 怎么在mapreduce执行的 6 说一下hashmap和map区别 7 说一下数据预处理 8 说一下MR原理 9
  • 少儿编程要先学会电脑和英语

    格物斯坦小坦克想说的是少儿编程和计算机代码编程是不一样的 很多人对孩子学习的Scratch语言和成人学习的Java C语言等区分不开 因而习惯性的抗拒说 我的孩子以后可能不会成为一个程序员 学习之前是不是先要懂英语和电脑 实际上少儿电脑编程
  • 如何编写有多个返回值的C语言函数

    1引言 笔者从事C语言教学多年 在教学中学生们常常会问到如何编写具有多个返回值的C语言函数 编写有多个返回值的函数是所有C语言教材里均没有提到的知识点 但在实际教学与应用的过程中我们都有可能会遇到这样的问题 有学生也尝试了不少方法 如把多个
  • B - Equations

    B Equations Consider equations having the following form ax12 b x22 cx32 d x42 0 a b c d are integers from the interval
  • 使用Idea创建一个JavaWeb的SSM(maven)项目~(史上最详细,傻瓜式教学,跟着我的做,不会你找我)

    今天讲的是如何用idea创建一个JavaWeb的Maven SSM项目并且实现简单的登陆功能 项目源码在最后 需要的可以自行下载 本文章过于详细过于面向小白 并且在讲解SSM项目的搭建过程中涉及到了小白们极有可能遇到的Idea的坑 文章比较
  • Keystone 高可靠性部署与性能测试

    Goal Keystone Region 为跨地域的 Openstack 集群提供了统一的认证和用户租户管理 目前公司在国内外部署了数十套 Openstack 集群 其中既有集群在内网 又有集群在公网 既有 Havana 集群 也有 Ice
  • Thumb和ARM指令不能切换问题(error:unsupported interworking call (Thumb -> ARM))

    1 报错现象 xxx ko ection 3 reloc 4 sym xxxxxx unsupported interworking call Thumb gt ARM 2 报错原因和分析 报错信息的翻译 程序不支持代码交织 thumb态切
  • Spring入门简介

    Spring框架是Java平台上最流行的企业应用程序开发框架之一 它是一个轻量级的开源框架 被设计为简化企业级应用程序的开发 Spring的核心目的是为了解决企业应用程序的开发问题 包括代码的耦合 事务管理 异常处理 数据访问 安全性 We
  • fatal error C1083: 无法打开预编译头文件:“Debug\opencv.pch”: No such file or directory

    步骤 方法右键点击你创建的项目 选择 属性标签 点击属性 弹出 项目属性页 在左侧找到以下位置 配置属性 gt C C gt 预编译头 并选择它 在右边的菜单中选择 创建 使用预编译头 中的 不使用预编译头文件 点击 确定 按钮退出即可原因
  • Transformer 架构和 BERT、GPT 和 T5 的兴起:初学者指南

    在广阔且不断发展的人工智能 AI 领域 有些创新不仅会留下深刻的印象 而且会带来巨大的影响 他们重新定义了整个领域的轨迹 在这些突破性的创新中 Transformer 架构成为变革的灯塔 这类似于工业革命期间蒸汽机的发明 推动人工智能进入一
  • 深入理解Unity5中的StandardShader屏幕像素化特效的实现

    概要 续接上文 本文进一步讲解与分析了上文未讲完的Unity5中Standard Shader正向基础渲染通道源码的片段着色实现部分 以及对屏幕像素化后期特效进行了实现 同样需要声明的是 本文中对Stardard Shader源码的一些分析
  • NTP时间服务器安装与配置

    第1章 Server端的安装与配置 1 1查看系统是否已经安装ntp服务组件 rpm qa grep ntp 查看是否已经安装ntp组件 有如下两个组件说明已经安装 ntpdate 4 2 6p5 1 el6 centos x86 64 n
  • Spark的RDD原理以及2.0特性的介绍

    注 本文由王联辉在高可用架构群分享 本文转载自高可用架构 ArchNotes 王联辉 曾在腾讯 Intel 等公司从事大数据相关的工作 2013 年 2016 年先后负责腾讯 Yarn 集群和 Spark 平台的运营与研发 曾负责 Inte