Flattening a map<string,string> column in Spark Scala

Below is my source schema.

root
 |-- header: struct (nullable = true)
 |    |-- timestamp: long (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- honame: string (nullable = true)
 |-- device: struct (nullable = true)
 |    |-- srcId: string (nullable = true)
 |    |-- srctype: string (nullable = true)
 |-- ATTRIBUTES: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- event_date: date (nullable = true)
 |-- event_datetime: string (nullable = true)

I want to explode the ATTRIBUTES map-type column and select all the columns whose keys end with _id.

I'm using the below code.

val exploded = batch_df.select($"event_date".alias("date"), explode($"ATTRIBUTES"))
exploded.show()

I am getting the below sample output.

+----------+-----------+-----+
|      date|        key|value|
+----------+-----------+-----+
|2021-05-18|    SYST_id|   85|
|2021-05-18|   RECVR_id|    1|
|2021-05-18| Account_Id|12345|
|2021-05-18|      Vb_id|  845|
|2021-05-18|SYS_INFO_id|  640|
|2021-05-18|     mem_id|  456|
+----------+-----------+-----+

However, my required output is as below.

+----------+-------+--------+----------+-----+-----------+------+
|      date|SYST_id|RECVR_id|Account_Id|Vb_id|SYS_INFO_id|mem_id|
+----------+-------+--------+----------+-----+-----------+------+
|2021-05-18|     85|       1|     12345|  845|        640|   456|
+----------+-------+--------+----------+-----+-----------+------+

Could someone please assist?

CodePudding user response:

Your approach works. You only have to add a pivot operation after the explode:

import org.apache.spark.sql.functions._

exploded.groupBy("date").pivot("key").agg(first("value")).show()

I assume that the combination of date and key is unique, so it is safe to take the first (and only) value in the aggregation. If the combination is not unique, you could use collect_list as the aggregation function.
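
For example, here is a minimal sketch of that variant, reusing the exploded DataFrame from above (the val name is only illustrative); each pivoted cell then holds an array of every value seen for that key:

// collect_list keeps duplicate values per (date, key) instead of dropping them
val pivotedWithDuplicates = exploded
    .groupBy("date")
    .pivot("key")
    .agg(collect_list("value"))

pivotedWithDuplicates.show(false)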


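As a side note, if the set of keys is known up front, you can pass it to pivot explicitly, which spares Spark the extra job it otherwise runs to discover the distinct pivot values. A sketch, assuming the keys from the sample output above:

// providing the pivot values avoids an additional pass over the data
val expectedKeys = Seq("SYST_id", "RECVR_id", "Account_Id", "Vb_id", "SYS_INFO_id", "mem_id")

exploded.groupBy("date").pivot("key", expectedKeys).agg(first("value")).show()
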
Edit:

To add srcId and srctype, simply add these columns to the select statement, and include them in the groupBy so that they survive the pivot:

val exploded = batch_df.select($"event_date".alias("date"), $"device.srcId", $"device.srctype", explode($"ATTRIBUTES"))

To reduce the number of columns after the pivot operation, apply a filter on the key column before aggregating:

val relevant_cols = Array("Account_Id", "Vb_id", "RECVR_id", "mem_id") // the four additional columns

exploded.filter($"key".isin(relevant_cols:_*).or($"key".endsWith("_id")))
    .groupBy("date", "srcId", "srctype").pivot("key").agg(first("value")).show()
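
Finally, to make the basic explode-plus-pivot pipeline easy to verify locally, here is a self-contained sketch that builds a one-row DataFrame shaped like the sample in the question (the data, the local SparkSession, and the use of a string instead of a date for event_date are all just for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
    .appName("flatten-map-column")
    .master("local[*]")
    .getOrCreate()
import spark.implicits._

// One hypothetical row mirroring the sample output in the question
val batch_df = Seq(
    ("2021-05-18", Map(
        "SYST_id" -> "85", "RECVR_id" -> "1", "Account_Id" -> "12345",
        "Vb_id" -> "845", "SYS_INFO_id" -> "640", "mem_id" -> "456"))
).toDF("event_date", "ATTRIBUTES")

// Explode the map into (key, value) rows, then pivot the keys into columns
val exploded = batch_df.select($"event_date".alias("date"), explode($"ATTRIBUTES"))
exploded.groupBy("date").pivot("key").agg(first("value")).show(false)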