How can I join measure data and release information?

Time:12-15

I am trying to join two PySpark dataframes. One contains my measurement data; the other contains release information for my measurement equipment. I want to add the release information to the measurement data like this:

Input:

  • measure data:

    logger_id measure_date data
    394 2018-07-09T09:25:40 some data
    394 2018-08-23T09:51:18 other data
    394 2019-04-23T09:51:18 other data
    398 2018-01-10T12:15:53 more data
    398 2019-10-24T08:10:25 other data
  • release data

    logger_id release_date release_information
    394 2018-07-01T00:00:00 release information
    394 2019-04-01T00:00:00 release information
    398 2018-01-01T00:00:00 release information
    398 2019-07-01T00:00:00 release information

and I want output like this:

logger_id measure_date data release_date release_information
394 2018-07-09T09:25:40 some data 2018-07-01T00:00:00 release information
394 2018-08-23T09:51:18 other data 2018-07-01T00:00:00 release information
394 2019-04-23T09:51:18 other data 2019-04-01T00:00:00 release information
398 2018-01-10T12:15:53 more data 2018-01-01T00:00:00 release information
398 2019-10-24T08:10:25 other data 2019-07-01T00:00:00 release information

I've already tried

cond = [release_data.release_date < measure_data.measure_date, release_data.logger_id == measure_data.logger_id]
measure_data.join(release_data, cond, how='fullouter')

But in the resulting dataframe I get release data rows where the columns from the measure dataframe are null.

I also considered iterating through my measure data dataframe and adding the release information to every row, but because it is really large, I don't want to do that.

CodePudding user response:

You can transform release_df to include a column that holds the date until which each release is valid; lead can be used for this.

Once release_valid_end is included, the join condition matches on logger_id and checks that measure_date falls between release_date (inclusive) and release_valid_end (exclusive).
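To make the interval idea concrete before the Spark code, here is a minimal plain-Python sketch of the same logic on the sample release dates. The names FAR_FUTURE, validity_intervals and find_release are made up for this illustration and are not part of PySpark; the sketch only mirrors what lead over a per-logger window and the range condition are expected to do.

```python
from datetime import datetime
from itertools import groupby

# Stand-in for "no later release exists" (illustrative; plays the role of world_end_date)
FAR_FUTURE = datetime(2999, 12, 31)

releases = [
    (394, datetime(2018, 7, 1)),
    (394, datetime(2019, 4, 1)),
    (398, datetime(2018, 1, 1)),
    (398, datetime(2019, 7, 1)),
]


def validity_intervals(rows):
    """Return (logger_id, release_date, release_valid_end) tuples.

    release_valid_end is the next release_date of the same logger,
    or FAR_FUTURE for the latest release of that logger.
    """
    out = []
    ordered = sorted(rows)  # sorts by logger_id, then release_date
    for logger_id, group in groupby(ordered, key=lambda r: r[0]):
        dates = [r[1] for r in group]
        next_dates = dates[1:] + [FAR_FUTURE]  # each date shifted by one position
        out.extend((logger_id, start, end) for start, end in zip(dates, next_dates))
    return out


def find_release(intervals, logger_id, measure_date):
    """Return the release_date whose half-open interval contains measure_date, else None."""
    for lid, start, end in intervals:
        if lid == logger_id and start <= measure_date < end:
            return start
    return None


intervals = validity_intervals(releases)
```

Note that find_release returns None for a measurement taken before the first release of its logger. In the Spark version, an inner join drops such measurement rows; passing how="left" to join keeps them with null release columns instead.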

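from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window as W

# Reuse the active session (as in the pyspark shell) or create one
spark = SparkSession.builder.getOrCreate()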

measure_data = [(394, datetime.strptime("2018-07-09T09:25:40", "%Y-%m-%dT%H:%M:%S"), "some data",),
                (394, datetime.strptime("2018-08-23T09:51:18", "%Y-%m-%dT%H:%M:%S"), "other data",),
                (394, datetime.strptime("2019-04-23T09:51:18", "%Y-%m-%dT%H:%M:%S"), "other data",),
                (398, datetime.strptime("2018-01-10T12:15:53", "%Y-%m-%dT%H:%M:%S"), "more data",),
                (398, datetime.strptime("2019-10-24T08:10:25", "%Y-%m-%dT%H:%M:%S"), "other data",), ]

release_data = [(394, datetime.strptime("2018-07-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), "release information",),
                (394, datetime.strptime("2019-04-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), "release information",),
                (398, datetime.strptime("2018-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), "release information",),
                (398, datetime.strptime("2019-07-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), "release information",), ]

measure_df = spark.createDataFrame(measure_data, ("logger_id", "measure_date", "data",))
release_df = spark.createDataFrame(release_data, ("logger_id", "release_date", "release_information",))

world_end_date = datetime.strptime("2999-12-31T00:00:00", "%Y-%m-%dT%H:%M:%S")

window_spec = W.partitionBy("logger_id").orderBy(F.asc("release_date"))

release_validity_df = release_df.withColumn("release_valid_end", 
                                            F.lead("release_date", offset=1, default=world_end_date).over(window_spec))

(measure_df.join(release_validity_df, 
                ((measure_df["logger_id"] == release_validity_df["logger_id"]) & 
                 ((measure_df["measure_date"] >= release_validity_df["release_date"]) &
                  (measure_df["measure_date"] < release_validity_df["release_valid_end"]))
                ))
).select(measure_df["logger_id"], "measure_date", "data", "release_date", "release_information").show()

Output

+---------+-------------------+----------+-------------------+-------------------+
|logger_id|       measure_date|      data|       release_date|release_information|
+---------+-------------------+----------+-------------------+-------------------+
|      398|2018-01-10 12:15:53| more data|2018-01-01 00:00:00|release information|
|      398|2019-10-24 08:10:25|other data|2019-07-01 00:00:00|release information|
|      394|2018-07-09 09:25:40| some data|2018-07-01 00:00:00|release information|
|      394|2018-08-23 09:51:18|other data|2018-07-01 00:00:00|release information|
|      394|2019-04-23 09:51:18|other data|2019-04-01 00:00:00|release information|
+---------+-------------------+----------+-------------------+-------------------+