I am trying to produce tombstone messages to a compacted Kafka topic with an Avro schema, using Scala (v2.13.10) and the FS2 Kafka library (v3.0.0-M8) with the Vulcan module.
The app consumes from a topic A and produces a tombstone back to the same topic A for values that match some condition.
A sample snippet:
val producerSettings =
  ProducerSettings(
    keySerializer = keySerializer,
    valueSerializer = Serializer.unit[IO]
  ).withBootstrapServers("localhost:9092")

def processRecord(
  committableRecord: CommittableConsumerRecord[IO, KeySchema, ValueSchema],
  producer: KafkaProducer.Metrics[IO, KeySchema, Unit]
): IO[CommittableOffset[IO]] = {
  val key   = committableRecord.record.key
  val value = committableRecord.record.value
  if (value.filterColumn.field1 == "<removable>") {
    // Tombstone for the same topic and key; Serializer.unit writes the value as null
    val tombstone = ProducerRecord(committableRecord.record.topic, key, ())
    val records: ProducerRecords[CommittableOffset[IO], KeySchema, Unit] =
      ProducerRecords.one(tombstone, committableRecord.offset)
    // produce returns IO[IO[ProducerResult]]; the inner IO completes on broker ack
    producer.produce(records).flatten.map(_.passthrough)
  } else
    IO.pure(committableRecord.offset)
}
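
For context, processRecord is wired into the stream roughly like the fs2-kafka quick-start example (the topic name, parallelism, and batching values here are illustrative):

import scala.concurrent.duration._

val stream =
  KafkaProducer.stream(producerSettings).flatMap { producer =>
    KafkaConsumer.stream(consumerSettings)
      .subscribeTo("topicA")
      .records
      .mapAsync(16)(processRecord(_, producer))   // tombstone or pass the offset through
      .through(commitBatchWithin(100, 5.seconds)) // commit offsets in batches
  }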
The snippet above works fine when I produce a valid key/value message. However, I get the error below when I try to produce a null/empty message:
java.lang.IllegalArgumentException: Invalid Avro record: bytes is null or empty
at fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$4(AvroDeserializer.scala:32)
at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
at mapN @ fs2.kafka.KafkaProducerConnection$$anon$1.withSerializersFrom(KafkaProducerConnection.scala:141)
at map @ fs2.kafka.ConsumerRecord$.fromJava(ConsumerRecord.scala:184)
at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$2(KafkaConsumerActor.scala:265)
at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$26(KafkaConsumer.scala:267)
at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
at mapN @ fs2.kafka.KafkaProducerConnection$$anon$1.withSerializersFrom(KafkaProducerConnection.scala:141)
A sample Avro schema:
{
  "type": "record",
  "name": "SampleOrder",
  "namespace": "com.myschema.global",
  "fields": [
    { "name": "cust_id", "type": "int" },
    { "name": "month", "type": "int" },
    { "name": "expenses", "type": "double" },
    {
      "name": "filterColumn",
      "type": {
        "type": "record",
        "name": "filterColumn",
        "fields": [
          { "name": "id", "type": "string" },
          { "name": "field1", "type": "string" }
        ]
      }
    }
  ]
}
I've tried different serializers for the producer, but they all result in the same exception as above.
Thanks in advance.
CodePudding user response:
First, a producer uses a Serializer, yet your stack trace shows a Deserializer: the exception is thrown on the consumer side, when the Avro deserializer reads the tombstone back, not while producing it. Unless your keys are Avro, you don't need an Avro schema to send null values into a topic. Use ByteArraySerializer and simply send a null value...
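
For example, with plain kafka-clients (topicA and keyBytes here are placeholders; for compaction to drop the old record, the tombstone's key bytes must exactly match the serialized key of that record, which for Avro keys includes the Confluent wire-format prefix):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.ByteArraySerializer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", classOf[ByteArraySerializer].getName)
props.put("value.serializer", classOf[ByteArraySerializer].getName)

val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
// A null value is the tombstone; compaction removes earlier records
// whose key bytes match this key exactly.
producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("topicA", keyBytes, null))
producer.close()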
But this also seems like a bug in the fs2-kafka Vulcan deserializer: if the incoming record key/value is null, it should return null (or None), not explicitly throw an error.
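
In the meantime, a consumer-side workaround is to decode values as Option, so null bytes become None before the Avro decoder ever runs. A sketch, where underlying stands for the Deserializer[IO, ValueSchema] built via the vulcan module (the exact plumbing to get at it depends on your fs2-kafka version):

// underlying: Deserializer[IO, ValueSchema], assumed to come from
// avroDeserializer[ValueSchema] in the vulcan module
val tombstoneAwareValues: Deserializer[IO, Option[ValueSchema]] =
  underlying.option // null bytes decode to None instead of throwing

Consumer records then carry Option[ValueSchema], and a tombstone arrives as value = None.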