Pyspark - 与扫描后应用的谓词相同的过滤器,即使在谓词被下推之后也是如此

2023-11-30

Question:连接两个数据集时,为什么过滤器 isnotnull 在连接键列上应用了两次?在物理计划中,它首先作为 PushedFilter 应用,然后立即显式应用。为什么会这样?

code:

import os
import pandas as pd, numpy as np
import pyspark
spark=pyspark.sql.SparkSession.builder.getOrCreate()

save_loc = "gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/"

df1 = spark.createDataFrame(pd.DataFrame({'a':np.random.choice([1,2,None],size = 1000, p = [0.47,0.48,0.05]),
                                         'b': np.random.random(1000)}))

df2 = spark.createDataFrame(pd.DataFrame({'a':np.random.choice([1,2,None],size = 1000, p = [0.47,0.48,0.05]),
                                         'b': np.random.random(1000)}))

df1.write.parquet(os.path.join(save_loc,"dfl_key_int"))
df2.write.parquet(os.path.join(save_loc,"dfr_key_int"))

dfl_int = spark.read.parquet(os.path.join(save_loc,"dfl_key_int"))
dfr_int = spark.read.parquet(os.path.join(save_loc,"dfr_key_int"))

dfl_int.join(dfr_int,on='a',how='inner').explain()



output:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [a#23L, b#24, b#28]
   +- BroadcastHashJoin [a#23L], [a#27L], Inner, BuildRight, false
      :- Filter isnotnull(a#23L)
      :  +- FileScan parquet [a#23L,b#24] Batched: true, DataFilters: [isnotnull(a#23L)], Format: Parquet, Location: InMemoryFileIndex[gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/dfl_key_int], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:bigint,b:double>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#75]
         +- Filter isnotnull(a#27L)
            +- FileScan parquet [a#27L,b#28] Batched: true, DataFilters: [isnotnull(a#27L)], Format: Parquet, Location: InMemoryFileIndex[gs://monsoon-credittech.appspot.com/spark_datasets/random_tests/dfr_key_int], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:bigint,b:double>

原因是一个PushedFilter不保证所有数据在被 Spark 读入内存之前都按照您想要的方式进行过滤。有关什么的更多背景信息PushedFilter是,检查一下这个答案.

镶木地板文件

让我们看看您的示例中的 Parquet 文件。 Parquet 文件以列格式存储,并且也按行组(或块)组织。下图来自于Apache Parquet 文档:

enter image description here

您会看到数据以列方式存储,并且它们被分成块(行组)。现在,对于每个列/行块组合,Parquet 都会存储一些元数据。在那张图片中,您会看到它包含一堆元数据,然后还extra key/value pairs。这些还包含有关您的数据的统计信息(取决于您的列的类型)。

这些统计数据的一些例子是:

  • 块的最小/最大值是多少(如果它对于列的数据类型有意义)
  • 该块是否具有非空值
  • ...

回到你的例子

您正在加入a柱子。为了能够做到这一点,我们需要确保a没有空值。让我们想象一下你的a列(忽略其他列)的存储方式如下:

  • a column:
    • 块 1:0, 1, None, 1, 1, None
    • 大块2:0, 0, 0, 0, 0, 0
    • 大块3:None, None, None, None, None, None

现在,使用PushedFilter我们可以立即(只需查看块的元数据)忽略块 3,我们甚至不必读入它!

但如您所见,块 1 仍然包含空值。这是我们无法仅通过查看块的元数据来过滤掉的东西。因此,我们必须读取整个块,然后使用第二个块在 Spark 中过滤其他空值Filter物理计划中的节点。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Pyspark - 与扫描后应用的谓词相同的过滤器,即使在谓词被下推之后也是如此 的相关文章

随机推荐