Do Spark Window functions work independently per partition?


I'm trying to get the latest row per day for each some_guid. For example, I have the following data, sorted by item_time in descending order:

+----------+--------------------+-------------+
| file_date|           some_guid|    item_time|
+----------+--------------------+-------------+
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1632637545493|
|2021-11-22|22549ca165d88ffd2...|1632723945493|
|2021-11-22|22549ca165d88ffd2...|1632810345493|
|2021-11-22|22549ca165d88ffd2...|1632896745493|
|2021-11-22|22549ca165d88ffd2...|1632983145493|
|2021-11-22|22549ca165d88ffd2...|1633069545493|
|2021-11-22|22549ca165d88ffd2...|1633155945493|
|2021-11-22|22549ca165d88ffd2...|1633242345493|
|2021-11-22|22549ca165d88ffd2...|1633328745493|
|2021-11-22|22549ca165d88ffd2...|1633415145493|
|2021-11-22|22549ca165d88ffd2...|1633501545493|
|2021-11-22|22549ca165d88ffd2...|1633587945493|
|2021-11-22|22549ca165d88ffd2...|1633674345493|
|2021-11-22|22549ca165d88ffd2...|1633760745493|
|2021-11-22|22549ca165d88ffd2...|1633847145493|

As you can see, all item_time values are different. Then I apply the following transformation:

from pyspark.sql.functions import col, first
from pyspark.sql.window import Window

daily_window = Window.partitionBy('file_date', 'some_guid').orderBy(col('item_time').desc())
df.select('file_date', 'some_guid', first('item_time').over(daily_window).alias('item_time'))

And get the following result:

+----------+--------------------+-------------+
| file_date|           some_guid|    item_time|
+----------+--------------------+-------------+
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|
|2021-11-22|22549ca165d88ffd2...|1637562345493|

There are many duplicate rows, but I was expecting only one. Why does this happen? Is the window function applied independently within each partition, producing the same row, which then gets printed as many times as there are partitions?

CodePudding user response:

You are partitioning by file_date and some_guid, and looking at your data you only have one group:

file_date  some_guid
2021-11-22 22549ca165d88ffd2...

(We can't see the rest of some_guid, but it appears to be the same for all rows.)

Then, the first value is applied to every row in the group. Up to this point the behavior is correct.

That said, I would recommend trying withColumn() instead of select:

df.withColumn('item_time', first('item_time').over(daily_window))

Edit:

If you expect only one row, you want to use groupBy. See this previous answer: https://stackoverflow.com/a/70081054/13960095

Window functions are for when you want every row of the group to carry a value calculated over the whole group.
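
Here is a minimal sketch contrasting the two approaches, assuming a DataFrame df with the file_date, some_guid and item_time columns shown in the question (the variable names are only illustrative):

from pyspark.sql.functions import col, first, max as max_
from pyspark.sql.window import Window

daily_window = Window.partitionBy('file_date', 'some_guid').orderBy(col('item_time').desc())

# Window: every input row is kept; each row gets the group's first item_time.
windowed = df.withColumn('latest_item_time', first('item_time').over(daily_window))

# groupBy: the rows are collapsed, leaving one output row per group.
grouped = df.groupBy('file_date', 'some_guid').agg(max_('item_time').alias('item_time'))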

CodePudding user response:

use groupBy:

from pyspark.sql.functions import max as max_

df.groupBy('file_date', 'some_guid').agg(max_('item_time').alias('item_time'))

Or use window functions (e.g. rank/row_number) to enumerate the records, then use where/filter to keep the desired records, as in the sketch below.
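
A minimal sketch of that approach, assuming the same df as in the question (the column name rn is just an illustrative alias):

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

daily_window = Window.partitionBy('file_date', 'some_guid').orderBy(col('item_time').desc())

# Number the rows inside each (file_date, some_guid) group, newest first,
# then keep only the first row of each group.
latest = (df.withColumn('rn', row_number().over(daily_window))
            .where(col('rn') == 1)
            .drop('rn'))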
