Is it possible to handle poisoned messages in RabbitMQ with Apache Flink?

Time:11-18

I'm creating a simple Apache Flink job (in Scala) that only prints the case class representing the events received from a RabbitMQ queue (RMQSource).

I've created my own deserialization schema (using Jackson), and it works fine when the consumed message is actually JSON representing the case class. But if the queue receives a badly formatted event (a "poisoned message", I guess), the job fails and keeps restarting. Only after I purge the queue does the job state change back to 'running'.

QUESTION:

How can I prevent the job from failing when it receives a poisoned message? Can I validate the message before consuming it? If I set up a dead-letter exchange in RabbitMQ, where (if possible) should I send the negative acknowledgements on behalf of Apache Flink as the consumer? Is there a better way to handle this and keep the job running, consuming the next well-formatted messages?


My custom DeserializationSchema provided to RMQSource[Test]:

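import java.io.IOException

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.createTypeInformation

class eventSerializationSchema extends DeserializationSchema[Test] {

  // objectMapper is defined in the companion object, so it must be imported here
  import eventSerializationSchema.objectMapper

  // Throws a JsonParseException (an IOException) on malformed input, which fails the job
  @throws(classOf[IOException])
  override def deserialize(message: Array[Byte]): Test = objectMapper.readValue(message, classOf[Test])

  override def isEndOfStream(nextElement: Test): Boolean = false

  override def getProducedType: TypeInformation[Test] = createTypeInformation[Test]
}

object eventSerializationSchema {

  // Shared Jackson mapper with Scala support; kept out of the serialized schema instance
  val objectMapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)

}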

Error obtained when a poisoned message arrives in the consumed queue:

com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'a': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"a"; line: 1, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:720)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3593)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2688)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:870)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:762)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4684)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3609)
    at org.angoglez.deserializers.eventSerializationSchema.deserialize(eventSerializationSchema.scala:17)
    at org.angoglez.deserializers.eventSerializationSchema.deserialize(eventSerializationSchema.scala:14)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQDeserializationSchemaWrapper.deserialize(RMQDeserializationSchemaWrapper.java:47)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.processMessage(RMQSource.java:319)
    at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.run(RMQSource.java:331)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)

Answer:

From my point of view you have a few options:

  • Catch exceptions that are thrown inside the deserialize method, and drop poison records.

  • Catch exceptions thrown inside the deserialize method, and encode what you need to know about these poison records (for example, the raw payload and the error message) into the object being produced. Then, in a downstream process function, filter out these poison records and send them to a side output.

  • Don't apply the ObjectMapper in the deserializer, and instead do the real deserialization in a chained process function that can directly send the poison records to a side output.
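A minimal sketch of the second option, assuming Flink 1.x with the Scala DataStream API (`flink-streaming-scala`) and `jackson-module-scala` on the classpath. `Event`, `PoisonRecord`, `SafeEventSchema`, `SplitPoison` and the tag name are hypothetical. Instead of throwing, the schema produces an `Either`, and a `ProcessFunction` routes the failures to a side output. Dropping the poison records (the first option) amounts to ignoring the `Left` case instead of emitting it.

```scala
import java.nio.charset.StandardCharsets

import scala.util.control.NonFatal

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical payload type, standing in for the question's Test case class
case class Event(id: Int, name: String)

// Hypothetical record describing a message that could not be parsed
case class PoisonRecord(payload: String, error: String)

object SafeEventSchema {
  // In the companion object, so the non-serializable mapper is never shipped with the schema
  val objectMapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)

  // Side output that receives the poison records downstream (hypothetical tag name)
  val poisonTag: OutputTag[PoisonRecord] = OutputTag[PoisonRecord]("poison-records")

  // Never throws on bad input: a parse failure becomes a Left instead of failing the source
  def parse(message: Array[Byte]): Either[PoisonRecord, Event] = {
    def poison(error: String) = Left(PoisonRecord(new String(message, StandardCharsets.UTF_8), error))
    try {
      val event = objectMapper.readValue(message, classOf[Event])
      // readValue returns null for the literal JSON `null`, which is also a poison record
      if (event == null) poison("null payload") else Right(event)
    } catch {
      case NonFatal(e) => poison(e.toString)
    }
  }
}

class SafeEventSchema extends DeserializationSchema[Either[PoisonRecord, Event]] {
  override def deserialize(message: Array[Byte]): Either[PoisonRecord, Event] =
    SafeEventSchema.parse(message)

  override def isEndOfStream(nextElement: Either[PoisonRecord, Event]): Boolean = false

  override def getProducedType: TypeInformation[Either[PoisonRecord, Event]] =
    createTypeInformation[Either[PoisonRecord, Event]]
}

// Good events stay on the main output; poison records go to the side output
class SplitPoison extends ProcessFunction[Either[PoisonRecord, Event], Event] {
  override def processElement(
      value: Either[PoisonRecord, Event],
      ctx: ProcessFunction[Either[PoisonRecord, Event], Event]#Context,
      out: Collector[Event]): Unit =
    value match {
      case Right(event) => out.collect(event)
      case Left(poison) => ctx.output(SafeEventSchema.poisonTag, poison)
    }
}
```

Assuming an existing `connectionConfig` and `queueName`, the wiring would look roughly like `env.addSource(new RMQSource[Either[PoisonRecord, Event]](connectionConfig, queueName, false, new SafeEventSchema)).process(new SplitPoison)`, followed by `getSideOutput(SafeEventSchema.poisonTag)` on the result to obtain the poison stream (for logging, or for publishing elsewhere). Because a malformed message no longer throws, the job keeps running and moves on to the next message.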

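A minimal sketch of the third option, under the same assumptions (Flink 1.x Scala DataStream API and `jackson-module-scala`); `Event`, `ParseEvents` and the tag name are hypothetical. The source would use Flink's `SimpleStringSchema`, so it never fails on the payload itself, and the real parsing happens in a `ProcessFunction` that sends unparseable messages straight to a side output.

```scala
import scala.util.control.NonFatal

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical payload type, standing in for the question's Test case class
case class Event(id: Int, name: String)

object ParseEvents {
  // In the companion object, so the non-serializable mapper is not shipped with the function
  private val objectMapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)

  // Side output for raw messages that could not be parsed (hypothetical tag name)
  val poisonTag: OutputTag[String] = OutputTag[String]("poison-messages")

  // Returns None instead of throwing when the JSON is malformed, null, or does not match Event
  def parse(raw: String): Option[Event] =
    try Option(objectMapper.readValue(raw, classOf[Event]))
    catch { case NonFatal(_) => None }
}

// The source only delivers raw strings; deserialization happens here
class ParseEvents extends ProcessFunction[String, Event] {
  override def processElement(
      raw: String,
      ctx: ProcessFunction[String, Event]#Context,
      out: Collector[Event]): Unit =
    ParseEvents.parse(raw) match {
      case Some(event) => out.collect(event)
      case None        => ctx.output(ParseEvents.poisonTag, raw)
    }
}
```

Assuming an existing `connectionConfig` and `queueName`, usage would be along the lines of `val events = env.addSource(new RMQSource[String](connectionConfig, queueName, false, new SimpleStringSchema())).process(new ParseEvents)`, then `events.getSideOutput(ParseEvents.poisonTag)`. Compared with the second option, the source type stays a plain `String` and the raw payload reaches the side output without a wrapper type.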