Home > Enterprise >  Aggregate response in Spring integration scatter gather
Aggregate response in Spring integration scatter gather

Time:01-26

I have a requirement of calling 3 different http endpoint which will return me a same domain object and gather the response of these 3 different http endpoint in a single object. I am using Spring scatter gather pattern for same and kind of lost how to aggregate results in single object. Below is my scatter gather config file:

<scatter-gather input-channel="inputDistribution" output-channel="outputDistribution" gather-channel="gatherChannel" send-timeout="1000"
                gather-timeout="1000">
    <scatterer apply-sequence="true">
        <recipient channel="distribution1Channel"/>
        <recipient channel="distribution2Channel"/>
        <recipient channel="distribution3Channel"/>
    </scatterer>
    <gatherer release-strategy="customReleaseStrategy" ref="customReleaseStrategy"/>
</scatter-gather>

<channel id="gatherChannel">
    <queue/>
</channel>

<bridge input-channel="distribution1Channel" output-channel="serviceChannel1"/>
<bridge input-channel="distribution2Channel" output-channel="serviceChannel2"/>
<bridge input-channel="distribution3Channel" output-channel="serviceChannel3"/>

<service-activator input-channel="serviceChannel1" output-channel="gatherChannel" ref="firstService" method="process"/>
<service-activator input-channel="serviceChannel2" output-channel="gatherChannel" ref="secondService" method="process"/>
<service-activator input-channel="serviceChannel3" output-channel="gatherChannel" ref="thirdService" method="process"/>

<beans:bean id="firstService" />
<beans:bean id="secondService" />
<beans:bean id="thirdService" />

<beans:bean id="customReleaseStrategy" />

Below is my service activator code which will call and endpoint and get a domain object

public class FirstService {
@ServiceActivator
public Dummy process() {
    //In actual scenario, call api endpoint and get this object. This object will be populated from API response.
    dummy.setID(1);
    Dummy dummy = new Dummy();
    return dummy;
}}

Similarly secondService and thirdService will populate Dummy object by calling an endpoint.

Below is the code for ReleaseStrategy where i am checking if all the 3 responses has been received

public class CustomReleaseStrategy implements ReleaseStrategy {

@Override
public boolean canRelease(MessageGroup group) {
    if(group.getMessages().size() == 3) {
        return true;
    }
    else {
        return false;
    }
}}

I am using below test to send the message:

 this.inputDistribution.send(new GenericMessage<>("foo"),1000);
 Message<?> outputMessage = this.outputDistribution.receive(10000);

And getting this error:

15:24:11.579 [task-scheduler-2] DEBUG org.springframework.integration.handler.LoggingHandler - bean '_org.springframework.integration.errorLogger.handler' for component '_org.springframework.integration.errorLogger' received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'org.springframework.integration.scattergather.ScatterGatherHandler#0.gatherer'; defined in: 'class path resource [com/test/scattergather/scattergather/config/scatter-gather-context.xml]'; from source: ''gatherer'']; nested exception is org.springframework.expression.AccessException: Unable to access property 'payload' through getter method, failedMessage=GenericMessage [payload=com.test.scattergather.domain.Dummy@28b1ccd1, headers={gatherResultChannel=org.springframework.integration.channel.QueueChannel@7879348, replyChannel=bean 'gatherChannel', sequenceNumber=3, sequenceSize=3, correlationId=ee0c2cd6-7650-3f31-e093-3c0da197ac8b, id=ab7a162c-73aa-d7ac-608b-a93923c38969, timestamp=1674595451526}], headers={id=19b6097a-2114-e896-4c00-49f818a6d9e6, timestamp=1674595451578}] for original GenericMessage [payload=com.test.scattergather.domain.Dummy@28b1ccd1, headers={gatherResultChannel=org.springframework.integration.channel.QueueChannel@7879348, replyChannel=bean 'gatherChannel', sequenceNumber=3, sequenceSize=3, correlationId=ee0c2cd6-7650-3f31-e093-3c0da197ac8b, id=ab7a162c-73aa-d7ac-608b-a93923c38969, timestamp=1674595451526}]
15:24:11.584 [task-scheduler-2] ERROR org.springframework.integration.handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'org.springframework.integration.scattergather.ScatterGatherHandler#0.gatherer'; defined in: 'class path resource [com/test/scattergather/scattergather/config/scatter-gather-context.xml]'; from source: ''gatherer'']; nested exception is org.springframework.expression.AccessException: Unable to access property 'payload' through getter method, failedMessage=GenericMessage [payload=com.test.scattergather.domain.Dummy@28b1ccd1, headers={gatherResultChannel=org.springframework.integration.channel.QueueChannel@7879348, replyChannel=bean 'gatherChannel', sequenceNumber=3, sequenceSize=3, correlationId=ee0c2cd6-7650-3f31-e093-3c0da197ac8b, id=ab7a162c-73aa-d7ac-608b-a93923c38969, timestamp=1674595451526}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:158)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:475)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:461)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.springframework.expression.AccessException: Unable to access property 'payload' through getter method
    at org.springframework.expression.spel.support.ReflectivePropertyAccessor$OptimalPropertyAccessor.read(ReflectivePropertyAccessor.java:708)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:204)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:104)
    at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:91)
    at org.springframework.expression.spel.ast.MethodReference.getArguments(MethodReference.java:164)
    at org.springframework.expression.spel.ast.MethodReference.getValueRef(MethodReference.java:81)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueRef(CompoundExpression.java:70)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:91)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:117)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:376)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:169)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:154)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:637)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.fallbackToInvokeExpression(MessagingMethodInvokerHelper.java:630)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInvokeExceptionAndFallbackToExpressionIfAny(MessagingMethodInvokerHelper.java:616)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:587)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:479)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:363)
    at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.process(MethodInvokingMessageListProcessor.java:92)
    at org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor.aggregatePayloads(MethodInvokingMessageGroupProcessor.java:90)
    at org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor.processMessageGroup(AbstractAggregatingMessageGroupProcessor.java:94)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:898)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.processMessageForGroup(AbstractCorrelatingMessageHandler.java:574)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:541)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    ... 17 more
