I am trying to create a Spring Boot application that asynchronously processes thousands of records using @Async multithreading. For this I create 8 threads and split the main list into 8 sublists, so that each of the 8 threads processes one sublist asynchronously. I am also using @Scheduled so that the method is called every 2 seconds.
The problem is that, because of the scheduler, the application sometimes processes duplicate records: the method is called every 2 seconds and retrieves data from the database each time. For example, the first call retrieves 72000 records whose flag is 0, and the @Async method then processes these records and changes the flag of each processed record from 0 to 1. Two seconds later the method is called again and retrieves the records whose flag is still 0.
In the attached log picture you can see that the first scheduler run retrieved 72000 records and multiple threads started processing them. In between, the next scheduler run started and retrieved 16000 records, which include records that are still being processed by the current run.
I am looking for a solution in which the next scheduler run does not start until the first one has completed. Otherwise, while the first run is still processing records, a run started 2 seconds later may retrieve records that already belong to the first run.
I can't increase the scheduler interval, because most of the time we get around 400-500 records and only sometimes we get thousands.
The code is below.
@SpringBootApplication
@EnableScheduling
@EnableAsync
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public RestTemplate restTemplate(RestTemplateBuilder builder) {
        return builder.build();
    }

    @Bean("ThreadPoolTaskExecutor")
    public TaskExecutor getAsyncExecutor() {
        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(8);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setThreadNamePrefix("async-");
        executor.setRejectedExecutionHandler(new RejectedExecutionHandlerImpl());
        executor.initialize();
        return executor;
    }
}
@Service
public class Main {

    @Autowired
    private DemoDao dao;

    @Autowired
    private Asyn asyn;

    static int schedulerCount = 0;

    @Scheduled(cron = "0/2 * * * * *")
    public void schedule() {
        System.out.println(" Scheduler started schedulerCount : " + schedulerCount + " " + LocalDateTime.now());
        List<Json> jsonList = new ArrayList<Json>();
        List<List<Json>> smallLi = new ArrayList<List<Json>>();
        try {
            jsonList = dao.getJsonList();
            System.out.println("jsonList size : " + jsonList.size());
            int count = jsonList.size();
            // Create 8 sublists from the main list (jsonList), because the thread pool size is 8.
            int limit = Math.round(count / 8) + 1;
            for (int j = 0; j < count; j += limit) {
                smallLi.add(new ArrayList<Json>(jsonList.subList(j, Math.min(count, j + limit))));
            }
            System.out.println("smallLi : " + smallLi.size());
            // Pass each sublist to the @Async method so that each of the 8 threads processes one sublist asynchronously.
            for (int i = 0; i < smallLi.size(); i++) {
                asyn.withAsyn(smallLi.get(i), schedulerCount);
            }
            schedulerCount++;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    @Async("ThreadPoolTaskExecutor")
    public void withAsyn(List<Json> li, int schedulerCount) throws Exception {
        System.out.println("with start schedulerCount " + schedulerCount + ", name : "
                + Thread.currentThread().getName() + ", time : " + LocalDateTime.now() + ", start index : "
                + li.get(0).getId() + ", end index : " + li.get(li.size() - 1).getId());
        try {
            XSSFWorkbook workbook = new XSSFWorkbook();
            XSSFSheet spreadsheet = workbook.createSheet("Data");
            XSSFRow row;
            for (int i = 0; i < li.size(); i++) {
                row = spreadsheet.createRow(i);
                Cell cell9 = row.createCell(0);
                cell9.setCellValue(li.get(i).getId());
                Cell cell = row.createCell(1);
                cell.setCellValue(li.get(i).getName());
                Cell cell1 = row.createCell(2);
                cell1.setCellValue(li.get(i).getPhone());
                Cell cell2 = row.createCell(3);
                cell2.setCellValue(li.get(i).getEmail());
                Cell cell3 = row.createCell(4);
                cell3.setCellValue(li.get(i).getAddress());
                Cell cell4 = row.createCell(5);
                cell4.setCellValue(li.get(i).getPostalZip());
            }
            FileOutputStream out = new FileOutputStream(new File("C:\\Users\\RK658\\Desktop\\logs\\generated\\"
                    + Thread.currentThread().getName() + "_" + schedulerCount + ".xlsx"));
            workbook.write(out);
            out.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("with end schedulerCount " + schedulerCount + ", name : "
                + Thread.currentThread().getName() + ", time : " + LocalDateTime.now() + ", start index : "
                + li.get(0).getId() + ", end index : " + li.get(li.size() - 1).getId());
    }
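For illustration, the sublist split described above can be tried in isolation from Spring and the database. The sketch below is not the code from the application: Partitioner and partition are hypothetical names, and it uses ceiling division for the chunk size (an assumption, chosen so that no more than the requested number of sublists is produced).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the application above.
class Partitioner {

    // Splits items into at most `parts` consecutive sublists of near-equal size.
    static <T> List<List<T>> partition(List<T> items, int parts) {
        if (parts <= 0) {
            throw new IllegalArgumentException("parts must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        if (items.isEmpty()) {
            return chunks;
        }
        // Ceiling division: the smallest chunk size for which `parts` chunks cover the list.
        int chunkSize = (items.size() + parts - 1) / parts;
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(items.size(), start + chunkSize);
            // Copy the subList view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            ids.add(i);
        }
        // Prints the chunks produced for 20 ids and a target of 8 sublists.
        System.out.println(partition(ids, 8));
    }
}
```

With small inputs the number of sublists can be lower than the target, because the chunk size is rounded up.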
CodePudding user response:
I believe you can use ShedLock to solve this issue. https://www.baeldung.com/shedlock-spring
https://github.com/lukas-krecan/ShedLock
CodePudding user response:
"I am looking for a solution in which the next scheduler run does not start until the first one has completed."
You have defined @Service public class Main {...}, and by default @Service tells Spring to build a singleton. If you have not done anything to change this, the expected behavior is that Spring creates only a single instance of your class Main in heap memory.
So in this case, if you add the synchronized keyword to your method, only 1 thread at a time will be able to execute it. Every other thread that requests to execute your scheduled method will have to wait until the thread that is already running finishes.
@Scheduled(cron = "0/2 * * * * *")
public synchronized void schedule() { .... }
Example scenario for the solution above:
- Thread A starts executing the method schedule.
- 2 seconds later, while Thread A is still executing, Thread B starts and wants to execute the same method. Thread B will wait outside and will start executing the method only after Thread A has finished execution.
So in the above scenario, if 3 scheduler runs are triggered at close time intervals, all of them will wait for execution. This may create a bottleneck in your scheduling and put a heavy load on your resources.
If that is the case, an alternative solution would be to cancel Thread B's method execution if it arrives while Thread A is already executing the same method. This way you can be sure that no bottleneck builds up.
To implement this you can use a ReentrantLock.
@Service
public class Main {

    private final ReentrantLock reentrantLock = new ReentrantLock();
    ....

    @Scheduled(cron = "0/2 * * * * *")
    public void schedule() {
        if (reentrantLock.tryLock()) {
            try {
                //... all your existing method code
            } finally {
                reentrantLock.unlock();
            }
        } else {
            // Log that `Thread B` was cancelled if you want, as it requested to execute
            // the method while `Thread A` was already executing.
        }
    }
}
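One detail worth checking with either lock: as far as I can tell, schedule() in the question only hands the sublists to the @Async method and then returns, so a lock taken inside schedule() would be released before the worker threads finish. Below is a minimal plain-Java sketch of the tryLock idea that also waits for the workers before unlocking. It does not use Spring. GuardedJob, runOnce, processChunk and isRunning are hypothetical names, and the 200 ms sleep is only a stand-in for writing one sublist.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the scheduled service; not the asker's class.
class GuardedJob {
    private final ReentrantLock lock = new ReentrantLock();
    // Daemon worker threads so that a forgotten shutdown() cannot keep the JVM alive.
    private final ExecutorService workers = Executors.newFixedThreadPool(8, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });
    final AtomicInteger processedChunks = new AtomicInteger();

    // Returns true if this call ran the batch, false if it was skipped.
    boolean runOnce(int chunkCount) {
        if (!lock.tryLock()) {
            return false; // another thread is still running a batch
        }
        try {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (int i = 0; i < chunkCount; i++) {
                pending.add(CompletableFuture.runAsync(this::processChunk, workers));
            }
            // Wait for every chunk, so the lock covers the whole batch.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            return true;
        } finally {
            lock.unlock();
        }
    }

    private void processChunk() {
        try {
            Thread.sleep(200); // stand-in for processing one sublist
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        processedChunks.incrementAndGet();
    }

    boolean isRunning() {
        return lock.isLocked();
    }

    void shutdown() {
        workers.shutdown();
    }

    public static void main(String[] args) {
        GuardedJob job = new GuardedJob();
        // The first call runs on another thread, like an earlier scheduler run.
        CompletableFuture<Boolean> first = CompletableFuture.supplyAsync(() -> job.runOnce(8));
        while (!job.isRunning() && !first.isDone()) {
            Thread.yield(); // wait until the first call holds the lock
        }
        System.out.println("second call ran: " + job.runOnce(8));
        System.out.println("first call ran: " + first.join());
        job.shutdown();
    }
}
```

In a Spring application the equivalent would presumably be to let the @Async method return a CompletableFuture and to wait on those futures inside schedule() before unlocking. That is an assumption about how the question's code could be adapted, not something shown in the original post.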