I'm trying to use Spark Structured Streaming to achieve the following flow:
                                        ┌──────────────────────┐
                                    ┌──►│Transformation DF (1) ├──┐
                                    │   └──────────────────────┘  │
┌──────────────────┐   ┌─────────┬──┘                             └──►┌────────────────┐
│Source Kafka Topic├──►│Source DF│                                    │Sink Kafka Topic│
└──────────────────┘   └─────────┴──┐                             ┌──►└────────────────┘
                                    │   ┌──────────────────────┐  │
                                    └──►│Transformation DF (2) ├──┘
                                        └──────────────────────┘
What actually happens is the following:
                         ┌──────────────┐    ┌──────────────────────┐
                      ┌─►│Source DF (1) ├───►│Transformation DF (1) ├──┐
                      │  └──────────────┘    └──────────────────────┘  │
┌──────────────────┬──┘                                                └──►┌────────────────┐
│Source Kafka Topic│                                                       │Sink Kafka Topic│
└──────────────────┴──┐                                                ┌──►└────────────────┘
                      │  ┌──────────────┐    ┌──────────────────────┐  │
                      └─►│Source DF (2) ├───►│Transformation DF (2) ├──┘
                         └──────────────┘    └──────────────────────┘
The logic runs as expected; the only issue is that Spark initializes two different streaming queries that read the exact same messages from the source topic.
This creates a huge load on the source Kafka topic, especially when running hundreds of different transformations. Since transformations are lazily evaluated, it seems like I don't have any control over this behavior.
The code roughly looks like the following:
source_df = spark.readStream.format("kafka")...
# Each transformation uses different values for select/filter/window/watermark/group by,
# so the transformations can't be merged into a single DataFrame.
source_df.filter(some_conditions).writeStream.format("kafka")...
source_df.filter(some_other_conditions).writeStream.format("kafka")...
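For reference, once both sinks are started, the two independent queries can be observed on the session's StreamingQueryManager (a minimal sketch; it assumes the two writeStream calls above end with .start()):

# Hypothetical illustration: after both .start() calls, Spark tracks two
# separate StreamingQuery objects, each maintaining its own Kafka consumers
# that read the full source topic.
for query in spark.streams.active:
    print(query.id, query.status)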
Is there any way to overcome this behavior, or is it intentional by design for some reason?
CodePudding user response:
Using cache should avoid recomputing the DataFrame, although it would still lead to two queries.
source_df.cache()
CodePudding user response:
You can achieve this with foreachBatch, which applies custom write logic to each micro-batch. This way Spark will not initialize two different streaming queries.
You can do it this way:
source_df
  .writeStream
  .foreachBatch { (batch: DataFrame, _: Long) =>
    // Both writes are plain batch writes against the same micro-batch,
    // so the source topic is consumed only once per trigger.
    batch.filter(some_conditions).write.format("kafka")...
    batch.filter(some_other_conditions).write.format("kafka")...
  }
  .start()
  .awaitTermination()
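Since the question uses PySpark, here is a fuller sketch of the same pattern in Python, including the persist/unpersist step the Structured Streaming guide recommends when a micro-batch is written to multiple sinks. The topic names, bootstrap servers, filter conditions, and checkpoint path are placeholders, not values from the question.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("single-source-fanout").getOrCreate()

# One streaming source: the topic is read once per micro-batch.
source_df = (spark.readStream.format("kafka")
             .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
             .option("subscribe", "source-topic")               # placeholder
             .load())

def write_batches(batch_df, batch_id):
    # Persist the micro-batch so the two writes below reuse it
    # instead of recomputing it from Kafka.
    batch_df.persist()
    (batch_df.filter(col("value").isNotNull())                  # placeholder condition
             .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
             .write.format("kafka")
             .option("kafka.bootstrap.servers", "broker:9092")
             .option("topic", "sink-topic-1")                   # placeholder
             .save())
    (batch_df.filter(col("key").isNotNull())                    # placeholder condition
             .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
             .write.format("kafka")
             .option("kafka.bootstrap.servers", "broker:9092")
             .option("topic", "sink-topic-2")                   # placeholder
             .save())
    batch_df.unpersist()

(source_df.writeStream
          .foreachBatch(write_batches)
          .option("checkpointLocation", "/tmp/checkpoints/fanout")  # placeholder
          .start()
          .awaitTermination())

The trade-off is that all sinks now share a single trigger and a single checkpoint, so they advance through the source topic together rather than independently.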