Spring Integration Channel works for first time, then does not the second time

Time:07-07

When I use Spring Integration and send a message through a channel to another integration flow, it only works the first time. On the next message the flow skips over the channel and returns.

From first integration flow:

    .handle((p, h) -> {
      System.out.println("Payload Before Channel" + p.toString());
      return p;
    })
    .channel(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.getChannelName())
    .handle((p, h) -> {
      System.out.println("Payload After Channel" + p.toString());
      return p;
    })

Then on the next integration flow:

@Bean
public IntegrationFlow jamsSubmitJob() {
    
    return IntegrationFlows.from(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.getChannelName())
        .handle((p, h) -> {
            try {
                jamsToken = authMang.getJamsAuth().getTokenWithTokenType();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            JAMS_SUBMIT_JOB_INTGRTN
                    .info(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.toString() + " Integration called.");
            JAMS_SUBMIT_JOB_INTGRTN.info(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.toString() + " Headers:= " + h);
            JAMS_SUBMIT_JOB_INTGRTN.debug(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.toString() + " Payload:= " + p);
            return p;
        })
        .handle((p, h) -> {
            // hail mary to get new token
            return MessageBuilder
                .withPayload(p)
                .removeHeaders("*")
                .setHeader(HttpHeaders.AUTHORIZATION.toLowerCase(), jamsToken)
                .setHeader(HttpHeaders.CONTENT_TYPE.toLowerCase(), "application/json")
                .build();
        })
        .handle((p, h) -> {
            JAMS_SUBMIT_JOB_INTGRTN
                    .info(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.toString() + " Submitting payload to JAMS:");

            JAMS_SUBMIT_JOB_INTGRTN.info(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.toString() + " Headers:= " + h);
            JAMS_SUBMIT_JOB_INTGRTN.info(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.toString() + " Payload:= " + p);
            return p;
        })
        .handle(Http.outboundGateway(JAMS_SUBMIT_ENDPOINT)
            .requestFactory(alliantPooledHttpConnection.get_httpComponentsClientHttpRequestFactory())
            .httpMethod(HttpMethod.POST)
            .expectedResponseType(String.class)
            .extractPayload(true))
        .logAndReply();
}

The behavior is that only every other message gets through: the messages in between skip over the channel. Strangely, if I duplicate the jamsSubmitJob bean, it works twice, then fails, then the cycle starts over.

Thanks!

CodePudding user response:

This is a misunderstanding of what a channel is and what a subscriber to that channel is.

So, you have this:

.channel(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.getChannelName())
.handle((p, h) -> {

This way you declare a channel (if it does not exist in the application context yet) and a subscriber to it.

Then you have this:

 return IntegrationFlows.from(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.getChannelName())
    .handle((p, h) -> {

This way you declare a channel (if it does not exist in the application context yet) and a subscriber to it.

Did you notice the duplication in my explanation? I did that deliberately to draw your attention to the problem.

So, if the channel does not exist, it is created; in the other place the existing object is reused. You end up with two subscribers to the same channel. By default the framework creates an instance of DirectChannel, which comes with a round-robin dispatching strategy. That means the first message goes to the first subscriber, the second to the second, the third to the first again, and so on.
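To make the alternation concrete, here is a toy model in plain Java. It is not Spring Integration code: the class `RoundRobinChannelSketch` and the subscriber labels are made up for illustration, and the subscriber order is an assumption chosen to match the symptoms in the question (the first message reaches jamsSubmitJob).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model of a point-to-point channel with round-robin dispatch.
// Hypothetical class: it only mimics the behavior described above.
class RoundRobinChannelSketch {

    private final List<Function<String, String>> subscribers = new ArrayList<>();
    private int next = 0;

    void subscribe(Function<String, String> subscriber) {
        subscribers.add(subscriber);
    }

    // Each message is delivered to exactly one subscriber, chosen in rotation.
    String send(String payload) {
        Function<String, String> target = subscribers.get(next);
        next = (next + 1) % subscribers.size();
        return target.apply(payload);
    }

    // Sends n messages through a channel that has two subscribers, as in the question.
    static List<String> demo(int n) {
        RoundRobinChannelSketch channel = new RoundRobinChannelSketch();
        // Subscriber created by IntegrationFlows.from(channelName) (order is an assumption).
        channel.subscribe(p -> "jamsSubmitJob got " + p);
        // Subscriber created by .channel(channelName).handle(...) in the first flow.
        channel.subscribe(p -> "afterChannelHandler got " + p);
        List<String> log = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            log.add(channel.send("msg" + i));
        }
        return log;
    }

    public static void main(String[] args) {
        demo(4).forEach(System.out::println);
    }
}
```

With two subscribers, odd-numbered messages reach jamsSubmitJob and even-numbered ones go straight to the "after channel" handler. Registering a third subscriber by duplicating the jamsSubmitJob bean would give the "works twice, then fails" cycle from the question.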

What you probably want is a request-reply pattern, so you should look into using .gateway(IntegrationNamesEnum.JAMS_SUBMIT_JOB_INTGRTN.getChannelName()) instead of .channel() in your first IntegrationFlow.
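A minimal configuration sketch of that suggestion follows. It assumes Spring Integration 5.x (matching the `IntegrationFlows` factory used in the question). The channel names `firstFlowInput` and `jamsSubmitJobChannel`, the class name, and the trivial handlers are placeholders, not the asker's real code.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
class GatewaySketchConfig {

    // Hypothetical channel names standing in for the enum values in the question.
    static final String INPUT = "firstFlowInput";
    static final String SUBMIT_JOB = "jamsSubmitJobChannel";

    @Bean
    IntegrationFlow firstFlow() {
        return IntegrationFlows.from(INPUT)
                .handle((p, h) -> {
                    System.out.println("Payload before gateway: " + p);
                    return p;
                })
                // Sends the message to SUBMIT_JOB and waits for the reply.
                // Unlike .channel(SUBMIT_JOB).handle(...), the next step is not
                // registered as a second subscriber on SUBMIT_JOB.
                .gateway(SUBMIT_JOB)
                // One-way endpoint, so this flow needs no further output channel.
                .handle(m -> System.out.println("Reply after gateway: " + m.getPayload()))
                .get();
    }

    @Bean
    IntegrationFlow jamsSubmitJobSketch() {
        // The only subscriber on SUBMIT_JOB, so every message reaches it.
        return IntegrationFlows.from(SUBMIT_JOB)
                // The returned value is sent back to the gateway as the reply
                // through the replyChannel header.
                .handle((p, h) -> "submitted: " + p)
                .get();
    }
}
```

Because the step after `.gateway(...)` receives the reply rather than subscribing to the request channel, the sub-flow stays the channel's only subscriber and round-robin dispatch no longer splits the traffic.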

See more info in docs:

https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations-directchannel

https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-channels

https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-gateway
