Spark Structured Streaming Batch Read Checkpointing

I am fairly new to Spark and am still learning. One of the more difficult concepts I have come across is checkpointing and how Spark uses it to recover from failures. I am doing batch reads from Kafka using Structured Streaming and writing them to S3 as Parquet files, like this:

dataset
    .write()
    .mode(SaveMode.Append)
    .option("checkpointLocation", checkpointLocation)
    .partitionBy("date_hour")
    .parquet(getS3PathForTopic(topicName));

The checkpoint location is an S3 path. However, as the job runs, I see no checkpoint files being written. In subsequent runs, I see the following log:

21/10/14 12:20:51 INFO ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-relation-54f0cc87-e437-4582-b998-a33189e90bd7-driver-0-5, groupId=spark-kafka-relation-54f0cc87-e437-4582-b998-a33189e90bd7-driver-0] Found no committed offset for partition topic-1

This indicates that the previous run did not checkpoint any offsets for this run to pick up, so it keeps consuming from the earliest offset.

How can I make my job pick up new offsets? Note that this is a batch query as described here.

This is how I read:

sparkSession
    .read()
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaProperties.bootstrapServers())
    .option("subscribe", topic)
    .option("kafka.security.protocol", "SSL")
    .option("kafka.ssl.truststore.location", sslConfig.truststoreLocation())
    .option("kafka.ssl.truststore.password", sslConfig.truststorePassword())
    .option("kafka.ssl.keystore.location", sslConfig.keystoreLocation())
    .option("kafka.ssl.keystore.password", sslConfig.keystorePassword())
    .option("kafka.ssl.endpoint.identification.algorithm", "")
    .option("failOnDataLoss", "true")
    .load();

CodePudding user response:

Honestly, I am not sure why the batch mode of the Kafka source still exists. If you wish to use it, you have to implement your own offset management: the checkpointLocation option is ignored by batch writes, which is why you see no checkpoint files and why every run starts from the earliest offset. The Structured Streaming + Kafka integration guide covers the batch options (startingOffsets / endingOffsets), but it does not explain the bookkeeping you need to do around them.
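
A minimal sketch of that bookkeeping, assuming hypothetical helpers loadOffsetsJson(), saveOffsetsJson(), and buildOffsetsJson() that persist a JSON offsets string (for example in S3) between runs; the partition and offset columns come from the Kafka source's fixed schema:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

// Offsets are passed as JSON, e.g. {"my-topic":{"0":1234,"1":5678}}.
// loadOffsetsJson()/saveOffsetsJson() are hypothetical helpers that persist
// this string somewhere durable (S3, a database, ...) between runs.
String startingOffsets = loadOffsetsJson().orElse("earliest");

Dataset<Row> batch = sparkSession
    .read()
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaProperties.bootstrapServers())
    .option("subscribe", topic)
    .option("startingOffsets", startingOffsets)
    .option("endingOffsets", "latest")
    .load();

// ... write the batch to S3 as before ...

// Each Kafka row carries its partition and offset; the next run should start
// at the highest consumed offset per partition, plus one.
Dataset<Row> nextOffsets = batch
    .groupBy("partition")
    .agg(functions.max("offset").plus(1).as("nextOffset"));

// buildOffsetsJson() is another hypothetical helper that turns these rows
// into the JSON shape shown above.
saveOffsetsJson(buildOffsetsJson(topic, nextOffsets.collectAsList()));

Note that a partition with no new rows in a given batch will be missing from nextOffsets, so in practice you would merge the result with the previous run's offsets before saving.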

I would say Trigger.Once is a better fit for your use case: you run a streaming query that processes whatever is available and then stops, and because it is a streaming query rather than a batch one, Spark manages the offsets for you through the checkpoint.
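
A sketch of that approach, reusing the names from your snippets (sparkSession, topic, checkpointLocation, getS3PathForTopic(topicName)); your kafka.* SSL options would carry over to readStream() unchanged:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.Trigger;

Dataset<Row> stream = sparkSession
    .readStream()                          // streaming read instead of read()
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaProperties.bootstrapServers())
    .option("subscribe", topic)
    .option("startingOffsets", "earliest") // only honored on the very first run
    .load();

stream
    .writeStream()
    .format("parquet")
    .option("path", getS3PathForTopic(topicName))
    .option("checkpointLocation", checkpointLocation) // now actually used
    .partitionBy("date_hour")
    .trigger(Trigger.Once())               // process available data, then stop
    .outputMode("append")
    .start()
    .awaitTermination();                   // declare or handle StreamingQueryException

Because the offsets now live in the checkpoint, each run resumes exactly where the previous one stopped. On Spark 3.3+, Trigger.AvailableNow() is the successor to Trigger.Once() and is generally preferred for this run-and-stop pattern.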
