Hello, I have nested JSON files of about 400 MB each with roughly 200k records. I built a solution using PySpark to parse the file and store it in a customized DataFrame, but the operation takes about 5-7 minutes, which is very slow.
Here is an example of a JSON file (a small one, but with the same structure as the large ones):
{"status":"success",
"data":{"resultType":"matrix","result":
[{"metric":{"data0":"T" ,"data1":"O"},"values":[[90,"0"],[80, "0"]]},
{"metric":{"data0":"K" ,"data1":"S"},"values":[[70,"0"],[60, "0"]]},
{"metric":{"data2":"J" ,"data3":"O"},"values":[[50,"0"],[40, "0"]]}]}}
Here is the structure of the output DataFrame I want:
time | value | data0 | data1 | data2 | data3
90   | "0"   | "T"   | "O"   | nan   | nan
80   | "0"   | "T"   | "O"   | nan   | nan
70   | "0"   | "K"   | "S"   | nan   | nan
60   | "0"   | "K"   | "S"   | nan   | nan
50   | "0"   | nan   | nan   | "J"   | "O"
40   | "0"   | nan   | nan   | "J"   | "O"
And this is the PySpark code I used on the large file to produce the DataFrame structure listed above:
from datetime import datetime
import json

import rapidjson
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

from util import schema, meta_date

new_schema = StructType.fromJson(json.loads(schema))

# Count the entries in data.result up front so an index array can be built later
with open("largefile.json", "r") as json_file:
    result_count = len(rapidjson.load(json_file)["data"]["result"])

# Recreate the session with the desired executor/driver settings
spark = SparkSession.builder.master("spark://IP").getOrCreate()
conf = spark.sparkContext._conf.setAll([
    ("spark.executor.memory", "5g"),
    ("spark.executor.cores", "4"),
    ("spark.driver.memory", "4g"),
])
spark.sparkContext.stop()
spark = SparkSession.builder.config(conf=conf).getOrCreate()

df = spark.read.json("largefile.json")

# Pull each metric field out of data.result into its own array column
for data_name in meta_date:
    df = df.withColumn(
        data_name, F.expr(f"transform(data.result, x -> x.metric.{data_name})")
    )

# Collect the values arrays and explode an index (0 .. result_count-1) per record
df = (
    df.withColumn("values", F.expr("transform(data.result, x -> x.values)"))
    .withColumn("items", F.array(*[F.lit(x) for x in range(0, result_count)]))
    .withColumn("items", F.explode(F.col("items")))
)

# Pick the element of each array column that matches the exploded index
for data_name in meta_date:
    df = df.withColumn(data_name, F.col(data_name).getItem(F.col("items")))

# Explode each (time, value) pair and split it into columns
df = (
    df.withColumn("values", F.col("values").getItem(F.col("items")))
    .withColumn("values", F.explode("values"))
    .withColumn("time", F.col("values").getItem(0))
    .withColumn("value", F.col("values").getItem(1))
    .drop("data", "status", "items", "values")
)
df.show()
My machine has 4 cores (8 logical cores) and 16 GB of memory. I'm using standalone mode with a cluster of one master and 2 worker nodes.
Any help on how to speed up this process, either by editing the cluster configuration or by refactoring the transformations in the code?
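For reference, a minimal sketch of how the same settings could be passed when the session is first built (using the same placeholder master URL as above), instead of creating a session, mutating its conf, stopping it, and building a second one. Note that spark.driver.memory generally has to be set before the driver JVM starts, e.g. via spark-submit, so setting it from application code may not take effect:

from pyspark.sql import SparkSession

# Sketch only: apply the executor settings once, when the session is created.
# "spark://IP" is the placeholder master URL used above.
spark = (
    SparkSession.builder
    .master("spark://IP")
    .config("spark.executor.memory", "5g")
    .config("spark.executor.cores", "4")
    # spark.driver.memory normally must be set before the driver JVM starts,
    # e.g. with `spark-submit --driver-memory 4g`, not from application code.
    .getOrCreate()
)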
CodePudding user response:
What about this? Read the JSON, then select the columns using explode; it looks like it matches your desired result.
import pyspark.sql.functions as f

df = spark.read.json('largefile.json')

df.select(f.explode('data.result').alias('result')) \
  .select('result.metric.*', f.explode('result.values').alias('values')) \
  .withColumn('time', f.col('values')[0]) \
  .withColumn('value', f.col('values')[1]) \
  .drop('values') \
  .show(truncate=False)
+-----+-----+-----+-----+----+-----+
|data0|data1|data2|data3|time|value|
+-----+-----+-----+-----+----+-----+
|T    |O    |null |null |90  |0    |
|T    |O    |null |null |80  |0    |
|K    |S    |null |null |70  |0    |
|K    |S    |null |null |60  |0    |
|null |null |J    |O    |50  |0    |
|null |null |J    |O    |40  |0    |
+-----+-----+-----+-----+----+-----+
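One more thing that may help with the reading step (a sketch, assuming the new_schema already built from util.schema in your code matches the file): passing an explicit schema to spark.read.json skips schema inference, which otherwise needs an extra pass over the 400 MB file.

import json
from pyspark.sql.types import StructType
from util import schema  # the same schema string already used in the question

# Sketch: read with an explicit schema so Spark does not infer it from the data.
new_schema = StructType.fromJson(json.loads(schema))
df = spark.read.schema(new_schema).json("largefile.json")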