Spark 向 S3 写入/读取 - 分区大小和压缩

2024-04-13

我正在做一个实验来了解哪种文件大小对于 s3 和 [EMR + Spark] 表现最好

输入数据 :

Incompressible data: Random Bytes in files 
Total Data Size: 20GB  
Each folder has varying input file size: From 2MB To 4GB file size.

集群规格:

1 master + 4 nodes : C3.8xls
--driver-memory 5G \
--executor-memory 3G \
--executor-cores 2 \
--num-executors 60 \

Code :

scala> def time[R](block: => R): R = {
          val t0 = System.nanoTime()
          val result = block    // call-by-name
         val t1 = System.nanoTime()
          println("Elapsed time: " + (t1 - t0) + "ns")
          result
      }
time: [R](block: => R)R

scala> val inputFiles = time{sc.textFile("s3://bucket/folder/2mb-10240files-20gb/*/*")};
scala> val outputFiles = time {inputFiles.saveAsTextFile("s3://bucket/folder-out/2mb-10240files-20gb/")};

观察结果

  • 2MB - 32MB:大部分时间花在打开文件句柄上[效率不高]
  • 64MB 到 1GB:Spark 本身针对所有这些文件大小启动了 320 个任务,不再是 20GB 的存储桶中的文件数量 数据例如512 MB 文件有 40 个文件可构成 20 GB 数据,并且可以 原本只有 40 个任务需要完成,但实际上却有 320 个
    每个任务处理 64MB 数据。
  • 4GB 文件大小:输出 0 字节 [无法处理内存/数据甚至不可分割???]

问题

  • 任何强制将输入大小处理为 64MB 的默认设置?
  • 由于我使用的数据是随机字节并且已经被压缩,它如何进一步分割这些数据?如果它可以分割此数据为什么它不能分割 4GB 目标文件的文件大小 尺寸?
  • 为什么通过spark上传后压缩文件大小会增加? 2MB 压缩输入文件在输出存储桶中变为 3.6 MB。

由于未指定,我假设在我的答案中使用 gzip 和 Spark 2.2。

  • 任何强制将输入大小处理为 64MB 的默认设置?

就在这里。 Spark 是一个 Hadoop 项目,因此将 S3 视为基于块的文件系统,即使它是基于对象的文件系统。 所以这里真正的问题是:您使用的是哪种 S3 文件系统的实现(s3a,s3n)等。可以找到类似的问题here https://stackoverflow.com/questions/37168716/how-many-partitions-does-spark-create-when-a-file-is-loaded-from-s3-bucket.

  • 由于我使用的数据是随机字节并且已经被压缩,它如何进一步分割这些数据?如果它可以分割此数据,为什么它不能分割 4GB 目标文件大小的文件大小?

Spark 文档 http://spark.apache.org/docs/latest/programming-guide.html表明它能够读取压缩文件:

Spark 的所有基于文件的输入方法(包括 textFile)都支持在目录、压缩文件和通配符上运行。例如,您可以使用textFile(“/my/directory”)、textFile(“/my/directory/.txt”)和textFile(“/my/directory/.gz”)。

这意味着您的文件很容易被读取并转换为每行的纯文本字符串。

但是,您正在使用压缩文件。假设是gzip等不可分割的格式,则需要整个文件来解压。您正在使用 3gb 执行程序运行,它可以很好地满足 4mb-1gb 文件的需求,但无法一次处理大于 3gb 的文件(考虑到开销后可能会更小)。

一些进一步的信息可以在这里找到question https://stackoverflow.com/questions/40492967/dealing-with-a-large-gzipped-file-in-spark。可分割压缩类型的详细信息可以在此处找到answer https://stackoverflow.com/a/34209281/2996373.

  • 为什么通过 Spark 上传后压缩文件大小会增加?2MB 压缩输入文件在输出存储桶中变成 3.6 MB。

作为上一点的推论,这意味着 Spark 在以明文形式读取时已解压缩 RDD。重新上传时,不再压缩。要压缩,您可以传递压缩编解码器作为参数:

sc.saveAsTextFile("s3://path", classOf[org.apache.hadoop.io.compress.GzipCodec])

还有其他可用的压缩格式。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 向 S3 写入/读取 - 分区大小和压缩 的相关文章

随机推荐