I have two dataframes in PySpark like this (there may be more companies):
df1
Company Name    UnderspendAmount
Pepsi           2700.00
Pepsi           1200.00

df2
Company Name    OverspendAmount
Pepsi           3600.00
Pepsi           1400.00
So basically, for the same Company Name, I want to match the maximum underspend with the maximum overspend, the 2nd largest with the 2nd largest, the 3rd largest with the 3rd largest, and so on.
The end result after a join would look something like this:
Company Name    OverspendAmount    UnderspendAmount
Pepsi           3600.00            2700.00
Pepsi           1400.00            1200.00
I currently have a very brute-force and slow approach that uses collect. I am hoping there is a way to solve this with UDFs and agg.
This is the current approach:
from pyspark.sql.functions import col

for row in df1.select('Company Name').distinct().collect():
    current_company = row['Company Name']  # Get the company name
    # Restrict each dataframe to the current company, then sort by amount descending
    df1_sorted_by_desc = df1.filter(col('Company Name') == current_company).orderBy(col('UnderspendAmount').desc())
    df2_sorted_by_desc = df2.filter(col('Company Name') == current_company).orderBy(col('OverspendAmount').desc())
    df1_collected = df1_sorted_by_desc.collect()
    df2_collected = df2_sorted_by_desc.collect()
    for indx in range(min(len(df1_collected), len(df2_collected))):  # iterate up to the shorter length for matching
        matching_row_df1 = df1_collected[indx]
        matching_row_df2 = df2_collected[indx]
        ....
So you get the gist: it is a pretty bad approach and takes a long time to execute. What is the optimal approach for this (and in general)? Is a UDF with agg possible for this (and for possible extensions of it)?
CodePudding user response:
Use the Spark SQL built-in rank window function to rank the rows of each dataframe by their respective amount, then join the two dataframes on Company Name and rank (use an inner/left/right join depending on your actual situation).
......
from pyspark.sql import functions as F

# Rank each row within its company by amount (descending), then join on company + rank
df1 = df1.withColumn('rank', F.expr('rank() over (partition by `Company Name` order by UnderspendAmount desc)'))
df2 = df2.withColumn('rank', F.expr('rank() over (partition by `Company Name` order by OverspendAmount desc)'))
df = df1.join(df2, on=['Company Name', 'rank'], how='inner').drop('rank')
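If you want to reproduce this locally, here is a minimal sketch for building the question's sample data (assuming a local SparkSession; the rank + join lines above then produce the matched pairs):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data copied from the question
df1 = spark.createDataFrame(
    [('Pepsi', 2700.00), ('Pepsi', 1200.00)],
    ['Company Name', 'UnderspendAmount'])
df2 = spark.createDataFrame(
    [('Pepsi', 3600.00), ('Pepsi', 1400.00)],
    ['Company Name', 'OverspendAmount'])

# Applying the rank + join above yields one row per matched pair:
# the largest underspend with the largest overspend, the 2nd largest
# with the 2nd largest, and so on, per company.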
CodePudding user response:
Same idea as @过过招, but using the Spark API instead of Spark SQL.
ranking for UnderspendAmount
from pyspark.sql import functions as F
from pyspark.sql import Window as W

df1 = df1.withColumn('rn', F.rank().over(W.partitionBy('Company').orderBy(F.desc('UnderspendAmount'))))
# +-------+----------------+---+
# |Company|UnderspendAmount| rn|
# +-------+----------------+---+
# |  Pepsi|          2700.0|  1|
# |  Pepsi|          1200.0|  2|
# +-------+----------------+---+
ranking for OverspendAmount
df2 = df2.withColumn('rn', F.rank().over(W.partitionBy('Company').orderBy(F.desc('OverspendAmount'))))
# +-------+---------------+---+
# |Company|OverspendAmount| rn|
# +-------+---------------+---+
# |  Pepsi|         3600.0|  1|
# |  Pepsi|         1400.0|  2|
# +-------+---------------+---+
Join back
df1.join(df2, on=['Company', 'rn']).orderBy('rn')
# +-------+---+----------------+---------------+
# |Company| rn|UnderspendAmount|OverspendAmount|
# +-------+---+----------------+---------------+
# |  Pepsi|  1|          2700.0|         3600.0|
# |  Pepsi|  2|          1200.0|         1400.0|
# +-------+---+----------------+---------------+
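One caveat worth noting (my addition, not part of the original answers): rank() assigns tied amounts the same rank, so duplicate amounts within a company would multiply rows in the join. If each row should be matched exactly once, row_number() can be swapped in, for example:

from pyspark.sql import functions as F
from pyspark.sql import Window as W

# row_number() breaks ties arbitrarily but guarantees a unique rn per row,
# keeping the join on ['Company', 'rn'] one-to-one even with duplicate amounts
df1 = df1.withColumn('rn', F.row_number().over(W.partitionBy('Company').orderBy(F.desc('UnderspendAmount'))))
df2 = df2.withColumn('rn', F.row_number().over(W.partitionBy('Company').orderBy(F.desc('OverspendAmount'))))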