Pyspark - counting dates between two dataframes


I have two dataframes, each with a date column:


+-----------+
|  DEADLINES|
+-----------+
| 2023-07-15|
| 2018-08-10|
| 2022-03-28|
| 2021-06-22|
| 2021-12-18|
| 2021-10-11|
| 2021-11-13|
+-----------+

+----------+
|   DT_DATE|
+----------+
|2021-04-02|
|2021-04-21|
|2021-05-01|
|2021-06-03|
|2021-09-07|
|2021-10-12|
|2021-11-02|
+----------+

I need to count how many DT_DATE dates fall between a given reference date and each of the DEADLINES dates.

For example, using 2021-03-31 as the reference date should give the following result set.

+-----------+------------+
|  DEADLINES|    dt_count|
+-----------+------------+
| 2023-07-15|           7|
| 2018-08-10|           0|
| 2022-03-28|           7|
| 2021-06-22|           4|
| 2021-12-18|           7|
| 2021-10-11|           5|
| 2021-11-13|           7|
+-----------+------------+
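
For reference, the sample dataframes above can be rebuilt in PySpark roughly like this (a sketch; the dates are kept as strings, and the dataframe names match the code further below):

# sample data copied from the tables above
deadlines_df = spark.createDataFrame(
    [("2023-07-15",), ("2018-08-10",), ("2022-03-28",), ("2021-06-22",),
     ("2021-12-18",), ("2021-10-11",), ("2021-11-13",)],
    ["DEADLINES"])

dates_df = spark.createDataFrame(
    [("2021-04-02",), ("2021-04-21",), ("2021-05-01",), ("2021-06-03",),
     ("2021-09-07",), ("2021-10-12",), ("2021-11-02",)],
    ["DT_DATE"])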

I managed to make it work by iterating through each row of the deadlines dataframe, but with a larger dataset the performance gets very poor.

Does anyone have a better solution?

Edit: this is my current solution:

def count_days(deadlines_df, dates_df, ref_date):
    for row in deadlines_df.collect():
        qtt = dates_df.filter(dates_df.DT_DATE.between(ref_date, row.DEADLINES)).count()
        yield row.DEADLINES, qtt


new_df = spark.createDataFrame(count_days(deadlines_df, dates_df, "2021-03-31"), ["DEADLINES", "dt_count"])

CodePudding user response:

Both dataframes can be unioned with different weights, and a Window function with a range from the start of the frame to the current row can be used (Scala):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._  // assumes a SparkSession named spark

val deadlines = Seq(
  ("2023-07-15"),
  ("2018-08-10"),
  ("2022-03-28"),
  ("2021-06-22"),
  ("2021-12-18"),
  ("2021-10-11"),
  ("2021-11-13")
).toDF("DEADLINES")

val dates = Seq(
  ("2021-04-02"),
  ("2021-04-21"),
  ("2021-05-01"),
  ("2021-06-03"),
  ("2021-09-07"),
  ("2021-10-12"),
  ("2021-11-02")
).toDF("DT_DATE")

val referenceDate = "2021-03-31"
val united = deadlines.withColumn("weight", lit(0))
  .unionAll(
    dates
      .where($"DT_DATE" >= referenceDate)
      .withColumn("weight", lit(1))
  )

val fromStartToCurrentRowWindow = Window.orderBy("DEADLINES").rangeBetween(Window.unboundedPreceding, Window.currentRow)

val result = united
  .withColumn("dt_count", sum("weight").over(fromStartToCurrentRowWindow))
  .where($"weight" === lit(0))
  .drop("weight")

Output:

+----------+--------+
|DEADLINES |dt_count|
+----------+--------+
|2018-08-10|0       |
|2021-06-22|4       |
|2021-10-11|5       |
|2021-11-13|7       |
|2021-12-18|7       |
|2022-03-28|7       |
|2023-07-15|7       |
+----------+--------+

Note: the calculation will be executed in one partition, and Spark shows the following warning: WARN Logging - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
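
For completeness, a rough PySpark equivalent of the same union + window idea (my own adaptation of the Scala code above, with the same single-partition caveat) might look like this:

from pyspark.sql import functions as F, Window

ref_date = "2021-03-31"

# give deadlines weight 0 and dates weight 1, rename DT_DATE so both
# frames share the DEADLINES column, then union them
united = (
    deadlines_df.withColumn("weight", F.lit(0))
    .unionAll(
        dates_df
        .where(F.col("DT_DATE") >= ref_date)
        .withColumnRenamed("DT_DATE", "DEADLINES")
        .withColumn("weight", F.lit(1))
    )
)

# running sum of weights from the start up to (and including) the current date
w = Window.orderBy("DEADLINES").rangeBetween(Window.unboundedPreceding, Window.currentRow)

result = (
    united
    .withColumn("dt_count", F.sum("weight").over(w))
    .where(F.col("weight") == 0)
    .drop("weight")
)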

Another possible solution is joining the two dataframes by range, which leads to a Cartesian join; see the sketch below.
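
A minimal PySpark sketch of that join-based variant (my own adaptation, not taken from the answer; a non-equi join like this typically ends up as a Cartesian-style join):

from pyspark.sql import functions as F

ref_date = "2021-03-31"

# left join keeps deadlines that match no date, so they still get a count of 0
joined = deadlines_df.join(
    dates_df,
    on=dates_df.DT_DATE.between(F.lit(ref_date), deadlines_df.DEADLINES),
    how="left",
)

result = joined.groupBy("DEADLINES").agg(F.count("DT_DATE").alias("dt_count"))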

CodePudding user response:

If you have a small number of deadline dates, you can:

  • add one column per deadline date to the dates_df dataframe, whose value is 1 when DT_DATE is between ref_date and the deadline date, and 0 otherwise
  • then sum each deadline date column
  • finally, transpose the result dataframe to obtain your desired dataframe

Let's go through it step by step.

Add one column per deadline date:

from pyspark.sql import functions as F

ref_date = "2021-03-31"  # the reference date from the question

deadline_rows = deadlines_df.collect()

dates_with_deadlines = dates_df
for row in deadline_rows:
    dates_with_deadlines = dates_with_deadlines.withColumn(
        str(row.DEADLINES),
        F.when(
          dates_df.DT_DATE.between(ref_date, row.DEADLINES), F.lit(1))
        .otherwise(
          F.lit(0)
        )
    )

And you get, with your example, the following dates_with_deadlines dataframe:

+----------+----------+----------+----------+----------+----------+----------+----------+
|DT_DATE   |2023-07-15|2018-08-10|2022-03-28|2021-06-22|2021-12-18|2021-10-11|2021-11-13|
+----------+----------+----------+----------+----------+----------+----------+----------+
|2021-04-02|1         |0         |1         |1         |1         |1         |1         |
|2021-04-21|1         |0         |1         |1         |1         |1         |1         |
|2021-05-01|1         |0         |1         |1         |1         |1         |1         |
|2021-06-03|1         |0         |1         |1         |1         |1         |1         |
|2021-09-07|1         |0         |1         |0         |1         |1         |1         |
|2021-10-12|1         |0         |1         |0         |1         |0         |1         |
|2021-11-02|1         |0         |1         |0         |1         |0         |1         |
+----------+----------+----------+----------+----------+----------+----------+----------+

Sum deadlines

aggregated_df = dates_with_deadlines.agg(*[F.sum(str(x.DEADLINES)).alias(str(x.DEADLINES)) for x in deadline_rows])

After this step, you get the following aggregated_df dataframe:

+----------+----------+----------+----------+----------+----------+----------+
|2023-07-15|2018-08-10|2022-03-28|2021-06-22|2021-12-18|2021-10-11|2021-11-13|
+----------+----------+----------+----------+----------+----------+----------+
|7         |0         |7         |4         |7         |5         |7         |
+----------+----------+----------+----------+----------+----------+----------+

Transpose dataframe

result_df = aggregated_df \
  .withColumn('merged', F.array(*[F.struct(F.lit(x.DEADLINES).alias('DEADLINES'), F.col(str(x.DEADLINES)).alias('dt_count')) for x in deadline_rows])) \
  .drop(*[str(x.DEADLINES) for x in deadline_rows]) \
  .withColumn('data', F.explode('merged')) \
  .drop('merged') \
  .withColumn('DEADLINES', F.col('data.DEADLINES')) \
  .withColumn('dt_count', F.col('data.dt_count')) \
  .drop('data')

And you have your expected result_df dataframe:

+----------+--------+
|DEADLINES |dt_count|
+----------+--------+
|2023-07-15|7       |
|2018-08-10|0       |
|2022-03-28|7       |
|2021-06-22|4       |
|2021-12-18|7       |
|2021-10-11|5       |
|2021-11-13|7       |
+----------+--------+

Complete Code

from pyspark.sql import functions as F

ref_date = "2021-03-31"  # the reference date from the question

deadline_rows = deadlines_df.collect()

dates_with_deadlines = dates_df
for row in deadline_rows:
    dates_with_deadlines = dates_with_deadlines.withColumn(
        str(row.DEADLINES),
        F.when(
          dates_df.DT_DATE.between(ref_date, row.DEADLINES), F.lit(1))
        .otherwise(
          F.lit(0)
        )
    )

aggregated_df = dates_with_deadlines.agg(*[F.sum(str(x.DEADLINES)).alias(str(x.DEADLINES)) for x in deadline_rows])

result_df = aggregated_df \
  .withColumn('merged', F.array(*[F.struct(F.lit(x.DEADLINES).alias('DEADLINES'), F.col(str(x.DEADLINES)).alias('dt_count')) for x in deadline_rows])) \
  .drop(*[str(x.DEADLINES) for x in deadline_rows]) \
  .withColumn('data', F.explode('merged')) \
  .drop('merged') \
  .withColumn('DEADLINES', F.col('data.DEADLINES')) \
  .withColumn('dt_count', F.col('data.dt_count')) \
  .drop('data')

Advantages and limits of this solution

With this solution, the only step that cannot be done using a distributed system is the transpose step.

Moreover, unlike your current solution, the aggregation for all deadline columns is performed in parallel rather than sequentially.

However, this solution works only if there are few deadline dates (hundreds, maybe thousands): first because we retrieve all those deadline dates in the Spark driver with .collect(), second because the first step creates one column per deadline date, producing rows with a lot of data, and finally because the last step is also executed on only one executor.
