Pyspark parse highly nested json (Prometheus)

I'm new to PySpark and would really appreciate some help parsing nested JSON data with PySpark SQL. The data has the following schema:

Schema

root
 |-- data: struct (nullable = true)
 |    |-- result: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- metric: struct (nullable = true)
 |    |    |    |    |-- data0: string (nullable = true)
 |    |    |    |    |-- data1: string (nullable = true)
 |    |    |    |    |-- data2: string (nullable = true)
 |    |    |    |    |-- data3: string (nullable = true)
 |    |    |    |-- values: array (nullable = true)
 |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |-- resultType: string (nullable = true)
 |-- status: string (nullable = true)

This is an example of the JSON file (input):

{"status":"success",

"data":{"resultType":"matrix","result":

[{"metric":{"data0":"T" ,"data1":"O"},"values":[[90,"0"],[80, "0"]]},

{"metric":{"data0":"K" ,"data1":"S"},"values":[[70,"0"],[60, "0"]]},

{"metric":{"data2":"J" ,"data3":"O"},"values":[[50,"0"],[40, "0"]]}]}}

My goals: I would essentially like to get the data into the following two dataframes:

1-

data0 | data1 | data2 | data3 |values

example output dataframe:

data0  | data1 | data2  | data3 | values
"T"    |   "O" |    nan |    nan|   [90,"0"],[80, "0"]
"K"    |   "S" |    nan |    nan|   [70,"0"],[60, "0"]
nan    |   nan |    "J" |    "O"|   [50,"0"],[40, "0"]

2-

time | value | data0 | data1 | data2 | data3

example output dataframe:

time | value |data0 | data1 | data2  | data3 
90   |   "0" |   "T"|    "O"|   nan  | nan
80   |   "0" |   "T"|    "O"|   nan  | nan
70   |   "0" |   "K"|    "S"|   nan  | nan
60   |   "0" |   "K"|    "S"|   nan  | nan
50   |   "0" |   nan|    nan|   "J"  | "O"
40   |   "0" |   nan|    nan|   "J"  | "O"

Also, if there are any ways to speed up this process using Spark's parallelism capabilities, that would be great, because the parsed JSON files are gigabytes in size.

CodePudding user response:

To get the first dataframe, you could use:

import pyspark.sql.functions as F

# df is the dataframe read from the JSON file, e.g.
# df = spark.read.option("multiline", True).json("test.json")
df = (
    # collect each metric field across data.result into an array (one entry per result element)
    df.withColumn("data0", F.expr("transform(data.result, x -> x.metric.data0)"))
    .withColumn("data1", F.expr("transform(data.result, x -> x.metric.data1)"))
    .withColumn("data2", F.expr("transform(data.result, x -> x.metric.data2)"))
    .withColumn("data3", F.expr("transform(data.result, x -> x.metric.data3)"))
    .withColumn("values", F.expr("transform(data.result, x -> x.values)"))
    # one output row per result element (the sample has three)
    .withColumn("items", F.array(F.lit(0), F.lit(1), F.lit(2)))
    .withColumn("items", F.explode(F.col("items")))
    # pick the element matching the row index out of each array
    .withColumn("data0", F.col("data0").getItem(F.col("items")))
    .withColumn("data1", F.col("data1").getItem(F.col("items")))
    .withColumn("data2", F.col("data2").getItem(F.col("items")))
    .withColumn("data3", F.col("data3").getItem(F.col("items")))
    .withColumn("values", F.col("values").getItem(F.col("items")))
    .drop("data", "status", "items")
)

Result:

root
 |-- data0: string (nullable = true)
 |-- data1: string (nullable = true)
 |-- data2: string (nullable = true)
 |-- data3: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

+-----+-----+-----+-----+------------------+
|data0|data1|data2|data3|values            |
+-----+-----+-----+-----+------------------+
|T    |O    |null |null |[[90, 0], [80, 0]]|
|K    |S    |null |null |[[70, 0], [60, 0]]|
|null |null |J    |O    |[[50, 0], [40, 0]]|
+-----+-----+-----+-----+------------------+
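
As a side note, the items array above hard-codes three indices because the sample has three result elements. If the number of results varies, the same first dataframe can also be built with a single explode on data.result; a minimal sketch, assuming df is the dataframe as freshly read from the JSON file:

import pyspark.sql.functions as F

df1 = (
    df.select(F.explode("data.result").alias("r"))  # one row per result element
    .select(F.col("r.metric").alias("metric"), F.col("r.values").alias("values"))
    .select("metric.*", "values")  # expand data0..data3 to top-level columns
)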

To get the second dataframe, it's the same but with an additional explode on values:

df = (
    df.withColumn("data0", F.expr("transform(data.result, x -> x.metric.data0)"))
    .withColumn("data1", F.expr("transform(data.result, x -> x.metric.data1)"))
    .withColumn("data2", F.expr("transform(data.result, x -> x.metric.data2)"))
    .withColumn("data3", F.expr("transform(data.result, x -> x.metric.data3)"))
    .withColumn("values", F.expr("transform(data.result, x -> x.values)"))
    .withColumn("items", F.array(F.lit(0), F.lit(1), F.lit(2)))
    .withColumn("items", F.explode(F.col("items")))
    .withColumn("data0", F.col("data0").getItem(F.col("items")))
    .withColumn("data1", F.col("data1").getItem(F.col("items")))
    .withColumn("data2", F.col("data2").getItem(F.col("items")))
    .withColumn("data3", F.col("data3").getItem(F.col("items")))
    .withColumn("values", F.col("values").getItem(F.col("items")))
    # explode the [time, value] pairs so each pair becomes its own row
    .withColumn("values", F.explode("values"))
    .withColumn("time", F.col("values").getItem(0))
    .withColumn("value", F.col("values").getItem(1))
    .drop("data", "status", "items", "values")
)

Result:

root
 |-- data0: string (nullable = true)
 |-- data1: string (nullable = true)
 |-- data2: string (nullable = true)
 |-- data3: string (nullable = true)
 |-- time: string (nullable = true)
 |-- value: string (nullable = true)

+-----+-----+-----+-----+----+-----+
|data0|data1|data2|data3|time|value|
+-----+-----+-----+-----+----+-----+
|T    |O    |null |null |90  |0    |
|T    |O    |null |null |80  |0    |
|K    |S    |null |null |70  |0    |
|K    |S    |null |null |60  |0    |
|null |null |J    |O    |50  |0    |
|null |null |J    |O    |40  |0    |
+-----+-----+-----+-----+----+-----+
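
The explode-based variant works here too; a minimal sketch of the second dataframe, again assuming df is the dataframe as freshly read from the JSON file:

import pyspark.sql.functions as F

df2 = (
    df.select(F.explode("data.result").alias("r"))  # one row per result element
    .select(F.col("r.metric").alias("metric"), F.explode("r.values").alias("v"))  # one row per [time, value] pair
    .select(
        "metric.*",
        F.col("v").getItem(0).alias("time"),
        F.col("v").getItem(1).alias("value"),
    )
)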

  • Update:

Example of automating data names:

import json

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# collect every metric field name that appears anywhere in the file
data_names = []
with open("test.json", "r") as f_in:
    raw_data = json.load(f_in)
    for item in raw_data["data"]["result"]:
        for key in item["metric"].keys():
            if key not in data_names:
                data_names.append(key)

spark = SparkSession.builder.getOrCreate()
df = spark.read.option("multiline", True).json("test.json")

# build one array column per metric field
for data_name in data_names:
    df = df.withColumn(
        data_name, F.expr(f"transform(data.result, x -> x.metric.{data_name})")
    )

df = (
    df.withColumn("values", F.expr("transform(data.result, x -> x.values)"))
    .withColumn("items", F.array(F.lit(0), F.lit(1), F.lit(2)))
    .withColumn("items", F.explode(F.col("items")))
)

# pick the element matching the row index out of each metric column
for data_name in data_names:
    df = df.withColumn(data_name, F.col(data_name).getItem(F.col("items")))

df = df.withColumn("values", F.col("values").getItem(F.col("items"))).drop(
    "data", "status", "items"
)

The result is the first dataframe (same as above).
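
Regarding the file sizes mentioned in the question: the json.load scan above parses the whole file on the driver, which can be slow for multi-gigabyte inputs. A possible alternative sketch (an assumption, untested on large files) is to read the metric field names straight from the schema Spark infers, and then reuse them in the same loops as above:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
raw_df = spark.read.option("multiline", True).json("test.json")

# walk the inferred schema: data (struct) -> result (array of struct) -> metric (struct)
metric_type = (
    raw_df.schema["data"].dataType["result"].dataType.elementType["metric"].dataType
)
data_names = [field.name for field in metric_type.fields]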
