In my project I'm using the scatter-gather pattern with three parallel calls. I want conditional routing: a request arrives through my gateway, and based on its content I need to make sure the flow skips a particular recipient flow. I made the flow generic, so the gateway accepts different request types and triggers the same flow, because the processing is similar for all of them. If a particular piece of information is missing from the request JSON, I don't want to call a particular recipient flow. Here is the code:
//SpringIntegrationConfiguration
@Bean
public IntegrationFlow flow() {
  return flow ->
      flow.handle(validatorService, "validateRequest")
          .split()
          .channel(c -> c.executor(Executors.newCachedThreadPool()))
          .scatterGather(
              scatterer ->
                  scatterer
                      .applySequence(true)
                      .recipientFlow(flow1())
                      .recipientFlow(flow2())
                      .recipientFlow(flow3()),
              gatherer ->
                  gatherer
                      .releaseLockBeforeSend(true)
                      .releaseStrategy(group -> group.size() == 2))
          .aggregate(lionService.someMethod())
          .to(someMethod2());
}
// flow1
@Bean
public IntegrationFlow flow1() {
  return integrationFlowDefinition ->
      integrationFlowDefinition
          .channel(c -> c.executor(Executors.newCachedThreadPool()))
          .handle(
              (payload, header) -> {
                try {
                  return lionService.saveRequest(
                      payload,
                      (String) header.get("dbID"),
                      ((SourceSystem) Objects.requireNonNull(header.get("sourceSystem")))
                          .getSourceSystemCode());
                } catch (JsonProcessingException e) {
                  throw new RuntimeException(e);
                }
              })
          .nullChannel();
}
// flow2
@Bean
public IntegrationFlow flow2() {
  return integrationFlowDefinition ->
      integrationFlowDefinition
          .channel(c -> c.executor(Executors.newCachedThreadPool()))
          .handle(cdService, "callToaNativeMethod");
}
// flow3
@Bean
public IntegrationFlow flow3() {
  return integrationFlowDefinition ->
      integrationFlowDefinition
          .channel(c -> c.executor(Executors.newCachedThreadPool()))
          .handle(lionService, "prepareRequest")
          .handle(
              Http.outboundGateway(ServiceURL, restTemplateConfig.restTemplate())
                  .mappedRequestHeaders("Content-Type")
                  .httpMethod(HttpMethod.POST)
                  .expectedResponseType(String.class),
              c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
  ExpressionEvaluatingRequestHandlerAdvice advice =
      new ExpressionEvaluatingRequestHandlerAdvice();
  advice.setSuccessChannelName("success.input");
  // SpEL string concatenation needs the '+' operator
  advice.setOnSuccessExpressionString("payload + ' was successful'");
  advice.setFailureChannelName("failure.input");
  advice.setOnFailureExpressionString("'Failed'");
  advice.setReturnFailureExpressionResult(true);
  advice.setTrapException(true);
  return advice;
}
@Bean
public IntegrationFlow success() {
  return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
  return f -> f.handle(System.out::println);
}
// flow for someMethod2
@Bean
public IntegrationFlow someMethod2() {
  return flow ->
      flow.handle(
          Http.outboundGateway(someServiceUrl)
              .httpMethod(HttpMethod.POST)
              .expectedResponseType(CrResponse.class));
}
}
//Gateway
@MessagingGateway
public interface GenericGateway {
  @Gateway(requestChannel = "flow.input")
  void processRequest(
      @Payload Message lionRequest,
      @Header("dbID") String dbID,
      @Header("sourceSystem") SourceSystem sourceSystem);
}
The payload (Message lionRequest) goes through the gateway and invokes the main flow.
Let's imagine a LionRequest looks like this:
{
  "SCode": "039",
  "CId": "123456",
  "RequestNumber": "56543457",
  "dbID": "987654345678",
  "someRequestBlock": {
    "message": "Dummy input for dummy service"
  }
}
Now, if "someRequestBlock" is not present in the request body, I want flow2() to be skipped and flow1() and flow3() to run in parallel. The same goes for CatRequest: the request body will be different, and I need to make sure that for a CatRequest flow1() is skipped and flow2() and flow3() run in parallel.
Kindly suggest how I can achieve that.
CodePudding user response:
You could add a .filter() at the beginning of each flow.
Or, instead of .recipientFlow, use this:
/**
* Adds a recipient channel that will be selected if the the expression evaluates to 'true'.
* @param channelName the channel name.
* @param expression the expression.
* @return the router spec.
*/
public RecipientListRouterSpec recipient(String channelName, Expression expression) {
ExpressionEvaluatingSelector selector = new ExpressionEvaluatingSelector(expression);
this.handler.addRecipient(channelName, selector);
this.componentsToRegister.put(selector, null);
return _this();
}
With the expression being a selector to determine whether or not that recipient will receive the message.
You would then start each sub flow with a named channel.
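To make the selector idea concrete, here is a minimal plain-Java sketch of a recipient list in which every recipient carries its own selector. It is only a model of the behaviour, not the Spring Integration API: the class RecipientSelectionSketch, its methods, and the use of a Map as the parsed request are all assumptions made for illustration, and the channel names simply follow the "flow.input" naming already used in the question.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java model of a recipient list with per-recipient selectors.
// NOT the Spring Integration API; every name here is hypothetical.
public class RecipientSelectionSketch {

    // Channel name -> selector deciding whether that channel receives the message.
    private final Map<String, Predicate<Map<String, Object>>> recipients = new LinkedHashMap<>();

    public RecipientSelectionSketch recipient(
            String channelName, Predicate<Map<String, Object>> selector) {
        recipients.put(channelName, selector);
        return this;
    }

    // Returns the channels whose selector accepts the payload, in registration order.
    public List<String> route(Map<String, Object> payload) {
        List<String> selected = new ArrayList<>();
        for (Map.Entry<String, Predicate<Map<String, Object>>> e : recipients.entrySet()) {
            if (e.getValue().test(payload)) {
                selected.add(e.getKey());
            }
        }
        return selected;
    }

    // Mirrors the question: flow2 is selected only when "someRequestBlock" is present.
    public static RecipientSelectionSketch lionRouter() {
        return new RecipientSelectionSketch()
                .recipient("flow1.input", p -> true)
                .recipient("flow2.input", p -> p.containsKey("someRequestBlock"))
                .recipient("flow3.input", p -> true);
    }

    public static void main(String[] args) {
        Map<String, Object> withBlock =
                Map.of("SCode", "039", "someRequestBlock", Map.of("message", "Dummy"));
        Map<String, Object> withoutBlock = Map.of("SCode", "039");
        System.out.println(lionRouter().route(withBlock));
        System.out.println(lionRouter().route(withoutBlock));
    }
}
```

In the real router the selector would be the SpEL expression passed to recipient(...), and each selected name would be the named channel that starts a sub flow.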
CodePudding user response:
There is this recipient variant for the scatterer:
/**
* Adds a subflow that will be invoked if the expression evaluates to 'true'.
* @param expression the expression.
* @param subFlow the subflow.
* @return the router spec.
*/
public RecipientListRouterSpec recipientFlow(String expression, IntegrationFlow subFlow) {
Such a SpEL expression must return a boolean, and it is evaluated against the whole message. For your convenience, the framework provides a #jsonPath() SpEL function to evaluate your someRequestBlock attribute against the payload:
.recipientFlow("#jsonPath(payload, '$..someRequestBlock.length()') > 1", flow2())
Can you explain how you missed that option, so that we can improve something in the code or docs?
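For readers who want to see the effect of a boolean condition evaluated against the whole message, here is a rough plain-Java sketch of conditional sub flows run in parallel and then gathered. It does not use Spring Integration: ConditionalScatterSketch, Msg, Recipient and the "requestType" header are hypothetical names chosen for illustration (the question does not say how a CatRequest is recognised), and each sub flow simply returns its own name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.Predicate;

// Plain-Java model of conditional sub flows in a scatter-gather.
// NOT the Spring Integration API; every name below is hypothetical.
public class ConditionalScatterSketch {

    // Stand-in for a message: a condition can inspect both payload and headers.
    public static final class Msg {
        final Map<String, Object> payload;
        final Map<String, Object> headers;

        public Msg(Map<String, Object> payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // A sub flow together with the boolean condition that guards it.
    public static final class Recipient {
        final Predicate<Msg> condition;
        final Function<Msg, String> subFlow;

        public Recipient(Predicate<Msg> condition, Function<Msg, String> subFlow) {
            this.condition = condition;
            this.subFlow = subFlow;
        }
    }

    // Submits only the sub flows whose condition is true, then gathers
    // their replies in registration order.
    public static List<String> scatterGather(Msg msg, List<Recipient> recipients) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (Recipient r : recipients) {
                if (r.condition.test(msg)) {
                    Callable<String> task = () -> r.subFlow.apply(msg);
                    pending.add(pool.submit(task));
                }
            }
            List<String> replies = new ArrayList<>();
            for (Future<String> f : pending) {
                replies.add(f.get());
            }
            return replies;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Assumed conditions: flow1 is skipped for a "Cat" request type header,
    // flow2 needs "someRequestBlock" in the payload, flow3 always runs.
    public static List<Recipient> recipients() {
        return List.of(
                new Recipient(m -> !"Cat".equals(m.headers.get("requestType")), m -> "flow1"),
                new Recipient(m -> m.payload.containsKey("someRequestBlock"), m -> "flow2"),
                new Recipient(m -> true, m -> "flow3"));
    }

    public static void main(String[] args) {
        Msg lion = new Msg(Map.of("SCode", "039"), Map.of("requestType", "Lion"));
        System.out.println(scatterGather(lion, recipients()));
    }
}
```

In the framework the condition would be the SpEL string given to recipientFlow(expression, subFlow), so no hand-written predicate or executor handling like this is needed.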