How does Spark SQL implement the group by aggregate? I want to group by name field and based on the latest data to get the latest salary. How to write the SQL
The data is:
------- ------| ---------|
// | name |salary|date |
// ------- ------| ---------|
// |AA | 3000|2022-01 |
// |AA | 4500|2022-02 |
// |BB | 3500|2022-01 |
// |BB | 4000|2022-02 |
// ------- ------ ----------|
The expected result is:
------- ------|
// | name |salary|
// ------- ------|
// |AA | 4500|
// |BB | 4000|
// ------- ------
CodePudding user response:
Assuming that the dataframe is registered as a temporary view named tmp
, first use the row_number
windowing function for each group (name
) in reverse order by date
Assign the line number (rn
), and then take all the lines with rn=1
.
sql = """
select name, salary from
(select *, row_number() over (partition by name order by date desc) as rn
from tmp)
where rn = 1
"""
df = spark.sql(sql)
df.show(truncate=False)
CodePudding user response:
- First convert your string to a date.
- Covert the date to an UNixTimestamp.(number representation of a date, so you can use Max)
- User "First" as an aggregate function that retrieves a value of your aggregate results. (The first results, so if there is a date tie, it could pull either one.)
:
simpleData = [("James","Sales","NY",90000,34,'2022-02-01'),
("Michael","Sales","NY",86000,56,'2022-02-01'),
("Robert","Sales","CA",81000,30,'2022-02-01'),
("Maria","Finance","CA",90000,24,'2022-02-01'),
("Raman","Finance","CA",99000,40,'2022-03-01'),
("Scott","Finance","NY",83000,36,'2022-04-01'),
("Jen","Finance","NY",79000,53,'2022-04-01'),
("Jeff","Marketing","CA",80000,25,'2022-04-01'),
("Kumar","Marketing","NY",91000,50,'2022-05-01')
]
schema = ["employee_name","name","state","salary","age","updated"]
df = spark.createDataFrame(data=simpleData, schema = schema)
df.printSchema()
df.show(truncate=False)
df.withColumn(
"dateUpdated",
unix_timestamp(
to_date(
col("updated") ,
"yyyy-MM-dd"
)
)
).groupBy("name")
.agg(
max("dateUpdated"),
first("salary").alias("Salary")
).show()
--------- ---------------- ------
| name|max(dateUpdated)|Salary|
--------- ---------------- ------
| Sales| 1643691600| 90000|
| Finance| 1648785600| 90000|
|Marketing| 1651377600| 80000|
--------- ---------------- ------