PySpark: find an existing set of rows in a dataframe and replace them with values from another dataframe


I have a Pyspark dataframe_Old (dfo) as below:

Id neighbor_sid neighbor division
a1 1100 Naalehu Hawaii
a2 1101 key-west-fl Miami
a3 1102 lubbock Texas
a10 1202 bay-terraces California

I have a Pyspark dataframe_new (dfn) as below:

Id neighbor_sid neighbor division
a1 1100 Naalehu Hawaii
a2 1111 key-largo-fl Miami
a3 1103 grapevine Texas
a4 1115 meriden-ct Connecticut
a12 2002 east-louisville Kentucky

My objective is to find the Ids from dataframe_new in dataframe_old and replace those rows with the new values from dataframe_new (Ids that exist only in dataframe_new, such as a4 and a12, should be appended).

Final expected result (the updated dataframe_old):

Id neighbor_sid neighbor division
a1 1100 Naalehu Hawaii
a2 1111 key-largo-fl Miami
a3 1103 grapevine Texas
a4 1115 meriden-ct Connecticut
a10 1202 bay-terraces California
a12 2002 east-louisville Kentucky

My attempt below is wrong, as it compares column-wise instead of row-wise:

dfO.alias('a').join(dfN.alias('b'), on=['id'], how='left')\
    .select(
        'id',
        f.when(
            ~f.isnull(f.col('b.id')),
            f.col('b.id')
        ).otherwise(f.col('a.id')).alias('id'),
        'b.col_3'
    )\
    .union(dfO)\
    .dropDuplicates()\
    .sort('id')\
    .show()

Please help - would really appreciate any guidance!

CodePudding user response:

We can do an outer join on the id fields and then use coalesce() to prioritize the fields from dfn.
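(dfo and dfn are not created in this answer; here is a minimal setup sketch built from the question's data, with pyspark.sql.functions imported as func. Judging by the output below, the answer's dfn omitted the question's a12 row, so it is omitted here as well.)

from pyspark.sql import SparkSession, functions as func

spark = SparkSession.builder.getOrCreate()

schema = ['id', 'neighbor_sid', 'neighbor', 'division']
dfo = spark.createDataFrame([
    ('a1', 1100, 'Naalehu', 'Hawaii'),
    ('a2', 1101, 'key-west-fl', 'Miami'),
    ('a3', 1102, 'lubbock', 'Texas'),
    ('a10', 1202, 'bay-terraces', 'California'),
], schema)
dfn = spark.createDataFrame([
    ('a1', 1100, 'Naalehu', 'Hawaii'),
    ('a2', 1111, 'key-largo-fl', 'Miami'),
    ('a3', 1103, 'grapevine', 'Texas'),
    ('a4', 1115, 'meriden-ct', 'Connecticut'),
], schema)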

columns = ['id', 'neighbor_sid', 'neighbor', 'division']

dfo. \
    join(dfn, 'id', 'outer'). \
    select(*['id'] + [func.coalesce(dfn[k], dfo[k]).alias(k) for k in columns if k != 'id']). \
    orderBy('id'). \
    show()

# +---+------------+------------+-----------+
# | id|neighbor_sid|    neighbor|   division|
# +---+------------+------------+-----------+
# | a1|        1100|     Naalehu|     Hawaii|
# |a10|        1202|bay-terraces| California|
# | a2|        1111|key-largo-fl|      Miami|
# | a3|        1103|   grapevine|      Texas|
# | a4|        1115|  meriden-ct|Connecticut|
# +---+------------+------------+-----------+

CodePudding user response:

I'd do a full outer join, then coalesce to choose the second table's values where necessary:

# imports and creating data example
from pyspark.sql import functions as F, Window as W
cols = ["id", "neighbor_sid", "neighbor", "division"]
data1 = [
    ["a1", 1100, 'Naalehu', 'Hawaii'],
    ["a2", 1101, 'key-west-fl', 'Miami'],
    ["a3", 1102, 'lubbock', 'Texas'],
    ["a10", 1202, 'bay-terraces', 'California'],
]

data2 = [
    ['a1', 1100, 'Naalehu', 'Hawaii'],
    ['a2', 1111, 'key-largo-fl', 'Miami'],
    ['a3', 1103, 'grapevine', 'Texas'],
    ['a4', 1115, 'meriden-ct', 'Connecticut'],
    ['a12', 2002, 'east-louisville', 'Kentucky'],  # row from the question's dataframe_new
]
df0 = spark.createDataFrame(data1, cols)
dfN = spark.createDataFrame(data2, cols)


# solution
merge_df = df0.alias("a").join(dfN.alias('b'), on='id', how='outer')
d = (
    merge_df
    .select(
        "id",
        F.coalesce("b.neighbor_sid", "a.neighbor_sid").alias("neighbor_sid"),
        F.coalesce("b.neighbor", "a.neighbor").alias("neighbor"),
        F.coalesce("b.division", "a.division").alias("division")
    )
    .sort("neighbor_sid")
)
display(d)  # display() is notebook-specific (e.g. Databricks); use d.show() elsewhere
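If the tables had many more columns, the same select can be generated from the column list instead of written out by hand; a minimal sketch reusing the cols list and merge_df defined above (same idea as the first answer's list comprehension):

# Build the per-column coalesce expressions programmatically; equivalent
# to the hand-written select above.
d2 = merge_df.select(
    "id",
    *[F.coalesce(f"b.{c}", f"a.{c}").alias(c) for c in cols if c != "id"]
).sort("neighbor_sid")
d2.show()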