I have a JSON file with the following schema:
root
|-- context: struct (nullable = true)
| |-- application: struct (nullable = true)
| | |-- version: string (nullable = true)
| |-- custom: struct (nullable = true)
| | |-- dimensions: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- Activity ID: string (nullable = true)
| | | | |-- Activity Type: string (nullable = true)
| | | | |-- Bot ID: string (nullable = true)
| | | | |-- Channel ID: string (nullable = true)
| | | | |-- Conversation ID: string (nullable = true)
| | | | |-- Correlation ID: string (nullable = true)
| | | | |-- From ID: string (nullable = true)
| | | | |-- Recipient ID: string (nullable = true)
| | | | |-- StatusCode: string (nullable = true)
| | | | |-- Timestamp: string (nullable = true)
| |-- data: struct (nullable = true)
| | |-- eventTime: string (nullable = true)
| | |-- isSynthetic: boolean (nullable = true)
| | |-- samplingRate: double (nullable = true)
| |-- device: struct (nullable = true)
| | |-- roleInstance: string (nullable = true)
| | |-- roleName: string (nullable = true)
| | |-- type: string (nullable = true)
| |-- location: struct (nullable = true)
| | |-- city: string (nullable = true)
| | |-- clientip: string (nullable = true)
| | |-- continent: string (nullable = true)
| | |-- country: string (nullable = true)
| | |-- province: string (nullable = true)
| |-- operation: struct (nullable = true)
| | |-- id: string (nullable = true)
| | |-- parentId: string (nullable = true)
| |-- session: struct (nullable = true)
| | |-- isFirst: boolean (nullable = true)
|-- event: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- count: long (nullable = true)
| | |-- name: string (nullable = true)
|-- internal: struct (nullable = true)
| |-- data: struct (nullable = true)
| | |-- documentVersion: string (nullable = true)
| | |-- id: string (nullable = true)
From this schema, using PySpark, I need to extract only
Activity ID, Activity Type, Bot ID, Channel ID, Conversation ID, Correlation ID, From ID, Recipient ID, StatusCode, Timestamp
into a dataframe. How can I achieve this with PySpark?
JSON File:
{
"event": [
{
"name": "Activity",
"count": 1
}
],
"internal": {
"data": {
"id": "79baca55-d168-11ea-b166-6fc861e9e21c",
"documentVersion": "1.61"
}
},
"context": {
"application": {
"version": "Wed 07/22/2020 5:37:05.58 \r\nUTC (fv-az461) [Build 148886] [Repo Intercom] [Branch prod] [Commit XXX] \r\n[XX 1.6.20-140775] [XXX 1.3.27-144047] \r\n"
},
"data": {
"eventTime": "2020-07-29T06:55:15.6294636Z",
"isSynthetic": false,
"samplingRate": 100
},
"cloud": {},
"device": {
"type": "PC",
"roleName": "bc-directline-southindia",
"roleInstance": "RD0003FF905CCA",
"screenResolution": {}
},
"session": {
"isFirst": false
},
"operation": {
"id": "XXX",
"parentId": "|XXXX.c4cd9570_"
},
"location": {
"clientip": "0.0.0.0",
"continent": "XX",
"country": "XXX",
"province": "XXX",
"city": "XXX"
},
"custom": {
"dimensions": [
{
"Timestamp": "XXX"
},
{
"StatusCode": "200"
},
{
"Activity ID": "JoH4veTvChCCnzchOD1Lg-f|0000001"
},
{
"From ID": "XXX"
},
{
"Correlation ID": "|54734cb21ba7f143a72ddd03fc865669.c4cd9570_"
},
{
"Channel ID": "directline"
},
{
"Recipient ID": "XXXX"
},
{
"Bot ID": "XXXX"
},
{
"Activity Type": "message"
},
{
"Conversation ID": "XXX"
}
]
}
}
}
CodePudding user response:
The dimensions schema is the challenging structure here. Each key-value pair is separated into its own object, which produces many nulls. My solution looks complex, but it eliminates those nulls without exploding the dataframe.
Steps:
- Convert the dimensions array of structs into an array of MapType
- Combine the individual one-key maps (e.g. { "Timestamp": "XXX" }, { "StatusCode": "200" }) into one single map ({"Timestamp": "XXX", "StatusCode": "200"})
- Convert the MapType into a struct and expand it into columns
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, MapType, StringType, StructType

# The sample file is pretty-printed, so read it in multiline mode
df = spark.read.option('multiline', 'true').json('nested.json')

# Save the dimensions element schema for later use
dim_ele_schema = StructType.fromJson(
    df.select('context.custom.dimensions').schema[0].jsonValue()['type']['elementType']
)

# Extract dimensions and convert it to MapType to aggregate
df = (df.select('context.custom.dimensions')
      # Step 1: array of structs -> array of maps
      .withColumn('dim_map', F.from_json(F.to_json('dimensions'),
                                         ArrayType(MapType(StringType(), StringType()))))
      # Step 2: fold the array of one-key maps into one single map
      .select(F.aggregate('dim_map',
                          F.create_map().cast("map<string,string>"),
                          lambda acc, x: F.map_concat(acc, x))
              .alias('dim_map')))
# Step 3: map -> struct, then expand into columns
df = (df.withColumn("dim", F.from_json(F.to_json("dim_map"), dim_ele_schema))
      .select("dim.*"))
Result:
+--------------------+-------------+------+----------+---------------+--------------------+-------+------------+----------+---------+
|         Activity ID|Activity Type|Bot ID|Channel ID|Conversation ID|      Correlation ID|From ID|Recipient ID|StatusCode|Timestamp|
+--------------------+-------------+------+----------+---------------+--------------------+-------+------------+----------+---------+
|JoH4veTvChCCnzchO...|      message|  XXXX|directline|            XXX||54734cb21ba7f143...|    XXX|        XXXX|       200|      XXX|
+--------------------+-------------+------+----------+---------------+--------------------+-------+------------+----------+---------+
Explanation:
F.aggregate(array, initial value, merge function)
This function takes an array and aggregates it (similar to reduce in Python). What I am doing here is merging the individual dicts (maps) into one map, with an empty map as the initial value. F.create_map().cast("map<string,string>") generates an empty map with string type for both keys and values. For the merge function I use map_concat to concatenate two maps (the main one, which is concatenated over and over again, and each individual map).
ref: https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.map_concat.html
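If the aggregate/map_concat step is hard to picture on its own, here is a minimal, self-contained sketch of the same fold over a toy array of one-key maps (the data and the column name dims are illustrative, not from the original file):
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# One row holding an array of one-key maps, mimicking the dimensions column
df = spark.createDataFrame(
    [([{"Timestamp": "XXX"}, {"StatusCode": "200"}],)],
    "dims array<map<string,string>>",
)

merged = df.select(
    F.aggregate(
        "dims",
        F.create_map().cast("map<string,string>"),  # empty map as the initial accumulator
        lambda acc, m: F.map_concat(acc, m),        # fold each one-key map into the accumulator
    ).alias("merged")
)
merged.show(truncate=False)  # {Timestamp -> XXX, StatusCode -> 200}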
CodePudding user response:
You can do a nested select on your df, explode the dimensions array, then convert the struct type into columns using ".*":
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, first, monotonically_increasing_id

# prepare data
spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.read.option("multiline", "true").json("./ressources/75061097.json")

# Processing: tag each record, explode the dimensions array into rows,
# expand the struct into columns, then collapse back to one row per record
result = df.withColumn("id", monotonically_increasing_id()) \
    .select("id", explode(col("context.custom.dimensions"))).select("id", "col.*") \
    .groupby("id").agg(first(col("Activity ID"), ignorenulls=True).alias("Activity ID"),
                       first(col("Activity Type"), ignorenulls=True).alias("Activity Type"),
                       first(col("Bot ID"), ignorenulls=True).alias("Bot ID"),
                       first(col("Channel ID"), ignorenulls=True).alias("Channel ID"),
                       first(col("Conversation ID"), ignorenulls=True).alias("Conversation ID"),
                       first(col("Correlation ID"), ignorenulls=True).alias("Correlation ID"),
                       first(col("From ID"), ignorenulls=True).alias("From ID"),
                       first(col("Recipient ID"), ignorenulls=True).alias("Recipient ID"),
                       first(col("StatusCode"), ignorenulls=True).alias("StatusCode"),
                       first(col("Timestamp"), ignorenulls=True).alias("Timestamp"),
                       ).drop("id")

result.show(truncate=False)
+-------------------------------+-------------+------+----------+---------------+-------------------------------------------+-------+------------+----------+---------+
|Activity ID                    |Activity Type|Bot ID|Channel ID|Conversation ID|Correlation ID                             |From ID|Recipient ID|StatusCode|Timestamp|
+-------------------------------+-------------+------+----------+---------------+-------------------------------------------+-------+------------+----------+---------+
|JoH4veTvChCCnzchOD1Lg-f|0000001|message      |XXXX  |directline|XXX            ||54734cb21ba7f143a72ddd03fc865669.c4cd9570_|XXX    |XXXX        |200       |XXX      |
+-------------------------------+-------------+------+----------+---------------+-------------------------------------------+-------+------------+----------+---------+
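The part that collapses the exploded rows back together is first(..., ignorenulls=True): within each group it returns the first non-null value it sees, so the single populated cell per dimension survives. A minimal sketch with toy data (the values and column names are illustrative, not from the original file):
from pyspark.sql import SparkSession
from pyspark.sql.functions import first

spark = SparkSession.builder.getOrCreate()

# Each exploded row carries exactly one populated column; the rest are null
rows = [(0, "200", None), (0, None, "XXX")]
df = spark.createDataFrame(rows, ["id", "StatusCode", "Timestamp"])

df.groupby("id").agg(
    first("StatusCode", ignorenulls=True).alias("StatusCode"),
    first("Timestamp", ignorenulls=True).alias("Timestamp"),
).show()
# +---+----------+---------+
# | id|StatusCode|Timestamp|
# +---+----------+---------+
# |  0|       200|      XXX|
# +---+----------+---------+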