Converting any nested dictionary to a Spark DataFrame.
- Using the dict object via an RDD into a DataFrame
json_ = {
    "filename": "some_file.csv",
    "md5": "md5 hash",
    "client_id": "some uuid",
    "mappings": {
        "shipping_city": "City",
        "shipping_country": "Country",
        "shipping_zipcode": "Zip",
        "shipping_address1": "Street Line 1",
        "shipping_address2": "Street Line 2",
        "shipping_state_abbreviation": "State"
    }
}
sc.parallelize(json_.items()).toDF().show()
+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
| filename|       some_file.csv|
|      md5|            md5 hash|
|client_id|           some uuid|
| mappings|{shipping_zipcode...|
+---------+--------------------+
- Loading the same dictionary as a JSON file through spark.read
spark.read.format("json").option("multiLine", "true").load("json_").show()
+---------+-------------+--------------------+--------+
|client_id|     filename|            mappings|     md5|
+---------+-------------+--------------------+--------+
|some uuid|some_file.csv|{Street Line 1, S...|md5 hash|
+---------+-------------+--------------------+--------+
The needed behaviour is the second one. How do I make sure the dict data structure loads directly and correctly into a Spark DataFrame/RDD? Sometimes the files might not be present to be loaded.
CodePudding user response:
I would suggest you either write a schema with the StructType class (if using Scala or Python) or use a case class if programming in Scala. The value inside the mappings key will be stored as a Struct, which can be accessed with the . operator.
For Scala, use the following code with case classes:
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.col

case class Mappings(
  shipping_city: String,
  shipping_country: String,
  shipping_zipcode: String,
  shipping_address1: String,
  shipping_address2: String,
  shipping_state_abbreviation: String
)

case class Root(
  filename: String,
  md5: String,
  client_id: String,
  mappings: Mappings
)

// Encoder so the parsed JSON rows can be mapped onto the Root case class
implicit val jsonDataEncoder: Encoder[Root] = Encoders.product[Root]

val spark: SparkSession = SparkSession.builder.getOrCreate()

val df: Dataset[Root] = spark.read.option("multiLine", "true").json("/path/to/json/data").as[Root]

// mappings is a Struct, so mappings.* expands its fields into top-level columns
df.select(col("filename"), col("md5"), col("mappings.*")).show(false)
Results:
+-------------+--------+-----------------+-----------------+-------------+----------------+---------------------------+----------------+
|filename     |md5     |shipping_address1|shipping_address2|shipping_city|shipping_country|shipping_state_abbreviation|shipping_zipcode|
+-------------+--------+-----------------+-----------------+-------------+----------------+---------------------------+----------------+
|some_file.csv|md5 hash|Street Line 1    |Street Line 2    |City         |Country         |State                      |Zip             |
+-------------+--------+-----------------+-----------------+-------------+----------------+---------------------------+----------------+
You can read the Spark documentation for further examples of reading data with a StructType schema or case classes.
For Python, use the following code with the StructType schema class:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField
from pyspark.sql.functions import col

json_schema = StructType([
    StructField("filename", StringType(), nullable=True),
    StructField("client_id", StringType(), nullable=True),
    StructField("md5", StringType(), nullable=True),
    StructField("mappings", StructType([
        StructField("shipping_address1", StringType(), nullable=True),
        StructField("shipping_address2", StringType(), nullable=True),
        StructField("shipping_city", StringType(), nullable=True),
        StructField("shipping_country", StringType(), nullable=True),
        StructField("shipping_state_abbreviation", StringType(), nullable=True),
        StructField("shipping_zipcode", StringType(), nullable=True)
    ]), nullable=True)
])

spark = SparkSession.builder.getOrCreate()
df = spark.read.schema(json_schema).option("multiLine", "true").json("/path/to/json/data/here")
df.select(col("filename"), col("md5"), col("mappings.*")).show(truncate=False)
The result is the same as that shown above in the Scala code section
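Since the question also notes that the files might not always be present, it is worth adding that once the schema exists no file is needed at all: the in-memory dict can be passed straight to spark.createDataFrame. A minimal sketch, assuming the json_ dict from the question and the json_schema defined above:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# With an explicit schema, a list of (nested) dicts can be converted directly
df = spark.createDataFrame([json_], schema=json_schema)

# Individual nested fields can also be pulled out with the . operator
df.select("filename", "md5", "mappings.shipping_city").show(truncate=False)
This avoids the detour through a file (or an RDD of JSON strings) when the data is already a Python dict.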
CodePudding user response:
Adding to @Yayati's answer (a more concise approach) -
import json
json_ = {
    "filename": "some_file.csv",
    "md5": "md5 hash",
    "client_id": "some uuid",
    "mappings": {
        "shipping_city": "City",
        "shipping_country": "Country",
        "shipping_zipcode": "Zip",
        "shipping_address1": "Street Line 1",
        "shipping_address2": "Street Line 2",
        "shipping_state_abbreviation": "State"
    }
}
# Creating an RDD of a single JSON string using the json library:
rdd = sc.parallelize([json.dumps(json_)])
Now, create a DataFrame out of the RDD:
df = spark.read.json(rdd)
df.select("filename", "md5", "client_id", "mappings.*").show(truncate=False)
# Output
+-------------+--------+---------+-----------------+-----------------+-------------+----------------+---------------------------+----------------+
|filename     |md5     |client_id|shipping_address1|shipping_address2|shipping_city|shipping_country|shipping_state_abbreviation|shipping_zipcode|
+-------------+--------+---------+-----------------+-----------------+-------------+----------------+---------------------------+----------------+
|some_file.csv|md5 hash|some uuid|Street Line 1    |Street Line 2    |City         |Country         |State                      |Zip             |
+-------------+--------+---------+-----------------+-----------------+-------------+----------------+---------------------------+----------------+
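The same idea also works when there are several such dicts in memory and no file to load from: serialise each record to a JSON string before parallelizing. A sketch, assuming the sc and spark handles used above:
import json

records = [json_]  # any list of nested dicts, no file required
rdd = sc.parallelize([json.dumps(record) for record in records])
df = spark.read.json(rdd)
df.select("filename", "md5", "client_id", "mappings.*").show(truncate=False)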