I'm using @KafkaListener and I need a dynamic topic name, so I use the SpEL '__listener' pseudo bean name to reference a property of the current bean:
@PostConstruct
public void init() {
    myProps = generateTopicDynamically();
}

@KafkaListener(topics = "#{__listener.myProps}")
public void listenerKafka(@Payload MyObject myObject) {
    // Do something with my event
}
This works perfectly well.
The problem appears when I add another annotation that triggers an aspect (AOP):
@MyCustomAnnotationToRecordPerformance
@KafkaListener(topics = "#{__listener.myProps}")
public void listenerKafka(@Payload MyObject myObject)
Here is the aspect class:
@Aspect
@Configuration
@Slf4j
public class MyCustomAnnotationToRecordPerformanceAspect {

    @Pointcut("@annotation(MyCustomAnnotationToRecordPerformance)")
    public void annotationMyCustomAnnotationToRecordPerformance() {
    }

    @Around("annotationMyCustomAnnotationToRecordPerformance()")
    public Object doSomething(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        return proceedingJoinPoint.proceed();
    }
}
With the aspect in place I get the error below. My guess is that Spring tries to resolve __listener before @PostConstruct has been called.
Caused by: java.lang.IllegalArgumentException: @KafKaListener can't resolve 'null' as a String
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.resolveAsString(KafkaListenerAnnotationBeanPostProcessor.java:648)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.resolveTopics(KafkaListenerAnnotationBeanPostProcessor.java:520)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processListener(KafkaListenerAnnotationBeanPostProcessor.java:419)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processKafkaListener(KafkaListenerAnnotationBeanPostProcessor.java:370)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(KafkaListenerAnnotationBeanPostProcessor.java:298)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:431)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1800)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:595)
... 41 common frames omitted
I tried debugging it. There are many CGLIB references, so the bean has already been proxied, but all of its properties are null. I therefore assume that @Autowired injection and the @PostConstruct method have not run yet.
So far I have tried to delay the processor that handles @KafkaListener, but I could not find where to change that without having to redefine the whole Kafka configuration.
@EnableKafka imports KafkaListenerConfigurationSelector, which is a DeferredImportSelector.
Here is the comment on that class:
A {@link DeferredImportSelector} implementation with the lowest order to import a {@link KafkaBootstrapConfiguration} as late as possible.
Based on that comment, I assume it is already deferred as late as possible.
I also tested it with @Transactional and I get the same issue.
@Transactional
@KafkaListener(topics = "#{__listener.myProps}")
public void listenerKafka(@Payload MyObject myObject)
Do you have any idea what is going on?
The only alternative I see for now is to split my class in two and create two beans, with the @KafkaListener method calling the other bean. But it seems very strange to have to do that.
Thanks in advance for your help.
CodePudding user response:
I just tested it with @Transactional and it works as expected for me. I have confirmed that we already have a CGLIB proxy by the time we get to the @KafkaListener annotation BPP (bean post-processor):
@SpringBootApplication
@EnableTransactionManagement
public class So69817946Application {

    public static void main(String[] args) {
        SpringApplication.run(So69817946Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so69817946").partitions(1).replicas(1).build();
    }
}

@Component
class listener {

    public String getTopic() {
        return "so69817946";
    }

    @Transactional
    @KafkaListener(id = "so69817946", topics = "#{__listener.topic}")
    public void listen(String in) {
        System.out.println(in);
    }
}

@Component
class TM extends AbstractPlatformTransactionManager {

    @Override
    protected Object doGetTransaction() throws TransactionException {
        return new Object();
    }

    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
    }

    @Override
    protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
    }

    @Override
    protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
    }
}
so69817946: partitions assigned: [so69817946-0]
And I can see the transaction interceptor in the call stack.
So, yes, an MCVE (minimal, complete, verifiable example) would be helpful.
CodePudding user response:
Thanks to Gary's help, I found the solution. Once an aspect applies, the class is proxied, and the fields are null on the CGLIB proxy object. We need to call a getter to obtain the value from the original object rather than from the proxy.
SpEL can read a public getter, and that getter is executed on the original object, not on the CGLIB one.
So the solution was simply to create a public getter for my private field:
public String getMyProps() {
    return this.myProps;
}
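To illustrate why the getter helps, here is a minimal plain-Java sketch with no Spring involved. It assumes the explanation above: a class-based proxy is a subclass instance whose own copy of the field is never initialised, while its methods forward to the real target. ProxyFieldDemo, Listener, ListenerProxy and the topic value are hypothetical names, and ListenerProxy is a hand-written stand-in for what CGLIB generates, not the real thing.

```java
public class ProxyFieldDemo {

    // Stand-in for the listener bean. The field is set after construction,
    // much as a @PostConstruct method would do.
    static class Listener {
        String myProps; // package-private only so the demo can read it directly

        void init() {
            this.myProps = "dynamic-topic"; // hypothetical topic name
        }

        public String getMyProps() {
            return this.myProps;
        }
    }

    // Hand-written stand-in for a CGLIB subclass proxy (an assumption for
    // illustration): it inherits the field but never initialises it, and
    // forwards method calls to the real target object.
    static class ListenerProxy extends Listener {
        private final Listener target;

        ListenerProxy(Listener target) {
            this.target = target;
        }

        @Override
        public String getMyProps() {
            return target.getMyProps();
        }
    }

    // Builds an initialised target and wraps it, roughly what the container
    // would hand to later post-processors.
    static Listener createProxy() {
        Listener target = new Listener();
        target.init();
        return new ListenerProxy(target);
    }

    public static void main(String[] args) {
        Listener proxy = createProxy();
        System.out.println("field:  " + proxy.myProps);      // prints "field:  null"
        System.out.println("getter: " + proxy.getMyProps()); // prints "getter: dynamic-topic"
    }
}
```

Reading the field on the proxy instance yields null, while the getter returns the value held by the target, which matches the behaviour described above.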
Thanks all.