What is DataFilter in pyspark?


I am seeing something called DataFilters in my query execution plan:

FileScan parquet [product_id#12,price#14] Batched: true, DataFilters: [isnotnull(product_id#12)], Format: Parquet, Location: InMemoryFileIndex[gs://monsoon-credittech.appspot.com/spark_datasets/products_parquet_dtc], PartitionFilters: [], PushedFilters: [IsNotNull(product_id)], ReadSchema: struct<product_id:int,price:int>

There are three filter entries:

  • PartitionFilters: []
  • PushedFilters: [IsNotNull(product_id)]
  • DataFilters: [isnotnull(product_id#12)]

I understand PartitionFilters and PushedFilters. But what is the DataFilters entry that is showing up here? There is an answer to a similar question here. However, the definition of DataFilter given there is exactly what I think PushedFilter is (also, that answer has 1 downvote). So, is my understanding of PushedFilter wrong? If not, what is DataFilter?

CodePudding user response:

This explanation is for Spark's latest version at the time of this post (3.3.1).

PushedFilters are essentially a subset of DataFilters; you can see this in DataSourceScanExec.scala. They are the DataFilters whose predicates we can push down to filter on the metadata of the file you're trying to read, instead of against the data itself. Filtering against the metadata is of course much quicker than filtering against the data itself, because it can let you skip reading large chunks of data entirely.

So to structure everything, we have:

  • PartitionFilters: Filters on partition columns. They enable you to disregard entire directories within your parquet dataset.
  • DataFilters: Filters on non-partition columns.
    • PushedFilters: those DataFilters whose predicates we can push down to the file's metadata.
    • So when a filter is a DataFilter but not a PushedFilter, it means we can't push the predicate down to filter on the underlying file's metadata.
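
To make the split concrete, here is a small sketch (my addition, using the datafilter.parquet dataset that gets written in the example below) combining all three in one query; the exact attribute IDs in your plan will differ:

spark.read.parquet("./datafilter.parquet")
  .filter(col("colA") === 1 && col("colB") < 10 && col("colB") < col("colC"))
  .explain()

// Roughly what to expect in the FileScan node:
//   PartitionFilters: [isnotnull(colA), (colA = 1)]
//   DataFilters:      [isnotnull(colB), isnotnull(colC), (colB < 10), (colB < colC)]
//   PushedFilters:    [IsNotNull(colB), IsNotNull(colC), LessThan(colB,10)]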

Example

Let's take this example of parquet files (not all file formats support predicate pushdown, but parquet files do):

import org.apache.spark.sql.functions.col
import spark.implicits._ // for .toDF; already in scope if you're in spark-shell

val df = Seq(
  (1, 2, 3),
  (2, 2, 3),
  (3, 20, 300),
  (1, 24, 299)
).toDF("colA", "colB", "colC")

// Write the data partitioned by colA: each distinct colA value becomes its own directory.
df.write.partitionBy("colA").mode("overwrite").parquet("datafilter.parquet")

So we're simply writing a parquet dataset that is partitioned by the colA column. The resulting directory structure looks like this:

datafilter.parquet/
├── colA=1
│   ├── part-00000-55cb3320-f145-4d64-8cba-55a72111c0c8.c000.snappy.parquet
│   └── part-00003-55cb3320-f145-4d64-8cba-55a72111c0c8.c000.snappy.parquet
├── colA=2
│   └── part-00001-55cb3320-f145-4d64-8cba-55a72111c0c8.c000.snappy.parquet
├── colA=3
│   └── part-00002-55cb3320-f145-4d64-8cba-55a72111c0c8.c000.snappy.parquet
└── _SUCCESS

Let's have a look at the 3 filter types:

PartitionFilter

spark.read.parquet("./datafilter.parquet").filter(col("colA") < 10).explain

== Physical Plan ==
*(1) ColumnarToRow
 - FileScan parquet [colB#165,colC#166,colA#167] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:somePath/spark-tests/datafilter.parquet], PartitionFilters: [isnotnull(colA#167), (colA#167 < 10)], PushedFilters: [], ReadSchema: struct<colB:int,colC:int>

Here you see that our filter is a PartitionFilter: since the data is partitioned by colA, Spark can filter on the directories themselves and skip the non-matching ones entirely.
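
You can also check the pruning from the data side. A quick sketch (my addition, using input_file_name from org.apache.spark.sql.functions) lists the distinct files the surviving rows were actually read from:

import org.apache.spark.sql.functions.input_file_name

spark.read.parquet("./datafilter.parquet")
  .filter(col("colA") === 1)
  .select(input_file_name())
  .distinct()
  .show(false)
// Expect only part files under datafilter.parquet/colA=1/ -- the other
// partition directories are never read thanks to the PartitionFilters.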

PushedFilter

spark.read.parquet("./datafilter.parquet").filter(col("colB") < 10).explain

== Physical Plan ==
*(1) Filter (isnotnull(colB#172) AND (colB#172 < 10))
 - *(1) ColumnarToRow
    - FileScan parquet [colB#172,colC#173,colA#174] Batched: true, DataFilters: [isnotnull(colB#172), (colB#172 < 10)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:somePath/spark-tests/datafilter.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(colB), LessThan(colB,10)], ReadSchema: struct<colB:int,colC:int>

Here you see that our filter (colB < 10) is part of the DataFilters, because colB is not a partition column.

It is also part of the PushedFilters, because it is a predicate we can push down. Parquet files store the minimum and maximum values of each chunk as metadata. So if the minimum value of colB in a chunk is at least 10, we know that no row in that chunk can satisfy colB < 10, and we can skip reading it.
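
If you want to look at that metadata yourself, here is a sketch (my addition, using the parquet-hadoop API that ships with Spark; the part-file name below is hypothetical) that prints the per-row-group min/max statistics these pushed filters are checked against:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import scala.collection.JavaConverters._

// Point this at one of the part files written above (the name here is made up).
val partFile = new Path("datafilter.parquet/colA=1/part-00000-example.c000.snappy.parquet")
val reader = ParquetFileReader.open(HadoopInputFile.fromPath(partFile, new Configuration()))
try {
  for {
    block  <- reader.getFooter.getBlocks.asScala   // one entry per row group
    column <- block.getColumns.asScala             // one entry per column chunk
  } println(s"${column.getPath}: min=${column.getStatistics.genericGetMin}, " +
    s"max=${column.getStatistics.genericGetMax}")
} finally {
  reader.close()
}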

Non PushedFilters

spark.read.parquet("./datafilter.parquet").filter(col("colB") < col("colC")).explain

== Physical Plan ==
*(1) Filter ((isnotnull(colB#179) AND isnotnull(colC#180)) AND (colB#179 < colC#180))
 - *(1) ColumnarToRow
    - FileScan parquet [colB#179,colC#180,colA#181] Batched: true, DataFilters: [isnotnull(colB#179), isnotnull(colC#180), (colB#179 < colC#180)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:somePath/spark-tests/datafilter.parquet], PartitionFilters: [], PushedFilters: [IsNotNull(colB), IsNotNull(colC)], ReadSchema: struct<colB:int,colC:int>

This filter is more complicated: colB < colC compares two columns with each other, and that is not something we can push down to filter on the metadata of the parquet file (the min/max statistics are kept per column). That means we need to read the full data and apply the filter afterwards in memory.
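
If a cross-column comparison like this is performance-critical, one possible workaround (my suggestion, not part of the original answer; the datafilter2.parquet name is made up) is to materialize the comparison at write time, so the scan only needs a simple column-vs-literal predicate that can be pushed down:

// Precompute colB - colC once, when writing the data.
df.withColumn("colB_minus_colC", col("colB") - col("colC"))
  .write.partitionBy("colA").mode("overwrite").parquet("datafilter2.parquet")

spark.read.parquet("datafilter2.parquet")
  .filter(col("colB_minus_colC") < 0)   // equivalent to colB < colC for these ints
  .explain()
// This time expect something like LessThan(colB_minus_colC,0) under PushedFilters,
// instead of the comparison showing up only under DataFilters.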
