I am trying to transpose columns into rows and load the result into a database. My input is a JSON file:
{"09087":{"values": ["76573433","2222322323","768346865"],"values1": ["7686548898","33256768","09864324567"],"values2": ["234523723","64238793333333","75478393333"],"values3": ["87765","46389333","9234689677"]},"090881": {"values": ["76573443433","22276762322323","7683878746865"],"values1": ["768637676548898","3398776256768","0986456834324567"],"values2": ["23877644523723","64238867658793333333","754788776393333"],"values3": ["87765","46389333","9234689677"]}}
PySpark:
df = spark.read.option("multiline", "true").format("json").load("testfile.json")
Schema:
root
|-- 09087: struct (nullable = true)
| |-- values: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- values1: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- values2: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- values3: array (nullable = true)
| | |-- element: string (containsNull = true)
|-- 090881: struct (nullable = true)
| |-- values: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- values1: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- values2: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- values3: array (nullable = true)
| | |-- element: string (containsNull = true)
Data:
df.show()
+--------------------+--------------------+
|               09087|              090881|
+--------------------+--------------------+
|{[76573433, 22223...|{[76573443433, 22...|
+--------------------+--------------------+
Expected output:
Name values values1 values2 values3
09087 76573433 7686548898 234523723 87765
09087 2222322323 33256768 64238793333333 9234689677
09087 768346865 09864324567 75478393333 46389333
090881 76573443433 768637676548898 23877644523723 87765
090881 22276762322323 3398776256768 64238867658793333333 46389333
090881 7683878746865 0986456834324567 754788776393333 9234689677
I only gave 2 columns in the input here, but I actually have a lot of them. I have been trying to do this for a while; could someone please help me with it? Thanks in advance.
CodePudding user response:
PySpark translation of my Scala solution:
from pyspark.sql.functions import explode, array, struct, lit, col, arrays_zip
rdd = spark.sparkContext.parallelize([("""{"09087":{"values": ["76573433","2222322323","768346865"],"values1": ["7686548898","33256768","09864324567"],"values2": ["234523723","64238793333333","75478393333"],"values3": ["87765","46389333","9234689677"]},"090881": {"values": ["76573443433","22276762322323","7683878746865"],"values1": ["768637676548898","3398776256768","0986456834324567"],"values2": ["23877644523723","64238867658793333333","754788776393333"],"values3": ["87765","46389333","9234689677"]}}""" )])
df = spark.read.json(rdd)
df.select(
    explode(                  # explode the array into rows
        array(
            *[struct(         # make a struct from the column name and its values
                lit(col_name).alias("Name"),
                col(col_name + ".*")
            ) for col_name in df.columns]
        )
    )
).select(
    col("col.Name").alias("Name"),
    explode(
        arrays_zip(           # make an array of structs from multiple arrays; each struct field is named by its index in the original array
            col("col.values"),
            col("col.values1"),
            col("col.values2"),
            col("col.values3")
        )
    ).alias("columns")
).select(
    col("Name"),
    col("columns.*")          # use '.*' to turn struct fields into top-level columns
).show()
+------+--------------+----------------+--------------------+----------+
|  Name|             0|               1|                   2|         3|
+------+--------------+----------------+--------------------+----------+
| 09087|      76573433|      7686548898|           234523723|     87765|
| 09087|    2222322323|        33256768|      64238793333333|  46389333|
| 09087|     768346865|     09864324567|         75478393333|9234689677|
|090881|   76573443433| 768637676548898|      23877644523723|     87765|
|090881|22276762322323|   3398776256768|64238867658793333333|  46389333|
|090881| 7683878746865|0986456834324567|     754788776393333|9234689677|
+------+--------------+----------------+--------------------+----------+
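Since the question mentions many more value columns than the four shown, the hard-coded arrays_zip(values, values1, ...) call can also be built from the struct's field names. The sketch below is not part of the original answer: it assumes every top-level struct (09087, 090881, ...) has the same set of array fields, and the names value_cols and transposed are introduced here only for illustration.

from pyspark.sql.functions import explode, array, struct, lit, col, arrays_zip

# assumption: all top-level structs share the same array fields
value_cols = df.schema[df.columns[0]].dataType.fieldNames()   # e.g. ['values', 'values1', 'values2', 'values3']

exploded = df.select(
    explode(
        array(*[struct(lit(c).alias("Name"), col(c + ".*")) for c in df.columns])
    )
).select(
    col("col.Name").alias("Name"),
    explode(arrays_zip(*[col("col." + v) for v in value_cols])).alias("columns")
).select(col("Name"), col("columns.*"))

# arrays_zip may name the zipped fields by position (0, 1, 2, ...),
# so rename them back to the original field names positionally
transposed = exploded.toDF("Name", *value_cols)
transposed.show()

This avoids listing values through values3 by hand when there are many of them.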
CodePudding user response:
import org.apache.spark.sql.functions._

//make dummy data
val rdd = spark.sparkContext.parallelize(Seq(("""{"09087":{"values": ["76573433","2222322323","768346865"],"values1": ["7686548898","33256768","09864324567"],"values2": ["234523723","64238793333333","75478393333"],"values3": ["87765","46389333","9234689677"]},"090881": {"values": ["76573443433","22276762322323","7683878746865"],"values1": ["768637676548898","3398776256768","0986456834324567"],"values2": ["23877644523723","64238867658793333333","754788776393333"],"values3": ["87765","46389333","9234689677"]}}""" )))
val df = spark.sqlContext.read.json(rdd)
df.select(
  explode( // explode the array into rows
    array( // make an array
      (for (col_name <- df.columns)
        yield
          struct( // create a struct with names that can be used as columns
            lit(s"$col_name").as("Name"),
            col(s"$col_name.*")
          )
      ).toSeq: _* // turn the sequence into varargs
    ).as("rows")
  )
).select(
  col("col.Name"),
  expr("explode(arrays_zip(col.values, col.values1, col.values2, col.values3))")
    .as("columns") // arrays_zip zips multiple equal-length arrays into one array of structs; each struct field is named by its index in the original arrays
).select(
  col("Name"),
  col("columns.*") // rename as required
).show()
+------+--------------+----------------+--------------------+----------+
|  Name|             0|               1|                   2|         3|
+------+--------------+----------------+--------------------+----------+
| 09087|      76573433|      7686548898|           234523723|     87765|
| 09087|    2222322323|        33256768|      64238793333333|  46389333|
| 09087|     768346865|     09864324567|         75478393333|9234689677|
|090881|   76573443433| 768637676548898|      23877644523723|     87765|
|090881|22276762322323|   3398776256768|64238867658793333333|  46389333|
|090881| 7683878746865|0986456834324567|     754788776393333|9234689677|
+------+--------------+----------------+--------------------+----------+
For more info on arrays_zip, see here.
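To cover the "load it to the database" part of the question: once the transposed DataFrame exists (from either the PySpark or the Scala version), Spark's built-in JDBC writer can append it to a table. Below is a minimal PySpark sketch, assuming a DataFrame named transposed as built above and a JDBC driver on the classpath; the URL, table name, credentials and driver are placeholders, not values from the question.

# minimal sketch -- connection details are placeholders, replace with your own
(transposed.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://dbhost:5432/mydb")   # hypothetical connection URL
    .option("dbtable", "transposed_values")                # hypothetical target table
    .option("user", "db_user")
    .option("password", "db_password")
    .option("driver", "org.postgresql.Driver")
    .mode("append")
    .save())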