I have a scenario where I need to filter a table on a date condition, and I need to do this for every date in a month. The problem is that looping over each date takes a long time, so I want to process the entire month in one go. The following is my code:
from pyspark.sql import functions as f
target_date = list(range(1, 31))  # days 1 to 30
for i in target_date:
    # one query and one write per day
    df = spark.sql(f'select * from table where x_date <= {i} and y_date >= {i}')
    df = df.withColumn('load_date', f.lit(i))
    df.write.partitionBy('load_date').mode('append').parquet(output_path)
Are there any approaches to make this faster?
CodePudding user response:
Maybe you can move the write outside the loop, something like:
from functools import reduce
from pyspark.sql import functions as f
target_date = list(range(1, 31))  # days 1 to 30
dfs = []
for i in target_date:
    df = spark.sql(f'select * from table where x_date <= {i} and y_date >= {i}')
    dfs.append(df.withColumn('load_date', f.lit(i)))
# union the per-day DataFrames, then write once
df_final = reduce(lambda a, b: a.union(b), dfs)
df_final.write.partitionBy('load_date').parquet(output_path)
CodePudding user response:
I believe you could solve it with a kind of cross-join like this:
load_dates = spark.createDataFrame([[i,] for i in range(1,31)], ['load_date'])
load_dates.show()
+---------+
|load_date|
+---------+
|        1|
|        2|
|        3|
|      ...|
|       30|
+---------+
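from pyspark.sql import functions as F
df = spark.sql('select * from table')
# assign the join result; each row is repeated once per matching load_date
df = df.join(
    load_dates,
    on=(F.col('x_date') <= F.col('load_date')) & (F.col('y_date') >= F.col('load_date')),
    how='inner',
)
df.write.partitionBy('load_date').parquet(output_path)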
CodePudding user response:
You should be able to do it by:
- Creating an array of all load_dates in each row
- Exploding the array so that you get one row per combination of original row and load_date
- Filtering to keep just the load_dates you want
For example:
from pyspark.sql import functions as F
target_dates = list(range(1, 31))  # days 1 to 30
df = spark.sql('select * from table')
# Create an array of all load_dates in each row
df = df.withColumn("load_date", F.array([F.lit(i) for i in target_dates]))
# Explode the load_dates so that you get a new row for each load_date
df = df.withColumn("load_date", F.explode("load_date"))
# Keep only the load_dates that fall between x_date and y_date
df = df.filter("x_date <= load_date and y_date >= load_date")
df.write.partitionBy('load_date').mode('append').parquet(output_path)
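As a sanity check, the explode-and-filter steps can be modelled in plain Python without Spark. This is only a sketch under assumptions: the `rows` data and the two function names are made up for illustration, and dates are treated as day-of-month integers as in the question. It compares the per-day loop with the single-pass version on a toy input.

```python
# Plain-Python model of the logic; no Spark involved.
# Hypothetical (x_date, y_date) pairs, as day-of-month integers.
rows = [(1, 3), (2, 2), (5, 4), (29, 30)]
target_dates = list(range(1, 31))  # days 1 to 30


def per_day_loop(rows, target_dates):
    """One filter pass per day, like the loop in the question."""
    out = []
    for day in target_dates:
        for x_date, y_date in rows:
            if x_date <= day <= y_date:
                out.append((x_date, y_date, day))
    return out


def explode_and_filter(rows, target_dates):
    """Pair every row with every day, then filter once."""
    exploded = [(x, y, day) for (x, y) in rows for day in target_dates]
    return [(x, y, day) for (x, y, day) in exploded if x <= day <= y]


loop_result = sorted(per_day_loop(rows, target_dates))
single_pass_result = sorted(explode_and_filter(rows, target_dates))
print(single_pass_result)
```

Both functions return the same (x_date, y_date, load_date) triples for this input. Note that the exploded intermediate holds every row 30 times before the filter runs, so on a large table it is worth checking whether that blow-up is acceptable.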