Dynamic filter based on the config and user : PySpark


I need to apply filters dynamically, based on filters from two places: (1) the config and (2) user/job input.

Required filters to be applied: (1) whatever filters are mentioned in config.filters, and (2) a user-supplied filter, i.e. a number of days relative to the run date (rundate - history_days). If the user passes rundate as 2020-01-20 and history_days as 5, then the final filter should be:

 cust=123 and (activity_day between rundate-5 and rundate)

I was able to achieve this using two filter steps (sketched after the config below): (1) a SQL-style filter taken from the config, df.filter(config['filters']), and (2) a second round of filtering on top of (1) using activity_day >= date_sub(rundate, history_days) & activity_day < rundate.

Is there any way I can merge the two filter steps into one, so that I can maintain both filters in the config and somehow substitute the user input?

Data:

df = spark.createDataFrame(
        [
          (123,"2020-01-01"),
          (124,"2020-01-01"),
          (123,"2019-01-01")
        ],
        ("cust", "activity_day")
    )

Config:

config = """
                [ {
                      "source":"df_1",
                      "filters":"cust=123",
                  }
                ]
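
For reference, the two-step approach described above looks roughly like this (a sketch; the rundate and history_days values stand in for the job inputs):

import json
from pyspark.sql import functions as F

conf = json.loads(config)

rundate = "2020-01-20"
history_days = 5

# step 1: SQL-style filter taken from the config
df_1 = df.filter(conf[0]['filters'])

# step 2: user-supplied date window applied on top of step 1
df_2 = df_1.filter(
    (F.col('activity_day') >= F.date_sub(F.to_date(F.lit(rundate)), history_days))
    & (F.col('activity_day') < F.to_date(F.lit(rundate)))
)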

CodePudding user response:

You can start by parsing your config to extract the filter condition `filters` and the corresponding value into a dictionary, and then add the `run_date` condition to it to further filter the `DataFrame`.

Data Preparation

from pyspark.sql import functions as F

sparkDF = spark.createDataFrame(
        [
          (123,"2020-01-01"),
          (124,"2020-01-01"),
          (123,"2019-01-01")
        ],
        ("cust", "activity_day")
    )

sparkDF.show()

+----+------------+
|cust|activity_day|
+----+------------+
| 123|  2020-01-01|
| 124|  2020-01-01|
| 123|  2019-01-01|
+----+------------+

Parsing Config and Generating Filter Condition

config = [{"source": "df_1", "filters": "cust=123"}]

filter_dict = {}

for row in config:
    if 'filters' in row:
        # assumes each filter is a single "column=value" equality
        key, value = row['filters'].split("=")
        filter_dict[key] = value

filter_dict

{'cust': '123'}
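
Note that this parser assumes each config entry holds a single column=value equality. With more entries of that shape the dictionary simply accumulates more keys, for example (config_multi and its second entry are hypothetical):

config_multi = [
    {"source": "df_1", "filters": "cust=123"},
    {"source": "df_1", "filters": "country=NL"}   # hypothetical extra filter
]

multi_dict = {}
for row in config_multi:
    if 'filters' in row:
        key, value = row['filters'].split("=")
        multi_dict[key] = value

multi_dict   # {'cust': '123', 'country': 'NL'}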

run_date = "2020-01-01"

upper_range = F.to_date(F.lit(run_date))
lower_range = F.date_add(upper_range, -5)

secondary_condn = (F.col('activity_day').between(lower_range, upper_range))

# combine the config-driven equality filters with the run_date window
final_condn = secondary_condn
for column, value in filter_dict.items():
    final_condn = final_condn & (F.col(column) == value)

Filtering DataFrame

sparkDF.filter(final_condn).show()

+----+------------+
|cust|activity_day|
+----+------------+
| 123|  2020-01-01|
+----+------------+
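
If you want the whole thing as a single step, the parsing and the date window can be folded into one reusable helper; a minimal sketch (the function name and signature are illustrative):

def apply_filters(df, config, run_date, history_days):
    # run_date window: [run_date - history_days, run_date]
    upper_range = F.to_date(F.lit(run_date))
    lower_range = F.date_add(upper_range, -history_days)
    condn = F.col('activity_day').between(lower_range, upper_range)

    # fold each config-driven equality filter into the same condition
    for row in config:
        if 'filters' in row:
            column, value = row['filters'].split("=")
            condn = condn & (F.col(column) == value)

    return df.filter(condn)

apply_filters(sparkDF, config, "2020-01-01", 5).show()
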
SQL Approach for Multiple Conditions

You can utilise `createOrReplaceTempView` for more complex filters. The idea is to build the WHERE filter(s) and merge them into a SQL query that filters the rows.

sparkDF = spark.createDataFrame(
        [
          (123,"2020-01-01",1,"NL"),
          (124,"2020-01-01",0,"US"),
          (123,"2019-01-01",1,"IN"),
          (124,"2020-01-02",0,"NL"),
        ],
        ("cust", "activity_day","is_deleted","country")
    )

# register the view referenced by the SQL query below
sparkDF.createOrReplaceTempView("customer_activity")

sparkDF.show()

+----+------------+----------+-------+
|cust|activity_day|is_deleted|country|
+----+------------+----------+-------+
| 123|  2020-01-01|         1|     NL|
| 124|  2020-01-01|         0|     US|
| 123|  2019-01-01|         1|     IN|
| 124|  2020-01-02|         0|     NL|
+----+------------+----------+-------+
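
The WHERE clause built below implies a config with more than one filter entry, so redefine it here; the second entry is illustrative, chosen to match the output shown further down:

config = [
    {"source": "df_1", "filters": "cust=123"},
    {"source": "df_1", "filters": 'is_deleted != 1 AND country="NL"'}
]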

where_filter = ""
logical_flag = " OR "

for i,row in enumerate(config):
    if 'filters' in row:
        if i == 0:
            where_filter  = row['filters']
        else:
            where_filter  = logical_flag   "("   row['filters']   ")"

where_filter # O/P -- 'cust=123 OR (is_deleted != 1 AND country="NL")'

if where_filter != "":
    spark.sql(f"""
        SELECT *
        FROM customer_activity
        WHERE {where_filter}
    """).show()

+----+------------+----------+-------+
|cust|activity_day|is_deleted|country|
+----+------------+----------+-------+
| 123|  2020-01-01|         1|     NL|
| 123|  2019-01-01|         1|     IN|
| 124|  2020-01-02|         0|     NL|
+----+------------+----------+-------+
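
To tie this back to the original question, the user-supplied run_date window can be appended to the same generated WHERE clause; a rough sketch, assuming run_date and history_days come from the job input:

run_date = "2020-01-01"
history_days = 5

# date window expressed in SQL, reusing Spark SQL's date_sub
date_filter = (
    f"activity_day BETWEEN date_sub('{run_date}', {history_days}) AND '{run_date}'"
)
full_filter = f"({where_filter}) AND {date_filter}" if where_filter else date_filter

spark.sql(f"""
    SELECT *
    FROM customer_activity
    WHERE {full_filter}
""").show()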
