I have the following JSON, taken from HDFS, with thousands of records like these:
{
"01": {
"created": "2020-12-28 02-15-01",
"entity_id": "s.m_free",
"old_state_id": null,
"state": "1498.7"
},
"02": {
"created": "2020-12-28 02-15-31",
"entity_id": "s.m_free",
"old_state_id": 58100,
"state": "1498.9"
},
...}
Unfortunately the DataFrame comes out as thousands of columns and just 4 rows, like this:
             | 01                  | 02                  | ... |
created      | 2020-12-28 02-15-01 | 2020-12-28 02-15-31 | ... |
entity_id    | s.m_free            | s.m_free            | ... |
old_state_id | null                | 58100               | ... |
state        | 1498.7              | 1498.9              | ... |
And I need it with 4 columns and thousands of records, like:
   | created             | entity_id | old_state_id | state  |
01 | 2020-12-28 02-15-01 | s.m_free  | null         | 1498.7 |
02 | 2020-12-28 02-15-31 | s.m_free  | 58100        | 1498.9 |
I found an option for PySpark that can change the orientation of the DataFrame using Pandas, but since I have to do the task in Scala I can't find a similar option.
Also, is there a way to give the first column a name (the records 01, 02, etc.), since it appears to be the key of the values in the JSON file?
I'll be very glad if you could help me.
CodePudding user response:
This part simulates the generation of the original DataFrame.
Similar to this example, make sure that in the real scenario you also use option("primitivesAsString", true). This avoids a type-mismatch issue when unpivoting, because Spark's default inferred type for a null JSON value is string. E.g. without option("primitivesAsString", true), old_state_id is inferred as long for "old_state_id": 58100, but as string for "old_state_id": null, so the two structs end up with different types (see the sketch right below).
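To see the mismatch concretely, here is a minimal, self-contained sketch (not part of the solution that follows; the nested JSON is trimmed to the one relevant field):
import spark.implicits._
// Sketch only: without primitivesAsString, Spark infers a different
// type for old_state_id in each struct.
val df_raw = spark.read.json(Seq(
  """{"01": {"old_state_id": null}, "02": {"old_state_id": 58100}}"""
).toDS)
df_raw.printSchema()
// root
//  |-- 01: struct (nullable = true)
//  |    |-- old_state_id: string (nullable = true)   <- null-only => string
//  |-- 02: struct (nullable = true)
//  |    |-- old_state_id: long (nullable = true)     <- 58100 => long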
import spark.implicits._
val json_str = """
{
"01": {
"created": "2020-12-28 02-15-01",
"entity_id": "s.m_free",
"old_state_id": null,
"state": "1498.7"
},
"02": {
"created": "2020-12-28 02-15-31",
"entity_id": "s.m_free",
"old_state_id": 58100,
"state": "1498.9"
}
}"""
val df = spark.read.option("primitivesAsString", true).json(Seq(json_str).toDS) // all primitives read as string
df.printSchema()
root
|-- 01: struct (nullable = true)
| |-- created: string (nullable = true)
| |-- entity_id: string (nullable = true)
| |-- old_state_id: string (nullable = true)
| |-- state: string (nullable = true)
|-- 02: struct (nullable = true)
| |-- created: string (nullable = true)
| |-- entity_id: string (nullable = true)
| |-- old_state_id: string (nullable = true)
| |-- state: string (nullable = true)
df.show(false)
+---------------------------------------------+----------------------------------------------+
|01                                           |02                                            |
+---------------------------------------------+----------------------------------------------+
|{2020-12-28 02-15-01, s.m_free, null, 1498.7}|{2020-12-28 02-15-31, s.m_free, 58100, 1498.9}|
+---------------------------------------------+----------------------------------------------+
This is the data transformation part, based on stack:
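For context, stack(n, v1, v2, ...) distributes its arguments over n rows; with key/value pairs, each pair becomes one row. A minimal standalone sketch of the idea:
// stack spreads 2 pairs over 2 rows of (key, value)
spark.sql("select stack(2, 'a', 1, 'b', 2) as (key, value)").show()
// +---+-----+
// |key|value|
// +---+-----+
// |  a|    1|
// |  b|    2|
// +---+-----+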
df.createOrReplaceTempView("t")
val cols_num = df.columns.size // 2
val cols_names_and_vals = (for (c <- df.columns) yield s"'$c',`$c`").mkString(",") // "'01',`01`,'02',`02`"
val sql_query = s"select id,val.* from (select stack($cols_num,$cols_names_and_vals) as (id,val) from t)" // select id,val.* from (select stack(2,'01',`01`,'02',`02`) as (id,val) from t)
val df_unpivot = spark.sql(sql_query)
df_unpivot.printSchema()
root
|-- id: string (nullable = true)
|-- created: string (nullable = true)
|-- entity_id: string (nullable = true)
|-- old_state_id: string (nullable = true)
|-- state: string (nullable = true)
df_unpivot.show(truncate = false)
+---+-------------------+---------+------------+------+
|id |created            |entity_id|old_state_id|state |
+---+-------------------+---------+------------+------+
|01 |2020-12-28 02-15-01|s.m_free |null        |1498.7|
|02 |2020-12-28 02-15-31|s.m_free |58100       |1498.9|
+---+-------------------+---------+------------+------+
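As a variation (a sketch of the same logic, without the temp view): stack lets you alias the key column directly, so the first column can carry any name you like, e.g. record_id here (an arbitrary choice), which also answers the question about naming the record-key column:
// same unpivot via selectExpr; record_id is a freely chosen alias
val df_unpivot2 = df
  .selectExpr(s"stack($cols_num,$cols_names_and_vals) as (record_id, val)")
  .select("record_id", "val.*")
df_unpivot2.show(truncate = false)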