I have a dataset of NYC taxi data that I am trying to filter through. The dataset has the schema like this:
root
|-- medallion: string (nullable = true)
|-- hack_license: string (nullable = true)
|-- vendor_id: string (nullable = true)
|-- rate_code: integer (nullable = true)
|-- store_and_fwd_flag: string (nullable = true)
|-- pickup_datetime: timestamp (nullable = true)
|-- dropoff_datetime: timestamp (nullable = true)
|-- passenger_count: integer (nullable = true)
|-- trip_time_in_secs: integer (nullable = true)
|-- trip_distance: double (nullable = true)
|-- pickup_longitude: double (nullable = true)
|-- pickup_latitude: double (nullable = true)
|-- dropoff_longitude: double (nullable = true)
|-- dropoff_latitude: double (nullable = true)
I want to filter the dataset so that any entries whose pickup and dropoff times are not between the hours of 9am and 5pm are excluded. But I am having trouble writing a helper method to use within a withColumn function. Here is what I have for the withColumn calls:
from pyspark.sql.window import Window
from pyspark.sql import functions as fun
taxi_raw.withColumn("pickup_datetime", remove_pickup_times(fun.col("pickup_datetime")))
taxi_raw.withColumn("dropoff_datetime", remove_dropoff_times(fun.col("dropoff_datetime")))
And here is what I have for the helper methods so far:
import datetime

def remove_pickup_times(pickup_datetime):
    time_start = datetime.time(9, 0, 0)
    time_end = datetime.time(17, 0, 0)
    if(pickup_datetime.time() >= time_start and pickup_datetime.time() <= time_end):
        # insert code to remove entry from dataset

def remove_dropoff_times(dropoff_datetime):
    time_start = datetime.time(9, 0, 0)
    time_end = datetime.time(17, 0, 0)
    if(dropoff_datetime.time() >= time_start and dropoff_datetime.time() <= time_end):
        # insert code to remove entry from dataset
CodePudding user response:
You can use native Spark functions instead of a helper method; withColumn only transforms a column's values and cannot drop rows, so the exclusion has to be expressed as a filter. date_format with the 'HH:mm:ss' pattern extracts just the time as a zero-padded string, which compares correctly against literals such as '09:00:00'.
Input:
from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('2020-01-01 08:08:08', '2020-01-01 09:09:09'),
     ('2020-01-01 16:08:08', '2020-01-01 17:09:09'),
     ('2020-01-01 16:08:08', '2020-01-01 16:09:09'),
     ('2020-01-01 20:08:08', '2020-01-01 20:09:09')],
    ['pickup_datetime', 'dropoff_datetime'])
Script for trips outside work hours:
start = F.date_format('pickup_datetime', 'HH:mm:ss')
end = F.date_format('dropoff_datetime', 'HH:mm:ss')
df = df.filter((end < '09:00:00') | (start > '17:00:00'))
df.show()
# +-------------------+-------------------+
# |    pickup_datetime|   dropoff_datetime|
# +-------------------+-------------------+
# |2020-01-01 20:08:08|2020-01-01 20:09:09|
# +-------------------+-------------------+
Script for trips during work hours:
start = F.date_format('pickup_datetime', 'HH:mm:ss')
end = F.date_format('dropoff_datetime', 'HH:mm:ss')
df = df.filter((start >= '09:00:00') & (end <= '17:00:00'))
df.show()
# +-------------------+-------------------+
# |    pickup_datetime|   dropoff_datetime|
# +-------------------+-------------------+
# |2020-01-01 16:08:08|2020-01-01 16:09:09|
# +-------------------+-------------------+
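If you prefer a single range test per column, Column.between can express the same during-work-hours condition. This is just a sketch equivalent to the filter above (assuming dropoff is never earlier than pickup):
from pyspark.sql import functions as F

start = F.date_format('pickup_datetime', 'HH:mm:ss')
end = F.date_format('dropoff_datetime', 'HH:mm:ss')

# Zero-padded 'HH:mm:ss' strings sort the same way as the times they represent,
# so between() on the formatted strings is a valid time-window test.
df_work_hours = df.filter(
    start.between('09:00:00', '17:00:00')
    & end.between('09:00:00', '17:00:00')
)
df_work_hours.show()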
CodePudding user response:
Casting a Spark timestamp to long yields the epoch time in seconds, so you can cast the timestamps to long and do the range checks with second offsets.
Try this spark.sql (Scala syntax):
spark.sql(s"""
with t1 ( select '2020-01-01 08:08:08' pickup , '2020-01-01 09:09:09' drop union all
          select '2020-01-01 16:08:08', '2020-01-01 17:09:09' union all
          select '2020-01-01 16:08:08', '2020-01-01 16:09:09' union all
          select '2020-01-01 20:08:08', '2020-01-01 20:09:09' ) ,
     t2 ( select to_timestamp(pickup) pickup, to_timestamp(drop) drop from t1 )
select pickup, drop from t2
where (
  cast(drop as long) < cast(date_trunc('DAY', pickup) as long) + (9*60*60)    -- 09:00:00
  or
  cast(pickup as long) > cast(date_trunc('DAY', pickup) as long) + (17*60*60) -- 17:00:00
)
""").show(false)
+-------------------+-------------------+
|pickup             |drop               |
+-------------------+-------------------+
|2020-01-01 20:08:08|2020-01-01 20:09:09|
+-------------------+-------------------+
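For reference, roughly the same cast-to-long check can be written with the PySpark DataFrame API. This is a minimal sketch against the df from the first answer, not the original answer's code:
from pyspark.sql import functions as F

# Epoch seconds for each event and for midnight of the pickup day.
pickup_ts = F.col('pickup_datetime').cast('timestamp')
dropoff_ts = F.col('dropoff_datetime').cast('timestamp')
day_start = F.date_trunc('DAY', pickup_ts).cast('long')

# Keep trips that fall entirely outside the 09:00-17:00 window of the pickup day.
outside = df.filter(
    (dropoff_ts.cast('long') < day_start + 9 * 60 * 60)
    | (pickup_ts.cast('long') > day_start + 17 * 60 * 60)
)
outside.show()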