Replace column values based on the max Spark Scala

Time:09-28

Suppose I have a Dataset that looks like this:

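import spark.implicits._ // enables .toDF; assumes a SparkSession named spark (predefined in spark-shell)

val data1 = Seq(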
  ("81518165", "10", "0412221432", "2021.02.01 12:29:57"),
  ("81518165", "10", "0412392873", "2021.02.01 11:33:41"),
  ("81518165", "10", "0412392879", "2021.02.01 05:12:12"),
  ("81518165", "10", "0412392950", "2021.02.01 01:39:37"),
  ("23698102", "12", "0412221432", "2021.02.14 12:55:33"),
  ("23698102", "12", "0412392873", "2021.02.14 11:33:37"),
  ("23698102", "12", "0412392879", "2021.02.14 05:12:00")
)

val df1 = data1.toDF("AUFTRAG", "AUFTRAG_POS", "IID_CODE", "ERST_TIMESTAMP")

I want to remove the duplicate rows caused by the differing dates by grouping on the columns "AUFTRAG" and "AUFTRAG_POS" and taking the maximum date "ERST_TIMESTAMP". To get the max date, this is my code:

import org.apache.spark.sql.functions._

df1.withColumn("ERST_TIMESTAMP", to_timestamp(col("ERST_TIMESTAMP"),"yyyy.MM.dd HH:mm:ss"))
  .groupBy("AUFTRAG", "AUFTRAG_POS")
  .agg(max("ERST_TIMESTAMP"))
  .show()
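To make concrete what the groupBy/max step computes, here is the same aggregation on a plain Scala Seq, without Spark. It is only an illustration under stated assumptions: the object `GroupMaxSketch` and the method `maxPerGroup` are hypothetical names, and the timestamps are parsed with java.time using the same pattern the question passes to to_timestamp.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper: plain-Scala analogue of
// groupBy("AUFTRAG", "AUFTRAG_POS").agg(max("ERST_TIMESTAMP"))
object GroupMaxSketch {
  // Same pattern the question passes to to_timestamp
  private val fmt = DateTimeFormatter.ofPattern("yyyy.MM.dd HH:mm:ss")

  // (AUFTRAG, AUFTRAG_POS, IID_CODE, ERST_TIMESTAMP), the same rows as data1
  val data1: Seq[(String, String, String, String)] = Seq(
    ("81518165", "10", "0412221432", "2021.02.01 12:29:57"),
    ("81518165", "10", "0412392873", "2021.02.01 11:33:41"),
    ("81518165", "10", "0412392879", "2021.02.01 05:12:12"),
    ("81518165", "10", "0412392950", "2021.02.01 01:39:37"),
    ("23698102", "12", "0412221432", "2021.02.14 12:55:33"),
    ("23698102", "12", "0412392873", "2021.02.14 11:33:37"),
    ("23698102", "12", "0412392879", "2021.02.14 05:12:00")
  )

  // Latest parsed ERST_TIMESTAMP for each (AUFTRAG, AUFTRAG_POS) key
  def maxPerGroup(rows: Seq[(String, String, String, String)]): Map[(String, String), LocalDateTime] =
    rows
      .groupBy(r => (r._1, r._2))
      .map { case (key, group) =>
        // reduce with isAfter avoids relying on an implicit Ordering[LocalDateTime]
        key -> group.map(r => LocalDateTime.parse(r._4, fmt)).reduce((a, b) => if (a.isAfter(b)) a else b)
      }

  def main(args: Array[String]): Unit =
    maxPerGroup(data1).foreach { case ((auftrag, pos), ts) => println(s"$auftrag $pos $ts") }
}
```

Spark does the grouping in a distributed way, but the per-key result is the same: one maximum timestamp per (AUFTRAG, AUFTRAG_POS) pair.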

This is the result, as expected:

+--------+-----------+-------------------+
| AUFTRAG|AUFTRAG_POS|max(ERST_TIMESTAMP)|
+--------+-----------+-------------------+
|81518165|         10|2021-02-01 12:29:57|
|23698102|         12|2021-02-14 12:55:33|
+--------+-----------+-------------------+

My objective now is to replace ERST_TIMESTAMP in every row with this max date of its ("AUFTRAG", "AUFTRAG_POS") group. This is my solution:

val df2 = df1.withColumn("ERST_TIMESTAMP", to_timestamp(col("ERST_TIMESTAMP"),"yyyy.MM.dd HH:mm:ss"))
  .groupBy("AUFTRAG", "AUFTRAG_POS")
  .agg(max("ERST_TIMESTAMP"))


df1.join(df2, Seq("AUFTRAG", "AUFTRAG_POS")).show()

This gives the expected result, exactly as I want:

[image of the expected result table]
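As posted, the join keeps the original ERST_TIMESTAMP string next to a new max(ERST_TIMESTAMP) column, because df1 itself was never converted. If the goal is to actually replace the column while staying with groupBy + join, a minimal sketch is to alias the aggregate and drop the per-row value before joining. The names `ReplaceViaJoin` and `replaceWithGroupMax` are hypothetical, and a local SparkSession is assumed for the demo in `main`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max, to_timestamp}

// Hypothetical helper: the groupBy + join approach, tidied so ERST_TIMESTAMP is truly replaced
object ReplaceViaJoin {
  def replaceWithGroupMax(df: DataFrame): DataFrame = {
    // Parse once, with the pattern from the question
    val parsed = df.withColumn("ERST_TIMESTAMP",
      to_timestamp(col("ERST_TIMESTAMP"), "yyyy.MM.dd HH:mm:ss"))

    // One row per group; the alias avoids the generated name max(ERST_TIMESTAMP)
    val maxPerGroup = parsed
      .groupBy("AUFTRAG", "AUFTRAG_POS")
      .agg(max("ERST_TIMESTAMP").as("ERST_TIMESTAMP"))

    // Drop the per-row timestamp so the joined group max takes its place
    parsed.drop("ERST_TIMESTAMP").join(maxPerGroup, Seq("AUFTRAG", "AUFTRAG_POS"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("replace-via-join").getOrCreate()
    import spark.implicits._

    val df1 = Seq(
      ("81518165", "10", "0412221432", "2021.02.01 12:29:57"),
      ("81518165", "10", "0412392950", "2021.02.01 01:39:37"),
      ("23698102", "12", "0412221432", "2021.02.14 12:55:33"),
      ("23698102", "12", "0412392879", "2021.02.14 05:12:00")
    ).toDF("AUFTRAG", "AUFTRAG_POS", "IID_CODE", "ERST_TIMESTAMP")

    replaceWithGroupMax(df1).show(false)
    spark.stop()
  }
}
```

With a using-column join, the key columns come first, then the remaining left columns (IID_CODE), then the remaining right columns (the group max), so the output keeps the original column names.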

I'm not very satisfied with this method. Is there another way? Any help is appreciated.

Answer:

You can use a window function for this, as below:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("AUFTRAG", "AUFTRAG_POS")

df1.withColumn("ERST_TIMESTAMP", to_timestamp(col("ERST_TIMESTAMP"),"yyyy.MM.dd HH:mm:ss"))
  .withColumn("ERST_TIMESTAMP", max("ERST_TIMESTAMP").over(window))
  .show(false)

Output:

+--------+-----------+----------+-------------------+
|AUFTRAG |AUFTRAG_POS|IID_CODE  |ERST_TIMESTAMP     |
+--------+-----------+----------+-------------------+
|81518165|10         |0412221432|2021-02-01 12:29:57|
|81518165|10         |0412392873|2021-02-01 12:29:57|
|81518165|10         |0412392879|2021-02-01 12:29:57|
|81518165|10         |0412392950|2021-02-01 12:29:57|
+--------+-----------+----------+-------------------+
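A self-contained sketch of the window answer, with the Window import it needs and both groups from the question, may help when trying it outside spark-shell. The names `ReplaceViaWindow` and `replaceWithGroupMax` are hypothetical, and a local SparkSession is assumed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max, to_timestamp}

// Hypothetical helper wrapping the window-based answer
object ReplaceViaWindow {
  def replaceWithGroupMax(df: DataFrame): DataFrame = {
    // Partition only, no orderBy: the frame is the whole (AUFTRAG, AUFTRAG_POS) group
    val byGroup = Window.partitionBy("AUFTRAG", "AUFTRAG_POS")

    df.withColumn("ERST_TIMESTAMP", to_timestamp(col("ERST_TIMESTAMP"), "yyyy.MM.dd HH:mm:ss"))
      // Every row receives its group's max; no join and no extra column
      .withColumn("ERST_TIMESTAMP", max("ERST_TIMESTAMP").over(byGroup))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("replace-via-window").getOrCreate()
    import spark.implicits._

    val df1 = Seq(
      ("81518165", "10", "0412221432", "2021.02.01 12:29:57"),
      ("81518165", "10", "0412392873", "2021.02.01 11:33:41"),
      ("81518165", "10", "0412392879", "2021.02.01 05:12:12"),
      ("81518165", "10", "0412392950", "2021.02.01 01:39:37"),
      ("23698102", "12", "0412221432", "2021.02.14 12:55:33"),
      ("23698102", "12", "0412392873", "2021.02.14 11:33:37"),
      ("23698102", "12", "0412392879", "2021.02.14 05:12:00")
    ).toDF("AUFTRAG", "AUFTRAG_POS", "IID_CODE", "ERST_TIMESTAMP")

    replaceWithGroupMax(df1).show(false)
    spark.stop()
  }
}
```

The missing orderBy is deliberate: with no ordering the window frame spans the entire partition, so max sees every row of the group. Adding an orderBy would switch the default frame to "unbounded preceding to current row", which yields a running max instead of the group max.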