I have a Spark DataFrame with multiple columns, and one of them, a metrics column, is stored as a string. Its data looks like the sample below, and I need to convert it into multiple columns. I tried using the Spark RDD map function, but it fails with a type error.
Schema: column1, column2, metrics (string)
Sample data:
[{"name": "ABC","kt":
[{"name": "AB-1",
"values":
[{"date": "2021-05-21 08:04:56.000", "value":0.05520880298702948},
{"date": "2021-05-21 08:05:56.000", "value": 0.6873692705340528},
{"date": "2021-05-21 08:06:56.000", "value": 1.0036619131928861},
{"date": "2021-05-21 08:07:56.000", "value": 0.7431644238409444},
{"date": "2021-05-21 08:08:56.000", "value": 0.9845464929057735},
{"date": "2021-05-21 08:09:56.000", "value": 1.0010811066472702},
{"date": "2021-05-21 08:10:56.000", "value": 1.0009814513959714},
{"date": "2021-05-21 08:11:56.000", "value": 1.001614167307074},
{"date": "2021-05-21 08:12:56.000", "value": 1.001766291527917},
{"date": "2021-05-21 08:13:56.000", "value": 0.5865639742905218},
{"date": "2021-05-21 08:14:56.000", "value": 1.0015836161251768},
{"date": "2021-05-21 08:15:56.000", "value": 0.3571215446400451}]},
{"name": "BC-2",
"values":
[{"date": "2021-05-21 08:04:56.000", "value": 0.14044187962813096},
{"date": "2021-05-21 08:05:56.000", "value": 0.7565445799225486},
{"date": "2021-05-21 08:06:56.000", "value": 1.0017136900856412},
{"date": "2021-05-21 08:07:56.000", "value": 1.001730692743276},
{"date": "2021-05-21 08:08:56.000", "value": 1.0010340874676533},
{"date": "2021-05-21 08:09:56.000", "value": 1.0007168399510786},
{"date": "2021-05-21 08:10:56.000", "value": 1.0017091878186537},
{"date": "2021-05-21 08:11:56.000", "value": 1.0004370714489406},
{"date": "2021-05-21 08:12:56.000", "value": 1.0015819812456357},
{"date": "2021-05-21 08:13:56.000", "value": 0.7370171481823211},
{"date": "2021-05-21 08:14:56.000", "value": 1.001703540193026},
{"date": "2021-05-21 08:15:56.000", "value": 0.5519119341514123}]}]
CodePudding user response:
First, you may want to check whether your JSON is actually well formed, as the sample data given is not valid JSON (it appears to be missing the closing }] at the end).
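If you want to verify a single value quickly, you can copy one string out of the metrics column into plain Python and run it through the standard json module (the placeholder string below is just an illustration):
import json

# json.loads raises json.JSONDecodeError when the string is not valid JSON
metrics_value = '[{"name": "ABC", "kt": []}]'  # paste one real value from the metrics column here
try:
    json.loads(metrics_value)
    print("valid JSON")
except json.JSONDecodeError as e:
    print(f"invalid JSON: {e}")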
Assuming your intention is to flatten the nested JSON structure into rows of dates and values, you can try something similar to this:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t
spark = SparkSession.builder.getOrCreate()
# example valid JSON
json = """
[
  {
    "name": "ABC",
    "kt": [
      {
        "name": "AB-1",
        "values": [
          {
            "date": "2021-05-21 08:04:56.000",
            "value": 0.05520880298702948
          }
        ]
      },
      {
        "name": "BC-2",
        "values": [
          {
            "date": "2021-05-21 08:04:56.000",
            "value": 0.14044187962813096
          }
        ]
      }
    ]
  }
]
"""
json_df = spark.createDataFrame([(1, json)], ['id', 'json'])
# define the schema for the dataframe
schema = t.ArrayType(
    t.StructType([
        t.StructField('name', t.StringType()),
        t.StructField('kt', t.ArrayType(
            t.StructType([
                t.StructField('name', t.StringType()),
                t.StructField('values', t.ArrayType(
                    t.StructType([
                        t.StructField('date', t.TimestampType()),
                        t.StructField('value', t.DoubleType())
                    ])
                ))
            ])
        ))
    ])
)
# select the string value in the 'json' column using the defined schema
df = json_df.select(
    f.from_json(f.col('json'), schema).alias('json')
)
# flatten the nested structure to expose date and properties as columns
df.select(
    f.explode('json')
).select(
    f.col('col.name').alias('name_0'),
    f.explode('col.kt')
).select(
    'name_0',
    'col.*'
).select(
    'name_0',
    f.col('name').alias('name_1'),
    f.explode('values')
).select(
    'name_0',
    'name_1',
    'col.*'
).show()
Expected output:
+------+------+-------------------+-------------------+
|name_0|name_1|               date|              value|
+------+------+-------------------+-------------------+
|   ABC|  AB-1|2021-05-21 08:04:56|0.05520880298702948|
|   ABC|  BC-2|2021-05-21 08:04:56|0.14044187962813096|
+------+------+-------------------+-------------------+
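To apply the same approach to your own DataFrame, here is a rough sketch assuming it is called source_df (a placeholder name) and has the columns column1, column2 and metrics from the schema in the question:
# parse the metrics string column with the schema defined above,
# then flatten it the same way while keeping the other columns
flattened = source_df.withColumn(
    'metrics', f.from_json(f.col('metrics'), schema)
).select(
    'column1',
    'column2',
    f.explode('metrics')
).select(
    'column1',
    'column2',
    f.col('col.name').alias('name_0'),
    f.explode('col.kt')
).select(
    'column1',
    'column2',
    'name_0',
    f.col('col.name').alias('name_1'),
    f.explode('col.values')
).select(
    'column1',
    'column2',
    'name_0',
    'name_1',
    'col.*'
)
flattened.show()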
CodePudding user response:
# read the JSON file directly into a DataFrame (Spark infers the schema)
df = spark.read.json("<Your JSON file path>")
# write the result out as CSV with a header row
df.write.option("header", "true").csv("<Your CSV file path>")
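Note that Spark's CSV writer cannot serialize nested array/struct columns, so for data shaped like the sample above you would need to flatten it before writing. A minimal sketch, assuming the file contains the structure from the question (the field names and the multiLine option are assumptions based on that sample):
import pyspark.sql.functions as f

# multiLine is needed if the file is one pretty-printed JSON document
# rather than one JSON object per line
df = spark.read.option("multiLine", True).json("<Your JSON file path>")

# flatten the nested arrays into plain columns before writing to CSV
flat = df.select(
    f.col('name').alias('name_0'),
    f.explode('kt').alias('kt')
).select(
    'name_0',
    f.col('kt.name').alias('name_1'),
    f.explode('kt.values').alias('v')
).select(
    'name_0',
    'name_1',
    'v.date',
    'v.value'
)

flat.write.option("header", "true").csv("<Your CSV file path>")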