I would like to get the last business day (LBD) of the month, and use LBD to filter records in a dataframe, I did come up with python code. But to achieve this functionality I need to use UDF. Is there any way to get the last business day of the month without using PySpark UDF?
import calendar
def last_business_day_in_month(calendarYearMonth):
year = int(calendarYearMonth[0:4])
month = int(calendarYearMonth[4:])
return str(year) str(month) str(max(calendar.monthcalendar(year, month)[-1:][0][:5]))
is in format YYYYMM
Ref: https://stackoverflow.com/a/62392077/6187792
CodePudding user response:
You can calculate it using last_day
and its dayofweek
from pyspark.sql import functions as func
spark.sparkContext.parallelize([(202010,), (202201,)]).toDF(['yrmth']). \
withColumn('lastday_mth', func.last_day(func.to_date(func.col('yrmth').cast('string'), 'yyyyMM'))). \
withColumn('dayofwk', func.dayofweek('lastday_mth')). \
func.when(func.col('dayofwk') == 7, func.date_add('lastday_mth', -1)).
when(func.col('dayofwk') == 1, func.date_add('lastday_mth', -2)).
). \
# ------ ----------- ------- --------------
# | yrmth|lastday_mth|dayofwk|lastbizday_mth|
# ------ ----------- ------- --------------
# |202010| 2020-10-31| 7| 2020-10-30|
# |202201| 2022-01-31| 2| 2022-01-31|
# ------ ----------- ------- --------------
CodePudding user response:
Create a small sequence
of last dates of the month, filter
out weekends and use array_max
to return the max date.
from pyspark.sql import functions as F
df = spark.createDataFrame([('202010',), ('202201',)], ['yrmth'])
last_day = F.last_day(F.to_date('yrmth', 'yyyyMM'))
last_days = F.sequence(F.date_sub(last_day, 3), last_day)
df = df.withColumn(
F.array_max(F.filter(last_days, lambda x: ~F.dayofweek(x).isin([1, 7])))
# ------ --------------------------
# | yrmth|last_business_day_in_month|
# ------ --------------------------
# |202010| 2020-10-30|
# |202201| 2022-01-31|
# ------ --------------------------
For lower Spark versions:
last_day = "last_day(to_date(yrmth, 'yyyyMM'))"
df = df.withColumn(
F.expr(f"array_max(filter(sequence(date_sub({last_day}, 3), {last_day}), x -> weekday(x) < 5))")