When I run the following code, only 2 of the 8 available threads are used. Can anyone explain why this is the case? How can I change the code so that it takes advantage of all 8 threads?
Tree.java:
package il.co.roy;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class Tree<T>
{
    private final T data;
    private final Set<Tree<T>> subTrees;

    public Tree(T data, Set<Tree<T>> subTrees)
    {
        this.data = data;
        this.subTrees = subTrees;
    }

    public Tree(T data)
    {
        this(data, new HashSet<>());
    }

    public Tree()
    {
        this(null);
    }

    public T getData()
    {
        return data;
    }

    public Set<Tree<T>> getSubTrees()
    {
        return subTrees;
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        Tree<?> tree = (Tree<?>) o;
        return Objects.equals(data, tree.data) &&
                Objects.equals(subTrees, tree.subTrees);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(data, subTrees);
    }

    @Override
    public String toString()
    {
        return "Tree{" +
                "data=" + data +
                ", subTrees=" + subTrees +
                '}';
    }

    public void sendCommandAll()
    {
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] sending command to " + data);
        try
        {
            Thread.sleep(5000);
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] tree with data " + data + " got " + true);
        subTrees.parallelStream()
//              .map(Tree::sendCommandAll)
                .forEach(Tree::sendCommandAll);
//              .reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2);
    }
}
(It doesn't matter if I use forEach or reduce.)
Main.java:
package il.co.roy;

import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Main
{
    public static void main(String... args)
    {
        System.out.println("Processors: " + Runtime.getRuntime().availableProcessors());
        final Tree<Integer> root = new Tree<>(null,
                Set.of(new Tree<>(1,
                        IntStream.range(2, 7)
                                .boxed()
                                .map(Tree::new)
                                .collect(Collectors.toSet()))));
        root.sendCommandAll();
//      IntStream.generate(() -> 1)
//              .parallel()
//              .forEach(i ->
//              {
//                  System.out.println(Thread.currentThread().getName());
//                  try
//                  {
//                      Thread.sleep(5000);
//                  } catch (InterruptedException e)
//                  {
//                      e.printStackTrace();
//                  }
//              });
    }
}
In the main method I create a tree with the following structure:
root (data is `null`)
|- 1
   |- 2
   |- 3
   |- 4
   |- 5
   |- 6
The sendCommandAll function processes every sub-tree (in parallel) only after its parent has finished being processed.
But the result is as follows:
Processors: 8
[main] sending command to 1
[main] tree with data 1 got true
[main] sending command to 6
[ForkJoinPool.commonPool-worker-2] sending command to 5
[main] tree with data 6 got true
[ForkJoinPool.commonPool-worker-2] tree with data 5 got true
[ForkJoinPool.commonPool-worker-2] sending command to 4
[ForkJoinPool.commonPool-worker-2] tree with data 4 got true
[ForkJoinPool.commonPool-worker-2] sending command to 3
[ForkJoinPool.commonPool-worker-2] tree with data 3 got true
[ForkJoinPool.commonPool-worker-2] sending command to 2
[ForkJoinPool.commonPool-worker-2] tree with data 2 got true
(For the record, when I execute the commented code in Main.java, the JVM uses all 7 (+1) threads available in the commonPool.)
How can I improve my code?
CodePudding user response:
As explained in the second half of this answer, the thread utilization when processing HashMaps or HashSets depends on the distribution of the elements within the backing array, which in turn depends on the hash codes. Especially with a small number of elements compared to the (default) capacity, this may result in poor work splitting.
A simple workaround is to use new ArrayList<>(subTrees).parallelStream() instead of subTrees.parallelStream().
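To make the splitting behaviour concrete, here is a minimal sketch that splits a spliterator twice by hand and counts how many elements land in each chunk. It is only an illustration: `SplitDemo` and `chunkSizes` are hypothetical names, the Integer values 11 to 15 are chosen so that they cluster in one part of a default 16-bucket table, and a real parallel stream decides how often to split on its own.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;

public class SplitDemo {
    // Splits the spliterator up to `depth` times and returns the number of
    // elements in each resulting chunk (hypothetical helper for illustration).
    static <E> List<Integer> chunkSizes(Spliterator<E> s, int depth) {
        List<Integer> sizes = new ArrayList<>();
        collect(s, depth, sizes);
        return sizes;
    }

    private static <E> void collect(Spliterator<E> s, int depth, List<Integer> sizes) {
        Spliterator<E> prefix = (depth > 0) ? s.trySplit() : null;
        if (prefix == null) {
            // Leaf chunk: count the elements a single task would process.
            int[] count = {0};
            s.forEachRemaining(e -> count[0]++);
            sizes.add(count[0]);
            return;
        }
        collect(prefix, depth - 1, sizes);
        collect(s, depth - 1, sizes);
    }

    public static void main(String[] args) {
        // Assumption: five small Integers standing in for the Tree nodes.
        Set<Integer> set = new HashSet<>(List.of(11, 12, 13, 14, 15));
        List<Integer> list = new ArrayList<>(set);
        System.out.println("HashSet chunks:   " + chunkSizes(set.spliterator(), 2));
        System.out.println("ArrayList chunks: " + chunkSizes(list.spliterator(), 2));
    }
}
```

With OpenJDK's HashMap-backed HashSet, this should print [0, 0, 1, 4] for the set and [1, 1, 1, 2] for the list: the set is split by bucket range, so empty ranges produce empty chunks, while the list is split by index. The one-versus-four split resembles the thread distribution in the question's output.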
But note that your method performs the actual work of the current node (simulated in the example with a sleep) before processing the children, which also reduces the potential parallelism.
You may use:
// Needs java.util.ArrayList and java.util.List in addition to the existing imports.
public void sendCommandAll() {
    if(subTrees.isEmpty()) {
        actualSendCommand();
        return;
    }
    // Put the children and this node's own work into one parallel batch.
    List<Tree<T>> tmp = new ArrayList<>(subTrees.size() + 1);
    tmp.addAll(subTrees);
    tmp.add(this);
    tmp.parallelStream().forEach(t -> {
        if(t != this) t.sendCommandAll(); else t.actualSendCommand();
    });
}
private void actualSendCommand() {
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
                + "] sending command to " + data);
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
                + "] tree with data " + data + " got " + true);
}
This allows the current node to be processed concurrently with its children.
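For trying this out in isolation, the following is a rough, self-contained sketch of the same pattern. The `ConcurrentNodeDemo` class, its `Node` type, the `HANDLED` map and the 200 ms sleep are all assumptions made for the demo and are not part of the original Tree class; the tree shape mirrors the one from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentNodeDemo {
    // Node name -> name of the thread that did its work (thread-safe map).
    static final Map<String, String> HANDLED = new ConcurrentHashMap<>();

    static final class Node {
        final String name;
        final List<Node> children;

        Node(String name, List<Node> children) {
            this.name = name;
            this.children = children;
        }

        void sendCommandAll() {
            if (children.isEmpty()) {
                actualSendCommand();
                return;
            }
            // Children plus this node's own work form one parallel batch.
            List<Node> tmp = new ArrayList<>(children.size() + 1);
            tmp.addAll(children);
            tmp.add(this);
            tmp.parallelStream().forEach(n -> {
                if (n != this) n.sendCommandAll(); else n.actualSendCommand();
            });
        }

        void actualSendCommand() {
            try {
                Thread.sleep(200); // stand-in for the real work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            HANDLED.put(name, Thread.currentThread().getName());
        }
    }

    // Builds root -> 1 -> {2..6}, runs the traversal, returns the record.
    static Map<String, String> run() {
        HANDLED.clear();
        List<Node> leaves = new ArrayList<>();
        for (int i = 2; i <= 6; i++) {
            leaves.add(new Node(String.valueOf(i), List.of()));
        }
        Node one = new Node("1", leaves);
        Node root = new Node("root", List.of(one));
        root.sendCommandAll();
        return HANDLED;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        Map<String, String> handled = run();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        handled.forEach((node, thread) -> System.out.println(node + " handled by " + thread));
        System.out.println("elapsed ms: " + elapsedMs);
    }
}
```

Which threads pick up which nodes, and the elapsed time, will vary with the machine and the common pool's parallelism, so the printed mapping is only indicative.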