Spark 闭包参数绑定

2023-11-30

我正在 Scala 中使用 Apache Spark。

我在尝试使用第二个 RDD 中的数据操作一个 RDD 时遇到问题。我试图将第二个 RDD 作为参数传递给针对第一个 RDD 进行“映射”的函数,但似乎在该函数上创建的闭包绑定了该值的未初始​​化版本。

以下是一段更简单的代码,显示了我所看到的问题类型。 (我第一次遇到麻烦的真实例子更大且更难理解)。

我不太明白 Spark 闭包的参数绑定规则。

我真正寻找的是一种基本方法或模式,用于如何使用另一个 RDD 的内容(之前在其他地方构建)来操作一个 RDD。

在下面的代码中,调用 Test1.process(sc) 将失败,并在 findSquare 中进行空指针访问(因为闭包中绑定的第二个参数未初始化)

object Test1 {

  def process(sc: SparkContext) {
    val squaresMap = (1 to 10).map(n => (n, n * n))
    val squaresRDD = sc.parallelize(squaresMap)

    val primes = sc.parallelize(List(2, 3, 5, 7))

    for (p <- primes) {
      println("%d: %d".format(p, findSquare(p, squaresRDD)))
    }
  }

  def findSquare(n: Int, squaresRDD: RDD[(Int, Int)]): Int = {
    squaresRDD.filter(kv => kv._1 == n).first._1
  }
}

您遇到的问题与闭包或 RDD 无关,与普遍的看法相反,是可序列化的.

它只是违反了 Spark 的基本规则,该规则规定您不能从另一个操作或转换触发一个操作或转换*,并且这个问题的不同变体已被多次询问。

要理解为什么会出现这种情况,您必须考虑架构:

  • SparkContext由驱动程序管理
  • 转换中发生的所有事情都在工作人员身上执行。每个工作人员只能访问自己的部分数据,并且不与其他工作人员进行通信**。

如果你想使用多个 RDD 的内容,你必须使用组合 RDD 的转换之一,例如join, cartesian, zip or union.

在这里,您很可能(我不确定为什么您传递元组并仅使用该元组的第一个元素)想要使用广播变量:

val squaresMapBD = sc.broadcast(squaresMap)

def findSquare(n: Int): Seq[(Int, Int)] = {
  squaresMapBD.value
    .filter{case (k, v) => k == n}
    .map{case (k, v) => (n, k)}
    .take(1)
}

primes.flatMap(findSquare)

或笛卡尔:

primes
  .cartesian(squaresRDD)
  .filter{case (n, (k, _)) => n == k}.map{case (n, (k, _)) => (n, k)}

转换primes到虚拟对(Int, null) and join会更有效率:

primes.map((_, null)).join(squaresRDD).map(...)

但根据您的评论,我假设您对存在自然连接条件的场景感兴趣。

根据上下文,您还可以考虑使用数据库或文件来存储常用数据。

顺便说一句,RDD 是不可迭代的,所以你不能简单地使用for环形。为了能够做这样的事情,你必须collect或转换toLocalIterator第一的。您还可以使用foreach method.


* 准确地说你无法访问SparkContext.

** Torrent 广播和树聚合涉及执行者之间的通信,因此在技术上是可行的。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 闭包参数绑定 的相关文章

