Read Avro records from Kafka using Spark DStreams


I'm using Spark 2.3 and trying to stream data from Kafka using DStreams (we use DStreams to achieve a specific use case that we could not implement with Structured Streaming).

The Kafka topic contains data in Avro format. I want to read that data using Spark DStreams and interpret it as a JSON string.

I'm trying to do something like this:

val kafkaParams: Map[String, Object] = Map(
    "bootstrap.servers" -> "kafka-servers",
    "key.serializer" -> classOf[StringSerializer],
    "value.serializer" -> classOf[StringSerializer],
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[org.apache.spark.sql.avro.AvroDeserializer],
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean),
    "group.id" -> "group1"
  )

val kafkaDstream = KafkaUtils.createDirectStream(
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
  )

val processedStream = kafkaDstream.map(record => (record.key(), record.value()))

processedStream.foreachRDD { rdd =>
  rdd.foreach { record =>
    println(record._2)
  }
}

But I don't see the data getting processed; I get the error below, which I think is because AvroDeserializer is only available from Spark 2.4.0 onwards.

Caused by: org.apache.kafka.common.KafkaException: Could not instantiate class org.apache.spark.sql.avro.AvroDeserializer Does it have a public no-argument constructor?

Any idea how I can achieve this?

Thank you.

CodePudding user response:

Spark's Avro deserializer is not a Kafka deserializer (as an aside, a consumer config has no use for the serializer entries in your map). That class belongs to Spark SQL / Structured Streaming, not to the (deprecated) DStream-based Spark Streaming.
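For context, Kafka instantiates the `value.deserializer` class reflectively, so it must implement `org.apache.kafka.common.serialization.Deserializer` and have a public no-argument constructor (hence the exception you saw). If your producer wrote plain Avro binary without Schema Registry, a custom deserializer is one way to go. A minimal sketch, assuming the writer schema is known ahead of time and the record schema shown is a placeholder:

```scala
import java.util

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.kafka.common.serialization.Deserializer

// Sketch of a Kafka-compatible Avro deserializer for plain Avro binary
// (no Confluent wire format). The no-argument constructor is what Kafka
// requires when instantiating this class from the consumer config.
class PlainAvroDeserializer extends Deserializer[GenericRecord] {
  // Hypothetical schema for illustration; replace with the topic's
  // actual writer schema.
  private val schema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"Event","fields":[{"name":"id","type":"string"}]}"""
  )
  private val reader = new GenericDatumReader[GenericRecord](schema)

  override def deserialize(topic: String, data: Array[Byte]): GenericRecord =
    if (data == null) null
    else reader.read(null, DecoderFactory.get().binaryDecoder(data, null))

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
  override def close(): Unit = ()
}
```

You would then reference this class as the `value.deserializer` and use `GenericRecord` as the stream's value type.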

It's unclear how your producer serialized the data, but if it used Confluent Schema Registry, you'll need Confluent's own KafkaAvroDeserializer class, and you would then use [String, GenericRecord] as your stream types. The data is never automatically converted to JSON, and using String as the value type will fail once an Avro deserializer is in place.
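Putting that together, a sketch of the Schema Registry variant of your code, assuming the producer used Confluent's KafkaAvroSerializer and that the registry URL below is a placeholder:

```scala
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Consumer config only needs deserializers; the serializer entries
// from the original map are dropped. "schema.registry.url" is a
// hypothetical address for your registry.
val kafkaParams: Map[String, Object] = Map(
  "bootstrap.servers" -> "kafka-servers",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[KafkaAvroDeserializer],
  "schema.registry.url" -> "http://schema-registry:8081",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "group.id" -> "group1"
)

// Note the value type is GenericRecord, not String.
val kafkaDstream = KafkaUtils.createDirectStream[String, GenericRecord](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, GenericRecord](topics, kafkaParams)
)

kafkaDstream.foreachRDD { rdd =>
  // GenericRecord.toString renders the record as a JSON string,
  // which matches the "interpret as JSON" goal.
  rdd.foreach(record => println(record.value().toString))
}
```

KafkaAvroDeserializer is declared over Object, so in some setups you may need to type the stream as [String, Object] and cast each value with `asInstanceOf[GenericRecord]`.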
