KafkaListener and __listener @KafKaListener can't resolve 'null' with Aspect Annotati

Time:11-05

I'm using @KafkaListener and need a dynamic topic name, so I use the SpEL variable '__listener' to reference a property of the listener bean:

@PostConstruct
public void init() {
    myProps= generateTopicDynamically();
}

@KafkaListener(topics = "#{__listener.myProps}")
public void listenerKafka(@Payload MyObject myObject) {
       //Do something with my event
}

It works perfectly well.

The problem appears when I add another annotation that triggers an aspect (AOP):

@MyCustomAnnotationToRecordPerformance
@KafkaListener(topics = "#{__listener.myProps}")
public void listenerKafka(@Payload MyObject myObject)

Here is the aspect class:

@Aspect
@Configuration
@Slf4j
public class MyCustomAnnotationToRecordPerformanceAspect {

    @Pointcut("@annotation(MyCustomAnnotationToRecordPerformance)")
    public void annotationMyCustomAnnotationToRecordPerformance() {
    }

    @Around("annotationMyCustomAnnotationToRecordPerformance()")
    public Object doSomething(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        return proceedingJoinPoint.proceed();
    }
}

I get the following error, which I believe occurs because Spring tries to resolve __listener before @PostConstruct has been called:

Caused by: java.lang.IllegalArgumentException: @KafKaListener can't resolve 'null' as a String
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.resolveAsString(KafkaListenerAnnotationBeanPostProcessor.java:648)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.resolveTopics(KafkaListenerAnnotationBeanPostProcessor.java:520)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processListener(KafkaListenerAnnotationBeanPostProcessor.java:419)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processKafkaListener(KafkaListenerAnnotationBeanPostProcessor.java:370)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(KafkaListenerAnnotationBeanPostProcessor.java:298)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:431)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1800)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:595)
    ... 41 common frames omitted

I tried to debug it. We can see a lot of CGLIB references, so the bean has already been proxied, but all its properties are null. So I suppose the Autowired injection and the PostConstruct method have not been called yet.

For now, I have tried to delay the processor that manages @KafkaListener, but I could not find where to change that without fully redefining the Kafka configuration.

@EnableKafka imports KafkaListenerConfigurationSelector, which is a DeferredImportSelector.

Here is the comment on this class:

A {@link DeferredImportSelector} implementation with the lowest order to import a {@link KafkaBootstrapConfiguration} as late as possible.

So, based on that comment, I suppose it is already delayed as late as possible.

I tested it with @Transactional, and I have the same issue.

@Transactional 
@KafkaListener(topics = "#{__listener.myProps}")
public void listenerKafka(@Payload MyObject myObject)

Do you have any idea about it?

The only alternative I see for now is to split my class in two and create two beans, with the @KafkaListener method calling the other bean. But I find it very strange to have to do that.

Thanks in advance for your help.

CodePudding user response:

I just tested it with @Transactional and it works as expected for me - I have confirmed that we already have a CGLIB proxy by the time we get to the @KafkaListener annotation BPP...

@SpringBootApplication
@EnableTransactionManagement
public class So69817946Application {

    public static void main(String[] args) {
        SpringApplication.run(So69817946Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so69817946").partitions(1).replicas(1).build();
    }

}

@Component
class Listener {

    public String getTopic() {
        return "so69817946";
    }

    @Transactional
    @KafkaListener(id = "so69817946", topics = "#{__listener.topic}")
    public void listen(String in) {
        System.out.println(in);
    }

}

@Component
class TM extends AbstractPlatformTransactionManager {

    @Override
    protected Object doGetTransaction() throws TransactionException {
        return new Object();
    }

    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
    }

    @Override
    protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
    }

    @Override
    protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
    }

}

so69817946: partitions assigned: [so69817946-0]

And I can see the transaction interceptor in the call stack.

So, yes, an MCVE would be helpful.

CodePudding user response:

Thanks to Gary's help, I found the solution. Once an aspect applies, the class is proxied, and the fields are null on the CGLIB proxy object. We need to call a getter to obtain the value from the original object rather than from the proxy.

SpEL can read a public getter, and that getter is executed on the original object, not on the CGLIB proxy.

So the solution was simply to create a public getter for my private field:

public String getMyProps(){
    return this.myProps;
}
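
To illustrate why the getter helps, here is a minimal plain-Java sketch of the mechanism described above. It does not use Spring or CGLIB: ListenerProxy is a hand-written, hypothetical stand-in for the generated proxy subclass, and the class names, the init() method (playing the role of @PostConstruct) and the "dynamic-topic" value are all assumptions made for the example. The proxy object carries its own copy of the field, which is never initialized, while method calls are forwarded to the real target.

```java
public class ProxyFieldDemo {

    /** Stand-in for the listener bean; myProps is set after construction. */
    public static class Listener {
        private String myProps;

        /** Plays the role of the @PostConstruct method (hypothetical value). */
        public void init() {
            myProps = "dynamic-topic";
        }

        public String getMyProps() {
            return myProps;
        }
    }

    /** Hand-written stand-in for a proxy: a subclass that forwards calls to the target. */
    public static class ListenerProxy extends Listener {
        private final Listener target;

        public ListenerProxy(Listener target) {
            this.target = target;
        }

        @Override
        public String getMyProps() {
            // Method calls reach the real, initialized object.
            return target.getMyProps();
        }
    }

    /** Reads the field directly from the given object, bypassing the getter. */
    public static String readFieldDirectly(Listener listener) {
        return listener.myProps;
    }

    public static void main(String[] args) {
        Listener target = new Listener();
        target.init();
        Listener proxy = new ListenerProxy(target);

        // init() was never called on the proxy instance, so its own field is unset.
        System.out.println("field via proxy:  " + readFieldDirectly(proxy)); // null
        // The overridden getter delegates to the target.
        System.out.println("getter via proxy: " + proxy.getMyProps());       // dynamic-topic
    }
}
```

In this sketch, reading the field on the proxy yields null, while the getter returns the value held by the target, which matches the behavior reported in this thread.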

Thanks all.
