Kafka consumer failing due to nulls for value serializer

Time:10-13

I have a Kafka consumer reading from a producer, but sometimes, when a record is deleted from a table, the consumer picks up a null value, which results in a NoneType error in the value serializer. How do I rewrite 'value_serializer=lambda x: loads(x.decode('utf-8'))' to check for None or nulls? Note that this uses a Kafka stream from a PostgreSQL database.

consumer = KafkaConsumer(
    topic,
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    value_serializer=lambda x: loads(x.decode('utf-8'))
)

CodePudding user response:

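You can perform the null check inside the lambda itself, returning None when the payload is missing. Note that, assuming this is kafka-python, KafkaConsumer expects the callable under value_deserializer; value_serializer is the KafkaProducer option.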

lambda x: None if not x else loads(x.decode('utf-8'))

Entire modified code:

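from json import loads
from kafka import KafkaConsumer

# kafka-python's consumer takes value_deserializer (value_serializer is the producer option)
consumer = KafkaConsumer(
    topic,
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    # Return None for null/empty payloads instead of calling .decode() on None
    value_deserializer=lambda x: None if not x else loads(x.decode('utf-8'))
)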
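
If the lambda gets hard to read, the same check can live in a named function, which is also easier to test without a running broker. This is only a minimal sketch: the name safe_json_deserializer is made up for illustration, and it assumes the message values are UTF-8 encoded JSON, as in the question.

```python
import json
from typing import Any, Optional


def safe_json_deserializer(raw: Optional[bytes]) -> Any:
    """Decode a UTF-8 JSON payload; pass null or empty payloads through as None."""
    # A null message value arrives as None; empty bytes are treated the same way.
    if not raw:
        return None
    return json.loads(raw.decode("utf-8"))


# Simulated message values: a normal row change and a null value from a delete.
print(safe_json_deserializer(b'{"id": 1, "name": "alice"}'))
print(safe_json_deserializer(None))
```

The function can then be passed as value_deserializer=safe_json_deserializer. In the consuming loop, records whose value is None can be skipped or handled as deletes, depending on what the application needs.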