Why does list.size change when executing java parallel stream?

Time:11-27

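    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    static void statefullParallelLambdaSet() {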
        Set<Integer> s = new HashSet<>(
            Arrays.asList(1, 2, 3, 4, 5, 6)
        );

        List<Integer> list = new ArrayList<>();
        int sum = s.parallelStream().mapToInt(e -> {    // pipeline start
            if (list.size() <= 3) {     // list.size() changes while the pipeline operation is executing.
                list.add(e);            // mapToInt's lambda expression depends on this value, so it's stateful.
                return e;
            }
            else return 0;
        }).sum();   // terminal operation
        System.out.println(sum);
    }

The comment in the code above says that list.size() changes while the pipeline operation is running, but I don't understand why.

Since the stream is parallel, list.add(e) is executed by multiple threads at once. Is it correct to assume that this is why the value changes each time the code is run?

As for why the value changes even when the code is executed as a sequential stream: a set has no order, so the numbers drawn are different on each run...

Am I right?

Answer:

This happens because of what is called a race condition. Even a multi-threaded CPU is running more processes than just your application's, so a thread may read and evaluate an instruction, then have to jump off to do something for the OS. By the time it comes back, another thread of your application, whose core/hyper-thread was not taken away from its job, may already have managed to get past it.

You can read more about race conditions in references such as: https://link.springer.com/referenceworkentry/10.1007/978-0-387-09766-4_36

To prevent this, you are supposed to use locks to guard the memory you're modifying. In Java, take a look at the java.util.concurrent.locks package: https://www.baeldung.com/java-concurrent-locks
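A minimal sketch of that suggestion applied to the question's code, assuming a `ReentrantLock` from `java.util.concurrent.locks` is used to guard the `size()` check and the `add()` together as one unit. The class and method names (`LockedStatefulLambda`, `lockedSum`) are hypothetical. With the lock held, exactly four elements pass the `list.size() <= 3` check, but which four still depends on thread scheduling, and the lock serializes the work inside the lambda.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockedStatefulLambda {
    // Hypothetical helper: the question's pipeline with the check-then-add guarded by a lock.
    public static int lockedSum(Set<Integer> s, List<Integer> list) {
        Lock lock = new ReentrantLock();
        return s.parallelStream().mapToInt(e -> {
            lock.lock();
            try {
                // size() and add() now execute atomically with respect to other threads
                if (list.size() <= 3) {
                    list.add(e);
                    return e;
                }
                return 0;
            } finally {
                lock.unlock(); // always release, even if add() throws
            }
        }).sum();
    }

    public static void main(String[] args) {
        Set<Integer> s = new HashSet<>(Arrays.asList(1, 2, 3, 4, 5, 6));
        List<Integer> list = new ArrayList<>();
        int sum = lockedSum(s, list);
        // The list always ends up with 4 elements; which 4 may vary between runs
        System.out.println(list + " -> " + sum);
    }
}
```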

Answer:

Your code accumulates the result through side-effects, which the Stream API documentation discourages (see the "Side-effects" section of the java.util.stream package summary).

In your code, you've stumbled on the very first bullet point of the side-effects section of that documentation:

... there are no guarantees as to:

  • the visibility of those side-effects to other threads;

ArrayList is not thread-safe, and as a consequence each thread is not guaranteed to observe the same state of the list.

Also, note that the map() operation (and all its flavors, such as mapToInt()) is not intended to perform side-effects; according to the documentation, its function should be stateless:

mapper - a non-interfering, stateless function to apply to each element
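For comparison, here is a sketch of a pipeline whose mapper is stateless, assuming the intent of the original lambda is simply "sum at most four elements" (the `size() <= 3` check lets four through). It swaps the hand-written state for the built-in `limit()` operation; the class and method names are hypothetical. Because a `HashSet` has no defined encounter order, which elements survive `limit()` in a parallel stream is unspecified, but there is no shared mutable state left in the lambdas.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class StatelessLimitSum {
    // Hypothetical helper: sums at most `limit` elements with no shared state in the lambdas.
    public static int sumAtMost(Set<Integer> s, int limit) {
        return s.parallelStream()
                .limit(limit)                // the stream library tracks how many elements pass
                .mapToInt(Integer::intValue) // stateless, non-interfering mapper
                .sum();
    }

    public static void main(String[] args) {
        Set<Integer> s = new HashSet<>(Arrays.asList(1, 2, 3, 4, 5, 6));
        System.out.println(sumAtMost(s, 4));
    }
}
```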

In this case, the correct way to take into account the results of processing previous stream elements would be to define a custom Collector.

For that, we would need to define a mutable container that holds the list (and the running sum).

In a nutshell, a Collector can be implemented either as parallel, i.e. concurrent (all the threads update the same mutable container; see Collector.Characteristics.CONCURRENT), or non-parallel (each thread creates and populates its own instance of the mutable container, and the partial results produced by the threads are then merged).
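To make the non-parallel variant concrete, here is a sketch built with `Collector.of`, using a plain `ArrayList` as the container. The supplier may be invoked once per sub-task, each sub-task fills its own list, and the combiner merges the partial lists pairwise; no locking is involved. The names `PerThreadContainers`, `containersCreated` and `toArrayListCollector` are hypothetical, and the counter exists only to show that more than one container can be created.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;

public class PerThreadContainers {
    // Counts how many containers the stream asked the supplier to create.
    public static final AtomicInteger containersCreated = new AtomicInteger();

    // Non-concurrent collector: each sub-task gets its own ArrayList (no thread-safety needed),
    // and the partial lists are combined by the combiner after the sub-tasks finish.
    public static Collector<Integer, ?, List<Integer>> toArrayListCollector() {
        return Collector.of(
            () -> {
                containersCreated.incrementAndGet();
                return new ArrayList<Integer>();
            },
            List::add,               // accumulator: only touches this sub-task's list
            (left, right) -> {       // combiner: left and right are no longer shared
                left.addAll(right);
                return left;
            }
        );
    }

    public static void main(String[] args) {
        List<Integer> result = List.of(1, 2, 3, 4, 5, 6).parallelStream()
                .collect(toArrayListCollector());
        System.out.println(result + ", containers created: " + containersCreated.get());
    }
}
```

Because `List.of` is ordered and the collector is not declared `UNORDERED`, the stream combines the partial lists in encounter order, so the result is the full list in its original order.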

In order to implement a parallel collector, we need a thread-safe list. If you take a look at the implementations of the List interface, you'll find that the only thread-safe options the JDK offers are CopyOnWriteArrayList and the outdated Vector (apart from wrappers such as Collections.synchronizedList()).

CopyOnWriteArrayList would be a terrible choice, since under the hood it copies the entire backing array every time an element is added; it's not suitable for our purpose.

And if we used a synchronized List, it wouldn't buy us anything in terms of performance, because threads would not be able to operate on the list simultaneously: while one thread is adding an element, the others are blocked. In fact, it would be slower than processing the data sequentially, because synchronization has a cost.
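One caveat worth spelling out: a synchronized List does not by itself fix the question's code, because `size()` followed by `add()` is a compound check-then-act. `Collections.synchronizedList` makes each individual call thread-safe, but the pair still has to be guarded as a unit, for example by synchronizing on the returned list itself (the object the wrapper uses as its lock). A sketch with hypothetical names, in which every element has to wait for the same monitor, which is exactly the serialization described above:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SynchronizedListCheckThenAct {
    // Hypothetical helper: the question's pipeline using a synchronized list correctly.
    public static int guardedSum(Set<Integer> s, List<Integer> syncList) {
        return s.parallelStream().mapToInt(e -> {
            // Without this block, another thread could add between size() and add().
            synchronized (syncList) {
                if (syncList.size() <= 3) {
                    syncList.add(e);
                    return e;
                }
                return 0;
            }
        }).sum();
    }

    public static void main(String[] args) {
        Set<Integer> s = new HashSet<>(Arrays.asList(1, 2, 3, 4, 5, 6));
        List<Integer> list = Collections.synchronizedList(new ArrayList<>());
        int sum = guardedSum(s, list);
        System.out.println(list + " -> " + sum);
    }
}
```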

For the same reason, the locking suggested in another answer would only let you get a correct result; you would not be able to benefit from the parallel execution.

What we can do instead is create a non-parallel Collector (i.e. one using a non-thread-safe container) based on a plain ArrayList. It can still be used with a parallel stream: each thread acts independently on its own container, without locking and without running into concurrency-related issues.

This is how it might look:

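import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.stream.Collector;

// Factory method (can live in any utility class). Each thread/sub-task gets its own
// IntSumContainer from the supplier; partial containers are combined via merge().
public static Collector<Integer, ?, IntSumContainer> toParallelIntSumContainer(int limit) {
    
    return Collector.of(
        () -> new IntSumContainer(limit), // supplier: a fresh container per sub-task
        IntSumContainer::accept,          // accumulator: the Integer is unboxed to int
        IntSumContainer::merge            // combiner: merges two partial results
    );
}

public class IntSumContainer implements IntConsumer {
    private int sum;
    private final List<Integer> list = new ArrayList<>();
    private final int limit;

    public IntSumContainer(int limit) {
        this.limit = limit;
    }

    @Override
    public void accept(int value) {
        if (list.size() < limit) {
            list.add(value);
            sum += value;
        }
    }
    
    public IntSumContainer merge(IntSumContainer other) {
        // There can't be any concurrent access to either container here,
        // hence performing side-effects via forEach is safe.
        other.list.stream().limit(limit - list.size()).forEach(this::accept);
        return this;
    }
    
    public int getSum() {
        return sum;
    }

    public List<Integer> getList() {
        return list;
    }
}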

And this is how the stream would look:

List<Integer> source = List.of(1, 2, 3, 4, 5, 6);

IntSumContainer result = source.parallelStream()
    .collect(toParallelIntSumContainer(3));

List<Integer> list = result.getList();
int sum = result.getSum();

System.out.println(list);
System.out.println(sum);

Output:

[1, 2, 3]
6
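For anyone who wants to run this end to end, here is a self-contained consolidation of the collector above, applied to the question's own `HashSet` with a limit of 4, which mirrors its `list.size() <= 3` check. Wrapping everything in a class named `IntSumContainerDemo` (with `IntSumContainer` as a static nested class) is an assumption made only so the file compiles on its own. With an unordered `HashSet` source the collector still keeps exactly four elements, but which four is not specified; with the ordered `List.of` source used in this answer, it yields `[1, 2, 3]` and `6`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.IntConsumer;
import java.util.stream.Collector;

public class IntSumContainerDemo {

    // Mutable result container; one instance per sub-task, so it needs no synchronization.
    public static class IntSumContainer implements IntConsumer {
        private int sum;
        private final List<Integer> list = new ArrayList<>();
        private final int limit;

        public IntSumContainer(int limit) {
            this.limit = limit;
        }

        @Override
        public void accept(int value) {
            if (list.size() < limit) {
                list.add(value);
                sum += value;
            }
        }

        // Combiner: tops this container up with the other's elements, up to the limit.
        public IntSumContainer merge(IntSumContainer other) {
            other.list.stream().limit(limit - list.size()).forEach(this::accept);
            return this;
        }

        public int getSum() { return sum; }

        public List<Integer> getList() { return list; }
    }

    public static Collector<Integer, ?, IntSumContainer> toParallelIntSumContainer(int limit) {
        return Collector.of(
            () -> new IntSumContainer(limit),
            IntSumContainer::accept,
            IntSumContainer::merge
        );
    }

    public static void main(String[] args) {
        // The question's source; a limit of 4 matches its `list.size() <= 3` condition.
        Set<Integer> s = new HashSet<>(Arrays.asList(1, 2, 3, 4, 5, 6));
        IntSumContainer result = s.parallelStream().collect(toParallelIntSumContainer(4));
        System.out.println(result.getList() + " -> " + result.getSum());
    }
}
```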