Is there a way to inject a listener with a custom RecordFilterStrategy?
<int-kafka:message-driven-channel-adapter
        id="kafkaConsumer"
        listener-container="listenerContainer"
        channel="consumerChannel"
        message-converter="messageConverter"
        payload-type="java.lang.String"
/>
I've tried to do the following:
<bean id="containerProperties" >
    <constructor-arg name="topics">
        <list>
            <value>someTopic</value>
        </list>
    </constructor-arg>
    <property name="errorHandler" ref="listenerErrorHandler"/>
    <property name="messageListener" ref="filteringMessageListener"/>
</bean>
<bean id="recordFilterStrategy" />
<bean id="filteringMessageListener" >
    <constructor-arg>
        <bean factory-bean="containerProperties" factory-method="getMessageListener"/>
    </constructor-arg>
    <constructor-arg ref="recordFilterStrategy"/>
</bean>
But I got an error: Constructor threw exception; nested exception is java.lang.IllegalArgumentException: Container must not already have a listener
Answer:
This is a bug in the XML configuration support.
The KafkaMessageDrivenChannelAdapter already has the corresponding property:
/**
 * Specify a {@link RecordFilterStrategy} to wrap
 * {@link KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener} into
 * {@link FilteringMessageListenerAdapter}.
 * @param recordFilterStrategy the {@link RecordFilterStrategy} to use.
 */
public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {
We simply never exposed it in the XML configuration.
Feel free to raise a GitHub issue for this!
As a workaround, consider declaring a regular <bean> for the KafkaMessageDrivenChannelAdapter instead of using the <int-kafka:message-driven-channel-adapter> element. Alternatively, move away from XML configuration altogether in favor of the Java DSL: https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#kafka-inbound-adapter-configuration
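The <bean> workaround could look roughly like the sketch below. It reuses the bean ids from the question (listenerContainer, consumerChannel, messageConverter, recordFilterStrategy) and assumes the property names match the adapter's setters (outputChannel, messageConverter, payloadType, recordFilterStrategy), so check them against the Spring Integration version you are on. Note that the adapter installs its own listener on the container, which is where the "Container must not already have a listener" error comes from, so the messageListener property has to be removed from containerProperties and the separate filteringMessageListener bean is no longer needed.

```xml
<!-- Sketch only: replaces the <int-kafka:message-driven-channel-adapter> element.
     Bean ids are taken from the question; property names are assumed to mirror
     the setters on KafkaMessageDrivenChannelAdapter. -->
<bean id="kafkaConsumer"
      class="org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter">
    <!-- The container must NOT have a messageListener configured;
         the adapter registers its own listener on it. -->
    <constructor-arg ref="listenerContainer"/>
    <property name="outputChannel" ref="consumerChannel"/>
    <property name="messageConverter" ref="messageConverter"/>
    <property name="payloadType" value="java.lang.String"/>
    <!-- The property that the XML namespace does not expose. -->
    <property name="recordFilterStrategy" ref="recordFilterStrategy"/>
</bean>
```

With this in place the adapter wraps its internal listener in a FilteringMessageListenerAdapter itself, so records rejected by the strategy never reach consumerChannel.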