I have thousands of records to process using spring-batch and it takes too much time for our business requirement. The logic is simply to read a CSV file record by record and do the processing.
I want to parallelise the step execution to speed up batch processing. I think the best choice would be to partition the step and process, say, records 1-100 in one thread, 101-200 in another, and so on. But instead, every thread processes all of the records.
The same happens in this very simple example:
- 10 rows in CSV file
- 5 threads
- Each should process 2 records
- Instead, each thread processes all 10 records
@Configuration
@RequiredArgsConstructor
public class JobConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Value("classpath:employees.csv")
private Resource resourceFile;
@Bean("MyJob1")
public Job createJob() {
return jobBuilderFactory.get("my job 1")
.incrementer(new RunIdIncrementer())
.start(step())
.build();
}
@Bean("MyStep1")
public Step step() {
return stepBuilderFactory.get("my step 1")
.partitioner("my step 1", new SimplePartitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean("slaveStep")
public Step slaveStep() {
return stepBuilderFactory.get("read csv stream")
.<Employee, Employee>chunk(1)
.reader(flatFileItemReader())
.processor((ItemProcessor<Employee, Employee>) employee -> {
System.out.printf("Processed item %s%n", employee.getId());
return employee;
})
.writer(list -> {
for (Employee item : list) {
System.out.println(item);
}
})
.build();
}
@StepScope
@Bean
public FlatFileItemReader<Employee> flatFileItemReader() {
FlatFileItemReader<Employee> reader = new FlatFileItemReader<>();
reader.setResource(resourceFile);
DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();
lineMapper.setFieldSetMapper(fieldSet -> {
String[] values = fieldSet.getValues();
return Employee.builder()
.id(Integer.parseInt(values[0]))
.firstName(values[1])
.build();
});
lineMapper.setLineTokenizer(new DelimitedLineTokenizer(";"));
reader.setLineMapper(lineMapper);
return reader;
}
@Bean
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler taskExecutorPartitionHandler = new TaskExecutorPartitionHandler();
taskExecutorPartitionHandler.setTaskExecutor(taskExecutor());
taskExecutorPartitionHandler.setStep(slaveStep());
taskExecutorPartitionHandler.setGridSize(5);
return taskExecutorPartitionHandler;
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(5);
taskExecutor.setCorePoolSize(5);
taskExecutor.setQueueCapacity(5);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
}
@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchTestsApplication implements CommandLineRunner {
private final JobLauncher jobLauncher;
private final Job job;
public SpringBatchTestsApplication(JobLauncher jobLauncher,
@Qualifier("MyJob1") Job job) {
this.jobLauncher = jobLauncher;
this.job = job;
}
public static void main(String[] args) {
SpringApplication.run(SpringBatchTestsApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
jobLauncher.run(job, new JobParameters());
}
}
@Value
@Builder
public class Employee {
private final int id;
private final String firstName;
}
employees.csv:
1;Jakub
2;Mike
3;Pawel
4;Joana
5;Michal
6;Joe
7;Bailey
8;Bailhache
9;John
10;Eva
Expected output for 5 threads (gridSize); this is only an example because the order is not important:
Processed item 1
Employee(id=1, firstName=Jakub)
Processed item 2
Employee(id=2, firstName=Mike)
Processed item 3
Employee(id=3, firstName=Pawel)
Processed item 4
Employee(id=4, firstName=Joana)
Processed item 5
Employee(id=5, firstName=Michal)
Processed item 6
Employee(id=6, firstName=Joe)
Processed item 7
Employee(id=7, firstName=Bailey)
Processed item 8
Employee(id=8, firstName=Bailhache)
Processed item 9
Employee(id=9, firstName=John)
Processed item 10
Employee(id=10, firstName=Eva)
The actual output, however, is the above repeated 5 times.
Answer:
I don't know whether my solution is the best, but it worked.
I added two fields to the JobConfig class:
private int startLine = -1;
private int lastLine = 1;
Then I applied them to the reader in the flatFileItemReader method:
reader.setCurrentItemCount(startLine);
reader.setMaxItemCount(lastLine);
startLine += 2;
lastLine += 2;
All code:
@Configuration
@RequiredArgsConstructor
public class JobConfig {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
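// Shared window counters: each time a new step-scoped reader is created for a partition, the window below is advanced by 2 lines.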
private int startLine = -1;
private int lastLine = 1;
@Value("classpath:employees.csv")
private Resource resourceFile;
@Bean("MyJob1")
public Job createJob() {
return jobBuilderFactory.get("my job 1")
.incrementer(new RunIdIncrementer())
.start(step())
.build();
}
@Bean("MyStep1")
public Step step() {
return stepBuilderFactory.get("my step 1")
.partitioner("my step 1", new SimplePartitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean("slaveStep")
public Step slaveStep() {
return stepBuilderFactory.get("read csv stream")
.<Employee, Employee>chunk(1)
.reader(flatFileItemReader())
.processor((ItemProcessor<Employee, Employee>) employee -> {
System.out.printf("Processed item %s%n", employee.getId());
return employee;
})
.writer(list -> {
for (Employee item : list) {
//System.out.println(item);
}
})
.build();
}
@StepScope
@Bean
public FlatFileItemReader<Employee> flatFileItemReader() {
FlatFileItemReader<Employee> reader = new FlatFileItemReader<>();
reader.setResource(resourceFile);
DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();
lineMapper.setFieldSetMapper(fieldSet -> {
String[] values = fieldSet.getValues();
return Employee.builder()
.id(Integer.parseInt(values[0]))
.firstName(values[1])
.build();
});
lineMapper.setLineTokenizer(new DelimitedLineTokenizer(";"));
reader.setLineMapper(lineMapper);
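// Restrict this reader instance to the current 2-line window, then advance the window for the next partition's reader.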
reader.setCurrentItemCount(startLine);
reader.setMaxItemCount(lastLine);
startLine += 2;
lastLine += 2;
return reader;
}
@Bean
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler taskExecutorPartitionHandler = new TaskExecutorPartitionHandler();
taskExecutorPartitionHandler.setTaskExecutor(taskExecutor());
taskExecutorPartitionHandler.setStep(slaveStep());
taskExecutorPartitionHandler.setGridSize(5);
return taskExecutorPartitionHandler;
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(5);
taskExecutor.setCorePoolSize(5);
taskExecutor.setQueueCapacity(5);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
}
Answer:
This should be the solution. Look at the partitioner: it takes care of counting the lines of the file and dividing them among the readers. Also note the use of @Value("#{stepExecutionContext['fromLine']}")
to inject the correct line range into each reader.
And please use dependency injection instead of calling the bean methods directly.
@Configuration
@RequiredArgsConstructor
public class JobConfig {
private static final Logger log = LoggerFactory.getLogger(JobConfig.class);
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Value(value = "classpath:employees.csv")
private Resource resource;
@Bean("MyJob1")
public Job createJob(@Qualifier("MyStep1") Step stepMaster) {
return jobBuilderFactory.get("MyJob1")
.incrementer(new RunIdIncrementer())
.start(stepMaster)
.build();
}
@Bean("MyStep1")
public Step step(PartitionHandler partitionHandler, Partitioner partitioner) {
return stepBuilderFactory.get("MyStep1")
.partitioner("slaveStep", partitioner)
.partitionHandler(partitionHandler)
.build();
}
@Bean("slaveStep")
public Step slaveStep(FlatFileItemReader<Employee> reader) {
return stepBuilderFactory.get("slaveStep")
.<Employee, Employee>chunk(1)
.reader(reader)
.processor((ItemProcessor<Employee, Employee>) employee -> {
System.out.printf("Processed item %s%n", employee.getId());
return employee;
})
.writer(list -> {
for (Employee item : list) {
System.out.println(item);
}
})
.build();
}
@Bean
public Partitioner partitioner() {
return gridSize -> {
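// Count the total number of lines in the CSV so they can be split into gridSize contiguous windows.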
Map<String, ExecutionContext> result = new HashMap<>();
int lines = 0;
try(BufferedReader reader = new BufferedReader(new InputStreamReader(resource.getInputStream()))) {
while (reader.readLine() != null) lines++;
} catch (IOException e) {
throw new RuntimeException(e);
}
int range = lines / gridSize;
int remains = lines % gridSize;
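// Each partition reads 'range' lines; the last partition also takes the remainder.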
int fromLine = 0;
int toLine = range;
for (int i = 1; i <= gridSize; i++) {
if (i == gridSize) {
toLine += remains;
}
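// Record this partition's window; the step-scoped reader picks it up from the ExecutionContext.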
ExecutionContext value = new ExecutionContext();
value.putInt("fromLine", fromLine);
value.putInt("toLine", toLine);
fromLine = toLine;
toLine += range;
result.put("partition" + i, value);
}
return result;
};
}
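// Step-scoped: a separate reader instance is created per partition, with fromLine/toLine injected from that partition's ExecutionContext.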
@StepScope
@Bean
public FlatFileItemReader<Employee> flatFileItemReader(@Value("#{stepExecutionContext['fromLine']}") int startLine, @Value("#{stepExecutionContext['toLine']}") int lastLine) {
FlatFileItemReader<Employee> reader = new FlatFileItemReader<>();
reader.setResource(resource);
DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();
lineMapper.setFieldSetMapper(fieldSet -> {
String[] values = fieldSet.getValues();
return Employee.builder()
.id(Integer.parseInt(values[0]))
.firstName(values[1])
.build();
});
lineMapper.setLineTokenizer(new DelimitedLineTokenizer(";"));
reader.setLineMapper(lineMapper);
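// Limit this reader to its partition's window: item counts are absolute, so the reader skips the first startLine items and stops after lastLine items in total.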
reader.setCurrentItemCount(startLine);
reader.setMaxItemCount(lastLine);
return reader;
}
@Bean
public PartitionHandler partitionHandler(@Qualifier("slaveStep") Step step, TaskExecutor taskExecutor) {
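// Launches gridSize executions of slaveStep, one per partition, on the given task executor.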
TaskExecutorPartitionHandler taskExecutorPartitionHandler = new TaskExecutorPartitionHandler();
taskExecutorPartitionHandler.setTaskExecutor(taskExecutor);
taskExecutorPartitionHandler.setStep(step);
taskExecutorPartitionHandler.setGridSize(5);
return taskExecutorPartitionHandler;
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(5);
taskExecutor.setCorePoolSize(5);
taskExecutor.setQueueCapacity(5);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
}
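For reference, here is a minimal standalone sketch (a hypothetical PartitionRangeCheck class, not part of the job configuration above) of the same splitting arithmetic the partitioner uses, showing how 10 lines and a grid size of 5 produce the windows 0-2, 2-4, 4-6, 6-8 and 8-10:
// Hypothetical standalone check of the line-splitting arithmetic used by the partitioner above.
public class PartitionRangeCheck {
    public static void main(String[] args) {
        int lines = 10;   // total CSV lines
        int gridSize = 5; // number of partitions
        int range = lines / gridSize;
        int remains = lines % gridSize;
        int fromLine = 0;
        int toLine = range;
        for (int i = 1; i <= gridSize; i++) {
            if (i == gridSize) {
                toLine += remains; // last partition takes the leftover lines
            }
            System.out.printf("partition%d: fromLine=%d, toLine=%d%n", i, fromLine, toLine);
            fromLine = toLine;
            toLine += range;
        }
    }
}
Each window is then handed to one step-scoped reader via setCurrentItemCount(fromLine) and setMaxItemCount(toLine).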