PySpark SQL loop optimization


I have a scenario where I need to filter a date column on a date condition, and I need to do this for an entire month. The problem is that looping over each date takes a long time; I want to process the entire month in one go. Here is the code:

from pyspark.sql import functions as F

target_date = list(range(1, 31))  # days 1..30 of the month

for i in target_date:
    df = spark.sql(f'select * from table where x_date <= {i} and y_date >= {i}')
    df = df.withColumn('load_date', F.lit(i))
    df.write.partitionBy('load_date').mode('append').parquet(output_path)

Are there any approaches to make this faster?

CodePudding user response:

Maybe you can move the write outside the loop. Something like:

target_date = list(range(1, 31))
df_final = None

for i in target_date:
    df = spark.sql(f'select * from table where x_date <= {i} and y_date >= {i}')
    df = df.withColumn('load_date', F.lit(i))
    # accumulate the per-date results instead of writing each one separately
    df_final = df if df_final is None else df_final.union(df)

df_final.write.partitionBy('load_date').parquet(output_path)
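A slightly more compact way to express the same idea is functools.reduce over the per-date DataFrames. This is just a sketch (untested against your table, assuming the same spark session and column names):

from functools import reduce
from pyspark.sql import functions as F

# build one DataFrame per day, then union them all in one pass
dfs = [
    spark.sql(f'select * from table where x_date <= {i} and y_date >= {i}')
         .withColumn('load_date', F.lit(i))
    for i in range(1, 31)
]
df_final = reduce(lambda a, b: a.union(b), dfs)
df_final.write.partitionBy('load_date').parquet(output_path)

Note that the query plan still scans the table once per day either way; the main saving is replacing 30 separate write jobs with one.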

CodePudding user response:

I believe you could solve it with a kind of cross-join like this:

load_dates = spark.createDataFrame([[i] for i in range(1, 31)], ['load_date'])
load_dates.show()
+---------+
|load_date|
+---------+
|        1|
|        2|
|        3|
|      ...|
|       30|
+---------+

df = spark.sql('select * from table')

df = df.join(
    load_dates,
    on=(F.col('x_date') <= F.col('load_date')) & (F.col('y_date') >= F.col('load_date')),
    how='inner',
)

df.write.partitionBy('load_date').parquet(output_path)
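Since load_dates is only 30 rows, it may also be worth hinting Spark to broadcast it, so the non-equi join runs as a broadcast nested-loop join instead of shuffling the big table. A sketch; whether it actually helps depends on your data and cluster:

from pyspark.sql import functions as F

df = spark.sql('select * from table')
df = df.join(
    F.broadcast(load_dates),  # hint: ship the tiny 30-row frame to every executor
    on=(F.col('x_date') <= F.col('load_date')) & (F.col('y_date') >= F.col('load_date')),
    how='inner',
)
df.write.partitionBy('load_date').parquet(output_path)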

CodePudding user response:

You should be able to do it by:

  1. Creating an array of load_dates in each row
  2. Exploding the array so that you have a unique load_date per original row
  3. Filtering to get just the load_dates you want

For example

target_dates = list(range(1, 31))

df = spark.sql('select * from table')
# create an array of all load_dates in each row
df = df.withColumn("load_date", F.array([F.lit(i) for i in target_dates]))
# explode the array so that you get a new row for each load_date
df = df.withColumn("load_date", F.explode("load_date"))
# keep only the load_dates that fall inside each row's date range
df = df.filter("x_date <= load_date and y_date >= load_date")
df.write.partitionBy('load_date').mode('append').parquet(output_path)
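If x_date and y_date are integer day numbers as in the question, Spark 2.4+ can also generate only the in-range days per row with F.sequence, avoiding the build-a-full-array-then-filter step. A sketch under that assumption:

from pyspark.sql import functions as F

df = spark.sql('select * from table')
# clip each row's [x_date, y_date] range to the month's days 1..30
start = F.greatest(F.lit(1), F.col('x_date'))
stop = F.least(F.lit(30), F.col('y_date'))
# drop rows whose clipped range is empty, then emit one row per in-range day
df = (df.where(start <= stop)
        .withColumn('load_date', F.explode(F.sequence(start, stop))))
df.write.partitionBy('load_date').mode('append').parquet(output_path)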