I'm using Pyspark 3.2.0, and I'm pretty new to Structured Streaming and couldn't find an answer to this question.
I want to read json data from a Kafka topic using a predefined schema like the following (code related to initialization / connections is omitted):
# The skeleton schema is defined in 'schema.py'
skeleton_schema = get_skeleton_schema()
df = df.selectExpr("CAST(value AS STRING)") \
.select(from_json("value", skeleton_schema).alias("data")) \
.select(col("data.*"))
...
df.writeStream \
.format("console") \
.outputMode("append") \
.trigger(processingTime='5 minutes') \
.start()
df.awaitTermination()
I want to be able to modify the skeleton_schema (e.g add/remove columns) in the 'schema.py' file and to have those changes reflected to future triggers. Is there a way to achieve this? If not, is there a different mechanism to update the schema without restarting the session?
CodePudding user response:
Unless get_skeleton_schema()
function itself is ran per batch and not cached (for example, calls an external REST API, database, or parses some file), which it does not in the shown code, then no, it's not possible to change it at runtime.
Keep in mind, there's no guarantee that all records in the same batch will have the same schema....
You'd need to consume the columns as bytes, then use a ForEachWriter implementation to implement this, but I'm not familiar enough with pyspark to give an example
Depending on where you're actually going to be writing the data into (not the console, e.g. Using Mongo or Snowflake instead), you could look at using Kafka Connect and then using Avro or Protobuf serialization rather than JSON. Then your producer's would decide when to introduce/remove columns in a backwards-compatible manner, enforced by a Schema Registry, and your consumers wouldn't have to change or define any schema themselves