I am trying to join two PySpark dataframes. One contains my measurement data, the other contains release information for my measurement equipment. I want to add the release information to the measurement data like this:
Input:
measure data:

logger_id | measure_date | data |
---|---|---|
394 | 2018-07-09T09:25:40 | some data |
394 | 2018-08-23T09:51:18 | other data |
394 | 2019-04-23T09:51:18 | other data |
398 | 2018-01-10T12:15:53 | more data |
398 | 2019-10-24T08:10:25 | other data |

release data:

logger_id | release_date | release_information |
---|---|---|
394 | 2018-07-01T00:00:00 | release information |
394 | 2019-04-01T00:00:00 | release information |
398 | 2018-01-01T00:00:00 | release information |
398 | 2019-07-01T00:00:00 | release information |
and I want an output like this:
logger_id | measure_date | data | release_date | release_information |
---|---|---|---|---|
394 | 2018-07-09T09:25:40 | some data | 2018-07-01T00:00:00 | release information |
394 | 2018-08-23T09:51:18 | other data | 2018-07-01T00:00:00 | release information |
394 | 2019-04-23T09:51:18 | other data | 2019-04-01T00:00:00 | release information |
398 | 2018-01-10T12:15:53 | more data | 2018-01-01T00:00:00 | release information |
398 | 2019-10-24T08:10:25 | other data | 2019-07-01T00:00:00 | release information |
I've already tried
cond = [release_data.release_date < measure_data.measure_date, release_data.logger_id == measure_data.logger_id]
measure_data.join(release_data, cond, how='fullouter')
But the resulting dataframe also contains rows with the release data paired with null values in the measurement columns.
I also considered iterating through my measurement dataframe and adding the release information row by row, but since it is really large, I'd rather not do that.
CodePudding user response:
You can transform release_df to include a column that marks until when each release is valid; the window function lead can be used for this. Once release_valid_end is included, the join condition changes to check that measure_date falls between release_date and release_valid_end.
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql import Window as W

fmt = "%Y-%m-%dT%H:%M:%S"
measure_data = [(394, datetime.strptime("2018-07-09T09:25:40", fmt), "some data"),
                (394, datetime.strptime("2018-08-23T09:51:18", fmt), "other data"),
                (394, datetime.strptime("2019-04-23T09:51:18", fmt), "other data"),
                (398, datetime.strptime("2018-01-10T12:15:53", fmt), "more data"),
                (398, datetime.strptime("2019-10-24T08:10:25", fmt), "other data")]
release_data = [(394, datetime.strptime("2018-07-01T00:00:00", fmt), "release information"),
                (394, datetime.strptime("2019-04-01T00:00:00", fmt), "release information"),
                (398, datetime.strptime("2018-01-01T00:00:00", fmt), "release information"),
                (398, datetime.strptime("2019-07-01T00:00:00", fmt), "release information")]
measure_df = spark.createDataFrame(measure_data, ("logger_id", "measure_date", "data"))
release_df = spark.createDataFrame(release_data, ("logger_id", "release_date", "release_information"))

# Sentinel end date so the latest release of each logger stays valid "forever".
world_end_date = datetime.strptime("2999-12-31T00:00:00", fmt)

# Per logger, a release is valid until the next release starts.
window_spec = W.partitionBy("logger_id").orderBy(F.asc("release_date"))
release_validity_df = release_df.withColumn(
    "release_valid_end",
    F.lead("release_date", offset=1, default=world_end_date).over(window_spec))

# Join each measurement to the release whose validity window contains it.
(measure_df.join(release_validity_df,
                 (measure_df["logger_id"] == release_validity_df["logger_id"]) &
                 (measure_df["measure_date"] >= release_validity_df["release_date"]) &
                 (measure_df["measure_date"] < release_validity_df["release_valid_end"]))
           .select(measure_df["logger_id"], "measure_date", "data",
                   "release_date", "release_information")
           .show())
Output
+---------+-------------------+----------+-------------------+-------------------+
|logger_id|       measure_date|      data|       release_date|release_information|
+---------+-------------------+----------+-------------------+-------------------+
|      398|2018-01-10 12:15:53| more data|2018-01-01 00:00:00|release information|
|      398|2019-10-24 08:10:25|other data|2019-07-01 00:00:00|release information|
|      394|2018-07-09 09:25:40| some data|2018-07-01 00:00:00|release information|
|      394|2018-08-23 09:51:18|other data|2018-07-01 00:00:00|release information|
|      394|2019-04-23 09:51:18|other data|2019-04-01 00:00:00|release information|
+---------+-------------------+----------+-------------------+-------------------+