I have a dataframe like below:
Rowkey  timestamp  col_1  col_2  col_3 ... col_n
1234    165789     20     null   30    ... null
1234    155789     20     20     null  ... 40
1234    145789     20     10     30    ... 50
and expect to transform it into the following dataframe:
Rowkey  timestamp  col_1  col_2  col_3 ... col_n
1234    165789     20     20     30    ... 40
I want the row with the latest timestamp per Rowkey. Also, if a cell is null, the value from the next row with the same Rowkey (the next-latest timestamp) should be used instead. I am using Spark with Scala.
CodePudding user response:
Here's my take: use a Window function to select the first non-null value of each column within a Rowkey partition, ordered by timestamp descending, then drop duplicates so only one row per Rowkey remains.
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Integer (boxed) is used so the sample data can contain nulls
val simpleData: Seq[(String, Integer, Integer, Integer, Integer, Integer)] = Seq(
  ("1234", 165789, 20, null, 30, null),
  ("1234", 155789, 10, 20, null, 40),
  ("1234", 145789, 2, 10, 30, 50),
  ("123e4", 145789, 2, 10, 30, 50)
)
val someDF = simpleData.toDF("Rowkey", "timestamp", "col_1", "col_2", "col_3", "col_4")
someDF.show()

val listCols = List("Rowkey", "timestamp", "col_1", "col_2", "col_3", "col_4")

// Each Rowkey partition is ordered latest-first; the frame spans the whole
// partition so first(..., ignoreNulls = true) can look past leading nulls
val windowSpec = Window
  .partitionBy("Rowkey")
  .orderBy($"timestamp".desc)
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

someDF
  .select(listCols.map(c => first(c, ignoreNulls = true).over(windowSpec).alias(c)): _*)
  .dropDuplicates()
  .show()
Result:
+------+---------+-----+-----+-----+-----+
|Rowkey|timestamp|col_1|col_2|col_3|col_4|
+------+---------+-----+-----+-----+-----+
|  1234|   165789|   20| null|   30| null|
|  1234|   155789|   10|   20| null|   40|
|  1234|   145789|    2|   10|   30|   50|
| 123e4|   145789|    2|   10|   30|   50|
+------+---------+-----+-----+-----+-----+
+------+---------+-----+-----+-----+-----+
|Rowkey|timestamp|col_1|col_2|col_3|col_4|
+------+---------+-----+-----+-----+-----+
|  1234|   165789|   20|   20|   30|   40|
| 123e4|   145789|    2|   10|   30|   50|
+------+---------+-----+-----+-----+-----+
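For intuition, here is a plain-Scala sketch (no Spark, just collections) of what `first(col, ignoreNulls = true)` over the descending-timestamp window computes per Rowkey partition. The `Row` case class and `collapse` helper are illustrative names, not part of any library:

```scala
// Sketch of the per-partition "first non-null, latest timestamp wins" logic.
case class Row(rowkey: String, timestamp: Int, cols: Vector[Option[Int]])

def collapse(rows: Seq[Row]): Seq[Row] =
  rows.groupBy(_.rowkey).values.map { group =>
    val sorted = group.toSeq.sortBy(-_.timestamp)   // latest timestamp first
    val merged = Vector.tabulate(sorted.head.cols.length) { i =>
      // first non-null value scanning from latest to oldest row
      sorted.iterator.map(_.cols(i)).collectFirst { case Some(v) => v }
    }
    sorted.head.copy(cols = merged)                 // keep the latest timestamp
  }.toSeq

val data = Seq(
  Row("1234", 165789, Vector(Some(20), None, Some(30), None)),
  Row("1234", 155789, Vector(Some(10), Some(20), None, Some(40))),
  Row("1234", 145789, Vector(Some(2), Some(10), Some(30), Some(50)))
)

// collapse(data) yields a single row: timestamp 165789, cols (20, 20, 30, 40)
```

The Spark version does the same thing, but distributed: the window frame plays the role of the sorted group, and `dropDuplicates()` collapses the identical per-partition results into one row.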