load() 在 Spark 中做什么?

2023-12-19

火花很懒吧? 那么什么是load() do?

start = timeit.default_timer()

 df = sqlContext.read.option(
     "es.resource", indexes
 ).format("org.elasticsearch.spark.sql")
 end = timeit.default_timer()

 print('without load: ', end - start) # almost instant
 start = timeit.default_timer()

 df = df.load()
 end = timeit.default_timer()
 print('load: ', end - start) # takes 1sec

 start = timeit.default_timer()

 df.show()
 end = timeit.default_timer()
 print('show: ', end - start) # takes 4 sec

If show()我猜这是唯一的行动load不会花费太多时间,如 1 秒。所以我得出结论load()是一个动作(与 Spark 中的转换相对)

load 是否真的将整个数据加载到内存中?我不这么认为,但是它有什么作用呢?

我搜索并查看了文档https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html但这没有帮助..


tl;dr load()是一个 DataFrameReader api(org.apache.spark.sql.DataFrameReader#load)从下面的代码可以看出,它返回一个DataFrame,可以在其上应用 Spark 转换。

/**
   * Loads input in as a `DataFrame`, for data sources that support multiple paths.
   * Only works if the source is a HadoopFsRelationProvider.
   *
   * @since 1.6.0
   */
  @scala.annotation.varargs
  def load(paths: String*): DataFrame

需要创建一个 DataFrame 来执行转换。
要从路径(HDFS、S3 等)创建数据帧,用户可以使用spark.read.format("<format>").load().(还有特定于数据源的 API,可以自动加载文件,例如spark.read.parquet(<path>))

为什么需要整整1秒?

在基于文件的源中,这一次可以归因于文件列表。在 HDFS 中,这些列表并不昂贵,而在像 S3 这样的云存储中,这个列表非常昂贵,并且需要与文件数量成比例的时间。
在您的情况下,使用的数据源是elastic-search,时间可归因于连接建立、收集元数据以执行分布式扫描等,这取决于 Elastic Serach 连接器实现。我们可以启用调试日志并检查更多信息。如果elasticsearch有办法记录它收到的请求,我们可以检查elasticsearch日志中是否有在该时间之后发出的请求load()被解雇。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

load() 在 Spark 中做什么? 的相关文章

  • Spark - 构建时出现 scala 初始化错误

    我正在尝试在我的 scala 应用程序中使用 Spark 这是我正在使用的 Spark 依赖项
  • 一起调用distinct和map会在spark库中抛出NPE

    我不确定这是否是一个错误 所以如果你这样做 d spark RDD String d distinct map x gt d filter equals x 您将获得 Java NPE 但是如果你做了一个collect之后立马distinc
  • S3A:失败,而 S3:在 Spark EMR 中工作

    我将 EMR 5 5 0 与 Spark 结合使用 如果我使用一个简单的文件写入 s3s3 网址写得很好 但如果我使用s3a 地址 它失败了Service Amazon S3 Status Code 403 Error Code Acces
  • 如何调试 Spark 工作线程上的映射函数中的错误?

    我是 Spark 新手 正在努力寻找自己的方法 我有一个 Spark 应用程序 它在dataset 此地图功能可能会因主要与数据相关的原因而失败 我怎样才能获得一些关于问题所在的有意义的信息 我不知道从哪里开始 非常感谢 如果您想编写单元测
  • 在 pyspark 中创建一个包含单列元组的数据框

    我有一个 RDD 其中包含以下内容 column 1 value column 2 value column 3 value column 100 value 我想创建一个包含带有元组的单列的数据框 我得到的最接近的是 schema Str
  • 我如何判断我的 Spark 工作是否有进展?

    我有一个正在运行的 Spark 作业YARN它似乎只是挂起并且没有进行任何计算 这是当我这样做时纱线所说的yarn application status
  • Python - 将整数或字符串发送到 Spark-Streaming

    我可以通过 CSV 文件发送我的数据 首先 将我的随机数写入CSV文件然后发送 但是可以直接发送吗 我的套接字代码 import socket host localhost port 8080 s socket socket socket
  • 更改spark_temporary目录路径

    是否可以更改 temporarySpark在写入之前保存临时文件的目录 特别是 由于我正在编写表的单个分区 因此我希望临时文件夹位于分区文件夹内 是否可以 由于其实现原因 无法使用默认的 FileOutputCommiter FileOut
  • 将 Scala Dataframe 写入 CSV 文件时应用 UTF8 编码

    在 Spark2 Scala 中将数据帧写入 CSV 文件时如何正确应用 UTF8 编码 我正在使用这个 df repartition 1 write mode SaveMode Overwrite format csv option he
  • 使用 Spark pandas_udf 创建列,具有动态数量的输入列

    我有这个 df df spark createDataFrame row a 5 0 0 0 11 0 row b 3394 0 0 0 4543 0 row c 136111 0 0 0 219255 0 row d 0 0 0 0 0
  • Spark-获取RDD中的文件名

    我正在尝试处理每天都在增长的 4 个文本文件目录 我需要做的是 如果有人试图搜索发票号码 我应该给他们包含该发票号码的文件列表 我能够通过将文本文件加载为 RDD 来映射和减少文本文件中的值 但是如何获取文件名和其他文件属性呢 从 Spar
  • Pyspark显示最大值(S)和多重排序

    感谢这里的一些帮助 使用Pyspark 请不能使用SQL 所以我有一个存储为 RDD 对的元组列表 城市1 2020 03 27 X1 44 城市1 2020 03 28 X1 44 City3 2020 03 28 X3 15 City4
  • Spark SQL如何读取压缩的csv文件?

    我尝试过使用apispark read csv读取带有扩展名的压缩 csv 文件bz or gzip 有效 但在源代码中我没有找到任何可以声明的选项参数codec type 即使在这个link https github com databr
  • Spark - 如何在本地运行独立集群

    是否有可能运行Spark独立集群仅在一台机器上进行本地操作 这与仅在本地开发作业基本上不同 即local 到目前为止 我正在运行 2 个不同的虚拟机来构建集群 如果我可以在同一台机器上运行一个独立的集群 该怎么办 例如三个不同的 JVM 正
  • 如何捕获 Oozie Spark 输出

    有没有办法捕获spark的输出然后将其输入到shell上 我们当前正在使用 scala 创建 jar 文件 并希望我们的 Spark 输出成为 shell 输入 我的想法是使用 wf actionData spark XXXX var 我只
  • SPARK SQL - 当时的情况

    我是 SPARK SQL 的新手 SPARK SQL 中是否有相当于 CASE WHEN CONDITION THEN 0 ELSE 1 END 的内容 select case when 1 1 then 1 else 0 end from
  • 如何在Spark结构化流中指定批处理间隔?

    我正在使用 Spark 结构化流并遇到问题 在 StreamingContext DStreams 中 我们可以定义批处理间隔 如下所示 from pyspark streaming import StreamingContext ssc
  • Java 中的“Lambdifying”scala 函数

    使用Java和Apache Spark 已用Scala重写 面对旧的API方法 org apache spark rdd JdbcRDD构造函数 其参数为 AbstractFunction1 abstract class AbstractF
  • 懒惰背景下的变革与行动

    正如 Learning Spark 闪电般快速的大数据分析 一书中提到的 由于 Spark 计算 RDD 的方式不同 转换和操作也有所不同 在对惰性进行一些解释之后 我发现转换和操作都是惰性地进行的 那么问题来了 这句话的意思是什么 对比
  • 如何获取 Kafka 偏移量以进行结构化查询以进行手动且可靠的偏移量管理?

    Spark 2 2引入了Kafka的结构化流源 据我了解 它依赖 HDFS 检查点目录来存储偏移量并保证 恰好一次 消息传递 但是旧码头 比如https blog cloudera com blog 2017 06 offset manag

随机推荐