How to use ListDeserializer properly?

Time:12-06

A Kafka listener container factory should be configured to consume messages whose values are a list of objects. I tried to configure the factory using ListDeserializer in the following way:

@Configuration
public class KafkaConsumerConfig {
  @Value(value = "${spring.kafka.consumer.bootstrap-servers}")
  private String bootstrapServers;

  @Value(value = "${spring.kafka.consumer.group-id}")
  private String groupId;

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, List<Item>>
      kafkaListenerContainerFactoryItem() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    ConcurrentKafkaListenerContainerFactory<String, List<Item>> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(
        new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            new ListDeserializer<>(ArrayList.class, new JsonDeserializer<>(Item.class, false))));
    return factory;
  }
}

But the exception below appears:

org.apache.kafka.common.config.ConfigException: List deserializer was already initialized using a non-default constructor 
    at org.apache.kafka.common.serialization.ListDeserializer.configure(ListDeserializer.java:78) ~[kafka-clients-3.1.1.jar:na]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.lambda$valueDeserializerSupplier$9(DefaultKafkaConsumerFactory.java:199) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:483) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:451) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:427) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:394) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:371) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:776) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:352) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:461) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:209) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:461) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:347) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:292) ~[spring-kafka-2.8.8.jar:2.8.8]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.22.jar:5.3.22]
    ... 14 common frames omitted

How can the container factory be properly configured using ListDeserializer?

Answer:

If we don't use any relevant (deserializer) config, the quickest fix would be to use:

// ...
new DefaultKafkaConsumerFactory<>(
  props,
  new StringDeserializer(),
  new ListDeserializer<>(
    ArrayList.class, new JsonDeserializer<>(Item.class, false)
  ),
  false /* !!! */
)//...

Here, false is the configureDeserializers parameter of an overloaded DefaultKafkaConsumerFactory constructor. (ref)

In this case it has no negative side effects, since you:

  • Use StringDeserializer as the key deserializer.

    Its only "configuration option" is [[key|value].]deserializer.encoding (which we don't set and which defaults to UTF-8). (ref)

  • Already "configure" everything useful for the value deserializer (a ListDeserializer) via constructor parameters. (ref)
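
To illustrate why the flag matters, here is a minimal, self-contained toy model in plain Java. It does not use Kafka or Spring: ToyListDeserializer, createConsumer and the "toy.list.type" key are made-up names that only mimic the guard visible in the stack trace, under the assumption that configure() is invoked only when the flag is true.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

/** Toy model only: none of these types are Kafka or Spring classes. */
public class ConfigureGuardSketch {

    /** Mimics a deserializer that refuses configure() once set up via its constructor. */
    static class ToyListDeserializer {
        private Class<?> listClass;   // null until set by a constructor or by configure()
        boolean configured = false;   // records whether configure() actually ran

        ToyListDeserializer() { }     // "empty" constructor: relies on configure()

        ToyListDeserializer(Class<?> listClass) {   // non-default constructor
            this.listClass = listClass;
        }

        void configure(Map<String, Object> props) {
            if (listClass != null) {
                throw new IllegalStateException(
                    "already initialized using a non-default constructor");
            }
            listClass = (Class<?>) props.get("toy.list.type");   // hypothetical key
            configured = true;
        }
    }

    /** Mimics a factory whose flag decides whether configure() is called at all. */
    static ToyListDeserializer createConsumer(
            Map<String, Object> props, ToyListDeserializer value, boolean configureDeserializers) {
        if (configureDeserializers) {
            value.configure(props);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("toy.list.type", ArrayList.class);

        // Constructor-initialized, flag = false: configure() is skipped, props are ignored.
        ToyListDeserializer a =
            createConsumer(props, new ToyListDeserializer(ArrayList.class), false);
        System.out.println("configured: " + a.configured);

        // Empty constructor, flag = true: everything comes from props.
        ToyListDeserializer b = createConsumer(props, new ToyListDeserializer(), true);
        System.out.println("configured: " + b.configured);

        // Constructor-initialized, flag = true: the guard throws, as in the question.
        try {
            createConsumer(props, new ToyListDeserializer(ArrayList.class), true);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model, only the third combination fails. The first one succeeds precisely because nothing in props ever reaches the deserializer.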

But if we do:

props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");

...or otherwise want to configure our JsonDeserializer via props, then the "workaround" breaks this setting, because the deserializers are no longer configured from props!

In that case, we should configure everything through props and initialize the factory with empty (no-argument) deserializer constructors:

// props.put(...);
// when this is *really* needed (otherwise, quickfix!):
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
// extra config for our "value deserializer" (analogous to constructor):
props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, ArrayList.class /*alternatively: "java.util.ArrayList" (string!) */);
props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
 /* this looks a little hacky, I am open to better proposals (we need a Serde here): */
 Serdes.serdeFrom(
   new SomeItemSerializer(), /* <- we'll need this (not null!) due to api, but it can be "no-op" (is never used with given code) */
   new JsonDeserializer<>(Item.class, false)
 )
);
// and then:
// ...
factory.setConsumerFactory(
  new DefaultKafkaConsumerFactory<>(
    props,
    new StringDeserializer(),
    new ListDeserializer<>(/* just empty */)
    /* now with configureDeserializers = true/default */
  )
); // ...

To work around the (ugly) no-op-serializer, we could (try):

new Serdes.WrapperSerde(/*!!*/null, new JsonDeserializer<>(Item.class, false))

...instead of:

Serdes.serdeFrom(..)

...and keep our fingers crossed that nothing calls anything besides deserializer() on the WrapperSerde (otherwise: NPE! But then, a consumer does not serialize). (ref)
