In Spark 3.1.1, I ran a groupBy (without distinct) on a DataFrame and tried to turn off partial aggregation using
spark.conf.set("spark.sql.aggregate.partialaggregate.skip.enabled", "true")
and then ran the query
df.groupBy("method").agg(sum("request_body_len"))
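Put together, a minimal self-contained version of what I ran looks roughly like this (the header/inferSchema read options are my assumption; the path and column names match the plan below):
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Attempt to disable partial aggregation (config name taken from the talk linked below)
spark.conf.set("spark.sql.aggregate.partialaggregate.skip.enabled", "true")

# Same CSV and columns that appear in the physical plan below
df = spark.read.csv("/mnt/http.csv", header=True, inferSchema=True)
df.groupBy("method").agg(F.sum("request_body_len")).explain()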
Spark still performs the partial aggregation, as shown in the physical plan:
== Physical Plan ==
*(2) HashAggregate(keys=[method#23], functions=[sum(cast(request_body_len#28 as bigint))], output=[method#23, sum(request_body_len)#142L])
+- Exchange hashpartitioning(method#23, 200), ENSURE_REQUIREMENTS, [id=#58]
   +- *(1) HashAggregate(keys=[method#23], functions=[partial_sum(cast(request_body_len#28 as bigint))], output=[method#23, sum#146L])
      +- FileScan csv [method#23,request_body_len#28] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/mnt/http.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<method:string,request_body_len:int>
I tried this after watching this YouTube video: https://youtu.be/_Ne27JcLnEc (at 56:53).
Is this feature no longer available in the latest Spark, or am I missing something?
CodePudding user response:
The config spark.sql.aggregate.partialaggregate.skip.enabled does not exist in the Spark source code.
All spark.sql.* configurations are defined in the SQLConf object, and this one is not among them:
https://github.com/apache/spark/blob/651904a2ef57d2261ea6e256e4f3cdd26aa1b69d/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
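This also explains why your spark.conf.set call appeared to succeed: the runtime config accepts arbitrary key/value pairs, and Spark simply ignores keys that no component reads. A quick sketch of that behavior (assuming an active SparkSession named spark):
# Setting an unknown key raises no error; the value is stored but never
# consulted by the planner, so it has no effect on the query plan:
spark.conf.set("spark.sql.aggregate.partialaggregate.skip.enabled", "true")
spark.conf.get("spark.sql.aggregate.partialaggregate.skip.enabled")  # returns 'true'

# By contrast, reading a key that was never set (and is not a defined
# config) fails with java.util.NoSuchElementException:
# spark.conf.get("spark.sql.some.made.up.key")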
As far as I can tell, this config was not available in previous versions either.
In the video you shared, the relevant slide links to a PR that enables skipping partial aggregation, but it appears that PR was never merged.
There was a PR against the Spark project that addressed this issue and added this config (perhaps the same PR presented in the video, I am not sure), but it was closed: https://github.com/apache/spark/pull/28804