I have a JSON file that looks like this:
[
  {
    "event_date": "20221207",
    "user_properties": [
      {
        "key": "user_id",
        "value": {
          "set_timestamp_micros": "1670450329209558"
        }
      },
      {
        "key": "doc_id",
        "value": {
          "set_timestamp_micros": "1670450329209558"
        }
      }
    ]
  },
  {
    "event_date": "20221208",
    "user_properties": [
      {
        "key": "account_id",
        "value": {
          "int_value": "3176465",
          "set_timestamp_micros": "1670450323992556"
        }
      },
      {
        "key": "user_id",
        "value": {
          "string_value": "430fdfc579f55f9859173c1bea39713dc11c3ba62e83c24830e3d5936f43c26d",
          "set_timestamp_micros": "1670450323992556"
        }
      }
    ]
  }
]
When I read it using spark.read.json(JSON_PATH), I got the following schema:
root
 |-- event_date: string (nullable = true)
 |-- user_properties: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: struct (nullable = true)
 |    |    |    |-- int_value: string (nullable = true)
 |    |    |    |-- set_timestamp_micros: string (nullable = true)
 |    |    |    |-- string_value: string (nullable = true)
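Note: if the file on disk is pretty-printed like the sample above (one JSON array spanning many lines rather than one object per line), reading it needs the multiLine option:

df = spark.read.json(JSON_PATH, multiLine=True)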
I need to parse it using PySpark, and the resulting dataframe should look like this:
event_date | up_account_id_int | up_account_id_set_timestamp_micros | up_doc_id_set_timestamp_micros | up_user_id_set_timestamp_micros | up_user_id_string |
---|---|---|---|---|---|
20221208 | 3176465 | 1670450323992556 | null | 1670450323992556 | 430fdfc579f55f9859173c1bea39713dc11c3ba62e83c24830e3d5936f43c26d |
20221207 | null | null | 1670450329209558 | 1670450329209558 | null |
Any ideas on how I can accomplish this?
CodePudding user response:
You can use this function (written in Scala; a PySpark sketch of the same approach follows the example output below):
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, StructType}

def flattenDataframe(df: DataFrame): DataFrame = {
  val fields = df.schema.fields
  val fieldNames = fields.map(x => x.name)
  for (i <- 0 until fields.length) {
    val field = fields(i)
    val fieldtype = field.dataType
    val fieldName = field.name
    fieldtype match {
      case arrayType: ArrayType =>
        // Explode the array column, keeping every other column as-is
        val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName)
        val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName) as $fieldName")
        val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
        return flattenDataframe(explodedDf)
      case structType: StructType =>
        // Promote each struct child to a top-level column, replacing "." with "_"
        val childFieldnames = structType.fieldNames.map(childname => fieldName + "." + childname)
        val newfieldNames = fieldNames.filter(_ != fieldName) ++ childFieldnames
        val renamedcols = newfieldNames.map(x => col(x).as(x.replace(".", "_")))
        val explodedf = df.select(renamedcols: _*)
        return flattenDataframe(explodedf)
      case _ =>
    }
  }
  df
}
val flattenedJSON = flattenDataframe(df)
Before flattening:
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|event_date|user_properties                                                                                                                                         |
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|20221207  |[{user_id, {null, 1670450329209558, null}}, {doc_id, {null, 1670450329209558, null}}]                                                                   |
|20221208  |[{account_id, {3176465, 1670450323992556, null}}, {user_id, {null, 1670450323992556, 430fdfc579f55f9859173c1bea39713dc11c3ba62e83c24830e3d5936f43c26d}}]|
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
After flattening:
+----------+-------------------+-------------------------------+------------------------------------------+----------------------------------------------------------------+
|event_date|user_properties_key|user_properties_value_int_value|user_properties_value_set_timestamp_micros|user_properties_value_string_value                              |
+----------+-------------------+-------------------------------+------------------------------------------+----------------------------------------------------------------+
|20221207  |user_id            |null                           |1670450329209558                          |null                                                            |
|20221207  |doc_id             |null                           |1670450329209558                          |null                                                            |
|20221208  |account_id         |3176465                        |1670450323992556                          |null                                                            |
|20221208  |user_id            |null                           |1670450323992556                          |430fdfc579f55f9859173c1bea39713dc11c3ba62e83c24830e3d5936f43c26d|
+----------+-------------------+-------------------------------+------------------------------------------+----------------------------------------------------------------+
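Since the question asks for PySpark, here is a rough port of the same recursive flattening, as a sketch (flatten_dataframe is my own name, not a built-in):

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import ArrayType, StructType

def flatten_dataframe(df: DataFrame) -> DataFrame:
    for field in df.schema.fields:
        # Explode the first array column found, keeping all other columns
        if isinstance(field.dataType, ArrayType):
            others = [c for c in df.columns if c != field.name]
            exploded = df.select(*others, explode_outer(field.name).alias(field.name))
            return flatten_dataframe(exploded)
        # Promote struct children to top-level columns ("." becomes "_")
        if isinstance(field.dataType, StructType):
            others = [col(c) for c in df.columns if c != field.name]
            children = [col(field.name + "." + child.name).alias(field.name + "_" + child.name)
                        for child in field.dataType.fields]
            return flatten_dataframe(df.select(*others, *children))
    return df

flattened = flatten_dataframe(df)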
CodePudding user response:
First you can explode the array, then flatten the struct with select:
from pyspark.sql import functions as F

df = (df.select('event_date', F.explode('user_properties').alias('user_properties'))
      .select('event_date', 'user_properties.key', 'user_properties.value.*'))
This gives one row per (event_date, key) pair, the same long format shown in the previous answer. It then seems you are pivoting the data. The following won't give you exactly the dataframe you posted, but you should be able to transform it as you like (see the renaming sketch after the code).
df = (df.groupby('event_date')
      .pivot('key')
      .agg(F.max('int_value').alias('id_int'),
           F.max('set_timestamp_micros').alias('set_timestamp_micros'),
           F.max('string_value').alias('string')))
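With the aliases above, the pivot should generate column names like account_id_id_int and user_id_set_timestamp_micros. A minimal renaming sketch, assuming those generated names, to get the up_* columns from the question:

# Assumes pivot produced names like 'account_id_id_int'; collapse the
# doubled '_id_int' and prefix everything with 'up_' to match the question.
df = df.select(
    'event_date',
    *[F.col(c).alias('up_' + c.replace('_id_int', '_int'))
      for c in df.columns if c != 'event_date'])

All-null columns such as up_doc_id_int can then be dropped to match the posted output exactly.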