我想重新分区/合并我的数据,以便将其保存到每个分区的一个 Parquet 文件中。我还想使用 Spark SQL partitionBy API。所以我可以这样做:
df.coalesce(1)
.write
.partitionBy("entity", "year", "month", "day", "status")
.mode(SaveMode.Append)
.parquet(s"$location")
我已经对此进行了测试,但它似乎表现不佳。这是因为数据集中只有一个分区需要处理,所有文件的分区、压缩和保存都必须由一个 CPU 核心完成。
我可以在调用合并之前重写它以手动进行分区(例如使用具有不同分区值的过滤器)。
但是有没有更好的方法使用标准 Spark SQL API 来做到这一点?
我遇到了完全相同的问题,我找到了一种使用方法来做到这一点DataFrame.repartition()
。使用时出现的问题coalesce(1)
是你的并行度下降到 1,并且它在最好的情况下会很慢,在最坏的情况下会出错。增加这个数字也没有帮助——如果你这样做的话coalesce(10)
您可以获得更多并行性,但最终每个分区有 10 个文件。
在不使用的情况下为每个分区获取一个文件coalesce()
, use repartition()
与您希望输出分区所依据的列相同。因此,对于您的情况,请执行以下操作:
import spark.implicits._
df
.repartition($"entity", $"year", $"month", $"day", $"status")
.write
.partitionBy("entity", "year", "month", "day", "status")
.mode(SaveMode.Append)
.parquet(s"$location")
完成此操作后,每个输出分区都会获得一个镶木地板文件,而不是多个文件。
我在 Python 中对此进行了测试,但我认为在 Scala 中它应该是相同的。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)