SpEL KafkaListener. How can i inject custom deserializer through properties?

Time:12-14

I am using Spring. I have an ObjectMapper configured for the entire project, and I want to use it inside a Kafka deserializer. That custom deserializer then needs to be used by a @KafkaListener.

I'm configuring the @KafkaListener via auto-configuration, not via a @Configuration class.

@Component
@RequiredArgsConstructor
public class CustomMessageDeserializer implements Deserializer<MyMessage> {
    private final ObjectMapper objectMapper;

    @SneakyThrows
    @Override
    public MyMessage deserialize(String topic, byte[] data) {
        return objectMapper.readValue(data, MyMessage.class);
    }
}

If I do it like this:

@KafkaListener(
    topics = {"${topics.invite-user-topic}"},
    properties = {"value.deserializer=com.service.deserializer.CustomMessageDeserializer"}
)
public void receiveInviteUserMessages(MyMessage myMessage) {}

I get KafkaException: Could not find a public no-argument constructor

But with a public no-argument constructor in the CustomMessageDeserializer class I get an NPE, because the ObjectMapper is null. Kafka creates and uses a new instance of the class, not the Spring component.

@KafkaListener supports SpEL expressions.

I think this problem could be solved using SpEL. Do you have any idea how to inject the Spring bean CustomMessageDeserializer with SpEL?

CodePudding user response:

Here is how it works for me:

@KafkaListener(topics = "${solr.kafka.topic}", containerFactory = "batchFactory")
public void listen(List<SolrInputDocument> docs, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers, Acknowledgment ack) throws IOException {...}

And then I have two beans defined in my configuration class:

@Profile("!test")
@Bean
@Autowired
public ConsumerFactory<String, SolrInputDocument> consumerFactory(KafkaProperties properties) {
    Map<String, Object> props = properties.buildConsumerProperties();
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    DefaultKafkaConsumerFactory<String, SolrInputDocument> result = new DefaultKafkaConsumerFactory<>(props);
    String validatedKeyDeserializerName = KafkaMessageType.valueOf(keyDeserializerName).toString();
    ZiDeserializer<SolrInputDocument> deserializer = ZiDeserializerFactory.getInstance(validatedKeyDeserializerName);
    result.setValueDeserializer(deserializer);

    return result;
}

@Profile("!test")
@Bean
@Autowired
public ConcurrentKafkaListenerContainerFactory<String, SolrInputDocument> batchFactory(ConsumerFactory<String, SolrInputDocument> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, SolrInputDocument> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);
    factory.setConcurrency(2);

    ExponentialBackOffWithMaxRetries backoff = new ExponentialBackOffWithMaxRetries(10);
    backoff.setMultiplier(3); // Default is 1.5 but this seems more reasonable
    factory.setCommonErrorHandler(new DefaultErrorHandler(null, backoff));

    // Needed for manual commits
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}

Note that ZiDeserializer<SolrInputDocument> is my own interface, and ZiDeserializerFactory.getInstance(validatedKeyDeserializerName) returns my custom implementation of it. ZiDeserializer extends org.apache.kafka.common.serialization.Deserializer. This works for me.

CodePudding user response:

There is no easy way to do it with SpEL.

Analysis

To get started, see the JavaDoc for @KafkaListener#properties:

/**
* 
* SpEL expressions must resolve to a String ...
*/

The value of value.deserializer is used to instantiate the specified deserializer class. Let's follow the call chain:

  1. Since you specify this value in the @KafkaListener annotation, you are probably not declaring a ConsumerFactory bean yourself. Spring therefore creates this bean itself, see KafkaAutoConfiguration#kafkaConsumerFactory.
  2. That method returns a new DefaultKafkaConsumerFactory(...) as ConsumerFactory<?,?>, built with the constructor that uses the default supplier expressions keyDeserializer/valueDeserializer = () -> null.
  3. This factory is used to create the Kafka consumer (the entry point is the constructor KafkaMessageListenerContainer#ListenerConsumer, then KafkaMessageListenerContainer.this.consumerFactory.createConsumer...).
  4. In the KafkaConsumer constructor, the valueDeserializer object is created from the configuration, because the one passed in is null (for the default factory from step 2 above):
     if (valueDeserializer == null) {
         this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
     }
  5. The implementation of config.getConfiguredInstance instantiates your deserializer class by reflection, through a parameterless constructor, using the class name String "com.service.deserializer.CustomMessageDeserializer". No Spring bean is involved at this point.
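The last step can be reproduced without Spring or Kafka. The following is only a minimal sketch under stated assumptions: Mapper, NeedsDependency, NoArgVariant and instantiateByName are hypothetical stand-ins (not Kafka or Spring APIs) for the ObjectMapper, the two variants of CustomMessageDeserializer, and reflective creation from a class name. It shows both symptoms from the question: no parameterless constructor means the instance cannot be created, and adding one leaves the dependency null.

```java
public class ReflectiveInstantiationSketch {

    /** Hypothetical stand-in for a shared dependency such as the configured ObjectMapper. */
    public static class Mapper {}

    /** Mirrors the question's class: the only constructor requires the dependency. */
    public static class NeedsDependency {
        final Mapper mapper;
        public NeedsDependency(Mapper mapper) { this.mapper = mapper; }
    }

    /** Variant with a no-arg constructor: it can be created, but nobody sets the mapper. */
    public static class NoArgVariant {
        Mapper mapper; // stays null when the object is created reflectively
        public NoArgVariant() {}
    }

    /** Simplified stand-in for creating an object from a configured class name. */
    public static Object instantiateByName(String className) throws ReflectiveOperationException {
        Class<?> type = Class.forName(className);
        // Looks up the parameterless constructor; throws NoSuchMethodException if absent.
        return type.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        try {
            instantiateByName(NeedsDependency.class.getName());
        } catch (NoSuchMethodException e) {
            System.out.println("no parameterless constructor found");
        }

        NoArgVariant created = (NoArgVariant) instantiateByName(NoArgVariant.class.getName());
        System.out.println("mapper is null: " + (created.mapper == null)); // mapper is null: true
    }
}
```

Because the object is built from a class name alone, there is no place where a container-managed dependency could be handed over, which is why a plain String property cannot refer to a Spring bean.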

Solutions

  1. To use a value deserializer that holds your customized ObjectMapper, you must create the ConsumerFactory bean yourself and pass it the deserializer instance with the setValueDeserializer(...) method. This is also mentioned in the second "Important" note of the JSON Mapping Types section of the documentation.
  2. If you don't want to create a ConsumerFactory bean, and your deserializer has no complicated logic (you only have return objectMapper.readValue(data, MyMessage.class);), then register a DefaultKafkaConsumerFactoryCustomizer:
@Bean
// inject your custom objectMapper
public DefaultKafkaConsumerFactoryCustomizer customizeJsonDeserializer(ObjectMapper objectMapper) {  
    return consumerFactory ->
            consumerFactory.setValueDeserializerSupplier(() ->
                    new org.springframework.kafka.support.serializer.JsonDeserializer<>(objectMapper));
}

In this case you don't need your own CustomMessageDeserializer class (remove it); Spring will parse the message into your MyMessage automatically. The @KafkaListener annotation should also no longer contain properties = {"value.deserializer=com.my.kafka_test.component.CustomMessageDeserializer"}. The DefaultKafkaConsumerFactoryCustomizer bean is picked up automatically to configure the default ConsumerFactory<?, ?> (see the implementation of the KafkaAutoConfiguration#kafkaConsumerFactory method).
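Both solutions rely on the same idea: hand the factory a ready-made deserializer instance (or a supplier of one) so that the class-name lookup is never reached. The following Spring-free sketch illustrates that precedence under stated assumptions; Decoder, PlainDecoder, PrefixDecoder and Factory are hypothetical stand-ins written for this example, not the real Kafka or Spring classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Supplier;

public class SupplierPrecedenceSketch {

    /** Hypothetical stand-in for Kafka's Deserializer interface. */
    public interface Decoder {
        String decode(byte[] data);
    }

    /** Created reflectively from its class name, so it cannot receive collaborators. */
    public static class PlainDecoder implements Decoder {
        public PlainDecoder() {}
        @Override
        public String decode(byte[] data) {
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    /** Needs a collaborator (here just a prefix), like a deserializer needing an ObjectMapper. */
    public static class PrefixDecoder implements Decoder {
        private final String prefix;
        public PrefixDecoder(String prefix) { this.prefix = prefix; }
        @Override
        public String decode(byte[] data) {
            return prefix + new String(data, StandardCharsets.UTF_8);
        }
    }

    /** Simplified stand-in for a consumer factory with a default "() -> null" supplier. */
    public static class Factory {
        private final Map<String, String> config;
        private Supplier<Decoder> decoderSupplier = () -> null;

        public Factory(Map<String, String> config) { this.config = config; }

        public void setDecoderSupplier(Supplier<Decoder> supplier) { this.decoderSupplier = supplier; }

        public Decoder createDecoder() throws ReflectiveOperationException {
            Decoder supplied = decoderSupplier.get();
            if (supplied != null) {
                return supplied; // a supplied instance wins over the configured class name
            }
            // Fallback: reflective creation from the configured class name.
            String className = config.get("value.deserializer");
            return (Decoder) Class.forName(className).getDeclaredConstructor().newInstance();
        }
    }

    public static void main(String[] args) throws Exception {
        Factory factory = new Factory(Map.of("value.deserializer", PlainDecoder.class.getName()));
        System.out.println(factory.createDecoder().getClass().getSimpleName()); // PlainDecoder

        factory.setDecoderSupplier(() -> new PrefixDecoder("json:"));
        System.out.println(factory.createDecoder().decode("x".getBytes(StandardCharsets.UTF_8))); // json:x
    }
}
```

In the real setup the supplier would return a deserializer built around the injected ObjectMapper, which is the role setValueDeserializer(...) and setValueDeserializerSupplier(...) play in the two solutions.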
