I am seeing something called DataFilters in my query execution plan:
FileScan parquet [product_id#12,price#14] Batched: true, DataFilters: [isnotnull(product_id#12)], Format: Parquet, Location: InMemoryFileIndex[gs://monsoon-credittech.appspot.com/spark_datasets/products_parquet_dtc], PartitionFilters: [], PushedFilters: [IsNotNull(product_id)], ReadSchema: struct<product_id:int,price:int>
There are three filter entries:
- PartitionFilters: []
- PushedFilters: [IsNotNull(product_id)]
- DataFilters: [isnotnull(product_id#12)]
I understand PartitionFilters and PushedFilters, but what are the DataFilters showing up here? There is an answer to a similar question here. However, the definition of DataFilters given there is exactly what I think PushedFilters are (also, that answer has 1 downvote). So, is my understanding of PushedFilters wrong? If not, what are DataFilters?
CodePudding user response:
This explanation is for Spark's latest version at the time of this post (3.3.1).
PushedFilters are effectively a subset of DataFilters; you can see this in DataSourceScanExec.scala. They are the DataFilters whose predicates we can push down to filter on the metadata of the file you're reading, instead of against the data itself. Filtering against the metadata is of course much quicker than filtering against the data, because it may let you skip reading large chunks of the file entirely.
So to structure everything, we have:
- PartitionFilters: filters on partition columns. They enable Spark to disregard entire directories within your parquet dataset.
- DataFilters: filters on non-partition columns.
- PushedFilters: those DataFilters whose predicates we can push down. When a filter is a DataFilter but not a PushedFilter, it means the predicate cannot be pushed down to filter on the underlying file's metadata.
Example
Let's take this example of parquet files (not all file formats support predicate pushdown, but parquet files do):
import org.apache.spark.sql.functions.col
import spark.implicits._ // for .toDF; already in scope in spark-shell, otherwise assumes a SparkSession named spark

val df = Seq(
  (1, 2, 3),
  (2, 2, 3),
  (3, 20, 300),
  (1, 24, 299)
).toDF("colA", "colB", "colC")

df.write.partitionBy("colA").mode("overwrite").parquet("datafilter.parquet")
So we're simply writing a parquet file that is partitioned by the colA column. The file structure looks like this:
datafilter.parquet/
├── colA=1
│ ├── part-00000-55cb3320-f145-4d64-8cba-55a72111c0c8.c000.snappy.parquet
│ └── part-00003-55cb3320-f145-4d64-8cba-55a72111c0c8.c000.snappy.parquet
├── colA=2
│ └── part-00001-55cb3320-f145-4d64-8cba-55a72111c0c8.c000.snappy.parquet
├── colA=3
│ └── part-00002-55cb3320-f145-4d64-8cba-55a72111c0c8.c000.snappy.parquet
└── _SUCCESS
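If you prefer to verify that layout from code instead of the shell, here is a quick sketch using plain Java NIO (it assumes the dataset was written to the local working directory, as above):
import java.nio.file.{Files, Paths}

// Print the partition directories Spark created under datafilter.parquet/.
Files.list(Paths.get("datafilter.parquet"))
  .forEach(p => if (Files.isDirectory(p)) println(p.getFileName)) // colA=1, colA=2, colA=3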
Let's have a look at the 3 filter types:
PartitionFilter
spark.read.parquet("./datafilter.parquet").filter(col("colA") < 10).explain
== Physical Plan ==
*(1) ColumnarToRow
- FileScan parquet [colB#165,colC#166,colA#167] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:somePath/spark-tests/datafilter.parquet], PartitionFilters: [isnotnull(colA#167), (colA#167 < 10)], PushedFilters: [], ReadSchema: struct<colB:int,colC:int>
Here you see that our filter is a PartitionFilter: since the data is partitioned by colA, we can simply filter on the directories.
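For example, an equality predicate on the partition column should likewise show up only under PartitionFilters, because it can be resolved purely from the directory names (a small sketch; the exact plan text may differ between versions):
spark.read.parquet("./datafilter.parquet").filter(col("colA") === 2).explain
// expected: PartitionFilters contains something like (colA = 2), while DataFilters and PushedFilters stay empty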
PushedFilter
spark.read.parquet("./datafilter.parquet").filter(col("colB") < 10).explain
== Physical Plan ==
*(1) Filter (isnotnull(colB#172) AND (colB#172 < 10))
- *(1) ColumnarToRow
- FileScan parquet [colB#172,colC#173,colA#174] Batched: true, DataFilters: [isnotnull(colB#172), (colB#172 < 10)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:somePath/spark-tests/datafilter.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(colB), LessThan(colB,10)], ReadSchema: struct<colB:int,colC:int>
Here you see that our filter (colB < 10) is part of the DataFilters, because colB is not a partition column. It is also part of the PushedFilters, because this is a predicate we can push down: parquet files store the minimum and maximum values of each chunk as metadata, so if the minimum value of a chunk is at least 10, we know that no row in it can satisfy colB < 10 and we can skip reading that chunk entirely.
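To make that min/max skipping concrete, here is a small conceptual sketch in plain Scala (this is not Spark or parquet reader code, just the per-chunk decision a reader can make for a predicate like colB < 10):
// Hypothetical per-chunk statistics, mirroring the min/max metadata parquet keeps.
case class ChunkStats(min: Int, max: Int)

// For a predicate `column < upperBound`, a chunk whose minimum is already >= upperBound
// cannot contain any matching row, so the reader can skip it without touching its data.
def canSkipChunk(stats: ChunkStats, upperBound: Int): Boolean =
  stats.min >= upperBound

// e.g. canSkipChunk(ChunkStats(min = 20, max = 300), upperBound = 10) == true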
Non PushedFilters
spark.read.parquet("./datafilter.parquet").filter(col("colB") < col("colC")).explain
== Physical Plan ==
*(1) Filter ((isnotnull(colB#179) AND isnotnull(colC#180)) AND (colB#179 < colC#180))
- *(1) ColumnarToRow
- FileScan parquet [colB#179,colC#180,colA#181] Batched: true, DataFilters: [isnotnull(colB#179), isnotnull(colC#180), (colB#179 < colC#180)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:somePath/spark-tests/datafilter.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(colB), IsNotNull(colC)], ReadSchema: struct<colB:int,colC:int>
This filter is more complicated: colB < colC is not a predicate we can push down to filter on the parquet file's metadata, because it compares two columns with each other rather than a column with a literal value, and Spark's data source filters can only express the latter. That means we need to read the full data and apply the filter in memory afterwards.
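Another common way to end up with a DataFilter that has no matching PushedFilter is to wrap the predicate in a UDF: Spark cannot look inside the function, so it cannot translate it into a data source filter. A minimal sketch against the same dataset (the UDF name and the expected plan output are illustrative):
import org.apache.spark.sql.functions.udf

// Semantically the same as colB < 10, but opaque to the optimizer.
val lessThanTen = udf((x: Int) => x < 10)

spark.read.parquet("./datafilter.parquet").filter(lessThanTen(col("colB"))).explain
// expected: the predicate is evaluated in a Filter step above the scan, and PushedFilters stays (essentially) empty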