How does Spark SQL implement the group by aggregate

Time:05-03

How does Spark SQL implement a group-by aggregate? I want to group by the name field and, based on the latest date, get the latest salary. How should I write the SQL?

The data is:

// +-------+------+----------+
// | name  |salary|date      |
// +-------+------+----------+
// |AA     |  3000|2022-01   |
// |AA     |  4500|2022-02   |
// |BB     |  3500|2022-01   |
// |BB     |  4000|2022-02   |
// +-------+------+----------+

The expected result is:

// +-------+------+
// | name  |salary|
// +-------+------+
// |AA     |  4500|
// |BB     |  4000|
// +-------+------+

CodePudding user response:

Assuming that the DataFrame is registered as a temporary view named tmp, first use the row_number window function to assign a row number (rn) within each group (name), ordered by date descending, and then keep only the rows with rn = 1.

sql = """
    select name, salary from
        (select *, row_number() over (partition by name order by date desc) as rn
        from tmp)
    where rn = 1
"""
df = spark.sql(sql)
df.show(truncate=False)
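
To sanity-check the query without a Spark session, the same window-function SQL can be run against an in-memory SQLite database. This is only a sketch under assumptions: it uses Python's built-in sqlite3 module (SQLite 3.25 or later is needed for window functions), not Spark, and the final "order by name" is added here only to make the printed order deterministic. The table contents are the rows from the question.

```python
import sqlite3

# In-memory stand-in for the Spark temporary view "tmp" (assumption: SQLite, not Spark).
conn = sqlite3.connect(":memory:")
conn.execute("create table tmp (name text, salary integer, date text)")
conn.executemany(
    "insert into tmp values (?, ?, ?)",
    [
        ("AA", 3000, "2022-01"),
        ("AA", 4500, "2022-02"),
        ("BB", 3500, "2022-01"),
        ("BB", 4000, "2022-02"),
    ],
)

# Same query as above; "order by name" is added only for a stable output order.
query = """
    select name, salary from
        (select *, row_number() over (partition by name order by date desc) as rn
        from tmp)
    where rn = 1
    order by name
"""
result = conn.execute(query).fetchall()
print(result)  # [('AA', 4500), ('BB', 4000)]
```

The yyyy-MM strings sort correctly as text, which is why ordering by the date column works here without converting it first.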

CodePudding user response:

  1. First convert your string to a date.
  2. Convert the date to a Unix timestamp (a numeric representation of a date, so you can use max).
  3. Use "first" as an aggregate function to retrieve a salary value from each group. (It returns the first value encountered, which is not tied to the row with the maximum date; if there is a date tie, it could pull either one.)

For example:

simpleData = [("James","Sales","NY",90000,34,'2022-02-01'),
    ("Michael","Sales","NY",86000,56,'2022-02-01'),
    ("Robert","Sales","CA",81000,30,'2022-02-01'),
    ("Maria","Finance","CA",90000,24,'2022-02-01'),
    ("Raman","Finance","CA",99000,40,'2022-03-01'),
    ("Scott","Finance","NY",83000,36,'2022-04-01'),
    ("Jen","Finance","NY",79000,53,'2022-04-01'),
    ("Jeff","Marketing","CA",80000,25,'2022-04-01'),
    ("Kumar","Marketing","NY",91000,50,'2022-05-01')
  ]

schema = ["employee_name","name","state","salary","age","updated"]
df = spark.createDataFrame(data=simpleData, schema = schema)
df.printSchema()
df.show(truncate=False)
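from pyspark.sql.functions import col, first, max, to_date, unix_timestamp

# Parse the string column into a date, then into a Unix timestamp so max() can compare it.
# The outer parentheses let the method chain span several lines.
(
    df.withColumn(
        "dateUpdated",
        unix_timestamp(to_date(col("updated"), "yyyy-MM-dd")),
    )
    .groupBy("name")
    .agg(
        max("dateUpdated"),
        # first() returns the first salary encountered in the group,
        # not necessarily the one from the row with the latest date.
        first("salary").alias("Salary"),
    )
    .show()
)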
+---------+----------------+------+
|     name|max(dateUpdated)|Salary|
+---------+----------------+------+
|    Sales|      1643691600| 90000|
|  Finance|      1648785600| 90000|
|Marketing|      1651377600| 80000|
+---------+----------------+------+
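
Note that in the output above the Salary column does not follow the maximum date: Finance shows 90000, which belongs to the 2022-02-01 row, while the maximum date for Finance is 2022-04-01. The intended "salary from the latest row per group" semantics can be sketched in plain Python (no Spark required) on the same rows. The helper name latest_salary_by_group is hypothetical, and on a date tie this sketch keeps the first row it sees.

```python
# Same rows as simpleData above:
# (employee_name, name, state, salary, age, updated)
rows = [
    ("James", "Sales", "NY", 90000, 34, "2022-02-01"),
    ("Michael", "Sales", "NY", 86000, 56, "2022-02-01"),
    ("Robert", "Sales", "CA", 81000, 30, "2022-02-01"),
    ("Maria", "Finance", "CA", 90000, 24, "2022-02-01"),
    ("Raman", "Finance", "CA", 99000, 40, "2022-03-01"),
    ("Scott", "Finance", "NY", 83000, 36, "2022-04-01"),
    ("Jen", "Finance", "NY", 79000, 53, "2022-04-01"),
    ("Jeff", "Marketing", "CA", 80000, 25, "2022-04-01"),
    ("Kumar", "Marketing", "NY", 91000, 50, "2022-05-01"),
]


def latest_salary_by_group(rows):
    """Hypothetical helper: salary of the row with the latest 'updated' per group."""
    latest = {}  # group -> (updated, salary)
    for row in rows:
        group, salary, updated = row[1], row[3], row[5]
        # ISO dates (yyyy-MM-dd) compare correctly as strings.
        # Strict ">" keeps the first row seen when dates tie.
        if group not in latest or updated > latest[group][0]:
            latest[group] = (updated, salary)
    return {group: salary for group, (_, salary) in latest.items()}


result = latest_salary_by_group(rows)
print(result)  # {'Sales': 90000, 'Finance': 83000, 'Marketing': 91000}
```

In Spark itself, the row_number query from the first answer gives this behavior, since it selects the salary from the same row that carries the latest date.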