I am implementing an application using Spring Batch, following the ItemReader / ItemProcessor / ItemWriter approach. I have created a Partitioner component that partitions the data. The ItemReader reads the data, the processor processes it, and after processing I write the data back to the DB. Once the job finishes, I observe that some data is missing in the DB. Sometimes the execution of one partition fails, sometimes the job executes successfully, and sometimes I get exceptions. It is random:
- "java.lang.RuntimeException: java.lang.reflect.UndeclaredThrowableException
- is mapped to a primary key column in the database. Updates are not allowed.
- org.eclipse.persistence.exceptions.DatabaseException Internal Exception: java.sql.SQLException: Closed Resultset: getObject
Is there any thread synchronization or transaction handling we need to maintain?
E.g.
- Total no. of records: 1000
- Chunk size: 100
- Partition 1: 500 records
- Partition 2: 500 records

This scenario works fine without partitioning, or when using a plain multi-threaded step instead.
Sample code: this code sometimes works and commits all data, sometimes fails, and sometimes I observe that some data is not committed to the DB (even though the commit count in the BATCH_STEP_EXECUTION table is correct). It is kind of random.
@Configuration
@EnableBatchProcessing
@EnableTransactionManagement
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public SimpleJobLauncher jobLauncher(JobRepository jobRepository) {
SimpleJobLauncher launcher = new SimpleJobLauncher();
launcher.setJobRepository(jobRepository);
return launcher;
}
@Bean(name = "customerJob")
public Job prepareBatch1() {
return jobBuilderFactory.get("customerJob")
.incrementer(new RunIdIncrementer())
.start(masterStep())
.listener(listener())
.build();
}
@Bean
public Step masterStep() {
return stepBuilderFactory.get("masterStep")
.partitioner(slaveStep().getName(), partitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean
public BatchListener listener() {
return new BatchListener();
}
@Bean
@JobScope
public BatchPartitioner partitioner() {
return new BatchPartitioner();
}
@Bean
@StepScope
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler taskExecutorPartitionHandler = new TaskExecutorPartitionHandler();
taskExecutorPartitionHandler.setGridSize(2);
taskExecutorPartitionHandler.setTaskExecutor(taskExecutor());
taskExecutorPartitionHandler.setStep(slaveStep());
try {
taskExecutorPartitionHandler.afterPropertiesSet();
} catch (Exception e) {
throw new IllegalStateException("failed to initialize the partition handler", e);
}
return taskExecutorPartitionHandler;
}
@Bean
@StepScope
public Step slaveStep() {
return stepBuilderFactory.get("slaveStep").<Customer, CustomerWrapperDTO>chunk(100)
.reader(getReader())
.processor(processor())
.writer(writer())
.build();
}
@Bean
@StepScope
public BatchWriter writer() {
return new BatchWriter();
}
@Bean
@StepScope
public BatchProcessor processor() {
return new BatchProcessor();
}
@Bean
@StepScope
public BatchReader getReader() {
return new BatchReader();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
taskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
}
class CustomerWrapperDTO {
private Address address;
private Customer customer;
//setter getter address, customer
}
@Entity
class Customer {
String processStatus; // "U" : unprocessed, "C" : completed, "F" : Failed
}
public class BatchListener implements JobExecutionListener {
@Autowired
private CustomerRepo customerRepo;
public BatchListener() {
}
@Override
public void beforeJob(JobExecution jobExecution) {
List<Customer> customers;
try {
customers = customerRepo.getAllUnprocessedCustomer();
} catch (Exception e) {
throw new CustomerException("failed in BatchListener", e);
}
jobExecution.getExecutionContext().put("customers",customers);
}
@Override
public void afterJob(JobExecution jobExecution) {
}
}
public class BatchPartitioner implements Partitioner {
@Value("#{jobExecutionContext[customers]}")
private List<Customer> customers;
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> result = new HashMap<>();
int size = customers.size() / gridSize;
List<List<Customer>> lists = IntStream.range(0, customers.size()).boxed()
.collect(Collectors.groupingBy(i -> i / size,
Collectors.mapping(customers::get, Collectors.toList())))
.values().stream().collect(Collectors.toList());
for (int i = 0; i < gridSize; i++) {
ExecutionContext executionContext = new ExecutionContext();
executionContext.putString("name", "Thread_" + i);
executionContext.put("customers", lists.get(i));
result.put("partition" + i, executionContext);
}
return result;
}
}
@Component
@StepScope
public class BatchReader implements ItemReader<Customer> {
private int index;
@Value("#{stepExecutionContext[customers]}")
private List<Customer> customers;
@Override
public Customer read() {
Customer customer = null;
if (index < customers.size()) {
customer = customers.get(index);
index++;
} else {
index = 0;
}
return customer;
}
}
@Component
@StepScope
public class BatchProcessor implements ItemProcessor<Customer, CustomerWrapperDTO> {
public BatchProcessor() {
}
@Override
public CustomerWrapperDTO process(Customer item) {
CustomerWrapperDTO customerWrapper = new CustomerWrapperDTO();
try {
// logic to get the address
Address address = null; // placeholder: API call or some business logic
customerWrapper.setAddress(address);
item.setProcessStatus("C"); // completed
} catch (Exception e) {
item.setProcessStatus("F"); // failed
}
customerWrapper.setCustomer(item);
return customerWrapper;
}
}
@Component
@StepScope
public class BatchWriter implements ItemWriter<CustomerWrapperDTO> {
@Autowired
private CustomerRepo customerRepo;
@Autowired
private AddressRepo addressRepo;
public BatchWriter() {
}
@Override
public void write(List<? extends CustomerWrapperDTO> items) {
items.forEach(item -> {
try {
if(item.getCustomer() != null) {
customerRepo.merge(item.getCustomer());
}
if(item.getAddress() != null) {
addressRepo.save(item.getAddress());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
CodePudding user response:
Spring Batch processes data in chunks. If a chunk fails (meaning at least one item failed to be processed or written), the transaction for that chunk is rolled back, so none of that chunk's items are committed.
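If the intent is to skip a bad record rather than lose the whole chunk, the step can be made fault-tolerant so that a failing item is skipped and the remaining items are still committed. A minimal sketch against the slaveStep from the question (the skipped exception type and the skip limit of 10 are assumptions to adjust):

@Bean
@StepScope
public Step slaveStep() {
    return stepBuilderFactory.get("slaveStep").<Customer, CustomerWrapperDTO>chunk(100)
            .reader(getReader())
            .processor(processor())
            .writer(writer())
            .faultTolerant()
            .skip(RuntimeException.class) // e.g. the RuntimeException rethrown by the writer
            .skipLimit(10)                // fail the step once more than 10 items are skipped
            .build();
}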
CodePudding user response:
The issue is with your item reader:
- The implementation is not thread-safe, yet it is used in a multi-threaded step. You should synchronize the read method or wrap your reader in a SynchronizedItemStreamReader (see the sketch after this list).
- The execution context is not safe to share between threads, and you seem to be sharing items between threads through the execution context. BTW, storing items in the execution context is not recommended even in the single-threaded case, because the context will be persisted (possibly several times) during the job execution.
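A minimal sketch of the first suggestion, based on the list-backed reader from the question — declaring read() as synchronized makes the bounds check and the index increment one atomic step for the worker threads:

import java.util.List;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Value;

// registered as a step-scoped bean (as in BatchConfig) so the SpEL late binding works
public class BatchReader implements ItemReader<Customer> {

    private int index;

    @Value("#{stepExecutionContext['customers']}")
    private List<Customer> customers;

    @Override
    public synchronized Customer read() {
        // only one thread at a time may test and advance the index
        if (index < customers.size()) {
            return customers.get(index++);
        }
        return null; // null tells Spring Batch this partition is exhausted
    }
}

Alternatively, org.springframework.batch.item.support.SynchronizedItemStreamReader can wrap a reader that implements ItemStreamReader. To address the second point, a common pattern is to store only each partition's boundaries (for example a minimum and maximum customer id) in the step execution context and have each reader query its own slice from the database, so that no item lists are serialized with the job metadata.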