Spark SQL无法完成大量分片的Parquet数据写入

2023-11-22

我正在尝试使用 Apache Spark SQL 将 S3 中的 json 日志数据 etl 到 S3 上的 Parquet 文件中。 我的代码基本上是:

import org.apache.spark._
val sqlContext = sql.SQLContext(sc)
val data = sqlContext.jsonFile("s3n://...", 10e-6)
data.saveAsParquetFile("s3n://...")

当我有多达 2000 个分区时,此代码可以工作,而当我有 5000 个或更多分区时,无论数据量有多大,该代码都会失败。通常情况下,我们可以将分区合并到可接受的数量, 但这是一个非常大的数据集,在 2000 个分区时我遇到了这里描述的问题question

14/10/10 00:34:32 INFO scheduler.DAGScheduler: Stage 1 (runJob at ParquetTableOperations.scala:318) finished in 759.274 s
14/10/10 00:34:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
14/10/10 00:34:32 INFO spark.SparkContext: Job finished: runJob at ParquetTableOperations.scala:318, took 759.469302077 s
14/10/10 00:34:34 WARN hadoop.ParquetOutputCommitter: could not write summary file for ...
java.io.IOException: Could not read footer: java.lang.NullPointerException
        at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:190)
        at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:203)
        at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:49)
        at org.apache.spark.sql.parquet.InsertIntoParquetTable.saveAsHadoopFile(ParquetTableOperations.scala:319)
        at org.apache.spark.sql.parquet.InsertIntoParquetTable.execute(ParquetTableOperations.scala:246)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409)
        at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:77)
        at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:103)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
        at $line37.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54)
        at $line37.$read$$iwC$$iwC$$iwC.<init>(<console>:56)
        at $line37.$read$$iwC$$iwC.<init>(<console>:58)
        at $line37.$read$$iwC.<init>(<console>:60)
        at $line37.$read.<init>(<console>:62)
        at $line37.$read$.<init>(<console>:66)
        at $line37.$read$.<clinit>(<console>)
        at $line37.$eval$.<init>(<console>:7)
        at $line37.$eval$.<clinit>(<console>)
        at $line37.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.close(NativeS3FileSystem.java:106)
        at java.io.BufferedInputStream.close(BufferedInputStream.java:472)
        at java.io.FilterInputStream.close(FilterInputStream.java:181)
        at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:298)
        at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:180)
        at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:176)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

我在 ec2 中的 R3.xlarge 上的 Spark-1.1.0 上运行此程序。我正在使用 Spark-shell 控制台来运行上面的代码。我能够对data之后是SchemaRDD对象,所以看起来不是资源问题。 还可以读取和查询生成的 Parquet 文件,只是由于缺少摘要文件而需要很长时间。


尝试将此属性设置为 false :

sparkContext.hadoopConfiguration().set("parquet.enable.summary-metadata", "false");
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark SQL无法完成大量分片的Parquet数据写入 的相关文章

