We are trying to implement badRecordsPath when reading events from an Event Hub. As an example, to try to get it working, I have put in a schema that should make the events fail:
eventStreamDF = (spark.readStream
    .format("eventhubs")
    .options(**eventHubsConf)
    .option("badRecordsPath", "/tmp/badRecordsPath/test1")
    .schema(badSchema)
    .load()
)
Yet this never fails and always reads the events. Is this the expected behaviour of readStream for Event Hubs on Databricks? Our current workaround is to compare the inferred schema against our own schema.
CodePudding user response:
The schema of the data in EventHubs is fixed (see docs); the same is true for Kafka. The actual payload is always encoded as a binary field named body, and it's up to the developer to decode this binary payload according to the "contract" between the producer(s) of the data and its consumers. So even if you specify the schema and the badRecordsPath option, they aren't used.
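For example, a minimal sketch (reusing the eventHubsConf from your question, and assuming the payload is UTF-8 text) that shows the fixed schema and decodes the body:

from pyspark.sql.functions import col

raw = (spark.readStream
    .format("eventhubs")
    .options(**eventHubsConf)
    .load())

raw.printSchema()  # body appears as binary, alongside offset, enqueuedTime, etc.

# Decoding the binary payload is your responsibility from here on.
decoded = raw.withColumn("body", col("body").cast("string"))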
You will need to implement a function that decodes the data from JSON (or whatever format is used) and returns, for example, null when a record is broken; you can then filter on null values to split the stream into two substreams, one for good and one for bad data.
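A minimal sketch of that approach, building on the decoded stream above and assuming the payload is JSON; expectedSchema and its fields are hypothetical placeholders for your actual payload schema:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical schema of the JSON we expect inside body.
expectedSchema = StructType([
    StructField("deviceId", StringType(), True),
    StructField("temperature", DoubleType(), True),
])

# from_json returns a null struct for payloads that aren't valid JSON
# (note: valid JSON with missing fields yields a struct whose fields are
# null, not a null struct).
parsed = decoded.withColumn("parsed", from_json(col("body"), expectedSchema))

goodDF = parsed.filter(col("parsed").isNotNull()).select("parsed.*")
badDF = parsed.filter(col("parsed").isNull()).select("body")

Each substream can then be written to its own sink, e.g. the bad records to the path you originally intended for badRecordsPath.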