spark - Dictionary Keys as Columns?


Converting any nested dictionary to a Spark DataFrame.

  1. Converting the dict object to an RDD and then to a DataFrame:
json_={
   "filename": "some_file.csv",
   "md5": "md5 hash",
   "client_id": "some uuid",
   "mappings": {
       "shipping_city": "City",
       "shipping_country": "Country",
       "shipping_zipcode": "Zip",
       "shipping_address1": "Street Line 1",
       "shipping_address2": "Street Line 2",
       "shipping_state_abbreviation": "State"
   }
}

sc.parallelize(json_.items()).toDF().show()


+---------+--------------------+
|       _1|                  _2|
+---------+--------------------+
| filename|       some_file.csv|
|      md5|            md5 hash|
|client_id|           some uuid|
| mappings|{shipping_zipcode...|
+---------+--------------------+
  2. Loading the same dictionary as JSON through spark.read:
spark.read.format("json").option("multiLine", "true").load("json_").show()

+---------+-------------+--------------------+--------+
|client_id|     filename|            mappings|     md5|
+---------+-------------+--------------------+--------+
|some uuid|some_file.csv|{Street Line 1, S...|md5 hash|
+---------+-------------+--------------------+--------+

The needed behaviour is the second one. How do I make sure the dict data structure loads directly and correctly into a Spark DataFrame/RDD? Sometimes the files might not be present to be loaded.

CodePudding user response:

I would suggest you either write a schema with the StructType class (if using Scala or Python) or use a case class if programming in Scala.

The value inside the mappings key will be stored as a struct, whose nested fields can be accessed with the . operator.
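
For illustration, here is a small hypothetical PySpark sketch (one made-up row with the same nested shape as json_, not the asker's data) showing how dot notation and mappings.* behave on a struct column:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Hypothetical one-row DataFrame shaped like json_ (only two shipping fields kept, to stay short)
df = spark.createDataFrame(
    [("some_file.csv", "md5 hash", ("City", "Country"))],
    "filename string, md5 string, mappings struct<shipping_city:string, shipping_country:string>",
)

# Dot notation reaches into the struct column
df.select(col("mappings.shipping_city")).show()

# mappings.* expands every nested field into a top-level column
df.select("filename", "mappings.*").show()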

For Scala, use the following code with case classes:

import org.apache.spark.sql.{Encoders, Encoder}
import org.apache.spark.sql.{SparkSession, Dataset}
import org.apache.spark.sql.functions.col

case class Mappings(
  shipping_city: String,
  shipping_country: String,
  shipping_zipcode: String,
  shipping_address1: String,
  shipping_address2: String,
  shipping_state_abbreviation: String
)
case class Root(
  filename: String,
  md5: String,
  client_id: String,
  mappings: Mappings
)

implicit val jsonDataEncoder: Encoder[Root] = Encoders.product[Root]

val spark: SparkSession = SparkSession.builder.getOrCreate()

val df: Dataset[Root] = spark.read.option("multiline","true").json("/path/to/json/data").as[Root]

df.select(col("filename"),col("md5"),col("mappings.*")).show(false)

------Results------
+-------------+--------+-----------------+-----------------+-------------+----------------+---------------------------+----------------+
|     filename|     md5|shipping_address1|shipping_address2|shipping_city|shipping_country|shipping_state_abbreviation|shipping_zipcode|
+-------------+--------+-----------------+-----------------+-------------+----------------+---------------------------+----------------+
|some_file.csv|md5 hash|    Street Line 1|    Street Line 2|         City|         Country|                      State|             Zip|
+-------------+--------+-----------------+-----------------+-------------+----------------+---------------------------+----------------+

You can read the Spark documentation for more examples of reading data with a StructType schema or case classes.

For Python, use the following code with a StructType schema:

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField
from pyspark.sql.functions import col

json_schema = StructType([
    StructField("filename", StringType(), nullable=True),
    StructField("client_id", StringType(), nullable=True),
    StructField("md5", StringType(), nullable=True),
    StructField("mappings", StructType([
        StructField("shipping_address1", StringType(), nullable=True),
        StructField("shipping_address2", StringType(), nullable=True),
        StructField("shipping_city", StringType(), nullable=True),
        StructField("shipping_country", StringType(), nullable=True),
        StructField("shipping_state_abbreviation", StringType(), nullable=True),
        StructField("shipping_zipcode", StringType(), nullable=True),
    ]), nullable=True),
])

spark = SparkSession.builder.getOrCreate()

df = spark.read.schema(json_schema).option("multiline","true").json("/path/to/json/data/here")

df.select(col("filename"),col("md5"),col("mappings.*")).show(truncate=False)

The result is the same as that shown above in the Scala code section.
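
The question also mentions that the JSON may not always exist as a file. In that case, one option (a sketch, not part of the original answer; it reuses the json_ dict and json_schema defined above) is to serialise the in-memory dict and apply the same schema while reading it as JSON from an RDD:

import json

# Assumes spark, json_ and json_schema are defined as in the snippets above
rdd = spark.sparkContext.parallelize([json.dumps(json_)])

df_in_memory = spark.read.schema(json_schema).json(rdd)
df_in_memory.select(col("filename"), col("md5"), col("mappings.*")).show(truncate=False)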

CodePudding user response:

Adding to @Yayati's answer (more concise approach) -

import json

json_={
   "filename": "some_file.csv",
   "md5": "md5 hash",
   "client_id": "some uuid",
   "mappings": {
       "shipping_city": "City",
       "shipping_country": "Country",
       "shipping_zipcode": "Zip",
       "shipping_address1": "Street Line 1",
       "shipping_address2": "Street Line 2",
       "shipping_state_abbreviation": "State"
   }
}

# Creating an RDD of a single JSON string using the json library:
rdd = sc.parallelize([json.dumps(json_)])

Now, creating a DataFrame out of the RDD:

df = spark.read.json(rdd)
df.select("filename", "md5", "client_id", "mappings.*").show(truncate=False)

# Output
+-------------+--------+---------+-----------------+-----------------+-------------+----------------+---------------------------+----------------+
|filename     |md5     |client_id|shipping_address1|shipping_address2|shipping_city|shipping_country|shipping_state_abbreviation|shipping_zipcode|
+-------------+--------+---------+-----------------+-----------------+-------------+----------------+---------------------------+----------------+
|some_file.csv|md5 hash|some uuid|Street Line 1    |Street Line 2    |City         |Country         |State                      |Zip             |
+-------------+--------+---------+-----------------+-----------------+-------------+----------------+---------------------------+----------------+