How to merge dataframes with the closest timestamps

Time:02-06

I am trying to merge two dataframes with different timestamps using PySpark. I want to match each row with the row in the other dataframe whose timestamp is closest.

Here is the sample data. The timestamps are all different, so I can't join on time = time.

First dataframe:

ID  time              x   y
1   2023-01-02 14:31  10  20
1   2023-01-02 14:35  20  10

Second dataframe:

ID  time              x1  y1
2   2023-01-02 14:32  10  20
2   2023-01-01 14:36  20  10

Desired result:

ID  time              x1  y1  ID  time              x2  y2
1   2023-01-01 14:31  10  20  2   2023-01-02 14:32  10  20
1   2023-01-01 14:35  20  10  2   2023-01-01 14:36  20  10

When I simply join the dataframes, I get thousands of rows and the timestamps are all over the place, even though there are only 200 data points. I am not sure what is going on; please help.

I tried a plain join, and it creates too much data.

CodePudding user response:

Unfortunately, time-based joins are outside of the typical Spark use-cases and usually require workarounds that tend to be inefficient. And if you only have 200 datapoints, there are probably more convenient ways to process the data than via Spark, e.g. https://pandas.pydata.org/docs/reference/api/pandas.merge_asof.html.
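For a dataset this small, a minimal pandas sketch could look like the following. The rows are made up in the shape of the question's data, and the right-hand columns are renamed to ID1 and time1 purely for illustration. It relies on the documented direction="nearest" option of pandas.merge_asof, which requires both frames to be sorted by their key.

```python
import pandas as pd

# Made-up rows in the shape of the question's data (not the real dataset).
df1 = pd.DataFrame({
    "ID": [1, 1],
    "time": pd.to_datetime(["2023-01-02 14:31", "2023-01-02 14:35"]),
    "x": [10, 20],
    "y": [20, 10],
})
df2 = pd.DataFrame({
    "ID1": [2, 2],
    "time1": pd.to_datetime(["2023-01-02 14:32", "2023-01-02 14:36"]),
    "x1": [10, 20],
    "y1": [20, 10],
})

# merge_asof needs both inputs sorted by their join key.
df1 = df1.sort_values("time")
df2 = df2.sort_values("time1")

# direction="nearest" picks the closest timestamp, whether earlier or later.
merged = pd.merge_asof(
    df1, df2, left_on="time", right_on="time1", direction="nearest"
)
print(merged)
```

Each row of df1 appears exactly once in the result, so the row count stays at the size of df1 instead of exploding.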

In addition, I'm not sure if your task is well-defined. If two timestamps from the first dataframe have the same closest timestamp in the second dataframe,

  1. should both be joined to the same line from the second dataframe, or
  2. should only the join with the smaller distance be performed, while the other row from the first dataframe needs a different partner?

The latter will need a sophisticated solution and it's unlikely that Spark's built-in functions can help you. In the first scenario, let's go through your options.
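To make the first scenario concrete, here is a small plain-Python sketch with made-up timestamps. The helper names nearest and ts are hypothetical and exist only for this illustration. Both left-hand timestamps end up with the same right-hand partner.

```python
from bisect import bisect_left
from datetime import datetime


def nearest(sorted_times, t):
    """Return the entry of sorted_times closest to t (ties go to the earlier entry)."""
    i = bisect_left(sorted_times, t)
    # Only the neighbours just before and just after t can be the closest.
    candidates = sorted_times[max(i - 1, 0): i + 1]
    return min(candidates, key=lambda c: abs(c - t))


def ts(hhmm):
    """Build a timestamp on an arbitrary fixed day from an HH:MM string."""
    return datetime.strptime("2023-01-02 " + hhmm, "%Y-%m-%d %H:%M")


left = [ts("14:31"), ts("14:34")]
right = [ts("14:32"), ts("14:40")]  # must be sorted

# Scenario 1: every left timestamp independently takes its closest partner.
matches = {t.strftime("%H:%M"): nearest(right, t).strftime("%H:%M") for t in left}
print(matches)
```

Here 14:31 and 14:34 both map to 14:32. Under the second scenario, 14:31 would keep 14:32 (the smaller distance) and 14:34 would have to fall back to 14:40, which is a one-to-one assignment problem rather than a simple lookup.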

If the closest timestamp from the past is sufficient

In this case you need an "AS OF" join. As of version 3.3.1, Spark doesn't have those natively, but there's a workaround in Scala and an even simpler one in Python via the Pandas API; see "How do you create merge_asof functionality in PySpark?".

If it really has to be the closest timestamp from the past and the future

The easy but inefficient way is to perform the whole join and select the rows to be kept based on temporal distance, i.e.

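from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local").getOrCreate()

# Read both inputs and parse the "time" column as a timestamp.
df1 = spark.read.csv("./df1.csv", header=True).withColumn(
    "time", col("time").cast(T.TimestampType())
)
df2 = spark.read.csv("./df2.csv", header=True).withColumn(
    "time", col("time").cast(T.TimestampType())
)

# Rename the right-hand columns that would otherwise clash with df1's columns.
df2_renamed = df2.withColumnRenamed("time", "time1").withColumnRenamed("ID", "ID1")

df_joined = (
    # No join condition: every row of df1 is paired with every row of df2.
    df1.join(df2_renamed, how="left")
    # Absolute gap in seconds. Casting to double first means the result does not
    # depend on how a given Spark version handles timestamp subtraction.
    .withColumn(
        "temporalDiff",
        F.abs(col("time1").cast("double") - col("time").cast("double")),
    )
    # One output row per df1 row, keeping the df2 values with the smallest gap.
    .groupBy("ID", "time", "x", "y")
    .agg(
        F.expr("min_by(ID1, temporalDiff)").alias("ID1"),
        F.expr("min_by(time1, temporalDiff)").alias("time1"),
        F.expr("min_by(x1, temporalDiff)").alias("x1"),
        F.expr("min_by(y1, temporalDiff)").alias("y1"),
    )
)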

min_by was introduced in Spark 3.0. For previous versions, see https://sparkbyexamples.com/spark/spark-find-maximum-row-per-group-in-dataframe/.

You could also use the AS OF join solution linked above and expand it to work in both directions, which would be more efficient, but also more complicated to implement and read.
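The combination logic behind a two-directional as-of join can be sketched as follows. The sketch uses plain pandas only to keep it short and checkable (pandas itself already offers direction="nearest"); in Spark, the backward and forward results would come from whichever as-of workaround is used. All data and column names are made up.

```python
import pandas as pd

# Made-up, already sorted data.
left = pd.DataFrame({
    "time": pd.to_datetime(["2023-01-02 14:31", "2023-01-02 14:35"]),
    "x": [10, 20],
})
right = pd.DataFrame({
    "time1": pd.to_datetime(
        ["2023-01-02 14:30", "2023-01-02 14:33", "2023-01-02 14:36"]
    ),
    "x1": [1, 2, 3],
})

# Step 1: one as-of join per direction.
back = pd.merge_asof(left, right, left_on="time", right_on="time1", direction="backward")
fwd = pd.merge_asof(left, right, left_on="time", right_on="time1", direction="forward")

# Step 2: measure the gap to each candidate partner.
back_gap = (back["time"] - back["time1"]).abs()
fwd_gap = (fwd["time1"] - fwd["time"]).abs()

# Step 3: prefer the backward match when it is at least as close,
# or when there is no forward match at all (comparisons with NaT are False).
use_back = back_gap.le(fwd_gap) | fwd_gap.isna()

right_cols = list(right.columns)
nearest = fwd.copy()
nearest.loc[use_back, right_cols] = back.loc[use_back, right_cols]
print(nearest)
```

In this example 14:31 keeps its earlier partner 14:30, while 14:35 takes the later partner 14:36, so both directions contribute to the final result.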
