I'm new to Spark and Scala and would like some help with this situation. This is my current schema:
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- date: timestamp (nullable = true)
|-- horizon: double (nullable = true)
|-- risk_table: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- index: string (nullable = true)
| | |-- risk_buy: double (nullable = true)
| | |-- reward_buy: double (nullable = true)
| | |-- risk_sell: double (nullable = true)
| | |-- reward_sell: double (nullable = true)
|-- symbol_id: string (nullable = true)
Here is an example of what the data looks like:
+--------------------+
|          risk_table|
+--------------------+
|[{count, 201.0, 2...|
|[{count, 219.0, 2...|
|[{count, 119.0, 1...|
|[{count, 217.0, 2...|
|[{count, 17.0, 17...|
|[{count, 189.0, 1...|
|[{count, 105.0, 1...|
|[{count, 188.0, 1...|
|[{count, 111.0, 1...|
|[{count, 276.0, 2...|
|[{count, 70.0, 70...|
|[{count, 121.0, 1...|
|[{count, 133.0, 1...|
|[{count, 116.0, 1...|
|[{count, 70.0, 70...|
|[{count, 193.0, 1...|
|[{count, 131.0, 1...|
|[{count, 93.0, 93...|
|[{count, 84.0, 84...|
|[{count, 114.0, 1...|
+--------------------+
I would like to explode the risk_table column values into multiple columns. Each array usually has 4 nested documents/dictionaries whose index names change, so the expected output would look something like this:
+-----------+------+---------+------------------+--------------------+-----+---------------------+
| symbol_id | date | index_0 | risk_buy_index_0 | reward_buy_index_0 | ... | reward_sell_index_3 |
+-----------+------+---------+------------------+--------------------+-----+---------------------+
| APPL      | xxxx | 0       | 0                | 0                  | ... | 0                   |
| APPL      | xxxx | 0       | 0                | 0                  | ... | 0                   |
| APPL      | xxxx | 0       | 0                | 0                  | ... | 0                   |
+-----------+------+---------+------------------+--------------------+-----+---------------------+
I found some information about how to explode a single document/dictionary, but not nested ones. I'd highly appreciate it if someone could help with that.
UPDATE #1:
After @vilalabinot's answer, this is the returned dataframe:
+-------+--------------------+--------------------+
|index_0|    risk_buy_index_0|  reward_buy_index_0|
+-------+--------------------+--------------------+
| count| 201.0| 201.0|
| mean|-0.00842858807942...|0.034462956359400186|
| std|0.010321886923670486|0.024028309176849814|
| min|-0.04742597827704211| -0.0|
| 25%|-0.01445728455890...|0.018636627472515977|
| 50%|-0.00424808836023...|0.029910269192422685|
| 75%| 0.0| 0.04336544006150825|
| max| -0.0| 0.141510207428056|
| count| 219.0| 219.0|
| mean|-0.00825181843661...| 0.03181657870541232|
| std|0.009920846095541787| 0.02024399549501371|
| min|-0.04707151894023976|0.002521912798573491|
| 25%|-0.01330755207577...|0.015617730900798106|
| 50%|-0.00475774347023...|0.026639344262294966|
| 75%| -0.0| 0.04315554182360575|
| max| -0.0| 0.11667197234981128|
| count| 119.0| 119.0|
| mean|-0.01337031203096...|0.049558424443669605|
| std| 0.01942541767615014|0.036681645417330024|
| min| -0.0951268206361449|0.004135946772163351|
+-------+--------------------+--------------------+
Instead, I would like to get something like this, where the values of the records inside risk_table are appended as columns rather than rows:
+-----------+------+-------+-----+-----+-----+-----+------------+-------------+
| symbol_id | mean | count | min | max | 50% | 75% | reward_buy | reward_sell |
+-----------+------+-------+-----+-----+-----+-----+------------+-------------+
|           | ...  | ...   | ... | ... | ... | ... | ....       | ...         |
| ...       | ...  | ...   | ... | ... | ... | ... | ...        | ...         |
| ...       | ...  | ...   | ... | ... | ... | ... | ...        | ...         |
+-----------+------+-------+-----+-----+-----+-----+------------+-------------+
CodePudding user response:
Assume your dataset is main. First, we have to explode the content of risk_table; if we don't, we will get arrays as column values, which is not what we want:
import org.apache.spark.sql.functions.{col, explode}
val df1 = main.withColumn("explode", explode(col("risk_table")))
Now the explode column holds one struct per row. There are many ways to turn struct fields into columns, but I like to use selectExpr:
.selectExpr("id", "symbol_id", // or whatever other field you like
"explode.index as index_0", // then target the key with dot operator
"explode.risk_buy as risk_buy_index_0",
"explode.reward_buy as reward_buy_index_0"
// add your other wanted values
)
Dummy input:
+--------------------------+---+---------+
|risk_table                |id |symbol_id|
+--------------------------+---+---------+
|[{1, 0.25, 0.3, 0.1, 0.3}]|1  |1        |
+--------------------------+---+---------+
Final output:
+---+---------+-------+----------------+------------------+
| id|symbol_id|index_0|risk_buy_index_0|reward_buy_index_0|
+---+---------+-------+----------------+------------------+
|  1|        1|      1|            0.25|               0.3|
+---+---------+-------+----------------+------------------+
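As an alternative sketch: since the expected output keeps index_0 through index_3 side by side on one row, you could also skip the explode entirely and index into the array with getItem/getField. This assumes every risk_table array holds exactly four elements (missing positions come back as null) and that the source DataFrame is called main; the name wide is illustrative.
import org.apache.spark.sql.functions.col

// Pull each of the 4 array positions into its own set of columns,
// matching the column names from the expected output above.
val wide = (0 until 4).foldLeft(main) { (df, i) =>
  val element = col("risk_table").getItem(i)
  df.withColumn(s"index_$i", element.getField("index"))
    .withColumn(s"risk_buy_index_$i", element.getField("risk_buy"))
    .withColumn(s"reward_buy_index_$i", element.getField("reward_buy"))
    .withColumn(s"risk_sell_index_$i", element.getField("risk_sell"))
    .withColumn(s"reward_sell_index_$i", element.getField("reward_sell"))
}.drop("risk_table")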
CodePudding user response:
Solution:
After calling the explode function from vilalabinot's answer, I just had to group by date and then pivot the index field of the records inside risk_table, something like this:
import org.apache.spark.sql.functions.{col, first}
val pivoted = df2.groupBy(col("date")).pivot(col("index_0"))
  .agg(first(col("risk_buy_index_0")), first(col("reward_buy_index_0")))
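For reference, a minimal end-to-end sketch combining both steps, under a few assumptions not stated above: the source DataFrame is called main, the groupBy also includes symbol_id so the symbol survives the pivot, and the aggregations are aliased so the pivoted column names stay short (they should come out as e.g. count_risk_buy, mean_reward_buy, ...).
import org.apache.spark.sql.functions.{col, explode, first}

// Flatten one row per risk_table element, keeping the keys we need.
val flattened = main
  .withColumn("explode", explode(col("risk_table")))
  .selectExpr(
    "symbol_id", "date",
    "explode.index as index_0",
    "explode.risk_buy as risk_buy_index_0",
    "explode.reward_buy as reward_buy_index_0",
    "explode.risk_sell as risk_sell_index_0",
    "explode.reward_sell as reward_sell_index_0"
  )

// Pivot the index values (count, mean, std, ...) into columns;
// aliases keep the generated column names readable.
val result = flattened
  .groupBy(col("symbol_id"), col("date"))
  .pivot(col("index_0"))
  .agg(
    first(col("risk_buy_index_0")).alias("risk_buy"),
    first(col("reward_buy_index_0")).alias("reward_buy"),
    first(col("risk_sell_index_0")).alias("risk_sell"),
    first(col("reward_sell_index_0")).alias("reward_sell")
  )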