Below is my source schema.
root
|-- header: struct (nullable = true)
| |-- timestamp: long (nullable = true)
| |-- id: string (nullable = true)
| |-- honame: string (nullable = true)
|-- device: struct (nullable = true)
| |-- srcId: string (nullable = true)
| |-- srctype: string (nullable = true)
|-- ATTRIBUTES: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- event_date: date (nullable = true)
|-- event_datetime: string (nullable = true)
I want to explode the ATTRIBUTES map column and select all the columns that end with _id.
I'm using the code below.
val exploded = batch_df.select($"event_date", explode($"ATTRIBUTES"))
exploded.show()
I get the sample output below.
+----------+-----------+-----+
|event_date|        key|value|
+----------+-----------+-----+
|2021-05-18|    SYST_id|   85|
|2021-05-18|   RECVR_id|    1|
|2021-05-18| Account_Id|12345|
|2021-05-18|      Vb_id|  845|
|2021-05-18|SYS_INFO_id|  640|
|2021-05-18|     mem_id|  456|
+----------+-----------+-----+
However, my required output is as below.
+----------+-------+--------+----------+-----+-----------+------+
|event_date|SYST_id|RECVR_id|Account_Id|Vb_id|SYS_INFO_id|mem_id|
+----------+-------+--------+----------+-----+-----------+------+
|2021-05-18|     85|       1|     12345|  845|        640|   456|
+----------+-------+--------+----------+-----+-----------+------+
Could someone please assist?
CodePudding user response:
Your approach works. You only have to add a pivot operation after the explode:
import org.apache.spark.sql.functions._
exploded.groupBy("event_date").pivot("key").agg(first("value")).show()
I assume that the combination of event_date and key is unique, so it is safe to take the first (and only) value in the aggregation. If the combination is not unique, you could use collect_list as the aggregation function.
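For example, a collect_list variant might look like this (a minimal sketch; each pivoted cell then holds an array of all values seen for that key and date):
// Hedged sketch: tolerate duplicate (event_date, key) pairs by collecting every value
exploded.groupBy("event_date").pivot("key").agg(collect_list("value")).show()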
Edit:
To add srcId and srctype, simply add these columns to the select statement:
val exploded = batch_df.select($"event_date", $"device.srcId", $"device.srctype", explode($"ATTRIBUTES"))
To reduce the number of columns after the pivot operation, apply a filter on the key column before aggregating:
val relevant_cols = Array("Account_Id", "Vb_id", "RECVR_id", "mem_id") // keys to keep explicitly; note Account_Id does not end with lowercase "_id"
exploded.filter($"key".isin(relevant_cols:_*).or($"key".endsWith(lit("_id"))))
  .groupBy("event_date").pivot("key").agg(first("value")).show()