I'm using Spring @KafkaListener with a topicPattern. If during the runtime of this application I create a new topic matching the pattern and start publishing to that, the listener application simply ignores those messages. In other words, it only pulls all the topics matching the pattern at startup and listens to those.
What's the easiest way to "refresh" that? Thanks!
CodePudding user response:
By default, new topics will be picked up within 5 minutes, according to the setting of https://kafka.apache.org/documentation/#consumerconfigs_metadata.max.age.ms:
"The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions."
You can reduce it to speed things up at the expense of increased traffic.
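As a rough sketch of what reducing it looks like, the snippet below builds a plain consumer property map with a shorter metadata.max.age.ms. The class name, the helper method, and the 10-second value are illustrative assumptions, not part of any Kafka or Spring API. In a real application a map like this would be passed to whatever consumer factory you configure, or the property can be set directly on the listener.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerProps {

    // Name of the Kafka consumer property that controls the metadata refresh interval
    static final String METADATA_MAX_AGE = "metadata.max.age.ms";

    // Documented Kafka default: 5 minutes
    static final long DEFAULT_MAX_AGE_MS = 300_000L;

    // Hypothetical helper: returns consumer properties with the given refresh interval
    static Map<String, Object> withMetadataMaxAge(long maxAgeMs) {
        if (maxAgeMs < 0) {
            throw new IllegalArgumentException("metadata.max.age.ms must not be negative");
        }
        Map<String, Object> props = new HashMap<>();
        props.put(METADATA_MAX_AGE, maxAgeMs);
        return props;
    }

    public static void main(String[] args) {
        // New topics matching the pattern should then be noticed within roughly 10 seconds
        Map<String, Object> props = withMetadataMaxAge(10_000L);
        System.out.println(props);
    }
}
```

A lower value means new topics are discovered sooner, but every consumer asks the brokers for metadata more often, so pick the largest interval you can tolerate.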
EDIT
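This shows it working as expected, with metadata.max.age.ms reduced to 60 seconds on the listener and a new topic created every 30 seconds: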
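    import java.util.Collections;
    import java.util.concurrent.ExecutionException;
    import java.util.stream.IntStream;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.KafkaAdmin;

    @SpringBootApplication
    public class So71386069Application {

        private static final Logger log = LoggerFactory.getLogger(So71386069Application.class);

        public static void main(String[] args) {
            SpringApplication.run(So71386069Application.class, args);
        }

        // Refresh metadata every 60 seconds so new matching topics are found quickly
        @KafkaListener(id = "so71386069", topicPattern = "so71386069.*",
                properties = "metadata.max.age.ms:60000")
        void listen(String in) {
            System.out.println(in);
        }

        // Initial topic, so the listener has something to subscribe to at startup
        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so71386069").partitions(1).replicas(1).build();
        }

        // Creates ten more matching topics, one every 30 seconds
        @Bean
        ApplicationRunner runner(KafkaAdmin admin) {
            return args -> {
                try (AdminClient client = AdminClient.create(admin.getConfigurationProperties())) {
                    IntStream.range(0, 10).forEach(i -> {
                        try {
                            Thread.sleep(30_000);
                            String topic = "so71386069-" + i;
                            log.info("Creating {}", topic);
                            client.createTopics(Collections.singleton(
                                    TopicBuilder.name(topic).partitions(1).replicas(1).build())).all().get();
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                        catch (ExecutionException e) {
                            e.printStackTrace();
                        }
                    });
                }
            };
        }

    }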
    2022-03-07 15:41:07.131 INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71386069: partitions assigned: [so71386069-0]
    2022-03-07 15:41:34.007 INFO 33630 --- [ main] com.example.demo.So71386069Application : Creating so71386069-0
    2022-03-07 15:42:04.193 INFO 33630 --- [ main] com.example.demo.So71386069Application : Creating so71386069-1
    ...
    2022-03-07 15:42:07.590 INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71386069: partitions revoked: [so71386069-0]
    ...
    2022-03-07 15:42:07.599 INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71386069: partitions assigned: [so71386069-0, so71386069-1-0, so71386069-0-0]
    2022-03-07 15:42:34.378 INFO 33630 --- [ main] com.example.demo.So71386069Application : Creating so71386069-2
    2022-03-07 15:43:04.554 INFO 33630 --- [ main] com.example.demo.So71386069Application : Creating so71386069-3
    ...
    2022-03-07 15:43:08.403 INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71386069: partitions revoked: [so71386069-0, so71386069-1-0, so71386069-0-0]
    ...
    2022-03-07 15:43:08.411 INFO 33630 --- [o71386069-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71386069: partitions assigned: [so71386069-0, so71386069-3-0, so71386069-2-0, so71386069-1-0, so71386069-0-0]
    ...
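Which topics end up in those assignments comes down to ordinary regex matching. As far as I know, the pattern is compiled as a java.util.regex.Pattern and a topic is included when its whole name matches. Here is a minimal JDK-only sketch of that check. The class and method names are hypothetical, and "other-topic" is a made-up name added to show a non-match.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternDemo {

    // Returns the topic names that the regex matches in full
    static List<String> matching(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "so71386069", "so71386069-0", "so71386069-1", "other-topic");
        // Same pattern as the listener's topicPattern
        System.out.println(matching("so71386069.*", topics));
        // prints [so71386069, so71386069-0, so71386069-1]
    }
}
```

Running your own pattern against the new topic name like this is a quick way to rule out a regex problem before looking at the refresh interval.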
CodePudding user response:
I think that's how it is by design. A Kafka client always has to subscribe to a topic before it can receive messages from it.
In this case, the consumer subscribes to the topics matching the pattern once, at startup, and that's the set it carries on with.
Still, this is an interesting question. The simplest answer is "restart the client/consumer". However, I will keep an eye on other answers to learn about any other ideas.