How to set custom task executor on Kafka Listener

Time:06-17

I am running spring-kafka 2.6.7 and am looking for a way to set a custom task executor for my listener. My Kafka configuration is below.

    @Bean
    ProducerFactory<Integer, BaseEventTemplate> eventProducerFactory() {
        Map<String, Object> producerProps = new HashMap<>()
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class)
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BaseEventTemplateSerializer.class)
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all")
        producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 256)
        return new DefaultKafkaProducerFactory<>(producerProps)
    }

    @Bean
    KafkaTemplate<Integer, BaseEventTemplate> baseEventKafkaTemplate() {
        return new KafkaTemplate<>(eventProducerFactory())
    }

    @Bean
    ConsumerFactory<Integer, BaseEventTemplate> baseEventConsumerFactory() {
        Map<String, Object> consumerProps = new HashMap<>()
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkaeventconsumer")
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class)
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BaseEventTemplateDeserializer.class)
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, Collections.singletonList(RoundRobinAssignor.class))
        return new DefaultKafkaConsumerFactory<>(consumerProps)
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, BaseEventTemplate> baseEventKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, BaseEventTemplate> factory =
                new ConcurrentKafkaListenerContainerFactory<>()
        factory.setConsumerFactory(baseEventConsumerFactory())
        factory.setConcurrency(3)
        factory.getContainerProperties().setPollTimeout(3000)
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)
        factory.getContainerProperties().setSyncCommits(true)
        return factory
    }

I can set the consumer task executor via factory.getContainerProperties().setConsumerTaskExecutor(), but I am not sure how to set a task executor for the listener.

Answer:

spring-kafka 2.6.x is out of OSS support; see https://spring.io/projects/spring-kafka#support

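The same thread used to poll the consumer is used to call the listener, so there is no separate listener task executor to set. The consumer task executor is the one that runs your listener.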

In very early versions (before 1.3), there were two threads due to limitations in the kafka-clients, but there is only one now (for the last 5 years).
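Since the listener runs on the consumer thread, one way to control which threads call the listener is to supply the consumer task executor itself. The following is a minimal sketch in plain Java syntax for 2.6.x, not a definitive setup. The class name ListenerExecutorConfig, the bean name kafkaConsumerExecutor, and the thread name prefix are made up for illustration; BaseEventTemplate and baseEventConsumerFactory come from the question.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
class ListenerExecutorConfig { // hypothetical class name

    // Hypothetical executor bean. Each consumer occupies one thread for the
    // lifetime of its container, so the pool needs at least as many core
    // threads as the factory's concurrency (3 here).
    @Bean
    ThreadPoolTaskExecutor kafkaConsumerExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(3);
        executor.setThreadNamePrefix("base-event-consumer-"); // illustrative prefix
        return executor;
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, BaseEventTemplate> baseEventKafkaListenerContainerFactory(
            ConsumerFactory<Integer, BaseEventTemplate> baseEventConsumerFactory,
            ThreadPoolTaskExecutor kafkaConsumerExecutor) {
        ConcurrentKafkaListenerContainerFactory<Integer, BaseEventTemplate> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(baseEventConsumerFactory);
        factory.setConcurrency(3);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // The threads from this executor both poll Kafka and invoke the listener.
        factory.getContainerProperties().setConsumerTaskExecutor(kafkaConsumerExecutor);
        return factory;
    }
}
```

With this in place, the listener method should run on threads named with the configured prefix. If the pool has fewer threads than the configured concurrency, the extra consumers wait in the executor's queue and never start polling, so the two values need to be kept in step.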
