I am doing window-based aggregation with a watermark, but every time, all of the data gets aggregated.
Relevant code:
val jsonDF = spark.readStream.format("json").schema(schema).load("data-source")
val result = jsonDF
  .withWatermark("dateTime", "10 minutes")
  .groupBy(window($"dateTime", "10 minutes", "5 minutes"), $"location")
  .sum("value")
val query = result.writeStream.outputMode("complete").format("console").queryName("location-query").start()
Once the query has started, I put files into the "data-source" directory. The current time is 2022-12-29T10:44:30.
[
{
"location": "A",
"value": 2,
"dateTime": "2022-12-01T15:44:21"
},
{
"location": "B",
"value": 3,
"dateTime": "2022-12-28T16:44:21"
},
{
"location": "A",
"value": 7,
"dateTime": "2022-12-29T10:44:21"
}
]
result:
+--------------------+--------+----------+
|              window|location|sum(value)|
+--------------------+--------+----------+
|{2022-12-28 16:40...|       B|         3|
|{2022-12-29 10:35...|       A|         7|
|{2022-12-01 15:35...|       A|         2|
|{2022-12-01 15:40...|       A|         2|
|{2022-12-28 16:35...|       B|         3|
|{2022-12-29 10:40...|       A|         7|
+--------------------+--------+----------+
expected result:
+--------------------+--------+----------+
|              window|location|sum(value)|
+--------------------+--------+----------+
|{2022-12-29 10:35...|       A|         7|
|{2022-12-29 10:40...|       A|         7|
+--------------------+--------+----------+
As you can see, even very old data from 2022-12-01 is aggregated.
Even if I wait some time, say 20 minutes, and then add new files with older dates, everything still gets aggregated.
CodePudding user response:
Ok, the problem was the output mode:
outputMode("complete")
In complete mode, Spark can't drop the state because it has to write the whole result every time, which makes sense.
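To make that reasoning concrete, here is a toy model in plain Scala. It is not Spark's implementation: the names `WatermarkSketch`, `Event`, and `run` are hypothetical, event times are plain minute counts, and windows are tumbling rather than sliding to keep it short. With `evict = false` (roughly what complete mode implies) every window stays in the state forever; with `evict = true` windows that ended before the watermark are dropped and late rows for them are ignored.

```scala
// Toy model only; a sketch of the idea, not how Spark is implemented.
object WatermarkSketch {
  // Hypothetical event type: event time is a plain minute counter.
  final case class Event(location: String, value: Long, minute: Long)

  val WindowSize = 10L // minutes, tumbling windows for simplicity
  val Delay = 10L      // watermark delay in minutes

  // Returns the aggregation state, (windowStart, location) -> sum, after all batches.
  def run(batches: Seq[Seq[Event]], evict: Boolean): Map[(Long, String), Long] = {
    var state = Map.empty[(Long, String), Long]
    var watermark = Long.MinValue
    for (batch <- batches) {
      for (e <- batch) {
        val start = e.minute - Math.floorMod(e.minute, WindowSize)
        // With eviction on, a row is late if its window ended at or before the watermark.
        if (!evict || start + WindowSize > watermark) {
          val key = (start, e.location)
          state = state.updated(key, state.getOrElse(key, 0L) + e.value)
        }
      }
      // The watermark only moves after the batch, based on the largest event time seen.
      if (batch.nonEmpty) watermark = math.max(watermark, batch.map(_.minute).max - Delay)
      // With eviction on, windows that ended at or before the watermark leave the state.
      if (evict) state = state.filter { case ((start, _), _) => start + WindowSize > watermark }
    }
    state
  }
}
```

Feeding it one batch with a recent event followed by a second batch with a much older event leaves only the recent window when `evict` is true, and both windows when it is false.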
Changing to append or update mode solved the issue.
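For reference, a minimal sketch of the query with update mode. The schema is an assumption, since the question does not show it (the only hard requirement is that dateTime is a timestamp), and so are the local master, the `LocationQuery` object name, and the `multiLine` option, which assumes each file holds one JSON array like the sample above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.types.{LongType, StringType, StructType, TimestampType}

object LocationQuery {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("location-query")
      .master("local[*]") // assumption: local run for testing
      .getOrCreate()
    import spark.implicits._

    // Assumed schema; dateTime must be a timestamp for withWatermark and window.
    val schema = new StructType()
      .add("location", StringType)
      .add("value", LongType)
      .add("dateTime", TimestampType)

    val jsonDF = spark.readStream
      .format("json")
      .schema(schema)
      .option("multiLine", "true") // assumption: one JSON array per file
      .load("data-source")

    val result = jsonDF
      .withWatermark("dateTime", "10 minutes")
      .groupBy(window($"dateTime", "10 minutes", "5 minutes"), $"location")
      .sum("value")

    // "update" prints only the windows changed by each micro-batch and lets Spark
    // drop state for windows older than the watermark; "append" also works.
    val query = result.writeStream
      .outputMode("update")
      .format("console")
      .option("truncate", "false")
      .queryName("location-query")
      .start()

    query.awaitTermination()
  }
}
```

Two things to keep in mind when checking the output. The watermark is derived from the largest dateTime Spark has seen so far, not from the wall-clock time, and it only takes effect from the next micro-batch, so old records that arrive in the same first file as the newest record are still aggregated. Records for old windows are dropped only when they arrive in a later batch. In append mode, a window is written only once the watermark has passed the end of that window, so results show up later than in update mode.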