I have a multiline JSON file which I am reading using pyspark (Spark 3.0 and above). The final goal is to be able to load the JSON into a postgres db and run some queries on the data.
I am using a 2 step approach. First clean the RAW JSON file (with only required fields) and store it as parquet or JSON. Second load this cleansed data into postgres. Below is the code to load the JSON file and the count of records in the file.
from pyspark.sql.session import SparkSession

# Build (or reuse) the shared Spark session for this job.
spark = (
    SparkSession.builder
    .appName('Sample')
    .getOrCreate()
)

# The file holds a single JSON document spread across many lines, so the
# multiline option must be on for the reader to parse it as one record.
reader = spark.read.option("multiline", True)
df_source = reader.json('data.json')
print('Row Count', df_source.count())
Row Count 1
Below is the schema of the Dataframe
df_source.printSchema()
root
|-- data: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
|-- meta: struct (nullable = true)
| |-- view: struct (nullable = true)
| | |-- category: string (nullable = true)
| | |-- columns: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- dataTypeName: string (nullable = true)
| | | | |-- description: string (nullable = true)
| | | | |-- fieldName: string (nullable = true)
| | | | |-- id: long (nullable = true)
| | | | |-- position: long (nullable = true)
| | |-- createdAt: long (nullable = true)
| | |-- description: string (nullable = true)
| | |-- downloadCount: long (nullable = true)
So the file consists of data tag consisting of the actual data as array and meta tag consisting of the metadata information about the actual data.
Can anyone suggest a way to extract the data from the above dataframe into a separate data frame so that I can write the final dataset?
Sample JSON data as below:
{
"meta" : {
"view" : {
"category" : "This is the view category",
"createdAt" : 1439381433,
"description" : "The data is a sample subset of the actual data",
"downloadCount" : 33858,
"columns" : [ {
"id" : -1,
"dataTypeName" : "text",
"fieldName" : "sid",
"position" : 1,
"description" : "meta_data"
}, {
"id" : -10,
"dataTypeName" : "text",
"fieldName" : "id",
"position" : 2,
"description" : "meta_data"
}, {
"id" : -20,
"dataTypeName" : "long",
"fieldName" : "created_at",
"position" : 3,
"description" : "meta_data"
}, {
"id" : -30,
"dataTypeName" : "long",
"fieldName" : "updated_at",
"position" : 4,
"description" : "meta_data"
}, {
"id" : 217182091,
"dataTypeName" : "text",
"fieldName" : "measureid",
"position" : 5,
"description" : "Unique measure id"
}, {
"id" : 217182092,
"dataTypeName" : "text",
"fieldName" : "measurename",
"position" : 6,
"description" : "Unique measure name"
}, {
"id" : 217182093,
"dataTypeName" : "text",
"fieldName" : "measuretype",
"position" : 7,
"description" : "The type of measure"
}, {
"id" : 217182100,
"dataTypeName" : "text",
"fieldName" : "reportyear",
"position" : 8,
"description" : "year on which reported"
}, {
"id" : 217182100,
"dataTypeName" : "text",
"fieldName" : "value",
"position" : 9,
"description" : "Value of measure"
} ]
}
},
"data" : [ [ "row-8eh8_xxkx-u3mq", "00000000-0000-0000-A1B7-70E47BCE5354", 1439382361, 1439382361, "83", "Number of days", "Counts", "1999", "33" ]
, [ "row-u2v5_78j5-pxk4", "00000000-0000-0000-260A-99DE31733069", 1439382361, 1439382361, "83", "Number of days", "Counts", "2000", "40" ]
, [ "row-68zj_7qfn-sxwu", "00000000-0000-0000-AA6F-0AA88BE0BC18", 1439382361, 1439382361, "83", "Number of days", "Counts", "2002", "39" ]
, [ "row-zziv.xdnh-rsv4", "00000000-0000-0000-D103-71CF4022F146", 1439382361, 1439382361, "85", "Percent of days", "Percent", "1999", "2" ]
, [ "row-8dia~i5sg-v6cj", "00000000-0000-0000-1A71-DE17F79EC965", 1439382361, 1439382361, "86", "Person-days", "Counts", "2006", "5" ]
, [ "row-r7kk_e3dm-z22z", "00000000-0000-0000-B536-48BC9313E20F", 1439382361, 1439382361, "83", "Number of days", "Counts", "2006", "67" ]
, [ "row-mst5-k3ph~ikp3", "00000000-0000-0000-7BD9-A3C1B223ECFE", 1439382361, 1439382361, "86", "Person-days", "Counts", "2001", "9" ]
]
}
CodePudding user response:
Explode the dataframe column `data`; each resulting row is an array whose elements can be accessed by index.
Example:
from pyspark.sql.functions import *

