How to produce tombstone for a Kafka Avro topic


I am trying to produce tombstone messages to a compacted Kafka topic with an Avro schema, using Scala (v2.13.10) and the FS2 Kafka library (v3.0.0-M8) with its Vulcan module.

The app consumes from a topic A and produces a tombstone back to the same topic A for any value that matches some condition.

A sample snippet:

import cats.effect.IO
import fs2.kafka._

val producerSettings =
  ProducerSettings(
    keySerializer = keySerializer,
    valueSerializer = Serializer.unit[IO]
  ).withBootstrapServers("localhost:9092")

def processRecord(
  committableRecord: CommittableConsumerRecord[IO, KeySchema, ValueSchema],
  producer: KafkaProducer.Metrics[IO, KeySchema, Unit]
): IO[CommittableOffset[IO]] = {
  val key   = committableRecord.record.key
  val value = committableRecord.record.value

  if (value.filterColumn.field1 == "<removable>") {
    // Tombstone: same key, Unit value (serialized as a null payload).
    val tombstone = ProducerRecord(committableRecord.record.topic, key, ())
    val producerRecords: ProducerRecords[CommittableOffset[IO], KeySchema, Unit] =
      ProducerRecords.one(tombstone, committableRecord.offset)
    // produce returns IO[IO[...]]; flatten awaits the broker ack, then
    // surface the offset carried through as the passthrough.
    producer.produce(producerRecords).flatten.map(_.passthrough)
  } else
    IO.pure(committableRecord.offset)
}

The above snippet works fine if I produce a valid key/value message. However, I get the error below when I try to produce a null/empty message:

java.lang.IllegalArgumentException: Invalid Avro record: bytes is null or empty
    at fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$4(AvroDeserializer.scala:32)
    at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
    at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
    at mapN @ fs2.kafka.KafkaProducerConnection$$anon$1.withSerializersFrom(KafkaProducerConnection.scala:141)
    at map @ fs2.kafka.ConsumerRecord$.fromJava(ConsumerRecord.scala:184)
    at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$2(KafkaConsumerActor.scala:265)
    at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$26(KafkaConsumer.scala:267)
    at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
    at defer @ fs2.kafka.vulcan.AvroDeserializer$.$anonfun$using$3(AvroDeserializer.scala:29)
    at mapN @ fs2.kafka.KafkaProducerConnection$$anon$1.withSerializersFrom(KafkaProducerConnection.scala:141)

A sample Avro Schema:

{
    "type": "record",
    "name": "SampleOrder",
    "namespace": "com.myschema.global",
    "fields": [
        {
            "name": "cust_id",
            "type": "int"
        },
        {
            "name": "month",
            "type": "int"
        },
        {
            "name": "expenses",
            "type": "double"
        },
        {
            "name": "filterColumn",
            "type": {
                "type": "record",
                "name": "filterColumn",
                "fields": [
                    {
                        "name": "id",
                        "type": "string"
                    },
                    {
                        "name": "field1",
                        "type": "string"
                    }
                ]
            }
        }
    ]
}

I've tried different serializers for the producer, but they all result in the same exception as above.

Thanks in advance.

CodePudding user response:

First, a producer uses a Serializer, yet your stack trace points at a Deserializer, so the error is being thrown on the consuming side. Also, unless your keys are Avro, you don't need an Avro schema at all to send null values into a topic: use ByteArraySerializer and simply send a null value...
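For reference, a minimal sketch of that suggestion using the plain Apache Kafka Java client from Scala; the topic name and key bytes are placeholders, and if the existing keys were written with an Avro serializer, the tombstone's key bytes must be encoded the same way so compaction groups it with the original record:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.ByteArraySerializer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", classOf[ByteArraySerializer].getName)
props.put("value.serializer", classOf[ByteArraySerializer].getName)

val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)

// Placeholder: the key bytes of the record to delete, encoded exactly as
// the original producer encoded them.
val keyBytes: Array[Byte] = ???

// A tombstone is simply the record's key with a null value.
producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("topic-a", keyBytes, null))
producer.close()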

But this does seem like a bug: if the incoming record's key/value bytes are null, the deserializer should return null, not explicitly throw an error:

https://github.com/fd4s/fs2-kafka/blob/series/2.x/modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroDeserializer.scala#L29

Compare to the Confluent implementation, whose KafkaAvroDeserializer returns null for a null payload instead of throwing.
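Until that is fixed, one consumer-side workaround is to make the value deserializer null-tolerant. A sketch, assuming the Deserializer#option combinator from the fs2-kafka 2.x API linked above, which yields None for null bytes instead of invoking the underlying deserializer (ValueSchema and the settings names are the asker's own):

import cats.effect.IO
import fs2.kafka._

// Wrap the Vulcan-derived value deserializer so that tombstones
// (null payloads) deserialize to None instead of throwing.
def nullTolerant(
  avro: Deserializer[IO, ValueSchema]
): Deserializer[IO, Option[ValueSchema]] =
  avro.option

// The consumer is then typed against Option[ValueSchema], e.g.:
// ConsumerSettings(keyDeserializer, nullTolerant(valueDeserializer))
//   .withBootstrapServers("localhost:9092")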
