I am fairly new to Spark and am still learning. One of the more difficult concepts I have come across is checkpointing and how Spark uses it to recover from failures. I am doing batch reads from Kafka using Structured Streaming and writing them to S3 as Parquet files, as follows:
dataset
.write()
.mode(SaveMode.Append)
.option("checkpointLocation", checkpointLocation)
.partitionBy("date_hour")
.parquet(getS3PathForTopic(topicName));
The checkpoint location is an S3 filesystem path. However, as the job runs, I see no checkpoint files. In subsequent runs, I see the following log:
21/10/14 12:20:51 INFO ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-relation-54f0cc87-e437-4582-b998-a33189e90bd7-driver-0-5, groupId=spark-kafka-relation-54f0cc87-e437-4582-b998-a33189e90bd7-driver-0] Found no committed offset for partition topic-1
This indicates that the previous run did not checkpoint any offsets for this run to pick up, so it keeps consuming from the earliest offset.
How can I make my job pick up new offsets? Note that this is a batch query as described here.
This is how I read:
sparkSession
.read()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaProperties.bootstrapServers())
.option("subscribe", topic)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", sslConfig.truststoreLocation())
.option("kafka.ssl.truststore.password", sslConfig.truststorePassword())
.option("kafka.ssl.keystore.location", sslConfig.keystoreLocation())
.option("kafka.ssl.keystore.password", sslConfig.keystorePassword())
.option("kafka.ssl.endpoint.identification.algorithm", "")
.option("failOnDataLoss", "true");
Answer:
I am not sure why batch Spark Structured Streaming with Kafka still exists now. If you wish to use it, then you must code your own offset management. See the guide, but it is badly explained.
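As a rough sketch of what "your own offset management" could look like: a batch Kafka read accepts a `startingOffsets` option as a JSON string of the form `{"topic":{"partition":offset}}`. The helper below only builds that string from offsets you have stored yourself. The class name `StartingOffsets`, the method `toJson`, and the idea of keeping a map of next offsets per partition are assumptions for illustration, not something taken from the question.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper: turns "next offset to read" per partition into the
// JSON string that the Kafka source accepts for the startingOffsets option.
class StartingOffsets {

    // nextOffsets maps partition number -> first offset the next run should read.
    // Topic names are assumed to need no JSON escaping (Kafka only allows
    // letters, digits, '.', '_' and '-').
    static String toJson(String topic, Map<Integer, Long> nextOffsets) {
        // TreeMap gives a stable, partition-ordered output.
        String partitions = new TreeMap<>(nextOffsets).entrySet().stream()
            .map(e -> "\"" + e.getKey() + "\":" + e.getValue())
            .collect(Collectors.joining(","));
        return "{\"" + topic + "\":{" + partitions + "}}";
    }
}
```

You would then pass the result to the reader with `.option("startingOffsets", StartingOffsets.toJson(topic, nextOffsets))`. One way to obtain `nextOffsets` is to take the maximum of the `offset` column per `partition` in the batch you just wrote, add 1, and persist that somewhere durable after the write succeeds.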
I would say Trigger.Once is a better fit for your use case: it is a streaming query rather than batch mode, so Spark provides the offset management for you through the checkpoint location.
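A minimal sketch of the Trigger.Once variant, assuming a plain Kafka source and Parquet sink. The broker address, topic, S3 paths, class name, and the way `date_hour` is derived from the Kafka `timestamp` column are placeholders and assumptions, and the SSL options from the question are left out for brevity.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.date_format;

public class KafkaToParquetOnce {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
            .appName("kafka-to-parquet-once")
            .getOrCreate();

        // readStream() instead of read(): this makes it a streaming query.
        Dataset<Row> records = spark.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker:9093") // placeholder
            .option("subscribe", "topic")                     // placeholder
            // Only consulted on the first run, while the checkpoint is empty.
            .option("startingOffsets", "earliest")
            .load()
            // Assumption: partition column derived from the Kafka record timestamp.
            .withColumn("date_hour", date_format(col("timestamp"), "yyyy-MM-dd-HH"));

        // writeStream() honours checkpointLocation; offsets are tracked there.
        StreamingQuery query = records.writeStream()
            .format("parquet")
            .option("path", "s3a://bucket/output/topic")                    // placeholder
            .option("checkpointLocation", "s3a://bucket/checkpoints/topic") // placeholder
            .partitionBy("date_hour")
            // Process what is available, then stop, like a scheduled batch job.
            .trigger(Trigger.Once())
            .start();

        query.awaitTermination();
    }
}
```

Each scheduled run of such a job should resume from the offsets recorded in the checkpoint directory, provided the same `checkpointLocation` is reused between runs.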