在 Apache Spark 中,如何使 RDD/DataFrame 操作变得惰性?

2024-02-03

假设我想编写一个函数 foo 来转换 DataFrame:

object Foo {
def foo(source: DataFrame): DataFrame = {
...complex iterative algorithm with a stopping condition...
}
}

由于 foo 的实现有很多“操作”(collect、reduce 等),调用 foo 将立即触发昂贵的执行。

这不是一个大问题,但是由于 foo 只将一个 DataFrame 转换为另一个 DataFrame,因此按照惯例,最好允许延迟执行:仅当生成的 DataFrame 或其衍生物被用于时,才应执行 foo 的实现驱动程序(通过另一个“操作”)。

到目前为止,可靠实现这一点的唯一方法是将所有实现写入 SparkPlan,并将其叠加到 DataFrame 的 SparkExecution 中,这非常容易出错,并且涉及大量样板代码。推荐的方法是什么?


我不太清楚你想要实现什么目标,但 Scala 本身至少提供了一些你可能会觉得有用的工具:

  • 惰性值:

    val rdd = sc.range(0, 10000)
    
    lazy val count = rdd.count  // Nothing is executed here
    // count: Long = <lazy>
    
    count  // count is evaluated only when it is actually used 
    // Long = 10000   
    
  • 按名称调用(表示为=>在函数定义中):

    def  foo(first: => Long, second: => Long, takeFirst: Boolean): Long =
      if (takeFirst) first else second
    
    val rdd1 = sc.range(0, 10000)
    val rdd2 = sc.range(0, 10000)
    
    foo(
      { println("first"); rdd1.count },
      { println("second"); rdd2.count },
      true  // Only first will be evaluated
    )
    // first
    // Long = 10000
    

    Note:实际上,您应该创建本地惰性绑定,以确保不会在每次访问时评估参数。

  • 无限懒惰的集合,例如Stream http://www.scala-lang.org/api/2.11.8/#scala.collection.immutable.Stream

    import org.apache.spark.mllib.random.RandomRDDs._
    
    val initial = normalRDD(sc, 1000000L, 10)
    
    // Infinite stream of RDDs and actions and nothing blows :)
    val stream: Stream[RDD[Double]] = Stream(initial).append(
      stream.map {
        case rdd if !rdd.isEmpty => 
          val mu = rdd.mean
          rdd.filter(_ > mu)
        case _ => sc.emptyRDD[Double]
      }
    )
    

