Spring boot batch custom ItemWriter with a list of items

Time:07-01

I am using Spring Boot with Spring Batch, and I have a custom reader, processor, and writer.

The custom reader ----> reads a row, returns ReadItem

The custom processor ----> takes a ReadItem, returns List<ProcessedItem>

The custom writer ----> should receive that List (this is where the problem is)

MyCustomReader :

@Component
public class MyCustomReader extends JdbcCursorItemReader<ReadItem> {

public MyCustomReader(@Autowired DataSource primaryDataSource) {
    // the cursor reader needs a DataSource before it can open its cursor
    setDataSource(primaryDataSource);
    // select the columns that ItemRowMapper reads below
    setSql("SELECT ID, TIME FROM data WHERE processed = false");
    setFetchSize(100);
    setRowMapper(new ItemRowMapper());
}

public class ItemRowMapper implements RowMapper<ReadItem> {
    @Override
    public ReadItem mapRow(ResultSet rs, int rowNum) throws SQLException {

        ReadItem item  = new ReadItem();
        item.setId_Rame(rs.getString("ID"));
        item.setTime(rs.getString("TIME"));

        return item;
    }
}
}

MyCustomProcessor :

@Component
public class MyCustomProcessor implements ItemProcessor<ReadItem, List<? extends ProcessedItem>> {

@Override
public List<? extends ProcessedItem> process(ReadItem item) throws Exception {
    
    List<ProcessedItem> listData = new ArrayList<>();
    
    // processing data to build the list to return

    return listData;
}
}

MyCustomWriter :

@Component
public class MyCustomWriter implements ItemWriter<ProcessedItem> {


@Override
public void write(List<? extends ProcessedItem> items) throws Exception {


    for (ProcessedItem data : items) {
        // process data
    }
 }
}

MyBatchConfig :

@Configuration
@EnableBatchProcessing
public class MyBatchConfig {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
public StepBuilderFactory stepBuilderFactory;
    
@Autowired
MyCustomReader myCustomReader;

@Autowired
MyCustomWriter myCustomWriter;

@Autowired
MyCustomProcessor myCustomProcessor;

@Autowired 
@Qualifier("pgsqlTransactionManager") 
private PlatformTransactionManager transactionManager;

@Bean
public Job createJob() {
    return jobBuilderFactory.get("MyJob")
            .incrementer(new RunIdIncrementer())
            .flow(createStep()).end().build();
}

@Bean
public Step createStep() {
    return stepBuilderFactory.get("MyStep")
            .<ReadItem, List<? extends ProcessedItem>> chunk(1)
            .reader(myCustomReader)
            .processor(myCustomProcessor)
            .writer(myCustomWriter)
            .transactionManager(transactionManager)
            .build();
}
}

I get a compile error in MyBatchConfig:

The method writer(ItemWriter<? super List<? extends ProcessedItem>>) in the type SimpleStepBuilder<ReadItem,List<? extends ProcessedItem>> is not applicable for the arguments (MyCustomWriter)

Is there a workaround to fix this?

CodePudding user response:

In chunk-based batch processing, the reader and processor each handle a single record at a time. The processed results are kept in a buffer, and once the chunk size is reached the whole buffer is passed to the writer. If you look at the signature of ItemWriter<T>, its write method takes List<? extends T> items, which means it receives a list of whatever the processor returns. In your case the processor returns a List<? extends ProcessedItem> per record, so the writer receives a list of those lists. To fix your issue, declare MyCustomWriter as implementing ItemWriter<List<? extends ProcessedItem>> and change its write method to the following:

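// MyCustomWriter must be declared as: implements ItemWriter<List<? extends ProcessedItem>>
@Override
public void write(List<? extends List<? extends ProcessedItem>> items) throws Exception {

    // items holds one inner list per record in the chunk; flatten before handling each element
    items.stream().flatMap(Collection::stream).forEach(data -> { // data is a ProcessedItem instance
        // process data
    });
}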
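
As a rough illustration of the flattening step above, here is a minimal sketch that uses only the JDK, with no Spring Batch types. ProcessedItem is a hypothetical stand-in for the class in the question, and flatten is a helper name made up for this example. It uses a plain loop instead of flatMap, but the idea is the same: one inner list per record read, all collected into a single list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FlattenChunkSketch {

    // Hypothetical stand-in for the question's ProcessedItem class.
    static class ProcessedItem {
        final String id;
        ProcessedItem(String id) { this.id = id; }
    }

    // A chunk holds one inner list per record read; copy every inner item into one flat list.
    static List<ProcessedItem> flatten(List<? extends List<? extends ProcessedItem>> chunk) {
        List<ProcessedItem> flat = new ArrayList<>();
        for (List<? extends ProcessedItem> perRecord : chunk) {
            flat.addAll(perRecord);
        }
        return flat;
    }

    public static void main(String[] args) {
        // Two records were processed; the first produced two items, the second produced one.
        List<ProcessedItem> first = Arrays.asList(new ProcessedItem("a1"), new ProcessedItem("a2"));
        List<ProcessedItem> second = Arrays.asList(new ProcessedItem("b1"));
        List<List<ProcessedItem>> chunk = Arrays.asList(first, second);

        for (ProcessedItem item : flatten(chunk)) {
            System.out.println(item.id);
        }
    }
}
```

With chunk(1), as in the question, the outer list would hold a single inner list per write call; a larger chunk size would give one inner list per record in the chunk.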

CodePudding user response:

Change your createStep() method to look like this...

@Bean
public Step createStep() {
    return stepBuilderFactory.get("MyStep")
            .<ReadItem, ProcessedItem> chunk(1)
            .reader(myCustomReader)
            .processor(myCustomProcessor)
            .writer(myCustomWriter)
            .transactionManager(transactionManager)
            .build();
}

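You don't need to specify List<? extends ProcessedItem> as the chunk's output type, since an ItemWriter always receives a list of items. Note that this only compiles if MyCustomProcessor is also changed to implement ItemProcessor<ReadItem, ProcessedItem>, i.e. it returns a single ProcessedItem per ReadItem rather than a list.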
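
To see why the writer's type parameter follows the processor's output type, here is a simplified, JDK-only sketch of a chunk-oriented loop. It is not Spring Batch's actual implementation: ChunkWriter and run are hypothetical names invented for this example, and java.util.function.Function stands in for the processor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ChunkLoopSketch {

    // Hypothetical stand-in for a writer: it receives every processor output of one chunk.
    interface ChunkWriter<O> {
        void write(List<? extends O> items);
    }

    // Processes records one at a time, buffers the outputs, and hands the buffer to the
    // writer whenever chunkSize outputs have accumulated. Returns the size of each chunk written.
    static <I, O> List<Integer> run(List<I> input, Function<I, O> processor,
                                    ChunkWriter<O> writer, int chunkSize) {
        List<Integer> chunkSizes = new ArrayList<>();
        List<O> buffer = new ArrayList<>();
        for (I record : input) {
            buffer.add(processor.apply(record));
            if (buffer.size() == chunkSize) {
                writer.write(new ArrayList<>(buffer));
                chunkSizes.add(buffer.size());
                buffer.clear();
            }
        }
        // Flush the last, possibly smaller, chunk.
        if (!buffer.isEmpty()) {
            writer.write(new ArrayList<>(buffer));
            chunkSizes.add(buffer.size());
        }
        return chunkSizes;
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        // The processor returns one String per record, so the writer is a ChunkWriter<String>.
        List<Integer> sizes = run(Arrays.asList(1, 2, 3, 4, 5),
                n -> "item-" + n,
                items -> written.addAll(items),
                2);
        System.out.println(sizes);   // prints [2, 2, 1]
        System.out.println(written);
    }
}
```

If the processor here returned a List per record instead, O would be that List type and the writer would receive a list of lists, which is the situation in the question.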
