I have a PySpark dataframe_old (dfo) as below:
Id | neighbor_sid | neighbor | division |
---|---|---|---|
a1 | 1100 | Naalehu | Hawaii |
a2 | 1101 | key-west-fl | Miami |
a3 | 1102 | lubbock | Texas |
a10 | 1202 | bay-terraces | California |
I have a PySpark dataframe_new (dfn) as below:
Id | neighbor_sid | neighbor | division |
---|---|---|---|
a1 | 1100 | Naalehu | Hawaii |
a2 | 1111 | key-largo-fl | Miami |
a3 | 1103 | grapevine | Texas |
a4 | 1115 | meriden-ct | Connecticut |
a12 | 2002 | east-louisville | Kentucky |
My objective is to update dataframe_old with the rows from dataframe_new: where an Id exists in both dataframes, the values from dataframe_new should replace the old ones; Ids that exist in only one of the dataframes should be kept as they are.
Expected final PySpark dataframe (updated dataframe_old):
Id | neighbor_sid | neighbor | division |
---|---|---|---|
a1 | 1100 | Naalehu | Hawaii |
a2 | 1111 | key-largo-fl | Miami |
a3 | 1103 | grapevine | Texas |
a4 | 1115 | meriden-ct | Connecticut |
a10 | 1202 | bay-terraces | California |
a12 | 2002 | east-louisville | Kentucky |
My attempt below is wrong, as it ends up comparing column-wise instead of row-wise:
dfO.alias('a').join(dfN.alias('b'), on=['id'], how='left')\
.select(
'id',
f.when(
~f.isnull(f.col('b.id')),
f.col('b.id')
).otherwise(f.col('a.id')).alias('id'),
'b.col_3'
)\
.union(dfO)\
.dropDuplicates()\
.sort('id')\
.show()
Please help - would really appreciate any guidance!
CodePudding user response:
We can do an outer join on the id field and then use coalesce() to prioritize the fields from dfn.
from pyspark.sql import functions as func

columns = ['id', 'neighbor_sid', 'neighbor', 'division']

# coalesce() keeps the value from dfn when it is not null, otherwise falls back to dfo
dfo. \
    join(dfn, 'id', 'outer'). \
    select(*(['id'] + [func.coalesce(dfn[k], dfo[k]).alias(k) for k in columns if k != 'id'])). \
    orderBy('id'). \
    show()
# +---+------------+---------------+-----------+
# | id|neighbor_sid|       neighbor|   division|
# +---+------------+---------------+-----------+
# | a1|        1100|        Naalehu|     Hawaii|
# |a10|        1202|   bay-terraces| California|
# |a12|        2002|east-louisville|   Kentucky|
# | a2|        1111|   key-largo-fl|      Miami|
# | a3|        1103|      grapevine|      Texas|
# | a4|        1115|     meriden-ct|Connecticut|
# +---+------------+---------------+-----------+
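The null handling is what makes this work: after the outer join, an id that is missing from dfn has nulls in all of dfn's columns, and coalesce() returns its first non-null argument, so the dfn value wins whenever it exists and the dfo value is kept otherwise. Here is a minimal, self-contained sketch of that behavior (the demo dataframe and its column names are made up for illustration, not taken from the data above):
from pyspark.sql import SparkSession, functions as func

spark = SparkSession.builder.getOrCreate()

# new_val simulates a column coming from dfn after the outer join (null = no match in dfn)
demo = spark.createDataFrame(
    [('a2', 'key-largo-fl', 'key-west-fl'), ('a10', None, 'bay-terraces')],
    ['id', 'new_val', 'old_val']
)
demo.select('id', func.coalesce('new_val', 'old_val').alias('neighbor')).show()
# +---+------------+
# | id|    neighbor|
# +---+------------+
# | a2|key-largo-fl|
# |a10|bay-terraces|
# +---+------------+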
CodePudding user response:
I'd do a full outer join, then coalesce() to take the values from the second table where they exist:
# imports and creating example data
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
cols = ["id", "neighbor_sid", "neighbor", "division"]
data1 = [
["a1", 1100, 'Naalehu', 'Hawaii'],
["a2", 1101, 'key-west-fl', 'Miami'],
["a3", 1102, 'lubbock', 'Texas'],
["a10", 1202, 'bay-terraces', 'California'],
]
data2 = [
    ['a1', 1100, 'Naalehu', 'Hawaii'],
    ['a2', 1111, 'key-largo-fl', 'Miami'],
    ['a3', 1103, 'grapevine', 'Texas'],
    ['a4', 1115, 'meriden-ct', 'Connecticut'],
    ['a12', 2002, 'east-louisville', 'Kentucky'],
]
df0 = spark.createDataFrame(data1, cols)
dfN = spark.createDataFrame(data2, cols)
# solution
merge_df = df0.alias("a").join(dfN.alias('b'), on='id', how='outer')
d = (
merge_df
.select(
"id",
F.coalesce("b.neighbor_sid", "a.neighbor_sid").alias("neighbor_sid"),
F.coalesce("b.neighbor", "a.neighbor").alias("neighbor"),
F.coalesce("b.division", "a.division").alias("division")
)
.sort("neighbor_sid")
)
display(d)
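Note: display() is available in Databricks / notebook environments; in a plain PySpark session, d.show() prints the same result. The reason dfN takes precedence is simply the argument order in each coalesce() call - list the 'a' columns first instead if you ever want the old values to win.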