Home > OS >  Spark partition filter is skipped when table is used in where condition, why?
Spark partition filter is skipped when table is used in where condition, why?

Time:01-13

Maybe someone observed this behavior and knows why Spark takes this route. I wanted to read only few partitions from partitioned table.

SELECT *
FROM my_table
WHERE snapshot_date IN('2023-01-06', '2023-01-07')

results in (part of) the physical plan:

-- Location: PreparedDeltaFileIndex [dbfs:/...]
-- PartitionFilters: [cast(snapshot_date#282634 as string) IN (2023-01-06,2033-01-07)]

It is very fast, ~1s, in the execution plan I see it is using those provided datasets as arguments for partition filters.

If I try to provide filter predicate in form of the one column table it does full table scan and it takes 100x longer.

SELECT *
FROM
  my_table
WHERE snapshot_date IN (
    SELECT snapshot_date
    FROM (VALUES('2023-01-06'), ('2023-01-07')) T(snapshot_date)
  )

-- plan

Location: PreparedDeltaFileIndex [dbfs:/...]
PartitionFilters: []
ReadSchema: ...
 

I was unable to find any query hints that would force Spark to push down this predicate. One can easily do for loop in python and wrap logic of reading a table with desired dates and read them one by one. But I'm not sure it is possible in SQL.

Is there any option/switch I have missed?

CodePudding user response:

I don't think pushing down this kind of predicate is something supported by Spark's HiveMetaStore client, today. So in first case, HiveShim.convertFilters(...) method will transform

:
WHERE snapshot_date IN ('2023-01-06', '2023-01-07') 

into a filtering predicate understood by HMS as

snapshot_date="2023-01-06" or snapshot_date="2023-01-07"

but in the second, sub-select, case the condition will be skipped altogether.

  /**
   * Converts catalyst expression to the format that Hive's getPartitionsByFilter() expects, i.e.
   * a string that represents partition predicates like "str_key=\"value\" and int_key=1 ...".
   *
   * Unsupported predicates are skipped.
   */
  def convertFilters(table: Table, filters: Seq[Expression]): String = {
    lazy val dateFormatter = DateFormatter()
    :
    :
  • Related