随机推荐

  • 合并两个列表python

    我有两个清单 a 1 2 3 4 5 6 b 7 8 我想将它合并到 c 1 2 3 7 4 5 6 8 I used zip a b 但结果似乎不正确 有人可以帮忙吗 zip 只会将元组和整数配对 您还需要连接元组和新项目 c aa bb
  • 使用 post/sendmessage 进行鼠标点击不起作用[重复]

    这个问题在这里已经有答案了 可能的重复 如何在C 中模拟鼠标点击 我努力了 Window FindWindow null untitled Paint PostMessage WindowToFind WM MOUSEMOVE 0 loca
  • 组合框的默认值

    您好 我尝试为组合框设置默认值 XAML
  • 检查范围内的 int

    java中是否有一种优雅的方法来检查int是否等于某个值 或者是否比某个值大 小1 例如 如果我检查x在身边5 我想返回 true4 5 and 6 因为 4 和 6 与 5 只相差一 有内置函数可以做到这一点吗 或者我这样写会更好吗 in
  • 如何使用 Jquery 为文件上传中选择的多个图像提供预览?

    大家好 我有一个 fileuplaod 用户可以在其中选择多个图像 我想在上传之前显示这些所选图像的预览 目前我将其管理为单个图像预览 我如何为所选的多个图像提供预览 function readURL input var img input
  • 使用 JavaFX 2.2 助记符(和加速器)

    我正在尝试让 JavaFX 助记符发挥作用 我在场景中有一些按钮 我想要实现的是通过按 Ctrl S 来触发此按钮事件 这是一个代码骨架 FXML public Button btnFirst btnFirst getScene addMn
  • MVC6 TagHelpers 一次性

    在较旧的 MVC HTML 帮助程序中 可以使用IDisposable包装内容 例如BeginForm助手会自动换行 stuff 有结束语form tag stuff MVC6 TagHelpers 支持这种内容包装吗 例如我想要这个
  • 如何将 Bundle 从 Fragment 传递到 Fragment

    我在我的应用程序中使用片段 这是我的第一个片段 它只是简单地膨胀了 xml 文件 public class FragmentA extends SherlockFragment Context myContext appContext Ov
  • 如何在谷歌应用程序引擎模板上获取cookie值

    我正在开发一个应用程序来了解 python 和 Google App Engine 我想从 cookie 中获取值并在模板上打印以隐藏或显示某些内容 是否可以 哪种会话系统最适合与谷歌应用程序引擎一起使用 在 gae 和模板上使用会话的最佳
  • Angular4中的浏览器关闭事件

    我如何检测角度 4 0 2 中的浏览器关闭事件 我努力了 HostListener window unload event unloadHandler event HostListener window beforeunload event
  • std::thread 构造函数如何检测右值引用?

    显然可以将右值引用传递给std thread构造函数 我的问题是这个构造函数的定义参考参数 它说这个构造函数 template lt class Function class Args gt explicit thread Function
  • 如何在python中将对象数组转换为普通数组

    我有一个看起来像这样的对象数组 array array 2 4567 dtype object array 3 4567 dtype object array 4 4567 dtype object array 5 4567 dtype o
  • Java 应用程序挂在 in.hasNext(); 上

    我正在开发通过套接字进行通信的战舰摇摆应用程序 private ServerSocket server private Socket connection private PrintWriter out private Scanner in
  • 如何在资源管理器的重命名事件中挂钩 C++

    我不能比我的标题更清楚了 P 我想每当用户在 Windows 资源管理器中重命名文件时 并且仅在资源管理器中 运行我的程序 这是一个简单的模型 一个简单的教程链接将会非常有帮助 我什么也没找到 先感谢您 附 我是 C 新手 看来 Windo
  • Room 无法验证数据完整性

    我在使用房间数据库运行程序时收到此错误 Room cannot verify the data integrity Looks like you ve changed schema but forgot to update the vers
  • 使用 iOS 获取 Facebook 新闻源?

    我从适用于 iOS 的 Facebook SDK 开始 在我的应用程序中 我尝试获取用户新闻源并将其加载到 uitableview 中 事实证明这很棘手 我也找不到任何有关它的文档 使用 Facebook SDK 您可以使用以下方式调用 F
  • 使用 App 目录和 next-intl 翻译 Next.js 13 中的 URL

    我目前正在开发多语言 Next js 13 应用程序 并使用 next intl 包进行国际化 我一直在尝试为我的路线设置翻译后的网址 但遇到了一些问题 这是我想要实现的目标的一个例子 如果源语言是丹麦语 则路径可以是 mypage om
  • 使用自定义域部署到 Heroku [关闭]

    Closed 这个问题是无关 目前不接受答案 我已经从 My app heroku com 运行该应用程序并且它可以工作 并且我已经在我的域之间设置了 DNS 但是每当我从我指定的域打开我的应用程序时 heroku 会说 请参阅文档 如果您
  • 为基于 create-react-app 的项目运行 npm build 后在运行时读取环境变量

    我是 React 新手 我将部署一个 React 项目 React项目由create react app创建 然后生产代码由 npm build 构建 并由Express托管 在项目中 有一些对 API 服务器的 fetch 调用 其中的
  • Spark 闭包参数绑定

    我正在 Scala 中使用 Apache Spark 我在尝试使用第二个 RDD 中的数据操作一个 RDD 时遇到问题 我试图将第二个 RDD 作为参数传递给针对第一个 RDD 进行 映射 的函数 但似乎在该函数上创建的闭包绑定了该值的未初