I need to be able to customise the id for each method annotated with @KafkaListener
based on the value of one of it's attributes as well as a value defined in the application.yaml
, for example :
Having a class with a method annotated like so :
@KafkaListener(info="myInfo")
public void listen(String msg){
}
And a custom property in my application.yaml
myapp:
myProperty: myProp
At runtime I would like to have the id for the registered endpoint consumer to be myInfo_myProp
rather than the autogenerated one that is produced when I do not explicitly provide one in the attributes of the annotation.
What would the best way to achieve this? I was thinking of extending the KafkaListenerEndpointRegistrar
or the BeanPostProcessor
?
Thanks
CodePudding user response:
See JavaDocs of that id()
attribute of the @KafkaListener
:
/**
* The unique identifier of the container for this listener.
* <p>If none is specified an auto-generated id is used.
* <p>Note: When provided, this value will override the group id property
* in the consumer factory configuration, unless {@link #idIsGroup()}
* is set to false or {@link #groupId()} is provided.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return the {@code id} for the container managing for this endpoint.
* @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
*/
String id() default "";
So, you can use SpEL and properties placeholders to define a dynamic id for the target listener container. Something like:
@KafkaKListener(id = "#{myBean.generateId('${property.from.env}')}")
CodePudding user response:
Override the endpoint registry bean and manipulate the endpoint properties there, before calling the super class.
Here's an example:
@SpringBootApplication
public class So72719215Application {
public static void main(String[] args) {
SpringApplication.run(So72719215Application.class, args);
}
@Bean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
KafkaListenerEndpointRegistry registry(@Value("${myApp.myProperty}") String myProperty) {
return new KafkaListenerEndpointRegistry() {
@Override
public void registerListenerContainer(KafkaListenerEndpoint endpoint,
KafkaListenerContainerFactory<?> factory) {
AbstractKafkaListenerEndpoint<?, ?> akle = (AbstractKafkaListenerEndpoint<?, ?>) endpoint;
akle.setId(new String(akle.getListenerInfo()) "_" myProperty);
akle.setGroupId("group_" myProperty);
super.registerListenerContainer(endpoint, factory);
}
};
}
@KafkaListener(topics = "so72719215", info = "foo")
void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so72719215").partitions(1).replicas(1).build();
}
@Bean
ApplicationRunner runner(KafkaListenerEndpointRegistry reg) {
return args -> {
System.out.println(reg.getListenerContainerIds());
};
}
}