I have a DataFrame with this schema:
root
|-- id: string (nullable = false)
|-- data_zone_array: array (nullable = true)
| |-- element: string (containsNull = true)
The data_zone_array column contains several more or less predictable string values (or none at all), where each entry's key and value are separated by a colon. A show(5) gives this output:
id | data_zone_array
1 | ['name:john', 'surname:wick', 'group:a', 'group:b', 'group:c']
2 | ['name:joe', 'surname:linda', 'surname:boss', 'group:d', 'group:b']
3 | ['name:david', 'group:a', 'age:7']
4 | ['name:mary', 'surname:gilles']
5 | ['name:charles', 'surname:paul', 'group:d', 'group:b', 'group:c', 'age:6', 'unplanned_attribute_165:thisvalue']
I want to:
- Extract some of those values according to a list of keys (such as name and surname), knowing that their destination types are predictable (name will always be a unique string and surname an array of strings).
- Place all other found attributes in a struct containing string arrays. Note that there will be unpredictable attributes such as unplanned_attribute_165.
It would give this kind of schema:
root
|-- id: string (nullable = false)
|-- name: string (nullable = true)
|-- surname: array (nullable = true)
| |-- element: string (containsNull = true)
|-- other_attributes: struct (nullable = true)
| |-- <attrx>: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- <attry>: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- ...
With records like:
id | name | surname | other_attributes
1 | 'john' | ['wick'] | {group:['a','b','c']}
2 | 'joe' | ['boss', 'linda'] | {group:['b', 'd']}
3 | 'david' | <null> | {group: ['a'], age:['7']}
4 | 'mary' | ['gilles'] | <null>
5 | 'charles' | ['paul'] | {group: ['b','c','d'], age:['6'], unplanned_attribute_165:['thisvalue']}
Any idea on how to perform such operations?
CodePudding user response:
Here's one way of doing it.
First, explode the column data_zone_array and extract keys and values into separate columns key and value by splitting on the colon.
Then, group by id and key and collect the list of values associated with each key. Group by id again to build a map key -> [values].
Finally, select the keys you want as columns, and use map_keys, filter, and transform on the remaining keys of the map to create the other_attributes column.
import pyspark.sql.functions as F

df1 = (df.withColumn("data_zone_array", F.explode("data_zone_array"))
       .withColumn("key", F.split("data_zone_array", ":")[0])
       .withColumn("value", F.split("data_zone_array", ":")[1])
       .groupBy("id", "key").agg(F.collect_list("value").alias("values"))
       .groupBy("id").agg(F.map_from_arrays(F.collect_list("key"),
                                            F.collect_list("values")).alias("attributes"))
       .select("id",
               # name is expected to be a unique string, so take the first collected value
               F.element_at(F.col("attributes")["name"], 1).alias("name"),
               F.col("attributes")["surname"].alias("surname"),
               F.expr("""transform(
                           filter(map_keys(attributes), k -> k not in ('name', 'surname')),
                           x -> struct(x as key, attributes[x] as value)
                         )""").alias("other_attributes")
               )
       )

df1.show(truncate=False)
# +---+-------+-------------+------------------------------------------------------------------------+
# |id |name   |surname      |other_attributes                                                        |
# +---+-------+-------------+------------------------------------------------------------------------+
# |5  |charles|[paul]       |[{group, [d, b, c]}, {age, [6]}, {unplanned_attribute_165, [thisvalue]}]|
# |1  |john   |[wick]       |[{group, [a, b, c]}]                                                    |
# |3  |david  |null         |[{group, [a]}, {age, [7]}]                                              |
# |2  |joe    |[linda, boss]|[{group, [d, b]}]                                                       |
# |4  |mary   |[gilles]     |[]                                                                      |
# +---+-------+-------------+------------------------------------------------------------------------+