How to execute operations for a batch of elements in ArrayList with CompletableFuture?

Time:11-13

Java version : 11

I have a List of objects and want to perform certain operations on them, where one operation depends on the result of another. To run this workflow asynchronously, I am using CompletableFuture.

Currently I do this by partitioning the List into sublists and passing each sublist to a CompletableFuture, so that each thread in my thread pool can start working on one sublist.

The code for this approach, which works, is:

List<SomeObject> someObjectList; // original list

List<List<SomeObject>> partitionList = ListUtils.partition(someObjectList, partionSize);

partitionList.forEach(subList -> {
    CompletableFuture.supplyAsync(() ->  firstOperation(subList), executorService)
            .thenAcceptAsync(firstOpresult -> secondOperationWithFirstOpResult(firstOpresult),executorService);
});

public static List<String> firstOperation(List<SomeObject> subList) {
        List<String> result = new ArrayList<>();
        // perform operation, adding its output to result
        return result;
}

public static void secondOperationWithFirstOpResult(List<String> firstOpProducedList) {
        //perform operation
        //print results.
}

The problem here is partitioning the original list.

If my original list has 100,000 records and the partition size is, say, 100 (meaning I want 100 items in each sublist), I will have 1,000 sublist objects holding 100 elements each, which may not be good considering that many objects in memory. Moreover, if the partition size is user-controlled or config-controlled, a smaller partition size would result in a huge number of sublist objects.

Instead of partitioning the original list:

  1. I want to take the original list (say 100 elements).
  2. Have a startIndex and endIndex on the original list (say 0 to 9, 10 to 19, ...).
  3. Give each of those batches to a thread in the thread pool with CompletableFuture.
  4. So that this thread can perform the two operations.

I know SO is not the place for an exact solution, but if you could nudge me in the right direction, with pseudocode or even just whether this is possible with CompletableFuture in the first place, that would be a great help :)

Answer:

Since ListUtils.partition is not a standard method, it’s impossible to say how it works. But if it works “the smart way”, it will use subList on the original list instead of copying data.
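
To make "the smart way" concrete: the standard List.subList returns a view backed by the original list rather than a copy. Here is a minimal sketch of that behavior; the class name SubListViewDemo and the sample data are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class SubListViewDemo {
    // Returns true if a change made to the original list is visible through the sublist.
    static boolean subListIsView() {
        List<String> original = new ArrayList<>(List.of("a", "b", "c", "d"));
        List<String> view = original.subList(1, 3); // covers indices 1 and 2, nothing is copied
        original.set(1, "changed");                  // non-structural change to the backing list
        return view.get(0).equals("changed");
    }

    public static void main(String[] args) {
        System.out.println(subListIsView()); // prints "true"
    }
}
```

Note that structural changes to the original list (adding or removing elements) invalidate existing sublists, so the original list should not be resized while batches are being processed.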

If you want to be on the safe side, you can do the trivial partitioning yourself:

IntStream.range(0, (someObjectList.size() + partionSize - 1) / partionSize)
    .forEach(p -> {
        int start = p * partionSize,
            end = Math.min(someObjectList.size(), start + partionSize);
        List<SomeObject> subList = someObjectList.subList(start, end);
        CompletableFuture.supplyAsync(() ->  firstOperation(subList), executorService)
            .thenAcceptAsync(r -> secondOperationWithFirstOpResult(r), executorService);
    });

Since these sublists do not store the elements, they consume less memory than the CompletableFuture instances, for example. So this is nothing to worry about.
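
For reference, here is a self-contained sketch of the index-based batching with CompletableFuture. The class BatchDemo, the Integer element type, and the bodies of both operations are placeholders (assumptions, not the asker's real code). It also collects the futures and waits for them with CompletableFuture.allOf, which the snippets in this thread leave out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class BatchDemo {
    // Hypothetical stand-in for firstOperation: maps each element to a String.
    static List<String> firstOperation(List<Integer> batch) {
        return batch.stream().map(i -> "item-" + i).collect(Collectors.toList());
    }

    // Processes the list in index-based batches and returns how many items
    // reached the second step.
    static int processInBatches(List<Integer> items, int batchSize, ExecutorService pool) {
        AtomicInteger processed = new AtomicInteger(); // stand-in for the second operation's effect
        int batches = (items.size() + batchSize - 1) / batchSize; // ceiling division
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int p = 0; p < batches; p++) {
            int start = p * batchSize;
            int end = Math.min(items.size(), start + batchSize);
            List<Integer> batch = items.subList(start, end); // view, not a copy
            futures.add(CompletableFuture
                .supplyAsync(() -> firstOperation(batch), pool)
                .thenAcceptAsync(r -> processed.addAndGet(r.size()), pool));
        }
        // Block until every batch has finished both steps.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return processed.get();
    }

    public static void main(String[] args) {
        List<Integer> items = IntStream.range(0, 95).boxed().collect(Collectors.toList());
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(processInBatches(items, 10, pool)); // prints 95
        } finally {
            pool.shutdown();
        }
    }
}
```

With 95 items and a batch size of 10, the ceiling division yields 10 batches, the last one holding only 5 elements.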

But if you can live with using the default thread pool¹ instead of executorService, you could use

IntStream.range(0, (someObjectList.size() + partionSize - 1) / partionSize)
    .parallel()
    .forEach(p -> {
        int start = p * partionSize,
            end = Math.min(someObjectList.size(), start + partionSize);
        List<SomeObject> subList = someObjectList.subList(start, end);
        secondOperationWithFirstOpResult(firstOperation(subList));
    });

where each sublist only exists while being processed.

This will already use Fork/Join tasks under the hood. There is no need to implement those Fork/Join operations yourself.
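
A runnable sketch of the parallel-stream variant, under the assumption that the two operations can be replaced by a simple summation for demonstration; the class ParallelBatchDemo and its method are hypothetical. Because forEach is a terminal operation, the call only returns once all batches are done, so no explicit waiting is needed.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelBatchDemo {
    // Sums all elements batch by batch, using a parallel IntStream over batch indices.
    static long sumInParallelBatches(List<Integer> items, int batchSize) {
        LongAdder total = new LongAdder(); // thread-safe accumulator
        IntStream.range(0, (items.size() + batchSize - 1) / batchSize)
            .parallel()
            .forEach(p -> {
                int start = p * batchSize;
                int end = Math.min(items.size(), start + batchSize);
                // The sublist view is only reachable while this batch is being processed.
                for (int value : items.subList(start, end)) {
                    total.add(value);
                }
            });
        return total.sum(); // all batches have completed at this point
    }

    public static void main(String[] args) {
        List<Integer> items = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
        System.out.println("common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
        System.out.println(sumInParallelBatches(items, 7)); // prints 5050
    }
}
```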

¹ the default pool is unspecified, but will be ForkJoinPool.commonPool() in practice.
