I am going through the Parallel Scala Collections' `aggregate` method:

    def aggregate[S](z: S)(sop: (S, T) => S, cop: (S, S) => S): S

I understand why `cop` needs to be associative, but not the following statement, taken from *Learning Concurrent Programming in Scala*:

> The aggregate operation requires that cop is associative and that z is the zero element for the accumulator, that is, cop(z, a) == a

Why does `z` need to satisfy `cop(z, a) == a`?
CodePudding user response:
How does this method calculate things?

- for each element `T` you have to transform it into an `S` - it could be done by some function `T => S`, but the authors chose a more flexible approach with `(S, T) => S`, where this `S` could be the zero, or it could be the result of some other parallel computation that ended earlier
- to enable parallel combination of elements, you are providing `(S, S) => S` - this allows combining any two partial results into one
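As a concrete (made-up) instance of these two functions: here `T = String`, `S = Int`, `sop` folds one element into an accumulator, and `cop` merges two partial accumulators. The names and data are purely for illustration, not the library internals:

```scala
// Hypothetical sop/cop pair: total character count of a string collection.
object SopCopExample extends App {
  val z: Int = 0                                              // the zero
  val sop: (Int, String) => Int = (acc, s) => acc + s.length  // (S, T) => S
  val cop: (Int, Int) => Int = _ + _                          // (S, S) => S

  val words = List("parallel", "scala", "collections")

  // One possible parallel evaluation order: fold each "chunk" with sop
  // (seeded with z), then merge the partial results with cop.
  val (left, right) = words.splitAt(1)
  val result = cop(left.foldLeft(z)(sop), right.foldLeft(z)(sop))

  println(result) // 24
}
```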
Now, parallel collections have some options to optimize things:

- they can calculate `sop(z, element)` to convert each `element` to an `S` and later combine the results with `cop(result1, result2)`
- they can squash these two operations into `sop(otherResult, element)` if some `otherResult` is already available
These two operations have to be somewhat interchangeable to allow this optimization, but there is more to it.
What if you wanted to combine elements like this?
s1 s2 s3 s4
\ / \ /
cop cop
\ /
\ /
\ /
cop
It works if the number of elements is a power of 2. For some other number it could do things like:
s1 s2 s3 z
\ / \ /
cop cop
\ /
\ /
\ /
cop
to make task distribution easier.
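The padded tree only computes the same answer as the unpadded one because `cop(z, a) == a`. A minimal sketch with integer addition (my own example, not the library's code):

```scala
// Why padding with z is harmless: it relies on cop(z, a) == a.
object ZeroPadding extends App {
  val z = 0
  val cop: (Int, Int) => Int = _ + _

  val (s1, s2, s3) = (3, 4, 5)

  // Balanced tree over 3 elements, padded with z to reach a power of 2:
  val padded = cop(cop(s1, s2), cop(s3, z))
  // The same combination without padding:
  val unpadded = cop(cop(s1, s2), s3)

  println(padded == unpadded) // true, precisely because cop(z, a) == a
}
```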
In general, by assuming that `S` is a monoid, with an associative `cop` and a neutral `z`, and that `sop` provides something like a homomorphism from `T` into `S` and its addition, operations can easily be optimized internally while keeping correctness easy to prove. And you could easily change the implementation to optimize it further, or to make the code simpler. `cop` having `z` as a neutral element/zero/identity is just a part of this requirement.
If you give up on these requirements... then your implementation is easily (always?) broken. Your result could change depending on the order of execution, and parallel computations give no guarantees on the order of execution of any operation: neither the order in which elements of `T` will be converted to `S`, nor the order in which the `S`es will be combined.
You could not rely on `z` being neutral... but by making `S` merely a semigroup you:

- would not always be able to return an `S` - you wouldn't be able to provide a neutral `S` for an empty collection, so at best you would be able to provide an `Option[S]`
- would not be able to reduce a 1-element collection this way, as you would have nothing to seed `sop` with initially, so you'd have to use a `T => S` instead of `(S, T) => S` (which might reduce the number of optimizations available to the algorithm)
- would probably have a more complex implementation, as you'd have to rearrange the merges so that you'd never need something like `cop(s, zero)` or `sop(zero, t)`, calls which currently simplify the internal code
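The first bullet can be seen in the standard library itself: semigroup-style reduction without a zero is what `reduceOption` does, and it has to return an `Option` to cover the empty case (a sequential sketch):

```scala
// Without a neutral element you can't produce an S for an empty input.
object SemigroupOnly extends App {
  val cop: (Int, Int) => Int = _ + _

  println(List.empty[Int].reduceOption(cop)) // None
  println(List(1, 2, 3).reduceOption(cop))   // Some(6)

  // With a neutral z, folding always yields an S, even for empty input:
  println(List.empty[Int].foldLeft(0)(cop))  // 0
}
```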
CodePudding user response:
Let's start by analyzing the signature of `aggregate` and how it works:

    def aggregate[S](z: S)(sop: (S, T) => S, cop: (S, S) => S): S

So this says that it will aggregate, in parallel, the elements of a collection into a single value. For doing so it will "split" (semantically) the collection into multiple chunks to process in parallel; for each chunk it will run `sop` (starting from `z`) to fold the chunk's elements into a single `S`. Then, it will combine the intermediate results of the chunks using `cop` until all elements have been successfully aggregated into a single value.
Now, why does `z` need to be a "zero" element? To ensure the consistency of the result!
The idea is that no matter how many chunks are created, the result must be the same as if there was no parallelism involved.
Let's see a simple example: we want to sum all the numbers:

    someBigParallelArray.aggregate(0)(_ + _, _ + _)

This works because `0` is neutral for addition. If instead of `0` we had used `5`, then the result would change according to the number of chunks created, since each chunk would add an additional `5` to the final sum.
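We can simulate the chunking by hand with ordinary sequential collections (`grouped` plays the role of the scheduler's split, which in the real parallel `aggregate` is not under our control):

```scala
// Simulating chunked aggregation: every chunk is seeded with z.
object ChunkedSum extends App {
  def chunkedAggregate(xs: Seq[Int], chunks: Int, z: Int): Int =
    xs.grouped(math.max(1, xs.size / chunks))
      .map(_.foldLeft(z)(_ + _)) // sop, seeded with z in each chunk
      .reduce(_ + _)             // cop

  val xs = 1 to 100 // sums to 5050

  // With z = 0 the number of chunks is irrelevant:
  println(chunkedAggregate(xs, 2, 0)) // 5050
  println(chunkedAggregate(xs, 4, 0)) // 5050

  // With z = 5 every chunk smuggles in an extra 5:
  println(chunkedAggregate(xs, 2, 5)) // 5060
  println(chunkedAggregate(xs, 4, 5)) // 5070
}
```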
Now, you may be asking why we even need a zero element at all. We might just assume that the chunks are not empty (like we do with `cop`).
And you would be more or less correct (since the special case of the whole collection being empty or too small can be handled differently). But that would make `aggregate` very limited, since it could only produce values of the same type as the elements of the original collection; and you can see that the signature is more powerful than that: it allows you to produce values of any type, as long as that type has a valid "zero" and you know how to add values of your current type to the target type.
This is what allows us to write something like this:

    // Collects all the different values in a Set
    val distinct = someBigParallelArray.aggregate(Set.empty[Int])(_ + _, _ | _)
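Here `Set.empty[Int]` is the neutral element for `_ | _` (set union), and `_ + _` adds a single `Int` into a `Set[Int]`. Sequentially, the same computation is just a fold (a sketch with made-up data):

```scala
// Sequential equivalent of the aggregate-into-a-Set example.
object DistinctViaFold extends App {
  val data = Seq(1, 2, 2, 3, 3, 3)

  // sop: (Set[Int], Int) => Set[Int] adds one element to the accumulator.
  val distinct = data.foldLeft(Set.empty[Int])(_ + _)

  println(distinct)                                // Set(1, 2, 3)
  // Neutrality of the zero: the empty set is an identity for union.
  println((Set.empty[Int] | distinct) == distinct) // true
}
```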