PySpark extract struct key into column

Time:11-04

I'm trying to convert the following schema:

 |-- a: struct (nullable = true)
 |    |-- b: struct (nullable = true)
 |    |    |-- one: double (nullable = true)
 |    |    |-- two: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- three: string (nullable = true)
 |    |    |-- four: boolean (nullable = true)
 |    |-- c: struct (nullable = true)
 |    |    |-- one: double (nullable = true)
 |    |    |-- two: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- three: string (nullable = true)
 |    |    |-- four: boolean (nullable = true)

into this:

 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- struct_key: string (nullable = true)
 |    |    |-- one: double (nullable = true)
 |    |    |-- two: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- three: string (nullable = true)
 |    |    |-- four: boolean (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- struct_key: string (nullable = true)
 |    |    |-- one: double (nullable = true)
 |    |    |-- two: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- three: string (nullable = true)
 |    |    |-- four: boolean (nullable = true)

I really just want to take each struct key, convert it into a string, and add it as a column. The dataset contains many structs like b and c, so I need some kind of wildcard to convert them all. I'm using Spark 3.2.1.

The data comes from JSON, so it is read like this:

df = spark.read.json(json_file)

CodePudding user response:

Here's an approach where you first add the struct_key field inside each inner struct and then build an array from those structs.

# input
data_sdf = spark.createDataFrame([(((1, 2), (3, 4)), )], 
                                 'a struct<b: struct<foo: int, bar: int>, c: struct<foo: int, bar: int>>'
                                 )

# +----------------+
# |               a|
# +----------------+
# |{{1, 2}, {3, 4}}|
# +----------------+

# root
#  |-- a: struct (nullable = true)
#  |    |-- b: struct (nullable = true)
#  |    |    |-- foo: integer (nullable = true)
#  |    |    |-- bar: integer (nullable = true)
#  |    |-- c: struct (nullable = true)
#  |    |    |-- foo: integer (nullable = true)
#  |    |    |-- bar: integer (nullable = true)

# processing
from pyspark.sql import functions as func

# names of the inner structs (b, c, ...); these become the struct_key values
keys = data_sdf.selectExpr('a.*').columns

data_sdf. \
    selectExpr('a.*'). \
    selectExpr(*['struct("{0}" as struct_key, {0}.*) as {0}'.format(c) for c in keys]). \
    select(func.array(*keys).alias('a')). \
    show(truncate=False)

# +----------------------+
# |a                     |
# +----------------------+
# |[{b, 1, 2}, {c, 3, 4}]|
# +----------------------+

# root
#  |-- a: array (nullable = false)
#  |    |-- element: struct (containsNull = false)
#  |    |    |-- struct_key: string (nullable = false)
#  |    |    |-- foo: integer (nullable = true)
#  |    |    |-- bar: integer (nullable = true)
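Since the data comes from JSON, the inner struct names may not always be plain identifiers (a key such as my-key would break the unquoted expression above). A minimal sketch of building the same expressions with quoting, assuming Spark SQL's default rules (backticks for identifiers, single quotes for string literals); the helper name tagged_struct_exprs is hypothetical and not part of PySpark:

```python
def tagged_struct_exprs(keys, key_col="struct_key"):
    """Build one SQL expression per inner struct name.

    Each expression wraps the struct's fields in a new struct that also
    carries the struct's own name as a string field.
    """
    exprs = []
    for key in keys:
        # Backtick-quote the identifier; a literal backtick is doubled.
        ident = "`" + key.replace("`", "``") + "`"
        # Single-quoted string literal; escape backslashes and quotes.
        literal = "'" + key.replace("\\", "\\\\").replace("'", "\\'") + "'"
        exprs.append(f"struct({literal} as {key_col}, {ident}.*) as {ident}")
    return exprs


# Intended use (not executed here, needs a Spark session):
#   keys = data_sdf.selectExpr('a.*').columns
#   data_sdf.selectExpr('a.*').selectExpr(*tagged_struct_exprs(keys))
for expr in tagged_struct_exprs(["b", "my-key"]):
    print(expr)
```

The helper only builds strings, so it can be checked without a cluster before the expressions are passed to selectExpr.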

CodePudding user response:

This approach lets you retain the other columns present in the original DataFrame.

Input:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [(9, ((1.1, [], '', True),(2.2, [], '', True),),)],
    'x int, a struct<b:struct<one:double,two:array<string>,three:string,four:boolean>,c:struct<one:double,two:array<string>,three:string,four:boolean>>')

df.show(truncate=0)
# +---+--------------------------------------+
# |x  |a                                     |
# +---+--------------------------------------+
# |9  |{{1.1, [], , true}, {2.2, [], , true}}|
# +---+--------------------------------------+
df.printSchema()
# root
#  |-- x: integer (nullable = true)
#  |-- a: struct (nullable = true)
#  |    |-- b: struct (nullable = true)
#  |    |    |-- one: double (nullable = true)
#  |    |    |-- two: array (nullable = true)
#  |    |    |    |-- element: string (containsNull = true)
#  |    |    |-- three: string (nullable = true)
#  |    |    |-- four: boolean (nullable = true)
#  |    |-- c: struct (nullable = true)
#  |    |    |-- one: double (nullable = true)
#  |    |    |-- two: array (nullable = true)
#  |    |    |    |-- element: string (containsNull = true)
#  |    |    |-- three: string (nullable = true)
#  |    |    |-- four: boolean (nullable = true)

Script:

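# DDL string of the first inner struct; assumes all inner structs share this schema
dtype = df.schema['a'].dataType[0].dataType.simpleString()
# struct -> JSON -> map<key, struct>, then copy each map key into its struct
df = df.withColumn('a', F.map_values(F.transform_values(
    F.from_json(F.to_json("a"), f'map<string,{dtype}>'),
    lambda k, v: v.withField('struct_key', k)
)))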

df.show(truncate=0)
# +---+--------------------------------------------+
# |x  |a                                           |
# +---+--------------------------------------------+
# |9  |[{1.1, [], , true, b}, {2.2, [], , true, c}]|
# +---+--------------------------------------------+
df.printSchema()
# root
#  |-- x: integer (nullable = true)
#  |-- a: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- one: double (nullable = true)
#  |    |    |-- two: array (nullable = true)
#  |    |    |    |-- element: string (containsNull = true)
#  |    |    |-- three: string (nullable = true)
#  |    |    |-- four: boolean (nullable = true)
#  |    |    |-- struct_key: string (nullable = false)
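To make the reshaping in the script easier to follow, here is a plain-Python analogy of what happens to a single row's a value. It does not use Spark; the function name struct_to_tagged_list is made up for illustration, and a dict stands in for both the struct and the map:

```python
import json


def struct_to_tagged_list(a, key_field="struct_key"):
    """Mimic to_json -> from_json(map) -> transform_values -> map_values."""
    # Round trip through JSON: the outer struct's field names become map keys.
    as_map = json.loads(json.dumps(a))
    # Copy each key into its value (withField), then keep only the values.
    return [{**value, key_field: key} for key, value in as_map.items()]


row_a = {
    "b": {"one": 1.1, "two": [], "three": "", "four": True},
    "c": {"one": 2.2, "two": [], "three": "", "four": True},
}
result = struct_to_tagged_list(row_a)
print(result)
```

As in the Spark version, the new struct_key field ends up last in each element, and every inner value is expected to have the same shape.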