随机推荐

  • 如何从 CMD 运行 Pip 命令

    据我了解 Python 2 7 9 附带安装了 Pip 但是当我尝试从 CMD Windows 执行 Pip 命令时 出现以下错误 pip is not recognized as an internal or external comma
  • 如何在Android Studio中指定JDK版本?

    Android Studio 给我一个 Gradle 构建错误 如下所示 Error 3 22 compileSdkVersion android 22 requires compiling with JDK 7 现在它给了我这些可点击的提
  • 如何正确处理Python中的循环模块依赖?

    尝试找到一种良好且正确的模式来处理 Python 中的循环模块依赖关系 通常 解决方案是删除它 通过重构 然而 在这种特殊情况下 我们确实希望拥有需要循环导入的功能 EDIT 根据下面的答案 此类问题通常的攻击角度是重构 然而 为了这个问题
  • Mac 上的 PDO MySQL 驱动程序

    我有一台大约一年前安装的自定义 PHP 5 的 Mac 我记得我花了整个星期天的时间 编译了大约 20 次才能把它做好 我拥有的 MySQL 来自熵并且是预编译的 现在我需要让 PDO 与 MySQL 驱动程序一起工作 但驱动程序尚未安装
  • 在c二进制中,测试数字是否在范围内

    这是我无法解开的谜题的一部分 该函数接受三个输入 第一个是 int 第二个是下界 第三个是上限 我需要测试第一个数字是否在下限和上限内 包括下限和上限 如果在范围内则返回1 否则返回0 问题是我只能使用 lt lt gt gt 操作 并且只
  • linux、C++、xft:如何使用它?

    我尝试使用 Xft tutorial 好吧 让他们称之为教程 看起来像是在朝鲜营地写的 我也发现了这个one 那么让我尝试一步一步地进行 g XftTest cc lX11 lXft pkg config cflags freetype2
  • 为什么我会收到此 WCF 错误消息?

    当我调用 WCF 服务时 出现以下错误 我在这里缺少什么 System String with data contract name ArrayOfstring http schemas microsoft com 2003 10 Seri
  • DRF 光标分页示例

    我正在尝试设置使用 DRF 进行光标分页获取交易记录列表 按创建日期排序 我不知道如何执行初始请求 因为我在那个阶段还不知道光标 令人惊讶的是 我找不到这方面的例子 另外 有没有办法使用 CursorPagination 设置每个请求的页面
  • SwiftUI @Binding 不刷新视图

    我有一个简单的主 详细信息界面 其中详细信息视图修改数组中的项目 使用下面的代码 模型已正确更新 但 SwiftUI 不会刷新视图以反映更改 Model struct ProduceItem Identifiable let id UUID
  • 插入时保持 std 向量/列表排序,或全部排序

    假设我的向量 列表中有 30000 个对象 这是我一一补充的 我需要将它们分类 一次排序 如 std sort 更快 还是在我一一添加对象时保持向量 列表排序更快 矢量 列表以后不会被修改 当你在一个接一个地插入元素的同时保持向量列表排序时
  • 如何在Flutter中实现持久秒表?

    我正在 flutter 中实现一个计时器 这是应用程序的结构 页面 A 包含一些列表 用户单击这些列表并将其带到计时器页面 页面 B 格式 运行计时器 我能够正确运行计时器 秒表 但是当我按页面 B 上的后退按钮时 我会在处理错误后调用 s
  • Chrome 说:资源解释为样式表,但使用 MIME 类型 application/xml 进行传输

    我有一个使用 XSL 文件设计样式的 XML 文件 在 Firefox 中打开 XML 文件时 我没有遇到任何问题 尽管奇怪的是 有时它只会在我点击 重新加载 后才会显示 但在 Chrome Chromium 中我收到错误消息 Resour
  • SQL Server 日期格式函数

    SELECT CONVERT VARCHAR 10 GETDATE 105 此查询返回 DD MM YYYY 中的日期 格式为 varchar 我需要在 sql server 中的日期时间数据类型中使用相同的格式 请帮助我 在 SQL Se
  • 为什么 lambda 表达式没有被“interned”?

    字符串是引用类型 但它们是不可变的 这使他们能够interned由编译器 只要出现相同的字符串文字 就可能引用相同的对象 委托也是不可变的引用类型 使用以下方法向多播委托添加方法 运算符构成任务 这不是可变性 并且 就像字符串一样 有一种
  • 在 C# 中使用内存映射文件时是否可以避免数据副本?

    我对内存映射文件在 C 中如何工作的理解是 每个数据请求都会产生一个副本 例如 如果您有一个作为文件持久保存的大型数据结构 则使用内存映射文件会导致将实际文件的内存映射到 RAM 中 并在从文件中读取后将其副本驻留在 GC 堆中 我假设这是
  • Spark SubQuery 扫描整个分区

    我有一个按 日期 字段分区的配置单元表 我想编写一个查询来从最新 最大 分区获取数据 spark sql select field from table where date of 2019 06 23 explain True vs sp
  • 从字符串 JavaScript 中提取数字

    有谁知道在 JavaScript 中从字符串中提取数字的方法吗 Example 1 banana 1 pineapple 3 oranges 我的目的是将结果保存在数组 JSON 或其他内容中 Result 1 1 3 var result
  • #define TRUE !FALSE 与 #define TRUE 1

    撇开自 c99 年以来的事实不谈stdbool h在定义宏来处理布尔类型时已经存在C以下有什么区别吗 define FALSE 0 define TRUE 1 Option 1 define TRUE FALSE Option 2 从这里的
  • JPA 使用父级合并但创建子级时如何获取生成的 id/对象?

    我有一个先前已被保留并具有 OneToMany与另一个实体的关系 为了添加新实体 我只需在托管对象中添加新实体并使用cascadeType ALL坚持改变 有没有一种方法可以获取新创建的对象的 id 或获取与合并一起使用的原始 非托管 对象
  • Spark SQL无法完成大量分片的Parquet数据写入

    我正在尝试使用 Apache Spark SQL 将 S3 中的 json 日志数据 etl 到 S3 上的 Parquet 文件中 我的代码基本上是 import org apache spark val sqlContext sql S