Pyspark get rows with max value for a column over a window

Time:07-30

I have a dataframe as follows:

| created       | id | date       | value |
|---------------|----|------------|-------|
| 1650983874871 | x  | 2020-05-08 | 5     |
| 1650367659030 | x  | 2020-05-08 | 3     |
| 1639429213087 | x  | 2020-05-08 | 2     |
| 1650983874871 | x  | 2020-06-08 | 5     |
| 1650367659030 | x  | 2020-06-08 | 3     |
| 1639429213087 | x  | 2020-06-08 | 2     |

I want to get the max of `created` for every `date`. The result should look like:

| created       | id | date       | value |
|---------------|----|------------|-------|
| 1650983874871 | x  | 2020-05-08 | 5     |
| 1650983874871 | x  | 2020-06-08 | 5     |

I tried:

from pyspark.sql import functions as F

df2 = (
    df
    .groupby(['id', 'date'])
    .agg(
        F.max(F.col('created')).alias('created_max')
    )
)
df3 = df.join(df2, on=['id', 'date'], how='left')

But this is not working as expected. Can anyone help me?

CodePudding user response:

You need to make two changes.

  1. The join condition needs to include `created` as well. Here the alias is changed to `alias("created")` to make the join easier to write. This makes the join condition unique (assuming there are no duplicate `created` values within a group).
  2. The join type must be `inner`, so that only the matching (max) rows are kept.
from pyspark.sql import functions as F

df2 = (
    df
    .groupby(['id', 'date'])
    .agg(
        F.max(F.col('created')).alias('created')
    )
)
df3 = df.join(df2, on=['id', 'date', 'created'], how='inner')

df3.show()

+---+----------+-------------+-----+
| id|      date|      created|value|
+---+----------+-------------+-----+
|  x|2020-05-08|1650983874871|    5|
|  x|2020-06-08|1650983874871|    5|
+---+----------+-------------+-----+

CodePudding user response:

Instead of grouping and joining, you can also use a window function from `pyspark.sql`:

from pyspark.sql import functions as func
from pyspark.sql.window import Window

df = df\
     .withColumn('max_created', func.max('created').over(Window.partitionBy('date', 'id')))\
     .filter(func.col('created')==func.col('max_created'))\
     .drop('max_created')

Steps:

  1. Compute the max of `created` over a window partitioned by `date` and `id`
  2. Keep only the rows whose `created` equals that max