Spark 读取分区 avro 比指向确切位置慢得多

2024-04-15

我正在尝试读取分区的 Avro 数据,该数据是根据年、月和日分区的,这似乎比直接将其指向路径要慢得多。 在物理计划中,我可以看到分区过滤器正在传递,因此它不会扫描整个目录集,但速度仍然慢得多。

例如。像这样读取分区数据

profitLossPath="abfss://raw@"+datalakename+".dfs.core.windows.net/datawarehouse/CommercialDM.ProfitLoss/"
 
profitLoss = spark.read.\
    format("com.databricks.spark.avro").\
    option("header", "false").\
    option("inferSchema", "false").load(profitLossPath)
 
profitLoss.createOrReplaceTempView("ProfitLosstt")

df=sqlContext.sql("SELECT * \
                             FROM ProfitLosstt \
                             where Year= " + year + " and Month=" + month_nz + " and Day=" + date_nz )

大约需要 3 分钟

而我使用字符串生成器指向确切位置,只需 2 秒即可完成

profitLossPath="abfss://raw@"+datalakename+".dfs.core.windows.net/datawarehouse/CommercialDM.ProfitLoss/Year=" +year +"/Month=" + month_nz + "/Day=" + date_nz
 
profitLoss = spark.read.\
    format("com.databricks.spark.avro").\
    option("header", "false").\
    option("inferSchema", "false").load(profitLossPath)

 
profitLoss.createOrReplaceTempView("ProfitLosstt")

df=sqlContext.sql("SELECT * \
                             FROM ProfitLosstt "
                              )
                  
display(df)

查看第一个(较慢)的物理计划确实表明分区过滤器已传递

什么可以解释发现阶段花了这么长时间?

有任何问题我都可以详细说明。


好吧,速度慢的原因是因为 InMemoryFileIndex 的构建。

尽管进行了分区修剪,Spark 需要了解分区和文件信息,这就是它需要该步骤的地方。 这篇 S.O 帖子详细阐述了它:here https://stackoverflow.com/questions/53111210/speed-up-inmemoryfileindex-for-spark-sql-job-with-large-number-of-input-files

因此,我们的想法是创建一个外部表,以便构建此信息,我使用这样的脚本来完成此操作(我使用了内联模式,如果有模式文件,您可以使用模式文件)

create external table ProfitLossAvro 


partitioned by (Year int, Month int, Day int)

ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.avro.AvroSerDe'


Stored As 

 inputformat 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'

 outputformat 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'

Location 'abfss://[email protected] /cdn-cgi/l/email-protection/datawarehouse/CommercialDM.ProfitLoss/'

TBLPROPERTIES (
    'avro.schema.literal'='{
      "name": "Microsoft.Hadoop.Avro.Specifications.ProfitLoss",
      "type": "record",
      "fields": [{ "name":"MK_DatesID_TradeDate", "type":["int", "null"]},{ "name":"MK_UCRAccountsID_AccountID", "type":["int", "null"]},{ "name":"MK_ProductCategoriesID_ProductCategoryID", "type":["int", "null"]},{ "name":"CurrencyCode", "type":["string", "null"]},{ "name":"ProfitLoss", "type":["double", "null"]},{ "name":"MK_PnLAmountTypesID_PLBookingTypeID", "type":["int", "null"]}]
    }');

但如果您随后查询该表,您将得到 0 行。这是因为现有分区不会自动添加。所以,你可以使用

msck repair table ProfitLossAvro

每次将数据添加到数据湖时,您都可以添加分区。 像这样的事情:-

ALTER TABLE ProfitLossAvro ADD PARTITION (Year=2020, Month=6, Day=26)

如果您随后使用如下命令查询数据,它将运行得更快

df=sqlContext.sql("select * \
               from ProfitLossAvro \
               where Year=" + year + " and Month=" + month_nz + " and Day=" + date_nz)

display(df)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 读取分区 avro 比指向确切位置慢得多 的相关文章

随机推荐