Home > other >  RecordFilterStrategy with int-kafka:message-driven-channel-adapter
RecordFilterStrategy with int-kafka:message-driven-channel-adapter

Time:12-09

Is there a way to inject a listener with custom RecordFilterStategy?

<int-kafka:message-driven-channel-adapter
        id="kafkaConsumer"
        listener-container="listenerContainer"
        channel="consumerChannel"
        message-converter="messageConverter"
        payload-type="java.lang.String"
/>

I've tried to do the following:

<bean id="containerProperties" >
    <constructor-arg name="topics">
        <list>
            <value>someTopic</value>
        </list>
    </constructor-arg>
    <property name="errorHandler" ref="listenerErrorHandler"/>
    <property name="messageListener" ref="filteringMessageListener"/>
</bean>

<bean id="recordFilterStrategy" />

<bean id="filteringMessageListener" >
    <constructor-arg>
        <bean factory-bean="containerProperties" factory-method="getMessageListener"/>
    </constructor-arg>
    <constructor-arg ref="recordFilterStrategy"/>
</bean>

But I got en error: Constructor threw exception; nested exception is java.lang.IllegalArgumentException: Container must not already have a listener

CodePudding user response:

It is a bug in the XML configuration.

The KafkaMessageDrivenChannelAdapter has a respective property:

/**
 * Specify a {@link RecordFilterStrategy} to wrap
 * {@link KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener} into
 * {@link FilteringMessageListenerAdapter}.
 * @param recordFilterStrategy the {@link RecordFilterStrategy} to use.
 */
public void setRecordFilterStrategy(RecordFilterStrategy<K, V> recordFilterStrategy) {

We just missed to expose it for the XML configuration.

Feel free to raise a GH issue on the matter!

As a workaround consider to declare a regular <bean> for the KafkaMessageDrivenChannelAdapter instead of that XML config. Or just move away from XML config altogether in favor of Java DSL: https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#kafka-inbound-adapter-configuration

  • Related