We need to delay start of consumer. Here's what we need:
- Start consumer A (reading topic "xyz")
- When consumer A will process all messages, we need to start consumer B (reading topic "zyx")
After reading this: How to find no more messages in kafka topic/partition & reading only after writing to topic is done
We set idleEventInterval on containerProperties of consumer A:
containerProperties.setIdleEventInterval(30000L);
and on consumer B:
container.setAutoStartup(false);
then we have:
@EventListener
public void handleListenerContainerIdleEvent(ListenerContainerIdleEvent event) {
if(canStartContainer(event.getListenerId())) {
Optional.ofNullable(containers.get("container-a"))
.ifPresent(AbstractMessageListenerContainer::start);
}
}
We found that it's exactly what we need - it works fine, but we faced one problem: when consumer B is starting, it forces rebalance of all other consumers. Can we avoid it?
Request joining group due to: group is already rebalancing
Revoke previously assigned partitions
(Re-)joining group
It's not a big issue, but we use ConsumerSeekAware to reset offset using seekToBeginning, so topic is read twice
CodePudding user response:
You should not use the same group.id
with consumers on different topics; it will cause an unnecessary rebalance, as you have found out.
Use different group.id
s for consumers on different topics.