I am doing the following to get workdays and work hours excluding weekends.
case = case.withColumn('days_h', F.expr('sequence(CreatedDate, ClosedDate, interval 1 hour)'))\
.withColumn('weekdays_h', F.expr('filter(transform(days_h, day->(day, extract(dow_iso from day))), day -> day.col2 <=5).day'))\
case = case.withColumn("from_work_day", F.col('weekdays_h')[0])
case = case.withColumn("to_work_day", F.element_at(F.col('weekdays_h'), -1))
timeDiff = (F.col('to_work_day').cast('long') - F.col('from_work_day').cast('long'))
case = case.withColumn("Duration", timeDiff).withColumn('Diff_in_Hours', F.round(F.col('Duration')/3600))
In the below screenshot, I must get 482 hrs, but instead I get 672 hrs which is including weekend dates.
Is there a way to calculate hours of the dates in the array which is exclusive of weekends?
CodePudding user response:
You had a nice idea of using sequence
and filter
. I have modified your approach in a way that for every date which is not weekend I calculate seconds within range. At the end I accumulate seconds for every date into one number.
Input:
from pyspark.sql import functions as F
df = spark.range(1).select(
F.lit('2021-06-02 01:31:28').cast('timestamp').alias('CreatedDate'),
F.lit('2021-06-30 03:31:42').cast('timestamp').alias('ClosedDate'),
)
Script:
diff_in_seconds = F.aggregate(
F.expr("filter(sequence(to_date(CreatedDate), to_date(ClosedDate)), x -> weekday(x) < 5)"),
F.lit(0).cast('long'),
lambda acc, d: acc F.least(F.date_add(d, 1), F.to_timestamp('ClosedDate')).cast('long')
- F.greatest(d, F.to_timestamp('CreatedDate')).cast('long')
)
df = df.withColumn('Diff_in_Hours', F.round(diff_in_seconds / 3600))
df.show(truncate=0)
# ------------------- ------------------- -------------
# |CreatedDate |ClosedDate |Diff_in_Hours|
# ------------------- ------------------- -------------
# |2021-06-02 01:31:28|2021-06-30 03:31:42|482.0 |
# ------------------- ------------------- -------------
Version for Spark 2.4
diff_in_seconds = F.expr("""
aggregate(
filter(sequence(to_date(CreatedDate), to_date(ClosedDate)), x -> weekday(x) < 5),
cast(0 as long),
(acc, d) -> acc cast(least(date_add(d, 1), to_timestamp(ClosedDate)) as long)
- cast(greatest(d, to_timestamp(CreatedDate)) as long)
)
""")
df = df.withColumn('Diff_in_Hours', F.round(diff_in_seconds / 3600))