I have the following dataframe
date | value | ID
--------------------------------------
2021-12-06 15:00:00 25 1
2021-12-06 15:15:00 35 1
2021-11-30 00:00:00 20 2
2021-11-25 00:00:00 10 2
I want to join this DF with another one like this:
idUser | Name | Gender
-------------------
1 John M
2 Anne F
My expected output is:
ID | Name | Gender | Value
---------------------------
1 John M 35
2 Anne F 20
What I need is: Get only the most recent value of the first dataframe and join only this value with my second dataframe. Although, my spark script is joining both values:
My code:
df = df1.select(
col("date"),
col("value"),
col("ID"),
).OrderBy(
col("ID").asc(),
col("date").desc(),
).groupBy(
col("ID"), col("date").cast(StringType()).substr(0,10).alias("date")
).agg (
max(col("value")).alias("value")
)
final_df = df2.join(
df,
(col("idUser") == col("ID")),
how="left"
)
When i perform this join (formating the columns is abstracted in this post) I have the following output:
ID | Name | Gender | Value
---------------------------
1 John M 35
2 Anne F 20
2 Anne F 10
I use substr
to remove hours and minutes to filter only by date. But when I have the same ID in different days my output df has the 2 values instead of the most recently. How can I fix this?
Note: I'm using only pyspark functions to do this (I now want to use spark.sql(...)
).
CodePudding user response:
You can use window
and row_number
function in pysaprk
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy("ID").orderBy("date").desc()
df1_latest_val = df1.withColumn("row_number", row_number().over(windowSpec)).filter(
f.col("row_number") == 1
)
The output of table df1_latest_val
will look something like this
date | value | ID | row_number |
-----------------------------------------------------
2021-12-06 15:15:00 35 1 1
2021-11-30 00:00:00 20 2 1
Now you will have df with the latest val, which you can directly join with another table.