I am reading from Kafka through Spark Structured Streaming. The input Kafka message is JSON in the following format:
[
  {
    "customer": "Jim",
    "sex": "male",
    "country": "US"
  },
  {
    "customer": "Pam",
    "sex": "female",
    "country": "US"
  }
]
I have defined the schema like below to parse it:
val schemaAsJson = ArrayType(StructType(Seq(
  StructField("customer", StringType, true),
  StructField("sex", StringType, true),
  StructField("country", StringType, true))), true)
My code looks like this:
df.select(from_json($"col", schemaAsJson) as "json")
.select("json.customer","json.sex","json.country")
The current output looks like this:
+----------+--------------+--------+
|  customer|           sex| country|
+----------+--------------+--------+
|[Jim, Pam]|[male, female]|[US, US]|
+----------+--------------+--------+
Expected output:
+--------+------+-------+
|customer|   sex|country|
+--------+------+-------+
|     Jim|  male|     US|
|     Pam|female|     US|
+--------+------+-------+
How do I split the array of structs into individual rows as above? Can someone please help?
CodePudding user response:
You need to explode the column before selecting. `from_json` gives you one row containing the whole array; `explode_outer` turns each struct in that array into its own row.
df.select(explode_outer(from_json($"value", schemaAsJson)) as "json")
.select("json.customer","json.sex","json.country").show()
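For reference, here is a minimal self-contained sketch of the same fix. It runs on a local SparkSession and uses a hard-coded JSON string in place of the Kafka `value` column (with a real Kafka source you would first do `selectExpr("CAST(value AS STRING)")`, since Kafka delivers the value as binary); the object name and app name are just placeholders:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode_outer, from_json}
import org.apache.spark.sql.types._

object ExplodeJsonArrayDemo extends App {
  val spark = SparkSession.builder
    .master("local[*]")
    .appName("explode-json-array-demo")
    .getOrCreate()
  import spark.implicits._

  // Same schema as in the question: an array of structs
  val schemaAsJson = ArrayType(StructType(Seq(
    StructField("customer", StringType, true),
    StructField("sex", StringType, true),
    StructField("country", StringType, true))), true)

  // Stand-in for the Kafka value column, already cast to string
  val df = Seq(
    """[{"customer":"Jim","sex":"male","country":"US"},
       |{"customer":"Pam","sex":"female","country":"US"}]""".stripMargin
  ).toDF("value")

  // Parse the JSON array, then explode it so each struct becomes a row
  df.select(explode_outer(from_json($"value", schemaAsJson)) as "json")
    .select("json.customer", "json.sex", "json.country")
    .show()

  spark.stop()
}
```

`explode_outer` behaves like `explode` but also keeps rows where the parsed array is null or empty (emitting nulls), which is often what you want when some Kafka messages fail to parse.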