Here's my dataset sparkDF1
Id Value month Year
1 672 4 2020
1 356 6 2020
2 683 6 2019
3 366 4 2021
Here's my dataset sparkDF2
Id Value month Year
1 671 4 2020
1 353 6 2020
2 682 6 2019
3 363 4 2021
Here's my expected dataset sparkDF.
It should use the rows of sparkDF2 from month=5, Year=2020 onward,
and the rows of sparkDF1 for everything before that:
Id Value month Year
1 672 4 2020
1 353 6 2020
2 683 6 2019
3 363 4 2021
The pandas equivalent is:
df = df1.mask((df1['month'].ge(5) & df1['Year'].eq(2020)) | df1['Year'].ge(2021), df2)
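For reference, here is a self-contained sketch of that pandas one-liner run against the sample data above. The way the two frames are constructed and the `use_df2` name are assumptions added for illustration; only the mask call itself comes from the question.

```python
import pandas as pd

cols = ["Id", "Value", "month", "Year"]

# Sample data re-typed from the question (construction is illustrative).
df1 = pd.DataFrame(
    [[1, 672, 4, 2020], [1, 356, 6, 2020], [2, 683, 6, 2019], [3, 366, 4, 2021]],
    columns=cols,
)
df2 = pd.DataFrame(
    [[1, 671, 4, 2020], [1, 353, 6, 2020], [2, 682, 6, 2019], [3, 363, 4, 2021]],
    columns=cols,
)

# True for the rows that should be taken from df2:
# month >= 5 within 2020, or any row from 2021 onward.
use_df2 = (df1["month"].ge(5) & df1["Year"].eq(2020)) | df1["Year"].ge(2021)

# mask() replaces the rows where the condition is True with the rows of df2
# that carry the same index label (both frames use the default 0..3 index).
df = df1.mask(use_df2, df2)
print(df)
```

This relies on both frames having the same index and row order, which is what the PySpark answers below have to reproduce without a positional index.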
CodePudding user response:
You can filter each DataFrame on its own and then union the two. Alternatively, you can use Koalas (available as the pandas API on Spark, pyspark.pandas, since Spark 3.2), which gives you pandas syntax on Spark.
CodePudding user response:
Option 1: Filter and unionByName
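# Rows that should come from df2: month >= 5 in 2020, or anything from 2021 on
s = ((df1.month >= 5) & (df1.Year == 2020)) | (df1.Year >= 2021)
s1 = ((df2.month >= 5) & (df2.Year == 2020)) | (df2.Year >= 2021)
# Keep the remaining rows of df1, take the matching rows of df2, and combine them
new = df1.where(~s).unionByName(df2.where(s1)).orderBy('Id')
new.show()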
+---+-----+-----+----+
| Id|Value|month|Year|
+---+-----+-----+----+
|  1|  672|    4|2020|
|  1|  353|    6|2020|
|  2|  683|    6|2019|
|  3|  363|    4|2021|
+---+-----+-----+----+
Option 2: If you already have pandas code, you can use pandas UDFs. The drawback of a pandas UDF that takes two DataFrames is that it relies on the cogroup
method, which incurs a shuffle. In your case, I would use pandas' combine_first,
or just the mask call you already have. Code below:
import pandas as pd
def mask_filter(l: pd.DataFrame, r: pd.DataFrame) -> pd.DataFrame:
    # Blank out the rows of l that should come from r, then fill them from r
    l = l.mask((l['month'].ge(5) & l['Year'].eq(2020)) | l['Year'].ge(2021))
    return l.combine_first(r)
df1.groupBy(['month', 'Year']).cogroup(df2.groupBy(['month', 'Year'])).applyInPandas(mask_filter, schema=df2.schema).orderBy('Id').show()
or
import pandas as pd
def mask_filter(l: pd.DataFrame, r: pd.DataFrame) -> pd.DataFrame:
    # Replace the matching rows of l with the corresponding rows of r in one step
    t = l.mask((l['month'].ge(5) & l['Year'].eq(2020)) | l['Year'].ge(2021), r)
    return t
df1.groupBy(['month', 'Year']).cogroup(df2.groupBy(['month', 'Year'])).applyInPandas(mask_filter, schema=df2.schema).orderBy('Id').show()
+---+-----+-----+----+
| Id|Value|month|Year|
+---+-----+-----+----+
|  1|  672|    4|2020|
|  3|  363|    4|2021|
|  2|  683|    6|2019|
|  1|  353|    6|2020|
+---+-----+-----+----+
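The logic inside the two mask_filter variants can be checked in plain pandas, without Spark, by applying it to the whole sample frames at once. This is a sketch with the sample data re-typed by hand; the column reselection and the cast back to int64 are assumptions added because masking without a replacement introduces NaN and therefore float columns.

```python
import pandas as pd

cols = ["Id", "Value", "month", "Year"]
l = pd.DataFrame(
    [[1, 672, 4, 2020], [1, 356, 6, 2020], [2, 683, 6, 2019], [3, 366, 4, 2021]],
    columns=cols,
)
r = pd.DataFrame(
    [[1, 671, 4, 2020], [1, 353, 6, 2020], [2, 682, 6, 2019], [3, 363, 4, 2021]],
    columns=cols,
)

# True for the rows that should come from r.
cond = (l["month"].ge(5) & l["Year"].eq(2020)) | l["Year"].ge(2021)

# Variant 1: blank out the matching rows (NaN), then fill the gaps from r.
via_combine = l.mask(cond).combine_first(r)[cols].astype("int64")

# Variant 2: replace the matching rows with r in a single step.
via_mask = l.mask(cond, r)

print(via_combine["Value"].tolist())
print(via_mask["Value"].tolist())
```

Both variants pair rows of l and r by index label, so inside the cogrouped function they assume that each (month, Year) group holds the same number of rows in the same order on both sides.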