PySpark: Moving rows from one dataframe into another if column values are not found in the second dataframe


I have two Spark dataframes with the same schema. DF1:

id       category  flag
123abc   type 1     1 
456def   type 1     1
789ghi   type 2     0
101jkl   type 3     0

DF2:

id       category  flag
123abc   type 1     1 
456def   type 1     1
789ghi   type 2     1
101xyz   type 3     0

DF1 has more data than DF2, so I cannot simply replace it. However, DF2 has ids not found in DF1, as well as several ids with more accurate flag data. This means there are two situations that I need resolved:

  1. 789ghi has a different flag in DF2 and needs to overwrite the 789ghi row in DF1.
  2. 101xyz is not found in DF1 and needs to be moved over.

Each dataframe is millions of rows, so I am looking for an efficient way to perform this operation. I am not sure if this is a situation that requires an outer join or anti-join.

CodePudding user response:

You can union the two dataframes and keep the first record for each id.

from functools import reduce
from pyspark.sql import DataFrame, Window
from pyspark.sql.functions import monotonically_increasing_id, col, rank

# df2 first, so its rows win ties on id
df = reduce(DataFrame.unionByName, [df2, df1])

# preserve the union order so df2's records rank ahead of df1's
df = df.withColumn('row_num', monotonically_increasing_id())

window = Window.partitionBy('id').orderBy('row_num')

df = (df.withColumn('rank', rank().over(window))
        .filter(col('rank') == 1)
        .drop('rank', 'row_num'))

Output

 ------ -------- ---- 
|    id|category|flag|
 ------ -------- ---- 
|101jkl|  type 3|   0|
|101xyz|  type 3|   0|
|123abc|  type 1|   1|
|456def|  type 1|   1|
|789ghi|  type 2|   1|
 ------ -------- ---- 

CodePudding user response:

Option 1: I would find ids in df1 that are not in df2 and put them into a subset dataframe. I would then union that subset with df2.

Or

Option 2: Find elements in df1 that are in df2 and drop those rows and then union df2. The approach I take would obviously be based on which is less expensive computationally.

Option 1 code

from pyspark.sql.functions import col

# ids present in df1 but missing from df2
s = df1.select('id').subtract(df2.select('id'))

# keep only those df1 rows and append them to df2
df2.union(df1.join(s, on='id', how='inner')).show()

Outcome

 ------ -------- ---- 
|    id|category|flag|
 ------ -------- ---- 
|123abc|  type 1|   1|
|456def|  type 1|   1|
|789ghi|  type 2|   1|
|101xyz|  type 3|   0|
|101jkl|  type 3|   0|
 ------ -------- ---- 