# Each element of `data` is itself an array of field values; explode()
# turns the outer array into one row per inner array.
df = (
    spark.read
    .option("multiLine", True)
    .json("data.json")
    .select(explode("data"))
)
# explode() names its output column "col"; show it unabridged.
df.select("col").show(10, False)
# -----------------------------------------------------------------------------------------------------------------------
#|col |
# -----------------------------------------------------------------------------------------------------------------------
#|[row-8eh8_xxkx-u3mq, 00000000-0000-0000-A1B7-70E47BCE5354, 1439382361, 1439382361, 83, Numberofdays, Counts, 1999, 33] |
#|[row-u2v5_78j5-pxk4, 00000000-0000-0000-260A-99DE31733069, 1439382361, 1439382361, 83, Numberofdays, Counts, 2000, 40] |
#|[row-68zj_7qfn-sxwu, 00000000-0000-0000-AA6F-0AA88BE0BC18, 1439382361, 1439382361, 83, Numberofdays, Counts, 2002, 39] |
#|[row-zziv.xdnh-rsv4, 00000000-0000-0000-D103-71CF4022F146, 1439382361, 1439382361, 85, Percentofdays, Percent, 1999, 2]|
#|[row-8dia~i5sg-v6cj, 00000000-0000-0000-1A71-DE17F79EC965, 1439382361, 1439382361, 86, Person-days, Counts, 2006, 5] |
#|[row-r7kk_e3dm-z22z, 00000000-0000-0000-B536-48BC9313E20F, 1439382361, 1439382361, 83, Numberofdays, Counts, 2006, 67] |
#|[row-mst5-k3ph~ikp3, 00000000-0000-0000-7BD9-A3C1B223ECFE, 1439382361, 1439382361, 86, Person-days, Counts, 2001, 9] |
-----------------------------------------------------------------------------------------------------------------------
#accessing data by index
df.select(col("col").getItem(0)).show(10,False)
# ------------------
#|col[0] |
# ------------------
#|row-8eh8_xxkx-u3mq|
#|row-u2v5_78j5-pxk4|
#|row-68zj_7qfn-sxwu|
#|row-zziv.xdnh-rsv4|
#|row-8dia~i5sg-v6cj|
#|row-r7kk_e3dm-z22z|
#|row-mst5-k3ph~ikp3|
# ------------------
CodePudding user response:
You can first get the column names (`fieldName`) and their positions (`position`) from the `meta` field, then explode the column `data` to get each row as an array. To transform the arrays into multiple columns, you use the positions and names obtained from the `meta` field:
import pyspark.sql.functions as F

# Build one column expression per field described in meta.view.columns.
# inline() flattens the array of column structs into rows so each field's
# name and 1-based position can be read back on the driver.
column_specs = (
    df_source
    .select(F.expr("inline(meta.view.columns)"))
    .select("fieldName", "position")
    .collect()
)
# Sort by position: collect() gives no ordering guarantee, and without the
# sort the output column order could vary from run to run.
columns = [
    F.col("row")[spec.position - 1].alias(spec.fieldName)
    for spec in sorted(column_specs, key=lambda s: s.position)
]

# Explode `data` (an array of arrays) into one row per record, then project
# each array element into its named column.
df_clean = df_source.select(F.explode("data").alias("row")).select(*columns)
df_clean.show(truncate=False)
# ------------------ ------------------------------------ ---------- ---------- --------- --------------- ----------- ---------- -----
#|sid |id |created_at|updated_at|measureid|measurename |measuretype|reportyear|value|
# ------------------ ------------------------------------ ---------- ---------- --------- --------------- ----------- ---------- -----
#|row-8eh8_xxkx-u3mq|00000000-0000-0000-A1B7-70E47BCE5354|1439382361|1439382361|83 |Number of days |Counts |1999 |33 |
#|row-u2v5_78j5-pxk4|00000000-0000-0000-260A-99DE31733069|1439382361|1439382361|83 |Number of days |Counts |2000 |40 |
#|row-68zj_7qfn-sxwu|00000000-0000-0000-AA6F-0AA88BE0BC18|1439382361|1439382361|83 |Number of days |Counts |2002 |39 |
#|row-zziv.xdnh-rsv4|00000000-0000-0000-D103-71CF4022F146|1439382361|1439382361|85 |Percent of days|Percent |1999 |2 |
#|row-8dia~i5sg-v6cj|00000000-0000-0000-1A71-DE17F79EC965|1439382361|1439382361|86 |Person-days |Counts |2006 |5 |
#|row-r7kk_e3dm-z22z|00000000-0000-0000-B536-48BC9313E20F|1439382361|1439382361|83 |Number of days |Counts |2006 |67 |
#|row-mst5-k3ph~ikp3|00000000-0000-0000-7BD9-A3C1B223ECFE|1439382361|1439382361|86 |Person-days |Counts |2001 |9 |
# ------------------ ------------------------------------ ---------- ---------- --------- --------------- ----------- ---------- -----