Spring-batch partitioned step duplicated processing

I have thousands of records to process with Spring Batch, and it takes too much time for our business requirement. The logic is simple: read a CSV file record by record and process each one.

I want to parallelise the step execution to speed up the batch. The natural choice seems to be partitioning the step so that, say, records 1-100 are processed in one thread, 101-200 in another, and so on. But instead, every thread processes all of the records.

The same happens in this very simple example:

  1. 10 rows in CSV file
  2. 5 threads
  3. Each should process 2 records
  4. Instead, each thread processes all 10 records

@Configuration
@RequiredArgsConstructor
public class JobConfig {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Value("classpath:employees.csv")
    private Resource resourceFile;

    @Bean("MyJob1")
    public Job createJob() {
        return jobBuilderFactory.get("my job 1")
            .incrementer(new RunIdIncrementer())
            .start(step())
            .build();
    }

    @Bean("MyStep1")
    public Step step() {
        return stepBuilderFactory.get("my step 1")
            .partitioner("my step 1", new SimplePartitioner())
            .partitionHandler(partitionHandler())
            .build();
    }

    @Bean("slaveStep")
    public Step slaveStep() {
        return stepBuilderFactory.get("read csv stream")
            .<Employee, Employee>chunk(1)
            .reader(flatFileItemReader())
            .processor((ItemProcessor<Employee, Employee>) employee -> {
                System.out.printf("Processed item %s%n", employee.getId());
                return employee;
            })
            .writer(list -> {
                for (Employee item : list) {
                    System.out.println(item);
                }
            })
            .build();
    }

    @StepScope
    @Bean
    public FlatFileItemReader<Employee> flatFileItemReader() {
        FlatFileItemReader<Employee> reader = new FlatFileItemReader<>();
        reader.setResource(resourceFile);

        DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();
        lineMapper.setFieldSetMapper(fieldSet -> {
            String[] values = fieldSet.getValues();
            return Employee.builder()
                .id(Integer.parseInt(values[0]))
                .firstName(values[1])
                .build();
        });

        lineMapper.setLineTokenizer(new DelimitedLineTokenizer(";"));
        reader.setLineMapper(lineMapper);

        return reader;
    }

    @Bean
    public PartitionHandler partitionHandler() {
        TaskExecutorPartitionHandler taskExecutorPartitionHandler = new TaskExecutorPartitionHandler();

        taskExecutorPartitionHandler.setTaskExecutor(taskExecutor());
        taskExecutorPartitionHandler.setStep(slaveStep());
        taskExecutorPartitionHandler.setGridSize(5);

        return taskExecutorPartitionHandler;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(5);
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setQueueCapacity(5);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }
}

@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchTestsApplication implements CommandLineRunner {
    private final JobLauncher jobLauncher;
    private final Job job;

    public SpringBatchTestsApplication(JobLauncher jobLauncher,
                                       @Qualifier("MyJob1") Job job) {
        this.jobLauncher = jobLauncher;
        this.job = job;
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchTestsApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        jobLauncher.run(job, new JobParameters());
    }
}

@Value
@Builder
public class Employee {
    private final int id;
    private final String firstName;
}

employees.csv:

1;Jakub
2;Mike
3;Pawel
4;Joana
5;Michal
6;Joe
7;Bailey
8;Bailhache
9;John
10;Eva

Expected output for 5 threads (gridSize); this is only an example, because the order is not important:

Processed item 1
Employee(id=1, firstName=Jakub)
Processed item 2
Employee(id=2, firstName=Mike)
Processed item 3
Employee(id=3, firstName=Pawel)
Processed item 4
Employee(id=4, firstName=Joana)
Processed item 5
Employee(id=5, firstName=Michal)
Processed item 6
Employee(id=6, firstName=Joe)
Processed item 7
Employee(id=7, firstName=Bailey)
Processed item 8
Employee(id=8, firstName=Bailhache)
Processed item 9
Employee(id=9, firstName=John)
Processed item 10
Employee(id=10, firstName=Eva)

The actual output, however, is the above repeated 5 times: each of the 5 threads processes all 10 records.

CodePudding user response:

I don't know whether my solution is the best, but it worked for me.

I added two fields to the JobConfig class:

private int startLine = -1;
private int lastLine = 1;

Then I set these values on the reader in the flatFileItemReader method and advance them for the next partition:

reader.setCurrentItemCount(startLine);
reader.setMaxItemCount(lastLine);
startLine += 2;
lastLine += 2;

All code:

@Configuration
@RequiredArgsConstructor
public class JobConfig {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    private int startLine = -1;
    private int lastLine = 1;

    @Value("classpath:employees.csv")
    private Resource resourceFile;

    @Bean("MyJob1")
    public Job createJob() {
        return jobBuilderFactory.get("my job 1")
                .incrementer(new RunIdIncrementer())
                .start(step())
                .build();
    }

    @Bean("MyStep1")
    public Step step() {
        return stepBuilderFactory.get("my step 1")
                .partitioner("my step 1", new SimplePartitioner())
                .partitionHandler(partitionHandler())
                .build();
    }

    @Bean("slaveStep")
    public Step slaveStep() {
        return stepBuilderFactory.get("read csv stream")
                .<Employee, Employee>chunk(1)
                .reader(flatFileItemReader())
                .processor((ItemProcessor<Employee, Employee>) employee -> {
                    System.out.printf("Processed item %s%n", employee.getId());
                    return employee;
                })
                .writer(list -> {
                    for (Employee item : list) {
                        //System.out.println(item);
                    }
                })
                .build();
    }

    @StepScope
    @Bean
    public FlatFileItemReader<Employee> flatFileItemReader() {
        FlatFileItemReader<Employee> reader = new FlatFileItemReader<>();
        reader.setResource(resourceFile);

        DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();
        lineMapper.setFieldSetMapper(fieldSet -> {
            String[] values = fieldSet.getValues();
            return Employee.builder()
                    .id(Integer.parseInt(values[0]))
                    .firstName(values[1])
                    .build();
        });

        lineMapper.setLineTokenizer(new DelimitedLineTokenizer(";"));
        reader.setLineMapper(lineMapper);
        reader.setCurrentItemCount(startLine);
        reader.setMaxItemCount(lastLine);
        startLine += 2;
        lastLine += 2;
        return reader;
    }

    @Bean
    public PartitionHandler partitionHandler() {
        TaskExecutorPartitionHandler taskExecutorPartitionHandler = new TaskExecutorPartitionHandler();

        taskExecutorPartitionHandler.setTaskExecutor(taskExecutor());
        taskExecutorPartitionHandler.setStep(slaveStep());
        taskExecutorPartitionHandler.setGridSize(5);

        return taskExecutorPartitionHandler;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(5);
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setQueueCapacity(5);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }
}
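
For reference, here is why this works at all: flatFileItemReader() is annotated with @StepScope, so Spring creates a new reader instance for each of the five partitions, and every instantiation advances the shared startLine/lastLine fields by 2. Assuming the partitions instantiate their readers one after another, the instances end up configured roughly like this (a sketch derived from the code above, not verified against FlatFileItemReader's exact skip semantics):

// partition 1: setCurrentItemCount(-1), setMaxItemCount(1)
// partition 2: setCurrentItemCount(1),  setMaxItemCount(3)
// partition 3: setCurrentItemCount(3),  setMaxItemCount(5)
// partition 4: setCurrentItemCount(5),  setMaxItemCount(7)
// partition 5: setCurrentItemCount(7),  setMaxItemCount(9)

Caveat: the two fields are mutated without synchronization, so if the task executor creates several step-scoped readers concurrently, the ranges can interleave. The partitioner-based answer below avoids this shared mutable state entirely.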

CodePudding user response:

This should be the solution. Look at the partitioner: it takes care of counting the lines of the file and dividing them among the readers, and note the use of @Value("#{stepExecutionContext['fromLine']}") to inject the correct line range into each reader.

And please use dependency injection instead of calling the @Bean methods directly.
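
For context, this also explains the duplicated processing in the question: SimplePartitioner hands every worker an empty execution context, so no range information ever reaches the step-scoped reader and each thread reads the whole file. Simplified from the Spring Batch source, it does essentially this:

public Map<String, ExecutionContext> partition(int gridSize) {
    Map<String, ExecutionContext> map = new HashMap<>(gridSize);
    for (int i = 0; i < gridSize; i++) {
        // gridSize identical, empty contexts: no fromLine/toLine for the reader
        map.put("partition" + i, new ExecutionContext());
    }
    return map;
}

The custom Partitioner below replaces those empty contexts with per-partition line ranges.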

@Configuration
@RequiredArgsConstructor
public class JobConfig {

    private static final Logger log = LoggerFactory.getLogger(JobConfig.class);

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Value(value = "classpath:employees.csv")
    private Resource resource;

    @Bean("MyJob1")
    public Job createJob(@Qualifier("MyStep1") Step stepMaster) {
        return jobBuilderFactory.get("MyJob1")
            .incrementer(new RunIdIncrementer())
            .start(stepMaster)
            .build();
    }

    @Bean("MyStep1")
    public Step step(PartitionHandler partitionHandler, Partitioner partitioner) {
        return stepBuilderFactory.get("MyStep1")
            .partitioner("slaveStep", partitioner)
            .partitionHandler(partitionHandler)
            .build();
    }

    @Bean("slaveStep")
    public Step slaveStep(FlatFileItemReader<Employee> reader) {
        return stepBuilderFactory.get("slaveStep")
            .<Employee, Employee>chunk(1)
            .reader(reader)
            .processor((ItemProcessor<Employee, Employee>) employee -> {
                System.out.printf("Processed item %s%n", employee.getId());
                return employee;
            })
            .writer(list -> {
                for (Employee item : list) {
                    System.out.println(item);
                }
            })
            .build();
    }
    
    @Bean
    public Partitioner partitioner() {
        return gridSize -> {
            Map<String, ExecutionContext> result = new HashMap<>();
            int lines = 0;
            try(BufferedReader reader = new BufferedReader(new InputStreamReader(resource.getInputStream()))) {
                while (reader.readLine() != null) lines++;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            int range = lines / gridSize;
            int remains = lines % gridSize;
            int fromLine = 0;
            int toLine = range;
            for (int i = 1; i <= gridSize; i++) {
                if (i == gridSize) {
                    toLine += remains;
                }
                ExecutionContext value = new ExecutionContext();
                value.putInt("fromLine", fromLine);
                value.putInt("toLine", toLine);
                fromLine = toLine;
                toLine += range;
                result.put("partition" + i, value);
            }
            return result;
        };
    }
        

    @StepScope
    @Bean
    public FlatFileItemReader<Employee> flatFileItemReader(
            @Value("#{stepExecutionContext['fromLine']}") int startLine,
            @Value("#{stepExecutionContext['toLine']}") int lastLine) {
        FlatFileItemReader<Employee> reader = new FlatFileItemReader<>();
        reader.setResource(resource);

        DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<>();
        lineMapper.setFieldSetMapper(fieldSet -> {
            String[] values = fieldSet.getValues();
            return Employee.builder()
                    .id(Integer.parseInt(values[0]))
                    .firstName(values[1])
                    .build();
        });

        lineMapper.setLineTokenizer(new DelimitedLineTokenizer(";"));
        reader.setLineMapper(lineMapper);
        reader.setCurrentItemCount(startLine);
        reader.setMaxItemCount(lastLine);

        return reader;
    }

    @Bean
    public PartitionHandler partitionHandler(@Qualifier("slaveStep") Step step, TaskExecutor taskExecutor) {
        TaskExecutorPartitionHandler taskExecutorPartitionHandler = new TaskExecutorPartitionHandler();

        taskExecutorPartitionHandler.setTaskExecutor(taskExecutor);
        taskExecutorPartitionHandler.setStep(step);
        taskExecutorPartitionHandler.setGridSize(5);

        return taskExecutorPartitionHandler;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(5);
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setQueueCapacity(5);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }
    
}
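
As a quick sanity check with the 10-line employees.csv and gridSize = 5: lines = 10, range = 10 / 5 = 2, remains = 0, so the partitioner should produce the contexts below, and each step-scoped reader then calls setCurrentItemCount(fromLine) and setMaxItemCount(toLine):

partition1: fromLine=0, toLine=2  -> reads items 1-2
partition2: fromLine=2, toLine=4  -> reads items 3-4
partition3: fromLine=4, toLine=6  -> reads items 5-6
partition4: fromLine=6, toLine=8  -> reads items 7-8
partition5: fromLine=8, toLine=10 -> reads items 9-10

Each of the five threads should therefore process exactly 2 records, which matches the expected output in the question.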