I have two dataframes, each with a date column, e.g.:
+-----------+
|  DEADLINES|
+-----------+
| 2023-07-15|
| 2018-08-10|
| 2022-03-28|
| 2021-06-22|
| 2021-12-18|
| 2021-10-11|
| 2021-11-13|
+-----------+
+----------+
|   DT_DATE|
+----------+
|2021-04-02|
|2021-04-21|
|2021-05-01|
|2021-06-03|
|2021-09-07|
|2021-10-12|
|2021-11-02|
+----------+
I need to count how many DT_DATE dates fall between a given reference date and each of the DEADLINES dates.
For example, using 2021-03-31 as the reference date should give the following result set:
+-----------+------------+
|  DEADLINES|    dt_count|
+-----------+------------+
| 2023-07-15|           7|
| 2018-08-10|           0|
| 2022-03-28|           7|
| 2021-06-22|           4|
| 2021-12-18|           7|
| 2021-10-11|           5|
| 2021-11-13|           7|
+-----------+------------+
I managed to make it work by iterating through each row of the deadlines dataframe, but with a larger dataset the performance became very poor.
Does anyone have a better solution?
Edit: this is my current solution:
def count_days(deadlines_df, dates_df, ref_date):
    for row in deadlines_df.collect():
        qtt = dates_df.filter(dates_df.DT_DATE.between(ref_date, row.DEADLINES)).count()
        yield row.DEADLINES, qtt

new_df = spark.createDataFrame(count_days(deadlines_df, dates_df, "2021-03-31"), ["DEADLINES", "dt_count"])
CodePudding user response:
You can union both dataframes, giving each a different weight (0 for deadlines, 1 for dates), and then sum the weights over a window that ranges from the start to the current row (Scala):
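import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, sum}
import spark.implicits._ // assumes a SparkSession named `spark`; needed for toDF and $"..."

val deadlines = Seq(
  ("2023-07-15"),
  ("2018-08-10"),
  ("2022-03-28"),
  ("2021-06-22"),
  ("2021-12-18"),
  ("2021-10-11"),
  ("2021-11-13")
).toDF("DEADLINES")

val dates = Seq(
  ("2021-04-02"),
  ("2021-04-21"),
  ("2021-05-01"),
  ("2021-06-03"),
  ("2021-09-07"),
  ("2021-10-12"),
  ("2021-11-02")
).toDF("DT_DATE")

val referenceDate = "2021-03-31"

// Deadlines get weight 0, dates on or after the reference date get weight 1.
// The union is positional, so DT_DATE values end up in the DEADLINES column.
val united = deadlines.withColumn("weight", lit(0))
  .unionAll(
    dates
      .where($"DT_DATE" >= referenceDate)
      .withColumn("weight", lit(1))
  )

// Running sum of the weights over all rows up to and including the current date.
val fromStartToCurrentRowWindow = Window.orderBy("DEADLINES").rangeBetween(Window.unboundedPreceding, Window.currentRow)

val result = united
  .withColumn("dt_count", sum("weight").over(fromStartToCurrentRowWindow))
  .where($"weight" === lit(0)) // keep only the deadline rows
  .drop("weight")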
Output:
+----------+--------+
|DEADLINES |dt_count|
+----------+--------+
|2018-08-10|0       |
|2021-06-22|4       |
|2021-10-11|5       |
|2021-11-13|7       |
|2021-12-18|7       |
|2022-03-28|7       |
|2023-07-15|7       |
+----------+--------+
Note: the calculation is executed in a single partition, and Spark logs this warning: WARN Logging - No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
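To make the union-and-running-sum idea easier to check on small inputs, here is a minimal plain-Python sketch of the same logic. It is not Spark code: the helper name count_with_running_sum is hypothetical, and it assumes the dates are ISO yyyy-MM-dd strings so that string order equals chronological order.

```python
from itertools import groupby


def count_with_running_sum(deadlines, dates, ref_date):
    """Plain-Python model of the union + running-sum approach (hypothetical helper)."""
    # Tag each row with a weight, as in the union: 0 for deadlines, 1 for dates.
    rows = [(d, 0) for d in deadlines]
    rows += [(d, 1) for d in dates if d >= ref_date]
    # Assumption: ISO yyyy-MM-dd strings, so a plain sort is chronological.
    rows.sort()

    result = []
    running = 0
    # Rows sharing the same date are peers in a range frame, so add the
    # weights of the whole group before emitting its deadline rows.
    for day, group in groupby(rows, key=lambda r: r[0]):
        weights = [w for _, w in group]
        running += sum(weights)
        result.extend((day, running) for w in weights if w == 0)
    return result


deadlines = ["2023-07-15", "2018-08-10", "2022-03-28", "2021-06-22",
             "2021-12-18", "2021-10-11", "2021-11-13"]
dates = ["2021-04-02", "2021-04-21", "2021-05-01", "2021-06-03",
         "2021-09-07", "2021-10-12", "2021-11-02"]
counts = count_with_running_sum(deadlines, dates, "2021-03-31")
```

Grouping by date before emitting mirrors the range frame: a date equal to a deadline is counted for that deadline, which matches the inclusive between in the question.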
Another possible solution is to join the two dataframes on the date range, but that leads to a Cartesian join.
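As a rough illustration of why the range join is expensive, the following plain-Python sketch models it as a nested loop that compares every deadline with every date. It is not Spark code, and the helper name count_with_range_join and its comparison counter are hypothetical, added only to show the number of pairs examined.

```python
def count_with_range_join(deadlines, dates, ref_date):
    """Plain-Python model of a range join plus a count per deadline (hypothetical helper)."""
    result = []
    comparisons = 0
    for deadline in deadlines:
        matches = 0
        for day in dates:
            # Every (deadline, date) pair is examined: this is the Cartesian part.
            comparisons += 1
            # Assumption: ISO yyyy-MM-dd strings, so string comparison is chronological.
            if ref_date <= day <= deadline:
                matches += 1
        result.append((deadline, matches))
    return result, comparisons


deadlines = ["2023-07-15", "2018-08-10", "2022-03-28", "2021-06-22",
             "2021-12-18", "2021-10-11", "2021-11-13"]
dates = ["2021-04-02", "2021-04-21", "2021-05-01", "2021-06-03",
         "2021-09-07", "2021-10-12", "2021-11-02"]
joined_counts, pair_count = count_with_range_join(deadlines, dates, "2021-03-31")
```

With 7 deadlines and 7 dates the loop examines 49 pairs, and the pair count grows as the product of the two dataframe sizes.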
CodePudding user response:
If you have a small number of deadline dates, you can:
- add one column per deadline date to the dates_df dataframe, with value 1 when DT_DATE is between ref_date and the deadline date and 0 otherwise
- then sum each deadline date column
- finally transpose the result dataframe to obtain your desired dataframe
Let's go through it step by step.
Add one column per deadline date:
from pyspark.sql import functions as F

deadline_rows = deadlines_df.collect()

dates_with_deadlines = dates_df
for row in deadline_rows:
    dates_with_deadlines = dates_with_deadlines.withColumn(
        str(row.DEADLINES),
        F.when(
            dates_df.DT_DATE.between(ref_date, row.DEADLINES), F.lit(1))
        .otherwise(
            F.lit(0)
        )
    )
With your example, you get the following dates_with_deadlines dataframe:
+----------+----------+----------+----------+----------+----------+----------+----------+
|DT_DATE   |2023-07-15|2018-08-10|2022-03-28|2021-06-22|2021-12-18|2021-10-11|2021-11-13|
+----------+----------+----------+----------+----------+----------+----------+----------+
|2021-04-02|1         |0         |1         |1         |1         |1         |1         |
|2021-04-21|1         |0         |1         |1         |1         |1         |1         |
|2021-05-01|1         |0         |1         |1         |1         |1         |1         |
|2021-06-03|1         |0         |1         |1         |1         |1         |1         |
|2021-09-07|1         |0         |1         |0         |1         |1         |1         |
|2021-10-12|1         |0         |1         |0         |1         |0         |1         |
|2021-11-02|1         |0         |1         |0         |1         |0         |1         |
+----------+----------+----------+----------+----------+----------+----------+----------+
Sum deadlines
aggregated_df = dates_with_deadlines.agg(*[F.sum(str(x.DEADLINES)).alias(str(x.DEADLINES)) for x in deadline_rows])
After this step, you get the following aggregated_df dataframe:
+----------+----------+----------+----------+----------+----------+----------+
|2023-07-15|2018-08-10|2022-03-28|2021-06-22|2021-12-18|2021-10-11|2021-11-13|
+----------+----------+----------+----------+----------+----------+----------+
|7         |0         |7         |4         |7         |5         |7         |
+----------+----------+----------+----------+----------+----------+----------+
Transpose dataframe
result_df = aggregated_df \
    .withColumn('merged', F.array(*[F.struct(F.lit(x.DEADLINES).alias('DEADLINES'), F.col(str(x.DEADLINES)).alias('dt_count')) for x in deadline_rows])) \
    .drop(*[str(x.DEADLINES) for x in deadline_rows]) \
    .withColumn('data', F.explode('merged')) \
    .drop('merged') \
    .withColumn('DEADLINES', F.col('data.DEADLINES')) \
    .withColumn('dt_count', F.col('data.dt_count')) \
    .drop('data')
And you have your expected result_df dataframe:
+----------+--------+
|DEADLINES |dt_count|
+----------+--------+
|2023-07-15|7       |
|2018-08-10|0       |
|2022-03-28|7       |
|2021-06-22|4       |
|2021-12-18|7       |
|2021-10-11|5       |
|2021-11-13|7       |
+----------+--------+
Complete Code
from pyspark.sql import functions as F

deadline_rows = deadlines_df.collect()

dates_with_deadlines = dates_df
for row in deadline_rows:
    dates_with_deadlines = dates_with_deadlines.withColumn(
        str(row.DEADLINES),
        F.when(
            dates_df.DT_DATE.between(ref_date, row.DEADLINES), F.lit(1))
        .otherwise(
            F.lit(0)
        )
    )

aggregated_df = dates_with_deadlines.agg(*[F.sum(str(x.DEADLINES)).alias(str(x.DEADLINES)) for x in deadline_rows])

result_df = aggregated_df \
    .withColumn('merged', F.array(*[F.struct(F.lit(x.DEADLINES).alias('DEADLINES'), F.col(str(x.DEADLINES)).alias('dt_count')) for x in deadline_rows])) \
    .drop(*[str(x.DEADLINES) for x in deadline_rows]) \
    .withColumn('data', F.explode('merged')) \
    .drop('merged') \
    .withColumn('DEADLINES', F.col('data.DEADLINES')) \
    .withColumn('dt_count', F.col('data.dt_count')) \
    .drop('data')
Advantages and limits of this solution
With this solution, the only step that cannot be distributed is the transpose step.
Moreover, unlike your current solution, the aggregations for all deadline columns are performed in parallel rather than sequentially.
However, this solution works only if there are few deadline dates (hundreds, maybe thousands): first, because all those deadline dates are retrieved to the Spark driver with .collect(); second, because the first step creates one column per deadline date, producing very wide rows; and finally, because the last step is executed on only one executor.
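To sanity-check the three steps on a small sample without a cluster, they can be mirrored in plain Python. This is only a sketch, not Spark code: the helper name count_with_indicator_columns is hypothetical, and it assumes ISO yyyy-MM-dd strings so that string comparison matches date order.

```python
def count_with_indicator_columns(deadlines, dates, ref_date):
    """Plain-Python model of the indicator-column approach (hypothetical helper)."""
    # Step 1: one 0/1 indicator per (date row, deadline column).
    indicators = [
        [1 if ref_date <= day <= deadline else 0 for deadline in deadlines]
        for day in dates
    ]
    # Step 2: sum each deadline column over all date rows.
    totals = [sum(row[i] for row in indicators) for i in range(len(deadlines))]
    # Step 3: transpose the single row of totals into (deadline, count) rows.
    return list(zip(deadlines, totals))


deadlines = ["2023-07-15", "2018-08-10", "2022-03-28", "2021-06-22",
             "2021-12-18", "2021-10-11", "2021-11-13"]
dates = ["2021-04-02", "2021-04-21", "2021-05-01", "2021-06-03",
         "2021-09-07", "2021-10-12", "2021-11-02"]
indicator_counts = count_with_indicator_columns(deadlines, dates, "2021-03-31")
```

The indicator matrix has one entry per date and deadline pair, which is the same width problem described above: each additional deadline adds a column to every row.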