Home > Blockchain >  Fetch start and end between two dates inclusive in pyspark
Fetch start and end between two dates inclusive in pyspark

Time:06-02

I have been trying to generate the month-by-month ranges between two given dates (inclusive), but my code is not producing the expected output.

e.g.: start_date = 12-01-2022 (in dd-mm-yyyy), end_date = 03-06-2022

expected output:

Valid_From  Valid_To
2022-01-12  2022-01-31
2022-02-01  2022-02-28
2022-03-01  2022-03-31
2022-04-01  2022-04-30
2022-05-01  2022-05-31
2022-06-01  2022-06-03

My code: var_forecast_start_date = datetime.datetime(2022,1,12) var_forecast_end_date = datetime.datetime(2022,6,2)

df_datetime = ( pandas_to_spark( df_datetime(start = var_forecast_start_date, end = var_forecast_end_date) ) )

df_datetime = (df_datetime.withColumn('DateID',date_format(df_datetime.Date,'yyyyMMdd').cast(IntegerType())) .withColumn('FiscalDate',date_format(df_datetime.Date,'yyyy-MM-dd')) )

df_datetime = (df_datetime.selectExpr("add_months(date_add(last_day(Date),1),-1) AS Valid_From", "last_day(Date) AS Valid_To" ).distinct()

CodePudding user response:

try maybe the following:

import findspark
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

# Locate the local Spark installation, then obtain (or reuse) a session.
findspark.init()
spark = SparkSession.builder.appName("local").getOrCreate()

# One sample row: both dates are plain strings in dd-MM-yyyy form.
data = [("12-01-2022", "03-06-2022")]
columns = ["start_date", "end_date"]

# Column names are supplied directly as the schema; types are inferred.
df = spark.createDataFrame(data, schema=columns)
# Spark SQL expression for the first day of the calendar month that lies
# `months_between_seq` months after the month containing start_date.
month_start_sql = "ADD_MONTHS(TRUNC(start_date, 'MM'), months_between_seq)"

# Expand each (start_date, end_date) row into one row per calendar month
# touched by the range.
#
# Columns produced:
#   months_between     - number of month boundaries crossed between the two
#                        dates (0 when both fall in the same month).
#   months_between_seq - 0-based month offset from start_date's month.
#   begin_of_month     - first day of that month, clamped to start_date.
#   end_of_month       - last day of that month, clamped to end_date.
#
# The month count is taken between the month-truncated dates so it is always
# an exact integer; rounding the fractional MONTHS_BETWEEN of the raw dates
# miscounts ranges such as 31-01-2022 .. 01-03-2022.
df = (
    df.withColumn("start_date", F.to_date(F.col("start_date"), "dd-MM-yyyy"))
    .withColumn("end_date", F.to_date(F.col("end_date"), "dd-MM-yyyy"))
    # Reversed ranges (and rows whose dates failed to parse) have no valid
    # months to emit; dropping them avoids a descending SEQUENCE below.
    .where(F.col("start_date") <= F.col("end_date"))
    .withColumn(
        "months_between",
        F.months_between(
            F.trunc(F.col("end_date"), "MM"),
            F.trunc(F.col("start_date"), "MM"),
        ).cast("Integer"),
    )
    .withColumn(
        "months_between_seq", F.sequence(F.lit(0), F.col("months_between"))
    )
    .withColumn("months_between_seq", F.explode(F.col("months_between_seq")))
    .withColumn(
        "end_of_month",
        F.expr(f"LEAST(LAST_DAY({month_start_sql}), end_date)"),
    )
    .withColumn(
        "begin_of_month",
        F.expr(f"GREATEST({month_start_sql}, start_date)"),
    )
)

# Rank every distinct interval start chronologically so that it can later be
# paired with the interval end that has the same rank.
# No partitioning: the ordering is global across the whole frame.
start_window_agg = Window.orderBy("Valid_From")
start_union_sdf = (
    df.select(df["start_date"].alias("Valid_From"))
    .unionByName(df.select(df["begin_of_month"].alias("Valid_From")))
    .distinct()
    .withColumn("row_number", F.row_number().over(start_window_agg))
)
# Rank every distinct interval end chronologically; the rank is the join key
# used to match each end with its corresponding start.
# No partitioning: the ordering is global across the whole frame.
end_window_agg = Window.orderBy("Valid_To")
end_union_sdf = (
    df.select(df["end_date"].alias("Valid_To"))
    .unionByName(df.select(df["end_of_month"].alias("Valid_To")))
    .distinct()
    .withColumn("row_number", F.row_number().over(end_window_agg))
)
# Pair the n-th interval start with the n-th interval end, keep only the two
# date columns (explicitly typed as DATE) and present them chronologically.
join_sdf = (
    end_union_sdf.join(start_union_sdf, on="row_number", how="inner")
    .select(
        F.col("Valid_From").cast("DATE").alias("Valid_From"),
        F.col("Valid_To").cast("DATE").alias("Valid_To"),
    )
    .orderBy("Valid_From")
)
join_sdf.show()

It returns:

+----------+----------+
|Valid_From|  Valid_To|
+----------+----------+
|2022-01-12|2022-01-31|
|2022-02-01|2022-02-28|
|2022-03-01|2022-03-31|
|2022-04-01|2022-04-30|
|2022-05-01|2022-05-31|
|2022-06-01|2022-06-03|
+----------+----------+
  • Related