I have two dataframes:
df1 =
columnA columnB columnC columnD
value1 value7 value13 value20
value2 value8 value14 value21
value3 value9 value15 value22
value4 value10 value16 value23
value5 value11 value17 value24
value6 null null value25
df2=
columnA columnB columnC columnD
value1 value7 value13 value20
value2 null value14 value21
null value9 value15 value22
value4 value10 value16 value23
value5 value11 value17 value24
value6 value12 value18 value25
I want to compare the two dataframes and pick all rows that have a null (missing value) in either one, with the gaps filled from the other dataframe. My output dataframe should look like this:
outputDF =
columnA columnB columnC columnD
value2 value8 value14 value21
value3 value9 value15 value22
value6 value12 value18 value25
How can I achieve this using PySpark?
CodePudding user response:
Why do you compare the two DFs? Will you fill the null values from the other DF by key?
CodePudding user response:
Assuming you are able to join on an id column... here I created one:

from pyspark.sql import functions as psf

data1 = [
    ('value1', 'value7', 'value13', 'value20'),
    ('value2', 'value8', 'value14', 'value21'),
    ('value3', 'value9', 'value15', 'value22'),
    ('value4', 'value10', 'value16', 'value23'),
    ('value5', 'value11', 'value17', 'value24'),
    ('value6', None, None, 'value25'),
]
df1 = spark.createDataFrame(data1, ['columnA', 'columnB', 'columnC', 'columnD'])

data2 = [
    ('value1', 'value7', 'value13', 'value20'),
    ('value2', None, 'value14', 'value21'),
    (None, 'value9', 'value15', 'value22'),
    ('value4', 'value10', 'value16', 'value23'),
    ('value5', 'value11', 'value17', 'value24'),
    ('value6', 'value12', 'value18', 'value25'),
]
df2 = spark.createDataFrame(data2, ['columnA', 'columnB', 'columnC', 'columnD'])

# Add a synthetic join key to each side. Note that
# monotonically_increasing_id() only guarantees unique, increasing ids;
# it lines up across the two DataFrames here only because both are small
# and built the same way. Use a real key column if you have one.
df1 = df1.withColumn('id', psf.monotonically_increasing_id())
df2 = df2.withColumn('id', psf.monotonically_increasing_id())

# Keep rows where either side has a null, then coalesce column by column
# so each output row carries the non-null value.
df = df1.join(df2, how='inner', on='id').filter(
    df1.columnA.isNull() | df1.columnB.isNull()
    | df1.columnC.isNull() | df1.columnD.isNull()
    | df2.columnA.isNull() | df2.columnB.isNull()
    | df2.columnC.isNull() | df2.columnD.isNull()
).select(
    psf.coalesce(df1.columnA, df2.columnA).alias('columnA'),
    psf.coalesce(df1.columnB, df2.columnB).alias('columnB'),
    psf.coalesce(df1.columnC, df2.columnC).alias('columnC'),
    psf.coalesce(df1.columnD, df2.columnD).alias('columnD'),
)
df.show()
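If the DataFrames had many columns, writing out eight `isNull()` checks by hand would not scale. A minimal sketch of one alternative (a hypothetical helper of my own, not part of the answer above): generate an equivalent SQL condition string from the column list, assuming the two sides are aliased as d1 and d2 before the join.

```python
# Hypothetical helper: build a SQL condition that is true when any of the
# listed columns is NULL on either side of the join. "d1" and "d2" are
# assumed to be the aliases given to df1 and df2 before joining.
def any_null_condition(columns, aliases=("d1", "d2")):
    terms = [f"{alias}.{col} IS NULL" for alias in aliases for col in columns]
    return " OR ".join(terms)

cond = any_null_condition(["columnA", "columnB", "columnC", "columnD"])
# cond covers the same eight df1/df2 null checks as the filter above
```

With that, the filter becomes df1.alias('d1').join(df2.alias('d2'), on='id').filter(psf.expr(cond)), which stays the same no matter how many columns the DataFrames have.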