In a PySpark DataFrame, join send and receive rows into columns

Time:08-11

I have a DataFrame with separate 'send' and 'receive' rows. Using PySpark, I need to combine each pair into a single row with RECEIVE and SEND columns. Note that both rows of a pair share the same ID, and the action is identified by ACTION_CD:

Original dataframe:

+------------------------------------+------------------------+---------+--------------------+
|ID                                  |MSG_DT                  |ACTION_CD|MESSAGE             |
+------------------------------------+------------------------+---------+--------------------+
|d2636151-b95e-4845-8014-0a113c381ff9|2022-08-07T21:24:54.552Z|receive  |Oi                  |
|d2636151-b95e-4845-8014-0a113c381ff9|2022-08-07T21:24:54.852Z|send     |Olá!                |
|4241224b-9ba5-4eda-8e16-7e3aeaacf164|2022-08-07T21:25:06.565Z|receive  |4                   |
|4241224b-9ba5-4eda-8e16-7e3aeaacf164|2022-08-07T21:25:06.688Z|send     |Certo               |
|bd46c6fb-1315-4418-9943-2e7d3151f788|2022-08-07T21:25:30.408Z|receive  |1                   |
|bd46c6fb-1315-4418-9943-2e7d3151f788|2022-08-07T21:25:30.479Z|send     |⭐️*Antes de você ir |
|14da8519-6e4c-4edc-88ea-e33c14533dd9|2022-08-07T21:25:52.798Z|receive  |788884              |
|14da8519-6e4c-4edc-88ea-e33c14533dd9|2022-08-07T21:25:57.435Z|send     |Agora               |
+------------------------------------+------------------------+---------+--------------------+

Desired output:

+------------------------------------+------------------------+-------+-------------------+
|ID                                  |MSG_DT                  |RECEIVE|SEND               |
+------------------------------------+------------------------+-------+-------------------+
|d2636151-b95e-4845-8014-0a113c381ff9|2022-08-07T21:24:54.552Z|Oi     |Olá!               |
|4241224b-9ba5-4eda-8e16-7e3aeaacf164|2022-08-07T21:25:06.565Z|4      |Certo              |
|bd46c6fb-1315-4418-9943-2e7d3151f788|2022-08-07T21:25:30.408Z|1      |⭐️*Antes de você ir|
|14da8519-6e4c-4edc-88ea-e33c14533dd9|2022-08-07T21:25:52.798Z|788884 |Agora              |
+------------------------------------+------------------------+-------+-------------------+

P.S.: MSG_DT should be the timestamp of the earliest record for each ID.
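The reshape described above can be sketched in plain Python (no Spark), which may help pin down the expected semantics: group the rows by ID, keep the earliest MSG_DT, and move each ACTION_CD's MESSAGE into its own column. pivot_actions is a hypothetical helper, and the sketch assumes at most one send and one receive row per ID (if there were more, it would keep the earliest of each). It compares MSG_DT as strings, which only works because every value uses the same ISO-8601 format with a Z suffix.

```python
from collections import defaultdict

# Sample rows from the question: (ID, MSG_DT, ACTION_CD, MESSAGE)
ROWS = [
    ("d2636151-b95e-4845-8014-0a113c381ff9", "2022-08-07T21:24:54.552Z", "receive", "Oi"),
    ("d2636151-b95e-4845-8014-0a113c381ff9", "2022-08-07T21:24:54.852Z", "send", "Olá!"),
    ("4241224b-9ba5-4eda-8e16-7e3aeaacf164", "2022-08-07T21:25:06.565Z", "receive", "4"),
    ("4241224b-9ba5-4eda-8e16-7e3aeaacf164", "2022-08-07T21:25:06.688Z", "send", "Certo"),
    ("bd46c6fb-1315-4418-9943-2e7d3151f788", "2022-08-07T21:25:30.408Z", "receive", "1"),
    ("bd46c6fb-1315-4418-9943-2e7d3151f788", "2022-08-07T21:25:30.479Z", "send", "⭐️*Antes de você ir"),
    ("14da8519-6e4c-4edc-88ea-e33c14533dd9", "2022-08-07T21:25:52.798Z", "receive", "788884"),
    ("14da8519-6e4c-4edc-88ea-e33c14533dd9", "2022-08-07T21:25:57.435Z", "send", "Agora"),
]


def pivot_actions(rows):
    """Hypothetical helper: one dict per ID with ID, MSG_DT, RECEIVE and SEND."""
    by_id = defaultdict(list)
    for row in rows:
        by_id[row[0]].append(row)

    result = []
    for msg_id, group in by_id.items():
        # Same-format ISO-8601 strings ending in 'Z' sort chronologically as text
        group.sort(key=lambda r: r[1])
        out = {"ID": msg_id, "MSG_DT": group[0][1], "RECEIVE": None, "SEND": None}
        for _, _, action, message in group:
            column = action.upper()
            # Keep only the earliest message for each known action
            if column in ("RECEIVE", "SEND") and out[column] is None:
                out[column] = message
        result.append(out)
    return result


if __name__ == "__main__":
    for row in pivot_actions(ROWS):
        print(row["ID"], row["MSG_DT"], row["RECEIVE"], row["SEND"], sep=" | ")
```

Note that MSG_DT here is the minimum timestamp per ID regardless of whether that row is a send or a receive; in the sample data the receive row always comes first.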

Answer:

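You can build the RECEIVE and SEND columns by applying first(..., ignorenulls=True) over a window partitioned by ID to conditional columns that keep MESSAGE only when ACTION_CD matches, and then keeping only the earliest row of each ID with row_number.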

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window as W

spark = SparkSession.builder.getOrCreate()

data = [("d2636151-b95e-4845-8014-0a113c381ff9", "2022-08-07T21:24:54.552Z", "receive", "Oi",),
        ("d2636151-b95e-4845-8014-0a113c381ff9", "2022-08-07T21:24:54.852Z", "send", "Olá!",),
        ("4241224b-9ba5-4eda-8e16-7e3aeaacf164", "2022-08-07T21:25:06.565Z", "receive", "4",),
        ("4241224b-9ba5-4eda-8e16-7e3aeaacf164", "2022-08-07T21:25:06.688Z", "send", "Certo",),
        ("bd46c6fb-1315-4418-9943-2e7d3151f788", "2022-08-07T21:25:30.408Z", "receive", "1",),
        ("bd46c6fb-1315-4418-9943-2e7d3151f788", "2022-08-07T21:25:30.479Z", "send", "️*Antes de você ir",),
        ("14da8519-6e4c-4edc-88ea-e33c14533dd9", "2022-08-07T21:25:52.798Z", "receive", "788884",),
        ("14da8519-6e4c-4edc-88ea-e33c14533dd9", "2022-08-07T21:25:57.435Z", "send", "Agora",), ]

# Parse the ISO-8601 MSG_DT strings into timestamps so they sort chronologically
df = (spark.createDataFrame(data, ("ID", "MSG_DT", "ACTION_CD", "MESSAGE"))
      .withColumn("MSG_DT", F.to_timestamp("MSG_DT")))

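# Rows of the same ID, earliest MSG_DT first
ws = W.partitionBy("ID").orderBy("MSG_DT")
# Same window, but the frame spans the whole partition, so every row sees both the
# send and the receive row (with orderBy, the default frame ends at the current row)
first_rows = ws.rowsBetween(W.unboundedPreceding, W.unboundedFollowing)

def action_column_selection(action):
    # MESSAGE where ACTION_CD matches, else null; first() skips the nulls
    return F.first(F.when(F.col("ACTION_CD") == action, F.col("MESSAGE")),
                   ignorenulls=True).over(first_rows)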

(df.select("*",
           action_column_selection("receive").alias("RECEIVE"),
           action_column_selection("send").alias("SEND"),
           F.row_number().over(ws).alias("rn"))  # rn = 1 is the earliest row of each ID
   .where("rn = 1")
   .drop("ACTION_CD", "MESSAGE", "rn")).show(truncate=False)


# Output (show() renders timestamps in the session time zone, UTC+2 here):
"""
+------------------------------------+-----------------------+-------+------------------+
|ID                                  |MSG_DT                 |RECEIVE|SEND              |
+------------------------------------+-----------------------+-------+------------------+
|14da8519-6e4c-4edc-88ea-e33c14533dd9|2022-08-07 23:25:52.798|788884 |Agora             |
|4241224b-9ba5-4eda-8e16-7e3aeaacf164|2022-08-07 23:25:06.565|4      |Certo             |
|bd46c6fb-1315-4418-9943-2e7d3151f788|2022-08-07 23:25:30.408|1      |️*Antes de você ir|
|d2636151-b95e-4845-8014-0a113c381ff9|2022-08-07 23:24:54.552|Oi     |Olá!              |
+------------------------------------+-----------------------+-------+------------------+
"""
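The frame on first_rows is what makes this work. When a window has orderBy but no explicit frame, Spark uses RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so on the earliest row of each ID first(..., ignorenulls=True) would only see that row, and SEND would come out null whenever the receive row is earliest. The plain-Python sketch below models a single partition to show the difference; it only illustrates the semantics and is not how Spark evaluates windows. when_action, first_ignore_nulls and evaluate are hypothetical names, and the model assumes distinct MSG_DT values within an ID (with a RANGE frame, rows tied on MSG_DT would also be included).

```python
# One partition (a single ID), already ordered by MSG_DT: (ACTION_CD, MESSAGE)
PARTITION = [("receive", "Oi"), ("send", "Olá!")]


def when_action(rows, action):
    """Model of F.when(F.col("ACTION_CD") == action, F.col("MESSAGE")):
    MESSAGE where the action matches, otherwise None (SQL NULL)."""
    return [message if act == action else None for act, message in rows]


def first_ignore_nulls(values):
    """Model of F.first(..., ignorenulls=True): first non-None value in the frame."""
    return next((v for v in values if v is not None), None)


def evaluate(rows, action, full_frame):
    """Evaluate the conditional first() on every row of the partition.

    full_frame=True  models rowsBetween(unboundedPreceding, unboundedFollowing)
    full_frame=False models the default frame: partition start to current row
    (assumes distinct MSG_DT values, so there are no peer rows to include).
    """
    column = when_action(rows, action)
    results = []
    for i in range(len(rows)):
        frame = column if full_frame else column[: i + 1]
        results.append(first_ignore_nulls(frame))
    return results


if __name__ == "__main__":
    print(evaluate(PARTITION, "send", full_frame=False))  # [None, 'Olá!']
    print(evaluate(PARTITION, "send", full_frame=True))   # ['Olá!', 'Olá!']
```

Row 0 is the row that row_number() == 1 keeps, which is why the answer pairs that filter with the full frame rather than the default one.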