Home > Back-end >  joining data sets while also comparing null values
joining data sets while also comparing null values

Time:09-20

I have two data frames.

DF1

column1         column2             column3              column4
agree           strongly agree      disagree             null
null            disagree            strongly disagree    agree
disagree        null                strongly agree       disagree

DF2

col1            col2                col3                 col4      cluster
disagree        agree               strongly disagree    disagree    1 
agree           strongly agree      disagree             null        2
disagree        null                strongly agree       disagree    5
disagree        agree               strongly agree       disagree    3
null            disagree            strongly disagree    agree       5
disagree        null                strongly agree       disagree    6

Expected output

column1         column2             column3              column4   Cluster
agree           strongly agree      disagree             null      2
null            disagree            strongly disagree    agree     5
disagree        null                strongly agree       disagree  6

I have already did this in R but I could not implement the same in PySpark. Can anyone please tell me how to implement in PySpark

Code I used in R(where it worked):

# my columns names were same here
merged <- left_join(df1,df2,by=c('column1','column2','column3','column4'))

Required Result: All rows of df1 and the last column of df2 joined.

CodePudding user response:

Since join won't take care of null values, I'm thinking about a small hack by replacing all nulls with a specific value, then perform a left join to keep app df1's records

df1

df1 = spark.read.csv('a.csv', header=True).fillna('xxxyyyzzz')
df1.show()

 --------- -------------- ----------------- --------- 
|  column1|       column2|          column3|  column4|
 --------- -------------- ----------------- --------- 
|    agree|strongly agree|         disagree|xxxyyyzzz|
|xxxyyyzzz|      disagree|strongly disagree|    agree|
| disagree|     xxxyyyzzz|   strongly agree| disagree|
 --------- -------------- ----------------- --------- 

df2

df2 = spark.read.csv('b.csv', header=True).fillna('xxxyyyzzz')
df2.show()

 --------- -------------- ----------------- --------- ------- 
|  column1|       column2|          column3|  column4|cluster|
 --------- -------------- ----------------- --------- ------- 
| disagree|         agree|strongly disagree| disagree|      1|
|    agree|strongly agree|         disagree|xxxyyyzzz|      2|
| disagree|     xxxyyyzzz|   strongly agree| disagree|      5|
| disagree|         agree|   strongly agree| disagree|      3|
|xxxyyyzzz|      disagree|strongly disagree|    agree|      5|
| disagree|     xxxyyyzzz|   strongly agree| disagree|      6|
 --------- -------------- ----------------- --------- ------- 

Join and return dummy value back to nulls

(df1
    .join(df2, on=df1.columns, how='left')
    .replace('xxxyyyzzz', None)
    .show()
)

 -------- -------------- ----------------- -------- ------- 
| column1|       column2|          column3| column4|cluster|
 -------- -------------- ----------------- -------- ------- 
|   agree|strongly agree|         disagree|    null|      2|
|    null|      disagree|strongly disagree|   agree|      5|
|disagree|          null|   strongly agree|disagree|      6|
|disagree|          null|   strongly agree|disagree|      5|
 -------- -------------- ----------------- -------- ------- 

CodePudding user response:

In addition to pltc's answer, you can also use eqNullSafe() which would result in the same output.

data1_sdf. \
    join(data2_sdf, 
         [data1_sdf.c1.eqNullSafe(data2_sdf.c1), 
          data1_sdf.c2.eqNullSafe(data2_sdf.c2), 
          data1_sdf.c3.eqNullSafe(data2_sdf.c3), 
          data1_sdf.c4.eqNullSafe(data2_sdf.c4)],
         'left'
         ). \
    select(data1_sdf['*'], 'cluster'). \
    show()

#  -------- -------------- ----------------- -------- ------- 
# |      c1|            c2|               c3|      c4|cluster|
#  -------- -------------- ----------------- -------- ------- 
# |   agree|strongly agree|         disagree|    null|      2|
# |disagree|          null|   strongly agree|disagree|      5|
# |disagree|          null|   strongly agree|disagree|      6|
# |    null|      disagree|strongly disagree|   agree|      5|
#  -------- -------------- ----------------- -------- ------- 

CodePudding user response:

If you are going to replace this nulls with some values as @pltc suggested you may consider to use monotonically_increasing_id() instead of just constant. During sort merge join records with the same values are going to be moved to the same machine to do the join, in case of monotonically_increasing_id() the dummy values are going to be evenly distributed which may be good for the performance

 //use such function to salt nulls
def salt(column: Column): Column =
    when(column.isNull, concat(lit("null_prefix_"), monotonicall_increasing_id()))
    .otherwsie(column)


//Then do the join as usuall

//Then unsalt before select with this function

def unSalt(column: Column): Column =
    when(column.startsWith("null_prefix_"), concat(lit(null)))
    .otherwsie(column)
  • Related