In Kafka's GroupCoordinator.scala we find the following code:
```scala
case Stable =>
  val member = group.get(memberId)
  if (group.isLeader(memberId)) {
    // force a rebalance if the leader sends JoinGroup;
    // This allows the leader to trigger rebalances for changes affecting assignment
    // which do not affect the member metadata (such as topic metadata changes for the consumer)
    updateMemberAndRebalance(group, member, protocols, s"Leader ${member.memberId} re-joining group during ${group.currentState}", responseCallback)
  } else if (!member.matches(protocols)) {
    updateMemberAndRebalance(group, member, protocols, s"Updating metadata for member ${member.memberId} during ${group.currentState}", responseCallback)
  } else {
    // for followers with no actual change to their metadata, just return group information
    // for the current generation which will allow them to issue SyncGroup
    responseCallback(JoinGroupResult(
      members = List.empty,
      memberId = memberId,
      generationId = group.generationId,
      protocolType = group.protocolType,
      protocolName = group.protocolName,
      leaderId = group.leaderOrNull,
      error = Errors.NONE))
  }
```
I periodically see the following rebalance reason when running my Kafka Streams application with static membership:
```scala
s"Updating metadata for member ${member.memberId} during Stable"
```
I am wondering what the following check means:
```scala
else if (!member.matches(protocols)) {
  updateMemberAndRebalance(group, member, protocols, s"Updating metadata for member ${member.memberId} during ${group.currentState}", responseCallback)
}
```
How can this happen, and what does `!member.matches(protocols)` mean?
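For context, here is a simplified Scala model of that comparison. It assumes, based on how `MemberMetadata.matches` reads in the Kafka sources (verify against your broker version), that the broker compares the protocol list it stored for the member with the list sent in the new JoinGroup, position by position, and requires identical protocol names and byte-for-byte identical metadata. For a consumer, that metadata is the serialized subscription (such as subscribed topics and assignor-specific user data), so any change in it while the group is Stable counts as a mismatch. The object name `ProtocolMatchSketch` and the sample protocol names and payloads are hypothetical.

```scala
import java.util.Arrays

// Hypothetical, simplified model; not Kafka's actual class.
object ProtocolMatchSketch {
  // One JoinGroup protocol entry: (protocol name, serialized metadata bytes).
  type Protocol = (String, Array[Byte])

  // True only if both lists have the same size and, at every position,
  // the same protocol name and byte-identical metadata.
  def matches(stored: List[Protocol], incoming: List[Protocol]): Boolean =
    stored.size == incoming.size &&
      stored.zip(incoming).forall { case ((n1, m1), (n2, m2)) =>
        n1 == n2 && Arrays.equals(m1, m2)
      }

  def main(args: Array[String]): Unit = {
    // Made-up names and payloads, for illustration only.
    val stored    = List(("assignor-a", "topics=t1;userData=v1".getBytes("UTF-8")))
    val sameAgain = List(("assignor-a", "topics=t1;userData=v1".getBytes("UTF-8")))
    val changed   = List(("assignor-a", "topics=t1;userData=v2".getBytes("UTF-8")))

    println(matches(stored, sameAgain)) // true: follower just gets the current generation
    println(matches(stored, changed))   // false: "Updating metadata for member ..." rebalance
  }
}
```

Note that arrays are compared with `Arrays.equals` because `==` on two `Array[Byte]` values compares references, not contents.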
I'm trying to track down where my constant rebalancing may be coming from, but I'm having a hard time. So far, the only lead is that this check is what triggers a rebalance in my app every now and then, which stops my application from making progress.
Maybe someone has a clue about what this protocol mismatch means and what kinds of situations can lead to it, so that I can figure out what might be causing it in my cluster.
Answer:
In a consumer rebalance, the protocols appear to be the `partition.assignment.strategy` values configured for the consumer (see this article). So I would check that this setting is configured the same way on all the consumers.
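As a sketch of that check, assuming plain consumers whose configs are available as `java.util.Properties`, the hypothetical helper below normalizes each `partition.assignment.strategy` value into an ordered list of assignor class names and reports whether all configs agree. Order is kept because the list order expresses assignor preference. The object and method names are made up; an unset key is treated as an empty list here, whereas the real client would fall back to its default, which this sketch does not model.

```scala
import java.util.Properties

// Hypothetical helper, not part of any Kafka API.
object StrategyCheckSketch {
  val Key = "partition.assignment.strategy"

  // Ordered, trimmed list of assignor names from one consumer config.
  // An unset key yields Nil (the client's default is not modeled).
  def strategies(props: Properties): List[String] =
    Option(props.getProperty(Key)).toList
      .flatMap(_.split(",").toList)
      .map(_.trim)
      .filter(_.nonEmpty)

  // True if every config lists the same assignors in the same order.
  def sameStrategyEverywhere(configs: Seq[Properties]): Boolean =
    configs.map(strategies).distinct.size <= 1

  def main(args: Array[String]): Unit = {
    def cfg(value: String): Properties = {
      val p = new Properties()
      p.setProperty(Key, value)
      p
    }
    val a = cfg("org.apache.kafka.clients.consumer.CooperativeStickyAssignor")
    val b = cfg(" org.apache.kafka.clients.consumer.CooperativeStickyAssignor ")
    val c = cfg("org.apache.kafka.clients.consumer.RangeAssignor")

    println(sameStrategyEverywhere(Seq(a, b)))    // true: same after trimming
    println(sameStrategyEverywhere(Seq(a, b, c))) // false: c uses a different assignor
  }
}
```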