Spring Integration: Define sequential handling of messages using Java DSL

Time:11-23

I can't work out a solution for what I consider a rather common integration flow:

  1. read a file from source
  2. process file
  3. delete file from source if processing was successful.

Currently, I have an IntegrationFlow using a PublishSubscribeChannel with two IntegrationFlows as subscribers: one to process the file, one to delete the file. Unfortunately, the latter one (delete) is executed regardless of the outcome of the first one (process), even if the "process" flow throws an Exception.

What I need is a sequential handling flow, but I cannot figure out how to implement it. I created some test code, but it doesn't work and reports:

2022-11-22 09:55:54.256 ERROR 14648 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Failed to invoke method; nested exception is java.lang.IllegalArgumentException: wrong number of arguments

The lab code:

@Configuration
@EnableIntegration
public class SeqChannels {

    @Bean
    public AtomicInteger integerSource() {
        return new AtomicInteger();
    }

    @InboundChannelAdapter(channel = "process", poller = @Poller(fixedDelay = "1000"))
    public Message<Integer> source(final AtomicInteger integerSource) {
        return MessageBuilder.withPayload(integerSource.incrementAndGet()).build();
    }

    @ServiceActivator(inputChannel = "process", outputChannel = "delete")
    public Integer process(@Payload Integer message) {
        return message;
    }

    @ServiceActivator(inputChannel = "delete")
    public void delete(@Payload Integer message) {

    }

}

CodePudding user response:

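You can't have parameters on an @InboundChannelAdapter method: the poller invokes it with no arguments, which is what the "wrong number of arguments" error is reporting. Keep the AtomicInteger in a field instead. This works...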

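import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Payload;

@Configuration
@EnableIntegration
class SeqChannels {

    // Held as a field, because the adapter method below cannot take parameters.
    AtomicInteger integerSource = new AtomicInteger();

    // Polled once per second; each poll emits the next integer to "process".
    @InboundChannelAdapter(channel = "process", poller = @Poller(fixedDelay = "1000"))
    public Message<Integer> source() {
        return MessageBuilder.withPayload(this.integerSource.incrementAndGet()).build();
    }

    // The return value is sent on to the "delete" channel.
    @ServiceActivator(inputChannel = "process", outputChannel = "delete")
    public Integer process(@Payload Integer message) {
        System.out.println("Process: " + message);
        return message;
    }

    // End of the flow: a void method produces no reply.
    @ServiceActivator(inputChannel = "delete")
    public void delete(@Payload Integer message) {
        System.out.println("delete: " + message);
    }

}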

Output:

Process: 1
delete: 1
Process: 2
delete: 2
Process: 3
...
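
On the sequencing part of the question: the "process" and "delete" channels here are plain direct channels by default, so each handler runs on the poller's thread, one after the other. If process throws, the exception propagates back to the caller and nothing is ever sent to delete. Here is a minimal plain-Java sketch of that behaviour, with no Spring involved. The class name SequentialSketch, the log list, and the "fail on even payloads" rule are all made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;

class SequentialSketch {

    // Records what happened, so the order of calls is visible.
    public static final List<String> log = new ArrayList<>();

    // Hypothetical "process" step: fails for even payloads to simulate an error.
    public static Integer process(Integer payload) {
        if (payload % 2 == 0) {
            throw new IllegalStateException("processing failed for " + payload);
        }
        log.add("process " + payload);
        return payload;
    }

    // Hypothetical "delete" step: only reached if process returned normally.
    public static void delete(Integer payload) {
        log.add("delete " + payload);
    }

    // Runs process, then delete, on the caller's thread.
    // Returns true if both steps completed.
    public static boolean handle(Integer payload) {
        try {
            delete(process(payload));
            return true;
        } catch (IllegalStateException e) {
            // The failure short-circuits the chain: delete was never called.
            log.add("error " + payload);
            return false;
        }
    }

    public static void main(String[] args) {
        for (int i = 1; i <= 3; i++) {
            handle(i);
        }
        log.forEach(System.out::println);
    }
}
```

This is the difference from the PublishSubscribeChannel setup in the question, where delete ran even though process had failed: chaining the two steps makes delete depend on process returning normally.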