Read Spark streaming source once when applying and writing multiple transformations


I'm trying to use Spark Structured Streaming to achieve the following flow:

                                        ┌──────────────────────┐
                                    ┌──►│Transformation DF (1) ├──┐
                                    │   └──────────────────────┘  │
┌──────────────────┐   ┌─────────┬──┘                             └──►┌────────────────┐
│Source Kafka Topic├──►│Source DF│                                    │Sink Kafka Topic│
└──────────────────┘   └─────────┴──┐                             ┌──►└────────────────┘
                                    │   ┌──────────────────────┐  │
                                    └──►│Transformation DF (2) ├──┘
                                        └──────────────────────┘

What actually happens is the following:

                         ┌──────────────┐    ┌──────────────────────┐
                      ┌─►│Source DF (1) ├───►│Transformation DF (1) ├──┐
                      │  └──────────────┘    └──────────────────────┘  │
┌──────────────────┬──┘                                                └──►┌────────────────┐
│Source Kafka Topic│                                                       │Sink Kafka Topic│
└──────────────────┴──┐                                                ┌──►└────────────────┘
                      │  ┌──────────────┐    ┌──────────────────────┐  │
                      └─►│Source DF (2) ├───►│Transformation DF (2) ├──┘
                         └──────────────┘    └──────────────────────┘

The logic runs as expected; the only issue is that Spark initializes two separate streaming queries, each reading the exact same messages from the source topic.

This puts a huge load on the source Kafka topic, especially when running hundreds of different transformations. Since transformations are lazily evaluated, it seems I have no control over this behavior.

The code roughly looks like the following:

    source_df = spark.readStream.format("kafka")...
    # each transformation is using different values for select/filter/window/watermark/group by
    # therefore the transformations can't be merged into a single dataframe
    source_df.filter(some_conditions).writeStream.format("kafka")...
    source_df.filter(some_other_conditions).writeStream.format("kafka")...

Is there any way to overcome this behavior, or is it intentional by design?
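
For reference, here is a self-contained PySpark version of the snippet above (the broker address, topic names, checkpoint paths, filter conditions, and the spark-sql-kafka package requirement are placeholders/assumptions, not part of my real job). It makes the behavior easy to see: each start() registers its own streaming query in spark.streams.active, and each of those queries reads the source topic on its own.

    # Requires the spark-sql-kafka package on the classpath.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("two-queries-demo").getOrCreate()

    # Placeholder broker/topic names, for illustration only.
    source_df = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "source-topic")
        .load()
    )

    def start_sink(filter_expr, checkpoint):
        # filter_expr stands in for the real per-sink transformation.
        return (
            source_df.filter(filter_expr)
            .selectExpr("key", "value")
            .writeStream.format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("topic", "sink-topic")
            .option("checkpointLocation", checkpoint)
            .start()
        )

    q1 = start_sink("partition = 0", "/tmp/chk-1")
    q2 = start_sink("partition != 0", "/tmp/chk-2")

    # Each start() call registers an independent streaming query,
    # and each query consumes the source topic separately:
    print(len(spark.streams.active))  # 2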

CodePudding user response:

Using cache() should avoid recomputing the DataFrame, although it would still lead to two queries.

    source_df.cache()

CodePudding user response:

Using foreachBatch you can achieve it. foreachBatch applies custom write logic to each micro-batch, so Spark will not initialize two different streaming queries.

You can do it this way:

    source_df
      .writeStream
      .foreachBatch { (batch: DataFrame, _: Long) =>

        batch.filter(some_conditions).write.format("kafka")...
        batch.filter(some_other_conditions).write.format("kafka")...

      }
      .start()
      .awaitTermination()
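
In PySpark, the same pattern could look roughly like the sketch below (broker address, topic names, checkpoint path, and filter conditions are placeholders). Persisting the micro-batch before the writes follows the pattern the Spark docs recommend for writing one stream to multiple locations, so the batch is not recomputed for each sink; Kafka is read only once, by the single streaming query.

    from pyspark.sql import SparkSession, DataFrame

    spark = SparkSession.builder.appName("single-read-multi-sink").getOrCreate()

    # Placeholder broker/topic names, for illustration only.
    source_df = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "source-topic")
        .load()
    )

    def write_all_sinks(batch: DataFrame, batch_id: int) -> None:
        # Cache the micro-batch so the writes below don't recompute it
        # (and don't re-read the same offsets from Kafka).
        batch.persist()

        (batch.filter("partition = 0")            # stand-in for some_conditions
            .selectExpr("key", "value")
            .write.format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("topic", "sink-topic")
            .save())

        (batch.filter("partition != 0")           # stand-in for some_other_conditions
            .selectExpr("key", "value")
            .write.format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("topic", "sink-topic")
            .save())

        batch.unpersist()

    (
        source_df.writeStream
        .foreachBatch(write_all_sinks)
        .option("checkpointLocation", "/tmp/chk-multi-sink")
        .start()
        .awaitTermination()
    )

Note that with this approach all the transformations run inside a single streaming query, so they share one trigger interval and one checkpoint, and an error in any of the writes fails the whole micro-batch.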