I have an application that uses CompletableFuture to process data from a stream async. The demo showcasing my async implementation is as follows:
@Async
@Transactional(readOnly = true)
public void beginProcessing() {
try(Stream<String> st = myJpa.getMyStream()) { // use spring jpa to get a stream of data from db
CompletableFuture.allOf(st.map(i -> CompletableFuture.supplyAsync(() ->
myMethod(i)))
.toArray(CompletableFuture[]::new)).join();
}
}
@Async
private CompletableFuture<Void> myMethod(String i) {
// logic goes here
}
And it works fine. However, at the moment the CompletableFuture uses some default thread pool to do its job. I would like to use a custom defined taskExecutor instead. This can be achieved by supplying a name of the taskExecutor as a 2nd argument of supplyAsync(...)
method. I have done it with ForkJoin
thread pool before, so I am positive that it works.
Now, I want to make it work with taskExecutor, so I have added a new bean to my config class as below (as well as another one which I will use elsewhere):
@Configuration
public class MyConfigClass {
@Bean
public TaskExecutor myNewExecutor() {
RejectedExecutionHandler re = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(0);
executor.setThreadNamePrefix("myNewExecutor-");
executor.setRejectedExecutionHandler(re);
return executor;
}
@Bean
public TaskExecutor someOtherExecutor() { // would be used elsewhere
RejectedExecutionHandler re = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(0);
executor.setThreadNamePrefix("someOtherExecutor-");
executor.setRejectedExecutionHandler(re);
return executor;
}
}
And then I autowired TaskExecutor into my class and added the 2nd arg. So the code looks like below:
@Autowired
private TaskExecutor myNewExecutor;
@Async
@Transactional(readOnly = true)
public void beginProcessing() {
try(Stream<String> st = myJpa.getMyStream()) { // use spring jpa to get a stream of data from db
CompletableFuture.allOf(st.map(i -> CompletableFuture.supplyAsync(() ->
myMethod(i),myNewExecutor))
.toArray(CompletableFuture[]::new)).join();
}
}
@Async
private CompletableFuture<Void> myMethod(String i) {
// logic goes here
}
However, that does not work since we have multiple beans of type TaskExecutor, so I get the following exception
Caused by: org.springframework.beans.factory.NoUniqueBeanDefinitionException:
No qualifying bean of type 'org.springframework.core.task.TaskExecutor' available: expected single matching bean but found 3: myNewExecutor, someOtherExecutor, taskScheduler
Ok, so I thought I would be able to solve it by using @Qualifier
annotation. However, it didn't work for me - the exception is still being thrown (I used said annotation under the @Bean
annotaiton in a config class and then under the @Autowired
annotation in my class where logic is.).
I know that you can make a method use a custom thread pool by giving each bean a name and having the corresponding name being passed as an arg to @Async
annotation. So, I guess I could rework my code to be a simple for
loop that calls mytMethod(...)
with method's async annotation being replaced with @Async("myNewExecutor")
. That would probably work but I would like to preserve CompletableFuture if I can, so wonder if anyone can identify what am I missing that results in the error mentioned above?
I feel like I am missing something trivial to make it work but I just cannot see it.
CodePudding user response:
So ... I figured it out. It was indeed a trivial thing that I somehow missed (embarrassingly so).
By specifying which thread pool to use in my logic, I made sure to effectively tell @Async
above myMethod(...)
which of 3 available beans to use. But I forgot that I have one more @Async
above beginProcessing()
method. So naturally it was confused with these changes and was throwing the exception I mentioned. By specifying which thread pool it needs to use (using bean name trick I mentioned I know), the code works like a charm.