Filter DataFrame to delete duplicate values in pyspark

Time:12-09

I have the following dataframe

date                |   value    | ID
--------------------------------------
2021-12-06 15:00:00      25        1
2021-12-06 15:15:00      35        1
2021-11-30 00:00:00      20        2
2021-11-25 00:00:00      10        2

I want to join this DF with another one like this:

idUser | Name | Gender
-------------------
1       John    M
2       Anne    F

My expected output is:

ID | Name | Gender | Value
---------------------------
1    John    M        35
2    Anne    F        20

What I need is: get only the most recent value per ID from the first dataframe and join only that row with my second dataframe. However, my Spark script joins both values:

My code:

from pyspark.sql.functions import col, max
from pyspark.sql.types import StringType

df = df1.select(
   col("date"),
   col("value"),
   col("ID"),
).orderBy(
   col("ID").asc(),
   col("date").desc(),
).groupBy(
   col("ID"), col("date").cast(StringType()).substr(0, 10).alias("date")
).agg(
   max(col("value")).alias("value")
)

final_df = df2.join(
    df,
    (col("idUser") == col("ID")),
    how="left"
)

When I perform this join (formatting of the columns is abstracted away in this post), I get the following output:

ID | Name | Gender | Value
---------------------------
1    John    M        35
2    Anne    F        20
2    Anne    F        10

I use substr to strip hours and minutes so I can group only by date. But when the same ID appears on different days, my output DF has both values instead of only the most recent one. How can I fix this?

Note: I want to do this using only PySpark functions (I do not want to use spark.sql(...)).

CodePudding user response:

You can use a Window with the row_number function in PySpark:

from pyspark.sql import functions as f
from pyspark.sql.window import Window

# order each ID partition by date, newest first
windowSpec = Window.partitionBy("ID").orderBy(f.col("date").desc())

df1_latest_val = df1.withColumn("row_number", f.row_number().over(windowSpec)).filter(
    f.col("row_number") == 1
)

The resulting df1_latest_val DataFrame will look like this:

date                |   value    | ID | row_number |
-----------------------------------------------------
2021-12-06 15:15:00      35        1        1
2021-11-30 00:00:00      20        2        1

Now you have a DataFrame with only the latest value per ID, which you can join directly with the other table (after dropping the helper row_number column).
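To make the selection logic concrete, here is a plain-Python sketch (no Spark required) of what the window + row_number approach computes: per ID, keep the row with the latest date, then join on ID. The sample data and user table are taken from the question; the dict-based join is only an illustration, not how Spark executes it.

```python
from datetime import datetime

rows = [
    ("2021-12-06 15:00:00", 25, 1),
    ("2021-12-06 15:15:00", 35, 1),
    ("2021-11-30 00:00:00", 20, 2),
    ("2021-11-25 00:00:00", 10, 2),
]
users = {1: ("John", "M"), 2: ("Anne", "F")}

# keep the most recent row per ID (what row_number() == 1 over
# an "order by date desc" window selects)
latest = {}
for date_str, value, id_ in rows:
    date = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
    if id_ not in latest or date > latest[id_][0]:
        latest[id_] = (date, value)

# join the deduplicated rows with the user table on ID
result = [
    (id_, users[id_][0], users[id_][1], value)
    for id_, (_, value) in sorted(latest.items())
]
print(result)  # [(1, 'John', 'M', 35), (2, 'Anne', 'F', 20)]
```

Because each ID now maps to exactly one row before the join, the output has one row per user, matching the expected table above.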
