How do I apply conditional routing in spring integration?

Time:08-02

In my project I'm using the scatter-gather pattern, with three calls running in parallel. I want to add conditional routing: when a request arrives through my gateway, I need to inspect it and, depending on its content, keep the flow from reaching a particular recipient flow. I've made the flow generic, so the gateway accepts different requests and triggers the same flow for all of them, because the processing is similar. Now, if a particular piece of information is missing from the request JSON, I don't want to call a particular recipient flow. Here is the code:

//SpringIntegrationConfiguration

 @Bean
  public IntegrationFlow flow() {
    return flow ->
        flow.handle(validatorService, "validateRequest")
            .split()
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .scatterGather(
                scatterer ->
                    scatterer
                        .applySequence(true)
                        .recipientFlow(flow1())
                        .recipientFlow(flow2())
                        .recipientFlow(flow3()),
                gatherer ->
                    gatherer
                        .releaseLockBeforeSend(true)
                        .releaseStrategy(group -> group.size() == 2))
            .aggregate(lionService.someMethod())
            .to(someMethod2());
  }

  //   flow1
  @Bean
  public IntegrationFlow flow1() {
    return integrationFlowDefinition ->
        integrationFlowDefinition
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .handle(
                (payload, header) -> {
                  try {
                    return lionService.saveRequest(
                        payload,
                        (String) header.get("dbID"),
                        ((SourceSystem) Objects.requireNonNull(header.get("sourceSystem")))
                            .getSourceSystemCode());
                  } catch (JsonProcessingException e) {
                    throw new RuntimeException(e);
                  }
                })
            .nullChannel();
  }

  //  flow2
  @Bean
  public IntegrationFlow flow2() {
    return integrationFlowDefinition ->
        integrationFlowDefinition
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .handle(cdService, "callToaNativeMethod");
  }

  // flow3
  @Bean
  public IntegrationFlow flow3() {
    return integrationFlowDefinition ->
        integrationFlowDefinition
            .channel(c -> c.executor(Executors.newCachedThreadPool()))
            .handle(lionService, "prepareRequest")
            .handle(
                Http.outboundGateway(ServiceURL, restTemplateConfig.restTemplate())
                    .mappedRequestHeaders("Content-Type")
                    .httpMethod(HttpMethod.POST)
                    .expectedResponseType(String.class),
                c -> c.advice(expressionAdvice()));
  }

  @Bean
  public Advice expressionAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice =
        new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setSuccessChannelName("success.input");
    advice.setOnSuccessExpressionString("payload + ' was successful'");
    advice.setFailureChannelName("failure.input");
    advice.setOnFailureExpressionString("'Failed'");
    advice.setReturnFailureExpressionResult(true);
    advice.setTrapException(true);
    return advice;
  }

  @Bean
  public IntegrationFlow success() {
    return f -> f.handle(System.out::println);
  }

  @Bean
  public IntegrationFlow failure() {
    return f -> f.handle(System.out::println);
  }
  // flow for someMethod2
  @Bean
  public IntegrationFlow someMethod2() {
    return flow ->
        flow.handle(
            Http.outboundGateway(someServiceUrl)
                .httpMethod(HttpMethod.POST)
                .expectedResponseType(CrResponse.class));
  }
}

//Gateway

@MessagingGateway
public interface GenericGateway {

  @Gateway(requestChannel = "flow.input")
  void processRequest(
      @Payload Message lionRequest,
      @Header("dbID") String dbID,
      @Header("sourceSystem") SourceSystem sourceSystem);
}

The payload (Message lionRequest) passes through the gateway and invokes the main flow.

Suppose a LionRequest looks like this:

{
     "SCode" : "039",
     "CId":"123456",
     "RequestNumber": "56543457",
     "dbID":"987654345678",
     "someRequestBlock":{
         "message":"Dummy input for dummy service"
     }

}

Now:

  1. If "someRequestBlock" is not present in the request body, I want flow2() to be skipped and flow1() and flow3() to run in parallel.

  2. Likewise for a CatRequest, whose request body is different: flow1() should be skipped and flow2() and flow3() should run in parallel.

How can I achieve that?

CodePudding user response:

You could add a .filter() at the beginning of each flow.

Or, instead of .recipientFlow, use this method:

/**
 * Adds a recipient channel that will be selected if the expression evaluates to 'true'.
 * @param channelName the channel name.
 * @param expression the expression.
 * @return the router spec.
 */
public RecipientListRouterSpec recipient(String channelName, Expression expression) {
    ExpressionEvaluatingSelector selector = new ExpressionEvaluatingSelector(expression);
    this.handler.addRecipient(channelName, selector);
    this.componentsToRegister.put(selector, null);
    return _this();
}

The expression acts as a selector that determines whether or not that recipient receives the message.
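
To make the selector idea concrete, here is a minimal framework-free sketch in plain Java. It is only a model of the behaviour described above, not Spring Integration's implementation: the class RecipientListModel, its methods, and the use of a Map as the payload are all hypothetical. Each recipient is registered with a predicate, and a message is delivered only to the recipients whose predicate accepts it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical model of a recipient list whose recipients carry selectors. */
class RecipientListModel {

  // Recipient name -> selector, kept in registration order.
  private final Map<String, Predicate<Map<String, Object>>> recipients = new LinkedHashMap<>();

  /** Registers a recipient that always receives the message. */
  RecipientListModel recipient(String name) {
    return recipient(name, payload -> true);
  }

  /** Registers a recipient that receives the message only if the selector accepts it. */
  RecipientListModel recipient(String name, Predicate<Map<String, Object>> selector) {
    recipients.put(name, selector);
    return this;
  }

  /** Returns the names of the recipients that would receive the given payload. */
  List<String> select(Map<String, Object> payload) {
    List<String> selected = new ArrayList<>();
    recipients.forEach((name, selector) -> {
      if (selector.test(payload)) {
        selected.add(name);
      }
    });
    return selected;
  }

  /** Mirrors the question: flow2 is skipped when "someRequestBlock" is absent. */
  static RecipientListModel example() {
    return new RecipientListModel()
        .recipient("flow1")
        .recipient("flow2", payload -> payload.containsKey("someRequestBlock"))
        .recipient("flow3");
  }

  public static void main(String[] args) {
    Map<String, Object> request = new HashMap<>();
    request.put("SCode", "039");
    // No "someRequestBlock" key, so flow2 is not selected.
    System.out.println(example().select(request));
  }
}
```

In the real router the selector is a SpEL expression or a MessageSelector evaluated against the whole message rather than a Java predicate on a map, but the selection rule is the same.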

You would then start each sub flow with a named channel.
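
As a rough configuration sketch of this named-channel variant (an illustration, not the asker's actual setup): it assumes the sub-flows stay separate lambda-based beans named flow1, flow2 and flow3, so their input channels are registered as flow1.input, flow2.input and flow3.input, the same naming the question's gateway relies on with flow.input. The SpEL expression is copied from the second answer below and assumes a JSON payload; the class name and the stub sub-flows are hypothetical, and Spring Integration is assumed to be enabled already (for example by Spring Boot).

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
public class NamedChannelRecipientsConfig { // hypothetical class name

  @Bean
  public IntegrationFlow flow() {
    return flow -> flow
        .scatterGather(
            scatterer -> scatterer
                .applySequence(true)
                // Unconditional recipients: always receive the message.
                .recipient("flow1.input")
                .recipient("flow3.input")
                // Conditional recipient: selected only when the expression is true.
                .recipient("flow2.input",
                    "#jsonPath(payload, '$..someRequestBlock.length()') > 1"),
            // Release strategy kept as in the question.
            gatherer -> gatherer.releaseStrategy(group -> group.size() == 2))
        // Downstream steps from the question are omitted in this sketch.
        .nullChannel();
  }

  // Hypothetical stand-ins for the question's sub-flows; each simply echoes the payload.
  @Bean
  public IntegrationFlow flow1() {
    return f -> f.handle((payload, headers) -> payload);
  }

  @Bean
  public IntegrationFlow flow2() {
    return f -> f.handle((payload, headers) -> payload);
  }

  @Bean
  public IntegrationFlow flow3() {
    return f -> f.handle((payload, headers) -> payload);
  }
}
```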

CodePudding user response:

There is this recipient variant for the scatterer:

/**
 * Adds a subflow that will be invoked if the expression evaluates to 'true'.
 * @param expression the expression.
 * @param subFlow the subflow.
 * @return the router spec.
 */
public RecipientListRouterSpec recipientFlow(String expression, IntegrationFlow subFlow) {

Such a SpEL expression must return a boolean, and it is evaluated against the whole message. For convenience, the framework provides a #jsonPath() SpEL function that you can use to evaluate the someRequestBlock attribute in the payload:

https://docs.spring.io/spring-integration/docs/current/reference/html/spel.html#built-in-spel-functions

.recipientFlow("#jsonPath(payload, '$..someRequestBlock.length()') > 1", flow2())
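
Placed in the context of the question's scatterer, that line might look roughly like the sketch below. This is an illustration under assumptions rather than a drop-in configuration: the class name and the stub sub-flows are hypothetical, the expression is the one shown above and assumes a JSON payload, and Spring Integration is assumed to be enabled already (for example by Spring Boot).

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
public class ConditionalRecipientFlowConfig { // hypothetical class name

  @Bean
  public IntegrationFlow flow() {
    return flow -> flow
        .scatterGather(
            scatterer -> scatterer
                .applySequence(true)
                .recipientFlow(flow1())
                // flow2 is invoked only when the expression evaluates to true.
                .recipientFlow(
                    "#jsonPath(payload, '$..someRequestBlock.length()') > 1",
                    flow2())
                .recipientFlow(flow3()),
            // Release strategy kept as in the question.
            gatherer -> gatherer.releaseStrategy(group -> group.size() == 2))
        // Downstream steps from the question are omitted in this sketch.
        .nullChannel();
  }

  // Hypothetical stand-ins for the question's sub-flows; each simply echoes the payload.
  @Bean
  public IntegrationFlow flow1() {
    return f -> f.handle((payload, headers) -> payload);
  }

  @Bean
  public IntegrationFlow flow2() {
    return f -> f.handle((payload, headers) -> payload);
  }

  @Bean
  public IntegrationFlow flow3() {
    return f -> f.handle((payload, headers) -> payload);
  }
}
```

One thing worth verifying in the real configuration: a skipped recipient produces no reply, so a fixed release strategy such as group.size() == 2 may end up waiting for a reply that never arrives, depending on which sub-flows actually send one back to the gatherer.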

Can you explain how you missed that option, so that we can improve something in the code or docs?
