I am using the Spark Streaming Scala code below to consume real-time Kafka messages from a producer topic. The problem is that my job sometimes fails because of server connectivity or some other reason, and because enable.auto.commit is set to true in my code, some messages are lost and never get stored in my database.
So I want to know whether there is a way to pull old Kafka messages starting from a specific offset. I tried setting "auto.offset.reset" to earliest or latest, but it only fetches new messages that have not been committed yet.
For example, say my current offset is 1060 and auto.offset.reset is earliest. When I restart my job it starts reading from 1061, but in some cases I want to read old Kafka messages from offset 1020. Is there any property I can use to start consuming from a specific offset?
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.StreamingContext._
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val topic = "test123"
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[KafkaAvroDeserializer],
  "schema.registry.url" -> "http://abc.test.com:8089",
  "group.id" -> "spark-streaming-notes",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (true: java.lang.Boolean)
)
val stream = KafkaUtils.createDirectStream[String, Object](
  ssc,
  PreferConsistent,
  Subscribe[String, Object](Seq(topic), kafkaParams) // Subscribe expects a collection of topics
)
// ConsumerRecord is not serializable, so map to key and value strings before printing
stream.map(record => (record.key, String.valueOf(record.value))).print()
ssc.start()
ssc.awaitTermination()
CodePudding user response:
From Spark Streaming, you can't. You'd need to use the kafka-consumer-groups CLI to commit offsets specific to your group id, or manually construct a KafkaConsumer instance and invoke commitSync before starting the Spark context.
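import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
// The consumer config must use the same group.id as the Spark job
val c = new KafkaConsumer[String, Object](...)
val toCommit: java.util.Map[TopicPartition, OffsetAndMetadata] = ...
c.commitSync(toCommit) // But don't do this every run of your app
c.close()
ssc.start()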
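The snippet above leaves the consumer config and the toCommit map elided. As a rough sketch of one way to fill them in, assuming the broker, topic, and group id from the question, that the topic has a single partition 0, and that the Spark job is stopped while this runs (a commit for a group with active members may be rejected): the object name RewindGroupOffsets and the helper offsetsToCommit are made up for illustration, and the import of scala.collection.JavaConverters assumes Scala 2.12.

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object RewindGroupOffsets {
  // Hypothetical helper: turns partition -> offset pairs into the map commitSync expects
  def offsetsToCommit(topic: String, offsets: Map[Int, Long]): java.util.Map[TopicPartition, OffsetAndMetadata] =
    offsets.map { case (p, o) => new TopicPartition(topic, p) -> new OffsetAndMetadata(o) }.asJava

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")   // assumed: broker from the question
    props.put("group.id", "spark-streaming-notes")     // must match the Spark job's group.id
    props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("enable.auto.commit", "false")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      // The committed offset is the next offset to read, so 1020 means "resume at 1020".
      // Assumed: topic "test123" has only partition 0.
      consumer.commitSync(offsetsToCommit("test123", Map(0 -> 1020L)))
    } finally {
      consumer.close()
    }
  }
}
```

You would run this once as a separate step when you need to rewind, then start the streaming job as usual with the same group.id.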
Alternatively, Structured Streaming does offer a startingOffsets config.
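For reference, a minimal sketch of what the startingOffsets option could look like, assuming the spark-sql-kafka-0-10 package is on the classpath and that the topic has a single partition 0 (as far as I know, when you give specific offsets you have to list every partition of the topic). The object name and the startingOffsetsJson helper are made up for illustration. Also note that startingOffsets is only used when a query starts fresh; a query restarted from an existing checkpoint resumes from the checkpoint instead.

```scala
import org.apache.spark.sql.SparkSession

object StartingOffsetsSketch {
  // Hypothetical helper: builds the JSON shape {"topic":{"partition":offset}}
  def startingOffsetsJson(topic: String, offsets: Map[Int, Long]): String = {
    val perPartition = offsets.toSeq.sortBy(_._1)
      .map { case (p, o) => "\"" + p + "\":" + o }
      .mkString(",")
    "{\"" + topic + "\":{" + perPartition + "}}"
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("StartingOffsetsSketch")
      .getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // assumed: broker from the question
      .option("subscribe", "test123")
      // Assumed: single partition 0, start reading at offset 1020
      .option("startingOffsets", startingOffsetsJson("test123", Map(0 -> 1020L)))
      .load()

    // Print partition and offset only; decoding the Avro value is left out of this sketch
    val query = df.select("partition", "offset")
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```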
Note that auto.offset.reset only applies when the group.id has no committed offsets yet, for example a group.id that does not exist.