Find nearest Timestamp on another column and add value in a new column PySpark


I have a merged DataFrame with two timestamp columns. For each row I want to find the nearest forward timestamp (TimeStamp1 -> TimeStamp2), take the value associated with it, and add it in a new column.

TimeStamp1                Value1     TimeStamp2               Value2
2021-11-01T01:55:29.473   131        2021-11-01T01:55:28.205  A
2021-11-01T01:55:30.474   3          2021-11-01T01:55:31.205  B
2021-11-01T05:01:55.247   195        2021-11-01T03:44:14.208  C
2021-11-01T05:01:56.247   67         2021-11-01T05:41:56.205  D
2021-11-01T09:41:30.264   131        2021-11-01T09:41:29.405  E
2021-11-01T09:41:32.264   67         2021-11-01T09:41:35.205  F

Expected output:

TimeStamp1                Value1     Value2
2021-11-01T01:55:29.473   131        B
2021-11-01T01:55:30.474   3          B
2021-11-01T05:01:55.247   195        D
2021-11-01T05:01:56.247   67         D
2021-11-01T09:41:30.264   131        F
2021-11-01T09:41:32.264   67         F

I'm working with PySpark; the approaches I found for this are all in pandas.

CodePudding user response:

The transformation you are looking for can be achieved in two steps:

  1. Generate all possible combinations where df["TimeStamp2"] >= df["TimeStamp1"] using a self join. This forms our candidate_df.
  2. Prune candidate_df down to the expected rows by finding, for each TimeStamp1, the row with the minimum TimeStamp2. We do this by partitioning candidate_df by TimeStamp1, ordering by TimeStamp2 ascending, and keeping the first row.

If you have a threshold for the "maximum nearness", i.e. a maximum allowed difference between TimeStamp1 and its nearest TimeStamp2, the solution can be optimized by pushing that condition into the join, which reduces the size of candidate_df (see the sketch after the output below).

Working Example


from datetime import datetime

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

def ts(s):
    # Parse the millisecond-precision timestamp strings used in the sample data.
    return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S.%f")

data = [(ts("2021-11-01T01:55:29.473"), 131, ts("2021-11-01T01:55:28.205"), "A"),
        (ts("2021-11-01T01:55:30.474"), 3,   ts("2021-11-01T01:55:31.205"), "B"),
        (ts("2021-11-01T05:01:55.247"), 195, ts("2021-11-01T03:44:14.208"), "C"),
        (ts("2021-11-01T05:01:56.247"), 67,  ts("2021-11-01T05:41:56.205"), "D"),
        (ts("2021-11-01T09:41:30.264"), 131, ts("2021-11-01T09:41:29.405"), "E"),
        (ts("2021-11-01T09:41:32.264"), 67,  ts("2021-11-01T09:41:35.205"), "F")]

df = spark.createDataFrame(data, ("TimeStamp1", "Value1", "TimeStamp2", "Value2"))

# Step 1: self join to build every (TimeStamp1, TimeStamp2) pair
# where TimeStamp2 is at or after TimeStamp1.
candidate_df = df.alias("l").join(df.alias("r"), F.col("r.TimeStamp2") >= F.col("l.TimeStamp1"))\
                 .selectExpr("l.TimeStamp1 as TimeStamp1",
                             "l.Value1 as Value1",
                             "r.TimeStamp2 as TimeStamp2",
                             "r.Value2 as Value2")

# Step 2: for each TimeStamp1, keep only the row with the smallest TimeStamp2.
window_spec = Window.partitionBy("TimeStamp1").orderBy("TimeStamp2")

candidate_df.withColumn("rn", F.row_number().over(window_spec))\
            .filter(F.col("rn") == 1)\
            .drop("rn", "TimeStamp2")\
            .show(200, False)

Output

+-----------------------+------+------+
|TimeStamp1             |Value1|Value2|
+-----------------------+------+------+
|2021-11-01 01:55:29.473|131   |B     |
|2021-11-01 01:55:30.474|3     |B     |
|2021-11-01 05:01:55.247|195   |D     |
|2021-11-01 05:01:56.247|67    |D     |
|2021-11-01 09:41:30.264|131   |F     |
|2021-11-01 09:41:32.264|67    |F     |
+-----------------------+------+------+
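
As noted above, a "maximum nearness" threshold can be pushed into the join condition so that candidate_df only keeps pairs within the allowed gap. A minimal sketch, assuming a hypothetical max_gap_seconds threshold of 3600 seconds (one hour), which is not part of the original answer; the pruning step is unchanged:

# Assumed, example-specific threshold: at most one hour between TimeStamp1 and TimeStamp2.
max_gap_seconds = 3600

bounded_candidate_df = df.alias("l")\
    .join(df.alias("r"),
          (F.col("r.TimeStamp2") >= F.col("l.TimeStamp1"))
          # casting a timestamp to long yields Unix seconds, so this bounds the gap
          & ((F.col("r.TimeStamp2").cast("long") - F.col("l.TimeStamp1").cast("long")) <= max_gap_seconds))\
    .selectExpr("l.TimeStamp1 as TimeStamp1",
                "l.Value1 as Value1",
                "r.TimeStamp2 as TimeStamp2",
                "r.Value2 as Value2")

bounded_candidate_df.withColumn("rn", F.row_number().over(window_spec))\
                    .filter(F.col("rn") == 1)\
                    .drop("rn", "TimeStamp2")\
                    .show(200, False)

Note that with this inner join, any TimeStamp1 whose nearest TimeStamp2 lies outside the threshold disappears from the result entirely.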
