Can I use CompletableFuture in Scheduling Tasks with Spring?

Time:09-16

The Problem:
My application uses Spring Boot and Java 8. I have a scheduled task that retrieves data from three different data sources, but the three retrievals currently run in sequence. I want to speed this up by running them concurrently with CompletableFuture. However, when I run the code, the CompletableFuture tasks all run on a single thread named 'scheduling-1' rather than asynchronously.

What I have tried:
I suspected I was doing something wrong, so I stepped back and reproduced the issue in a small test project that combines the techniques from the two guides below:

  1. https://spring.io/guides/gs/scheduling-tasks/
  2. https://spring.io/guides/gs/async-method/

But when I put them together, the @Async method is called on the same thread as the scheduled task.

The output was:

[   scheduling-1] Looking up PivotalSoftware, thread: scheduling-1  
[   scheduling-1] Looking up CloudFoundry, thread: scheduling-1  
[   scheduling-1] Looking up Spring-Projects, thread: scheduling-1

My Question:
Is my code wrong, or is only one thread available to the scheduler even when I use @Async with CompletableFuture? What is the underlying reason for the result of my experiment?

My code:

AsyncMethodApplication

@SpringBootApplication  
@EnableAsync  
@EnableScheduling  
@PropertySource("classpath:common.properties")  
public class AsyncMethodApplication {

    public static void main(String[] args) {
        // close the application context to shut down the custom ExecutorService
        SpringApplication.run(AsyncMethodApplication.class, args).close();
    }

    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(3);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("GithubLookup-");
        executor.initialize();
        return executor;
    }


}

MyService

@Service  
public class MyService {  
    
    private static final Logger logger = LoggerFactory.getLogger(MyService.class);
    
    @Autowired
    private UserLookupService gitHubLookupService;
    
    @Scheduled(cron = "${cron.expression.reloadUserData}")
    @PostConstruct
    public void loadCurrentDayUsers() throws Exception {
        
        this.loadUsers();
        // this.loadOthers();
        
    }
    
    public void loadUsers() throws Exception {
        
        // Start the clock
        long start = System.currentTimeMillis();

        // Kick off multiple asynchronous lookups
        CompletableFuture<User> page1 = gitHubLookupService.findUser("PivotalSoftware");
        CompletableFuture<User> page2 = gitHubLookupService.findUser("CloudFoundry");
        CompletableFuture<User> page3 = gitHubLookupService.findUser("Spring-Projects");

        // Wait until they are all done
        CompletableFuture.allOf(page1,page2,page3).join();

        // Print results, including elapsed time
        logger.info("Elapsed time: " + (System.currentTimeMillis() - start));
        logger.info("--> " + page1.get());
        logger.info("--> " + page2.get());
        logger.info("--> " + page3.get());
    }

}

User

@JsonIgnoreProperties(ignoreUnknown=true)  
public class User {  

    private String name;
    private String blog;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getBlog() {
        return blog;
    }

    public void setBlog(String blog) {
        this.blog = blog;
    }

    @Override
    public String toString() {
        return "User [name=" + name + ", blog=" + blog + "]";
    }

}

UserLookupService

@Service  
public class UserLookupService {  

    private static final Logger logger = LoggerFactory.getLogger(UserLookupService.class);

    @Async
    public CompletableFuture<User> findUser(String user) throws InterruptedException {
        logger.info("Looking up " + user + ", thread: " + Thread.currentThread().getName());
        
        // mock result
        User results = new User();
        results.setName(user);
        results.setBlog("Blog - " + user);
        
        // Artificial delay of 1s for demonstration purposes
        Thread.sleep(1000L);
        return CompletableFuture.completedFuture(results);
    }

}

common.properties

cron.expression.reloadUserData=0 0/1 3-23 * * MON-SAT

CodePudding user response:

Credit to M. Deinum.

"remove the @PostConstruct as that might lead to eager init of your UserLookupService leading to no proxy and thus no @Async behavior"

This is a valid answer.
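
The quoted comment hinges on how @Async works: the asynchronous behavior comes from a proxy that hands each call to an executor, so a caller holding the unwrapped bean runs the method on its own thread. The following is a minimal plain-Java sketch of that idea using a JDK dynamic proxy. It is not Spring's actual implementation, and the `Lookup` interface, `RawLookup` class, and `asyncProxy` helper are hypothetical names introduced only for illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncProxySketch {

    /** Hypothetical stand-in for UserLookupService; the future holds the name of the thread that did the work. */
    public interface Lookup {
        CompletableFuture<String> findUser(String user);
    }

    /** The "raw" bean: it does its work on whichever thread calls it. */
    public static class RawLookup implements Lookup {
        @Override
        public CompletableFuture<String> findUser(String user) {
            return CompletableFuture.completedFuture(Thread.currentThread().getName());
        }
    }

    /** Wraps the target so every interface call is handed to the executor (an assumption-level model of an @Async proxy). */
    public static Lookup asyncProxy(Lookup target, ExecutorService executor) {
        InvocationHandler handler = (proxy, method, args) ->
                CompletableFuture.supplyAsync(() -> {
                    try {
                        // Run the real method on a pool thread and unwrap its already-completed future
                        return ((CompletableFuture<?>) method.invoke(target, args)).join();
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException(e);
                    }
                }, executor);
        return (Lookup) Proxy.newProxyInstance(
                Lookup.class.getClassLoader(), new Class<?>[] {Lookup.class}, handler);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3, r -> new Thread(r, "GithubLookup-sketch"));
        try {
            Lookup raw = new RawLookup();
            Lookup proxied = asyncProxy(raw, pool);
            // Without the proxy the work stays on the caller's thread
            System.out.println("raw call ran on:     " + raw.findUser("PivotalSoftware").join());
            // Through the proxy the work moves to a pool thread
            System.out.println("proxied call ran on: " + proxied.findUser("PivotalSoftware").join());
        } finally {
            pool.shutdown();
        }
    }
}
```

In the question's output every lookup logged 'scheduling-1', which matches the first case: the method body ran on the caller's thread, as if no proxy sat between MyService and UserLookupService.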

CodePudding user response:

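You can refer to the following, which submits each lookup to a dedicated executor explicitly:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.PostConstruct;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    private static final Logger logger = LoggerFactory.getLogger(MyService.class);

    @Autowired
    private UserLookupService gitHubLookupService;

    @Autowired
    @Qualifier("threadTaskGithubLookup")
    private ThreadPoolTaskExecutor threadTaskGithubLookup;

    @Scheduled(cron = "${cron.expression.reloadUserData}")
    @PostConstruct
    public void loadCurrentDayUsers() throws Exception {
        this.loadUsers();
        // this.loadOthers();
    }

    public void loadUsers() throws Exception {

        // Start the clock
        long start = System.currentTimeMillis();

        List<String> stringList = new ArrayList<>();
        stringList.add("PivotalSoftware");
        stringList.add("CloudFoundry");
        stringList.add("Spring-Projects");

        // Submit every lookup to the dedicated executor before waiting on any of them
        List<CompletableFuture<User>> lstComplet = new ArrayList<>();
        for (String str : stringList) {
            lstComplet.add(CompletableFuture.supplyAsync(() -> doExecuted(str), threadTaskGithubLookup));
        }

        // Collect the results; get() blocks until the corresponding lookup has finished
        List<User> userResults = new ArrayList<>();
        for (CompletableFuture<User> compl : lstComplet) {
            userResults.add(compl.get());
        }

        // Print results, including elapsed time
        logger.info("Elapsed time: " + (System.currentTimeMillis() - start));
        for (User us : userResults) {
            logger.info("--> " + new ObjectMapper().writeValueAsString(us));
        }
    }

    // Runs on a GithubLookup- thread; logs a failed lookup and returns null for it
    private User doExecuted(String str) {
        try {
            return gitHubLookupService.findUser(str).get();
        } catch (Exception e) {
            logger.error("Exception {}", e.getMessage());
            return null;
        }
    }
}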

I re-declared the Executor bean under a qualified name:

@Bean("threadTaskGithubLookup")
public ThreadPoolTaskExecutor threadTaskGithubLookup() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(3);
    executor.setMaxPoolSize(3);
    executor.setThreadNamePrefix("GithubLookup-");
    executor.initialize();
    return executor;
}
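
The core of this answer is that passing an executor to CompletableFuture explicitly decides which threads do the work, independently of any annotation. A minimal plain-Java sketch of that pattern follows, with no Spring involved; the `lookup` and `lookupAll` methods, the 200 ms delay, and the numbered thread names are assumptions made for the sake of the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class ExplicitExecutorSketch {

    /** Hypothetical blocking lookup: sleeps briefly, then reports which thread served the request. */
    public static String lookup(String user) {
        try {
            Thread.sleep(200L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return user + " <- " + Thread.currentThread().getName();
    }

    /** Starts one lookup per name on the given executor, waits for all of them, and returns results in input order. */
    public static List<String> lookupAll(List<String> users, ExecutorService executor) {
        // collect() is a terminal operation, so every task is submitted before we start waiting
        List<CompletableFuture<String>> futures = users.stream()
                .map(u -> CompletableFuture.supplyAsync(() -> lookup(u), executor))
                .collect(Collectors.toList());
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(3,
                r -> new Thread(r, "GithubLookup-" + counter.incrementAndGet()));
        try {
            long start = System.currentTimeMillis();
            List<String> results = lookupAll(
                    Arrays.asList("PivotalSoftware", "CloudFoundry", "Spring-Projects"), pool);
            results.forEach(System.out::println);
            System.out.println("Elapsed ms: " + (System.currentTimeMillis() - start));
        } finally {
            pool.shutdown();
        }
    }
}
```

With three pool threads and three lookups, the elapsed time should be close to one delay rather than the sum of all three, which is the speed-up the question was after.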