KafkaConsumer not processing all the requests posted in Spring Boot

Time:10-04

I have implemented a Kafka producer and Kafka consumers using Spring Boot. When the consumer receives a message, it invokes a REST API to save the record to MongoDB. However, not all of the records posted to the Kafka consumer are saved to MongoDB.
For example, out of 5 records posted to the Kafka consumer, only 3 are saved to MongoDB.

Here is the Kafka consumer code:

KafkaConsumerConfiguration

@EnableKafka
@Configuration
public class KafkaConsumerConfiguration {

    @Value("${spring.kafka.hostname}")
    private String kafkaHostname;
    public static final String GROUP_ID_CONFIG = "group_json1";
    public static final String ENABLE_AUTO_COMMIT_CONFIG = "false";
    public static final String AUTO_OFFSET_RESET_CONFIG = "earliest";


    @Bean
    public ConsumerFactory<String, UserKafkaDTO> userConsumerFactory() {

        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHostname);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ENABLE_AUTO_COMMIT_CONFIG);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);

        JsonDeserializer<UserKafkaDTO> deserializer = new JsonDeserializer<>(UserKafkaDTO.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
    }

    

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserKafkaDTO> userKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, UserKafkaDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userConsumerFactory());
        factory.setBatchListener(true);
        return factory;
    }


  
    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setBatchListener(true);
        return factory;

    }

}

KafkaConsumerListener

@EnableKafka
@Configuration
public class KafkaConsumerListener {


    @KafkaListener(topics = { "topicName" }, containerFactory = "userKafkaListenerFactory", autoStartup = "${listen.auto.start}")
    public void consumeJson(UserKafkaDTO userKafkaDTO) {
        invokeAPItoSaveRecordTOMongoDB(userKafkaDTO);
    }

}

CodePudding user response:

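Try changing
factory.setBatchListener(true); to factory.setBatchListener(false);

because your listener method handles only a single object. With batch listening enabled, the container passes the records from each poll to the listener as a list, but consumeJson accepts a single UserKafkaDTO.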
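
As a rough sketch of the two consistent combinations (not the asker's final code): either turn batch mode off and keep the single-object listener, or keep batch mode on and have the listener accept a List. UserKafkaDTO, userConsumerFactory, the topic name and invokeAPItoSaveRecordTOMongoDB come from the question; the class names, the userBatchKafkaListenerFactory bean and the consumeJsonBatch method are hypothetical names introduced only for illustration.

```java
import java.util.List;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;

// Assumption: UserKafkaDTO (the question's DTO) is on the classpath, and the
// userConsumerFactory bean from the question is defined elsewhere.

@EnableKafka
@Configuration
class KafkaListenerFactorySketch {

    // Option 1: record-at-a-time delivery, matching consumeJson(UserKafkaDTO).
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserKafkaDTO> userKafkaListenerFactory(
            ConsumerFactory<String, UserKafkaDTO> userConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, UserKafkaDTO> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userConsumerFactory);
        factory.setBatchListener(false); // one record per listener call
        return factory;
    }

    // Option 2 (hypothetical bean name): batch delivery, for a List-based listener.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, UserKafkaDTO> userBatchKafkaListenerFactory(
            ConsumerFactory<String, UserKafkaDTO> userConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, UserKafkaDTO> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(userConsumerFactory);
        factory.setBatchListener(true); // all records from a poll per listener call
        return factory;
    }
}

@Component
class UserListenerSketch {

    // Pairs with Option 1. Register only one of the two listener methods.
    @KafkaListener(topics = "topicName", containerFactory = "userKafkaListenerFactory")
    public void consumeJson(UserKafkaDTO userKafkaDTO) {
        invokeAPItoSaveRecordTOMongoDB(userKafkaDTO);
    }

    // Pairs with Option 2: the parameter is a List, so every record in the
    // batch is visited explicitly.
    @KafkaListener(topics = "topicName", containerFactory = "userBatchKafkaListenerFactory")
    public void consumeJsonBatch(List<UserKafkaDTO> batch) {
        for (UserKafkaDTO userKafkaDTO : batch) {
            invokeAPItoSaveRecordTOMongoDB(userKafkaDTO);
        }
    }

    // Placeholder for the question's method; its real body is not shown there.
    private void invokeAPItoSaveRecordTOMongoDB(UserKafkaDTO userKafkaDTO) {
        // call the REST API that stores the record in MongoDB
    }
}
```

Whichever option is used, the point is that the factory's batch setting and the listener's parameter type agree.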
