Spring Batch Transaction issue


I am implementing an application using Spring Batch, following the ItemReader / ItemProcessor / ItemWriter approach. I have created a Partitioner component which partitions the data; the ItemReader reads the data, the processor processes it, and after processing I write it back to the DB. Once the job finishes, I observe that some data is missing in the DB. Sometimes the execution of one partition fails, sometimes the job executes successfully, and sometimes I get exceptions such as the ones below. It is random.

  • "java.lang.RuntimeException: java.lang.reflect.UndeclaredThrowableException
  • is mapped to a primary key column in the database. Updates are not allowed.
  • org.eclipse.persistence.exceptions.DatabaseException Internal Exception: java.sql.SQLException: Closed Resultset: getObject

Is there any thread synchronization or transaction handling that we need to maintain?

E.g.

  • Total number of records: 1000
  • Chunk size: 100
  • Partition 1: 500 records
  • Partition 2: 500 records

This scenario works fine without using partitioning, or when using a multi-threaded step.

Sample code: this code sometimes works and commits all the data, and sometimes it fails. Sometimes I observe that some of the data is not committed to the DB (even though the commit count in the BATCH_STEP_EXECUTION table is correct). It is kind of random.

@Configuration
@EnableBatchProcessing
@EnableTransactionManagement
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository);
        return launcher;
    }

  
    @Bean(name = "customerJob")
    public Job prepareBatch1() {
        return jobBuilderFactory.get("customerJob").incrementer(new RunIdIncrementer()).start(masterStep()).listener(listener())
                .build();
    }

    @Bean
    public Step masterStep() {
        return stepBuilderFactory.get("masterStep").
                partitioner(slaveStep().getName(), partitioner())
                .partitionHandler(partitionHandler())
                .build();
    }

    @Bean
    public BatchListener listener() {
        return new BatchListener();
    }

    @Bean
    @JobScope
    public BatchPartitioner partitioner() {
        return new BatchPartitioner();
    }

    @Bean
    @StepScope
    public PartitionHandler partitionHandler() {
        TaskExecutorPartitionHandler taskExecutorPartitionHandler = new TaskExecutorPartitionHandler();
        taskExecutorPartitionHandler.setGridSize(2);
        taskExecutorPartitionHandler.setTaskExecutor(taskExecutor());
        taskExecutorPartitionHandler.setStep(slaveStep());
        try {
            taskExecutorPartitionHandler.afterPropertiesSet();
        } catch (Exception e) {
            // exception swallowed in the original sample
        }
        return taskExecutorPartitionHandler;
    }

    @Bean
    @StepScope
    public Step slaveStep() {
        return stepBuilderFactory.get("slaveStep").<Customer, CustomerWrapperDTO>chunk(100)
                .reader(getReader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    @StepScope
    public BatchWriter writer() {
        return new BatchWriter();
    }

    @Bean
    @StepScope
    public BatchProcessor processor() {
        return new BatchProcessor();
    }

    @Bean
    @StepScope
    public BatchReader getReader() {
        return new BatchReader();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
        taskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }
}
class CustomerWrapperDTO {
    private Address address;
    private Customer customer;
    // setters and getters for address and customer
}

Entity

class Customer {
    String processStatus; // "U" : unprocessed, "C" : completed, "F" : failed
}
public class BatchListener implements JobExecutionListener {

    @Autowired
    private CustomerRepo customerRepo;
   
    public BatchListener() {
    }

    @Override
    public void beforeJob(JobExecution jobExecution) {
      
        List<Customer> customers;
        try {
            customers = customerRepo.getAllUnprocessedCustomer();
        } catch (Exception e) {
            throw new CustomerException("failed in BatchListener", e);
        }
        jobExecution.getExecutionContext().put("customers",customers);
        
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        
    }
}
public class BatchPartitioner implements Partitioner {
   
    @Value("#{jobExecutionContext[customers]}")
    private List<Customer> customers;

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        
        Map<String, ExecutionContext> result = new HashMap<>();
        int size = customers.size() / gridSize;
        List<List<Customer>> lists = IntStream.range(0, customers.size()).boxed()
                .collect(Collectors.groupingBy(i -> i / size,
                        Collectors.mapping(customers::get, Collectors.toList())))
                .values().stream().collect(Collectors.toList());
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext executionContext = new ExecutionContext();
            executionContext.putString("name", "Thread_" + i);
            executionContext.put("customers", lists.get(i));
            result.put("partition" + i, executionContext);
        }
        
        return result;
    }
}
@Component
@StepScope
class BatchReader implements ItemReader<Customer> {
        private int index;

        @Value("#{stepExecutionContext[customers]}")
        private List<Customer> customers;


        @Override
        public Customer read() {
            Customer customer = null;
            if (index < customers.size()) {
                customer = customers.get(index);
                index++;
            } else {
                index = 0;
            }
            return customer;
        }
}
@Component
@StepScope
public class BatchProcessor implements ItemProcessor<Customer, CustomerWrapperDTO> {

    public BatchProcessor() {
    }

    @Override
    public CustomerWrapperDTO process(Customer item) {
        CustomerWrapperDTO customerWrapper = new CustomerWrapperDTO();
        try {
            // logic to get the address
            Address address = // API call or some business logic (elided in the question)
            item.setAddress(address);
            item.setProcessStatus("C"); // completed
        } catch (Exception e) {
            item.setProcessStatus("F"); // failed
        }
        customerWrapper.setCustomer(item);
        return customerWrapper;
    }
}
@Component
@StepScope
public class BatchWriter implements ItemWriter<CustomerWrapperDTO> {

    @Autowired
    private CustomerRepo customerRepo;

    @Autowired
    private AddressRepo addressRepo;

    public BatchWriter() {
    }

    @Override
    public void write(List<? extends CustomerWrapperDTO> items) {

        items.forEach(item -> {
            try {
                if(item.getCustomer() != null) {
                    customerRepo.merge(item.getCustomer());
                }

                if(item.getAddress() != null) {
                    addressRepo.save(item.getAddress());
                }

            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        
    }
}

CodePudding user response:

Spring Batch processes items in chunks. If a chunk fails (meaning at least one item in the chunk failed to process), the transaction for that chunk is rolled back.
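To make that concrete: in the configuration above, each chunk of 100 items is read, processed, written and committed in its own transaction, so one bad item rolls back the other items in the same chunk. Below is a minimal sketch (reusing the bean methods from the question's BatchConfig) of how the slave step could be made fault tolerant, so that individual failing items are skipped instead of failing the whole chunk; the exception type and skip limit are illustrative, not taken from the question.

    @Bean
    public Step slaveStep() {
        return stepBuilderFactory.get("slaveStep")
                .<Customer, CustomerWrapperDTO>chunk(100) // commit interval = transaction boundary
                .reader(getReader())
                .processor(processor())
                .writer(writer())
                .faultTolerant()
                .skip(RuntimeException.class) // illustrative: exceptions that should skip the item
                .skipLimit(10)                // illustrative skip limit
                .build();
    }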

CodePudding user response:

The issue is with your item reader:

  • The implementation is not thread-safe, yet it is used in a multi-threaded step. You should either synchronize the read method or wrap your reader in a SynchronizedItemStreamReader (a sketch follows this list).
  • The execution context is not safe to share between threads, yet you are sharing items between threads through it. Incidentally, storing items in the execution context is not recommended even in the single-threaded case, because the context is persisted (possibly several times) during the job execution; the second sketch after this list shows an alternative.
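Here is a minimal sketch of the wrapping option. It assumes BatchReader is changed to implement ItemStreamReader<Customer> (SynchronizedItemStreamReader, from org.springframework.batch.item.support, needs an ItemStreamReader delegate); the bean name synchronizedReader is just an example:

    @Bean
    @StepScope
    public SynchronizedItemStreamReader<Customer> synchronizedReader() {
        // serializes all calls to read(), so one delegate instance can be
        // shared safely by the threads of the partitioned step
        SynchronizedItemStreamReader<Customer> reader = new SynchronizedItemStreamReader<>();
        reader.setDelegate(getReader()); // getReader() must return an ItemStreamReader<Customer>
        return reader;
    }

The slave step would then be built with .reader(synchronizedReader()) instead of .reader(getReader()).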
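And here is a sketch of an alternative to sharing items through the execution context: the partitioner stores only primitive ID bounds in each partition's ExecutionContext, and each slave reader queries its own slice of rows (for example with a JdbcPagingItemReader, or a repository query bound to #{stepExecutionContext['minId']} and #{stepExecutionContext['maxId']}). The minId/maxId values and how they are obtained are assumptions, not taken from the question.

import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class IdRangePartitioner implements Partitioner {

    private final long minId; // e.g. select min(id) from customer where process_status = 'U'
    private final long maxId; // e.g. select max(id) from customer where process_status = 'U'

    public IdRangePartitioner(long minId, long maxId) {
        this.minId = minId;
        this.maxId = maxId;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<>();
        long targetSize = (maxId - minId) / gridSize + 1;
        long start = minId;
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            // only primitive bounds go into the context, never the entities themselves
            context.putLong("minId", start);
            context.putLong("maxId", Math.min(start + targetSize - 1, maxId));
            result.put("partition" + i, context);
            start += targetSize;
        }
        return result;
    }
}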