I'm using Spark Structured Streaming to consume messages from a few Kafka topics. The JSON string is structured like this, and I want to extract 'created_at', 'text' and 'tag':
{"data":
{"created_at":"***",
"id":"***",
"text":"***"},
"matching_rules":
[{"id":"***",
"tag":"***"}]
}
I wrote the following schema:
val DFschema = StructType(Array(
  StructField("data", StructType(Array(
    StructField("created_at", TimestampType),
    StructField("text", StringType)))),
  StructField("matching_rules", StructType(Array(
    StructField("tag", StringType))))
))
When I use the schema with from_json() I can successfully extract 'created_at' and 'text' as columns with non-null values using getField(), but when I try the same for 'tag', the resulting column contains only nulls:
val kafkaDF: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("failOnDataLoss", "false")
.option("subscribe", topics)
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.select(col("key"), from_json($"value", DFschema).alias("structdata"))
.select($"key",
$"structdata.data".getField("created_at").alias("created_at"),
$"structdata.data".getField("text").alias("text"),
$"structdata.matching_rules".getField("tag").alias("topic")
)
.withColumn("hour", date_format(col("created_at"), "HH"))
.withColumn("date", date_format(col("created_at"), "yyyy-MM-dd"))
Looking at the JSON I see that 'id' and 'tag' are wrapped in square brackets, which leads me to suspect I've left a datatype out of the schema, but I'm not experienced enough to know which one. Appreciate the help.
CodePudding user response:
For arrays, you have to wrap your StructType with ArrayType, as below:
val DFschema = StructType(Array(
  StructField("data", StructType(Array(
    StructField("created_at", TimestampType),
    StructField("id", StringType),
    StructField("text", StringType)))),
  StructField("matching_rules", ArrayType(StructType(Array(
    StructField("tag", StringType),
    StructField("id", StringType)))))
))
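One thing to watch out for: once matching_rules is an ArrayType, selecting "structdata.matching_rules.tag" gives you an array<string>, not a single string. Here is a minimal self-contained sketch (the sample payload, session and column names are illustrative, not from your pipeline) that parses one message with the corrected schema and uses element_at, which is 1-based, to pull out the first rule's tag:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, element_at, from_json}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[1]").appName("schema-check").getOrCreate()
import spark.implicits._

// Corrected schema: matching_rules is an array of structs
val schema = StructType(Array(
  StructField("data", StructType(Array(
    StructField("created_at", TimestampType),
    StructField("text", StringType)))),
  StructField("matching_rules", ArrayType(StructType(Array(
    StructField("tag", StringType)))))
))

// One hand-written sample message standing in for a Kafka value
val sample = Seq(
  """{"data":{"created_at":"2023-01-01T00:00:00Z","text":"hello"},"matching_rules":[{"id":"1","tag":"news"}]}"""
).toDF("value")

val parsed = sample
  .select(from_json($"value", schema).alias("structdata"))
  .select(
    col("structdata.data.created_at").alias("created_at"),
    col("structdata.data.text").alias("text"),
    // matching_rules.tag is array<string>; element_at(_, 1) takes the
    // first rule's tag -- adjust if a message can match several rules
    element_at(col("structdata.matching_rules.tag"), 1).alias("topic"))
```

If a message can match more than one rule, you may prefer explode(col("structdata.matching_rules")) so each rule gets its own row.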
Another alternative is to pass a DDL-formatted schema string to from_json (under the assumption that content is your column):
val parsed = ds.withColumn("content",
  expr("from_json(content, 'STRUCT<data:STRUCT<created_at:STRING,id:STRING,text:STRING>,matching_rules:ARRAY<STRUCT<id:STRING,tag:STRING>>>')")
)
You can ask Spark to generate the schema through schema_of_json.
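For instance, a small sketch that derives the DDL string from one sample message instead of writing it by hand (the sample literal and names are illustrative; the exact casing of the returned string varies across Spark versions):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, schema_of_json}

val spark = SparkSession.builder().master("local[1]").appName("schema-gen").getOrCreate()

// One representative message; schema_of_json infers the schema from it
val sampleJson =
  """{"data":{"created_at":"x","id":"1","text":"t"},"matching_rules":[{"id":"1","tag":"a"}]}"""

// schema_of_json takes a literal column and returns the schema as a
// DDL-formatted string, which can be pasted straight into from_json
val ddl = spark.range(1)
  .select(schema_of_json(lit(sampleJson)).alias("schema"))
  .head().getString(0)
```

Note that inference only sees the sample you give it, so created_at comes back as STRING here; you would still cast it to a timestamp yourself.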
Good luck!