我正在使用 Scala 开发 Spark 应用程序。我的应用程序仅包含一项需要改组的操作(即cogroup
)。它在合理的时间完美运行。我面临的问题是当我想将结果写回文件系统时;由于某种原因,它比运行实际程序花费的时间更长。起初,我尝试在不重新分区或合并的情况下写入结果,我意识到生成的文件数量很大,所以我认为这就是问题所在。我在编写之前尝试重新分区(和合并),但应用程序花了很长时间来执行这些任务。我知道重新分区(和合并)的成本很高,但我正在做的事情正确吗?如果不是,请您给我一些关于正确方法的提示。
Notes:
- 我的文件系统是 Amazon S3。
- 我的输入数据大小约为 130GB。
- 我的集群包含一个驱动节点和五个从节点,每个节点都有 16 个核心和 64 GB RAM。
- 我为我的工作分配 15 个执行程序,每个执行程序有 5 个核心和 19GB RAM。
P.S.我尝试使用 Dataframes,同样的问题。
这是我的代码示例,以防万一:
val sc = spark.sparkContext
// loading the samples
val samplesRDD = sc
.textFile(s3InputPath)
.filter(_.split(",").length > 7)
.map(parseLine)
.filter(_._1.nonEmpty) // skips any un-parsable lines
// pick random samples
val samples1Ids = samplesRDD
.map(_._2._1) // map to id
.distinct
.takeSample(withReplacement = false, 100, 0)
// broadcast it to the cluster's nodes
val samples1IdsBC = sc broadcast samples1Ids
val samples1RDD = samplesRDD
.filter(samples1IdsBC.value contains _._2._1)
val samples2RDD = samplesRDD
.filter(sample => !samples1IdsBC.value.contains(sample._2._1))
// compute
samples1RDD
.cogroup(samples2RDD)
.flatMapValues { case (left, right) =>
left.map(sample1 => (sample1._1, right.filter(sample2 => isInRange(sample1._2, sample2._2)).map(_._1)))
}
.map {
case (timestamp, (sample1Id, sample2Ids)) =>
s"$timestamp,$sample1Id,${sample2Ids.mkString(";")}"
}
.repartition(10)
.saveAsTextFile(s3OutputPath)
UPDATE
这是使用 Dataframes 的相同代码:
// loading the samples
val samplesDF = spark
.read
.csv(inputPath)
.drop("_c1", "_c5", "_c6", "_c7", "_c8")
.toDF("id", "timestamp", "x", "y")
.withColumn("x", ($"x" / 100.0f).cast(sql.types.FloatType))
.withColumn("y", ($"y" / 100.0f).cast(sql.types.FloatType))
// pick random ids as samples 1
val samples1Ids = samplesDF
.select($"id") // map to the id
.distinct
.rdd
.takeSample(withReplacement = false, 1000)
.map(r => r.getAs[String]("id"))
// broadcast it to the executor
val samples1IdsBC = sc broadcast samples1Ids
// get samples 1 and 2
val samples1DF = samplesDF
.where($"id" isin (samples1IdsBC.value: _*))
val samples2DF = samplesDF
.where(!($"id" isin (samples1IdsBC.value: _*)))
samples2DF
.withColumn("combined", struct("id", "lng", "lat"))
.groupBy("timestamp")
.agg(collect_list("combined").as("combined_list"))
.join(samples1DF, Seq("timestamp"), "rightouter")
.map {
case Row(timestamp: String, samples: mutable.WrappedArray[GenericRowWithSchema], sample1Id: String, sample1X: Float, sample1Y: Float) =>
val sample2Info = samples.filter {
case Row(_, sample2X: Float, sample2Y: Float) =>
Misc.isInRange((sample2X, sample2Y), (sample1X, sample1Y), 20)
case _ => false
}.map {
case Row(sample2Id: String, sample2X: Float, sample2Y: Float) =>
s"$sample2Id:$sample2X:$sample2Y"
case _ => ""
}.mkString(";")
(timestamp, sample1Id, sample1X, sample1Y, sample2Info)
case Row(timestamp: String, _, sample1Id: String, sample1X: Float, sample1Y: Float) => // no overlapping samples
(timestamp, sample1Id, sample1X, sample1Y, "")
case _ =>
("error", "", 0.0f, 0.0f, "")
}
.where($"_1" notEqual "error")
// .show(1000, truncate = false)
.write
.csv(outputPath)