I have two data frames.

DF1

column1   column2         column3            column4
agree     strongly agree  disagree           null
null      disagree        strongly disagree  agree
disagree  null            strongly agree     disagree

DF2

col1      col2            col3               col4      cluster
disagree  agree           strongly disagree  disagree  1
agree     strongly agree  disagree           null      2
disagree  null            strongly agree     disagree  5
disagree  agree           strongly agree     disagree  3
null      disagree        strongly disagree  agree     5
disagree  null            strongly agree     disagree  6

Expected output

column1   column2         column3            column4   Cluster
agree     strongly agree  disagree           null      2
null      disagree        strongly disagree  agree     5
disagree  null            strongly agree     disagree  6
I have already done this in R, but I could not implement the same in PySpark. Can anyone please tell me how to do it in PySpark?
Code I used in R (where it worked):
# my columns names were same here
merged <- left_join(df1,df2,by=c('column1','column2','column3','column4'))
Required result: all rows of df1, plus the last column of df2 joined on.
CodePudding user response:
Since a join won't match null values, here's a small hack: replace all nulls with a sentinel value, then perform a left join to keep all of df1's records.
df1
df1 = spark.read.csv('a.csv', header=True).fillna('xxxyyyzzz')
df1.show()
+---------+--------------+-----------------+---------+
|  column1|       column2|          column3|  column4|
+---------+--------------+-----------------+---------+
|    agree|strongly agree|         disagree|xxxyyyzzz|
|xxxyyyzzz|      disagree|strongly disagree|    agree|
| disagree|     xxxyyyzzz|   strongly agree| disagree|
+---------+--------------+-----------------+---------+
df2
df2 = spark.read.csv('b.csv', header=True).fillna('xxxyyyzzz')
df2.show()
+---------+--------------+-----------------+---------+-------+
|  column1|       column2|          column3|  column4|cluster|
+---------+--------------+-----------------+---------+-------+
| disagree|         agree|strongly disagree| disagree|      1|
|    agree|strongly agree|         disagree|xxxyyyzzz|      2|
| disagree|     xxxyyyzzz|   strongly agree| disagree|      5|
| disagree|         agree|   strongly agree| disagree|      3|
|xxxyyyzzz|      disagree|strongly disagree|    agree|      5|
| disagree|     xxxyyyzzz|   strongly agree| disagree|      6|
+---------+--------------+-----------------+---------+-------+
Join, then convert the dummy values back to nulls:
(df1
.join(df2, on=df1.columns, how='left')
.replace('xxxyyyzzz', None)
.show()
)
+--------+--------------+-----------------+--------+-------+
| column1|       column2|          column3| column4|cluster|
+--------+--------------+-----------------+--------+-------+
|   agree|strongly agree|         disagree|    null|      2|
|    null|      disagree|strongly disagree|   agree|      5|
|disagree|          null|   strongly agree|disagree|      6|
|disagree|          null|   strongly agree|disagree|      5|
+--------+--------------+-----------------+--------+-------+
CodePudding user response:
In addition to pltc's answer, you can also use eqNullSafe(), which produces the same output.
data1_sdf. \
join(data2_sdf,
[data1_sdf.c1.eqNullSafe(data2_sdf.c1),
data1_sdf.c2.eqNullSafe(data2_sdf.c2),
data1_sdf.c3.eqNullSafe(data2_sdf.c3),
data1_sdf.c4.eqNullSafe(data2_sdf.c4)],
'left'
). \
select(data1_sdf['*'], 'cluster'). \
show()
# +--------+--------------+-----------------+--------+-------+
# |      c1|            c2|               c3|      c4|cluster|
# +--------+--------------+-----------------+--------+-------+
# |   agree|strongly agree|         disagree|    null|      2|
# |disagree|          null|   strongly agree|disagree|      5|
# |disagree|          null|   strongly agree|disagree|      6|
# |    null|      disagree|strongly disagree|   agree|      5|
# +--------+--------------+-----------------+--------+-------+
CodePudding user response:
If you are going to replace these nulls with some value, as @pltc suggested, consider using monotonically_increasing_id() instead of a constant. During a sort-merge join, records with the same key values are shuffled to the same machine; with monotonically_increasing_id() the dummy values are evenly distributed, which can be better for performance. Note, however, that unique salts mean two nulls no longer compare equal, so this variant suits joins where null keys should not match each other.
// Use a function like this to salt nulls
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

def salt(column: Column): Column =
  when(column.isNull, concat(lit("null_prefix_"), monotonically_increasing_id().cast("string")))
    .otherwise(column)

// Then do the join as usual, and unsalt before the final select
def unSalt(column: Column): Column =
  when(column.startsWith("null_prefix_"), lit(null).cast("string"))
    .otherwise(column)