Home > Blockchain >  By using PySpark how to parse nested json
By using PySpark how to parse nested json

Time:01-10

I have a json file with the following schema:

`

root
|-- context: struct (nullable = true)
|    |-- application: struct (nullable = true)
|    |    |-- version: string (nullable = true)
|    |-- custom: struct (nullable = true)
|    |    |-- dimensions: array (nullable = true)
|    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |-- Activity ID: string (nullable = true)
|    |    |    |    |-- Activity Type: string (nullable = true)
|    |    |    |    |-- Bot ID: string (nullable = true)
|    |    |    |    |-- Channel ID: string (nullable = true)
|    |    |    |    |-- Conversation ID: string (nullable = true)
|    |    |    |    |-- Correlation ID: string (nullable = true)
|    |    |    |    |-- From ID: string (nullable = true)
|    |    |    |    |-- Recipient ID: string (nullable = true)
|    |    |    |    |-- StatusCode: string (nullable = true)
|    |    |    |    |-- Timestamp: string (nullable = true)
|    |-- data: struct (nullable = true)
|    |    |-- eventTime: string (nullable = true)
|    |    |-- isSynthetic: boolean (nullable = true)
|    |    |-- samplingRate: double (nullable = true)
|    |-- device: struct (nullable = true)
|    |    |-- roleInstance: string (nullable = true)
|    |    |-- roleName: string (nullable = true)
|    |    |-- type: string (nullable = true)
|    |-- location: struct (nullable = true)
|    |    |-- city: string (nullable = true)
|    |    |-- clientip: string (nullable = true)
|    |    |-- continent: string (nullable = true)
|    |    |-- country: string (nullable = true)
|    |    |-- province: string (nullable = true)
|    |-- operation: struct (nullable = true)
|    |    |-- id: string (nullable = true)
|    |    |-- parentId: string (nullable = true)
|    |-- session: struct (nullable = true)
|    |    |-- isFirst: boolean (nullable = true)
|-- event: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- count: long (nullable = true)
|    |    |-- name: string (nullable = true)
|-- internal: struct (nullable = true)
|    |-- data: struct (nullable = true)
|    |    |-- documentVersion: string (nullable = true)
|    |    |-- id: string (nullable = true)`

From this schema, using PySpark, I need to extract only the following fields:

Activity ID, Activity Type, Bot ID, Channel ID, Conversation ID, Correlation ID, From ID, Recipient ID, StatusCode, Timestamp

into a DataFrame. How can I achieve this using PySpark?

JSON File:

{
  "event": [
    {
      "name": "Activity",
      "count": 1
    }
  ],
  "internal": {
    "data": {
      "id": "79baca55-d168-11ea-b166-6fc861e9e21c",
      "documentVersion": "1.61"
    }
  },
  "context": {
    "application": {
      "version": "Wed 07/22/2020  5:37:05.58 \r\nUTC (fv-az461) [Build 148886] [Repo Intercom] [Branch prod] [Commit XXX] \r\n[XX 1.6.20-140775]  [XXX 1.3.27-144047] \r\n"
    },
    "data": {
      "eventTime": "2020-07-29T06:55:15.6294636Z",
      "isSynthetic": false,
      "samplingRate": 100
    },
    "cloud": {},
    "device": {
      "type": "PC",
      "roleName": "bc-directline-southindia",
      "roleInstance": "RD0003FF905CCA",
      "screenResolution": {}
    },
    "session": {
      "isFirst": false
    },
    "operation": {
      "id": "XXX",
      "parentId": "|XXXX.c4cd9570_"
    },
    "location": {
      "clientip": "0.0.0.0",
      "continent": "XX",
      "country": "XXX",
      "province": "XXX",
      "city": "XXX"
    },
    "custom": {
      "dimensions": [
        {
          "Timestamp": "XXX"
        },
        {
          "StatusCode": "200"
        },
        {
          "Activity ID": "JoH4veTvChCCnzchOD1Lg-f|0000001"
        },
        {
          "From ID": "XXX"
        },
        {
          "Correlation ID": "|54734cb21ba7f143a72ddd03fc865669.c4cd9570_"
        },
        {
          "Channel ID": "directline"
        },
        {
          "Recipient ID": "XXXX"
        },
        {
          "Bot ID": "XXXX"
        },
        {
          "Activity Type": "message"
        },
        {
          "Conversation ID": "XXX"
        }
      ]
    }
  }
}

CodePudding user response:

The dimensions schema is the challenging part here. Each key-value pair sits in its own object, so when Spark infers one common struct for the array elements, every element has a single populated field and nulls for all the others.

My solution looks complex but it will eliminate those nulls without exploding the dataframe.

Steps:

  1. Convert the dimensions struct into MapType
  2. Combine all individual objects (ex: { "Timestamp": "XXX" }, { "StatusCode": "200" }) into 1 single Map ({"Timestamp": "XXX", "StatusCode": "200"})
  3. Convert the MapType into struct and expand them into columns
# Flatten context.custom.dimensions (an array of single-key objects) into one
# row with one column per key, without exploding the DataFrame.
#
# Requires Spark >= 3.1, where F.aggregate became available in the Python API.
# `spark` is assumed to be an already-active SparkSession (pyspark shell,
# notebook, or created earlier in the script).
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, MapType, StringType, StructType

# The input is a pretty-printed JSON document spanning many lines. The default
# reader expects one JSON object per line and would return only a
# _corrupt_record column, so multiline mode is required.
df = spark.read.option("multiline", "true").json('nested.json')

# Save the schema of a single dimensions element for later use in step 3
dim_ele_schema = StructType.fromJson(
    df.select('context.custom.dimensions').schema[0].jsonValue()['type']['elementType']
)

# Extract dimensions and convert it to MapType to aggregate
df = (df.select('context.custom.dimensions')
      # Step 1: round-trip through JSON to reinterpret each struct element as
      # a map<string,string>; null struct fields are omitted by to_json, so
      # each map keeps only the one key that is actually set.
      .withColumn('dim_map', F.from_json(F.to_json('dimensions'), ArrayType(MapType(StringType(), StringType()))))
      # Step 2: fold the array of small maps into one map, starting from an
      # empty map<string,string>.
      # NOTE: with Spark's default spark.sql.mapKeyDedupPolicy (EXCEPTION),
      # map_concat fails if the same key appears in more than one element.
      .select(F.aggregate('dim_map',
                          F.create_map().cast("map<string,string>"),
                          lambda acc, x: F.map_concat(acc, x))
              .alias('dim_map')))

# Step 3: round-trip through JSON again to turn the merged map back into the
# original element struct, then expand the struct fields into columns.
df = (df.withColumn("dim", F.from_json(F.to_json("dim_map"), dim_ele_schema))
      .select("dim.*"))

Result:

+--------------------+-------------+------+----------+---------------+--------------------+-------+------------+----------+---------+
|         Activity ID|Activity Type|Bot ID|Channel ID|Conversation ID|      Correlation ID|From ID|Recipient ID|StatusCode|Timestamp|
+--------------------+-------------+------+----------+---------------+--------------------+-------+------------+----------+---------+
|JoH4veTvChCCnzchO...|      message|  XXXX|directline|            XXX||54734cb21ba7f143...|    XXX|        XXXX|       200|      XXX|
+--------------------+-------------+------+----------+---------------+--------------------+-------+------------+----------+---------+

Explanation:

F.aggregate(array, initial value, merge function)

This function takes an array and folds it into a single value (similar to functools.reduce in Python). Here it merges the individual maps into one map, starting from an empty map as the initial value. F.create_map().cast("map<string,string>") produces that empty map, with string keys and string values. The merge function uses map_concat to concatenate two maps: the accumulated map built so far and the next individual map from the array.

ref: https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.map_concat.html

CodePudding user response:

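You can explode the nested dimensions array with a nested select on your df, then expand the resulting struct into columns using ".*", and finally collapse the exploded rows back into one row per record by taking the first non-null value of each column.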

# Build a local session and load the multi-line JSON document.
spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.read.option("multiline", "true").json("./ressources/75061097.json")

# Keys carried by the objects inside context.custom.dimensions; each one
# becomes an output column.
dimension_names = [
    "Activity ID",
    "Activity Type",
    "Bot ID",
    "Channel ID",
    "Conversation ID",
    "Correlation ID",
    "From ID",
    "Recipient ID",
    "StatusCode",
    "Timestamp",
]

# Tag every input record with an id so the exploded rows can be regrouped.
tagged = df.withColumn("id", monotonically_increasing_id())

# One row per array element, with the element's struct fields as columns.
exploded = (
    tagged
    .select("id", explode(col("context.custom.dimensions")))
    .select("id", "col.*")
)

# Each exploded row has exactly one populated field; taking the first
# non-null value per column collapses them back into a single row per id.
collapsed = [
    first(col(name), ignorenulls=True).alias(name)
    for name in dimension_names
]
result = exploded.groupby("id").agg(*collapsed).drop("id")
result.show(truncate=False)


+-------------------------------+-------------+------+----------+---------------+-------------------------------------------+-------+------------+----------+---------+
|Activity ID                    |Activity Type|Bot ID|Channel ID|Conversation ID|Correlation ID                             |From ID|Recipient ID|StatusCode|Timestamp|
+-------------------------------+-------------+------+----------+---------------+-------------------------------------------+-------+------------+----------+---------+
|JoH4veTvChCCnzchOD1Lg-f|0000001|message      |XXXX  |directline|XXX            ||54734cb21ba7f143a72ddd03fc865669.c4cd9570_|XXX    |XXXX        |200       |XXX      |
+-------------------------------+-------------+------+----------+---------------+-------------------------------------------+-------+------------+----------+---------+
  • Related