Converting a Struct to an Array in Pyspark

Time:10-12

My goal is to analyze the JSON files created by Microsoft's Azure Data Factory and convert them into a set of relational tables.

To explain my problem, I have created a sample with reduced complexity. The following Python code produces two sample records:

sample1 = """{
    "name": "Pipeline1",
    "properties": {
        "parameters": {
            "a": {"type": "string", "default": ""},
            "b": {"type": "string", "default": "chris"},
            "c": {"type": "string", "default": "columbus"},
            "d": {"type": "integer", "default": "0"}
        },
        "annotations": ["Test","Sample"]
    }
}"""

sample2 = """{
    "name": "Pipeline2",
    "properties": {
        "parameters": {
            "x": {"type": "string", "default": "X"},
            "y": {"type": "string", "default": "Y"},
        },
        "annotations": ["another sample"]
    }

My first approach to loading this data was, of course, to read it as JSON:

df = spark.read.json(sc.parallelize([sample1,sample2]))
df.printSchema()
df.show()

but this returns:

root
 |-- _corrupt_record: string (nullable = true)
 |-- name: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- annotations: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- parameters: struct (nullable = true)
 |    |    |-- a: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- b: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- c: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- d: struct (nullable = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)

+--------------------+---------+--------------------+
|     _corrupt_record|     name|          properties|
+--------------------+---------+--------------------+
|                null|Pipeline1|{[Test, Sample], ...|
|{
    "name": "Pipel...|Pipeline2|                null|
+--------------------+---------+--------------------+

As you can see, the second sample was not loaded, apparently because the schemas of sample1 and sample2 differ (the parameter names are different). I do not know why Microsoft decided to make the parameters fields of a struct rather than elements of an array, but I can't change that.

Let me come back to my goal: I would like to create two dataframes out of those samples:
The first dataframe should contain the annotations (with the columns pipeline_name and annotation), the other dataframe should contain the parameters (with the columns pipeline_name, parameter_name, parameter_type and parameter_default).

Does anybody know a simple way to convert the fields of a struct (not an array) into rows of a dataframe? My first idea was a user-defined function that parses the JSON one record at a time and loops over the fields of the "parameters" struct to return them as elements of an array. But I could not work out exactly how to achieve that. I have tried:

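import json
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *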

# create a dataframe with the json data as strings
df = spark.createDataFrame([Row(json=sample1), Row(json=sample2)])

#define desired schema
new_schema = StructType([
   StructField("pipeline", StructType([
     StructField("name", StringType(), True)
    ,StructField("params", ArrayType(StructType([
       StructField("paramname", StringType(), True)
      ,StructField("type", StringType(), True)
      ,StructField("default", StringType(), True)
      ])), None)
    ,StructField("annotations", ArrayType(StringType()), True)
    ]), True)
  ])

def parse_pipeline(source:str):
  dict = json.loads(source)
  name = dict["name"]
  props = dict["properties"]
  paramlist = [ ( key,  value.get('type'), value.get('default')) for key, value in props.get("parameters",{}).items() ]
  annotations = props.get("annotations")
  return {'pipleine': { 'name':name, 'params':paramlist, 'annotations': annotations}}

parse_pipeline_udf = udf(parse_pipeline, new_schema)
df = df.withColumn("data", parse_pipeline_udf(F.col("json")))

But this returns an error message: Failed to convert the JSON string '{"metadata":{},"name":"params","nullable":null,"type":{"containsNull":true,"elementType":{"fields":[{"metadata":{},"name":"paramname","nullable":true,"type":"string"},{"metadata":{},"name":"type","nullable":true,"type":"string"},{"metadata":{},"name":"default","nullable":true,"type":"string"}],"type":"struct"},"type":"array"}}' to a field.

Maybe the error comes from the return value of my UDF. But if that is the reason, how should I pass the result? Thank you for any help.
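
A note on the error message: the JSON it quotes contains "nullable":null for the params field, which matches the None passed as the third argument of that StructField, where a boolean is expected. Separately, the function returns the key 'pipleine' while the schema names the field pipeline, so the column would likely come back null even once the schema is accepted. The sketch below applies both corrections, on the assumption that this is what was intended. build_schema is a helper name introduced here for illustration, and its pyspark import sits inside the function only so that the parsing logic can be tried without a Spark installation.

```python
import json


def parse_pipeline(source: str) -> dict:
    """Turn one pipeline JSON string into a dict shaped like the UDF schema."""
    doc = json.loads(source)  # avoids shadowing the built-in name `dict`
    props = doc.get("properties", {})
    # One (paramname, type, default) tuple per key of the "parameters" struct.
    params = [
        (key, value.get("type"), value.get("default"))
        for key, value in props.get("parameters", {}).items()
    ]
    # The top-level key has to match the schema field name "pipeline".
    return {
        "pipeline": {
            "name": doc["name"],
            "params": params,
            "annotations": props.get("annotations"),
        }
    }


def build_schema():
    """Return-type schema for the UDF, with a boolean nullable on every field."""
    from pyspark.sql import types as T  # imported lazily; requires pyspark

    param = T.StructType([
        T.StructField("paramname", T.StringType(), True),
        T.StructField("type", T.StringType(), True),
        T.StructField("default", T.StringType(), True),
    ])
    pipeline = T.StructType([
        T.StructField("name", T.StringType(), True),
        T.StructField("params", T.ArrayType(param), True),  # was None
        T.StructField("annotations", T.ArrayType(T.StringType()), True),
    ])
    return T.StructType([T.StructField("pipeline", pipeline, True)])


# Plain-Python check of the parsing step on a small, made-up record.
example = (
    '{"name": "P", "properties": {"parameters": '
    '{"x": {"type": "string", "default": "X"}}, "annotations": ["t"]}}'
)
print(parse_pipeline(example))
```

With pyspark available, udf(parse_pipeline, build_schema()) would then take the place of the udf call above.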

CodePudding user response:

First, I fixed your data sample: sample2 was missing its closing } and """, and it had an extra comma after the "y" entry:

sample1 = """{
    "name": "Pipeline1",
    "properties": {
        "parameters": {
            "a": {"type": "string", "default": ""},
            "b": {"type": "string", "default": "chris"},
            "c": {"type": "string", "default": "columbus"},
            "d": {"type": "integer", "default": "0"}
        },
        "annotations": ["Test","Sample"]
    }
}"""

sample2 = """{
    "name": "Pipeline2",
    "properties": {
        "parameters": {
            "x": {"type": "string", "default": "X"},
            "y": {"type": "string", "default": "Y"}
        },
        "annotations": ["another sample"]
    }
}"""

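With just this fix, sample2 should be loaded by your original code. But what you are calling an "array" is really a map type: the parameter names vary from pipeline to pipeline, so they fit better as map keys than as struct fields.

from pyspark.sql import types as T

new_schema = T.StructType([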
    T.StructField("name", T.StringType()),
    T.StructField("properties", T.StructType([
        T.StructField("annotations", T.ArrayType(T.StringType())),
        T.StructField("parameters", T.MapType(T.StringType(), T.StructType([
            T.StructField("default", T.StringType()),
            T.StructField("type", T.StringType()),
        ])))
    ]))
])

df = spark.read.json(sc.parallelize([sample1, sample2]), new_schema)

and the result:

df.show(truncate=False)
+---------+-----------------------------------------------------------------------------------------------------+
|name     |properties                                                                                           |
+---------+-----------------------------------------------------------------------------------------------------+
|Pipeline1|[[Test, Sample], [a -> [, string], b -> [chris, string], c -> [columbus, string], d -> [0, integer]]]|
|Pipeline2|[[another sample], [x -> [X, string], y -> [Y, string]]]                                             |
+---------+-----------------------------------------------------------------------------------------------------+

df.printSchema()
root
 |-- name: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- annotations: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- parameters: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: struct (valueContainsNull = true)
 |    |    |    |-- default: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
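
From here, the two dataframes asked for in the question could be derived with explode, which turns each array element or map entry into its own row. This is a minimal sketch, assuming df is the dataframe read with new_schema above. split_pipelines is a name introduced here for illustration, and the import sits inside the function only so that the definition loads without a Spark installation.

```python
def split_pipelines(df):
    """Return (annotations_df, parameters_df) for a dataframe read with new_schema."""
    from pyspark.sql import functions as F  # imported lazily; requires pyspark

    # One row per annotation.
    annotations_df = df.select(
        F.col("name").alias("pipeline_name"),
        F.explode("properties.annotations").alias("annotation"),
    )

    # Exploding a map yields two columns (key and value), hence two aliases.
    parameters_df = df.select(
        F.col("name").alias("pipeline_name"),
        F.explode("properties.parameters").alias("parameter_name", "parameter"),
    ).select(
        "pipeline_name",
        "parameter_name",
        F.col("parameter.type").alias("parameter_type"),
        F.col("parameter.default").alias("parameter_default"),
    )
    return annotations_df, parameters_df
```

Usage would be annotations_df, parameters_df = split_pipelines(df). Note that explode drops pipelines whose array or map is null or empty; explode_outer keeps them with null values instead.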