Workday hours in a range of timestamp values in PySpark


I am doing the following to get the workdays and work hours between CreatedDate and ClosedDate, excluding weekends.

from pyspark.sql import functions as F

case = case.withColumn('days_h', F.expr('sequence(CreatedDate, ClosedDate, interval 1 hour)'))\
        .withColumn('weekdays_h', F.expr('filter(transform(days_h, day -> (day, extract(dow_iso from day))), day -> day.col2 <= 5).day'))

# First and last weekday timestamps in the range
case = case.withColumn("from_work_day", F.col('weekdays_h')[0])
case = case.withColumn("to_work_day", F.element_at(F.col('weekdays_h'), -1))

# Difference in seconds, rounded to hours
timeDiff = (F.col('to_work_day').cast('long') - F.col('from_work_day').cast('long'))
case = case.withColumn("Duration", timeDiff).withColumn('Diff_in_Hours', F.round(F.col('Duration')/3600))

For my example, I should get 482 hrs, but instead I get 672 hrs, which includes the weekend dates.


Is there a way to calculate the hours covered by the dates in the array while excluding weekends?

CodePudding user response:

Using sequence and filter was a nice idea, but taking only the first and last elements of the filtered array still measures the whole span, weekends included. I have modified your approach so that, for every date that is not a weekend day, I calculate the number of seconds that fall within the range; at the end I accumulate the seconds of every date into one number.

Input:

from pyspark.sql import functions as F
df = spark.range(1).select(
    F.lit('2021-06-02 01:31:28').cast('timestamp').alias('CreatedDate'),
    F.lit('2021-06-30 03:31:42').cast('timestamp').alias('ClosedDate'),
)

Script:

# For every weekday date d in the range, add the number of seconds of that day
# that fall inside [CreatedDate, ClosedDate]
diff_in_seconds = F.aggregate(
    F.expr("filter(sequence(to_date(CreatedDate), to_date(ClosedDate)), x -> weekday(x) < 5)"),
    F.lit(0).cast('long'),
    lambda acc, d: acc + F.least(F.date_add(d, 1), F.to_timestamp('ClosedDate')).cast('long')
                       - F.greatest(d, F.to_timestamp('CreatedDate')).cast('long')
)
df = df.withColumn('Diff_in_Hours', F.round(diff_in_seconds / 3600))

df.show(truncate=0)
# +-------------------+-------------------+-------------+
# |CreatedDate        |ClosedDate         |Diff_in_Hours|
# +-------------------+-------------------+-------------+
# |2021-06-02 01:31:28|2021-06-30 03:31:42|482.0        |
# +-------------------+-------------------+-------------+
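
As a quick sanity check (the dates below are hypothetical, not from the question), a range from Friday noon to the following Monday noon should give 24 weekday hours: 12 hours on Friday plus 12 hours on Monday, with Saturday and Sunday skipped. The same diff_in_seconds column can be reused, since it references CreatedDate and ClosedDate by name:

check = spark.range(1).select(
    F.lit('2021-06-04 12:00:00').cast('timestamp').alias('CreatedDate'),  # a Friday
    F.lit('2021-06-07 12:00:00').cast('timestamp').alias('ClosedDate'),   # the following Monday
)
check.withColumn('Diff_in_Hours', F.round(diff_in_seconds / 3600)).show()
# Expected Diff_in_Hours: 24.0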

Version for Spark 2.4:

diff_in_seconds = F.expr("""
    aggregate(
        filter(sequence(to_date(CreatedDate), to_date(ClosedDate)), x -> weekday(x) < 5),
        cast(0 as long),
        (acc, d) -> acc + cast(least(date_add(d, 1), to_timestamp(ClosedDate)) as long)
                        - cast(greatest(d, to_timestamp(CreatedDate)) as long)
    )
""")
df = df.withColumn('Diff_in_Hours', F.round(diff_in_seconds / 3600))
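
If you want to apply this back to the case DataFrame from the question, or reuse it for other column pairs, the expression can be wrapped in a small helper that builds the SQL from column names. This is just a sketch; weekday_hours is an illustrative name, not a built-in function:

from pyspark.sql import functions as F

def weekday_hours(created_col, closed_col):
    # Weekday-only hours between two timestamp columns (sketch, same logic as above)
    seconds = F.expr(f"""
        aggregate(
            filter(sequence(to_date({created_col}), to_date({closed_col})), x -> weekday(x) < 5),
            cast(0 as long),
            (acc, d) -> acc + cast(least(date_add(d, 1), to_timestamp({closed_col})) as long)
                            - cast(greatest(d, to_timestamp({created_col})) as long)
        )
    """)
    return F.round(seconds / 3600)

case = case.withColumn('Diff_in_Hours', weekday_hours('CreatedDate', 'ClosedDate'))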