好吧,速度慢的原因是因为 InMemoryFileIndex 的构建。
尽管进行了分区修剪,Spark 需要了解分区和文件信息,这就是它需要该步骤的地方。
这篇 S.O 帖子详细阐述了它:here https://stackoverflow.com/questions/53111210/speed-up-inmemoryfileindex-for-spark-sql-job-with-large-number-of-input-files
因此,我们的想法是创建一个外部表,以便构建此信息,我使用这样的脚本来完成此操作(我使用了内联模式,如果有模式文件,您可以使用模式文件)
create external table ProfitLossAvro
partitioned by (Year int, Month int, Day int)
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
Stored As
inputformat 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
outputformat 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
Location 'abfss://[email protected] /cdn-cgi/l/email-protection/datawarehouse/CommercialDM.ProfitLoss/'
TBLPROPERTIES (
'avro.schema.literal'='{
"name": "Microsoft.Hadoop.Avro.Specifications.ProfitLoss",
"type": "record",
"fields": [{ "name":"MK_DatesID_TradeDate", "type":["int", "null"]},{ "name":"MK_UCRAccountsID_AccountID", "type":["int", "null"]},{ "name":"MK_ProductCategoriesID_ProductCategoryID", "type":["int", "null"]},{ "name":"CurrencyCode", "type":["string", "null"]},{ "name":"ProfitLoss", "type":["double", "null"]},{ "name":"MK_PnLAmountTypesID_PLBookingTypeID", "type":["int", "null"]}]
}');
但如果您随后查询该表,您将得到 0 行。这是因为现有分区不会自动添加。所以,你可以使用
msck repair table ProfitLossAvro
每次将数据添加到数据湖时,您都可以添加分区。
像这样的事情:-
ALTER TABLE ProfitLossAvro ADD PARTITION (Year=2020, Month=6, Day=26)
如果您随后使用如下命令查询数据,它将运行得更快
df=sqlContext.sql("select * \
from ProfitLossAvro \
where Year=" + year + " and Month=" + month_nz + " and Day=" + date_nz)
display(df)