This is my goal: I am trying to analyze the JSON files created by Microsoft's Azure Data Factory and convert them into a set of relational tables.
To explain my problem, I have created a sample of reduced complexity. You can produce two sample records with the Python code below:
sample1 = """{
    "name": "Pipeline1",
    "properties": {
        "parameters": {
            "a": {"type": "string", "default": ""},
            "b": {"type": "string", "default": "chris"},
            "c": {"type": "string", "default": "columbus"},
            "d": {"type": "integer", "default": "0"}
        },
        "annotations": ["Test","Sample"]
    }
}"""
sample2 = """{
    "name": "Pipeline2",
    "properties": {
        "parameters": {
            "x": {"type": "string", "default": "X"},
            "y": {"type": "string", "default": "Y"},
        },
        "annotations": ["another sample"]
    }
My first approach to loading this data is, of course, to read it as JSON:
df = spark.read.json(sc.parallelize([sample1,sample2]))
df.printSchema()
df.show()
but this returns:
root
 |-- _corrupt_record: string (nullable = true)
 |-- name: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- annotations: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- parameters: struct (nullable = true)
 |    |    |-- a: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- b: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- c: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- d: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
+--------------------+---------+--------------------+
|     _corrupt_record|     name|          properties|
+--------------------+---------+--------------------+
|                null|Pipeline1|{[Test, Sample], ...|
|{
"name": "Pipel...|Pipeline2|                null|
+--------------------+---------+--------------------+
As you can see, the second sample was not loaded, apparently because the schemas of sample1 and sample2 are different (different parameter names). I do not know why Microsoft decided to make the parameters elements of a struct rather than of an array, but I can't change that.
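(If the parameters were an array, e.g. "parameters": [{"name": "a", "type": "string", "default": ""}, ...], every pipeline would share one schema; because they are keyed by parameter name inside a struct, the inferred schema changes with every pipeline.)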
Let me come back to my goal: I would like to create two dataframes out of these samples.
The first dataframe should contain the annotations (with the columns pipeline_name and annotation); the second should contain the parameters (with the columns pipeline_name, parameter_name, parameter_type and parameter_default).
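For the two samples above, that would be (values taken directly from the samples):
+-------------+--------------+
|pipeline_name|annotation    |
+-------------+--------------+
|Pipeline1    |Test          |
|Pipeline1    |Sample        |
|Pipeline2    |another sample|
+-------------+--------------+

+-------------+--------------+--------------+-----------------+
|pipeline_name|parameter_name|parameter_type|parameter_default|
+-------------+--------------+--------------+-----------------+
|Pipeline1    |a             |string        |                 |
|Pipeline1    |b             |string        |chris            |
|Pipeline1    |c             |string        |columbus         |
|Pipeline1    |d             |integer       |0                |
|Pipeline2    |x             |string        |X                |
|Pipeline2    |y             |string        |Y                |
+-------------+--------------+--------------+-----------------+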
Does anybody know a simple way to convert the elements of a struct (not an array) into rows of a dataframe? My first idea was a user-defined function that parses the JSON strings one by one and loops over the elements of the "parameters" structure to return them as elements of an array. But I did not figure out exactly how to achieve that. I have tried:
import json
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *
# create a dataframe with the json data as strings
df = spark.createDataFrame([Row(json=sample1), Row(json=sample2)])
# define desired schema
new_schema = StructType([
    StructField("pipeline", StructType([
        StructField("name", StringType(), True)
        , StructField("params", ArrayType(StructType([
            StructField("paramname", StringType(), True)
            , StructField("type", StringType(), True)
            , StructField("default", StringType(), True)
        ])), None)
        , StructField("annotations", ArrayType(StringType()), True)
    ]), True)
])
def parse_pipeline(source: str):
    data = json.loads(source)
    name = data["name"]
    props = data["properties"]
    paramlist = [(key, value.get('type'), value.get('default'))
                 for key, value in props.get("parameters", {}).items()]
    annotations = props.get("annotations")
    return {'pipeline': {'name': name, 'params': paramlist, 'annotations': annotations}}
parse_pipeline_udf = udf(parse_pipeline, new_schema)
df = df.withColumn("data", parse_pipeline_udf(F.col("json")))
But this returns an error message: Failed to convert the JSON string '{"metadata":{},"name":"params","nullable":null,"type":{"containsNull":true,"elementType":{"fields":[{"metadata":{},"name":"paramname","nullable":true,"type":"string"},{"metadata":{},"name":"type","nullable":true,"type":"string"},{"metadata":{},"name":"default","nullable":true,"type":"string"}],"type":"struct"},"type":"array"}}' to a field.
Maybe the error comes from the return value of my udf. But if that's the reason, how should I pass the result? Thank you for any help.
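Edit: looking at the error message again, the schema JSON it quotes contains "nullable":null for the params field, which matches the None I passed as the third (nullable) argument of that StructField. Presumably it needs to be a boolean instead:
, StructField("params", ArrayType(StructType([
    StructField("paramname", StringType(), True)
    , StructField("type", StringType(), True)
    , StructField("default", StringType(), True)
])), True)  # nullable must be a bool; None is what produced "nullable":null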
CodePudding user response:
First, I fixed your data sample: the closing """ and } were missing in sample2, and there was an extra comma:
sample1 = """{
    "name": "Pipeline1",
    "properties": {
        "parameters": {
            "a": {"type": "string", "default": ""},
            "b": {"type": "string", "default": "chris"},
            "c": {"type": "string", "default": "columbus"},
            "d": {"type": "integer", "default": "0"}
        },
        "annotations": ["Test","Sample"]
    }
}"""
sample2 = """{
    "name": "Pipeline2",
    "properties": {
        "parameters": {
            "x": {"type": "string", "default": "X"},
            "y": {"type": "string", "default": "Y"}
        },
        "annotations": ["another sample"]
    }
}"""
With just this fix, sample2 is included when you run your original code. And where you asked for an "array" of parameters, what you actually need is a map type:
from pyspark.sql import types as T

new_schema = T.StructType([
    T.StructField("name", T.StringType()),
    T.StructField("properties", T.StructType([
        T.StructField("annotations", T.ArrayType(T.StringType())),
        T.StructField("parameters", T.MapType(T.StringType(), T.StructType([
            T.StructField("default", T.StringType()),
            T.StructField("type", T.StringType()),
        ])))
    ]))
])
df = spark.read.json(sc.parallelize([sample1, sample2]), new_schema)
and the result:
df.show(truncate=False)
+---------+------------------------------------------------------------------------------------------------------+
|name     |properties                                                                                            |
+---------+------------------------------------------------------------------------------------------------------+
|Pipeline1|[[Test, Sample], [a -> [, string], b -> [chris, string], c -> [columbus, string], d -> [0, integer]]]|
|Pipeline2|[[another sample], [x -> [X, string], y -> [Y, string]]]                                              |
+---------+------------------------------------------------------------------------------------------------------+
df.printSchema()
root
 |-- name: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- annotations: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- parameters: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: struct (valueContainsNull = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
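From there, the two relational dataframes described in the question can be derived by exploding the annotations array and the parameters map. A minimal sketch (column names taken from the question):
from pyspark.sql import functions as F

# one row per (pipeline, annotation)
annotations_df = df.select(
    F.col("name").alias("pipeline_name"),
    F.explode("properties.annotations").alias("annotation"),
)

# exploding a map yields one row per key/value pair
params_df = df.select(
    F.col("name").alias("pipeline_name"),
    F.explode("properties.parameters").alias("parameter_name", "param"),
).select(
    "pipeline_name",
    "parameter_name",
    F.col("param.type").alias("parameter_type"),
    F.col("param.default").alias("parameter_default"),
)

annotations_df.show()
params_df.show()
Exploding the map is exactly the struct-to-rows conversion the question asks for; the map type makes it possible because, unlike a struct, its keys are data rather than part of the schema.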