其中的一些子集应该足以实现复杂的惰性计算。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Apache Spark 中,如何使 RDD/DataFrame 操作变得惰性? 的相关文章

  • Spark-获取RDD中的文件名

    我正在尝试处理每天都在增长的 4 个文本文件目录 我需要做的是 如果有人试图搜索发票号码 我应该给他们包含该发票号码的文件列表 我能够通过将文本文件加载为 RDD 来映射和减少文本文件中的值 但是如何获取文件名和其他文件属性呢 从 Spar
  • 重塑案例类构造函数?

    试图找到一种方法来 重塑 案例构造函数以填充某些默认值 以下情况可能吗 def reshape T R1 lt HList R2 lt HList h R1 R2 gt T example case class MyClass a Doub
  • AssertionError:断言失败:没有在 Databricks 中进行 DeleteFromTable 的计划

    这个命令运行良好有什么原因吗 sql SELECT FROM Azure Reservations WHERE timestamp gt 2021 04 02 返回 2 行 如下 sql DELETE FROM Azure Reservat
  • Scala - 如何解决“值不是 Nothing 的成员”错误

    此示例代码基于 Atmosphere 类 但如果有人可以让我了解该错误的一般含义 我想我可以找出任何特定于 Atmosphere 的解决方案 val bc BroadcasterFactory getDefault lookup broad
  • Pyspark显示最大值(S)和多重排序

    感谢这里的一些帮助 使用Pyspark 请不能使用SQL 所以我有一个存储为 RDD 对的元组列表 城市1 2020 03 27 X1 44 城市1 2020 03 28 X1 44 City3 2020 03 28 X3 15 City4
  • Scala 集合不一致

    为什么 Scala Collections API 中的集合和列表之间缺乏一致性 例如 有不可变的 Set 但也有可变的 Set 如果我想使用后者 我可以简单地这样做 val set Set A set new A 但是 本身不存在可变列表
  • 为什么 Databricks Connect Test 无法在 Mac 上运行?

    我已经阅读了配置文档databricks connect但运行时仍然出现以下错误databricks connect test 来自终端的错误 java lang NoSuchMethodError org apache spark int
  • Scala:具有复杂结构的树插入尾递归

    我正在 scala 中创建自定义对象树 并且我的插入方法引发堆栈溢出 因为它不是尾递归 但是 我不太清楚如何使其尾递归 我见过使用 累加器 变量的相关示例 但它们要么是只能相乘和覆盖的整数之类的东西 要么是我在适应树时遇到困难的列表 这是我
  • Scala Array.apply 有何魔力

    来自 scala 2 10 4 的 array scala Array定义为 final class Array T length Int extends java io Serializable with java lang Clonea
  • 使用原始类型模拟案例类

    考虑以下类型结构 trait HasId T def id T case class Entity id Long extends HasId Long 比方说 我们想在一些测试中模拟实体类 val entityMock mock Enti
  • 对多列应用窗口函数

    我想执行窗口函数 具体为移动平均值 但针对数据帧的所有列 我可以这样做 from pyspark sql import SparkSession functions as func df df select func avg df col
  • Scala:类似 Option (Some, None) 但具有三种状态:Some、None、Unknown

    我需要返回值 当有人询问值时 告诉他们以下三件事之一 这是值 没有价值 我们没有关于该值的信息 未知 情况 2 与情况 3 略有不同 示例 val radio car radioType 我们知道该值 返回无线电类型 例如 pioneer
  • 如何捕获 Oozie Spark 输出

    有没有办法捕获spark的输出然后将其输入到shell上 我们当前正在使用 scala 创建 jar 文件 并希望我们的 Spark 输出成为 shell 输入 我的想法是使用 wf actionData spark XXXX var 我只
  • 任务和分区之间有什么关系?

    我能说 么 Spark任务的数量等于Spark分区的数量吗 执行器运行一次 执行器内部的批处理 等于一个任务吗 每个任务只产生一个分区 1 的重复 并行度或可以同时运行的任务数量由以下公式设置 Executor实例的数量 配置 每个执行器的
  • Spark 结构化流中具有不同计数的聚合抛出错误

    我正在尝试在 Spark 结构化流中获取 Parentgroup childgroup 和 MountingType 组的唯一 id 代码 下面的代码抛出错误 withWatermark timestamp 1 minutes val ag
  • 在 Scala 和 SBT 中调试较长的编译时间

    在我的 Scala SBT 项目中 我有一个文件需要 5 分钟才能编译 所有其他的都可以在几秒钟内编译 这使得开发非常痛苦 我确信我滥用了一些 Scala 构造 但我不知道如何调试它 如何在 Scala 中调试较长的编译时间 我正在使用 S
  • 使用多行选项和编码选项读取 CSV

    在 azure Databricks 中 当我使用以下命令读取 CSV 文件时multiline true and encoding SJIS 似乎编码选项被忽略了 如果我使用multiline选项 Spark 使用默认值encoding那
  • 如何在Spark结构化流中指定批处理间隔?

    我正在使用 Spark 结构化流并遇到问题 在 StreamingContext DStreams 中 我们可以定义批处理间隔 如下所示 from pyspark streaming import StreamingContext ssc
  • 缓存 Slick DBIO 操作

    我正在尝试加快 SELECT FROM WHERE name 的速度Play 中的查询类型 Scala 应用程序 我正在使用 Play 2 4 Scala 2 11 play slick 1 1 1 包 该软件包使用Slick 3 1版本
  • Java 中的“Lambdifying”scala 函数

    使用Java和Apache Spark 已用Scala重写 面对旧的API方法 org apache spark rdd JdbcRDD构造函数 其参数为 AbstractFunction1 abstract class AbstractF

