I have a spring batch application that uses Azure SQL server as a backend, I am using Hibernate to update the database.
I am reading the data from CSV file using FlatfileReader & writing into Azure SQL Server using ItemWriter as mentioned below
@Entity
@Table(name = "STUDENTS")
public class Student implements Serializable {
@Id
@GeneratedValue(strategy=GenerationType.SEQUENCE,generator="student_sequence")
@SequenceGenerator(name="student_sequence",sequenceName="student_sequence", allocationSize = 1)
@Column(name="STUDENT_ID", unique = true)
private Long studentId;
@Column(name = "STUDENT_NAME")
private String studentName;
@Temporal(TemporalType.DATE)
@Column(name = "ENROLLED_DATE")
private Date enrolledDate;
public String getStudentName() {
return studentName;
}
public void setStudentName(String studentName) {
this.studentName = studentName;
}
public Date getEnrolledDate() {
return enrolledDate;
}
public void setEnrolledDate(Date enrolledDate) {
this.enrolledDate = enrolledDate;
}
@Override
public String toString() {
return "Student [studentId=" studentId ", studentName=" studentName ", enrolledDate=" enrolledDate
"]";
}
}
public class StudentMapper implements FieldSetMapper<Student> {
private SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy");
@Override
public Student mapFieldSet(FieldSet fieldSet) throws BindException {
Student student = new Student();
student.setStudentName(fieldSet.readString(0));
student.setEnrolledDate(new Date());
return student;
}
}
public class Reader implements ItemReader<FlatFileItemReader<Student>> {
@Override
public FlatFileItemReader<Student> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
// Data Sources
Resource inputCsv = new ClassPathResource("demo01/input/record.csv");
// Spring Batch's built-in reader
FlatFileItemReader<Student> reader = new FlatFileItemReader<Student>();
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
String[] tokens = {"studentname"};
tokenizer.setNames(tokens);
reader.setResource(inputCsv);
DefaultLineMapper<Student> lineMapper = new DefaultLineMapper<Student>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(new StudentMapper());
reader.setLineMapper(lineMapper);
return reader;
}
}
public class Processor implements ItemProcessor<Student, Student> {
@Override
public Student process(Student item) throws Exception {
System.out.println("Processing..." item);
return item;
}
}
public class Writer implements ItemWriter<Student> {
@Autowired
private SessionFactory sessionFactory;
@Override
public void write(List<? extends Student> list) throws Exception {
HibernateItemWriter<Student> hibernateItemWriter = new HibernateItemWriter<Student>();
hibernateItemWriter.setSessionFactory(sessionFactory);
hibernateItemWriter.write(list);
hibernateItemWriter.afterPropertiesSet();
}
}
I will be getting the complete the single CSV file to be processed and I don't want to split the CSV, so I will be using the same Reader process to read the CSV records. However I want to introduce multiple Processor & Writer to improve the performance. something like AsyncItemProcessor
<bean id="student" class="com.mkyong.entity.Student" scope="prototype" />
<batch:job id="helloWorldJob">
<batch:step id="step1">
<batch:tasklet>
<batch:chunk
reader="cvsFileItemReader"
processor="itemProcessor"
writer="itemWriter"
commit-interval="10">
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
<bean id="cvsFileItemReader" class="com.mkyong.batch.Reader" />
<bean id="itemProcessor" class="com.mkyong.batch.Processor" />
<bean id="itemWriter" class="com.mkyong.batch.Writer" />
How do I convert this to support AsyncItemProcessor?
CodePudding user response:
Before going async or parallel, you should first fix the code you shared. The Reader
class is incorrect. It is currently declared to read items of type FlatFileItemReader
and not Student
. The read
method should return items of type Student
and not the reader itself. This class is not needed, you can use the FlatFileItemReader
provided by Spring Batch directly in your step.
Same thing for the writer, you can use HibernateItemWriter
directly in your step.
Now to answer your question about using an asynchronous processor and writer, you need to wrap your processor in a AsyncItemProcessor
and your writer in a AsyncItemWriter
. You can find more details and a code example here: Asynchronous Processors.