I am pulling data from Event Hub. Each packet contains 10 records, and each packet carries a timestamp. I want to explode each packet into its 10 records and assign the packet timestamp to every record, incrementing it by 1 second per record, partitioned by EnqueuedTimeUtc and vehicleid.
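Roughly, I get to the intermediate rows by exploding the array of records in each message, something like this (a sketch; packets_df and the records array column are placeholder names for my parsed Event Hub stream):
from pyspark.sql import functions as F
# packets_df has one row per Event Hub message, with the packet-level columns
# plus an array column "records" holding the 10 records (placeholder names).
exploded = packets_df \
    .withColumn("record", F.explode("records")) \
    .select("EnqueuedTimeUtc", "vehicleid", "datetime_pkt", "record.*")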
Below is the intermediate data that I have in the DataFrame:
df.show()
+-------------------+-----------+-------------------+
|    EnqueuedTimeUtc|  vehicleid|       datetime_pkt|
+-------------------+-----------+-------------------+
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|
+-------------------+-----------+-------------------+
Expected output (each record in a packet gets the packet's datetime_pkt plus its 0-based position in seconds):
+-------------------+-----------+-------------------+-------------------+
|    EnqueuedTimeUtc|  vehicleid|       datetime_pkt|    nw_datetime_pkt|
+-------------------+-----------+-------------------+-------------------+
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|2022-05-01 07:19:44|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|2022-05-01 07:19:45|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|2022-05-01 07:19:46|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|2022-05-01 07:19:47|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|2022-05-01 07:19:48|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|2022-05-01 07:19:49|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|2022-05-01 07:19:50|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|2022-05-01 07:19:51|
|5/1/2022 7:19:46 AM|86135903910|2022-05-01 07:19:43|2022-05-01 07:19:52|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|2022-05-01 07:19:49|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|2022-05-01 07:19:50|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|2022-05-01 07:19:51|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|2022-05-01 07:19:52|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|2022-05-01 07:19:53|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|2022-05-01 07:19:54|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|2022-05-01 07:19:55|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|2022-05-01 07:19:56|
|5/1/2022 7:19:49 AM|86135903910|2022-05-01 07:19:48|2022-05-01 07:19:57|
+-------------------+-----------+-------------------+-------------------+
CodePudding user response:
I was able to resolve this by using window functions.
Steps:
- Add a row_number over the partition columns and subtract 1 so the numbering starts at 0 instead of 1.
- Use the lag function to create a new column nw_datetime_pkt; it is null for the first row in each partition.
- Use the unix_timestamp function to convert the timestamp to epoch seconds, add the row number as the offset in seconds, and cast the result back to a timestamp.
import pyspark.sql.functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("vehicleid", "datetime_pkt").orderBy("datetime_pkt")
df1 = df.withColumn("rn", F.row_number().over(w) - 1) \
        .withColumn("nw_datetime_pkt", F.lag("datetime_pkt").over(w))
# First row per packet keeps datetime_pkt; every later row adds rn seconds to the lagged value.
df1 = df1.withColumn("nw_datetime_pkt", F.when(F.col("nw_datetime_pkt").isNull(), F.col("datetime_pkt")).otherwise((F.unix_timestamp("nw_datetime_pkt") + df1.rn).cast("timestamp")))
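Side note: because datetime_pkt is part of the partition key, the lagged value always equals datetime_pkt itself, so the lag/when step can be collapsed into a single expression. A minimal self-contained sketch under that assumption, with two synthetic three-record packets standing in for the real feed:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
# Synthetic stand-in data: two packets of three records each (not the real feed).
rows = [("5/1/2022 7:19:46 AM", "86135903910", "2022-05-01 07:19:43")] * 3 \
     + [("5/1/2022 7:19:49 AM", "86135903910", "2022-05-01 07:19:48")] * 3
df = spark.createDataFrame(rows, ["EnqueuedTimeUtc", "vehicleid", "datetime_pkt"]) \
          .withColumn("datetime_pkt", F.to_timestamp("datetime_pkt"))

w = Window.partitionBy("vehicleid", "datetime_pkt").orderBy("datetime_pkt")
df1 = df.withColumn("rn", F.row_number().over(w) - 1) \
        .withColumn("nw_datetime_pkt",
                    (F.unix_timestamp("datetime_pkt") + F.col("rn")).cast("timestamp"))
df1.show(truncate=False)  # nw_datetime_pkt: 07:19:43, 07:19:44, 07:19:45, then 07:19:48, 07:19:49, 07:19:50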