Why ParallelStream won't use all commonPool's thread in recursion?

Time:10-14

When I run the following code, only 2 of the 8 available threads are used. Can anyone explain why that is? How can I change the code so that it takes advantage of all 8 threads?

Tree.java:

package il.co.roy;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class Tree<T>
{
    private final T data;
    private final Set<Tree<T>> subTrees;

    public Tree(T data, Set<Tree<T>> subTrees)
    {
        this.data = data;
        this.subTrees = subTrees;
    }

    public Tree(T data)
    {
        this(data, new HashSet<>());
    }

    public Tree()
    {
        this(null);
    }

    public T getData()
    {
        return data;
    }

    public Set<Tree<T>> getSubTrees()
    {
        return subTrees;
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        Tree<?> tree = (Tree<?>) o;
        return Objects.equals(data, tree.data) &&
                Objects.equals(subTrees, tree.subTrees);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(data, subTrees);
    }

    @Override
    public String toString()
    {
        return "Tree{" +
                "data=" + data +
                ", subTrees=" + subTrees +
                '}';
    }

    public void sendCommandAll()
    {
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] sending command to " + data);
        try
        {
            Thread.sleep(5000);
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] tree with data " + data + " got " + true);
        subTrees.parallelStream()
//              .map(Tree::sendCommandAll)
                .forEach(Tree::sendCommandAll);
//              .reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2);
    }
}

(It doesn't matter whether I use forEach or reduce.)

Main.java:

package il.co.roy;

import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Main
{
    public static void main(String... args)
    {
        System.out.println("Processors: " + Runtime.getRuntime().availableProcessors());


        final Tree<Integer> root = new Tree<>(null,
                Set.of(new Tree<>(1,
                        IntStream.range(2, 7)
                                        .boxed()
                                        .map(Tree::new)
                                        .collect(Collectors.toSet()))));

        root.sendCommandAll();

//      IntStream.generate(() -> 1)
//              .parallel()
//              .forEach(i ->
//              {
//                  System.out.println(Thread.currentThread().getName());
//                  try
//                  {
//                      Thread.sleep(5000);
//                  } catch (InterruptedException e)
//                  {
//                      e.printStackTrace();
//                  }
//              });
    }
}

In the main method I create a tree with the following structure:

root (data is `null`)
  |- 1
     |- 2
     |- 3
     |- 4
     |- 5
     |- 6

The sendCommandAll method processes every sub-tree (in parallel), but only after its parent has finished being processed. However, the result is as follows:

Processors: 8
[main] sending command to 1
[main] tree with data 1 got true
[main] sending command to 6
[ForkJoinPool.commonPool-worker-2] sending command to 5
[main] tree with data 6 got true
[ForkJoinPool.commonPool-worker-2] tree with data 5 got true
[ForkJoinPool.commonPool-worker-2] sending command to 4
[ForkJoinPool.commonPool-worker-2] tree with data 4 got true
[ForkJoinPool.commonPool-worker-2] sending command to 3
[ForkJoinPool.commonPool-worker-2] tree with data 3 got true
[ForkJoinPool.commonPool-worker-2] sending command to 2
[ForkJoinPool.commonPool-worker-2] tree with data 2 got true

(For the record, when I run the commented-out code in Main.java, the JVM uses all 7 (+1) threads available in the commonPool.)

How can I improve my code?

Answer:

As explained in the second half of this answer, thread utilization when processing HashMaps or HashSets depends on how the elements are distributed within the backing array, which in turn depends on their hash codes. Especially with a small number of elements relative to the (default) capacity, this can result in poor work splitting.
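To see this concretely, here is a small sketch that mirrors the question's leaf nodes. The hypothetical `Leaf` class reuses the `Tree.hashCode()` formula for a node whose sub-tree set is empty (`Objects.hash(data, emptySet)`, which works out to `961 + 31 * data`), puts leaves 2 to 6 into a default-capacity `HashSet`, and splits its spliterator twice. The splitting policy is not part of the `Spliterator` contract, so this only reflects current OpenJDK behavior: the five leaves land in buckets 11 to 15 of a 16-slot table, so the first split hands an empty lower half to one side, and the second split leaves four of the five elements in a single piece.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterator;

public class HashSetSplitDemo {
    // Hypothetical stand-in for a leaf Tree: same hashCode formula as Tree.hashCode()
    // for a node whose subTrees set is empty (an empty set's hashCode() is 0).
    static final class Leaf {
        final int data;
        Leaf(int data) { this.data = data; }

        @Override public boolean equals(Object o) {
            return o instanceof Leaf && ((Leaf) o).data == data;
        }

        @Override public int hashCode() { return Objects.hash(data, Set.of()); }
    }

    // Drains a spliterator and returns how many elements it covered.
    static long count(Spliterator<?> s) {
        long[] n = {0};
        if (s != null) s.forEachRemaining(e -> n[0]++);
        return n[0];
    }

    // Splits the spliterator of a HashSet holding leaves 2..6 twice and
    // returns the element counts of the three resulting pieces.
    static long[] splitSizes() {
        Set<Leaf> leaves = new HashSet<>(); // default capacity: 16 buckets
        for (int i = 2; i <= 6; i++) leaves.add(new Leaf(i));
        Spliterator<Leaf> rest = leaves.spliterator();
        Spliterator<Leaf> firstPrefix = rest.trySplit();  // lower half of the bucket range
        Spliterator<Leaf> secondPrefix = rest.trySplit(); // next quarter of the bucket range
        return new long[] { count(firstPrefix), count(secondPrefix), count(rest) };
    }

    public static void main(String[] args) {
        long[] sizes = splitSizes();
        System.out.println("lower half=" + sizes[0]
                + ", next quarter=" + sizes[1]
                + ", last quarter=" + sizes[2]);
    }
}
```

The resulting 1 / 4 division appears to line up with the output in the question, where `main` handled 6 while `worker-2` handled 5, 4, 3 and 2 one after another.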

A simple work-around is using new ArrayList<>(subTrees).parallelStream() instead of subTrees.parallelStream().
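A minimal sketch of why the copy helps, assuming OpenJDK's `ArrayList` spliterator, which splits by index rather than by bucket: the elements' hash codes no longer matter, so five elements split into pieces of 2 and 3. `ArrayListSplitDemo` and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;

public class ArrayListSplitDemo {
    // Drains a spliterator and returns how many elements it covered.
    static long count(Spliterator<?> s) {
        long[] n = {0};
        if (s != null) s.forEachRemaining(e -> n[0]++);
        return n[0];
    }

    // Copies the set into an ArrayList (as in new ArrayList<>(subTrees))
    // and splits the list's spliterator once.
    static <T> long[] splitSizes(Set<T> set) {
        List<T> copy = new ArrayList<>(set);
        Spliterator<T> rest = copy.spliterator();
        Spliterator<T> prefix = rest.trySplit(); // covers the first half of the indices
        return new long[] { count(prefix), count(rest) };
    }

    public static void main(String[] args) {
        Set<Integer> set = new HashSet<>();
        for (int i = 2; i <= 6; i++) set.add(i);
        long[] sizes = splitSizes(set);
        System.out.println("prefix=" + sizes[0] + ", rest=" + sizes[1]);
    }
}
```

The copy costs one extra pass over the children, which is negligible next to the per-node work in the question.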

But note that your method performs the actual work of the current node (simulated in the example with a sleep) before processing the children, which also reduces the potential parallelism.

You may use

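public void sendCommandAll() {
    // Leaf: there is nothing to run concurrently with, so just do this node's work.
    if(subTrees.isEmpty()) {
        actualSendCommand();
        return;
    }
    // Copy the children into an ArrayList (which splits evenly by index) and add this
    // node itself, so its own work runs concurrently with its children's subtrees.
    // Requires java.util.ArrayList and java.util.List to be imported in Tree.java.
    List<Tree<T>> tmp = new ArrayList<>(subTrees.size() + 1);
    tmp.addAll(subTrees);
    tmp.add(this);
    tmp.parallelStream().forEach(t -> {
        if(t != this) t.sendCommandAll(); else t.actualSendCommand();
    });
}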

private void actualSendCommand() {
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
                           + "] sending command to " + data);
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
                           + "] tree with data " + data + " got " + true);
}

This allows the current node to be processed concurrently with its children.
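To put a number on the difference, here is a rough sketch that computes the critical path (the longest chain of work that must run in sequence) of the question's tree under both orderings. It assumes every node, including the root with `null` data (its sleep is unconditional), costs one unit of work (one 5-second sleep), and that enough threads are always available; `Node`, `parentFirst` and `concurrent` are hypothetical names, not part of the question's code.

```java
import java.util.List;

public class CriticalPath {
    // Hypothetical minimal tree node; each node stands for one unit of work.
    static final class Node {
        final List<Node> children;
        Node(Node... children) { this.children = List.of(children); }
    }

    // Original ordering: a node's own work finishes before its children start,
    // so the chain is 1 + the longest chain among the children.
    static int parentFirst(Node n) {
        int longestChild = 0;
        for (Node c : n.children) longestChild = Math.max(longestChild, parentFirst(c));
        return 1 + longestChild;
    }

    // Answer's ordering: a node's own work runs alongside its children's subtrees,
    // so the chain is the longer of the node's own unit and its longest child chain.
    static int concurrent(Node n) {
        int longest = 1;
        for (Node c : n.children) longest = Math.max(longest, concurrent(c));
        return longest;
    }

    // root -> 1 -> {2, 3, 4, 5, 6}, the shape built in Main.java.
    static Node exampleTree() {
        Node one = new Node(new Node(), new Node(), new Node(), new Node(), new Node());
        return new Node(one);
    }

    public static void main(String[] args) {
        Node root = exampleTree();
        System.out.println("parent first: " + parentFirst(root) + " units"); // 3 units = 15 s
        System.out.println("concurrent:   " + concurrent(root) + " units");  // 1 unit = 5 s
    }
}
```

Under these idealized assumptions, the original ordering needs at least 15 seconds for this tree, while the reordered version could finish in about 5 seconds, provided the pool has a thread for each of the 7 nodes.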
