Merging two DataFrames on multiple filter conditions in PySpark


Here's my dataset sparkDF1

Id   Value  month   Year
1    672        4   2020  
1    356        6   2020
2    683        6   2019  
3    366        4   2021

Here's my dataset sparkDF2

Id   Value  month   Year
1    671        4   2020  
1    353        6   2020
2    682        6   2019  
3    363        4   2021

Here's my expected dataset sparkDF: it should take rows from sparkDF2 for month=5, Year=2020 onward, and from sparkDF1 for everything before that

Id   Value  month   Year
1    672        4   2020  
1    353        6   2020
2    683        6   2019  
3    363        4   2021

The pandas alternative is

df = df1.mask((df1['month'].ge(5) & df1['Year'].eq(2020)) | df1['Year'].ge(2021), df2)
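For reference, the sample data above can be rebuilt with this minimal sketch (assuming an active SparkSession named spark, and using the names df1/df2 that the answers below refer to):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

cols = ['Id', 'Value', 'month', 'Year']
# sparkDF1 from the question
df1 = spark.createDataFrame(
    [(1, 672, 4, 2020), (1, 356, 6, 2020), (2, 683, 6, 2019), (3, 366, 4, 2021)], cols)
# sparkDF2 from the question
df2 = spark.createDataFrame(
    [(1, 671, 4, 2020), (1, 353, 6, 2020), (2, 682, 6, 2019), (3, 363, 4, 2021)], cols)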

CodePudding user response:

You can filter each DataFrame on its own and then union the two. Alternatively, you can use Koalas (now the pandas API on Spark), which gives you pandas syntax on Spark.
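A minimal sketch of that filter-and-union idea, assuming df1 and df2 are the Spark DataFrames above; here the May-2020 cutoff is written as a single Year*100 + month key instead of two separate comparisons:

from pyspark.sql import functions as F

cutoff = 202005  # May 2020 expressed as Year*100 + month

before = df1.filter(F.col('Year') * 100 + F.col('month') < cutoff)    # rows before May 2020 come from df1
onwards = df2.filter(F.col('Year') * 100 + F.col('month') >= cutoff)  # May 2020 and later come from df2

result = before.unionByName(onwards).orderBy('Id')
result.show()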

CodePudding user response:

Option 1: Filter and unionByName

# rows on/after May 2020 in df1, and the same condition on df2
s = ((df1.month >= 5) & (df1.Year == 2020)) | (df1.Year >= 2021)
s1 = ((df2.month >= 5) & (df2.Year == 2020)) | (df2.Year >= 2021)

# keep df1 rows before the cutoff, take df2 rows from the cutoff on, then combine
new = df1.where(~s).unionByName(df2.where(s1)).orderBy('Id')
new.show()

+---+-----+-----+----+
| Id|Value|month|Year|
+---+-----+-----+----+
|  1|  672|    4|2020|
|  1|  353|    6|2020|
|  2|  683|    6|2019|
|  3|  363|    4|2021|
+---+-----+-----+----+

Option 2: If you have pandas code, you can use pandas UDFs. The catch is that pandas UDFs involving two DataFrames have to use the cogroup method, which incurs a shuffle. In your case I would use pandas' combine_first, or just what you did; code below.

import pandas as pd

def mask_filter(l: pd.DataFrame, r: pd.DataFrame) -> pd.DataFrame:
    # null out the df1 rows that fall on/after May 2020, then fill the gaps from df2
    l = l.mask((l['month'].ge(5) & l['Year'].eq(2020)) | l['Year'].ge(2021))
    return l.combine_first(r)

df1.groupBy(['month', 'Year']).cogroup(df2.groupBy(['month', 'Year'])).applyInPandas(mask_filter, schema=df2.schema).orderBy('Id').show()

or

import pandas as pd

def mask_filter(l: pd.DataFrame, r: pd.DataFrame) -> pd.DataFrame:
    # replace the df1 rows on/after May 2020 with the corresponding df2 rows
    return l.mask((l['month'].ge(5) & l['Year'].eq(2020)) | l['Year'].ge(2021), r)

df1.groupBy(['month', 'Year']).cogroup(df2.groupBy(['month', 'Year'])).applyInPandas(mask_filter, schema=df2.schema).orderBy('Id').show()
+---+-----+-----+----+
| Id|Value|month|Year|
+---+-----+-----+----+
|  1|  672|    4|2020|
|  3|  363|    4|2021|
|  2|  683|    6|2019|
|  1|  353|    6|2020|
+---+-----+-----+----+