Dataframe UDFs for faster performance?


I have 2 dataframes in Pyspark like this (it may have more companies):

df1

Company Name              UnderspendAmount 
Pepsi                     2700.00
Pepsi                     1200.00

df2

Company Name              OverspendAmount 
Pepsi                     3600.00
Pepsi                     1400.00

So basically, for the same Company Name, the goal is to match the largest underspend with the largest overspend, the 2nd largest with the 2nd largest, the 3rd largest with the 3rd largest, and so on.

The end result after a join should look something like this:

Company Name              OverspendAmount              UnderspendAmount
Pepsi                     3600.00                      2700.00
Pepsi                     1400.00                      1200.00

I currently have a very brute-force and slow approach for this, which relies on collect. I am hoping there is a way to solve it using UDFs and agg.

This is the current approach:

from pyspark.sql.functions import col

for row in df1.select('Company Name').distinct().collect():
    current_company = row['Company Name']  # Get the company name
    df1_sorted_by_desc = df1.filter(col('Company Name') == current_company).orderBy(col('UnderspendAmount').desc())
    df2_sorted_by_desc = df2.filter(col('Company Name') == current_company).orderBy(col('OverspendAmount').desc())
    df1_collected = df1_sorted_by_desc.collect()
    df2_collected = df2_sorted_by_desc.collect()
    for indx in range(min(len(df1_collected), len(df2_collected))):  # iterate up to the shorter length for matching
        matching_row_df1 = df1_collected[indx]
        matching_row_df2 = df2_collected[indx]
        ....

So you get the gist: it is a pretty bad approach and takes a long time to execute. What is the optimal approach for this (and for problems like it in general)? Is a UDF with agg possible here (and for possible extensions of this)?

CodePudding user response:

Use the Spark SQL built-in rank window function to order the two dataframes by their respective amounts, then join them on Company Name and rank (use an inner/left/right join depending on the actual situation).

......
from pyspark.sql import functions as F

# rank each company's rows by amount, descending
df1 = df1.withColumn('rank', F.expr('rank() over (partition by `Company Name` order by UnderspendAmount desc)'))
df2 = df2.withColumn('rank', F.expr('rank() over (partition by `Company Name` order by OverspendAmount desc)'))
# pair rows of the same company that hold the same rank, then drop the helper column
df = df1.join(df2, on=['Company Name', 'rank'], how='inner').drop('rank')
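One caveat worth adding (not part of the original answer): rank() assigns the same rank to tied amounts, which would multiply rows after the join. If each underspend row should pair with exactly one overspend row, row_number() guarantees a strict 1-to-1 pairing. A minimal sketch of that variant, assuming the same column names as the question:

from pyspark.sql import functions as F

# row_number() gives every row within a company a unique position,
# so tied amounts still pair up one-to-one after the join
df1 = df1.withColumn('rank', F.expr('row_number() over (partition by `Company Name` order by UnderspendAmount desc)'))
df2 = df2.withColumn('rank', F.expr('row_number() over (partition by `Company Name` order by OverspendAmount desc)'))
df = df1.join(df2, on=['Company Name', 'rank'], how='inner').drop('rank')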

CodePudding user response:

Same idea as @过过招, but using the Spark DataFrame API instead of Spark SQL.

ranking for UnderspendAmount

from pyspark.sql import functions as F, Window as W
df1 = df1.withColumn('rn', F.rank().over(W.partitionBy('Company').orderBy(F.desc('UnderspendAmount'))))
# +-------+----------------+---+
# |Company|UnderspendAmount| rn|
# +-------+----------------+---+
# |  Pepsi|          2700.0|  1|
# |  Pepsi|          1200.0|  2|
# +-------+----------------+---+

ranking for OverspendAmount

df2 = df2.withColumn('rn', F.rank().over(W.partitionBy('Company').orderBy(F.desc('OverspendAmount'))))
# +-------+---------------+---+
# |Company|OverspendAmount| rn|
# +-------+---------------+---+
# |  Pepsi|         3600.0|  1|
# |  Pepsi|         1400.0|  2|
# +-------+---------------+---+

Join back

df1.join(df2, on=['Company', 'rn']).orderBy('rn')
# +-------+---+----------------+---------------+
# |Company| rn|UnderspendAmount|OverspendAmount|
# +-------+---+----------------+---------------+
# |  Pepsi|  1|          2700.0|         3600.0|
# |  Pepsi|  2|          1200.0|         1400.0|
# +-------+---+----------------+---------------+
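For completeness, here is a self-contained sketch that builds the two sample dataframes from the question and runs the whole ranking-and-join pipeline end to end. The SparkSession setup and the use of the original 'Company Name' column are assumptions on my part:

from pyspark.sql import SparkSession, functions as F, Window as W

spark = SparkSession.builder.getOrCreate()  # assumes a locally available Spark session

df1 = spark.createDataFrame(
    [('Pepsi', 2700.00), ('Pepsi', 1200.00)],
    ['Company Name', 'UnderspendAmount'])
df2 = spark.createDataFrame(
    [('Pepsi', 3600.00), ('Pepsi', 1400.00)],
    ['Company Name', 'OverspendAmount'])

# rank each company's rows by amount, largest first
w_under = W.partitionBy('Company Name').orderBy(F.desc('UnderspendAmount'))
w_over = W.partitionBy('Company Name').orderBy(F.desc('OverspendAmount'))

result = (df1.withColumn('rn', F.rank().over(w_under))
             .join(df2.withColumn('rn', F.rank().over(w_over)),
                   on=['Company Name', 'rn'], how='inner')
             .orderBy('Company Name', 'rn')
             .drop('rn'))

result.show()
# expected: the two Pepsi rows paired largest-with-largest, as in the tables above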