I have two DataFrames and I am trying to write a function that compares them and returns the net change in each affected column.
DF1:
+---------------+------+------+-------+----------+
| City          | Temp | Zone | Score | Activity |
+---------------+------+------+-------+----------+
| Atlanta       | 10   | 1    | 100   | 400      |
| Chicago       | 100  | 2    | 200   | 500      |
| Boston        | 100  | 3    | 300   | 600      |
| San Francisco | 1000 | 4    | 400   | 700      |
+---------------+------+------+-------+----------+
DF2:
+---------------+------+------+-------+----------+
| City          | Temp | Zone | Score | Activity |
+---------------+------+------+-------+----------+
| Atlanta       | 10   | 1    | 150   | 400      |
| Chicago       | 100  | 2    | 200   | 450      |
| Boston        | 100  | 3    | 300   | 650      |
| San Francisco | 1200 | 4    | 400   | 750      |
+---------------+------+------+-------+----------+
I would like the result to be like:
+---------------+------+------+-------+----------+
| City          | Temp | Zone | Score | Activity |
+---------------+------+------+-------+----------+
| Atlanta       | 0    | 0    | 50    | 0        |
| Chicago       | 0    | 0    | 0     | -50      |
| Boston        | 0    | 0    | 0     | 50       |
| San Francisco | 200  | 0    | 0     | 50       |
+---------------+------+------+-------+----------+
I am very new to PySpark and am wondering how I can achieve this.
I tried df2.subtract(df1),
but it only returns the rows of df2 that do not appear in df1, which doesn't directly show the net change in each column.
Notes: City name is the unique identifier. Each row is different.
Appreciate your help!
Answer:
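DataFrame.subtract(other) is a set operation, the equivalent of SQL EXCEPT DISTINCT: it returns the distinct rows of one DataFrame that do not appear in the other. It compares whole rows and never subtracts values.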
Instead, you can join the two DataFrames on City and subtract the matching columns arithmetically.
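# Join on the unique key so each city's old and new values sit on the same row
df = df1.join(df2, on='City').cache()
for col in df1.columns:
    if col != 'City':
        # Add diff_<col> = new value - old value, then drop the original <col> columns
        df = df.withColumn('diff_' + col, df2[col] - df1[col]).drop(col)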