How to run multiple instances of the same app as producer/consumer in Kafka


I have a simple REST API backed by an H2 in-memory database, so when I run multiple instances of the same app, each instance has its own in-memory database. Now I want to synchronize these databases between the instances. I thought Kafka would be a good solution: for example, when the instance on port 8080 receives a POST, it should publish the change so all other instances apply it too. So my app acts as a producer and a consumer at the same time, and I don't know why only one instance receives each message. The code:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.support.serializer.JsonSerializer;

    @EnableKafka
    @Configuration
    public class KafkaProducerConfigForDepartment {

        @Value(value = "${kafka.bootstrapAddress}")
        private String bootstrapAddress;

        @Bean
        public ProducerFactory<String, MessageEventForDepartment> producerFactoryForDepartment() {
            Map<String, Object> configProps = new HashMap<>();
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        @Bean
        public KafkaTemplate<String, MessageEventForDepartment> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactoryForDepartment());
        }
    }
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.support.serializer.JsonDeserializer;

    @Configuration
    public class KafkaTopicConfig {

        @Value(value = "${kafka.bootstrapAddress}")
        private String bootstrapAddress;

        @Bean
        public ConsumerFactory<String, MessageEventForDepartment> consumerFactoryForDepartments() {
            Map<String, Object> props = new HashMap<>();
            props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            // All instances share this fixed group id.
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
            return new DefaultKafkaConsumerFactory<>(props,
                    new StringDeserializer(), new JsonDeserializer<>(MessageEventForDepartment.class));
        }

        @Bean
        public NewTopic topic1() {
            return TopicBuilder.name("topic12")
                    .partitions(10)
                    // Note: the replication factor cannot exceed the number of
                    // brokers in the cluster (use 1 on a single-broker setup).
                    .replicas(10)
                    .build();
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, MessageEventForDepartment>
        kafkaListenerContainerFactoryForDepartments() {
            ConcurrentKafkaListenerContainerFactory<String, MessageEventForDepartment> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactoryForDepartments());
            return factory;
        }
    }
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    import lombok.extern.slf4j.Slf4j;

    @Component
    @Slf4j
    public class DepartmentKafkaService {

        @Autowired
        private DepartmentService departmentService;

        @KafkaListener(topics = "topic12", groupId = "groupId",
                containerFactory = "kafkaListenerContainerFactoryForDepartments")
        public void listenGroupFoo(MessageEventForDepartment message) {
            log.info(message.toString());
        }
    }
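
For completeness, the REST side publishes the event through the KafkaTemplate after saving locally, roughly like this (a simplified sketch; the real controller, endpoint, and service calls differ):

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class DepartmentController {

        @Autowired
        private KafkaTemplate<String, MessageEventForDepartment> kafkaTemplate;

        @PostMapping("/departments")
        public void createDepartment(@RequestBody MessageEventForDepartment event) {
            // Save to the local H2 database first, then publish so the other
            // instances can apply the same change to their own databases.
            kafkaTemplate.send("topic12", event);
        }
    }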

Why is this happening? Or maybe my approach isn't very good; what are your thoughts, guys?

CodePudding user response:

Have you considered Kafka Streams? In my opinion, what you are building by hand is already provided by Kafka Streams through its internal RocksDB state stores and the GlobalKTable abstraction.

CodePudding user response:

The reason why only one instance of the application receives each message is that all instances share the same ConsumerConfig.GROUP_ID_CONFIG. Kafka's consumer protocol delivers each message to exactly one consumer within a consumer group (obviously there's a lot more nuance to it, but this is basically how it works).
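
If you want every instance to consume every message, one option is to give each instance its own consumer group, for example by appending a random suffix to the group id. Here is a minimal sketch against your ConsumerFactory (the UUID suffix and "earliest" reset are just one way to do it). Note that the groupId attribute on your @KafkaListener annotation overrides the factory setting, so remove it from the annotation or randomize it the same way:

    import java.util.UUID;

    @Bean
    public ConsumerFactory<String, MessageEventForDepartment> consumerFactoryForDepartments() {
        Map<String, Object> props = new HashMap<>();
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        // A unique group id per instance: each instance forms its own
        // single-member consumer group, so Kafka delivers every message
        // on the topic to every instance.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "departments-" + UUID.randomUUID());
        // Replay the topic from the beginning so a freshly started instance
        // can rebuild its in-memory database.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultKafkaConsumerFactory<>(props,
                new StringDeserializer(), new JsonDeserializer<>(MessageEventForDepartment.class));
    }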

Pawel's suggestion to use Kafka Streams is a good one: a GlobalKTable would provide what you want.
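
For illustration, here is a minimal Kafka Streams sketch that materializes the topic into a GlobalKTable, so every instance keeps a full, continuously updated local copy of the data (the application id, store name, and String serdes are placeholders; in practice you would use a JSON Serde for MessageEventForDepartment):

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.Materialized;

    public class DepartmentGlobalTable {

        public static KafkaStreams start(String bootstrapAddress) {
            Properties props = new Properties();
            // The application id may be shared by all instances: a GlobalKTable
            // is fully replicated to each instance regardless of partitioning.
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "department-sync");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);

            StreamsBuilder builder = new StreamsBuilder();
            // Materialize the topic into a local RocksDB-backed store
            // (store name chosen for this example).
            GlobalKTable<String, String> departments = builder.globalTable(
                    "topic12",
                    Consumed.with(Serdes.String(), Serdes.String()),
                    Materialized.as("departments-store"));

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
            return streams;
        }
    }

Each instance can then query its local copy through KafkaStreams#store instead of keeping a separate H2 database in sync by hand.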

Luca Pette wrote a great primer on Kafka Streams here: https://lucapette.me/writing/getting-started-with-kafka-streams/
