I have a Kafka consumer reading from a producer, but sometimes, when a record is deleted from a table, the consumer picks up a null value, which results in a NoneType error in the value deserializer. How do I rewrite 'value_deserializer=lambda x: loads(x.decode('utf-8'))' to check for None or null values? Note that the Kafka stream comes from a PostgreSQL database.
consumer = KafkaConsumer(
    topic,
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    # Fails with a NoneType error when x is None (e.g. after a row deletion)
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)
CodePudding user response:
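You can perform the null check inside the deserializer itself. Note that in kafka-python, KafkaConsumer takes value_deserializer; value_serializer is the KafkaProducer option.
lambda x: None if not x else loads(x.decode('utf-8'))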
Entire modified code:
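from json import loads
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    topic,
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    # Return None for null/empty values instead of calling .decode() on them
    value_deserializer=lambda x: None if not x else loads(x.decode('utf-8'))
)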
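
If the lambda gets hard to read, the same check can live in a named function. The following is only a minimal sketch: the names safe_json_deserializer and live_values are hypothetical helpers, not part of kafka-python, and the sample byte strings stand in for real message values so the snippet runs without a broker.

```python
import json
from typing import Any, Iterable, Iterator, Optional


def safe_json_deserializer(raw: Optional[bytes]) -> Any:
    """Decode a message value as JSON, passing null/empty values through as None."""
    # A deleted row can arrive as a record whose value is None (or empty bytes).
    if not raw:
        return None
    return json.loads(raw.decode("utf-8"))


def live_values(values: Iterable[Any]) -> Iterator[Any]:
    """Yield only the deserialized values that are not None."""
    for value in values:
        if value is not None:
            yield value


# Stand-ins for raw message values; the None entry mimics a delete record.
raw_values = [b'{"id": 1}', None, b'{"id": 2}']
decoded = [safe_json_deserializer(v) for v in raw_values]
print(decoded)                     # [{'id': 1}, None, {'id': 2}]
print(list(live_values(decoded)))  # [{'id': 1}, {'id': 2}]
```

With a real consumer you would pass value_deserializer=safe_json_deserializer to KafkaConsumer and skip any message whose message.value is None inside the consuming loop, since the deserializer only stops the crash and does not drop the record.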