PySpark - Flatten nested JSON


I have a json that looks like this:

[
    {
        "event_date": "20221207",
        "user_properties": [
            {
                "key": "user_id",
                "value": {
                    "set_timestamp_micros": "1670450329209558"
                }
            },
            {
                "key": "doc_id",
                "value": {
                    "set_timestamp_micros": "1670450329209558"
                }
            }
        ]
    },
    {
        "event_date": "20221208",
        "user_properties": [
            {
                "key": "account_id",
                "value": {
                    "int_value": "3176465",
                    "set_timestamp_micros": "1670450323992556"
                }
            },
            {
                "key": "user_id",
                "value": {
                    "string_value": "430fdfc579f55f9859173c1bea39713dc11c3ba62e83c24830e3d5936f43c26d",
                    "set_timestamp_micros": "1670450323992556"
                }
            }
        ]
    }
]

When I read it using spark.read.json(JSON_PATH), I get the following schema:

root
 |-- event_date: string (nullable = true)
 |-- user_properties: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: struct (nullable = true)
 |    |    |    |-- int_value: string (nullable = true)
 |    |    |    |-- set_timestamp_micros: string (nullable = true)
 |    |    |    |-- string_value: string (nullable = true)

I need to parse it using PySpark, and the resulting dataframe should look like this:

event_date | up_account_id_int | up_account_id_set_timestamp_micros | up_doc_id_set_timestamp_micros | up_user_id_set_timestamp_micros | up_user_id_string
20221208   | 3176465           | 1670450323992556                   | null                           | 1670450323992556                | 430fdfc579f55f9859173c1bea39713dc11c3ba62e83c24830e3d5936f43c26d
20221207   | null              | null                               | 1670450329209558               | 1670450329209558                | null

Any ideas on how I can accomplish this?

CodePudding user response:

You can use this recursive function (Scala; a PySpark sketch of the same approach follows the example output below):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, StructType}

def flattenDataframe(df: DataFrame): DataFrame = {

  val fields = df.schema.fields
  val fieldNames = fields.map(_.name)

  for (i <- fields.indices) {
    val field = fields(i)
    val fieldType = field.dataType
    val fieldName = field.name
    fieldType match {
      case _: ArrayType =>
        // Explode the array column; explode_outer keeps rows whose array is null or empty
        val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName)
        val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName) as $fieldName")
        val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
        return flattenDataframe(explodedDf)
      case structType: StructType =>
        // Promote each struct field to a top-level column named parent_child
        val childFieldNames = structType.fieldNames.map(childName => fieldName + "." + childName)
        val newFieldNames = fieldNames.filter(_ != fieldName) ++ childFieldNames
        val renamedCols = newFieldNames.map(x => col(x).as(x.replace(".", "_")))
        val expandedDf = df.select(renamedCols: _*)
        return flattenDataframe(expandedDf)
      case _ =>
    }
  }
  df
}

val flattenedDf = flattenDataframe(df)

Before flattening:

+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|event_date|user_properties                                                                                                                                         |
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|20221207  |[{user_id, {null, 1670450329209558, null}}, {doc_id, {null, 1670450329209558, null}}]                                                                   |
|20221208  |[{account_id, {3176465, 1670450323992556, null}}, {user_id, {null, 1670450323992556, 430fdfc579f55f9859173c1bea39713dc11c3ba62e83c24830e3d5936f43c26d}}]|
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------+

After flattening:

+----------+-------------------+-------------------------------+------------------------------------------+----------------------------------------------------------------+
|event_date|user_properties_key|user_properties_value_int_value|user_properties_value_set_timestamp_micros|user_properties_value_string_value                              |
+----------+-------------------+-------------------------------+------------------------------------------+----------------------------------------------------------------+
|20221207  |user_id            |null                           |1670450329209558                          |null                                                            |
|20221207  |doc_id             |null                           |1670450329209558                          |null                                                            |
|20221208  |account_id         |3176465                        |1670450323992556                          |null                                                            |
|20221208  |user_id            |null                           |1670450323992556                          |430fdfc579f55f9859173c1bea39713dc11c3ba62e83c24830e3d5936f43c26d|
+----------+-------------------+-------------------------------+------------------------------------------+----------------------------------------------------------------+
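
The question asks for PySpark; a rough PySpark equivalent of the same recursive approach could look like this (a minimal, untested sketch that mirrors the Scala logic above: explode_outer for array columns, underscore-joined names for struct children):

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import ArrayType, StructType

def flatten_dataframe(df: DataFrame) -> DataFrame:
    """Recursively explode array columns and expand struct columns until the schema is flat."""
    for field in df.schema.fields:
        if isinstance(field.dataType, ArrayType):
            # Explode the array column; explode_outer keeps rows with null/empty arrays
            other_cols = [c for c in df.columns if c != field.name]
            exploded = df.select(*other_cols, explode_outer(field.name).alias(field.name))
            return flatten_dataframe(exploded)
        if isinstance(field.dataType, StructType):
            # Expand each struct field into a top-level column named parent_child
            other_cols = [c for c in df.columns if c != field.name]
            child_cols = [
                col(f"{field.name}.{child}").alias(f"{field.name}_{child}")
                for child in field.dataType.fieldNames()
            ]
            return flatten_dataframe(df.select(*other_cols, *child_cols))
    return df

flattened_df = flatten_dataframe(df)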

CodePudding user response:

First, you can explode the array and then flatten the struct with a select:

from pyspark.sql import functions as F

# One row per element of user_properties, then expand the value struct into columns
df = (df.select('event_date', F.explode('user_properties').alias('user_properties'))
        .select('event_date', 'user_properties.key', 'user_properties.value.*'))
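
At this point there is one row per user property, with roughly this schema (column names follow the source schema shown in the question; nullability omitted):

root
 |-- event_date: string
 |-- key: string
 |-- int_value: string
 |-- set_timestamp_micros: string
 |-- string_value: string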

It also seems you want to pivot the data. The following won't give you exactly the dataframe you posted, but you should be able to transform it further as you like:

df = (df.groupby('event_date')
      .pivot('key')
      .agg(F.max('int_value').alias('id_int'),
           F.max('set_timestamp_micros').alias('set_timestamp_micros'),
           F.max('string_value').alias('string')))
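
To get column names closer to the ones in the question (up_account_id_int and so on), one option is to rename the pivoted columns afterwards. A minimal sketch that simply prefixes every pivoted column with up_ (the exact names produced by pivot plus multiple aggregations depend on the aliases above, so adjust as needed):

# Prefix every pivoted column with "up_"; event_date stays as the grouping key
df = df.select(
    'event_date',
    *[F.col(c).alias(f'up_{c}') for c in df.columns if c != 'event_date']
)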