I am using Spring Boot with Spring Batch, and I have a custom reader, processor and writer.
the custom reader ----> reads rows, returns a ReadItem
the custom processor ----> takes a ReadItem, returns a List
the custom writer ----> should consume that List (here is the problem)
MyCustomReader :
@Component
public class MyCustomReader extends JdbcCursorItemReader<ReadItem>
implements ItemReader<ReadItem>{
public MyCustomReader(@Autowired DataSource primaryDataSource) {
setDataSource(primaryDataSource);
setSql("SELECT * FROM data WHERE processed = false");
setFetchSize(100);
setRowMapper(new ItemRowMapper());
}
public class ItemRowMapper implements RowMapper<ReadItem> {
@Override
public ReadItem mapRow(ResultSet rs, int rowNum) throws SQLException {
ReadItem item = new ReadItem();
item.setId_Rame(rs.getString("ID"));
item.setTime(rs.getString("TIME"));
return item;
}
}
}
MyCustomProcessor :
@Component
public class MyCustomProcessor implements ItemProcessor<ReadItem, List<? extends
ProcessedItem>> {
@Override
public List<? extends ProcessedItem> process(ReadItem item) throws Exception {
List<ProcessedItem> listData = new ArrayList<>();
// processing data to build the list to return
return listData;
}
}
MyCustomWriter :
@Component
public class MyCustomWriter implements ItemWriter<ProcessedItem> {
@Override
public void write(List<? extends ProcessedItem> items) throws Exception {
for (ProcessedItem data : items) {
// process data
}
}
}
MyBatchConfig :
@Configuration
@EnableBatchProcessing
public class MyBatchConfig {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
MyCustomReader myCustomReader;
@Autowired
MyCustomWriter myCustomWriter;
@Autowired
MyCustomProcessor myCustomProcessor;
@Autowired
@Qualifier("pgsqlTransactionManager")
private PlatformTransactionManager transactionManager;
@Bean
public Job createJob() {
return jobBuilderFactory.get("MyJob")
.incrementer(new RunIdIncrementer())
.flow(createStep()).end().build();
}
@Bean
public Step createStep() {
return stepBuilderFactory.get("MyStep")
.<ReadItem, List<? extends ProcessedItem>> chunk(1)
.reader(myCustomReader)
.processor(myCustomProcessor)
.writer(myCustomWriter)
.transactionManager(transactionManager)
.build();
}
}
I get a compile error in MyBatchConfig:
The method writer(ItemWriter<? super List<? extends ProcessedItem>>) in the type SimpleStepBuilder<ReadItem,List<? extends ProcessedItem>> is not applicable for the arguments (MyCustomWriter)
Is there a workaround to fix this?
CodePudding user response:
In chunk-based batch processing, the reader and processor read/process a single record at a time; the framework buffers the results and hands them all to the writer once the chunk size is reached. If you look at the signature of ItemWriter.write
, it takes List<? extends T> items
, i.e. a list of the items returned by the processor. In your case the processor returns List<ProcessedItem>
, so your writer receives a List<List<ProcessedItem>>
. To fix the issue, declare MyCustomWriter
as implements ItemWriter<List<ProcessedItem>> (and use List<ProcessedItem> rather than List<? extends ProcessedItem> in the processor and the step's chunk types so the generics line up), then change the write method to the below:
@Override
public void write(List<? extends List<ProcessedItem>> items) throws Exception {
items.stream().flatMap(Collection::stream).forEach(data -> { // data is a ProcessedItem instance
// process data
});
}
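To see why the writer receives a nested list, the flattening above can be demonstrated with plain collections, using String as a stand-in for ProcessedItem (FlattenDemo and its method names are illustrative, not part of Spring Batch):

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

public class FlattenDemo {
    // Each inner list is what one processor call returned; the outer list is
    // the chunk that the framework hands to the writer.
    static List<String> flatten(List<List<String>> chunk) {
        return chunk.stream()
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> chunk = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"));
        System.out.println(flatten(chunk)); // prints [a, b, c]
    }
}
```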
CodePudding user response:
Change your createStep() method to look like this...
@Bean
public Step createStep() {
return stepBuilderFactory.get("MyStep")
.<ReadItem, ProcessedItem> chunk(1)
.reader(myCustomReader)
.processor(myCustomProcessor)
.writer(myCustomWriter)
.transactionManager(transactionManager)
.build();
}
You don't need to specify List<? extends ProcessedItem>: an ItemWriter always receives a List of items (the chunk) by default. Note that this also requires MyCustomProcessor to implement ItemProcessor<ReadItem, ProcessedItem>, i.e. to return a single ProcessedItem per input rather than a list.
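If this approach fits your use case (one output item per input item), the processor would be reshaped along these lines. The sketch below uses simplified stand-in types so it is self-contained; in the real project, ItemProcessor comes from org.springframework.batch.item and ReadItem/ProcessedItem are your own domain classes, and SingleItemProcessor is a hypothetical name:

```java
// Stand-ins for illustration only; not the real Spring Batch API surface.
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

class ReadItem {
    String id;
}

class ProcessedItem {
    String id;
}

// With step types <ReadItem, ProcessedItem>, the processor must map one
// ReadItem to exactly one ProcessedItem (or return null to filter it out).
class SingleItemProcessor implements ItemProcessor<ReadItem, ProcessedItem> {
    @Override
    public ProcessedItem process(ReadItem item) throws Exception {
        ProcessedItem out = new ProcessedItem();
        out.id = item.id;
        return out;
    }
}
```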