How to parse nested JSON using PySpark


I have a JSON file with the following schema:


|-- context: struct (nullable = true)
|    |-- application: struct (nullable = true)
|    |    |-- version: string (nullable = true)
|    |-- custom: struct (nullable = true)
|    |    |-- dimensions: array (nullable = true)
|    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |-- Activity ID: string (nullable = true)
|    |    |    |    |-- Activity Type: string (nullable = true)
|    |    |    |    |-- Bot ID: string (nullable = true)
|    |    |    |    |-- Channel ID: string (nullable = true)
|    |    |    |    |-- Conversation ID: string (nullable = true)
|    |    |    |    |-- Correlation ID: string (nullable = true)
|    |    |    |    |-- From ID: string (nullable = true)
|    |    |    |    |-- Recipient ID: string (nullable = true)
|    |    |    |    |-- StatusCode: string (nullable = true)
|    |    |    |    |-- Timestamp: string (nullable = true)
|    |-- data: struct (nullable = true)
|    |    |-- eventTime: string (nullable = true)
|    |    |-- isSynthetic: boolean (nullable = true)
|    |    |-- samplingRate: double (nullable = true)
|    |-- device: struct (nullable = true)
|    |    |-- roleInstance: string (nullable = true)
|    |    |-- roleName: string (nullable = true)
|    |    |-- type: string (nullable = true)
|    |-- location: struct (nullable = true)
|    |    |-- city: string (nullable = true)
|    |    |-- clientip: string (nullable = true)
|    |    |-- continent: string (nullable = true)
|    |    |-- country: string (nullable = true)
|    |    |-- province: string (nullable = true)
|    |-- operation: struct (nullable = true)
|    |    |-- id: string (nullable = true)
|    |    |-- parentId: string (nullable = true)
|    |-- session: struct (nullable = true)
|    |    |-- isFirst: boolean (nullable = true)
|-- event: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- count: long (nullable = true)
|    |    |-- name: string (nullable = true)
|-- internal: struct (nullable = true)
|    |-- data: struct (nullable = true)
|    |    |-- documentVersion: string (nullable = true)
|    |    |-- id: string (nullable = true)

Using PySpark, I need to extract only these fields from this schema:

Activity ID, Activity Type, Bot ID, Channel ID, Conversation ID, Correlation ID, From ID, Recipient ID, StatusCode, Timestamp

into a DataFrame. How can I achieve this?

JSON file:

{
  "event": [
    {
      "name": "Activity",
      "count": 1
    }
  ],
  "internal": {
    "data": {
      "id": "79baca55-d168-11ea-b166-6fc861e9e21c",
      "documentVersion": "1.61"
    }
  },
  "context": {
    "application": {
      "version": "Wed 07/22/2020  5:37:05.58 \r\nUTC (fv-az461) [Build 148886] [Repo Intercom] [Branch prod] [Commit XXX] \r\n[XX 1.6.20-140775]  [XXX 1.3.27-144047] \r\n"
    },
    "data": {
      "eventTime": "2020-07-29T06:55:15.6294636Z",
      "isSynthetic": false,
      "samplingRate": 100
    },
    "cloud": {},
    "device": {
      "type": "PC",
      "roleName": "bc-directline-southindia",
      "roleInstance": "RD0003FF905CCA",
      "screenResolution": {}
    },
    "session": {
      "isFirst": false
    },
    "operation": {
      "id": "XXX",
      "parentId": "|XXXX.c4cd9570_"
    },
    "location": {
      "clientip": "",
      "continent": "XX",
      "country": "XXX",
      "province": "XXX",
      "city": "XXX"
    },
    "custom": {
      "dimensions": [
        { "Timestamp": "XXX" },
        { "StatusCode": "200" },
        { "Activity ID": "JoH4veTvChCCnzchOD1Lg-f|0000001" },
        { "From ID": "XXX" },
        { "Correlation ID": "|54734cb21ba7f143a72ddd03fc865669.c4cd9570_" },
        { "Channel ID": "directline" },
        { "Recipient ID": "XXXX" },
        { "Bot ID": "XXXX" },
        { "Activity Type": "message" },
        { "Conversation ID": "XXX" }
      ]
    }
  }
}

The dimensions array is the challenging part here. Each key-value pair sits in its own object, so the inferred element struct contains all ten fields and every element is null in nine of them.
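To make the null problem concrete, here is a minimal plain-Python sketch (not Spark itself) of what schema inference effectively does with an array of single-key objects: the element struct gets the union of all keys, and each element is padded with nulls. The helper name `widen_to_union_schema` is hypothetical, and only three of the ten keys are used for brevity.

```python
def widen_to_union_schema(objects):
    """Pad every dict to the union of all keys, using None for missing ones."""
    # Union of keys, sorted for a stable order; this mirrors the
    # alphabetical field order seen in the printed schema.
    all_keys = sorted({key for obj in objects for key in obj})
    return [{key: obj.get(key) for key in all_keys} for obj in objects]


dimensions = [
    {"Timestamp": "XXX"},
    {"StatusCode": "200"},
    {"Channel ID": "directline"},
]

widened = widen_to_union_schema(dimensions)
for row in widened:
    print(row)
```

Every row ends up with one real value and a None in each of the other columns, which is the pattern the approaches here have to collapse.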

My solution looks complex, but it eliminates those nulls without exploding the DataFrame.


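  1. Convert the dimensions array of structs into an array of MapType
  2. Combine all the individual objects (ex: { "Timestamp": "XXX" }, { "StatusCode": "200" }) into a single Map ({"Timestamp": "XXX", "StatusCode": "200"})
  3. Convert the MapType into a struct and expand it into columns

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, MapType, StringType, StructType

df = spark.read.json('nested.json')

# Save the schema of a dimensions element for later use.
# Assumption: it is read back from the loaded DataFrame's own schema.
dim_ele_schema = StructType.fromJson(
    df.select('context.custom.dimensions').schema[0].dataType.elementType.jsonValue()
)

# Extract dimensions and convert it to MapType to aggregate
df = (df.select('context.custom.dimensions')
      # Step 1: array<struct> -> array<map>; null fields are dropped by to_json
      .withColumn('dim_map', F.from_json(F.to_json('dimensions'), ArrayType(MapType(StringType(), StringType()))))
      # Step 2: fold the array of single-entry maps into one map
      .withColumn('dim_map', F.aggregate('dim_map', F.create_map().cast("map<string,string>"),
                          lambda acc, x: F.map_concat(acc, x)))
)

# Step 3: map -> struct, then expand the struct fields into columns
df = (df.withColumn("dim", F.from_json(F.to_json("dim_map"), dim_ele_schema))
      .select("dim.*")
)

df.show()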


+--------------------+-------------+------+----------+---------------+--------------------+-------+------------+----------+---------+
|         Activity ID|Activity Type|Bot ID|Channel ID|Conversation ID|      Correlation ID|From ID|Recipient ID|StatusCode|Timestamp|
+--------------------+-------------+------+----------+---------------+--------------------+-------+------------+----------+---------+
|JoH4veTvChCCnzchO...|      message|  XXXX|directline|            XXX||54734cb21ba7f143...|    XXX|        XXXX|       200|      XXX|
+--------------------+-------------+------+----------+---------------+--------------------+-------+------------+----------+---------+


F.aggregate(array, initial value, merge function)

This function takes an array and reduces it to a single value (similar to reduce in Python). Here I use it to merge the individual Maps into one Map, starting from an empty Map as the initial value. F.create_map().cast("map<string,string>") produces an empty Map whose keys and values are both strings. The merge function is map_concat, which concatenates two Maps: the accumulator (the Map built up so far) and the current element of the array.
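The reduce analogy can be sketched in plain Python. This is only an illustration of the fold that F.aggregate performs, not PySpark code: dicts stand in for Maps, `{**acc, **x}` stands in for map_concat, and the variable names are made up for the example.

```python
from functools import reduce

# Each element mirrors one single-key object from the dimensions array.
dim_maps = [
    {"Timestamp": "XXX"},
    {"StatusCode": "200"},
    {"Channel ID": "directline"},
]

# Fold the list into one dict, starting from an empty dict
# (the counterpart of F.create_map().cast("map<string,string>")).
merged = reduce(lambda acc, x: {**acc, **x}, dim_maps, {})

print(merged)
```

One difference to keep in mind: a dict merge silently overwrites duplicate keys, whereas map_concat may reject them depending on your Spark version and configuration. The keys in this data are all distinct, so it does not matter here.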

ref: https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.map_concat.html

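You can do a nested select on your DataFrame, explode the dimensions array, and expand the resulting struct into columns using ".*". Grouping by a generated id and taking the first non-null value of each column then collapses the exploded rows back into one row per record.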

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, first, monotonically_increasing_id

# prepare data
spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.read.option("multiline", "true").json("./ressources/75061097.json")

# Processing: tag each record with an id, explode the array (one row per
# single-key object), then keep the first non-null value of each column per id
result = df.withColumn("id", monotonically_increasing_id()) \
    .select("id", explode(col("context.custom.dimensions"))).select("id", "col.*") \
    .groupby("id").agg(first(col('Activity ID'), ignorenulls=True).alias("Activity ID"),
                       first(col("Activity Type"), ignorenulls=True).alias("Activity Type"),
                       first(col("Bot ID"), ignorenulls=True).alias("Bot ID"),
                       first(col("Channel ID"), ignorenulls=True).alias("Channel ID"),
                       first(col("Conversation ID"), ignorenulls=True).alias("Conversation ID"),
                       first(col("Correlation ID"), ignorenulls=True).alias("Correlation ID"),
                       first(col("From ID"), ignorenulls=True).alias("From ID"),
                       first(col("Recipient ID"), ignorenulls=True).alias("Recipient ID"),
                       first(col("StatusCode"), ignorenulls=True).alias("StatusCode"),
                       first(col("Timestamp"), ignorenulls=True).alias("Timestamp")) \
    .drop("id")  # drop the helper id so only the dimension columns remain

result.show(truncate=False)

+-------------------------------+-------------+------+----------+---------------+-------------------------------------------+-------+------------+----------+---------+
|Activity ID                    |Activity Type|Bot ID|Channel ID|Conversation ID|Correlation ID                             |From ID|Recipient ID|StatusCode|Timestamp|
+-------------------------------+-------------+------+----------+---------------+-------------------------------------------+-------+------------+----------+---------+
|JoH4veTvChCCnzchOD1Lg-f|0000001|message      |XXXX  |directline|XXX            ||54734cb21ba7f143a72ddd03fc865669.c4cd9570_|XXX    |XXXX        |200       |XXX      |
+-------------------------------+-------------+------+----------+---------------+-------------------------------------------+-------+------------+----------+---------+
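
For intuition, the explode / groupby / first(ignorenulls=True) pipeline can be modelled in plain Python for a single record. This is a sketch rather than Spark code: the exploded rows are written by hand with only three of the ten columns, and `first_non_null` is a hypothetical helper.

```python
def first_non_null(rows, column):
    """Return the first value in `column` that is not None, or None if all are."""
    return next((row[column] for row in rows if row[column] is not None), None)


# What the exploded rows for one record (one id) look like: every row
# carries all columns, but only one of them is populated.
exploded = [
    {"Timestamp": "XXX", "StatusCode": None, "Channel ID": None},
    {"Timestamp": None, "StatusCode": "200", "Channel ID": None},
    {"Timestamp": None, "StatusCode": None, "Channel ID": "directline"},
]

columns = ["Timestamp", "StatusCode", "Channel ID"]
collapsed = {column: first_non_null(exploded, column) for column in columns}

print(collapsed)
```

Because each column has exactly one non-null value per record, picking the first non-null is enough to rebuild a single complete row.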
