Pyspark Increment the timestamp column based on row_number value

Time:05-24

I am pulling data from Event Hub; each packet contains 10 records and carries a single timestamp. I want to explode each packet into its 10 records and assign the packet timestamp to each record, incrementing it by 1 second per record, partitioned by EnqueuedTimeUtc and vehicleid.

Below is the intermediate data I have in the dataframe.


df.show()

+-------------------+---------------+-------------------+
|    EnqueuedTimeUtc|      vehicleid|       datetime_pkt|
+-------------------+---------------+-------------------+
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|
+-------------------+---------------+-------------------+

Expected output (each packet's timestamp incremented by 1 second per record):

+-------------------+---------------+-------------------+-------------------+
|    EnqueuedTimeUtc|      vehicleid|       datetime_pkt|    nw_datetime_pkt|
+-------------------+---------------+-------------------+-------------------+
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:43|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:44|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:45|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:46|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:47|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:48|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:49|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:50|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:51|
|5/1/2022 7:19:46 AM|86135903910    |2022-05-01 07:19:43|2022-05-01 07:19:52|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:48|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:49|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:50|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:51|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:52|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:53|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:54|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:55|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:56|
|5/1/2022 7:19:49 AM|86135903910    |2022-05-01 07:19:48|2022-05-01 07:19:57|
+-------------------+---------------+-------------------+-------------------+

CodePudding user response:

I was able to resolve this by using window functions.

Steps:

  1. Add a row_number over the partitionBy columns and subtract 1 so that the row number starts at 0 instead of 1.
  2. Use the lag function to create a new column nw_datetime_pkt.
  3. Use the unix_timestamp function to convert the timestamp to epoch seconds, add the row number as the number of seconds to increment, and cast back to a timestamp.
import pyspark.sql.functions as F
from pyspark.sql.window import Window  # Window must be imported

w = Window.partitionBy("vehicleid", "datetime_pkt").orderBy("datetime_pkt")

# 0-based row number within each packet, plus the previous row's packet timestamp
df1 = df.withColumn("rn", F.row_number().over(w) - 1) \
        .withColumn("nw_datetime_pkt", F.lag("datetime_pkt").over(w))

# First row keeps its timestamp; subsequent rows add rn seconds to the lagged timestamp
df1 = df1.withColumn("nw_datetime_pkt",
                     F.when(F.col("nw_datetime_pkt").isNull(), F.col("datetime_pkt"))
                      .otherwise((F.unix_timestamp("nw_datetime_pkt") + df1.rn).cast("timestamp")))
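Since datetime_pkt is constant within each partition, the lag step is arguably redundant: adding the 0-based row number directly to unix_timestamp(datetime_pkt) gives the same result. The core arithmetic (epoch seconds plus the row index) can be sanity-checked in plain Python; `increment_packet` below is a hypothetical helper, not part of the PySpark answer:

```python
from datetime import datetime, timedelta

def increment_packet(packet_ts: str, n_records: int = 10) -> list[str]:
    """Give record 0 the packet timestamp and add one second per subsequent
    record, mirroring unix_timestamp(datetime_pkt) + rn in the answer above."""
    base = datetime.strptime(packet_ts, "%Y-%m-%d %H:%M:%S")
    return [(base + timedelta(seconds=rn)).strftime("%Y-%m-%d %H:%M:%S")
            for rn in range(n_records)]

print(increment_packet("2022-05-01 07:19:43")[:3])
# ['2022-05-01 07:19:43', '2022-05-01 07:19:44', '2022-05-01 07:19:45']
```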