A Kafka listener container factory should be configured to consume messages whose values are a list of objects. I tried to configure the factory using ListDeserializer in the following way:
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;

    @Value(value = "${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, List<Item>>
            kafkaListenerContainerFactoryItem() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConcurrentKafkaListenerContainerFactory<String, List<Item>> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(
                new DefaultKafkaConsumerFactory<>(
                        props,
                        new StringDeserializer(),
                        new ListDeserializer<>(ArrayList.class, new JsonDeserializer<>(Item.class, false))));
        return factory;
    }
}
But the exception below appears:
org.apache.kafka.common.config.ConfigException: List deserializer was already initialized using a non-default constructor
at org.apache.kafka.common.serialization.ListDeserializer.configure(ListDeserializer.java:78) ~[kafka-clients-3.1.1.jar:na]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.lambda$valueDeserializerSupplier$9(DefaultKafkaConsumerFactory.java:199) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:483) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:451) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:427) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:394) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:371) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:776) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:352) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:461) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:209) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:461) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:347) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:292) ~[spring-kafka-2.8.8.jar:2.8.8]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.22.jar:5.3.22]
... 14 common frames omitted
How can the container factory be properly configured using ListDeserializer?
CodePudding user response:
If we don't use any relevant (deserializer) config, the quickest fix would be to use:
// ...
new DefaultKafkaConsumerFactory<>(
    props,
    new StringDeserializer(),
    new ListDeserializer<>(
        ArrayList.class, new JsonDeserializer<>(Item.class, false)
    ),
    false /* !!! */
) // ...
Where false refers to the configureDeserializers parameter of an (overloaded) constructor. (ref)
In this case it has no negative side effects, since:
The key deserializer is a StringDeserializer, whose only "configuration option" is [[key|value].]deserializer.encoding (which we don't set and which defaults to UTF-8). (ref)
The value deserializer (a ListDeserializer (ref)) already gets everything useful "configured" via constructor parameters.
But if we do:
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
or otherwise want to configure our JsonDeserializer through props, then the "workaround" breaks this setting, because the deserializers are no longer configured from props!
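The trade-off can be illustrated with a small stand-alone model. This is only a sketch of the mechanism, not the Kafka or Spring source: GuardedDeserializer, prepare and the property keys "list.type" and "trusted.packages" are hypothetical names invented for the illustration. It assumes, as the exception message in the question suggests, that configure() refuses to run on an instance that was already set up through a non-default constructor.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class ConfigureGuardSketch {

    /** Hypothetical stand-in for a deserializer set up either by constructor or by configure(). */
    public static class GuardedDeserializer {
        private Class<?> listClass;          // null until the constructor or configure() sets it
        private String trustedPackages = ""; // in this model, only configure() can set it

        public GuardedDeserializer() { }

        public GuardedDeserializer(Class<?> listClass) {
            this.listClass = listClass;
        }

        public void configure(Map<String, ?> props) {
            // Guard: refuse property-based setup after constructor-based setup.
            if (listClass != null) {
                throw new IllegalStateException("already initialized using a non-default constructor");
            }
            listClass = (Class<?>) props.get("list.type");
            Object trusted = props.get("trusted.packages");
            if (trusted != null) {
                trustedPackages = trusted.toString();
            }
        }

        public Class<?> listClass() { return listClass; }

        public String trustedPackages() { return trustedPackages; }
    }

    /** Hypothetical stand-in for the consumer factory: calls configure() only when asked to. */
    public static GuardedDeserializer prepare(
            GuardedDeserializer d, Map<String, ?> props, boolean configureDeserializers) {
        if (configureDeserializers) {
            d.configure(props);
        }
        return d;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("list.type", ArrayList.class);
        props.put("trusted.packages", "*");

        // Quick fix: constructor-initialized and configure() skipped, so no exception,
        // but the property-based setting never reaches the deserializer.
        GuardedDeserializer quick = prepare(new GuardedDeserializer(ArrayList.class), props, false);
        System.out.println("quick fix, trustedPackages = '" + quick.trustedPackages() + "'");

        // Property-based setup: empty constructor, configure() applies everything.
        GuardedDeserializer configured = prepare(new GuardedDeserializer(), props, true);
        System.out.println("configured, trustedPackages = '" + configured.trustedPackages() + "'");

        // Mixing both styles reproduces the failure mode from the question.
        try {
            prepare(new GuardedDeserializer(ArrayList.class), props, true);
        } catch (IllegalStateException e) {
            System.out.println("mixed setup fails: " + e.getMessage());
        }
    }
}
```

In this model the two working setups are mutually exclusive: either everything goes through the constructor and configure() is skipped, or the constructor stays empty and everything comes from props.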
In that case, we should configure props and initialize the factory with empty (deserializer) constructors:
// props.put(...);
// when this is *really* needed (otherwise, use the quick fix!):
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
// extra config for our "value deserializer" (analogous to the constructor arguments):
props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, ArrayList.class /* alternatively: "java.util.ArrayList" (string!) */);
props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
    /* this looks a little hacky; I am open to better proposals (we need a Serde here) */
    /* caution: this property is documented as a Serde class (or class name); check that your kafka-clients version accepts an instance */
    Serdes.serdeFrom(
        new SomeItemSerializer(), /* <- needed (not null!) due to the API, but it can be a "no-op" (it is never used with the given code) */
        new JsonDeserializer<>(Item.class, false)
    )
);
// and then:
// ...
factory.setConsumerFactory(
    new DefaultKafkaConsumerFactory<>(
        props,
        new StringDeserializer(),
        new ListDeserializer<>(/* just empty */)
        /* now with configureDeserializers = true (the default) */
    )
); // ...
To work around the (ugly) no-op serializer, we could try:
new Serdes.WrapperSerde<>(/*!!*/ null, new JsonDeserializer<>(Item.class, false))
instead of:
Serdes.serdeFrom(..)
and keep our fingers crossed that nothing calls anything besides deserializer() on the WrapperSerde (otherwise: NPE! But a consumer does not serialize ;). (ref)