I am new to Databricks (PySpark) and I want to join two DataFrames based on conditions. DF1 has ID, Date, and some other columns, and DF2 also has ID, Date, and some other columns. For example, DF1:

Date | name | ID | Other columns |
---|---|---|---|
21/06/2022 | ABC | XZ18610 | |
22/05/2022 | ABC | XZ18610 | |
22/04/2022 | ABC | XZ18610 | |
05/05/2022 | DEF | XZ25277 | |
04/02/2022 | DEF | XZ25277 | |
28/06/2022 | GHI | XZU6S19 | |
18/07/2022 | JKL | XZ54866 | |
27/07/2022 | MNO | XZ82434 | |
20/06/2022 | PQR | XZ78433 | |
DF2

Date1 | ID1 | Value | Other columns1 |
---|---|---|---|
30/05/2022 | XZ18610 | B | |
21/06/2021 | XZ18610 | A | |
05/01/2021 | XZ25277 | B | |
28/07/2022 | XZU6S19 | E | |
18/05/2022 | XZ54866 | D | |
27/07/2022 | XZ82434 | F | |
20/06/2022 | XZ78433 | I | |
Desired output

Date | name | ID | Other columns | Date1 | Value | Other columns1 |
---|---|---|---|---|---|---|
21/06/2022 | ABC | XZ18610 | | 30/05/2022 | B | |
22/05/2022 | ABC | XZ18610 | | 21/06/2021 | A | |
22/04/2022 | ABC | XZ18610 | | 21/06/2021 | A | |
05/05/2022 | DEF | XZ25277 | | 05/01/2021 | B | |
04/02/2022 | DEF | XZ25277 | | 05/01/2021 | B | |
28/06/2022 | GHI | XZU6S19 | | nan | nan | |
18/07/2022 | JKL | XZ54866 | | 18/05/2022 | D | |
27/07/2022 | MNO | XZ82434 | | 27/07/2022 | F | |
20/06/2022 | PQR | XZ78433 | | 20/06/2022 | I | |
Basically, I want to add the other columns from DF2 to DF1 whenever the ID matches and Date1 is earlier than Date but as close to Date as possible. For example, if there are two Date1 values earlier than Date, take the latest Date1 and bring all the other data from that row into DF1.
I can do this in Python with loops, but my data are huge and it takes a very long time, so I want to do it in Databricks (PySpark).
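For reference, a simplified sketch of the loop-based approach I use today (illustrative only; it assumes both tables are pandas DataFrames named `df1`/`df2` with the dates already parsed as datetimes):

```python
import pandas as pd

# For every DF1 row, pick the DF2 row with the same ID and the latest Date1 <= Date.
rows = []
for _, r in df1.iterrows():
    candidates = df2[(df2["ID1"] == r["ID"]) & (df2["Date1"] <= r["Date"])]
    if candidates.empty:
        extra = {"Date1": None, "Value": None, "Other columns1": None}
    else:
        extra = candidates.sort_values("Date1").iloc[-1].drop("ID1").to_dict()
    rows.append({**r.to_dict(), **extra})

result = pd.DataFrame(rows)
```

This works on small data but scales poorly, which is why I want a Spark version.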
Thanks
CodePudding user response:
You can use a window function: left-join DF2 on the ID, keep only rows where Date1 is on or before Date, compute the day difference, and for each (Date, ID) keep the row with the smallest difference. Joining that back to DF1 keeps the rows that have no match at all:
```python
from pyspark.sql import functions as F
from pyspark.sql import Window

df1 = spark.createDataFrame(
    [
        ('21/06/2022', 'ABC', 'XZ18610', ''),
        ('22/05/2022', 'ABC', 'XZ18610', ''),
        ('22/04/2022', 'ABC', 'XZ18610', ''),
        ('05/05/2022', 'DEF', 'XZ25277', ''),
        ('04/02/2022', 'DEF', 'XZ25277', ''),
        ('28/06/2022', 'GHI', 'XZU6S19', ''),
        ('18/07/2022', 'JKL', 'XZ54866', ''),
        ('27/07/2022', 'MNO', 'XZ82434', ''),
        ('20/06/2022', 'PQR', 'XZ78433', '')
    ],
    ['Date', 'name', 'ID', 'Other columns']
)\
    .withColumn('Date', F.to_date('Date', 'd/M/y'))

df2 = spark.createDataFrame(
    [
        ('30/05/2022', 'XZ18610', 'B', ''),
        ('21/06/2021', 'XZ18610', 'A', ''),
        ('05/01/2021', 'XZ25277', 'B', ''),
        ('28/07/2022', 'XZU6S19', 'E', ''),
        ('18/05/2022', 'XZ54866', 'D', ''),
        ('27/07/2022', 'XZ82434', 'F', ''),
        ('20/06/2022', 'XZ78433', 'I', '')
    ],
    ['Date1', 'ID1', 'Value', 'Other columns1']
)\
    .withColumn('Date1', F.to_date('Date1', 'd/M/y'))

# Join on ID, keep only DF2 rows dated on or before the DF1 date,
# then keep the candidate with the smallest day difference per (Date, ID).
df3 = df1\
    .join(df2, df1.ID == df2.ID1, 'left')\
    .filter(df2.Date1 <= df1.Date)\
    .withColumn('datediff', F.datediff('Date', 'Date1'))\
    .withColumn('min', F.min('datediff').over(Window.partitionBy('Date', 'ID')))\
    .filter(F.col('datediff') == F.col('min'))

# Left-join back to DF1 so rows without any earlier DF2 date (e.g. XZU6S19) are kept with nulls.
result = df1.join(df3, ['Date', 'name', 'ID', 'Other columns'], 'left')

result\
    .select('Date', 'name', 'ID', 'Other columns', 'Date1', 'Value', 'Other columns1')\
    .show()

# +----------+----+-------+-------------+----------+-----+--------------+
# |      Date|name|     ID|Other columns|     Date1|Value|Other columns1|
# +----------+----+-------+-------------+----------+-----+--------------+
# |2022-06-21| ABC|XZ18610|             |2022-05-30|    B|              |
# |2022-05-22| ABC|XZ18610|             |2021-06-21|    A|              |
# |2022-04-22| ABC|XZ18610|             |2021-06-21|    A|              |
# |2022-05-05| DEF|XZ25277|             |2021-01-05|    B|              |
# |2022-02-04| DEF|XZ25277|             |2021-01-05|    B|              |
# |2022-06-28| GHI|XZU6S19|             |      null| null|          null|
# |2022-07-18| JKL|XZ54866|             |2022-05-18|    D|              |
# |2022-07-27| MNO|XZ82434|             |2022-07-27|    F|              |
# |2022-06-20| PQR|XZ78433|             |2022-06-20|    I|              |
# +----------+----+-------+-------------+----------+-----+--------------+
```
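If you prefer to avoid the second join, an equivalent sketch (not part of the answer above) is to move the `Date1 <= Date` condition into the join itself and rank the candidates per DF1 row with `row_number`, keeping only the latest one; DF1 rows with no match survive the left join with nulls:

```python
# Alternative sketch: rank candidates by Date1 descending and keep the newest per DF1 row.
w = Window.partitionBy('Date', 'name', 'ID').orderBy(F.col('Date1').desc())

result2 = df1\
    .join(df2, (df1.ID == df2.ID1) & (df2.Date1 <= df1.Date), 'left')\
    .withColumn('rn', F.row_number().over(w))\
    .filter(F.col('rn') == 1)\
    .drop('rn', 'ID1')
```

Note that `row_number` keeps exactly one DF2 row per DF1 record even if two candidates share the same Date1, whereas the `min(datediff)` version above would keep both.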