Spring @KafkaListener with topicPattern: handle runtime topic creation


I'm using Spring @KafkaListener with a topicPattern. If I create a new topic matching the pattern while the application is running and start publishing to it, the listener simply ignores those messages. In other words, it only resolves the topics matching the pattern at startup and listens to those.

What's the easiest way to "refresh" that? Thanks!

CodePudding user response:

By default, new topics will be picked up within 5 minutes, per the default value of the metadata.max.age.ms consumer property: https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms

The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.

You can reduce it to speed things up at the expense of increased traffic.
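
If you would rather lower it for all consumers instead of per listener (the per-listener approach is shown in the edit below), it can also be set on the consumer factory. The following is only a minimal sketch; the bootstrap server, group id and String deserializers are placeholders for whatever your application already configures:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
class ConsumerMetadataConfig {

    @Bean
    ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        // Placeholder connection settings; adjust to your environment.
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "pattern-listener");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Refresh metadata every 60s so topics created at runtime are discovered
        // within a minute instead of the 5-minute default.
        config.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 60_000);
        return new DefaultKafkaConsumerFactory<>(config);
    }
}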

EDIT

This shows it working as expected...

import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;

@SpringBootApplication
public class So71386069Application {

    private static final Logger log = LoggerFactory.getLogger(So71386069Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So71386069Application.class, args);
    }

    // Reduce metadata.max.age.ms for this listener's consumer so that new topics
    // matching the pattern are discovered within a minute instead of five.
    @KafkaListener(id = "so71386069", topicPattern = "so71386069.*",
            properties = "metadata.max.age.ms:60000")
    void listen(String in) {
        System.out.println(in);
    }


    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so71386069").partitions(1).replicas(1).build();
    }

    // Creates a new matching topic every 30 seconds to demonstrate that the
    // running listener picks them up after the next metadata refresh.
    @Bean
    ApplicationRunner runner(KafkaAdmin admin) {
        return args -> {
            try (AdminClient client = AdminClient.create(admin.getConfigurationProperties())) {
                IntStream.range(0, 10).forEach(i -> {
                    try {
                        Thread.sleep(30_000);
                        String topic = "so71386069-" + i;
                        log.info("Creating {}", topic);
                        client.createTopics(Collections.singleton(
                                TopicBuilder.name(topic).partitions(1).replicas(1).build())).all().get();
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                });
            }
        };
    }

}
2022-03-07 15:41:07.131  INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71386069: partitions assigned: [so71386069-0]
2022-03-07 15:41:34.007  INFO 33630 --- [           main] com.example.demo.So71386069Application : Creating so71386069-0
2022-03-07 15:42:04.193  INFO 33630 --- [           main] com.example.demo.So71386069Application : Creating so71386069-1
...
2022-03-07 15:42:07.590  INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71386069: partitions revoked: [so71386069-0]
...
2022-03-07 15:42:07.599  INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71386069: partitions assigned: [so71386069-0, so71386069-1-0, so71386069-0-0]
2022-03-07 15:42:34.378  INFO 33630 --- [           main] com.example.demo.So71386069Application : Creating so71386069-2
2022-03-07 15:43:04.554  INFO 33630 --- [           main] com.example.demo.So71386069Application : Creating so71386069-3
...
2022-03-07 15:43:08.403  INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71386069: partitions revoked: [so71386069-0, so71386069-1-0, so71386069-0-0]
...
2022-03-07 15:43:08.411  INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71386069: partitions assigned: [so71386069-0, so71386069-3-0, so71386069-2-0, so71386069-1-0, so71386069-0-0]
...

CodePudding user response:

I think that's how it is by design. The Kafka client always has to subscribe to a topic before it can receive messages.

In this case, the Kafka client/consumer subscribes to the topics matching the pattern once at startup, and that is what it carries on with.

But this is really an interesting question. The easiest and simplest answer is "restart the client/consumer". However, I will keep an eye on other answers to learn about any other ideas.
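
If restarting the consumer is the route taken, Spring for Apache Kafka can stop and start the listener container by its id without restarting the whole application, which forces a fresh subscription to the pattern. A rough sketch (the helper class is hypothetical; "so71386069" matches the listener id used in the earlier example):

import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

// Hypothetical helper: stopping and starting the container makes the consumer
// re-subscribe, so topics created after startup that match the pattern are picked up.
@Component
class PatternListenerRestarter {

    private final KafkaListenerEndpointRegistry registry;

    PatternListenerRestarter(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    void restart(String listenerId) {
        MessageListenerContainer container = registry.getListenerContainer(listenerId);
        if (container != null) {
            container.stop();
            container.start();
        }
    }
}

Calling restart("so71386069") after creating a new topic (or on a schedule) would have the same effect as a full restart, at the cost of a consumer group rebalance.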
