Very good evening. I am struggling with partitioning a list after following Mahmoud Ben Hassine's solution to "Spring Batch: how to process a list of String in parallel and return a subset of them?". I would like help with the partitioner. I have used it on a list of 25 elements, and it splits the list into 2 partitions: 0-12 and 13-24. So there are 2 threads, and the chunk size is 5. Shouldn't thread 1 then process 3 chunks of sizes 5, 5, 3 and thread 2 process 3 chunks of sizes 5, 5, 2? But when I look at my logs, I see that threads 1 and 2 each process 5, 5, 5, 5, 5, which makes 50 items. That means 50 items are being written to the DB instead of 25. Can anyone please help me solve this problem?
Here are the logs:
2022-10-11 14:37:00.165  INFO [,,] 19504 --- [nio-2222-exec-2] o.s.b.c.j.SimpleStepHandler : Executing step: [masterStep]
partitions - {partition1={range=Range(start=13, end=24)}, partition0={range=Range(start=0, end=12)}}
hawaii thread name - buildTaskExecutor2-1 - List size = 5
hawaii thread name - buildTaskExecutor2-2 - List size = 5
hawaii thread name - buildTaskExecutor2-1 - List size = 5
hawaii thread name - buildTaskExecutor2-2 - List size = 5
hawaii thread name - buildTaskExecutor2-2 - List size = 5
hawaii thread name - buildTaskExecutor2-1 - List size = 5
hawaii thread name - buildTaskExecutor2-2 - List size = 5
hawaii thread name - buildTaskExecutor2-1 - List size = 5
hawaii thread name - buildTaskExecutor2-1 - List size = 5
hawaii thread name - buildTaskExecutor2-2 - List size = 5
2022-10-11 14:37:04.420  INFO [,,] 19504 --- [TaskExecutor2-1] o.s.b.c.s.AbstractStep : Step: [step1:partition1] executed in 3s673ms
2022-10-11 14:37:04.420  INFO [,,] 19504 --- [TaskExecutor2-2] o.s.b.c.s.AbstractStep : Step: [step1:partition0] executed in 3s673ms
2022-10-11 14:37:04.619  INFO [,,] 19504 --- [nio-2222-exec-2] o.s.b.c.s.AbstractStep : Step: [masterStep] executed in 4s453ms
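The logs are consistent with each partition's reader seeing the whole list instead of its own range: `buildListItemReader()` wraps the entire `staticCustomerList` in a `ListItemReader`, and nothing in the worker step reads the `range` that the partitioner stores in each partition's `ExecutionContext`. Because the reader is `@StepScope`, each of the two partition step executions gets its own reader over all 25 items, so each one writes 25 / 5 = 5 chunks of 5, or 50 items in total. This plain-Java sketch only reproduces that arithmetic; it is a simulation rather than Spring Batch code, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FullListPerPartitionDemo {

    /** Chunk sizes a chunk-oriented step would hand to its writer for the items read. */
    static List<Integer> chunkSizes(List<Integer> itemsRead, int chunkSize) {
        List<Integer> sizes = new ArrayList<>();
        for (int i = 0; i < itemsRead.size(); i += chunkSize) {
            sizes.add(Math.min(chunkSize, itemsRead.size() - i));
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<Integer> customers = IntStream.range(0, 25).boxed().collect(Collectors.toList());
        int[][] ranges = {{0, 12}, {13, 24}}; // the ranges the partitioner logged
        int totalWritten = 0;
        for (int p = 0; p < ranges.length; p++) {
            // The range is computed but never used: every partition's reader
            // is built over the full list, mirroring buildListItemReader().
            List<Integer> itemsRead = customers;
            totalWritten += itemsRead.size();
            System.out.println("partition" + p + " (range " + ranges[p][0] + "-" + ranges[p][1]
                    + ") chunks = " + chunkSizes(itemsRead, 5));
        }
        // prints [5, 5, 5, 5, 5] for both partitions, then 50
        System.out.println("total written = " + totalWritten);
    }
}
```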
Here is the code:
import java.util.List;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class CustomerWriter implements ItemWriter<Customer> {
    @Autowired
    private CustomerRepository customerRepository;
    public CustomerWriter(CustomerRepository customerRepository) {
        this.customerRepository = customerRepository;
    }
    public CustomerWriter() {
    }
    @Override
    public void write(List<? extends Customer> customersList) throws Exception {
        // I'm expecting thread 1 to call this method 3 times with the sizes 5, 5, 3
        // and thread 2 to call it 3 times with the sizes 5, 5, 2, which adds up to 25 saved items,
        // but each thread calls this method 5 times with 5 items per call, for a total of 50 saved items
        System.out.println("hawaii thread name - " + Thread.currentThread().getName() + " - List size = " + customersList.size());
        customerRepository.saveAll(customersList);
    }
}
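import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
import lombok.AllArgsConstructor;
import org.apache.commons.lang3.RandomStringUtils;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@EnableBatchProcessing
@AllArgsConstructor
public class PocBatchConfig {
    private JobBuilderFactory jobBuilderFactory;
    private StepBuilderFactory stepBuilderFactory;
    private CustomerWriter customerWriter;
    private static List<Customer> staticCustomerList = createMockCustomerList();
    @Bean
    @StepScope
    public ItemReader<Customer> buildListItemReader() {
        return new ListItemReader<Customer>(staticCustomerList);
    }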
    private static List<Customer> createMockCustomerList() {
        ReentrantLock reentrantLock = new ReentrantLock();
        List<Customer> mockCustomerList = new ArrayList<>();
        reentrantLock.lock();
        try {
            for (int i = 0; i < 25; i++) {
                String ran1 = RandomStringUtils.random(4, true, true);
                String ran2 = UUID.randomUUID().toString().substring(5, 10);
                String ran3 = RandomStringUtils.random(4, true, true);
                String randomId = ran1 + ran2 + ran3;
                mockCustomerList.add(new Customer(randomId, RandomStringUtils.random(10, true, true),
                        RandomStringUtils.random(10, true, true), RandomStringUtils.random(10, true, true),
                        RandomStringUtils.random(10, true, true), RandomStringUtils.random(10, true, true),
                        RandomStringUtils.random(10, true, true)));
            }
        } finally {
            reentrantLock.unlock();
        }
        return mockCustomerList;
    }
    @Bean
    public CustomerProcessor buildCustomerProcessor() {
        return new CustomerProcessor();
    }
    @Bean
    public CustomerRangePartitioner buildCustomerRangePartitioner() {
        return new CustomerRangePartitioner(staticCustomerList.size());
    }
    @Bean
    public PartitionHandler buildPartitionHandler() {
        TaskExecutorPartitionHandler taskExecutorPartitionHandler = new TaskExecutorPartitionHandler();
        taskExecutorPartitionHandler.setGridSize(2);
        taskExecutorPartitionHandler.setTaskExecutor(buildTaskExecutor2());
        taskExecutorPartitionHandler.setStep(buildSlaveStep1());
        return taskExecutorPartitionHandler;
    }
    @Bean
    public Step buildSlaveStep1() {
        return stepBuilderFactory
                .get("step1")
                .<Customer, Customer>chunk(5)
                .reader(buildListItemReader())
                .processor(buildCustomerProcessor())
                .writer(customerWriter)
                .build();
    }
    @Bean
    public Step buildMasterStep1() {
        return stepBuilderFactory
                .get("masterStep")
                .partitioner(buildSlaveStep1().getName(), buildCustomerRangePartitioner())
                .partitionHandler(buildPartitionHandler())
                .build();
    }
    @Bean
    public Job buildJob1() {
        return jobBuilderFactory
                .get("job1")
                .flow(buildMasterStep1())
                .end()
                .build();
    }
    @Bean
    public TaskExecutor buildTaskExecutor2() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setMaxPoolSize(4);
        threadPoolTaskExecutor.setCorePoolSize(4);
        threadPoolTaskExecutor.setQueueCapacity(4);
        return threadPoolTaskExecutor;
    }
    public class CustomerRangePartitioner implements Partitioner {
        private int listSize;
        public CustomerRangePartitioner(int listSize) {
            this.listSize = listSize;
        }
        public CustomerRangePartitioner() {
        }
        @Override
        public Map<String, ExecutionContext> partition(int gridSize) {
            int partitionSize = listSize / gridSize;
            Range[] ranges = new Range[gridSize];
            for (int i = 0, j = 0; i < gridSize; i++, j += partitionSize) {
                ranges[i] = new Range(j, j + partitionSize);
            }
            Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);
            for (int i = 0; i < gridSize; i++) {
                ExecutionContext context = new ExecutionContext();
                if (i > 0) {
                    Range range = ranges[i];
                    range.setStart(range.getStart() + 1);
                }
                context.put("range", ranges[i]);
                partitions.put("partition" + i, context);
            }
            System.out.println("partitions - " + partitions);
            return partitions;
        }
    }
    public class CustomerProcessor implements ItemProcessor<Customer, Customer> {
        @Override
        public Customer process(Customer customer) throws Exception {
            return customer;
        }
    }
}
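The partitioner's arithmetic happens to fit 25 items on a grid of 2, but it does not generalize. With 25 items and a grid size of 5, partitionSize is 5 and the last range becomes 21-25, although index 25 does not exist; with 26 items and a grid size of 3, the ranges come out as 0-8, 9-16 and 17-24, so index 25 is never assigned. One common alternative, sketched here in plain Java with hypothetical names, is to build half-open ranges [start, end) and hand out the remainder one item per partition. In the Spring Batch partitioner, the two bounds could then be stored per partition with `ExecutionContext.putInt`, and a reader could use `subList(start, end)` without any `+ 1` adjustment; that wiring is a suggestion, not something tested against the code above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EvenRangePartitioner {

    /**
     * Splits [0, listSize) into gridSize contiguous half-open ranges [start, end).
     * The first (listSize % gridSize) ranges get one extra item, so every index
     * is assigned exactly once and no end bound runs past the list.
     */
    static Map<String, int[]> partition(int listSize, int gridSize) {
        Map<String, int[]> partitions = new LinkedHashMap<>(); // keeps partition0, partition1, ... in order
        int base = listSize / gridSize;
        int remainder = listSize % gridSize;
        int start = 0;
        for (int i = 0; i < gridSize; i++) {
            int size = base + (i < remainder ? 1 : 0);
            partitions.put("partition" + i, new int[] {start, start + size});
            start += size;
        }
        return partitions;
    }

    public static void main(String[] args) {
        // prints "partition0 -> [0, 13)" and "partition1 -> [13, 25)"
        partition(25, 2).forEach((name, r) ->
                System.out.println(name + " -> [" + r[0] + ", " + r[1] + ")"));
    }
}
```

For 25 items and a grid size of 2 this yields the same split the question expects (13 items, then 12), while also covering cases such as 26 items on a grid of 3 ([0, 9), [9, 18), [18, 26)).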
I'm back with some more information. I disabled the partitioning by bypassing the master step and going directly to the worker (slave) step. I also increased the chunk size to 20, and the single thread processed the list of 25 correctly: one chunk was 20 and the other was 5. I would like the same thing to happen with multiple partitions using the partitioner.
Here are the logs:
2022-10-11 17:09:45.333  INFO [,,] 12764 --- [nio-2222-exec-1] o.s.b.c.j.SimpleStepHandler : Executing step: [step1]
hawaii thread name - http-nio-2222-exec-1 - List size = 20
hawaii thread name - http-nio-2222-exec-1 - List size = 5
2022-10-11 17:09:48.166  INFO [,,] 12764 --- [nio-2222-exec-1] o.s.b.c.s.AbstractStep : Step: [step1] executed in 2s833ms
Here is the code change I made. I go directly to the child step instead of the master step:
@Bean
public Job buildJob1() {
    return jobBuilderFactory
            .get("job1")
            // the original logic goes to buildMasterStep1() method
            .flow(buildSlaveStep1())
            .end()
            .build();
}
Lol, I'm back after waking up in the middle of the night to try another idea. This time I created 2 steps and manually split the list in half using subList, to see whether it would print 5, 5, 3 during step 1 and 5, 5, 2 during step 2 with a chunk size of 5, and it did. I would like this exact same thing to happen with separate partitions using the partitioner. Please help me achieve the same outcome with a partitioner.
Here are the logs:
2022-10-11 22:55:47.409  INFO [,,] 12084 --- [nio-2222-exec-1] o.s.b.c.j.SimpleStepHandler : Executing step: [step1]
hawaii thread name - http-nio-2222-exec-1 - List size = 5
hawaii thread name - http-nio-2222-exec-1 - List size = 5
hawaii thread name - http-nio-2222-exec-1 - List size = 3
2022-10-11 22:55:49.284  INFO [,,] 12084 --- [nio-2222-exec-1] o.s.b.c.s.AbstractStep : Step: [step1] executed in 1s874ms
2022-10-11 22:55:49.864  INFO [,,] 12084 --- [nio-2222-exec-1] o.s.b.c.j.SimpleStepHandler : Executing step: [step2]
hawaii thread name - http-nio-2222-exec-1 - List size = 5
hawaii thread name - http-nio-2222-exec-1 - List size = 5
hawaii thread name - http-nio-2222-exec-1 - List size = 2
2022-10-11 22:55:51.537  INFO [,,] 12084 --- [nio-2222-exec-1] o.s.b.c.s.AbstractStep : Step: [step2] executed in 1s672ms
Here is the code change. I just added a next step to the job, and I manually passed the first half of the list to step1 and the second half to step2:
@Bean
public Job buildJob1() {
    return jobBuilderFactory
            .get("job1")
            .flow(buildSlaveStep1())
            .next(buildSlaveStep2())
            .end()
            .build();
}
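The two-step experiment is essentially what each partition needs to do on its own: read only `subList(start, end + 1)` of the shared list, where the `+ 1` converts the partitioner's inclusive `Range` end into `subList`'s exclusive bound. This minimal plain-Java sketch, using a hypothetical `Range` stand-in and hypothetical helper names, shows that reading just the owned slice gives the 5, 5, 3 and 5, 5, 2 chunks from the two-step run. In the real configuration, this would presumably mean letting the step-scoped reader bean receive the partition's range through late binding, for example a parameter annotated `@Value("#{stepExecutionContext['range']}") Range range`, and returning `new ListItemReader<>(staticCustomerList.subList(range.getStart(), range.getEnd() + 1))`. That wiring is an untested suggestion, and it assumes `Range` has a `getEnd()` getter to match the `getStart()` used in the partitioner.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RangeBoundedReaderDemo {

    /** Hypothetical stand-in for the question's Range: both bounds are inclusive. */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Returns only the items a partition owns; subList's end is exclusive, hence end + 1. */
    static <T> List<T> itemsFor(List<T> all, Range range) {
        return all.subList(range.start, range.end + 1);
    }

    /** Reads items one at a time and groups them into chunks, like a chunk-oriented step. */
    static <T> List<Integer> chunkSizes(List<T> items, int chunkSize) {
        List<Integer> sizes = new ArrayList<>();
        Iterator<T> reader = items.iterator();
        while (reader.hasNext()) {
            int count = 0;
            while (count < chunkSize && reader.hasNext()) {
                reader.next();
                count++;
            }
            sizes.add(count);
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<Integer> customers = IntStream.range(0, 25).boxed().collect(Collectors.toList());
        List<Range> ranges = List.of(new Range(0, 12), new Range(13, 24));
        int totalWritten = 0;
        for (int p = 0; p < ranges.size(); p++) {
            List<Integer> owned = itemsFor(customers, ranges.get(p));
            totalWritten += owned.size();
            // partition0 -> [5, 5, 3], partition1 -> [5, 5, 2]
            System.out.println("partition" + p + " chunks = " + chunkSizes(owned, 5));
        }
        System.out.println("total written = " + totalWritten); // 25
    }
}
```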