I'm doing a project on the New York yellow taxi dataset, which consists of yearly taxi pick-up and drop-off records: almost 24M rows for the year 2020. I'm interested in two columns: the trip pickup timestamp 'tpep_pickup_datetime' and the pick-up location ID 'PULocationID'.
df_new_final.select(['tpep_pickup_datetime','PULocationID']).show()
+--------------------+------------+
|tpep_pickup_datetime|PULocationID|
+--------------------+------------+
| 2020-03-07 12:35:20|          79|
| 2020-03-07 13:06:13|         107|
| 2020-03-07 13:40:17|         264|
| 2020-03-07 13:41:34|         236|
| 2020-03-07 14:11:24|         230|
| 2020-03-07 14:29:23|         239|
| 2020-03-07 14:46:16|         230|
| 2020-03-07 15:49:30|         170|
| 2020-03-07 16:53:56|         261|
| 2020-03-08 06:02:16|         114|
| 2020-03-08 06:23:47|         142|
| 2020-03-08 07:00:15|         186|
| 2020-03-08 08:05:36|         170|
| 2020-03-08 09:21:57|         148|
| 2020-03-08 10:10:08|          13|
| 2020-03-08 10:48:19|         162|
| 2020-03-08 10:56:04|         233|
| 2020-03-08 11:17:04|         170|
| 2020-03-08 11:29:25|         162|
| 2020-03-08 11:53:42|         138|
+--------------------+------------+
I have 262 location IDs. My main objective is to get an hourly aggregated number of trips per location ID. The year 2020 has 366 days and there are 262 location IDs, so if trips were made in every hour at every location, the number of rows would be 366 × 24 × 262 = 2301408. When I aggregate the data I get only 910784 rows. Below is my code:
To truncate the timestamp to hourly resolution (drop minutes and seconds):
from pyspark.sql.functions import col, date_format, lit

df_new_final = df_new_final.withColumn("Pickup_datetime_hourly", date_format(col("tpep_pickup_datetime").cast("timestamp"), "yyyy-MM-dd HH:00"))
+--------------------+----------------------+------------+
|tpep_pickup_datetime|Pickup_datetime_hourly|PULocationID|
+--------------------+----------------------+------------+
| 2020-03-07 12:35:20|      2020-03-07 12:00|          79|
| 2020-03-07 13:06:13|      2020-03-07 13:00|         107|
| 2020-03-07 13:40:17|      2020-03-07 13:00|         264|
| 2020-03-07 13:41:34|      2020-03-07 13:00|         236|
| 2020-03-07 14:11:24|      2020-03-07 14:00|         230|
| 2020-03-07 14:29:23|      2020-03-07 14:00|         239|
| 2020-03-07 14:46:16|      2020-03-07 14:00|         230|
| 2020-03-07 15:49:30|      2020-03-07 15:00|         170|
+--------------------+----------------------+------------+
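As a side note, Spark's built-in date_trunc can do the same hourly truncation while keeping a proper timestamp type instead of a string; a minimal sketch of that alternative:
from pyspark.sql.functions import col, date_trunc

# Same hourly truncation, but the result stays a timestamp
# rather than a "yyyy-MM-dd HH:00" string
df_new_final = df_new_final.withColumn(
    "Pickup_datetime_hourly",
    date_trunc("hour", col("tpep_pickup_datetime").cast("timestamp")),
)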
Create a trip count column:
df_new_final = df_new_final.withColumn("Trip_count", lit(1))
Group by and aggregate the data per hour and location ID:
hourly_aggregated = df_new_final.groupby(['Pickup_datetime_hourly', 'PULocationID']).agg({'Trip_count': 'count'})
+----------------------+------------+-----------------+
|Pickup_datetime_hourly|PULocationID|count(Trip_count)|
+----------------------+------------+-----------------+
|      2020-03-01 07:00|         230|               72|
|      2020-03-01 10:00|         232|                5|
|      2020-03-01 16:00|         100|              180|
|      2020-03-01 16:00|         179|                4|
|      2020-03-01 18:00|         129|                3|
|      2020-03-03 05:00|         168|                2|
|      2020-03-03 06:00|         186|              392|
|      2020-03-04 01:00|          33|                1|
|      2020-03-04 04:00|         112|                1|
|      2020-03-04 05:00|         170|               55|
|      2020-03-04 20:00|         211|              128|
|      2020-03-04 20:00|         166|               91|
|      2020-03-05 03:00|         132|               22|
+----------------------+------------+-----------------+
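Incidentally, the lit(1) helper column is not strictly needed; counting the grouped rows directly gives the same numbers with a cleaner output column name. A sketch of that variant (the rest of this post keeps the original count(Trip_count) name):
from pyspark.sql.functions import count

# Count rows per (hour, location) directly, no helper column needed
hourly_aggregated = (
    df_new_final
    .groupBy("Pickup_datetime_hourly", "PULocationID")
    .agg(count("*").alias("Trip_count"))
)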
Count the number of rows in the aggregated data:
hourly_aggregated.count()
910784
It should be 2301408. I think the issue is that there are no pick-up records in certain hours, so when I aggregate the data, the missing hours simply don't appear, as in this example:
+----------------------+------------+-----------------+
|Pickup_datetime_hourly|PULocationID|count(Trip_count)|
+----------------------+------------+-----------------+
|      2020-03-01 00:00|         230|               72|
|      2020-03-01 01:00|         230|                5|
|      2020-03-01 03:00|         230|              180|
|      2020-03-01 06:00|         230|                4|
|      2020-03-01 07:00|         230|                3|
+----------------------+------------+-----------------+
Here there are no trips during 02:00, 04:00 and 05:00. My expectation: even if there are no trips in an hour, I want that hour to show with a count of 0:
+----------------------+------------+-----------------+
|Pickup_datetime_hourly|PULocationID|count(Trip_count)|
+----------------------+------------+-----------------+
|      2020-03-01 00:00|         230|               72|
|      2020-03-01 01:00|         230|                5|
|      2020-03-01 02:00|         230|                0|
|      2020-03-01 03:00|         230|              180|
|      2020-03-01 04:00|         230|                0|
|      2020-03-01 05:00|         230|                0|
|      2020-03-01 06:00|         230|                4|
|      2020-03-01 07:00|         230|                3|
+----------------------+------------+-----------------+
How can I do that?
CodePudding user response:
from pyspark.sql.functions import col

def generate_series(start, stop, interval):
    """
    :param start: lower bound, inclusive
    :param stop: upper bound, exclusive
    :param interval: increment interval in seconds
    """
    # Convert start and stop to epoch seconds
    start, stop = spark.createDataFrame(
        [(start, stop)], ("start", "stop")
    ).select(
        [col(c).cast("timestamp").cast("long") for c in ("start", "stop")]
    ).first()
    # Build the range with the given increment, then cast back to timestamp
    return spark.range(start, stop, interval).select(
        col("id").cast("timestamp")
    ).toDF("Gen_pickup_datetime_hourly")
# Generate the series by hour; set start and stop to match your dataset
# (the values here cover the Mar-2020 taxi data)
gen_df = generate_series("2020-03-01", "2020-03-30", 60 * 60)
hourly_aggregated_final = hourly_aggregated.join(
    gen_df,
    hourly_aggregated.Pickup_datetime_hourly == gen_df.Gen_pickup_datetime_hourly,
    "right",
)
# Replace nulls in the PULocationID and count(Trip_count) columns with 0
# (missing hours get a placeholder PULocationID of 0); note the fill values
# must be integers, since fillna silently skips string values on numeric columns
hourly_aggregated_final = hourly_aggregated_final.fillna({'PULocationID': 0, 'count(Trip_count)': 0})
hourly_aggregated_final.show()
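Note that the right join above fills in missing hours, but not missing (hour, location) pairs, since the generated series has only one row per hour. To get the full 366 × 24 × 262 = 2301408 grid the question asks for, one way is to generate the series for the whole year and cross-join it with the distinct location IDs before joining the counts back on. A sketch under those assumptions (the names hours, locations, full_grid, and hourly_complete are illustrative, not from the original code):
from pyspark.sql.functions import coalesce, col, date_format, lit

# Full year of hours; the stop bound is exclusive, so use the first day of 2021
gen_df = generate_series("2020-01-01", "2021-01-01", 60 * 60)

# Format the generated timestamps the same way as Pickup_datetime_hourly
hours = gen_df.select(
    date_format("Gen_pickup_datetime_hourly", "yyyy-MM-dd HH:00")
    .alias("Pickup_datetime_hourly")
)
locations = df_new_final.select("PULocationID").distinct()

# 8784 hours x 262 locations = 2301408 rows
full_grid = hours.crossJoin(locations)

# Left-join the aggregated counts onto the grid and fill the gaps with 0
hourly_complete = (
    full_grid.join(
        hourly_aggregated,
        on=["Pickup_datetime_hourly", "PULocationID"],
        how="left",
    )
    .withColumn("Trip_count", coalesce(col("count(Trip_count)"), lit(0)))
    .drop("count(Trip_count)")
)
hourly_complete.count()  # 2301408 if every hour and location is covered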