I am using Spring. I have an ObjectMapper configured for the whole project, and I use it to set up a Kafka deserializer. I need this custom Kafka deserializer to be used by a @KafkaListener.
I'm configuring the KafkaListener via auto-configuration, not via a @Configuration class.
@Component
@RequiredArgsConstructor
public class CustomMessageDeserializer implements Deserializer<MyMessage> {

    private final ObjectMapper objectMapper;

    @SneakyThrows
    @Override
    public MyMessage deserialize(String topic, byte[] data) {
        return objectMapper.readValue(data, MyMessage.class);
    }
}
If I do it like this:
@KafkaListener(
        topics = {"${topics.invite-user-topic}"},
        properties = {"value.deserializer=com.service.deserializer.CustomMessageDeserializer"}
)
public void receiveInviteUserMessages(MyMessage myMessage) {}
I get a KafkaException: Could not find a public no-argument constructor.
But with a public no-argument constructor in the CustomMessageDeserializer class I get an NPE, because objectMapper is null: Kafka creates and uses a new instance of the class, not the Spring component.
@KafkaListener supports SpEL expressions, and I think this problem can be solved using SpEL. Do you have any idea how to inject the Spring bean CustomMessageDeserializer with SpEL?
CodePudding user response:
Here is how it works for me:
@KafkaListener(topics = "${solr.kafka.topic}", containerFactory = "batchFactory")
public void listen(List<SolrInputDocument> docs, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers, Acknowledgment ack) throws IOException {...}
And then I have two beans defined in my @Configuration class:
@Profile("!test")
@Bean
@Autowired
public ConsumerFactory<String, SolrInputDocument> consumerFactory(KafkaProperties properties) {
Map<String, Object> props = properties.buildConsumerProperties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
DefaultKafkaConsumerFactory<String, SolrInputDocument> result = new DefaultKafkaConsumerFactory<>(props);
String validatedKeyDeserializerName = KafkaMessageType.valueOf(keyDeserializerName).toString();
ZiDeserializer<SolrInputDocument> deserializer = ZiDeserializerFactory.getInstance(validatedKeyDeserializerName);
result.setValueDeserializer(deserializer);
return result;
}
@Profile("!test")
@Bean
@Autowired
public ConcurrentKafkaListenerContainerFactory<String, SolrInputDocument> batchFactory(ConsumerFactory<String, SolrInputDocument> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, SolrInputDocument> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
factory.setConcurrency(2);
ExponentialBackOffWithMaxRetries backoff = new ExponentialBackOffWithMaxRetries(10);
backoff.setMultiplier(3); // Default is 1.5 but this seems more reasonable
factory.setCommonErrorHandler(new DefaultErrorHandler(null, backoff));
// Needed for manual commits
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
Note that ZiDeserializer<SolrInputDocument> is my own interface, ZiDeserializerFactory.getInstance(validatedKeyDeserializerName) returns my custom implementation of ZiDeserializer, and ZiDeserializer extends org.apache.kafka.common.serialization.Deserializer. This works for me.
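The key point is that the value deserializer is constructed by application code and handed to the factory via setValueDeserializer(...), so it can take any dependencies through its constructor. Below is a minimal sketch of such a deserializer; the real ZiDeserializer and its factory are not shown in this answer, so the class and names here are purely illustrative.
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

// Illustrative only: a deserializer built by application code (not by Kafka's
// reflection), so it can receive an ObjectMapper through its constructor.
public class JsonValueDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper objectMapper;
    private final Class<T> targetType;

    public JsonValueDeserializer(ObjectMapper objectMapper, Class<T> targetType) {
        this.objectMapper = objectMapper;
        this.targetType = targetType;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return data == null ? null : objectMapper.readValue(data, targetType);
        } catch (IOException e) {
            throw new SerializationException("Failed to deserialize value from topic " + topic, e);
        }
    }
}
Such an instance could then be passed to the factory, e.g. result.setValueDeserializer(new JsonValueDeserializer<>(objectMapper, MyMessage.class)).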
CodePudding user response:
There is no easy way to do it with SpEL.
Analysis
To get started, see the JavaDoc for @KafkaListener#properties:
/**
 * ...
 * SpEL expressions must resolve to a String ...
 */
The value of value.deserializer is used to instantiate the specified deserializer class. Let's follow the call chain:
1. You specify this value in the @KafkaListener annotation, so you are probably not creating a ConsumerFactory bean yourself. Spring Boot therefore creates this bean for you (see KafkaAutoConfiguration#kafkaConsumerFactory).
2. That method returns new DefaultKafkaConsumerFactory(...) as the ConsumerFactory<?,?>, using the constructor whose default key/value deserializer suppliers are () -> null.
3. This factory is used to create the Kafka consumer (the entry point is the KafkaMessageListenerContainer#ListenerConsumer constructor, then KafkaMessageListenerContainer.this.consumerFactory.createConsumer...).
4. In the KafkaConsumer constructor, the valueDeserializer object is created, because the supplier from point 2 above returned null:
if (valueDeserializer == null) {
    this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
5. The implementation of config.getConfiguredInstance instantiates your deserializer class via its parameterless constructor, using reflection and the String "com.service.deserializer.CustomMessageDeserializer" class name, as sketched below.
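To make the last step concrete, here is a simplified sketch of what config.getConfiguredInstance boils down to; it is not the actual Kafka source, the method name is illustrative, and exception handling is omitted:
// Simplified sketch, not the real Kafka implementation
static Deserializer<?> instantiate(String className) throws Exception {
    Class<?> clazz = Class.forName(className);
    // Requires a public no-argument constructor; the instance is created outside
    // the Spring context, so a constructor-injected ObjectMapper is never supplied.
    return (Deserializer<?>) clazz.getDeclaredConstructor().newInstance();
}
This is why the listener fails with "Could not find a public no-argument constructor" when there is none, and with an NPE once one is added.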
Solutions
1. To use value.deserializer with your customized ObjectMapper, you must create the ConsumerFactory bean yourself and configure it with the setValueDeserializer(...) method (a sketch is given at the end of this answer). This is also mentioned in the second "Important" callout of the Spring for Apache Kafka JSON mapping-types documentation.
2. If you don't want to create a ConsumerFactory bean, and you don't have complicated logic in your deserializer (you only have return objectMapper.readValue(data, MyMessage.class);), then register a DefaultKafkaConsumerFactoryCustomizer:
@Bean
// inject your custom objectMapper
public DefaultKafkaConsumerFactoryCustomizer customizeJsonDeserializer(ObjectMapper objectMapper) {
    return consumerFactory ->
            consumerFactory.setValueDeserializerSupplier(() ->
                    new org.springframework.kafka.support.serializer.JsonDeserializer<>(objectMapper));
}
In this case, you don't need your own CustomMessageDeserializer class (you can remove it), and Spring will automatically parse the message into your MyMessage. The @KafkaListener annotation should also no longer contain the property properties = {"value.deserializer=com.my.kafka_test.component.CustomMessageDeserializer"}. This DefaultKafkaConsumerFactoryCustomizer bean will automatically be used to configure the default ConsumerFactory<?, ?> (see the implementation of the KafkaAutoConfiguration#kafkaConsumerFactory method).
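For the first option, here is a minimal sketch of the hand-built ConsumerFactory, assuming a Spring Boot setup where KafkaProperties and your ObjectMapper are available as beans (the bean method name is illustrative):
@Bean
public ConsumerFactory<String, MyMessage> consumerFactory(KafkaProperties properties, ObjectMapper objectMapper) {
    Map<String, Object> props = properties.buildConsumerProperties();
    DefaultKafkaConsumerFactory<String, MyMessage> factory = new DefaultKafkaConsumerFactory<>(props);
    // The deserializer is constructed here by your own code, so the Spring-managed
    // ObjectMapper goes in through the constructor; Kafka never instantiates it reflectively.
    factory.setValueDeserializer(new CustomMessageDeserializer(objectMapper));
    return factory;
}
You could equally inject the existing CustomMessageDeserializer bean into this method instead of constructing it with new. With this bean in place, the properties = {"value.deserializer=..."} entry on the @KafkaListener is no longer needed.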