I'm writing a simple Apache Flink job (in Scala) that just prints the case class representing the events received from a RabbitMQ queue (RMQSource).
I've written my own deserialization schema (using Jackson) and it works fine when the consumed message is actually JSON representing the case class. But the job fails and keeps restarting if the queue receives a badly formatted event (a "poison message", I guess). I have to purge the queue before the job state changes back to RUNNING.
QUESTION:
How can I prevent the job from failing when it receives a poison message? Can I validate the message before consuming it? If I can set up a dead-letter exchange in RabbitMQ, where (if possible) should I issue the negative acknowledgements on behalf of Apache Flink as the consumer? Is there a better way to handle this and keep the job running, consuming the next well-formatted messages?
My custom DeserializationSchema provided to RMQSource[Test]:
import java.io.IOException
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.createTypeInformation

class eventSerializationSchema extends DeserializationSchema[Test] {
  import eventSerializationSchema.objectMapper

  @throws(classOf[IOException])
  override def deserialize(message: Array[Byte]): Test = objectMapper.readValue(message, classOf[Test])
  override def isEndOfStream(nextElement: Test): Boolean = false
  override def getProducedType: TypeInformation[Test] = createTypeInformation[Test]
}

object eventSerializationSchema {
  val objectMapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
}
Error obtained when a poison message arrives in the consumed queue:
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'a': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"a"; line: 1, column: 2]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:720)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3593)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2688)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:870)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:762)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4684)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3609)
at org.angoglez.deserializers.eventSerializationSchema.deserialize(eventSerializationSchema.scala:17)
at org.angoglez.deserializers.eventSerializationSchema.deserialize(eventSerializationSchema.scala:14)
at org.apache.flink.streaming.connectors.rabbitmq.RMQDeserializationSchemaWrapper.deserialize(RMQDeserializationSchemaWrapper.java:47)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.processMessage(RMQSource.java:319)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.run(RMQSource.java:331)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
ANSWER:
From my point of view you have a few options:
1. Catch exceptions thrown inside the deserialize method, and drop poison records.
2. Catch exceptions thrown inside the deserialize method, and somehow encode what you want to know about these poison records into the object being produced. Then in a downstream process function, filter out these poison records and send them to a side output.
3. Don't apply the ObjectMapper in the deserializer, and instead do the real deserialization in a chained process function that can directly send the poison records to a side output.
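As a rough sketch of the third option (the names here are made up; your real Test case class and stream wiring will differ), you can consume the raw payload as a String at the source and do the Jackson parsing in a chained ProcessFunction that diverts unparseable messages to a side output instead of failing the job:

```scala
import java.nio.charset.StandardCharsets.UTF_8

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.util.Try

// Hypothetical stand-in for the real event case class.
case class Test(id: String, value: Int)

// Pure parsing helper: Right(event) on success, Left(rawPayload) on failure.
object SafeJsonParser {
  private val mapper: ObjectMapper =
    new ObjectMapper().registerModule(DefaultScalaModule)

  def parse(bytes: Array[Byte]): Either[String, Test] =
    Try(mapper.readValue(bytes, classOf[Test]))
      .toEither.left.map(_ => new String(bytes, UTF_8))
}

// Chained after a source that emits the raw message as a String;
// poison messages go to a side output instead of crashing the job.
class JsonParsingFunction extends ProcessFunction[String, Test] {
  override def processElement(raw: String,
                              ctx: ProcessFunction[String, Test]#Context,
                              out: Collector[Test]): Unit =
    SafeJsonParser.parse(raw.getBytes(UTF_8)) match {
      case Right(event)  => out.collect(event)
      case Left(payload) => ctx.output(JsonParsingFunction.poisonTag, payload)
    }
}

object JsonParsingFunction {
  // Side-output tag for messages that failed to parse.
  val poisonTag: OutputTag[String] = OutputTag[String]("poison-messages")
}
```

You would wire it up with something like stream.process(new JsonParsingFunction), then read getSideOutput(JsonParsingFunction.poisonTag) to log the bad payloads or republish them to a dead-letter exchange yourself.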