drop the latest date from a range of rows in spark dataframe


I have the following dataframe:

+----------+----------+--------------------+--------------------+---------+
|   fs_date|   ss_date|             request|            response|full_date|
+----------+----------+--------------------+--------------------+---------+
|2022-06-01|2022-06-02|[[[TLV, NYC, 2022...|[[[false, [1262.1...|2022-5-25|
|2022-06-01|2022-06-03|[[[TLV, NYC, 2022...|[[[false, [1226.6...|2022-5-28|
|2022-06-01|2022-06-03|[[[TLV, NYC, 2022...|[[[false, [3746.6...|2022-5-28|
|2022-06-01|2022-06-04|[[[TLV, NYC, 2022...|[[[false, [878.63...|2022-5-29|
|2022-06-01|2022-06-05|[[[TLV, NYC, 2022...|[[[false, [777.81...|2022-5-29|
|2022-06-01|2022-06-05|[[[TLV, NYC, 2022...|[[[false, [746.58...|2022-5-29|
|2022-06-01|2022-06-05|[[[TLV, NYC, 2022...|[[[false, [777.81...|2022-5-29|
|2022-06-01|2022-06-05|[[[TLV, NYC, 2022...|[[[false, [695.28...|2022-5-26|
|2022-06-01|2022-06-05|[[[TLV, NYC, 2022...|[[[false, [593.63...|2022-5-25|
|2022-06-01|2022-06-06|[[[TLV, NYC, 2022...|[[[false, [687.28...|2022-5-29|
|2022-06-01|2022-06-06|[[[TLV, NYC, 2022...|[[[false, [687.28...|2022-5-28|
|2022-06-01|2022-06-06|[[[TLV, NYC, 2022...|[[[false, [687.28...|2022-5-28|
|2022-06-01|2022-06-06|[[[TLV, NYC, 2022...|[[[false, [687.38...|2022-5-26|
|2022-06-01|2022-06-06|[[[TLV, NYC, 2022...|[[[false, [789.88...|2022-5-25|
+----------+----------+--------------------+--------------------+---------+

For each (fs_date, ss_date) combination I want to keep only the latest responses (by full_date).

For example, for 2022-06-01|2022-06-05 I want to keep only the responses from 2022-5-29.

For 2022-06-01|2022-06-03, only 2022-5-28, and so on.

expected output:

+----------+----------+--------------------+--------------------+---------+
|   fs_date|   ss_date|             request|            response|full_date|
+----------+----------+--------------------+--------------------+---------+
|2022-06-01|2022-06-02|[[[TLV, NYC, 2022...|[[[false, [1262.1...|2022-5-25|
|2022-06-01|2022-06-03|[[[TLV, NYC, 2022...|[[[false, [1226.6...|2022-5-28|
|2022-06-01|2022-06-03|[[[TLV, NYC, 2022...|[[[false, [3746.6...|2022-5-28|
|2022-06-01|2022-06-04|[[[TLV, NYC, 2022...|[[[false, [878.63...|2022-5-29|
|2022-06-01|2022-06-05|[[[TLV, NYC, 2022...|[[[false, [777.81...|2022-5-29|
|2022-06-01|2022-06-05|[[[TLV, NYC, 2022...|[[[false, [746.58...|2022-5-29|
|2022-06-01|2022-06-05|[[[TLV, NYC, 2022...|[[[false, [777.81...|2022-5-29|
|2022-06-01|2022-06-06|[[[TLV, NYC, 2022...|[[[false, [687.28...|2022-5-29|
+----------+----------+--------------------+--------------------+---------+

Thanks!

CodePudding user response:

Assume your table is called data. The idea is to first build another DataFrame by grouping on fs_date and ss_date and aggregating with max on full_date; this gives us, for each date pair, its maximum full_date:

import org.apache.spark.sql.functions.max

val otherOne = data.groupBy("fs_date", "ss_date").agg(max("full_date").as("full_date"))

After this step, we inner join back with the main table, which filters out the unwanted rows:

data.join(otherOne, Seq("fs_date", "ss_date", "full_date"), "inner")
  .orderBy("fs_date", "ss_date") // for the sake of matching results

This should give you what you want! The solution is in Scala, but the idea carries over directly; a PySpark sketch of the same approach follows the sample output below.

A sample result of the output (using simplified sample input, with placeholder values for request and response):

+----------+----------+---------+-------+--------+
|   fs_date|   ss_date|full_date|request|response|
+----------+----------+---------+-------+--------+
|2022-06-01|2022-06-03|2022-5-28|      R|       R|
|2022-06-01|2022-06-03|2022-5-28|      R|       R|
|2022-06-01|2022-06-04|2022-5-29|      R|       R|
|2022-06-01|2022-06-05|2022-5-29|      R|       R|
|2022-06-01|2022-06-05|2022-5-29|      R|       R|
|2022-06-01|2022-06-05|2022-5-29|      R|       R|
|2022-06-01|2022-06-06|2022-5-29|      R|       R|
+----------+----------+---------+-------+--------+
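
If you are working in PySpark, a minimal sketch of the same idea (assuming your DataFrame is named df) could look like this:

from pyspark.sql import functions as F

# Group by the date pair and keep the latest full_date per pair,
# then inner join back to filter the original rows.
latest = df.groupBy("fs_date", "ss_date").agg(F.max("full_date").alias("full_date"))
result = (
    df.join(latest, ["fs_date", "ss_date", "full_date"], "inner")
      .orderBy("fs_date", "ss_date")
)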

CodePudding user response:

This is a good example of when to use a window function: a function that computes a value over a partition of rows (here, each fs_date/ss_date pair) without collapsing them the way a groupBy aggregation does.

from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

# Rank the rows within each (fs_date, ss_date) pair by full_date, newest first.
date_window = W.partitionBy(["fs_date", "ss_date"]).orderBy(F.col("full_date").desc())
df2 = (
    df.withColumn("row", F.dense_rank().over(date_window))
    .filter(F.col("row") == 1).drop("row")
)

We define a window partitioned by the date pair and ordered by full_date descending, apply dense_rank (which ranks the full_date values and repeats numbers where there is a tie, so every row sharing the latest date gets rank 1), then keep only the top-ranked rows and drop the helper column.
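
One caveat for both answers: full_date looks like a string in yyyy-M-d form (e.g. 2022-5-25), and string comparison can misorder such dates (lexicographically, 2022-5-9 sorts after 2022-5-10). A safer variant, assuming that format, orders by a parsed date:

from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

# Parse full_date with the yyyy-M-d pattern (an assumption based on the
# sample rows) so ordering is chronological rather than lexicographic.
date_window = (
    W.partitionBy("fs_date", "ss_date")
     .orderBy(F.to_date(F.col("full_date"), "yyyy-M-d").desc())
)
df2 = (
    df.withColumn("row", F.dense_rank().over(date_window))
      .filter(F.col("row") == 1)
      .drop("row")
)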
