How to select a specific task executor for a method in Spring Boot (Java)?

Time:01-29

I have an application that uses CompletableFuture to process data from a stream asynchronously. A demo of my async implementation is as follows:

    @Async
    @Transactional(readOnly = true)
    public void beginProcessing() {
        try(Stream<String> st = myJpa.getMyStream()) { // use spring jpa to get a stream of data from db 
            CompletableFuture.allOf(st.map(i -> CompletableFuture.supplyAsync(() ->
                            myMethod(i)))
                    .toArray(CompletableFuture[]::new)).join();
        }
    }

    @Async
    private CompletableFuture<Void> myMethod(String i) {
       // logic goes here
    }

And it works fine. However, at the moment CompletableFuture runs the tasks on its default thread pool. I would like to use a custom taskExecutor instead. This can be achieved by passing the executor itself (not its name) as the 2nd argument of the supplyAsync(...) method. I have done this with a ForkJoin thread pool before, so I am positive that it works.
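As a minimal sketch of the two-argument form of supplyAsync(...), using only the JDK: the class name, the process(...) stand-in for myMethod(...), and the pool size of 4 are all assumptions made for illustration, not part of the original application.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

class SupplyAsyncWithExecutor {

    // Hypothetical stand-in for myMethod(i): reports the thread that ran it.
    static String process(String item) {
        return item + " ran on " + Thread.currentThread().getName();
    }

    static List<String> runAll(List<String> items) {
        AtomicInteger counter = new AtomicInteger();
        // Plain JDK pool with recognizable thread names; the size of 4 is arbitrary.
        ExecutorService pool = Executors.newFixedThreadPool(4,
                r -> new Thread(r, "myNewExecutor-" + counter.incrementAndGet()));
        try {
            // The second argument is the Executor instance, not a bean name.
            List<CompletableFuture<String>> futures = items.stream()
                    .map(i -> CompletableFuture.supplyAsync(() -> process(i), pool))
                    .collect(Collectors.toList());
            // Wait for every task, then collect the results in input order.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        runAll(List.of("a", "b", "c")).forEach(System.out::println);
    }
}
```

Each printed line should name a thread with the myNewExecutor- prefix, which is an easy way to confirm that the custom pool, rather than the default one, ran the task.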

Now, I want to make it work with taskExecutor, so I have added a new bean to my config class as below (as well as another one which I will use elsewhere):

    @Configuration
    public class MyConfigClass {

        @Bean
        public TaskExecutor myNewExecutor() {
            RejectedExecutionHandler re = new ThreadPoolExecutor.CallerRunsPolicy();
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(100);
            executor.setQueueCapacity(0);
            executor.setThreadNamePrefix("myNewExecutor-");
            executor.setRejectedExecutionHandler(re);
            return executor;
        }

        @Bean
        public TaskExecutor someOtherExecutor() { // would be used elsewhere
            RejectedExecutionHandler re = new ThreadPoolExecutor.CallerRunsPolicy();
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(100);
            executor.setQueueCapacity(0);
            executor.setThreadNamePrefix("someOtherExecutor-");
            executor.setRejectedExecutionHandler(re);
            return executor;
        }

    }
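For readers unsure what a queue capacity of 0 combined with CallerRunsPolicy does in the beans above: to my understanding, ThreadPoolTaskExecutor then hands tasks directly to threads without buffering, and once every thread is busy the submitting thread runs the task itself. The sketch below reproduces that behaviour with the plain JDK ThreadPoolExecutor; the class name and the pool size of 1 (chosen to make saturation easy to trigger) are assumptions for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    // Returns true if the overflow task was executed by the submitting thread.
    static boolean overflowRanOnCaller() {
        // One worker, no queue capacity: a second task cannot be buffered.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> overflowThread = new AtomicReference<>();
        try {
            // Occupy the only worker until we release it.
            pool.execute(() -> {
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();
            // The pool is saturated, so CallerRunsPolicy runs this task right here.
            pool.execute(() -> overflowThread.set(Thread.currentThread()));
            return overflowThread.get() == Thread.currentThread();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("overflow ran on caller: " + overflowRanOnCaller());
    }
}
```

The practical consequence for the beans above is back-pressure: when all 100 threads are busy, the thread that submits work slows down instead of tasks being dropped.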

And then I autowired TaskExecutor into my class and added the 2nd arg. So the code looks like below:

    @Autowired
    private TaskExecutor myNewExecutor;

    @Async
    @Transactional(readOnly = true)
    public void beginProcessing() {
        try(Stream<String> st = myJpa.getMyStream()) { // use spring jpa to get a stream of data from db 
            CompletableFuture.allOf(st.map(i -> CompletableFuture.supplyAsync(() ->
                            myMethod(i),myNewExecutor))
                    .toArray(CompletableFuture[]::new)).join();
        }
    }

    @Async
    private CompletableFuture<Void> myMethod(String i) {
       // logic goes here
    }

However, that does not work since there are multiple beans of type TaskExecutor, so I get the following exception:

Caused by: org.springframework.beans.factory.NoUniqueBeanDefinitionException: 
No qualifying bean of type 'org.springframework.core.task.TaskExecutor' available: expected single matching bean but found 3: myNewExecutor, someOtherExecutor, taskScheduler

OK, so I thought I would be able to solve it by using the @Qualifier annotation. However, it didn't work for me: the exception is still being thrown. (I placed the annotation under the @Bean annotation in the config class and under the @Autowired annotation in the class that holds the logic.)

I know that you can make a method use a custom thread pool by giving each bean a name and passing that name as an argument to the @Async annotation. So I guess I could rework my code into a simple for loop that calls myMethod(...), with the method's @Async annotation replaced by @Async("myNewExecutor"). That would probably work, but I would like to keep CompletableFuture if I can, so I wonder if anyone can identify what I am missing that results in the error mentioned above?

I feel like I am missing something trivial to make it work but I just cannot see it.

CodePudding user response:

So ... I figured it out. It was indeed a trivial thing that I somehow missed (embarrassingly so).

By specifying which thread pool to use in my logic, I made sure to tell the code that runs myMethod(...) which of the 3 available beans to use. But I forgot that I have one more @Async, above the beginProcessing() method. That annotation still had no way to choose between the beans, so it threw the exception I mentioned. Once I specified which thread pool it should use (with the bean name trick I mentioned above, i.e. passing a bean name to @Async), the code works like a charm.
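A sketch of what the fixed class might look like, under stated assumptions: the class name MyProcessingService, the MyJpa interface, constructor injection, and the choice of "myNewExecutor" for the outer @Async are all illustrative, since the answer does not say which bean was picked or how the class is declared. It needs Spring on the classpath and is a wiring fragment rather than a standalone program.

```java
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

// Hypothetical repository type; the question only shows the field name myJpa.
interface MyJpa {
    Stream<String> getMyStream();
}

@Service
public class MyProcessingService {

    private final TaskExecutor myNewExecutor;
    private final MyJpa myJpa;

    // @Qualifier selects one bean by name when several TaskExecutor beans exist.
    public MyProcessingService(@Qualifier("myNewExecutor") TaskExecutor myNewExecutor,
                               MyJpa myJpa) {
        this.myNewExecutor = myNewExecutor;
        this.myJpa = myJpa;
    }

    // Naming the executor here is the step the answer describes; which bean
    // to name is an assumption in this sketch.
    @Async("myNewExecutor")
    @Transactional(readOnly = true)
    public void beginProcessing() {
        try (Stream<String> st = myJpa.getMyStream()) {
            CompletableFuture.allOf(
                    st.map(i -> CompletableFuture.supplyAsync(() -> myMethod(i), myNewExecutor))
                      .toArray(CompletableFuture[]::new))
                .join();
        }
    }

    // No @Async here: the explicit executor passed to supplyAsync already
    // decides which pool runs this method.
    private Void myMethod(String i) {
        // logic goes here
        return null;
    }
}
```

The sketch drops @Async from myMethod(...) because Spring applies @Async through a proxy, which does not intercept private methods or calls made from inside the same object; the executor passed to supplyAsync(...) is what actually selects the pool for those calls.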
