How to explode column with multiple records into multiple Columns in Spark

Time:08-01

I'm new to Spark and Scala and would like some help with this situation. This is my current schema:

 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- horizon: double (nullable = true)
 |-- risk_table: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- index: string (nullable = true)
 |    |    |-- risk_buy: double (nullable = true)
 |    |    |-- reward_buy: double (nullable = true)
 |    |    |-- risk_sell: double (nullable = true)
 |    |    |-- reward_sell: double (nullable = true)
 |-- symbol_id: string (nullable = true)

Here is an example of what the data looks like:

+--------------------+
|          risk_table|
+--------------------+
|[{count, 201.0, 2...|
|[{count, 219.0, 2...|
|[{count, 119.0, 1...|
|[{count, 217.0, 2...|
|[{count, 17.0, 17...|
|[{count, 189.0, 1...|
|[{count, 105.0, 1...|
|[{count, 188.0, 1...|
|[{count, 111.0, 1...|
|[{count, 276.0, 2...|
|[{count, 70.0, 70...|
|[{count, 121.0, 1...|
|[{count, 133.0, 1...|
|[{count, 116.0, 1...|
|[{count, 70.0, 70...|
|[{count, 193.0, 1...|
|[{count, 131.0, 1...|
|[{count, 93.0, 93...|
|[{count, 84.0, 84...|
|[{count, 114.0, 1...|
+--------------------+

I would like to explode the risk_table column values into multiple columns. Each array usually has 4 nested documents/dictionaries whose index name changes, so the expected output would look something like this:

+-----------+------+---------+------------------+--------------------+-----+---------------------+
| symbol_id | date | index_0 | risk_buy_index_0 | reward_buy_index_0 | ... | reward_sell_index_3 |
+-----------+------+---------+------------------+--------------------+-----+---------------------+
| APPL      | xxxx | 0       | 0                | 0                  | ... | 0                   |
| APPL      | xxxx | 0       | 0                | 0                  | ... | 0                   |
| APPL      | xxxx | 0       | 0                | 0                  | ... | 0                   |
+-----------+------+---------+------------------+--------------------+-----+---------------------+


I found some information about how to explode a single document/dictionary, but not nested ones. I would highly appreciate it if someone could help with that.

UPDATE #1:

After @vilalabinot's answer, this is the returned DataFrame:

+-------+--------------------+--------------------+
|index_0|    risk_buy_index_0|  reward_buy_index_0|
+-------+--------------------+--------------------+
|  count|               201.0|               201.0|
|   mean|-0.00842858807942...|0.034462956359400186|
|    std|0.010321886923670486|0.024028309176849814|
|    min|-0.04742597827704211|                -0.0|
|    25%|-0.01445728455890...|0.018636627472515977|
|    50%|-0.00424808836023...|0.029910269192422685|
|    75%|                 0.0| 0.04336544006150825|
|    max|                -0.0|   0.141510207428056|
|  count|               219.0|               219.0|
|   mean|-0.00825181843661...| 0.03181657870541232|
|    std|0.009920846095541787| 0.02024399549501371|
|    min|-0.04707151894023976|0.002521912798573491|
|    25%|-0.01330755207577...|0.015617730900798106|
|    50%|-0.00475774347023...|0.026639344262294966|
|    75%|                -0.0| 0.04315554182360575|
|    max|                -0.0| 0.11667197234981128|
|  count|               119.0|               119.0|
|   mean|-0.01337031203096...|0.049558424443669605|
|    std| 0.01942541767615014|0.036681645417330024|
|    min| -0.0951268206361449|0.004135946772163351|
+-------+--------------------+--------------------+

Instead, I would like to get something like this, where the values of the records inside risk_table are appended as columns rather than rows:

+-----------+------+-------+-----+-----+-----+-----+------------+-------------+
| symbol_id | mean | count | min | max | 50% | 75% | reward_buy | reward_sell |
+-----------+------+-------+-----+-----+-----+-----+------------+-------------+
| ...       | ...  | ...   | ... | ... | ... | ... | ...        | ...         |
| ...       | ...  | ...   | ... | ... | ... | ... | ...        | ...         |
| ...       | ...  | ...   | ... | ... | ... | ... | ...        | ...         |
+-----------+------+-------+-----+-----+-----+-----+------------+-------------+

CodePudding user response:

Assume your dataset is main. First, we have to explode the content of risk_table; if we don't, we will get arrays as column values, which we do not want:

val df1 = main.withColumn("explode", explode(col("risk_table")))

Now the explode column holds one struct per row. There are many ways to turn struct fields into columns, but I like to use selectExpr:

val result = df1.selectExpr(
  "id", "symbol_id",           // or whatever other fields you like
  "explode.index as index_0",  // then target the struct field with the dot operator
  "explode.risk_buy as risk_buy_index_0",
  "explode.reward_buy as reward_buy_index_0"
  // add your other wanted values
)

Dummy input:

+--------------------------+---+---------+
|risk_table                |id |symbol_id|
+--------------------------+---+---------+
|[{1, 0.25, 0.3, 0.1, 0.3}]|1  |1        |
+--------------------------+---+---------+

Final output:

 --- --------- ------- ---------------- ------------------ 
| id|symbol_id|index_0|risk_buy_index_0|reward_buy_index_0|
 --- --------- ------- ---------------- ------------------ 
|  1|        1|      1|            0.25|               0.3|
 --- --------- ------- ---------------- ------------------ 

CodePudding user response:

Solution:

After calling the explode function from vilalabinot's answer, I just had to group by date and then pivot on the index field of the records inside risk_table, something like this:

df1.groupBy(col("date"))
  .pivot(col("index_0"))
  .agg(first(col("risk_buy_index_0")), first(col("reward_buy_index_0")))
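For reference, the whole pipeline ending in groupBy + pivot can be sketched end-to-end as below, assuming Spark 3.x. The RiskEntry case class, the sample row, and the variable names exploded/pivoted are mine, made up to mirror the question's schema; after the pivot, each distinct index value ("count", "mean", ...) becomes one column per aggregation:

```scala
// Sketch: explode risk_table, then pivot the index values into wide columns.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("pivot-demo").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for one element of the real risk_table array.
case class RiskEntry(index: String, risk_buy: Double, reward_buy: Double)

val df1 = Seq(
  ("2021-01-04", "APPL",
    Seq(RiskEntry("count", 201.0, 201.0), RiskEntry("mean", -0.0084, 0.0345)))
).toDF("date", "symbol_id", "risk_table")

val exploded = df1
  .withColumn("explode", explode(col("risk_table")))
  .selectExpr("date", "symbol_id",
    "explode.index as index_0",
    "explode.risk_buy as risk_buy_index_0",
    "explode.reward_buy as reward_buy_index_0")

// One row per date; index values ("count", "mean") turn into column groups.
val pivoted = exploded
  .groupBy(col("date"))
  .pivot(col("index_0"))
  .agg(first(col("risk_buy_index_0")).as("risk_buy"),
       first(col("reward_buy_index_0")).as("reward_buy"))

pivoted.show(false)
```

Because two aggregations are given, the pivot produces two columns per index value (e.g. a risk_buy and a reward_buy column for "count" and for "mean"), which matches the wide layout requested in the update.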