I have a requirement of calling 3 different http endpoint which will return me a same domain object and gather the response of these 3 different http endpoint in a single object. I am using Spring scatter gather pattern for same and kind of lost how to aggregate results in single object. Below is my scatter gather config file:
<scatter-gather input-channel="inputDistribution" output-channel="outputDistribution" gather-channel="gatherChannel" send-timeout="1000"
gather-timeout="1000">
<scatterer apply-sequence="true">
<recipient channel="distribution1Channel"/>
<recipient channel="distribution2Channel"/>
<recipient channel="distribution3Channel"/>
</scatterer>
<gatherer release-strategy="customReleaseStrategy" ref="customReleaseStrategy"/>
</scatter-gather>
<channel id="gatherChannel">
<queue/>
</channel>
<bridge input-channel="distribution1Channel" output-channel="serviceChannel1"/>
<bridge input-channel="distribution2Channel" output-channel="serviceChannel2"/>
<bridge input-channel="distribution3Channel" output-channel="serviceChannel3"/>
<service-activator input-channel="serviceChannel1" output-channel="gatherChannel" ref="firstService" method="process"/>
<service-activator input-channel="serviceChannel2" output-channel="gatherChannel" ref="secondService" method="process"/>
<service-activator input-channel="serviceChannel3" output-channel="gatherChannel" ref="thirdService" method="process"/>
<beans:bean id="firstService" />
<beans:bean id="secondService" />
<beans:bean id="thirdService" />
<beans:bean id="customReleaseStrategy" />
Below is my service activator code which will call and endpoint and get a domain object
public class FirstService {
@ServiceActivator
public Dummy process() {
//In actual scenario, call api endpoint and get this object. This object will be populated from API response.
dummy.setID(1);
Dummy dummy = new Dummy();
return dummy;
}}
Similarly secondService and thirdService will populate Dummy object by calling an endpoint.
Below is the code for ReleaseStrategy where i am checking if all the 3 responses has been received
public class CustomReleaseStrategy implements ReleaseStrategy {
@Override
public boolean canRelease(MessageGroup group) {
if(group.getMessages().size() == 3) {
return true;
}
else {
return false;
}
}}
I am using below test to send the message:
this.inputDistribution.send(new GenericMessage<>("foo"),1000);
Message<?> outputMessage = this.outputDistribution.receive(10000);
And getting this error:
15:24:11.579 [task-scheduler-2] DEBUG org.springframework.integration.handler.LoggingHandler - bean '_org.springframework.integration.errorLogger.handler' for component '_org.springframework.integration.errorLogger' received message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'org.springframework.integration.scattergather.ScatterGatherHandler#0.gatherer'; defined in: 'class path resource [com/test/scattergather/scattergather/config/scatter-gather-context.xml]'; from source: ''gatherer'']; nested exception is org.springframework.expression.AccessException: Unable to access property 'payload' through getter method, failedMessage=GenericMessage [payload=com.test.scattergather.domain.Dummy@28b1ccd1, headers={gatherResultChannel=org.springframework.integration.channel.QueueChannel@7879348, replyChannel=bean 'gatherChannel', sequenceNumber=3, sequenceSize=3, correlationId=ee0c2cd6-7650-3f31-e093-3c0da197ac8b, id=ab7a162c-73aa-d7ac-608b-a93923c38969, timestamp=1674595451526}], headers={id=19b6097a-2114-e896-4c00-49f818a6d9e6, timestamp=1674595451578}] for original GenericMessage [payload=com.test.scattergather.domain.Dummy@28b1ccd1, headers={gatherResultChannel=org.springframework.integration.channel.QueueChannel@7879348, replyChannel=bean 'gatherChannel', sequenceNumber=3, sequenceSize=3, correlationId=ee0c2cd6-7650-3f31-e093-3c0da197ac8b, id=ab7a162c-73aa-d7ac-608b-a93923c38969, timestamp=1674595451526}]
15:24:11.584 [task-scheduler-2] ERROR org.springframework.integration.handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'org.springframework.integration.scattergather.ScatterGatherHandler#0.gatherer'; defined in: 'class path resource [com/test/scattergather/scattergather/config/scatter-gather-context.xml]'; from source: ''gatherer'']; nested exception is org.springframework.expression.AccessException: Unable to access property 'payload' through getter method, failedMessage=GenericMessage [payload=com.test.scattergather.domain.Dummy@28b1ccd1, headers={gatherResultChannel=org.springframework.integration.channel.QueueChannel@7879348, replyChannel=bean 'gatherChannel', sequenceNumber=3, sequenceSize=3, correlationId=ee0c2cd6-7650-3f31-e093-3c0da197ac8b, id=ab7a162c-73aa-d7ac-608b-a93923c38969, timestamp=1674595451526}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:158)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:475)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:461)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:95)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.springframework.expression.AccessException: Unable to access property 'payload' through getter method
at org.springframework.expression.spel.support.ReflectivePropertyAccessor$OptimalPropertyAccessor.read(ReflectivePropertyAccessor.java:708)
at org.springframework.expression.spel.ast.PropertyOrFieldReference.readProperty(PropertyOrFieldReference.java:204)
at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:104)
at org.springframework.expression.spel.ast.PropertyOrFieldReference.getValueInternal(PropertyOrFieldReference.java:91)
at org.springframework.expression.spel.ast.MethodReference.getArguments(MethodReference.java:164)
at org.springframework.expression.spel.ast.MethodReference.getValueRef(MethodReference.java:81)
at org.springframework.expression.spel.ast.CompoundExpression.getValueRef(CompoundExpression.java:70)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:91)
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:117)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:376)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:169)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:154)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:637)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.fallbackToInvokeExpression(MessagingMethodInvokerHelper.java:630)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInvokeExceptionAndFallbackToExpressionIfAny(MessagingMethodInvokerHelper.java:616)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:587)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:479)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:363)
at org.springframework.integration.aggregator.MethodInvokingMessageListProcessor.process(MethodInvokingMessageListProcessor.java:92)
at org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor.aggregatePayloads(MethodInvokingMessageGroupProcessor.java:90)
at org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor.processMessageGroup(AbstractAggregatingMessageGroupProcessor.java:94)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:898)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.processMessageForGroup(AbstractCorrelatingMessageHandler.java:574)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:541)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
... 17 more
Caused by: java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.expression.spel.support.ReflectivePropertyAccessor$OptimalPropertyAccessor.read(ReflectivePropertyAccessor.java:704)
... 41 more
Caused by: java.lang.IllegalStateException: Invalid method parameter for payload: was expecting collection.
at org.springframework.util.Assert.state(Assert.java:76)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$ParametersWrapper.getPayload(MessagingMethodInvokerHelper.java:1384)
... 46 more
15:24:11.586 [task-scheduler-2] DEBUG org.springframework.integration.channel.PublishSubscribeChannel - postSend (sent=true) on channel 'bean 'errorChannel'', message: ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'org.springframework.integration.scattergather.ScatterGatherHandler#0.gatherer'; defined in: 'class path resource [com/test/scattergather/scattergather/config/scatter-gather-context.xml]'; from source: ''gatherer'']; nested exception is org.springframework.expression.AccessException: Unable to access property 'payload' through getter method, failedMessage=GenericMessage [payload=com.test.scattergather.domain.Dummy@28b1ccd1, headers={gatherResultChannel=org.springframework.integration.channel.QueueChannel@7879348, replyChannel=bean 'gatherChannel', sequenceNumber=3, sequenceSize=3, correlationId=ee0c2cd6-7650-3f31-e093-3c0da197ac8b, id=ab7a162c-73aa-d7ac-608b-a93923c38969, timestamp=1674595451526}], headers={id=19b6097a-2114-e896-4c00-49f818a6d9e6, timestamp=1674595451578}] for original GenericMessage [payload=com.test.scattergather.domain.Dummy@28b1ccd1, headers={gatherResultChannel=org.springframework.integration.channel.QueueChannel@7879348, replyChannel=bean 'gatherChannel', sequenceNumber=3, sequenceSize=3, correlationId=ee0c2cd6-7650-3f31-e093-3c0da197ac8b, id=ab7a162c-73aa-d7ac-608b-a93923c38969, timestamp=1674595451526}]
15:24:12.545 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.545 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 5, missCount = 1]
15:24:12.553 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.554 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 6, missCount = 1]
15:24:12.558 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.558 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 7, missCount = 1]
15:24:12.559 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.559 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 8, missCount = 1]
15:24:12.564 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.564 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 9, missCount = 1]
15:24:12.569 [Test worker] DEBUG org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate - Retrieved ApplicationContext [758700911] from cache with key [[MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]]
15:24:12.570 [Test worker] DEBUG org.springframework.test.context.cache - Spring test ApplicationContext cache statistics: [DefaultContextCache@5e6dcfa size = 1, maxSize = 32, parentContextCount = 0, hitCount = 10, missCount = 1]
15:24:12.572 [Test worker] DEBUG org.springframework.test.context.support.AbstractDirtiesContextTestExecutionListener - After test method: context [DefaultTestContext@305e95a4 testClass = ScatterGatherConfigTest, testInstance = com.test.scattergather.scattergather.config.ScatterGatherConfigTest@67b8d45, testMethod = configDistributionTest@ScatterGatherConfigTest, testException = org.springframework.integration.handler.ReplyRequiredException: No reply produced by handler 'org.springframework.integration.scattergather.ScatterGatherHandler#0', and its 'requiresReply' property is set to true., failedMessage=GenericMessage [payload=foo, headers={id=8a2e124d-ae9a-00b0-dc3e-f434849499c1, timestamp=1674595451468}], mergedContextConfiguration = [MergedContextConfiguration@55e42684 testClass = ScatterGatherConfigTest, locations = '{classpath:/com/test/scattergather/scattergather/config/scatter-gather-context.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', propertySourceLocations = '{}', propertySourceProperties = '{}', contextCustomizers = set[org.springframework.boot.test.autoconfigure.actuate.metrics.MetricsExportContextCustomizerFactory$DisableMetricExportContextCustomizer@c497a55, org.springframework.boot.test.autoconfigure.properties.PropertyMappingContextCustomizer@0, org.springframework.boot.test.autoconfigure.web.servlet.WebDriverContextCustomizerFactory$Customizer@2f86f9cf, org.springframework.boot.test.context.filter.ExcludeFilterContextCustomizer@6dded900, org.springframework.boot.test.json.DuplicateJsonObjectContextCustomizerFactory$DuplicateJsonObjectContextCustomizer@39c7fb0b, org.springframework.boot.test.mock.mockito.MockitoContextCustomizer@0], contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]], attributes = map['org.springframework.test.context.event.ApplicationEventsTestExecutionListener.recordApplicationEvents' -> false]], class annotated with @DirtiesContext [true] with mode [AFTER_CLASS], method annotated with @DirtiesContext [false] with mode [null].
No reply produced by handler 'org.springframework.integration.scattergather.ScatterGatherHandler#0', and its 'requiresReply' property is set to true.
org.springframework.integration.handler.ReplyRequiredException: No reply produced by handler 'org.springframework.integration.scattergather.ScatterGatherHandler#0', and its 'requiresReply' property is set to true., failedMessage=GenericMessage [payload=foo, headers={id=8a2e124d-ae9a-00b0-dc3e-f434849499c1, timestamp=1674595451468}]
at app//org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:146)
at app//org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at app//org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at app//org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at app//org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at app//org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at app//org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at app//com.test.scattergather.scattergather.config.ScatterGatherConfigTest.configDistributionTest(ScatterGatherConfigTest.java:41)
at [email protected]/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at [email protected]/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at [email protected]/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at [email protected]/java.lang.reflect.Method.invoke(Method.java:566)
I want to know how to resolve this error and also how to aggregate the results from all the scatter channel?
CodePudding user response:
Can you show, please, the whole stack trace in your question and formatted properly? For custom aggregation function see ref
attribute of the <gatherer>
.
Although I see you already use it, but you point to a wrong type ReleaseStrategy
.
That ref
have to be an implementation of MessageGroupProcessor
. See some out-of-the-box types. Not sure why a DefaultAggregatingMessageGroupProcessor
which produces a collection of payloads doesn't work for you...