I have a dataframe in the format below:
col_1.  col_2.  col_3
null.   null.   yes
null.   null    yes
a1.     a2      no
a3      null    no
I would like to add the column col_3 to the dataframe below:
col1.  col2.  col_1.  col_2.
a_1    a_2    null.   null.
a_3    a_4    null.   null
a_5    a_6    a1.     a2
a_7    a_8    a3      null
Expected output:
col1.  col2.  col_1.  col_2.  col_3
a_1    a_2    null.   null.   yes
a_3    a_4    null.   null    yes
a_5    a_6    a1.     a2      no
a_7    a_8    a3      null    no
I tried merging the two dataframes, but it did not work because there are duplicate column names.
CodePudding user response:
I think you are looking for some kind of implementation of eqNullSafe.
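To see why a plain equality join would drop the all-null rows, here is a minimal sketch (assuming spark is an active SparkSession; demo is just an illustrative name): in Spark SQL, null == null evaluates to null rather than True, while eqNullSafe treats two nulls as equal.
demo = spark.createDataFrame([(None, None), ('a1', 'a1')], ['x', 'y'])
demo.select(
    (demo.x == demo.y).alias('eq'),                  # null when both sides are null
    demo.x.eqNullSafe(demo.y).alias('eq_null_safe')  # True when both sides are null
).show()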
Inputs:
# 'null.' values from the question are kept as literal strings; bare nulls become None
df1 = spark.createDataFrame(
    [('null.', 'null.', 'yes'),
     ('null.', None, 'yes'),
     ('a1.', 'a2', 'no'),
     ('a3', None, 'no')],
    ['col_1', 'col_2', 'col_3'])
df2 = spark.createDataFrame(
    [('a_1', 'a_2', 'null.', 'null.'),
     ('a_3', 'a_4', 'null.', None),
     ('a_5', 'a_6', 'a1.', 'a2'),
     ('a_7', 'a_8', 'a3', None)],
    ['col1', 'col2', 'col_1', 'col_2'])
Join:
df = df2.join(
    df1,
    df2.col_1.eqNullSafe(df1.col_1) & df2.col_2.eqNullSafe(df1.col_2),
    'left'
).select(df2['*'], df1.col_3)
df.show()
# +----+----+-----+-----+-----+
# |col1|col2|col_1|col_2|col_3|
# +----+----+-----+-----+-----+
# | a_1| a_2|null.|null.|  yes|
# | a_3| a_4|null.| null|  yes|
# | a_7| a_8|   a3| null|   no|
# | a_5| a_6|  a1.|   a2|   no|
# +----+----+-----+-----+-----+
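The left join keeps every row of df2 even when no match is found, and selecting df2['*'] together with only df1.col_3 sidesteps the duplicate col_1/col_2 column names that made the plain merge fail.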
CodePudding user response:
I am not sure about the extra (.) within null, so I replaced it with None, which would be read in as nulls. If the nulls are actual nulls, go ahead with the solution from @ZygD.
With the above consideration, the way to merge the dataframes is with a join. Since your column names contain special characters, you can escape them with backticks, and then select the required column(s) from each dataframe using select.
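To illustrate the escaping, a minimal sketch (assuming sql is an active SparkSession, as in the snippet below): without backticks, Spark parses the dot as a struct-field accessor.
df = sql.createDataFrame([('x',)], ['col_1.'])
# df.select('col_1.')           # fails: '.' is parsed as a field separator
df.select('`col_1.`').show()    # backticks escape the dot in the column name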
Data Preparation
from io import StringIO

import pandas as pd

s1 = StringIO("""
col_1.,col_2.,col_3
None.,None,yes
None.,None,yes
a1.,a2,no
a3,None,no
"""
)
s2 = StringIO("""
col1.,col2.,col_1.,col_2.
a_1,a_2,None.,None.
a_3,a_4,None.,None
a_5,a_6,a1.,a2
a_7,a_8,a3,None
"""
)
# sql is assumed to be an existing SparkSession
df1 = pd.read_csv(s1, delimiter=',')
sparkDF1 = sql.createDataFrame(df1)
df2 = pd.read_csv(s2, delimiter=',')
sparkDF2 = sql.createDataFrame(df2)
sparkDF1.show()
+------+------+-----+
|col_1.|col_2.|col_3|
+------+------+-----+
| None.|  None|  yes|
| None.|  None|  yes|
|   a1.|    a2|   no|
|    a3|  None|   no|
+------+------+-----+
sparkDF2.show()
+-----+-----+------+------+
|col1.|col2.|col_1.|col_2.|
+-----+-----+------+------+
|  a_1|  a_2| None.| None.|
|  a_3|  a_4| None.|  None|
|  a_5|  a_6|   a1.|    a2|
|  a_7|  a_8|    a3|  None|
+-----+-----+------+------+
Join & Select
finalDF = sparkDF1.join(
    sparkDF2,
    sparkDF1['`col_1.`'] == sparkDF2['`col_1.`'],
    'inner'
).select(
    sparkDF2['`col1.`'],
    sparkDF2['`col2.`'],
    sparkDF1['`col_1.`'],
    sparkDF1['`col_2.`'],
    sparkDF1['col_3']
)
finalDF.show()
+-----+-----+------+------+-----+
|col1.|col2.|col_1.|col_2.|col_3|
+-----+-----+------+------+-----+
|  a_1|  a_2| None.|  None|  yes|
|  a_3|  a_4| None.|  None|  yes|
|  a_1|  a_2| None.|  None|  yes|
|  a_3|  a_4| None.|  None|  yes|
|  a_5|  a_6|   a1.|    a2|   no|
|  a_7|  a_8|    a3|  None|   no|
+-----+-----+------+------+-----+
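Note that the output contains duplicate rows: sparkDF1 holds two identical (None., None, yes) rows, and the inner join on col_1. alone matches each None. row of sparkDF2 against both of them. One way to get the expected four rows, sketched on the same dataframes (dedupDF1 is just an illustrative name), is to de-duplicate the lookup dataframe first:
# drop the repeated lookup row so each key matches at most once
dedupDF1 = sparkDF1.dropDuplicates()
finalDF = dedupDF1.join(
    sparkDF2,
    dedupDF1['`col_1.`'] == sparkDF2['`col_1.`'],
    'inner'
).select(
    sparkDF2['`col1.`'],
    sparkDF2['`col2.`'],
    dedupDF1['`col_1.`'],
    dedupDF1['`col_2.`'],
    dedupDF1['col_3']
)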