Assume we have the PySpark DataFrame (df) shown below. How can I use PySpark to get the duration (in minutes) between each biking event and its corresponding driving event? Assume each biking event has exactly one corresponding driving event; however, within a day there can be multiple "biking - driving" pairs. Finally, store the results in a DataFrame with columns such as biking_time, biking_event, driving_time, driving_event, and the duration, etc.
Note: there can be other events between biking and driving; for example, a person can go from biking to running to swimming and then to driving.
One example is shown in the table below:
The duration on 03/01/2018 between biking and driving is 8:12 - 5:12 = 3 hours = 180 mins.
| | TimeDetails | Event |
|---|---|---|
| 1 | 3/1/2018 5:12 | Biking |
| 2 | 3/1/2018 6:12 | Swimming |
| 3 | 3/1/2018 7:12 | Hiking |
| 4 | 3/1/2018 8:12 | Driving |
| 5 | 3/2/2018 9:12 | Biking |
| 6 | 3/2/2018 10:12 | Swimming |
| 7 | 3/2/2018 11:12 | Swimming |
| 8 | 3/2/2018 12:12 | Driving |
| 9 | 3/2/2018 13:12 | Swimming |
Below is the sample output:
| | biking_time | event_name1 | driving_time | event_name2 | durations_inMins |
|---|---|---|---|---|---|
| 1 | 3/1/2018 5:12 | biking | 3/1/2018 8:12 | driving | 180 |
| 2 | 3/2/2018 9:12 | biking | 3/2/2018 12:12 | driving | 180 |
Below is some of my code:
biking_df = df.filter(df.Event == 'Biking')
driving_df = df.filter(df.Event == 'Driving')
Can someone please provide me with some PySpark code? Thanks a lot.
CodePudding user response:
Your example (I added another day with a missing Driving record; the solution below also handles that case):
df = spark.createDataFrame(
    [
        ('1', '3/1/2018 5:12', 'Biking'),
        ('2', '3/1/2018 6:12', 'Swimming'),
        ('3', '3/1/2018 7:12', 'Hiking'),
        ('4', '3/1/2018 8:12', 'Driving'),
        ('5', '3/2/2018 9:12', 'Biking'),
        ('6', '3/2/2018 10:12', 'Swimming'),
        ('7', '3/2/2018 11:12', 'Swimming'),
        ('8', '3/2/2018 12:12', 'Driving'),
        ('9', '3/2/2018 13:12', 'Swimming'),
        ('10', '3/3/2018 9:10', 'Biking'),
        ('11', '3/3/2018 9:50', 'Swimming'),
        ('12', '3/3/2018 10:30', 'Swimming'),
        ('13', '3/3/2018 11:12', 'Hiking'),
    ],
    ['index', 'TimeDetails', 'Event']
)
Solution
from pyspark.sql import functions as F
df = df\
    .withColumn('TimeDetails', F.to_timestamp('TimeDetails', 'M/d/y H:m'))\
    .withColumn('date', F.to_date('TimeDetails'))
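# The 'M/d/y H:m' pattern matches strings such as '3/1/2018 5:12'; to_date()
# then keeps only the calendar day, which is used to pair events within a day.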
#Finding all possible dates in the original dataset:
date_interval = df\
    .agg(
        F.date_trunc("dd", F.max(F.col("date"))).alias("max_date"),
        F.date_trunc("dd", F.min(F.col("date"))).alias("min_date"))\
    .withColumn('date_interval', F.explode(F.expr('sequence(min_date, max_date, interval 1 day)')))\
    .select('date_interval')\
    .withColumn('date_interval', F.to_date('date_interval'))
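# Sanity check (with the sample data above this should give one row per day:
# 2018-03-01, 2018-03-02 and 2018-03-03):
# date_interval.show()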
#Imputing those dates on biking and driving subsets
biking_df = date_interval\
    .join(df.filter(df.Event == 'Biking'), date_interval.date_interval == df.date, 'left')\
    .withColumn('Event', F.coalesce(F.col('Event'), F.lit('Biking')))\
    .select('date_interval', F.col('TimeDetails').alias('biking_time'), F.col('Event').alias('event_name1'))
driving_df = date_interval\
    .join(df.filter(df.Event == 'Driving'), date_interval.date_interval == df.date, 'left')\
    .withColumn('Event', F.coalesce(F.col('Event'), F.lit('Driving')))\
    .select('date_interval', F.col('TimeDetails').alias('driving_time'), F.col('Event').alias('event_name2'))
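# At this point biking_df and driving_df each have one row per calendar day,
# with a null timestamp on days where the corresponding event is missing, so
# the join below simply lines them up by date.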
result = biking_df\
    .join(driving_df, 'date_interval')\
    .withColumn('durations_inMins', (F.unix_timestamp("driving_time") - F.unix_timestamp('biking_time'))/60)\
    .select('biking_time', 'event_name1', 'driving_time', 'event_name2', 'durations_inMins')
result.show()
The output:
+-------------------+-----------+-------------------+-----------+----------------+
|        biking_time|event_name1|       driving_time|event_name2|durations_inMins|
+-------------------+-----------+-------------------+-----------+----------------+
|2018-03-01 05:12:00|     Biking|2018-03-01 08:12:00|    Driving|           180.0|
|2018-03-02 09:12:00|     Biking|2018-03-02 12:12:00|    Driving|           180.0|
|2018-03-03 09:10:00|     Biking|               null|    Driving|            null|
+-------------------+-----------+-------------------+-----------+----------------+
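If you only want complete biking-driving pairs, you could additionally drop the row for the day without a Driving record before using the result, for example:

# optional: keep only days where both a Biking and a Driving record exist
result_complete = result.filter(F.col('driving_time').isNotNull())
result_complete.show()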