Mapping two PySpark DataFrames on Databricks


I am new to Databricks (PySpark) and I want to map two DataFrames based on conditions. DF1 has ID, Date, and some other columns, and DF2 also has ID, Date, and some other columns. For example, DF1:

Date name ID Other columns
21/06/2022 ABC XZ18610
22/05/2022 ABC XZ18610
22/04/2022 ABC XZ18610
05/05/2022 DEF XZ25277
04/02/2022 DEF XZ25277
28/06/2022 GHI XZU6S19
18/07/2022 JKL XZ54866
27/07/2022 MNO XZ82434
20/06/2022 PQR XZ78433

DF2

Date1 ID1 Value Other columns1
30/05/2022 XZ18610 B
21/06/2021 XZ18610 A
05/01/2021 XZ25277 B
28/07/2022 XZU6S19 E
18/05/2022 XZ54866 D
27/07/2022 XZ82434 F
20/06/2022 XZ78433 I

Desired output

Date name ID Other columns Date1 Value Other columns1
21/06/2022 ABC XZ18610 30/05/2022 B
22/05/2022 ABC XZ18610 21/06/2021 A
22/04/2022 ABC XZ18610 21/06/2021 A
05/05/2022 DEF XZ25277 05/01/2021 B
04/02/2022 DEF XZ25277 05/01/2021 B
28/06/2022 GHI XZU6S19 nan nan
18/07/2022 JKL XZ54866 18/05/2022 D
27/07/2022 MNO XZ82434 27/07/2022 F
20/06/2022 PQR XZ78433 20/06/2022 I

Basically, I want to add the other columns from DF2 to DF1 when the ID is the same and Date1 is on or before Date, taking the Date1 closest to Date. For example, if there are two Date1 values earlier than Date, take the latest Date1 and add all the other data from that row to DF1.

I can do it in Python with loops, but my data is huge and it takes a very long time, so I wanted to do it in Databricks instead.

Thanks

CodePudding user response:

You can use a window function:

from pyspark.sql import functions as F
from pyspark.sql import Window

df1 = spark.createDataFrame(
    [
    ('21/06/2022','ABC','XZ18610',''),
    ('22/05/2022','ABC','XZ18610',''),
    ('22/04/2022','ABC','XZ18610',''),
    ('05/05/2022','DEF','XZ25277',''),
    ('04/02/2022','DEF','XZ25277',''),
    ('28/06/2022','GHI','XZU6S19',''),
    ('18/07/2022','JKL','XZ54866',''),
    ('27/07/2022','MNO','XZ82434',''),
    ('20/06/2022','PQR','XZ78433','')
    ],
    ['Date','name','ID','Other columns']
)\
    .withColumn('Date', F.to_date('Date', 'd/M/y'))

df2 = spark.createDataFrame(
    [
    ('30/05/2022','XZ18610','B',''),
    ('21/06/2021','XZ18610','A',''),
    ('05/01/2021','XZ25277','B',''),
    ('28/07/2022','XZU6S19','E',''),
    ('18/05/2022','XZ54866','D',''),
    ('27/07/2022','XZ82434','F',''),
    ('20/06/2022','XZ78433','I','')
    ],
    ['Date1','ID1','Value','Other columns1']
)\
    .withColumn('Date1', F.to_date('Date1', 'd/M/y'))

                                                          
# For each df1 row, keep only the df2 rows with the same ID whose Date1 is on or
# before Date, then keep the closest match (smallest date difference) per (Date, ID).
df3 = df1\
          .join(df2, df1.ID == df2.ID1, 'left')\
          .filter(df2.Date1 <= df1.Date)\
          .withColumn('datediff', F.datediff('Date', 'Date1'))\
          .withColumn("min", F.min('datediff').over(Window.partitionBy("Date", "ID")))\
          .filter(F.col('datediff')==F.col('min'))

# Join back to df1 so IDs without any matching Date1 (e.g. XZU6S19) are kept with nulls.
result = df1.join(df3, ['Date', 'name', 'ID', 'Other columns'], 'left')
            
result\
        .select('Date','name','ID','Other columns','Date1','Value','Other columns1')\
        .show()


# +----------+----+-------+-------------+----------+-----+--------------+
# |      Date|name|     ID|Other columns|     Date1|Value|Other columns1|
# +----------+----+-------+-------------+----------+-----+--------------+
# |2022-06-21| ABC|XZ18610|             |2022-05-30|    B|              |
# |2022-05-22| ABC|XZ18610|             |2021-06-21|    A|              |
# |2022-04-22| ABC|XZ18610|             |2021-06-21|    A|              |
# |2022-05-05| DEF|XZ25277|             |2021-01-05|    B|              |
# |2022-02-04| DEF|XZ25277|             |2021-01-05|    B|              |
# |2022-06-28| GHI|XZU6S19|             |      null| null|          null|
# |2022-07-18| JKL|XZ54866|             |2022-05-18|    D|              |
# |2022-07-27| MNO|XZ82434|             |2022-07-27|    F|              |
# |2022-06-20| PQR|XZ78433|             |2022-06-20|    I|              |
# +----------+----+-------+-------------+----------+-----+--------------+
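
As a side note, instead of computing the minimum datediff and filtering on it, you could rank the candidate rows with row_number() and keep the first one per (Date, ID). This also guarantees a single match if two Date1 values were ever tied. A minimal sketch, assuming the same df1 and df2 as above (the names w, closest and result2 are just illustrative):

from pyspark.sql import functions as F
from pyspark.sql import Window

# Rank candidate df2 rows per (Date, ID) by how recent Date1 is,
# then keep only the closest one before joining back to df1.
w = Window.partitionBy('Date', 'ID').orderBy(F.col('Date1').desc())

closest = df1\
    .join(df2, (df1.ID == df2.ID1) & (df2.Date1 <= df1.Date))\
    .withColumn('rn', F.row_number().over(w))\
    .filter(F.col('rn') == 1)\
    .drop('rn', 'ID1')

result2 = df1.join(closest, ['Date', 'name', 'ID', 'Other columns'], 'left')

The difference is only in tie handling: row_number() keeps exactly one row even if two Date1 values are equally close, whereas the min(datediff) filter above would keep both.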