您遇到的问题与闭包或 RDD 无关,与普遍的看法相反,是可序列化的.
它只是违反了 Spark 的基本规则,该规则规定您不能从另一个操作或转换触发一个操作或转换*,并且这个问题的不同变体已被多次询问。
要理解为什么会出现这种情况,您必须考虑架构:
-
SparkContext
由驱动程序管理
- 转换中发生的所有事情都在工作人员身上执行。每个工作人员只能访问自己的部分数据,并且不与其他工作人员进行通信**。
如果你想使用多个 RDD 的内容,你必须使用组合 RDD 的转换之一,例如join
, cartesian
, zip
or union
.
在这里,您很可能(我不确定为什么您传递元组并仅使用该元组的第一个元素)想要使用广播变量:
val squaresMapBD = sc.broadcast(squaresMap)
def findSquare(n: Int): Seq[(Int, Int)] = {
squaresMapBD.value
.filter{case (k, v) => k == n}
.map{case (k, v) => (n, k)}
.take(1)
}
primes.flatMap(findSquare)
或笛卡尔:
primes
.cartesian(squaresRDD)
.filter{case (n, (k, _)) => n == k}.map{case (n, (k, _)) => (n, k)}
转换primes
到虚拟对(Int, null)
and join
会更有效率:
primes.map((_, null)).join(squaresRDD).map(...)
但根据您的评论,我假设您对存在自然连接条件的场景感兴趣。
根据上下文,您还可以考虑使用数据库或文件来存储常用数据。
顺便说一句,RDD 是不可迭代的,所以你不能简单地使用for
环形。为了能够做这样的事情,你必须collect
或转换toLocalIterator
第一的。您还可以使用foreach
method.
* 准确地说你无法访问SparkContext
.
** Torrent 广播和树聚合涉及执行者之间的通信,因此在技术上是可行的。