kafka enable.auto.commit is set to false
- If using the latest offset, do we need to find the last offset manually and pass it to createDirectStream() in the Spark application, or will it automatically take the latest offset?
- Is there any difference between using spark.readStream.format("kafka")... and KafkaUtils.createDirectStream?
- When using the earliest offset option, will it pick up the offset automatically?
CodePudding user response:
If you take a look at the documentation of the Kafka connector for Spark, you can find most of the answers.
Documentation about the startingOffsets option for the Kafka connector; the last part is about streaming queries:
The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. Note: For batch queries, latest (either implicitly or by using -1 in json) is not allowed. For streaming queries, this only applies when a new query is started, and that resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.
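As a rough illustration of the JSON form described in the quote, the string can be built in code rather than written by hand. This is a minimal sketch in Python; the topic names `orders` and `payments`, the partition numbers, and the helper `starting_offsets_json` are made up for the example. Only the -2 and -1 sentinels come from the documentation quoted above.

```python
import json

EARLIEST = -2  # sentinel for "earliest" in the JSON form (per the quoted docs)
LATEST = -1    # sentinel for "latest" in the JSON form (per the quoted docs)


def starting_offsets_json(offsets):
    """Serialize {topic: {partition: offset}} into a startingOffsets JSON string.

    JSON object keys must be strings, so partition numbers are converted.
    """
    return json.dumps(
        {
            topic: {str(partition): offset for partition, offset in parts.items()}
            for topic, parts in offsets.items()
        },
        sort_keys=True,
    )


# Hypothetical topics and partitions, for illustration only.
spec = starting_offsets_json({
    "orders": {0: 23, 1: EARLIEST},
    "payments": {0: LATEST},
})
print(spec)
```

The resulting string would then be passed as the value of the startingOffsets option on a spark.readStream.format("kafka") reader.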
If stored offsets are available, the stream always picks them up; otherwise it asks Kafka for the earliest or latest offset. This should be true for both types of streams: direct streams and structured streams both take existing offsets into account.
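The rule described here (use stored offsets when they exist, otherwise fall back to earliest or latest) can be sketched as a small function. This is a toy model in Python, not Spark's actual implementation; `resolve_start`, its parameters, and the sample offsets are hypothetical.

```python
def resolve_start(stored_offset, fallback, earliest, latest):
    """Pick the offset to start reading a partition from.

    stored_offset: offset recovered from a checkpoint or commit, or None.
    fallback: "earliest" or "latest", used only when nothing is stored.
    earliest/latest: the partition's current earliest and latest offsets.
    """
    if stored_offset is not None:
        return stored_offset  # resuming: the fallback setting is ignored
    if fallback == "earliest":
        return earliest
    if fallback == "latest":
        return latest
    raise ValueError(f"unknown fallback: {fallback!r}")


# First run: nothing stored, so the fallback decides.
print(resolve_start(None, "latest", earliest=0, latest=500))
# Restart: a stored offset wins even though the fallback says "latest".
print(resolve_start(120, "latest", earliest=0, latest=500))
```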
I see that you mentioned the enable.auto.commit option, and I just want to make sure you're aware of the following quote from the same documentation I mentioned above.
Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception: enable.auto.commit: Kafka source doesn’t commit any offset.
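One way to catch this before the query starts is to check the options up front. The sketch below is a hypothetical pre-flight helper in Python, not part of Spark; `check_options` and the sample option values are assumptions. It assumes Kafka parameters are passed with the `kafka.` prefix, and it lists only the parameter named in the quote.

```python
# Only the parameter named in the quote; the documentation lists others as well.
UNSUPPORTED = {"kafka.enable.auto.commit"}


def check_options(options):
    """Return options unchanged, or raise if an unsupported Kafka param is set."""
    bad = sorted(UNSUPPORTED.intersection(options))
    if bad:
        raise ValueError(f"not supported by the Kafka source: {', '.join(bad)}")
    return options


# Hypothetical option values, for illustration only.
options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "orders",
    "kafka.enable.auto.commit": "false",
}

try:
    check_options(options)
except ValueError as err:
    print(err)
```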
CodePudding user response:
Here is my attempt to answer your questions
- Ques 1: enable.auto.commit is a Kafka-related parameter and, if set to false, requires you to manually commit (read: update) your offset to the checkpoint directory. If your application restarts, it will look into the checkpoint directory and start reading from the last committed offset + 1. The same is mentioned here https://jaceklaskowski.gitbooks.io/apache-kafka/content/kafka-properties-enable-auto-commit.html by jaceklaskowski. There is no need to specify the offset anywhere in your Spark application. All you need is the checkpoint directory. Also, remember that offsets are maintained per partition, per topic, per consumer, so it would be unreasonable for Spark to expect developers/users to provide them.
- Ques 2: spark.readStream is a generic method to read data from streaming sources such as a TCP socket, Kafka topics, etc., while KafkaUtils is a dedicated class for integrating Spark with Kafka, so I assume it is more optimised if you are using Kafka topics as the source. I usually use KafkaUtils myself, though I haven't done any performance benchmarks. Both can subscribe to more than one topic: KafkaUtils takes a collection of topics, and readStream accepts a comma-separated list in its subscribe option.
- Ques 3: earliest offset means your consumer will start reading from the oldest record available. For example, if your topic is new (no cleanup has happened) or cleanup is not configured for the topic, it will start from offset 0. If cleanup is configured and all records up to offset 2000 have been removed, records will be read from offset 2001, while the topic may have records up to offset 10000 (this assumes there is only one partition; in topics with multiple partitions the offset values will differ per partition). See the section on batch queries here https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html for more details.
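The cleanup example in Ques 3 can be walked through with a toy model of a single partition. This is a sketch in Python for illustration only; the `PartitionLog` class and its methods are hypothetical and do not mirror Kafka's internals.

```python
from dataclasses import dataclass


@dataclass
class PartitionLog:
    """Toy model of one partition: offsets in [start, end) are still readable."""
    start: int = 0  # first offset that has not been cleaned up
    end: int = 0    # offset the next produced record will receive

    def append(self, count):
        """Simulate producing `count` records."""
        self.end += count

    def delete_through(self, offset):
        """Simulate cleanup removing every record up to and including `offset`."""
        self.start = max(self.start, min(offset + 1, self.end))

    def earliest(self):
        """Offset a consumer starting at 'earliest' would read first."""
        return self.start


log = PartitionLog()
log.append(10001)         # records at offsets 0..10000
print(log.earliest())     # new topic, no cleanup yet
log.delete_through(2000)  # cleanup removes offsets 0..2000
print(log.earliest())     # reading now starts after the removed range
```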
Hope my answers help and make sense!! If yes, please upvote.