I have data as below
+-----+------------+-------------+
| TYPE|   DTIN_DATE|   DTOUT_DATE|
+-----+------------+-------------+
|    A|  2021-03-22|   2021-05-26|
|    B|  2021-03-30|   2021-04-09|
+-----+------------+-------------+
I want to expand each row into one row per month and count the number of days (inclusive of both dates) that fall in every month between DTIN_DATE and DTOUT_DATE, as below:
+-----+------------+-------------+
| TYPE|       MONTH|      NO_DAYS|
+-----+------------+-------------+
|    A|     2021-03|           10|
|    A|     2021-04|           30|
|    A|     2021-05|           26|
|    B|     2021-03|            2|
|    B|     2021-04|            9|
+-----+------------+-------------+
CodePudding user response:
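You can create a sequence of month-start dates using `interval 1 month` as the step, explode it into one row per month, and then calculate the day difference within each month directly, without grouping or aggregating.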
# Expand every row into one row per calendar month touched by
# [DTIN_DATE, DTOUT_DATE], then count the days of that range inside each month.
# Assumes `F` is `pyspark.sql.functions` and both columns are of date type.
df = (df
    .withColumn(
        'month',
        # One array element per month start between the two dates; explode
        # turns the array into one row per month.
        F.explode(F.expr("sequence(date_trunc('MM', DTIN_DATE), date_trunc('MM', DTOUT_DATE), interval 1 month)")))
    .select(
        'TYPE',
        F.date_format('month', 'yyyy-MM').alias('MONTH'),
        # Clamp the range to the current month on both sides. datediff excludes
        # the start day, so "+ 1" makes the count inclusive of both ends.
        (F.datediff(F.least('DTOUT_DATE', F.last_day('month')), F.greatest('DTIN_DATE', 'month')) + 1).alias('NO_DAYS'))
)
Full example:
from pyspark.sql import functions as F
# Build the sample input; the date strings are cast to real date columns so
# that the date functions below operate on dates rather than strings.
df = spark.createDataFrame(
    [('A', '2021-03-22', '2021-05-26'),
     ('B', '2021-03-30', '2021-04-09')],
    ['TYPE', 'DTIN_DATE', 'DTOUT_DATE']
).withColumn('DTIN_DATE', F.col('DTIN_DATE').cast('date')
).withColumn('DTOUT_DATE', F.col('DTOUT_DATE').cast('date'))

# Expand every row into one row per calendar month touched by
# [DTIN_DATE, DTOUT_DATE], then count the days of that range inside each month.
df = (df
    .withColumn(
        'month',
        # One array element per month start between the two dates; explode
        # turns the array into one row per month.
        F.explode(F.expr("sequence(date_trunc('MM', DTIN_DATE), date_trunc('MM', DTOUT_DATE), interval 1 month)")))
    .select(
        'TYPE',
        F.date_format('month', 'yyyy-MM').alias('MONTH'),
        # Clamp the range to the current month on both sides. datediff excludes
        # the start day, so "+ 1" makes the count inclusive of both ends.
        (F.datediff(F.least('DTOUT_DATE', F.last_day('month')), F.greatest('DTIN_DATE', 'month')) + 1).alias('NO_DAYS'))
)

df.show()
# +----+-------+-------+
# |TYPE|MONTH  |NO_DAYS|
# +----+-------+-------+
# |A   |2021-03|10     |
# |A   |2021-04|30     |
# |A   |2021-05|26     |
# |B   |2021-03|2      |
# |B   |2021-04|9      |
# +----+-------+-------+
CodePudding user response:
Check this out:
# Alternative approach: enumerate every single day of the range, label each day
# with its 'yyyy-MM' month, and count the days per (type, month) group.
# Assumes `f` is `pyspark.sql.functions`.

# Array holding every date from dtin_date to dtout_date, one per day.
daily_dates = f.expr('sequence(dtin_date, dtout_date, interval 1 day)')
# The same array with each date replaced by its year-month label.
month_labels = f.expr('transform(date_range, element -> date_format(element, "yyyy-MM"))')

with_days = df.withColumn('date_range', daily_dates)
with_labels = with_days.withColumn('date_range_month_values', month_labels)
# One row per day, carrying the month label of that day.
one_row_per_day = with_labels.withColumn(
    'date_range_month_value',
    f.explode(f.col('date_range_month_values')),
)
# The number of rows in each group equals the number of days in that month.
df = one_row_per_day.groupby('type', 'date_range_month_value').count()