I can't get my head around a solution for a - as I consider - rather common integration flow:
- read a file from source
- process file
- delete file from sourced if processing was successful.
Currently, I have an IntegrationFlow using a PublishSubscribeChannel with two IntegrationFlows as subscribers: One to process file, one to delete file. Unfortunately, the later one (delete) is executed regardless of the outcome of the first one (process), even if the "process" flow throws an Exception.
What I need is a sequential handling flow, but I cannot figure out how to realize. Created some test code, but that doesn't work, reporting
2022-11-22 09:55:54.256 ERROR 14648 --- [ scheduling-1]
o.s.integration.handler.LoggingHandler :
org.springframework.messaging.MessagingException: Failed to
invoke method; nested exception is
java.lang.IllegalArgumentException: wrong number of arguments
The lab code:
@Configuration
@EnableIntegration
public class SeqChannels {
@Bean
public AtomicInteger integerSource() {
return new AtomicInteger();
}
@InboundChannelAdapter(channel = "process", poller = @Poller(fixedDelay = "1000"))
public Message<Integer> source(final AtomicInteger integerSource) {
return MessageBuilder.withPayload(integerSource.incrementAndGet()).build();
}
@ServiceActivator(inputChannel = "process", outputChannel = "delete")
public Integer process(@Payload Integer message) {
return message;
}
@ServiceActivator(inputChannel = "delete")
public void delete(@Payload Integer message) {
}
}
CodePudding user response:
You can't have parameters on an @InboundChannelAdapter
method. This works...
@Configuration
@EnableIntegration
class SeqChannels {
AtomicInteger integerSource = new AtomicInteger();
@InboundChannelAdapter(channel = "process", poller = @Poller(fixedDelay = "1000"))
public Message<Integer> source() {
return MessageBuilder.withPayload(this.integerSource.incrementAndGet()).build();
}
@ServiceActivator(inputChannel = "process", outputChannel = "delete")
public Integer process(@Payload Integer message) {
System.out.println("Process: " message);
return message;
}
@ServiceActivator(inputChannel = "delete")
public void delete(@Payload Integer message) {
System.out.println("delete: " message);
}
}
Process: 1
delete: 1
Process: 2
delete: 2
Process: 3
...