Spark Structured streaming watermark has no effect


I do a window-based aggregation with a watermark, but every time all of the data gets aggregated.

relevant code:

val jsonDF = spark.readStream
  .format("json")
  .schema(schema)
  .load("data-source")

val result = jsonDF
  .withWatermark("dateTime", "10 minutes")
  .groupBy(window($"dateTime", "10 minutes", "5 minutes"), $"location")
  .sum("value")

val query = result.writeStream
  .outputMode("complete")
  .format("console")
  .queryName("location-query")
  .start()

Once the query has started, I begin putting files into the "data-source" directory. The current time is 2022-12-29T10:44:30.

{"location": "A", "value": 2, "dateTime": "2022-12-01T15:44:21"}
{"location": "B", "value": 3, "dateTime": "2022-12-28T16:44:21"}
{"location": "A", "value": 7, "dateTime": "2022-12-29T10:44:21"}


actual result:

+--------------------+--------+----------+
|              window|location|sum(value)|
+--------------------+--------+----------+
|{2022-12-28 16:40...|       B|         3|
|{2022-12-29 10:35...|       A|         7|
|{2022-12-01 15:35...|       A|         2|
|{2022-12-01 15:40...|       A|         2|
|{2022-12-28 16:35...|       B|         3|
|{2022-12-29 10:40...|       A|         7|
+--------------------+--------+----------+

expected result:

+--------------------+--------+----------+
|              window|location|sum(value)|
+--------------------+--------+----------+
|{2022-12-29 10:35...|       A|         7|
|{2022-12-29 10:40...|       A|         7|
+--------------------+--------+----------+

As you can see, even very old data from 2022-12-01 is aggregated.

Even if I wait some time, say 20 minutes, and then add new files with older dates, everything still gets aggregated.

CodePudding user response:

OK, the problem was the output mode.

In complete mode, Spark can't drop the state because it has to write the whole result every time, which makes sense.
Changing to append or update mode solved the issue.
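
For reference, here is a minimal sketch of the same query in update mode. The schema, the local master setting, the `total` column alias, and the `truncate` option are my assumptions, since the question does not show them; the rest mirrors the code from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{sum, window}
import org.apache.spark.sql.types.{LongType, StringType, StructType, TimestampType}

object WatermarkUpdateMode {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("watermark-update-mode")
      .master("local[*]") // assumption: local test run
      .getOrCreate()
    import spark.implicits._

    // Assumed schema; the question only says `schema` without showing it.
    val schema = new StructType()
      .add("location", StringType)
      .add("value", LongType)
      .add("dateTime", TimestampType)

    val jsonDF = spark.readStream
      .format("json")
      .schema(schema)
      .load("data-source")

    val result = jsonDF
      .withWatermark("dateTime", "10 minutes")
      .groupBy(window($"dateTime", "10 minutes", "5 minutes"), $"location")
      .agg(sum($"value").as("total"))

    // "update" (or "append") lets Spark drop state for windows that are
    // older than the watermark; "complete" keeps all state forever.
    val query = result.writeStream
      .outputMode("update")
      .format("console")
      .option("truncate", "false") // show the full window column
      .queryName("location-query")
      .start()

    query.awaitTermination()
  }
}
```

Two things to keep in mind when testing this, as far as I understand the behavior: the watermark is derived from the largest `dateTime` Spark has seen so far minus the 10 minute delay, not from the wall-clock time, and it only advances between micro-batches. So old records that arrive in the same batch as the newest record (for example, all files picked up in the first batch) are still aggregated; only late records arriving in later batches get dropped. In append mode a window is printed only after the watermark has passed its end, so output shows up later than in update mode.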