随机推荐

  • 关闭 Windows 8 超级按钮栏

    我有一台 Surface Pro 我需要将其 锁定 为一种 Kiosk 模式 我知道 信息亭模式 的更新正在进行中 但是我需要在此之前执行此操作 我在互联网上进行了搜索 但似乎您无法通过滑动来禁用超级按钮栏screen 我找到了禁用触控板的
  • C 静态库的包装

    我有一个用于相机的 C 静态库 现在计划为 Windows 8 开发 C WPF UI 它将使用 C 静态库来捕获视频 音频 我的想法是 C Static 会有 C CLI 包装器 包装器将是托管 Dll C WPF UI 将使用此 Dll
  • 在克隆期间更改内部元素 id

    我正在单击按钮时克隆 DIV 元素 我可以更改正在克隆的 DIV 元素的 ID 值 但是是否可以更改内部元素的 id 在下面的代码中 我更改了 Id selection克隆时 我需要动态更改 id select div div class
  • 使用 Spring RestTemplate 访问 Https Rest 服务

    谁能给我提供一个代码示例来使用 Spring Rest 模板访问通过 HTTPS 保护的其余服务 URL 我有证书 用户名和密码 基本身份验证用于服务器端 我想创建一个可以使用提供的证书 用户名和密码 如果需要 连接到该服务器的客户端 Ke
  • 头部内有多个 RSS 链接标签,标记是否有效?

    在 RSS feed 中包含多个 RSS feed 是否有效 tag 我的意思是 标签如下 etc 我们有一小部分 总共五个 RSS 提要 我们已经拥有了一段时间 但只在头标签中包含了 主要 提要 可以将它们全部包括在内吗 是的 这是完全有
  • jquery UI - 将日期添加到选定日期

    这看起来很简单 但我无法解决它 我真的需要这个 如何通过 SelectedDate 事件将日期添加到选定日期 我需要对 2 个日期选择器执行日期范围限制 一旦用户设置了一个日期选择器 另一个日期选择器只需要允许日期等于第一个日期选择器的所选
  • WHERE 子句中的动态条件

    我有一个存储过程 想知道是否可以建立一个动态的where基于参数的条件 假设我有这个查询 SELECT FROM tbl Users 现在 我有一个名为 username 我想用它来建立一个动态的where条件 通过我的程序可能是 1 个或
  • 从文件读取/写入 std::unordered_map 的更快方法

    我正在与一些非常大的公司合作std unordered maps 数亿个条目 并且需要将它们保存到文件中或从文件中加载它们 我目前执行此操作的方法是迭代映射并一次读取 写入每个键和值对 std unordered map
  • 增加 Linux 中 TCP/IP 连接的最大数量

    我正在对服务器进行编程 似乎我的连接数量受到限制 因为即使我将连接数量设置为 无限 我的带宽也没有饱和 如何增加或消除 Ubuntu Linux 机器一次可以打开的最大连接数 操作系统是否限制了这一点 或者是路由器或ISP 或者是别的什么
  • 火鸟远程备份

    我想备份 firebird 数据库 我正在使用 gbak exe 实用程序 效果很好 但是 当我想从远程计算机进行备份时 备份文件存储在服务器文件系统上 有没有办法强制 gbak 实用程序下载备份文件 Thanks 备份存储在 Firebi
  • chrome:为什么 css 3d 对变换比例 < 0 的大元素进行变换会导致白屏错误?

    我遇到了屏幕区域在镀铬中变白的问题 下面是一个简化的测试用例 从测试来看 可以肯定的是 变换缩放和旋转的组合导致了该问题 为了用少量 html 元素重现该问题 我夸大了情况并使用了按比例缩小 0 125 的 5000px 正方形 请注意 只
  • Angular 2 单元测试:找不到名称“描述”

    我正在跟进本教程来自 angular io https angular io docs ts latest guide testing html jasmine 101 正如他们所说 我创建了 Hero spec ts 文件来创建单元测试
  • -XX:-PrintGC 和 XX:-PrintGCDetails 标志有什么作用?

    我找到了 JVM 标志here http java sun com javase technologies hotspot vmoptions jsp 有没有更详细的解释他们到底做什么 设置此标志会将 JVM 进行的所有垃圾收集写入日志文件
  • bash 输出为 json 格式

    我是 Linux 和 Bash 脚本新手 我正在尝试将 Ubuntu Linux 中的几个 bash 脚本输出为 JSON 格式 但是 我似乎无法让它正常工作 我的目标是得到这个 date u Y m d H M S date and ti
  • MS Access - 对于给定变量的每次出现,将“计数”值加一?

    我正在寻找一个查询 该查询将 读取一列中一行的当前值 将其与上面的行进行比较 如果上面的行匹配则按顺序计数 本质上 这听起来像是一个运行计数 而不是 Count 几乎就像每组行上的行应该为 Count Variable 1 直到达到最小值零
  • 使用母版页的 ASP.net MVC 应用程序中的 jQuery

    我试图让简单的 jQuery 在我的内容页面上执行 但没有成功 下面是我想要做的
  • 如何在 Internet Explorer 64 位中支持 PDF 文件在浏览器内显示

    使用 64 位版本的 Internet Explorer 时 Adobe 似乎不支持在浏览器中显示 PDF 单击 pdf 链接后 64 位 Internet Explorer 将始终跨越新的 Adob e 窗口来显示 pdf 32 位 In
  • Angular2 如果 ngModel 在表单标签中使用,则必须设置 name 属性或表单

    我从 Angular 2 收到此错误 core umd js 5995 异常 未捕获 承诺中 错误 app model exposure currencies model exposure currencies component html
  • 实现自定义compareTo

    Override public int compareTo Object t if t instanceof Student Student s Student t return this name compareTo s name els
  • 在 Apache Spark 中,如何使 RDD/DataFrame 操作变得惰性?

    假设我想编写一个函数 foo 来转换 DataFrame object Foo def foo source DataFrame DataFrame complex iterative algorithm with a stopping c