How to aggregate a Spark dataframe based on timestamp counts on an hourly basis and still show the hours that have no trips


I'm doing a project on the New York yellow taxi dataset, which consists of taxi pick-up and drop-off records, almost 24M rows for the year 2020. I'm interested in two columns: the trip pick-up timestamp 'tpep_pickup_datetime' and the pick-up location ID 'PULocationID'.

df_new_final.select(['tpep_pickup_datetime','PULocationID']).show()
+--------------------+------------+
|tpep_pickup_datetime|PULocationID|
+--------------------+------------+
| 2020-03-07 12:35:20|          79|
| 2020-03-07 13:06:13|         107|
| 2020-03-07 13:40:17|         264|
| 2020-03-07 13:41:34|         236|
| 2020-03-07 14:11:24|         230|
| 2020-03-07 14:29:23|         239|
| 2020-03-07 14:46:16|         230|
| 2020-03-07 15:49:30|         170|
| 2020-03-07 16:53:56|         261|
| 2020-03-08 06:02:16|         114|
| 2020-03-08 06:23:47|         142|
| 2020-03-08 07:00:15|         186|
| 2020-03-08 08:05:36|         170|
| 2020-03-08 09:21:57|         148|
| 2020-03-08 10:10:08|          13|
| 2020-03-08 10:48:19|         162|
| 2020-03-08 10:56:04|         233|
| 2020-03-08 11:17:04|         170|
| 2020-03-08 11:29:25|         162|
| 2020-03-08 11:53:42|         138|
+--------------------+------------+

I have 262 location IDs. My main objective is to get the hourly aggregated number of trips per location ID. The year 2020 has 366 days and there are 262 location IDs, so if trips were made in every hour at every location, the aggregated data would have 366 × 24 × 262 = 2,301,408 rows. When I aggregate the data, I get only 910,784 rows. Below is my code.

To change the timestamp to an hourly timestamp (remove minutes and seconds):

from pyspark.sql.functions import col, date_format, lit

df_new_final = df_new_final.withColumn("Pickup_datetime_hourly", date_format(col("tpep_pickup_datetime").cast("timestamp"), "yyyy-MM-dd HH:00"))

+--------------------+----------------------+------------+
|tpep_pickup_datetime|Pickup_datetime_hourly|PULocationID|
+--------------------+----------------------+------------+
| 2020-03-07 12:35:20|      2020-03-07 12:00|          79|
| 2020-03-07 13:06:13|      2020-03-07 13:00|         107|
| 2020-03-07 13:40:17|      2020-03-07 13:00|         264|
| 2020-03-07 13:41:34|      2020-03-07 13:00|         236|
| 2020-03-07 14:11:24|      2020-03-07 14:00|         230|
| 2020-03-07 14:29:23|      2020-03-07 14:00|         239|
| 2020-03-07 14:46:16|      2020-03-07 14:00|         230|
| 2020-03-07 15:49:30|      2020-03-07 15:00|         170|
+--------------------+----------------------+------------+
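As an aside, if this hourly column is later joined against a generated series of timestamps, it can be more convenient to keep it as a real timestamp rather than a string; a minimal sketch using date_trunc, assuming the same df_new_final:

from pyspark.sql.functions import col, date_trunc

# date_trunc keeps the column as a timestamp truncated to the hour,
# so it compares against generated timestamps without an implicit cast
df_new_final = df_new_final.withColumn(
    "Pickup_datetime_hourly",
    date_trunc("hour", col("tpep_pickup_datetime").cast("timestamp")),
)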

Create a trip count column:

df_new_final=df_new_final.withColumn("Trip_count", lit(1))

Group by and aggregate the data per hour and location ID:

hourly_aggregated=df_new_final.groupby(['Pickup_datetime_hourly','PULocationID']).agg({'Trip_count':'count'})

+----------------------+------------+-----------------+
|Pickup_datetime_hourly|PULocationID|count(Trip_count)|
+----------------------+------------+-----------------+
|      2020-03-01 07:00|         230|               72|
|      2020-03-01 10:00|         232|                5|
|      2020-03-01 16:00|         100|              180|
|      2020-03-01 16:00|         179|                4|
|      2020-03-01 18:00|         129|                3|
|      2020-03-03 05:00|         168|                2|
|      2020-03-03 06:00|         186|              392|
|      2020-03-04 01:00|          33|                1|
|      2020-03-04 04:00|         112|                1|
|      2020-03-04 05:00|         170|               55|
|      2020-03-04 20:00|         211|              128|
|      2020-03-04 20:00|         166|               91|
|      2020-03-05 03:00|         132|               22|
+----------------------+------------+-----------------+
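As an aside, the same aggregation can be written without the helper column, since count() simply counts the rows in each group; a sketch assuming the df_new_final above:

hourly_aggregated = (
    df_new_final
    .groupBy("Pickup_datetime_hourly", "PULocationID")
    .count()
    .withColumnRenamed("count", "Trip_count")
)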

The row count of the aggregated data:

hourly_aggregated.count()
910784

It should be 2,301,408, and I think the issue is that there are no pick-up records in certain hours, so when I aggregate the data, the missing hours don't show up, as in this example:

+----------------------+------------+-----------------+
|Pickup_datetime_hourly|PULocationID|count(Trip_count)|
+----------------------+------------+-----------------+
|      2020-03-01 00:00|         230|               72|
|      2020-03-01 01:00|         230|                5|
|      2020-03-01 03:00|         230|              180|
|      2020-03-01 06:00|         230|                4|
|      2020-03-01 07:00|         230|                3|
+----------------------+------------+-----------------+

Here, no trips happened during 02:00, 04:00 and 05:00.

What I expect: even if there are no trips, show a count of 0:

+----------------------+------------+-----------------+
|Pickup_datetime_hourly|PULocationID|count(Trip_count)|
+----------------------+------------+-----------------+
|      2020-03-01 00:00|         230|               72|
|      2020-03-01 01:00|         230|                5|
|      2020-03-01 02:00|         230|                0|
|      2020-03-01 03:00|         230|              180|
|      2020-03-01 04:00|         230|                0|
|      2020-03-01 05:00|         230|                0|
|      2020-03-01 06:00|         230|                4|
|      2020-03-01 07:00|         230|                3|
+----------------------+------------+-----------------+

How can I do that?

CodePudding user response:

from pyspark.sql.functions import col

def generate_series(start, stop, interval):
    """
    :param start:    lower bound, inclusive
    :param stop:     upper bound, exclusive
    :param interval: increment interval in seconds
    """
    # Convert the start and stop bounds to epoch seconds
    start, stop = spark.createDataFrame(
        [(start, stop)], ("start", "stop")
    ).select(
        [col(c).cast("timestamp").cast("long") for c in ("start", "stop")]
    ).first()
    # Create a range with the given increment, then cast back to timestamp
    return spark.range(start, stop, interval).select(
        col("id").cast("timestamp")
    ).toDF("Gen_pickup_datetime_hourly")
        
# Generate the hourly series; the start and stop values below were chosen
# to cover the March 2020 taxi data
gen_df = generate_series("2020-03-01", "2020-03-30", 60 * 60)
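On Spark 2.4+, the built-in sequence() function can produce the same hourly series without the epoch-second round trip; a sketch with the same bounds (note that sequence() includes the upper bound, unlike the exclusive range above):

gen_df = spark.sql("""
    SELECT explode(sequence(
        to_timestamp('2020-03-01'),
        to_timestamp('2020-03-30'),
        interval 1 hour
    )) AS Gen_pickup_datetime_hourly
""")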

# Right join so that every generated hour is kept, even hours with no trips
hourly_aggregated_final = hourly_aggregated.join(
    gen_df,
    hourly_aggregated.Pickup_datetime_hourly == gen_df.Gen_pickup_datetime_hourly,
    "right",
)

# Replace nulls in the PULocationID and count(Trip_count) columns with 0
# (the PULocationID of hours with no trips is assumed to be 0). The fill
# values must be integers here: fillna silently skips columns whose type
# does not match the supplied value, so the string '0' would do nothing.
hourly_aggregated_final = hourly_aggregated_final.fillna({'PULocationID': 0, 'count(Trip_count)': 0})
hourly_aggregated_final.show()
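Note that the right join above only restores hours missing from the dataset as a whole; it does not create a row for every missing (hour, location) pair, so it will not reach the 366 × 24 × 262 = 2,301,408 rows the question is after. One way to get the full grid, sketched under the assumption that the gen_df and hourly_aggregated above are in scope, is to cross-join the generated hours with the distinct location IDs and left-join the aggregates onto that:

from pyspark.sql.functions import col

# Every (hour, location) combination for the generated period
full_grid = gen_df.crossJoin(
    hourly_aggregated.select("PULocationID").distinct()
).alias("grid")

# Rename the aggregate column so its name has no parentheses
agg = hourly_aggregated.withColumnRenamed(
    "count(Trip_count)", "Trip_count"
).alias("agg")

# Left join: grid rows with no matching aggregate get a null count, which
# is then filled with 0. The string hour is implicitly cast to timestamp
# in the comparison, as in the right join above.
hourly_complete = (
    full_grid.join(
        agg,
        (col("grid.Gen_pickup_datetime_hourly") == col("agg.Pickup_datetime_hourly"))
        & (col("grid.PULocationID") == col("agg.PULocationID")),
        "left",
    )
    .select(
        col("grid.Gen_pickup_datetime_hourly").alias("Pickup_datetime_hourly"),
        col("grid.PULocationID"),
        col("agg.Trip_count"),
    )
    .fillna({"Trip_count": 0})
)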