I have a merged DataFrame with two timestamp columns. For each row I want to find the nearest forward timestamp (TimeStamp1 -> TimeStamp2), take the value associated with it, and add it in a new column.
TimeStamp1 Value1 TimeStamp2 Value2
2021-11-01T01:55:29.473 131 2021-11-01T01:55:28.205 A
2021-11-01T01:55:30.474 3 2021-11-01T01:55:31.205 B
2021-11-01T05:01:55.247 195 2021-11-01T03:44:14.208 C
2021-11-01T05:01:56.247 67 2021-11-01T05:41:56.205 D
2021-11-01T09:41:30.264 131 2021-11-01T09:41:29.405 E
2021-11-01T09:41:32.264 67 2021-11-01T09:41:35.205 F
Expected output:
TimeStamp1 Value1 Value2
2021-11-01T01:55:29.473 131 B
2021-11-01T01:55:30.474 3 B
2021-11-01T05:01:55.247 195 D
2021-11-01T05:01:56.247 67 D
2021-11-01T09:41:30.264 131 F
2021-11-01T09:41:32.264 67 F
I'm working with PySpark; the approaches I found only cover pandas.
CodePudding user response:
The transformation you are looking for can be achieved in two steps:
- Generate all possible combinations where df["TimeStamp2"] >= df["TimeStamp1"] using a self join. This forms our candidate_df.
- Prune candidate_df down to the expected rows by finding, for each TimeStamp1, the row with the minimum TimeStamp2. We do this by partitioning candidate_df by TimeStamp1, ordering by TimeStamp2 ascending, and taking the first row.
If you have a threshold for the "maximum nearness" (i.e. a maximum allowed difference between TimeStamp1 and the nearest TimeStamp2), the solution can be optimized by adding that condition to the join, which reduces the size of candidate_df; a sketch of this is shown after the output below.
Working Example
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

spark = SparkSession.builder.getOrCreate()
fmt = "%Y-%m-%dT%H:%M:%S.%f"
data = [(datetime.strptime("2021-11-01T01:55:29.473", fmt), 131, datetime.strptime("2021-11-01T01:55:28.205", fmt), "A"),
        (datetime.strptime("2021-11-01T01:55:30.474", fmt), 3, datetime.strptime("2021-11-01T01:55:31.205", fmt), "B"),
        (datetime.strptime("2021-11-01T05:01:55.247", fmt), 195, datetime.strptime("2021-11-01T03:44:14.208", fmt), "C"),
        (datetime.strptime("2021-11-01T05:01:56.247", fmt), 67, datetime.strptime("2021-11-01T05:41:56.205", fmt), "D"),
        (datetime.strptime("2021-11-01T09:41:30.264", fmt), 131, datetime.strptime("2021-11-01T09:41:29.405", fmt), "E"),
        (datetime.strptime("2021-11-01T09:41:32.264", fmt), 67, datetime.strptime("2021-11-01T09:41:35.205", fmt), "F"),]
df = spark.createDataFrame(data, ("TimeStamp1", "Value1", "TimeStamp2", "Value2",))
# Step 1: self join keeping only forward matches (TimeStamp2 >= TimeStamp1)
candidate_df = df.alias("l").join(df.alias("r"), F.col("r.TimeStamp2") >= F.col("l.TimeStamp1"))\
                 .selectExpr("l.TimeStamp1 as TimeStamp1",
                             "l.Value1 as Value1",
                             "r.TimeStamp2 as TimeStamp2",
                             "r.Value2 as Value2")
# Step 2: for each TimeStamp1, keep the first row when ordered by TimeStamp2
# ascending, i.e. the nearest forward match
window_spec = Window.partitionBy("TimeStamp1").orderBy("TimeStamp2")
candidate_df.withColumn("rn", F.row_number().over(window_spec))\
            .filter(F.col("rn") == 1)\
            .drop("rn", "TimeStamp2")\
            .show(200, False)
Output
+-----------------------+------+------+
|TimeStamp1             |Value1|Value2|
+-----------------------+------+------+
|2021-11-01 01:55:29.473|131   |B     |
|2021-11-01 01:55:30.474|3     |B     |
|2021-11-01 05:01:55.247|195   |D     |
|2021-11-01 05:01:56.247|67    |D     |
|2021-11-01 09:41:30.264|131   |F     |
|2021-11-01 09:41:32.264|67    |F     |
+-----------------------+------+------+
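As a rough sketch of the threshold optimization mentioned above: push the "maximum nearness" condition into the join itself so candidate_df stays small. The 1-hour max_gap_seconds value below is only an assumption for illustration; pick whatever bound fits your data.
# Sketch: limit the forward search window (here: 1 hour) so the self join
# produces fewer candidate rows; the threshold value is illustrative only
max_gap_seconds = 3600

candidate_df = df.alias("l").join(
    df.alias("r"),
    (F.col("r.TimeStamp2") >= F.col("l.TimeStamp1")) &
    ((F.col("r.TimeStamp2").cast("long") - F.col("l.TimeStamp1").cast("long")) <= max_gap_seconds)
).selectExpr("l.TimeStamp1 as TimeStamp1",
             "l.Value1 as Value1",
             "r.TimeStamp2 as TimeStamp2",
             "r.Value2 as Value2")
Note that with this inner join, any TimeStamp1 that has no TimeStamp2 within the threshold disappears from the result; switch to a left join (and handle the null Value2) if those rows must be kept.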