I'm trying to convert the following schema:
|-- a: struct (nullable = true)
| |-- b: struct (nullable = true)
| | |-- one: double (nullable = true)
| | |-- two: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- three: string (nullable = true)
| | |-- four: boolean (nullable = true)
| |-- c: struct (nullable = true)
| | |-- one: double (nullable = true)
| | |-- two: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- three: string (nullable = true)
| | |-- four: boolean (nullable = true)
into this:
|-- a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- struct_key: string (nullable = true)
| | |-- one: double (nullable = true)
| | |-- two: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- three: string (nullable = true)
| | |-- four: boolean (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- struct_key: string (nullable = true)
| | |-- one: double (nullable = true)
| | |-- two: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- three: string (nullable = true)
| | |-- four: boolean (nullable = true)
Really just trying to get the struct key, convert it into a string, and add it as a column. The b/c structs in the dataset are numerous, so I will need some kind of wildcard to convert them.
I'm using Spark 3.2.1.
The data is generated from JSON, so is read like this:
# NOTE(review): assumes `spark` is an existing SparkSession and `json_file` is the input path — neither is defined in this snippet.
df = spark.read.json(json_file)
CodePudding user response:
I prefer this approach, as often we need to retain other columns which are present in the original dataframe. This method doesn't remove other columns.
# NOTE: requires `from pyspark.sql import functions as F`.
# Type string of the FIRST inner struct of `a`; it is reused below as the map's value type.
# NOTE(review): assumes every inner struct has the same schema as the first one — confirm for your data.
dtype = df.schema['a'].dataType[0].dataType.simpleString()
# Round-trip `a` through JSON to re-read it as map<string, struct>, copy each map key
# into its value as the `struct_key` field, then keep only the map values (array of structs).
# Only column `a` is replaced; the other columns of `df` are retained.
df = df.withColumn('a', F.map_values(F.transform_values(
F.from_json(F.to_json("a"), f'map<string,{dtype}>'),
lambda k, v: v.withField('struct_key', k)
)))
df.schema['a'].dataType[0].dataType.simpleString()
navigates to the first inner struct and extracts its schema as a DDL-style type string. This assumes that all inner structs share the same schema.
F.from_json(F.to_json("a"), f'map<string,{dtype}>')
converts the outer struct `a` into a map. The name of each inner struct (`b`, `c`, ...) becomes a map key, while the inner struct itself becomes the corresponding map value. This conversion is useful because it makes the field name accessible as data.
F.transform_values(..., lambda k, v: v.withField('struct_key', k))
adds the map's key into map's value (i.e. into the struct).
F.map_values()
extracts the values of the map, thus making an array of structs.
Test input:
from pyspark.sql import functions as F
# One-row test DataFrame: `x` is an int, `a` is a struct holding two
# identically-typed inner structs named `b` and `c`.
df = spark.createDataFrame(
[(9, ((1.1, [], '', True),(2.2, [], '', True),),)],
'x int, a struct<b:struct<one:double,two:array<string>,three:string,four:boolean>,c:struct<one:double,two:array<string>,three:string,four:boolean>>')
# truncate=0 prints the full cell contents instead of cutting them off.
df.show(truncate=0)
# +---+--------------------------------------+
# |x  |a                                     |
# +---+--------------------------------------+
# |9  |{{1.1, [], , true}, {2.2, [], , true}}|
# +---+--------------------------------------+
df.printSchema()
# root
# |-- x: integer (nullable = true)
# |-- a: struct (nullable = true)
# | |-- b: struct (nullable = true)
# | | |-- one: double (nullable = true)
# | | |-- two: array (nullable = true)
# | | | |-- element: string (containsNull = true)
# | | |-- three: string (nullable = true)
# | | |-- four: boolean (nullable = true)
# | |-- c: struct (nullable = true)
# | | |-- one: double (nullable = true)
# | | |-- two: array (nullable = true)
# | | | |-- element: string (containsNull = true)
# | | |-- three: string (nullable = true)
# | | |-- four: boolean (nullable = true)
Result:
# Type string of the first inner struct of `a`, reused as the map's value type.
dtype = df.schema['a'].dataType[0].dataType.simpleString()
# struct -> JSON -> map<string, struct>; add each key to its value as `struct_key`;
# keep only the map values, which yields an array of structs.
df = df.withColumn('a', F.map_values(F.transform_values(
F.from_json(F.to_json("a"), f'map<string,{dtype}>'),
lambda k, v: v.withField('struct_key', k)
)))
df.show(truncate=0)
# +---+--------------------------------------------+
# |x  |a                                           |
# +---+--------------------------------------------+
# |9  |[{1.1, [], , true, b}, {2.2, [], , true, c}]|
# +---+--------------------------------------------+
# Column `x` is kept, and `struct_key` is appended as the last field of each struct.
df.printSchema()
# root
# |-- x: integer (nullable = true)
# |-- a: array (nullable = true)
# | |-- element: struct (containsNull = true)
# | | |-- one: double (nullable = true)
# | | |-- two: array (nullable = true)
# | | | |-- element: string (containsNull = true)
# | | |-- three: string (nullable = true)
# | | |-- four: boolean (nullable = true)
# | | |-- struct_key: string (nullable = false)
CodePudding user response:
Here's an approach where you first add the struct_key field
to each of the inner structs and then build an array from them.
# input
# One-row DataFrame whose single column `a` is a struct of two
# identically-typed inner structs, `b` and `c`.
data_sdf = spark.createDataFrame([(((1, 2), (3, 4)), )],
'a struct<b: struct<foo: int, bar: int>, c: struct<foo: int, bar: int>>'
)
# +----------------+
# |               a|
# +----------------+
# |{{1, 2}, {3, 4}}|
# +----------------+
# root
# |-- a: struct (nullable = true)
# | |-- b: struct (nullable = true)
# | | |-- foo: integer (nullable = true)
# | | |-- bar: integer (nullable = true)
# | |-- c: struct (nullable = true)
# | | |-- foo: integer (nullable = true)
# | | |-- bar: integer (nullable = true)
# processing
from pyspark.sql import functions as func

# Names of the inner structs of `a` (`b`, `c`, ...), resolved once and reused below.
inner_cols = data_sdf.selectExpr('a.*').columns

(
    data_sdf
    # Promote the inner structs to top-level columns.
    .selectExpr('a.*')
    # Rebuild each struct with its own name prepended as the `struct_key` field.
    # The name is a single-quoted SQL string literal and the identifiers are
    # backtick-quoted, so unusual field names do not break the expression.
    .selectExpr(*[
        f"struct('{c}' as struct_key, `{c}`.*) as `{c}`" for c in inner_cols
    ])
    # Use select (not withColumn) so the intermediate per-struct columns are
    # dropped and only the array column `a` remains, matching the output below.
    .select(func.array(*inner_cols).alias('a'))
    .show(truncate=False)
)
# +----------------------+
# |a                     |
# +----------------------+
# |[{b, 1, 2}, {c, 3, 4}]|
# +----------------------+
# root
# |-- a: array (nullable = false)
# | |-- element: struct (containsNull = false)
# | | |-- struct_key: string (nullable = false)
# | | |-- foo: integer (nullable = true)
# | | |-- bar: integer (nullable = true)