I'm trying to convert the following schema:
|-- a: struct (nullable = true)
| |-- b: struct (nullable = true)
| | |-- one: double (nullable = true)
| | |-- two: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- three: string (nullable = true)
| | |-- four: boolean (nullable = true)
| |-- c: struct (nullable = true)
| | |-- one: double (nullable = true)
| | |-- two: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- three: string (nullable = true)
| | |-- four: boolean (nullable = true)
Into this:
|-- a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- struct_key: string (nullable = true)
| | |-- one: double (nullable = true)
| | |-- two: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- three: string (nullable = true)
| | |-- four: boolean (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- struct_key: string (nullable = true)
| | |-- one: double (nullable = true)
| | |-- two: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- three: string (nullable = true)
| | |-- four: boolean (nullable = true)
Essentially, I want to take each struct's key (b, c, ...), turn it into a string field called struct_key, and collect the structs into an array. The dataset has many of these b/c-style structs, so I need a generic (wildcard) way to convert them rather than naming each one. I'm using Spark 3.2.1.
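To pin down the intended result, here is a plain-Python sketch (no Spark involved) of the same reshaping applied to one parsed JSON record. The sample values and the helper name nest_to_array are made up for illustration. The point is that every key under a, whatever it is called, becomes a struct_key value placed in front of the original fields.

```python
import json

# Hypothetical sample record shaped like the input schema (values are made up)
record = json.loads(
    '{"a": {"b": {"one": 1.0, "two": ["x"], "three": "t", "four": true},'
    ' "c": {"one": 2.0, "two": [], "three": "u", "four": false}}}'
)


def nest_to_array(rec, parent="a"):
    """Turn {parent: {key: {...}}} into {parent: [{"struct_key": key, ...}, ...]}."""
    inner = rec[parent]
    # struct_key goes first, followed by the original fields in their original order
    return {**rec, parent: [{"struct_key": key, **fields} for key, fields in inner.items()]}


print(json.dumps(nest_to_array(record)))
```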
The data is generated from JSON, so it is read like this:
df = spark.read.json(json_file)
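By default spark.read.json expects JSON Lines, i.e. one complete JSON object per line (multi-line documents need the multiLine option). A small sketch that writes a sample file in that form, so the input can be reproduced locally; the records and the file name sample.json are made up. Spark's schema inference may order the struct fields differently from the printout above.

```python
import json
import os
import tempfile

# Made-up records roughly matching the question's input; the keys under "a" vary freely
rows = [
    {"a": {"b": {"one": 1.0, "two": ["x"], "three": "t", "four": True},
           "c": {"one": 2.0, "two": [], "three": "u", "four": False}}},
]

# JSON Lines: one object per line, which is what spark.read.json reads by default
json_file = os.path.join(tempfile.mkdtemp(), "sample.json")
with open(json_file, "w") as fh:
    for row in rows:
        fh.write(json.dumps(row) + "\n")

# df = spark.read.json(json_file)  # requires an active SparkSession
```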
Answer:
Here's an approach where you first add the struct_key field inside each inner struct and then build an array from those structs.
# input
from pyspark.sql import functions as func

data_sdf = spark.createDataFrame([(((1, 2), (3, 4)), )],
                                 'a struct<b: struct<foo: int, bar: int>, c: struct<foo: int, bar: int>>'
                                 )
# +----------------+
# |               a|
# +----------------+
# |{{1, 2}, {3, 4}}|
# +----------------+
# root
# |-- a: struct (nullable = true)
# | |-- b: struct (nullable = true)
# | | |-- foo: integer (nullable = true)
# | | |-- bar: integer (nullable = true)
# | |-- c: struct (nullable = true)
# | | |-- foo: integer (nullable = true)
# | | |-- bar: integer (nullable = true)
# processing
# names of the inner structs (b, c, ...); these become the struct_key values
inner_cols = data_sdf.selectExpr('a.*').columns

result_sdf = (
    data_sdf
    .selectExpr('a.*')  # promote b, c, ... to top-level columns
    # rebuild each struct with its own name prepended as struct_key
    .selectExpr(*["struct('{0}' as struct_key, {0}.*) as {0}".format(c) for c in inner_cols])
    # collect the rebuilt structs into a single array column named a
    .select(func.array(*inner_cols).alias('a'))
)
result_sdf.show(truncate=False)
# +----------------------+
# |a                     |
# +----------------------+
# |[{b, 1, 2}, {c, 3, 4}]|
# +----------------------+
# root
# |-- a: array (nullable = false)
# | |-- element: struct (containsNull = false)
# | | |-- struct_key: string (nullable = false)
# | | |-- foo: integer (nullable = true)
# | | |-- bar: integer (nullable = true)
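The two steps above can also be collapsed into one SQL expression built from the field names, which avoids selecting a.* and so keeps any other top-level columns. The helper struct_array_expr below is a hypothetical plain-Python sketch that only builds the string. It assumes the keys are plain identifiers (names that need backtick quoting are not handled) and that Spark expands a.b.* inside struct() in a projection. In PySpark the names could come from data_sdf.schema['a'].dataType.fieldNames() and the string could be passed as data_sdf.withColumn('a', func.expr(expr)); that usage is untested here.

```python
def struct_array_expr(parent, keys):
    """Build a Spark SQL expression that turns the struct fields of `parent` into an array.

    Hypothetical helper: each key k becomes struct('k' AS struct_key, parent.k.*),
    and all of them are wrapped in a single array(...).
    """
    parts = [f"struct('{k}' AS struct_key, {parent}.{k}.*)" for k in keys]
    return f"array({', '.join(parts)})"


expr = struct_array_expr("a", ["b", "c"])
print(expr)
# array(struct('b' AS struct_key, a.b.*), struct('c' AS struct_key, a.c.*))
```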
Answer:
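This approach lets you keep the other columns of the original dataframe. It serializes the struct to JSON and parses it back as a map, so the field names (b, c, ...) become map keys. This assumes every inner struct has the same type, because the map's value type is taken from the first field.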
Input:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[(9, ((1.1, [], '', True),(2.2, [], '', True),),)],
'x int, a struct<b:struct<one:double,two:array<string>,three:string,four:boolean>,c:struct<one:double,two:array<string>,three:string,four:boolean>>')
df.show(truncate=0)
# +---+--------------------------------------+
# |x  |a                                     |
# +---+--------------------------------------+
# |9  |{{1.1, [], , true}, {2.2, [], , true}}|
# +---+--------------------------------------+
df.printSchema()
# root
# |-- x: integer (nullable = true)
# |-- a: struct (nullable = true)
# | |-- b: struct (nullable = true)
# | | |-- one: double (nullable = true)
# | | |-- two: array (nullable = true)
# | | | |-- element: string (containsNull = true)
# | | |-- three: string (nullable = true)
# | | |-- four: boolean (nullable = true)
# | |-- c: struct (nullable = true)
# | | |-- one: double (nullable = true)
# | | |-- two: array (nullable = true)
# | | | |-- element: string (containsNull = true)
# | | |-- three: string (nullable = true)
# | | |-- four: boolean (nullable = true)
Script:
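# DDL type of the first inner struct (b); all inner structs are assumed to share it
dtype = df.schema['a'].dataType[0].dataType.simpleString()
df = df.withColumn('a', F.map_values(F.transform_values(
    # round-trip through JSON: struct<b:..., c:...> becomes map<string, struct<...>>
    F.from_json(F.to_json('a'), f'map<string,{dtype}>'),
    # add the map key (b, c, ...) to each value as a new field named struct_key
    lambda k, v: v.withField('struct_key', k)
)))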
df.show(truncate=0)
# +---+--------------------------------------------+
# |x  |a                                           |
# +---+--------------------------------------------+
# |9  |[{1.1, [], , true, b}, {2.2, [], , true, c}]|
# +---+--------------------------------------------+
df.printSchema()
# root
# |-- x: integer (nullable = true)
# |-- a: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- one: double (nullable = true)
# | | |-- two: array (nullable = true)
# | | | |-- element: string (containsNull = true)
# | | |-- three: string (nullable = true)
# | | |-- four: boolean (nullable = true)
# | | |-- struct_key: string (nullable = false)
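Why the JSON round trip works can be seen with a rough plain-Python analogue (not Spark code): json.dumps/json.loads stand in for to_json/from_json, the list comprehension stands in for transform_values plus withField, and keeping only the values stands in for map_values. The sample values are made up. As in the printSchema output, struct_key ends up as the last field. If it must come first, as in the target schema in the question, each element could be rebuilt with F.transform and F.struct listing struct_key first; that is an untested suggestion.

```python
import json

# Made-up value of column "a" shaped like the answer's input
a = {"b": {"one": 1.1, "two": [], "three": "", "four": True},
     "c": {"one": 2.2, "two": [], "three": "", "four": True}}

# to_json / from_json stand-in: the struct's field names become map keys
as_map = json.loads(json.dumps(a))

# transform_values + withField stand-in: append struct_key to each value;
# map_values stand-in: keep only the values, dropping the keys
with_key = [{**v, "struct_key": k} for k, v in as_map.items()]
print(with_key)
```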