Join dataframes and merge/replace column values

Time:06-16

from delta.tables import *

vals1 = [(1, "a"), 
        (2, "b"), 
        (3, "c"), 
        (4, "d") 
      ]
columns1 = ["id","name"]
df1 = spark.createDataFrame(data=vals1, schema=columns1)
# df1.show()

vals2 = [(1, "k"), 
        (2, "l"), 
        (3, "m")
      ]
columns2 = ["id","name"]
df2 = spark.createDataFrame(data=vals2, schema=columns2)
# df2.show()

df1 = df1.join(df2, 'id', 'full')
df1.show()

Gives me the following result:

+---+----+----+
| id|name|name|
+---+----+----+
|  1|   a|   k|
|  3|   c|   m|
|  2|   b|   l|
|  4|   d|null|
+---+----+----+

How do I make it produce a single "name" column that contains the values "k, l, m, d"?
(It must always use the value from df2, with one exception: when a row in df1 has no matching row in df2, it should use the value from df1.)

CodePudding user response:

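You can use coalesce, which returns the first non-null value among its arguments.

from pyspark.sql import functions as F

# Prefer df2.name; fall back to df1.name when df2 has no matching row
df1 = (df1.join(df2, 'id', 'left')
       .select('id', F.coalesce(df2.name, df1.name).alias('name')))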
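For intuition, the row-wise effect of the join followed by coalesce can be modelled in plain Python. This is only an illustrative sketch, not Spark code: the dicts stand in for the two DataFrames keyed by id, and `coalesce` and `merge_names` are hypothetical helpers written for this example.

```python
def coalesce(*values):
    """Return the first argument that is not None (like SQL COALESCE)."""
    for value in values:
        if value is not None:
            return value
    return None


def merge_names(left, right):
    """Model a full outer join on id, preferring the value from `right`."""
    ids = sorted(set(left) | set(right))
    # .get() returns None for a missing id, which plays the role of the
    # null that the join produces for an unmatched row.
    return {i: coalesce(right.get(i), left.get(i)) for i in ids}


df1_rows = {1: "a", 2: "b", 3: "c", 4: "d"}
df2_rows = {1: "k", 2: "l", 3: "m"}

merged = merge_names(df1_rows, df2_rows)
print(merged)  # {1: 'k', 2: 'l', 3: 'm', 4: 'd'}
```

Only id 4 keeps its df1 value, because it is the only id with nothing on the df2 side.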

============================================================

Update: to apply the same logic to every column that both DataFrames share (other than id):

df1 = (df1.join(df2, 'id', 'left')
       .select('id', 
               *[F.coalesce(df2[x], df1[x]).alias(x) for x in df1.columns if x != 'id' and x in df2.columns]))
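The list comprehension above can be read as "for every non-key column present in both schemas, take the df2 value if it is not null, otherwise the df1 value". A minimal plain-Python sketch of that per-row logic follows; it is not Spark code, and the `city` column, the column lists, and `merge_row` are assumptions made up for illustration.

```python
# Hypothetical schemas; only the columns present in both are merged.
left_columns = ["id", "name", "city"]
right_columns = ["id", "name", "city"]
shared = [c for c in left_columns if c != "id" and c in right_columns]


def merge_row(left_row, right_row):
    """Merge one joined row, preferring non-null values from right_row."""
    # In a left join, an unmatched row has null for every right-hand column.
    right_row = right_row or {}
    merged = {"id": left_row["id"]}
    for column in shared:
        right_value = right_row.get(column)
        merged[column] = right_value if right_value is not None else left_row[column]
    return merged


matched = merge_row(
    {"id": 1, "name": "a", "city": "x"},
    {"id": 1, "name": "k", "city": None},
)
unmatched = merge_row({"id": 4, "name": "d", "city": "y"}, None)
print(matched)    # {'id': 1, 'name': 'k', 'city': 'x'}
print(unmatched)  # {'id': 4, 'name': 'd', 'city': 'y'}
```

Note that the fallback is applied column by column, so a matched row can still pick up an individual df1 value wherever the df2 value is null.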

============================================================

Update 2

I think I understand your requirement now. I have tweaked your sample data to illustrate the condition.

df1
id name
 1    a 
 2    b 
 3    c 
 4    d 

df2
id name
 1 null 
 2    l 
 3    m

If this is your input data, you want to keep every value from df2, including the null for id 1, and take the value d from df1 only because id 4 is missing from df2.

The expected result should look like this:

id name
 1 null 
 2    l 
 3    m 
 4    d

In this case, there is no easy way to tell whether a null came from the original data or was produced by the outer join.

One way to work around this is to replace the nulls in df2 with a placeholder before the join, and then turn the placeholder back into null after the join. The placeholder must be a value that cannot occur in the real data.

null_value = 'NA'
df2 = df2.na.fill(null_value)
df1 = (df1.join(df2, 'id', 'full')
       .select('id', 
               *[F.coalesce(df2[x], df1[x]).alias(x) for x in df1.columns if x != 'id' and x in df2.columns])
       .replace(null_value, None))
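The difference between the two behaviours can be sketched in plain Python (an illustrative model only, not Spark code; the dicts stand in for the DataFrames keyed by id). A coalesce-style merge treats a genuine null in df2 the same as a missing row, whereas a presence-style merge falls back to df1 only when the id is absent from df2, which is the behaviour the placeholder approach above emulates.

```python
df1_rows = {1: "a", 2: "b", 3: "c", 4: "d"}
df2_rows = {1: None, 2: "l", 3: "m"}  # id 1 carries a genuine null

ids = sorted(set(df1_rows) | set(df2_rows))

# Coalesce-style: a null from df2 is indistinguishable from "no row in df2",
# so id 1 wrongly falls back to the df1 value.
by_coalesce = {
    i: df2_rows.get(i) if df2_rows.get(i) is not None else df1_rows.get(i)
    for i in ids
}

# Presence-style: fall back to df1 only when the id is absent from df2,
# so the genuine null for id 1 is preserved.
by_presence = {
    i: df2_rows[i] if i in df2_rows else df1_rows.get(i)
    for i in ids
}

print(by_coalesce)  # {1: 'a', 2: 'l', 3: 'm', 4: 'd'}
print(by_presence)  # {1: None, 2: 'l', 3: 'm', 4: 'd'}
```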

CodePudding user response:

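This should work for you, where df_new is the new DataFrame produced by joining df1 and df2:

from pyspark.sql import functions as F

# Rename the name columns so they no longer collide after the join
df1 = df1.select(F.col('id'), F.col('name').alias('name_df1'))
df2 = df2.select(F.col('id'), F.col('name').alias('name_df2'))

# Prefer the df2 value and fall back to the df1 value when df2 has none
df_new = df1.join(df2, on="id", how="outer") \
            .withColumn('name', F.coalesce('name_df2', 'name_df1')) \
            .select('id', 'name')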

This may not be the most efficient answer if you have a huge dataset, but it:

  • Is more readable when you are trying to understand the steps
  • Will get you the output you are looking for

CodePudding user response:

The simplest solution is to use PySpark's when/otherwise:

>>> from pyspark.sql.functions import when
>>> # The columns of df2 are renamed to df2_id and df2_name to avoid ambiguity
>>> df2 = df2.withColumnRenamed("id", "df2_id").withColumnRenamed("name", "df2_name")
>>> df_join = df1.join(df2, df1.id == df2.df2_id, "full")
>>> df_join1 = df_join.withColumn("new_col", when(df_join.df2_name.isNull(), df_join.name).otherwise(df_join.df2_name))
>>> df_output = df_join1.select("id", "name", "new_col")
>>> df_output.show()
+---+----+-------+
| id|name|new_col|
+---+----+-------+
|  1|   a|      k|
|  3|   c|      m|
|  2|   b|      l|
|  4|   d|      d|
+---+----+-------+