Caused by: java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.springframework.expression.spel.support.ReflectivePropertyAccessor$OptimalPropertyAccessor.read(ReflectivePropertyAccessor.java:704)
    ... 41 more
Caused by: java.lang.IllegalStateException: Invalid method parameter for payload: was expecting collection.
    at org.springframework.util.Assert.state(Assert.java:76)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$ParametersWrapper.getPayload(MessagingMethodInvokerHelper.java:1384)
    ... 46 more

15:24:11.586 [task-scheduler-2] DEBUG org.springframework.integration.channel.PublishSubscribeChannel - postSend (sent=true) on channel 'bean 'errorChannel'', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'org.springframework.integration.scattergather.ScatterGatherHandler#0.gatherer'; defined in: 'class path resource [com/test/scattergather/scattergather/config/scatter-gather-context.xml]'; from source: ''gatherer'']; nested exception is org.springframework.expression.AccessException: Unable to access property 'payload' through getter method, failedMessage=GenericMessage [payload=com.test.scattergather.domain.Dummy@28b1ccd1, headers={gatherResultChannel=org.springframework.integration.channel.QueueChannel@7879348, replyChannel=bean 'gatherChannel', sequenceNumber=3, sequenceSize=3, correlationId=ee0c2cd6-7650-3f31-e093-3c0da197ac8b, id=ab7a162c-73aa-d7ac-608b-a93923c38969, timestamp=1674595451526}], headers={id=19b6097a-2114-e896-4c00-49f818a6d9e6, timestamp=1674595451578}] for original GenericMessage [payload=com.test.scattergather.domain.Dummy@28b1ccd1, headers={gatherResultChannel=org.springframework.integration.channel.QueueChannel@7879348, replyChannel=bean 'gatherChannel', sequenceNumber=3, sequenceSize=3, correlationId=ee0c2cd6-7650-3f31-e093-3c0da197ac8b, id=ab7a162c-73aa-d7ac-608b-a93923c38969, timestamp=1674595451526}]
15:24:12.545 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.545 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 5, missCount = 1]
15:24:12.553 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.554 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 6, missCount = 1]
15:24:12.558 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.558 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 7, missCount = 1]
15:24:12.559 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.559 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 8, missCount = 1]
15:24:12.564 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.564 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 9, missCount = 1]
15:24:12.569 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.570 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 10, missCount = 1]
15:24:12.572 [Test worker] DEBUG org.springframework.test.context.support.AbstractDirtiesContextTestExecutionListener - After test method: context [DefaultTestContext@305e95a4 testClass = ScatterGatherConfigTest, testInstance = com.test.scattergather.scattergather.config.ScatterGatherConfigTest@67b8d45, testMethod = configDistributionTest@ScatterGatherConfigTest, testException = org.springframework.integration.handler.ReplyRequiredException: No reply produced by handler 'org.springframework.integration.scattergather.ScatterGatherHandler#0', and its 'requiresReply' property is set to true., failedMessage=GenericMessage [payload=foo, headers={id=8a2e124d-ae9a-00b0-dc3e-f434849499c1, timestamp=1674595451468}], mergedContextConfiguration = [MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]], attributes = map['org.springframework.test.context.event.ApplicationEventsTestExecutionListener.recordApplicationEvents' -> false]], class annotated with @DirtiesContext [true] with mode [AFTER_CLASS], method annotated with @DirtiesContext [false] with mode [null].

No reply produced by handler 'org.springframework.integration.scattergather.ScatterGatherHandler#0', and its 'requiresReply' property is set to true.
org.springframework.integration.handler.ReplyRequiredException: No reply produced by handler 'org.springframework.integration.scattergather.ScatterGatherHandler#0', and its 'requiresReply' property is set to true., failedMessage=GenericMessage [payload=foo, headers={id=8a2e124d-ae9a-00b0-dc3e-f434849499c1, timestamp=1674595451468}]
    at app//org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:146)
    at app//org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at app//org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at app//org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at app//org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at app//org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at app//org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at app//com.test.scattergather.scattergather.config.ScatterGatherConfigTest.configDistributionTest(ScatterGatherConfigTest.java:41)
    at [email protected]/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at [email protected]/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at [email protected]/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at [email protected]/java.lang.reflect.Method.invoke(Method.java:566)

I want to know how to resolve this error and also how to aggregate the results from all the scatter channel?

CodePudding user response:

Can you show, please, the whole stack trace in your question and formatted properly? For custom aggregation function see ref attribute of the <gatherer>.

Although I see you already use it, but you point to a wrong type ReleaseStrategy.

That ref have to be an implementation of MessageGroupProcessor. See some out-of-the-box types. Not sure why a DefaultAggregatingMessageGroupProcessor which produces a collection of payloads doesn't work for you...

  • Related