I have a DataFrame in which each 'send' and 'receive' message is stored as a separate row. Using PySpark, I need to merge each pair of rows into a single row with RECEIVE and SEND columns. Both rows of a pair share the same ID, and the ACTION_CD column identifies the action:
Original dataframe:
+------------------------------------+------------------------+---------+--------------------+
|ID                                  |MSG_DT                  |ACTION_CD|MESSAGE             |
+------------------------------------+------------------------+---------+--------------------+
|d2636151-b95e-4845-8014-0a113c381ff9|2022-08-07T21:24:54.552Z|receive  |Oi                  |
|d2636151-b95e-4845-8014-0a113c381ff9|2022-08-07T21:24:54.852Z|send     |Olá!                |
|4241224b-9ba5-4eda-8e16-7e3aeaacf164|2022-08-07T21:25:06.565Z|receive  |4                   |
|4241224b-9ba5-4eda-8e16-7e3aeaacf164|2022-08-07T21:25:06.688Z|send     |Certo               |
|bd46c6fb-1315-4418-9943-2e7d3151f788|2022-08-07T21:25:30.408Z|receive  |1                   |
|bd46c6fb-1315-4418-9943-2e7d3151f788|2022-08-07T21:25:30.479Z|send     |⭐️*Antes de você ir |
|14da8519-6e4c-4edc-88ea-e33c14533dd9|2022-08-07T21:25:52.798Z|receive  |788884              |
|14da8519-6e4c-4edc-88ea-e33c14533dd9|2022-08-07T21:25:57.435Z|send     |Agora               |
+------------------------------------+------------------------+---------+--------------------+
Desired output:
+------------------------------------+------------------------+-------+-------------------+
|ID                                  |MSG_DT                  |RECEIVE|SEND               |
+------------------------------------+------------------------+-------+-------------------+
|d2636151-b95e-4845-8014-0a113c381ff9|2022-08-07T21:24:54.552Z|Oi     |Olá!               |
|4241224b-9ba5-4eda-8e16-7e3aeaacf164|2022-08-07T21:25:06.565Z|4      |Certo              |
|bd46c6fb-1315-4418-9943-2e7d3151f788|2022-08-07T21:25:30.408Z|1      |⭐️*Antes de você ir|
|14da8519-6e4c-4edc-88ea-e33c14533dd9|2022-08-07T21:25:52.798Z|788884 |Agora              |
+------------------------------------+------------------------+-------+-------------------+
P.S.: MSG_DT should be the earliest timestamp among the rows that share an ID.
Answer:
You can construct the RECEIVE and SEND columns by applying a first expression over computed columns that are populated depending on the value of ACTION_CD.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window as W

# Reuse the active session (e.g. in a notebook or shell) or start a new one.
spark = SparkSession.builder.getOrCreate()

data = [
    ("d2636151-b95e-4845-8014-0a113c381ff9", "2022-08-07T21:24:54.552Z", "receive", "Oi"),
    ("d2636151-b95e-4845-8014-0a113c381ff9", "2022-08-07T21:24:54.852Z", "send", "Olá!"),
    ("4241224b-9ba5-4eda-8e16-7e3aeaacf164", "2022-08-07T21:25:06.565Z", "receive", "4"),
    ("4241224b-9ba5-4eda-8e16-7e3aeaacf164", "2022-08-07T21:25:06.688Z", "send", "Certo"),
    ("bd46c6fb-1315-4418-9943-2e7d3151f788", "2022-08-07T21:25:30.408Z", "receive", "1"),
    ("bd46c6fb-1315-4418-9943-2e7d3151f788", "2022-08-07T21:25:30.479Z", "send", "⭐️*Antes de você ir"),
    ("14da8519-6e4c-4edc-88ea-e33c14533dd9", "2022-08-07T21:25:52.798Z", "receive", "788884"),
    ("14da8519-6e4c-4edc-88ea-e33c14533dd9", "2022-08-07T21:25:57.435Z", "send", "Agora"),
]
df = (spark.createDataFrame(data, ("ID", "MSG_DT", "ACTION_CD", "MESSAGE"))
      .withColumn("MSG_DT", F.to_timestamp("MSG_DT")))

# Rows of each ID, ordered from the earliest to the latest message.
ws = W.partitionBy("ID").orderBy("MSG_DT")
# Same partitioning and ordering, with the frame widened to the whole partition.
first_rows = ws.rowsBetween(W.unboundedPreceding, W.unboundedFollowing)


def action_column_selection(action):
    # First non-null MESSAGE in the partition whose ACTION_CD equals `action`.
    return F.first(F.when(F.col("ACTION_CD") == action, F.col("MESSAGE")),
                   ignorenulls=True).over(first_rows)


(df.select("*",
           action_column_selection("receive").alias("RECEIVE"),
           action_column_selection("send").alias("SEND"),
           F.row_number().over(ws).alias("rn"))
   .where("rn = 1")  # keep only the earliest row per ID, so MSG_DT is the earliest timestamp
   .drop("ACTION_CD", "MESSAGE", "rn")
   .show(truncate=False))
# show() renders timestamps in the Spark session time zone (UTC+2 in this run),
# which is why the hours read 23 rather than 21. Row order is not guaranteed.
"""
+------------------------------------+-----------------------+-------+-------------------+
|ID                                  |MSG_DT                 |RECEIVE|SEND               |
+------------------------------------+-----------------------+-------+-------------------+
|14da8519-6e4c-4edc-88ea-e33c14533dd9|2022-08-07 23:25:52.798|788884 |Agora              |
|4241224b-9ba5-4eda-8e16-7e3aeaacf164|2022-08-07 23:25:06.565|4      |Certo              |
|bd46c6fb-1315-4418-9943-2e7d3151f788|2022-08-07 23:25:30.408|1      |⭐️*Antes de você ir|
|d2636151-b95e-4845-8014-0a113c381ff9|2022-08-07 23:24:54.552|Oi     |Olá!               |
+------------------------------------+-----------------------+-------+-------------------+
"""