How many threads does parallelStream() create when given only one list as input?

Time:07-23

I'm processing a batch of 100 objects by splitting them into partitions of 10, and each partition is sent to a separate thread to be processed in parallel. This is the current code for that:

    var itemsToSave = new ConcurrentLinkedQueue<ItemToSave>();              // line 1

    Lists.partition(originalList, 10)                                       // line 3
        .parallelStream()                                                   // line 4
        .forEach(partitionedList -> process(partitionedList, itemsToSave)); // line 5
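For anyone who wants to run this pipeline without Guava, here is a minimal self-contained sketch. It relies on hypothetical stand-ins: a `partition` helper that mimics `Lists.partition` with `subList`, `String` in place of `ItemToSave`, and a `process` method that simply upper-cases each item into the shared queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PartitionPipeline {
    // Hypothetical stand-in for Guava's Lists.partition: consecutive sublists of
    // `size` elements, the last one possibly shorter.
    static <T> List<List<T>> partition(List<T> list, int size) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            parts.add(list.subList(i, Math.min(i + size, list.size())));
        }
        return parts;
    }

    // Hypothetical processing step: transforms each item and adds it to the thread-safe queue.
    static void process(List<String> part, Queue<String> itemsToSave) {
        for (String item : part) {
            itemsToSave.add(item.toUpperCase());
        }
    }

    // Same shape as the question: partition -> parallelStream -> forEach.
    static Queue<String> run(List<String> originalList) {
        Queue<String> itemsToSave = new ConcurrentLinkedQueue<>();
        partition(originalList, 10)
            .parallelStream()
            .forEach(part -> process(part, itemsToSave));
        return itemsToSave;
    }

    // Builds a list of n dummy items.
    static List<String> items(int n) {
        return IntStream.range(0, n).mapToObj(i -> "item" + i).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(partition(items(100), 10).size()); // 10
        System.out.println(partition(items(7), 10).size());   // 1
        System.out.println(run(items(100)).size());           // 100
    }
}
```

Because `forEach` does not respect encounter order, items from different partitions can end up interleaved in the queue.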

My understanding is that:

line 1: creates a thread-safe collection to which the individual items are added once they are processed

line 3: returns a number of lists, each containing 10 entries from originalList

line 4: spawns a new thread for each of the lists created on line 3

line 5: for each list, in its own thread, starts the processing that is supposed to run in parallel.

Please correct me if my understanding is wrong; in any case, this code works as expected: processing took 75 seconds before parallelization and 20 seconds after. Looks good.

But after a while I noticed that the process now takes literally ~1 ms, or even zero, according to Kibana. That is because the new code processes the items so fast that the database doesn't have time to accumulate 100 items to be processed, so the batch that should contain 100 items now contains fewer than 10.

In that case, line 3 of the code shown before returns one single list: partitioning a list of fewer than 10 items into chunks of 10 yields just one partition.

Then this single list is sent to parallelStream(). Here is where my question comes in: does parallelStream() still spawn a new thread to process one single list? Or does it only spawn threads when given more than one list? To me it doesn't make much sense to open a new thread to process one single batch of items; that could happen sequentially and avoid the overhead of spawning a thread, etc.

So: how many threads does parallelStream() create when given only one list as input?

Sorry for the long question, but I felt I had to explain my reasoning.

Answer:

How many threads does parallelStream() create when given only one list as input?

Under the hood, parallel streams use the Fork/Join framework. It divides the task (the whole set of elements you feed into the stream) into smaller subtasks (which can be split further) and then joins the results of the executed subtasks in the order they were created. This guarantees that the encounter order of elements in an ordered stream is preserved during parallel processing, which you can observe in the behavior of reduce, collect, etc.
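A small sketch of the ordering point: a parallel `collect` or `forEachOrdered` keeps the encounter order of an ordered source, while plain `forEach` makes no ordering promise. The input size of 1,000 is an arbitrary choice for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class EncounterOrder {
    // Parallel collect: partial results are joined in encounter order.
    static List<Integer> parallelCollect(int n) {
        return IntStream.range(0, n).boxed().parallel().collect(Collectors.toList());
    }

    // Parallel forEachOrdered: the action sees elements in encounter order, one at a time,
    // so a plain ArrayList is safe here.
    static List<Integer> parallelForEachOrdered(int n) {
        List<Integer> out = new ArrayList<>();
        IntStream.range(0, n).boxed().parallel().forEachOrdered(out::add);
        return out;
    }

    // Parallel forEach: no ordering guarantee and concurrent calls, so a synchronized
    // list is needed and the resulting order may differ from run to run.
    static List<Integer> parallelForEach(int n) {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>());
        IntStream.range(0, n).boxed().parallel().forEach(out::add);
        return out;
    }

    public static void main(String[] args) {
        List<Integer> expected = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
        System.out.println(parallelCollect(1000).equals(expected));        // true
        System.out.println(parallelForEachOrdered(1000).equals(expected)); // true
        System.out.println(parallelForEach(1000).size());                  // 1000, order unspecified
    }
}
```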

Threads from the so-called common pool (which is lazily initialized, and whose default size depends on the number of available CPU cores) are assigned these tasks while the parallel stream executes.

How many threads end up being occupied depends on the size of the input, on how the source's spliterator is implemented (i.e. how a task is divided into subtasks), and on how many threads there are in the common pool (which depends on the actual machine). It is also important how quickly each element is processed (i.e. it depends on your logic): threads try to do their best by stealing work once they are done with their previous task.
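The splitting step can be observed directly by asking a list's `Spliterator` to split once. For an `ArrayList` holding a single element (like the one-partition case in the question), `trySplit()` returns `null`, so there is nothing to hand to a second thread. This sketch only looks at `ArrayList`; other sources may split differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class SplitDemo {
    // Splits the list's spliterator once and returns the sizes of the resulting parts,
    // or just the original size if the spliterator refuses to split.
    static List<Long> splitOnce(List<?> list) {
        Spliterator<?> right = list.spliterator();
        Spliterator<?> left = right.trySplit(); // prefix part, or null if it cannot split
        List<Long> sizes = new ArrayList<>();
        if (left != null) {
            sizes.add(left.estimateSize());
        }
        sizes.add(right.estimateSize());
        return sizes;
    }

    public static void main(String[] args) {
        // One partition: an outer list containing a single inner list.
        List<List<String>> onePartition = new ArrayList<>();
        onePartition.add(Arrays.asList("A", "B", "C"));

        // Ten elements, which can be split in half.
        List<Integer> tenItems = new ArrayList<>(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));

        System.out.println(splitOnce(onePartition)); // [1]    (no split possible)
        System.out.println(splitOnce(tenItems));     // [5, 5]
    }
}
```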

The Javadoc of parallelStream() says it "Returns a possibly parallel Stream". There isn't even a guarantee that more than one thread will be occupied with your task (if the number of elements is tiny, one thread might handle it faster without splitting and joining the results).
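To check the single-list case from the question, this sketch records the name of every thread that runs the `forEach` action on a one-element source. Since an `ArrayList` with one element cannot be split, the work is done by the thread that started the stream (e.g. `main`), and no common-pool worker is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SingleElementThreads {
    // Collects the names of all threads that execute the forEach action.
    static Set<String> threadsUsed(List<?> source) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        source.parallelStream().forEach(x -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        // A batch that produced only one partition.
        List<List<String>> onePartition = new ArrayList<>();
        onePartition.add(Arrays.asList("A", "B", "C"));
        System.out.println(threadsUsed(onePartition)); // only the calling thread, e.g. [main]
    }
}
```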

You should not worry about the actual number of threads, because you're not in control of it. The only thing you need to decide with parallel streams is whether it's justifiable to execute a stream in parallel or not.
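One way to act on that decision is to choose a sequential or a parallel stream based on the input size. The threshold below is a hypothetical value chosen for illustration; a real cut-off should come from measuring your own workload.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

public class ParallelChoice {
    // Hypothetical cut-off; tune it by benchmarking the actual processing logic.
    static final int PARALLEL_THRESHOLD = 1_000;

    // Returns a parallel stream only when the input is large enough to be worth splitting.
    static <T> Stream<T> streamFor(List<T> items) {
        return items.size() >= PARALLEL_THRESHOLD ? items.parallelStream() : items.stream();
    }

    public static void main(String[] args) {
        List<Integer> small = Collections.nCopies(10, 1);
        List<Integer> large = Collections.nCopies(5_000, 1);
        System.out.println(streamFor(small).isParallel()); // false
        System.out.println(streamFor(large).isParallel()); // true
    }
}
```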

Answer:

I think you are using Lists.partition, which is part of Guava and returns a List of Lists. As you mentioned, your Lists.partition call returns a single list, and in that case I'd say parallelStream() is not creating multiple threads.

By the way, parallelStream() uses ForkJoinPool.commonPool() to get the threads required for processing.
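A quick way to check this is to look at which threads run the action. This sketch records, for every element, whether it ran on the calling thread or on a worker owned by `ForkJoinPool.commonPool()`. It assumes the stream is started from an ordinary thread such as `main`, not from inside some other `ForkJoinPool`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.IntStream;

public class CommonPoolCheck {
    // Returns true if every element of a parallel forEach was handled either by the
    // calling thread or by a worker thread belonging to ForkJoinPool.commonPool().
    static boolean onlyCallerOrCommonPool(int n) {
        Thread caller = Thread.currentThread();
        Set<Boolean> outcomes = ConcurrentHashMap.newKeySet();
        IntStream.range(0, n).parallel().forEach(i -> {
            Thread t = Thread.currentThread();
            boolean commonWorker = t instanceof ForkJoinWorkerThread
                && ((ForkJoinWorkerThread) t).getPool() == ForkJoinPool.commonPool();
            outcomes.add(t == caller || commonWorker);
        });
        return !outcomes.contains(false);
    }

    public static void main(String[] args) {
        System.out.println(onlyCallerOrCommonPool(10_000)); // true
    }
}
```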

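The line below prints the number of processors available to the JVM, which the common pool's size is based on (by default the pool's parallelism is this number minus one, with a minimum of one, and the thread that starts the stream also takes part in the work):

    System.out.println(Runtime.getRuntime().availableProcessors());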
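The pool's configured size can also be read directly. This minimal sketch prints `availableProcessors()` next to `ForkJoinPool.getCommonPoolParallelism()`; the numbers depend on the machine, and the default can be overridden with the `java.util.concurrent.ForkJoinPool.common.parallelism` system property.

```java
import java.util.concurrent.ForkJoinPool;

public class PoolSize {
    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        // Target number of worker threads in the common pool
        // (by default cpus - 1, but at least 1, unless overridden by the system property).
        int parallelism = ForkJoinPool.getCommonPoolParallelism();
        System.out.println("available processors: " + cpus);
        System.out.println("common pool parallelism: " + parallelism);
    }
}
```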

Now, let's take an example similar to your case:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // One outer list holding a single inner list of 10 items (like a single partition)
    List<List<String>> list = new ArrayList<>();
    List<String> w1 = new ArrayList<>(Arrays.asList("A", "B", "C", "D", "E", "F", "G", "H", "I", "J"));
    list.add(w1);

    // The outer list has only one element, so it is not split: this prints `main` once
    list.parallelStream()
        .forEach(x -> System.out.println(Thread.currentThread().getName()));

But now, let's say the parallel stream is given several items to process:

    IntStream.range(0, 10).boxed().parallel().forEach(x -> System.out.println(Thread.currentThread().getName()));

The output of the above will be something like this (it varies from run to run):

    main
    main
    main
    main
    ForkJoinPool.commonPool-worker-2
    main
    main
    ForkJoinPool.commonPool-worker-1
    ForkJoinPool.commonPool-worker-3
    ForkJoinPool.commonPool-worker-2

So the number of threads used really depends on the input to the parallel stream and on how quickly your operation can be performed.
