I am trying to run a async method in scheduled for every 5 mins in Spring to process 1000 of tasks using 100 threads.At end of every run I need to figure out how many task's failed & succeeded. I tried using Completable future using below sample code but I am facing 2 main issue.
- If some exceptions comes schedular restarts without completing run.
- How to get success/failure task number after run.I would like to print at the end success tasks:[1,2,4,5] failed tasks : [9,10,7,8]
//ScheduledTask
public void processTask(){
List<CompletableFuture<Integer>> futures=new ArrayList<>();
for(int I=0;i<300;i ){
futures.add(service.performTask(i));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}
//MyAsyncService
@Async
public CompletableFuture<Integer> performTask(int i){
try{
Thread.sleep(1000);
int test=(int) (i 10)/RandomNumber(0,10); // generate random number between 0 to 10 and divide I 10 by that to fail some tasks randomly.
return CompletableFuture.completeFuture(i);
}catch(Exception e){
CompletableFuture<Integer> f = new CompletableFuture<>();
f.completeExceptionally(e);
return f;
}
}
//MyAsyncConfig
public Executor getAsyncExecutor() {
final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("async-thread-");
threadPoolTaskExecutor.setCorePoolSize(100);
threadPoolTaskExecutor.setMaxPoolSize(300);
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
CodePudding user response:
The call to join() on the CompleteableFuture is throwing an exception if any of the tasks failed, so that is likely the problem.
Instead of calling join, try something like this to get the counts of success / failures using the whenComplete method:
public void processTask(){
List<CompletableFuture<Integer>> futures = new ArrayList<>();
List<Integer> success = new ArrayList<>();
List<Integer> failures = new ArrayList<>();
for (int i = 0; i < 300; i ) {
int rec = i;
futures.add(CompletableFuture.supplyAsync(() -> performTask(rec)).whenComplete((a, ex) -> {
synchronized (success) {
if (ex == null)
success.add(rec);
else
failures.add(rec);
}
}));
}
for (CompletableFuture<Integer> f : futures) {
try {
f.join();
} catch (CompletionException ex) {
// ignore
}
}
System.out.println("Successes = " success);
System.out.println("Failures = " failures);
}
public Integer performTask(int i) {
try {
Thread.sleep(10);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
int test = (int) (i 10) / RandomUtils.nextInt(0, 10);
return i;
}
Edit: There was a problem with reporting the numbers of the failures, which I have fixed.
Edit #2: I incorrectly started whenComplete waits for completion before proceeding. That was not correct, code has been adjusted.
Edit #3: The method performTask as written in this post does not actually perform any work asynchronously. I have re-written it to be asynchronous.
CodePudding user response:
Hmmm, I think we still need the join to wait the completion of the whole set of CompletableFutures for the scheduled task :)
Another thing, to make sure the scheduled task finished, we need to use exceptionally() or handle() to catch the exception.
public void processTask() {
List<CompletableFuture<Integer>> futures=new ArrayList<>();
List<Integer> successes = new ArrayList<>();
List<Integer> failures = new ArrayList<>();
for (int i = 0; i < 10; i ) {
int record = i;
CompletableFuture<Integer> integerCompletableFuture = performTask(i).handle(((integer, throwable) -> {
if (throwable != null) {
failures.add(record);
} else {
successes.add(record);
}
return integer;
}));
futures.add(integerCompletableFuture);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
log.info("successes: {}", successes);
log.info("failures: {}", failures);
}
This will print something like this:
[main] INFO rxjava.CompletableFutureConcurrency - successes: [0, 1, 3, 4, 5, 6, 7, 9]
[main] INFO rxjava.CompletableFutureConcurrency - failures: [2, 8]
or something like this if you don't need success records.
public void processTask(){
List<CompletableFuture<Integer>> futures=new ArrayList<>();
List<Integer> failures = new ArrayList<>();
for (int i = 0; i < 10; i ) {
int record = i;
CompletableFuture<Integer> integerCompletableFuture = performTask(i)
.exceptionally(throwable -> {
failures.add(record);
return record;
});
futures.add(integerCompletableFuture);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
log.info("failures: {}", failures);
This will print something like this:
[main] INFO rxjava.CompletableFutureConcurrency - failures: [2, 8]
PS: Here I used the int record = i
. You can play without it know why I did so ;)