See one of the JDK stream api collectors below:
public static <T> Collector<T, ?, Integer>
summingInt(ToIntFunction<? super T> mapper) {
return new CollectorImpl<>(
() -> new int[1],
(a, t) -> { a[0] = mapper.applyAsInt(t); },
(a, b) -> { a[0] = b[0]; return a; },
a -> a[0], CH_NOID);
}
Considering using parallel processing, it is clear there is no need in any synchronization and it is safe to use a non-concurrent containers () -> new int[1]
, since each such container is created and filled with accumulator (a, t) -> { a[0] = mapper.applyAsInt(t); }
in a separate thread. Then all these containers are combined in a separate thread with combiner (a, b) -> { a[0] = b[0]; return a; }
into a final result.
Meanwhile, JLS postulates that the reader thread may not see the actual value of non-volatile field if this field was somehow updated in another thread (because of thread caching and other JMM features). But considering the example collector, you can see that a plain int array is used, the elements of which are not volatile. I can't figure out why a plain int array is used here, instead of e.g. AtomicIntegerArray, when the non-volatile element int[0]
is updated in one thread and then is read in other thread that performs combining.
Could you please explain why it is safe to use containers with non-volatile mechanics in stream api using parallel processing?
CodePudding user response:
The way it works is that every parallel thread receives it's own individual copies of the container (one element int array), which they update independently of each other. No volatile required for the array access.
Once the threads run out of source values, they use a memory barrier that ensures their individual array entry is properly published, then "dance" with each other to perform the combining of these individual containers. Each such combination is then followed with yet another memory barrier. This goes on until only one container remains.
The actual implementation is a bit more involved. Check out ReferencePipeline.collect.