I have a composite item processor like so:
@Bean
public CompositeItemProcessor<User, User> compositeItemProcessor() throws Exception {
CompositeItemProcessor<User, User> processor = new CompositeItemProcessor<>();
List<ItemProcessor<User, User>> processors = Arrays.asList(new Processor(),validatingItemProcessor());
processor.setDelegates(processors);
processor.afterPropertiesSet();
return processor;
}
I am not able to get jobExecutionId
in any of the process with @Beforestep
This is processor 1.
public class Processor implements ItemProcessor<User, User> {
private static final Map<String, String> DEPT_NAMES =
new HashMap<>();
private Long jobExecutionId;
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
this.jobExecutionId = stepExecution.getJobExecutionId();
}
public Processor() {
DEPT_NAMES.put("001", "Technology");
DEPT_NAMES.put("002", "Operations");
DEPT_NAMES.put("003", "Accounts");
}
@Override
public User process(User user) throws Exception {
System.out.println(this.jobExecutionId);
String deptCode = user.getDept();
String dept = DEPT_NAMES.get(deptCode);
user.setDept(dept);
user.setTime(new Date());
System.out.println(String.format("Converted from [%s] to [%s]", deptCode, dept));
return user;
}
}
This is processor 2.
@Bean
public ValidatingItemProcessor<User> validatingItemProcessor() {
return new ValidatingItemProcessor<>(new UserValidator());
}
Here is my job configuration.
Step step = stepBuilderFactory.get("ETL-file-load")
.<User, User>chunk(100)
.reader(itemReader)
.processor(compositeItemProcessor())
.writer(itemWriter)
.faultTolerant()
.skipPolicy(jobSkipPolicy())
.listener(userValidationListener())
.build();
return jobBuilderFactory.get("ETL-Load")
.incrementer(new RunIdIncrementer())
.start(step)
.build();
I am using a listener for skipped user records.
@Bean
public UserValidationListener userValidationListener() {
return new UserValidationListener();
}
public class UserValidationListener implements SkipListener<User, User> {
@Override
public void onSkipInRead(Throwable throwable) {
}
@Override
public void onSkipInWrite(User user, Throwable throwable) {
}
@Override
public void onSkipInProcess(User user, Throwable throwable) {
System.out.println(user.toString());
System.out.println(throwable.getMessage());
//write error
//errorStaorage.()
}
}
With this configuration, I am not abled to get the jobExecutionId
in any of the processor.
I have tried using a single processor instead of composite processor. I get the jobExecutionId
.
Step step = stepBuilderFactory.get("ETL-file-load")
.<User, User>chunk(100)
.reader(itemReader)
.processor(new Processor())
.writer(itemWriter)
.faultTolerant()
.skipPolicy(jobSkipPolicy())
.listener(userValidationListener())
.build();
I am getting the jobExecutionId
Converted from [001] to [Technology]
1
Converted from [002] to [Operations]
1
Converted from [003] to [Accounts]
1
Converted from [001] to [Technology]
1
Converted from [001] to [Technology]
Where 1
is the jobId. So, this brings me to a conclusion that there is an error with the way I have setup composite processor
and cannot get the id
with @BeforeStep
.
Why does @Beforestep
behave differently and not get invoke when using a composite item processor?
UPDATE As per suggested answer, I have added this.
@Bean
public Processor itemProcessor() {
return new Processor();
}
And now my steplooks like this...
Step step = stepBuilderFactory.get("ETL-file-load")
.<User, User>chunk(100)
.reader(itemReader)
.processor(compositeItemProcessor())
.writer(itemWriter)
.faultTolerant()
.skipPolicy(jobSkipPolicy())
.listener(userValidationListener())
.listener(itemProcessor())
.build();
CodePudding user response:
The issue is that your processor is not being implicitly registered as a listener, only the composite processor is. You will need to do this explicitly in your step
@Bean
public CompositeItemProcessor<User, User> compositeItemProcessor() throws Exception {
CompositeItemProcessor<User, User> processor = new CompositeItemProcessor<>();
List<ItemProcessor<User, User>> processors = Arrays.asList(processor(), validatingItemProcessor());
processor.setDelegates(processors);
processor.afterPropertiesSet();
return processor;
}
@Bean
public ItemProcessor<User, User> processor() {
return new Processor();
}
@Bean
public void step() {
Step step = stepBuilderFactory.get("ETL-file-load")
.<User, User>chunk(100)
.reader(itemReader)
.processor(compositeItemProcessor())
.writer(itemWriter)
.faultTolerant()
.skipPolicy(jobSkipPolicy())
.listener(userValidationListener())
.listener(processor()) // register your custom processor as a listener to execute @BeforeStep
.build();
return step;
}