How to customise id generation of @KafkaListener?

Time:06-23

I need to be able to customise the id for each method annotated with @KafkaListener, based on the value of one of its attributes as well as a value defined in the application.yaml. For example:

Given a class with a method annotated like so:

@KafkaListener(info="myInfo")
public void listen(String msg){

}

And a custom property in my application.yaml:

myapp:
  myProperty: myProp

At runtime I would like the id of the registered endpoint consumer to be myInfo_myProp, rather than the autogenerated one that is produced when I do not explicitly provide an id in the annotation's attributes.

What would be the best way to achieve this? I was thinking of extending the KafkaListenerEndpointRegistrar or the BeanPostProcessor.

Thanks

CodePudding user response:

See the Javadoc of the id() attribute of @KafkaListener:

/**
 * The unique identifier of the container for this listener.
 * <p>If none is specified an auto-generated id is used.
 * <p>Note: When provided, this value will override the group id property
 * in the consumer factory configuration, unless {@link #idIsGroup()}
 * is set to false or {@link #groupId()} is provided.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the {@code id} for the container managing for this endpoint.
 * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
 */
String id() default "";

So, you can use SpEL and property placeholders to define a dynamic id for the target listener container. Something like:

@KafkaListener(id = "#{myBean.generateId('${property.from.env}')}")
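The bean behind that SpEL expression is not shown in the answer. A minimal sketch of what it might look like follows, assuming a hypothetical class ListenerIdGenerator registered as a bean named myBean, and a hypothetical two-argument generateId that joins the info value and the property with an underscore:

```java
// Hypothetical helper (not part of Spring Kafka). In a real application it
// would be declared public in its own file and registered as a bean named
// "myBean" so that the SpEL expression in the id attribute can call it.
final class ListenerIdGenerator {

    // Joins the listener's info value and the configured property.
    // The method is public so that SpEL can resolve it.
    public String generateId(String info, String property) {
        return info + "_" + property;
    }

    public static void main(String[] args) {
        ListenerIdGenerator generator = new ListenerIdGenerator();
        // prints myInfo_myProp
        System.out.println(generator.generateId("myInfo", "myProp"));
    }
}
```

With such a bean in place, the annotation could read @KafkaListener(id = "#{myBean.generateId('myInfo', '${myapp.myProperty}')}", info = "myInfo"). Note that the info value has to be repeated inside the expression, because the id attribute cannot refer to other attributes of the same annotation.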

CodePudding user response:

Override the endpoint registry bean and manipulate the endpoint properties there, before calling the superclass method.

Here's an example:

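import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.AbstractKafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.TopicBuilder;

@SpringBootApplication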
public class So72719215Application {

    public static void main(String[] args) {
        SpringApplication.run(So72719215Application.class, args);
    }

    @Bean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    KafkaListenerEndpointRegistry registry(@Value("${myApp.myProperty}") String myProperty) {
        return new KafkaListenerEndpointRegistry() {

            @Override
            public void registerListenerContainer(KafkaListenerEndpoint endpoint,
                    KafkaListenerContainerFactory<?> factory) {

                AbstractKafkaListenerEndpoint<?, ?> akle = (AbstractKafkaListenerEndpoint<?, ?>) endpoint;
                // getListenerInfo() returns the annotation's info attribute as bytes
                akle.setId(new String(akle.getListenerInfo()) + "_" + myProperty);
                akle.setGroupId("group_" + myProperty);
                super.registerListenerContainer(endpoint, factory);
            }

        };
    }

    @KafkaListener(topics = "so72719215", info = "foo")
    void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so72719215").partitions(1).replicas(1).build();
    }

    @Bean
    ApplicationRunner runner(KafkaListenerEndpointRegistry reg) {
        return args -> {
            System.out.println(reg.getListenerContainerIds());
        };
    }

}
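One thing to watch in the example above: the override runs for every listener, and a listener declared without an info attribute would most likely hand a null byte array to new String(...). A small sketch of a null-safe id builder follows; the class ListenerIds and its buildId method are hypothetical names, and decoding the bytes as UTF-8 is an assumption about how the info value is stored:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Spring Kafka) for deriving a listener id.
final class ListenerIds {

    private ListenerIds() {
    }

    // Returns "<info>_<property>" when info bytes are present,
    // otherwise returns the fallback id unchanged.
    static String buildId(byte[] info, String property, String fallbackId) {
        if (info == null || info.length == 0) {
            return fallbackId;
        }
        // Assumption: the info bytes are UTF-8 encoded text.
        return new String(info, StandardCharsets.UTF_8) + "_" + property;
    }

    public static void main(String[] args) {
        byte[] info = "foo".getBytes(StandardCharsets.UTF_8);
        // prints foo_myProp
        System.out.println(buildId(info, "myProp", "generated#0"));
        // prints generated#0
        System.out.println(buildId(null, "myProp", "generated#0"));
    }
}
```

Inside registerListenerContainer this could be used as akle.setId(ListenerIds.buildId(akle.getListenerInfo(), myProperty, akle.getId())), so that listeners without info keep whatever id they already had.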