I need to apply filters dynamically, and the filters come from two places: (1) the config and (2) user/job input.
Filters required: (1) whatever filters are specified in config.filters, and (2) a user-supplied filter, i.e. a number of days counted back from the run date (rundate - history_days). If the user passes rundate as 2020-01-20 and history_days as 5, the final filter should be:
cust=123 and (activity_day between rundate-5 and rundate)
I was able to achieve this with a two-step filter:
(1) applying the SQL-style filter from the config: df.filter(config['config'])
(2) a second round of filtering on top of (1), using:
activity_day >= date_sub(rundate, history_days) & activity_day < rundate
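In code, the current two-step approach looks roughly like this (a sketch; it assumes rundate and history_days are already available as Python variables and that config['config'] holds the SQL filter string):
from pyspark.sql import functions as F

# step 1: SQL-style filter taken from the config
df_1 = df.filter(config['config'])

# step 2: user-supplied date window applied on top of step 1
df_2 = df_1.filter(
    (F.col('activity_day') >= F.date_sub(F.lit(rundate), history_days))
    & (F.col('activity_day') < F.lit(rundate))
)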
Is there any way I can merge the two filter steps into one, so that I can keep both filters in the config and somehow substitute the user input?
Data:
df = spark.createDataFrame(
[
(123,"2020-01-01"),
(124,"2020-01-01"),
(123,"2019-01-01")
],
("cust", "activity_day")
)
Config:
config = """
[ {
"source":"df_1",
"filters":"cust=123",
}
]
CodePudding user response:
You can start by parsing your config to extract the filter condition from filters and the corresponding value into a dictionary, and then add the run_date condition to it to further filter the DataFrame.
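If the config actually arrives as a JSON string, as in the question, it can be parsed first (a small sketch, not part of the original answer):
import json

config_str = """
[ {
    "source": "df_1",
    "filters": "cust=123"
} ]
"""

config = json.loads(config_str)   # -> [{'source': 'df_1', 'filters': 'cust=123'}]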
Data Preparation
sparkDF = sql.createDataFrame(
[
(123,"2020-01-01"),
(124,"2020-01-01"),
(123,"2019-01-01")
],
("cust", "activity_day")
)
sparkDF.show()
+----+------------+
|cust|activity_day|
+----+------------+
| 123|  2020-01-01|
| 124|  2020-01-01|
| 123|  2019-01-01|
+----+------------+
Parsing Config and Generating the Filter Condition
config = [ {"source":"df_1","filters":"cust=123",}]
filter_dict = {}
for row in config:
if 'filters' in row:
key,value = row['filters'].split("=")
filter_dict[key] = value
filter_dict
{'cust': '123'}
run_date = "2020-01-01"
upper_range = F.to_date(F.lit(run_date))
lower_range = F.date_add(upper_range,-5)
secondary_condn = (F.col('activity_day').between(lower_range,upper_range))
final_condn = (F.col(column) == filter_dict[column]) & (secondary_condn)
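If the config can carry more than one such equality filter, the same idea generalizes by folding every entry of filter_dict into one condition (a sketch, assuming each config filter is a simple column=value pair):
from functools import reduce
from pyspark.sql import functions as F

# AND together one equality condition per config entry, then add the date range
primary_condn = reduce(
    lambda acc, col: acc & (F.col(col) == filter_dict[col]),
    filter_dict,
    F.lit(True),
)
final_condn = primary_condn & secondary_condn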
Filtering DataFrame
sparkDF.filter(final_condn).show()
+----+------------+
|cust|activity_day|
+----+------------+
| 123|  2020-01-01|
+----+------------+
SQL Approach Towards Multiple Conditions
You can utilise createOrReplaceTempView for more complex filters. The idea is to build the WHERE filter(s) from the config and merge them into a SQL query that filters the rows.
sparkDF = sql.createDataFrame(
[
(123,"2020-01-01",1,"NL"),
(124,"2020-01-01",0,"US"),
(123,"2019-01-01",1,"IN"),
(124,"2020-01-02",0,"NL"),
],
("cust", "activity_day","is_deleted","country")
)

sparkDF.createOrReplaceTempView("customer_activity")
sparkDF.show()
+----+------------+----------+-------+
|cust|activity_day|is_deleted|country|
+----+------------+----------+-------+
| 123|  2020-01-01|         1|     NL|
| 124|  2020-01-01|         0|     US|
| 123|  2019-01-01|         1|     IN|
| 124|  2020-01-02|         0|     NL|
+----+------------+----------+-------+
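The where_filter output shown further below implies that the config now carries a second, more complex entry; presumably something like this (an assumption, since the answer does not show it):
config = [
    {"source": "df_1", "filters": "cust=123"},
    {"source": "df_1", "filters": 'is_deleted != 1 AND country="NL"'},
]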
where_filter = ""
logical_flag = " OR "
for i,row in enumerate(config):
if 'filters' in row:
if i == 0:
where_filter = row['filters']
else:
where_filter = logical_flag "(" row['filters'] ")"
where_filter # O/P -- 'cust=123 OR (is_deleted != 1 AND country="NL")'
if where_filter != "":
    sql.sql(f"""
        SELECT *
        FROM customer_activity
        WHERE {where_filter}
    """).show()
+----+------------+----------+-------+
|cust|activity_day|is_deleted|country|
+----+------------+----------+-------+
| 123|  2020-01-01|         1|     NL|
| 123|  2019-01-01|         1|     IN|
| 124|  2020-01-02|         0|     NL|
+----+------------+----------+-------+
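To come back to the original question of merging the two filter steps into one: with the SQL approach, the user-supplied run_date/history_days window can simply be appended to the generated WHERE clause before running the query (a sketch, not part of the answer above; it assumes activity_day can be compared against the result of Spark SQL's built-in date_sub):
run_date = "2020-01-20"
history_days = 5

date_condn = (
    f"activity_day >= date_sub('{run_date}', {history_days}) "
    f"AND activity_day < '{run_date}'"
)

# one WHERE clause combining the config filters and the user-supplied date range
combined_filter = f"({where_filter}) AND {date_condn}" if where_filter else date_condn

sql.sql(f"""
    SELECT *
    FROM customer_activity
    WHERE {combined_filter}
""").show()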