Home > Mobile >  How to filter dataset to not include data with pickup and dropoff times outside of range
How to filter dataset to not include data with pickup and dropoff times outside of range

Time:10-20

I have a dataset of NYC taxi data that I am trying to filter through. The dataset has the schema like this:

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)

I want to filter the dataset so that any entries which have pickup and dropoff times not between the hours of 9am to 5pm are excluded. But I am having trouble writing a helper method to use within a withColumn function. Here is what I have for the withColumn calls:

from pyspark.sql.window import Window
from pyspark.sql import functions as fun

taxi_raw.withColumn("pickup_datetime", remove_pickup_times(fun.col("pickup_datetime"))
taxi_raw.withColumn("dropoff_datetime", remove_dropoff_times(fun.col("dropoff_datetime"))

And here is what I have for the helper methods so far:

import datetime

def remove_pickup_times(pickup_datetime):
    time_start = datetime.time(9,0,0)
    time_end = datetime.time(17,0,0)
    if(pickup_datetime.time() >= time_start and pickup_datetime.time() <= time_end):
        //insert code to remove entry from dataset

def remove_dropoff_times(dropoff_datetime):
        time_start = datetime.time(9,0,0)
        time_end = datetime.time(17,0,0)
        if(dropoff_datetime.time() >= time_start and dropoff_datetime.time() <= time_end):
            //insert code to remove entry from dataset

CodePudding user response:

You can use native Spark functions. date_format when 'HH:mm:ss' format is provided will only extract the time.

Input:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [('2020-01-01 08:08:08', '2020-01-01 09:09:09'),
     ('2020-01-01 16:08:08', '2020-01-01 17:09:09'),
     ('2020-01-01 16:08:08', '2020-01-01 16:09:09'),
     ('2020-01-01 20:08:08', '2020-01-01 20:09:09')],
    ['pickup_datetime', 'dropoff_datetime'])

Script for trips outside work hours:

start = F.date_format('pickup_datetime', 'HH:mm:ss')
end = F.date_format('dropoff_datetime', 'HH:mm:ss')
df = df.filter((end < '09:00:00') | (start > '17:00:00'))

df.show()
#  ------------------- ------------------- 
# |    pickup_datetime|   dropoff_datetime|
#  ------------------- ------------------- 
# |2020-01-01 20:08:08|2020-01-01 20:09:09|
#  ------------------- ------------------- 

Script for trips during work hours:

start = F.date_format('pickup_datetime', 'HH:mm:ss')
end = F.date_format('dropoff_datetime', 'HH:mm:ss')
df = df.filter((start >= '09:00:00') & (end <= '17:00:00'))

df.show()
#  ------------------- ------------------- 
# |    pickup_datetime|   dropoff_datetime|
#  ------------------- ------------------- 
# |2020-01-01 16:08:08|2020-01-01 16:09:09|
#  ------------------- ------------------- 

CodePudding user response:

Spark internally stores the timestamp as Epoch Milliseconds. So you can cast the timestamp to long and do the checks.

Try this spark.sql

spark.sql(s"""
with t1 ( select '2020-01-01 08:08:08' pickup , '2020-01-01 09:09:09' drop union all 
          select '2020-01-01 16:08:08', '2020-01-01 17:09:09' union all
          select '2020-01-01 16:08:08', '2020-01-01 16:09:09' union all
          select '2020-01-01 20:08:08', '2020-01-01 20:09:09' ) ,
    t2 ( select to_timestamp(pickup) pickup, to_timestamp(drop) drop  from t1  )
    select pickup, drop  from t2
     where (
             cast(drop as long) < cast(date_trunc('DAY',pickup) as long)   (9*60*60) --  09:00:00
             or
             cast(pickup as long) > cast(date_trunc('DAY',pickup) as long)   (17*60*60) --  17:00:00
            ) 
""").show(false)

 ------------------- ------------------- 
|pickup             |drop               |
 ------------------- ------------------- 
|2020-01-01 20:08:08|2020-01-01 20:09:09|
 ------------------- ------------------- 
  • Related