-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-19431: Ensure consumer and share assignment consistency with subscriptions #20055
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
KAFKA-19431: Ensure consumer and share assignment consistency with subscriptions #20055
Conversation
Filter out unsubscribed topics during reconciliation. This eliminates the window where an assignment could contain unsubscribed topics when a member unsubscribes from a topic while it has unrevoked partitions. We also apply filtering in a few other cases that would arise when client-side assignors are implemented, since new assignments would no longer be available immediately. This is important for mixed groups, since clients on the classic protocol will rejoin if they receive a topic in their assignment that is no longer in their subscription. This change does not address regex subscriptions. When regex subscriptions are in use, we do no filtering when the regex is not resolved, since we do not know which topics are part of the subscription.
Filter out unsubscribed topics during reconciliation. This has no impact currently.
Benchmark results: Without filter
With filter
At 1,000 topics, the impact of this change is <=0.1 ms per reconcile. |
I can split this change into three PRs if needed. |
/** | ||
* Updates the current assignment, removing any partitions that are not part of the subscribed topics. | ||
* | ||
* @param memberAssignedPartitions The assigned partitions of the member to use. | ||
* @return A new ConsumerGroupMember. | ||
*/ | ||
private ConsumerGroupMember computeCurrentAssignment( | ||
Map<Uuid, Set<Integer>> memberAssignedPartitions | ||
) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This differs from computeNextAssignment
when there are unrevoked partitions.
computeNextAssignment
will replace the partitions pending revocation instead of adding to it (which can be fixed)computeNextAssignment
will add partitions from the latest target assignment to the current assignment even when there are unrevoked partitions. This is not easy to fix without breaking the stable -> unrevoked partitions state transition.
if (subscribedTopicIds == null) { | ||
// The member's subscribed topic regex is unresolved, so we cannot tell whether topics | ||
// are part of its subscription. | ||
return member; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be consistent, I think that we should rather consider unresolved regex as empty set. Don't you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess that would be better for consistency with the client's subscription. I've made the change and added tests for unresolved vs resolved regexes at the CurrentAssignmentBuilder
level. There's an existing GroupMetadataManager
test that runs through the regex update flow.
// If the member is no longer subscribed to the topic, treat its target assignment as empty. | ||
if (subscribedTopicIds != null && !subscribedTopicIds.contains(topicId)) { | ||
target = Set.of(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems to be inconsistent with what you do in computeCurrentAssignment.
* @param memberAssignedPartitions The assigned partitions of the member to use. | ||
* @return A new ConsumerGroupMember. | ||
*/ | ||
private ConsumerGroupMember computeCurrentAssignment( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: updateCurrentAssignment
?
@@ -3463,15 +3473,19 @@ private ConsumerGroupMember maybeReconcile( | |||
BiFunction<Uuid, Integer, Integer> currentPartitionEpoch, | |||
int targetAssignmentEpoch, | |||
Assignment targetAssignment, | |||
Map<String, ResolvedRegularExpression> resolvedRegularExpressions, | |||
boolean forceSubscriptionConsistency, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: hasUpdatedSubscription
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed it to hasSubscriptionChanged
, to be similar to the hasMemberSubscriptionChanged
method.
@@ -139,7 +178,7 @@ public ConsumerGroupMember build() { | |||
member.assignedPartitions() | |||
); | |||
} else { | |||
return member; | |||
return computeCurrentAssignment(member.assignedPartitions()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether we should push down the boolean flag into this class to to properly model the state machine here. The bypass in maybeReconcile is more an optmization from my point of view.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved the check and added a few more test cases to cover it.
…Epoch parameter for previousMemberEpoch
…rom a topic and the topic's partitions are not currently owned
…ltering with and without a new target assignment
@dajac Thanks for taking the time to review the patch.
I reran the benchmarks with the largest parameter sizes:
|
); | ||
|
||
ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1) | ||
.setState(MemberState.STABLE) | ||
.setMemberEpoch(10) | ||
.setPreviousMemberEpoch(0) | ||
.setPreviousMemberEpoch(10) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like a mistake in the existing code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you elaborate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The member epoch and previous member epoch before the heartbeat is 10. After the heartbeat, the previous member epoch should still be 10, not 0. This wasn't an issue previously because newConsumerGroupMemberSubscriptionRecord
and newConsumerGroupCurrentAssignmentRecord
don't put the previous member epoch into any records.
assignor.prepareGroupAssignment(new GroupAssignment(Map.of( | ||
memberId1, new MemberAssignmentImpl(mkAssignment( | ||
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), | ||
mkTopicAssignment(barTopicId, 0, 1, 2) | ||
)) | ||
))); | ||
|
||
// Member heartbeats again with the same regex. | ||
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result2 = context.consumerGroupHeartbeat( | ||
new ConsumerGroupHeartbeatRequestData() | ||
.setGroupId(groupId) | ||
.setMemberId(memberId1) | ||
.setMemberEpoch(10) | ||
.setRebalanceTimeoutMs(5000) | ||
.setSubscribedTopicRegex("foo*|bar*") | ||
.setServerAssignor("range") | ||
.setTopicPartitions(List.of())); | ||
|
||
assertResponseEquals( | ||
new ConsumerGroupHeartbeatResponseData() | ||
.setMemberId(memberId1) | ||
.setMemberEpoch(11) | ||
.setHeartbeatIntervalMs(5000) | ||
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() | ||
.setTopicPartitions(List.of( | ||
new ConsumerGroupHeartbeatResponseData.TopicPartitions() | ||
.setTopicId(fooTopicId) | ||
.setPartitions(List.of(0, 1, 2, 3, 4, 5)), | ||
new ConsumerGroupHeartbeatResponseData.TopicPartitions() | ||
.setTopicId(barTopicId) | ||
.setPartitions(List.of(0, 1, 2))))), | ||
result2.response() | ||
); | ||
|
||
ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId1) | ||
.setState(MemberState.STABLE) | ||
.setMemberEpoch(11) | ||
.setPreviousMemberEpoch(10) | ||
.setClientId(DEFAULT_CLIENT_ID) | ||
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) | ||
.setRebalanceTimeoutMs(5000) | ||
.setSubscribedTopicRegex("foo*|bar*") | ||
.setServerAssignorName("range") | ||
.setAssignedPartitions(mkAssignment( | ||
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), | ||
mkTopicAssignment(barTopicId, 0, 1, 2))) | ||
.build(); | ||
|
||
expectedRecords = List.of( | ||
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment( | ||
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5), | ||
mkTopicAssignment(barTopicId, 0, 1, 2) | ||
)), | ||
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11), | ||
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2) | ||
); | ||
|
||
assertRecordsEquals(expectedRecords, result2.records()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added testing for completion of the regex update, from the next assignment computation to the end of the heartbeat with the updated assignment.
@Benchmark | ||
@Threads(1) | ||
@OutputTimeUnit(TimeUnit.MILLISECONDS) | ||
public ConsumerGroupMember stableToStableWithNoChange() { | ||
return new CurrentAssignmentBuilder(member) | ||
.withMetadataImage(metadataImage) | ||
.withTargetAssignment(member.memberEpoch(), targetAssignment) | ||
.withCurrentPartitionEpoch((topicId, partitionId) -> -1) | ||
.build(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This benchmark case should be a no-op and pretty fast.
@@ -2305,6 +2305,9 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> | |||
group::currentPartitionEpoch, | |||
targetAssignmentEpoch, | |||
targetAssignment, | |||
group.resolvedRegularExpressions(), | |||
// Force consistency with the subscription when the subscription has changed. | |||
bumpGroupEpoch || hasMemberRegularExpressionChanged(member, updatedMember), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not really happy with this. Could we somehow make hasMemberRegularExpressionChanged part of the checks that we already do in the beginning of this method? We may be able to refactor it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you have in mind? I wanted to avoid bumping the epoch because it would trigger an assignor run before the regex is resolved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that we should not trigger an assignment. My point is more about the structure of the code. We have all the subscription checks over there except this one. It would be better if we could define a boolean over there too and combine checks if possible. For instance, the method checking the regex could return a record with two booleans or we could refactor it differently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, I've updated maybeUpdateRegularExpressions
to return a (bumpGroupEpoch, subscribedTopicRegexChanged)
record.
* @param updatedMember The new member. | ||
* @return A boolean indicating whether the subscribed topic regular expression has changed. | ||
*/ | ||
private boolean hasMemberRegularExpressionChanged( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: static?
} else if (hasSubscriptionChanged) { | ||
return updateCurrentAssignment(member.assignedPartitions()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Should we update the top level comment of the states?
} | ||
|
||
// When the member has revoked all the pending partitions, it can | ||
// transition to the next epoch (current + 1) and we can reconcile | ||
// its state towards the latest target assignment. | ||
return computeNextAssignment( | ||
member.memberEpoch() + 1, | ||
Math.min(member.memberEpoch() + 1, targetAssignmentEpoch), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this change? Is it to be more defensive?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we have client-side assignors, we could transition from STABLE
to UNREVOKED_PARTITIONS
at epoch N when unsubscribing from a topic. Then the client could revoke the partitions and transition back to STABLE
before the epoch N+1 target assignment is ready.
return member; | ||
} | ||
|
||
if (ownsRevokedPartitions(newPartitionsPendingRevocation)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we really need this check here or if we could just transition to UNREVOKED_PARTITIONS if we revoked at least one partition above. I may be missing something though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we don't check this, then we run into the UNREVOKED_PARTITIONS
-> UNRELEASED_PARTITIONS
bug in the regex tests and have to heartbeat an extra time. It just makes the tests worse is all.
Instead of transitioning
STABLE --subscription update--> STABLE --regex and assignor update--> STABLE
it goes
STABLE --subscription update--> UNREVOKED_PARTITIONS --regex and assignor update--> UNRELEASED_PARTITIONS --> STABLE
} | ||
} | ||
|
||
TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(metadataImage.topics()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need the cache in this case? It seems that we will check every topic id once so the cache does not bring any benefits, does it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, we can remove the cache.
@@ -83,11 +125,35 @@ public ShareGroupMember build() { | |||
// when the member is updated. | |||
return new ShareGroupMember.Builder(member) | |||
.setState(MemberState.STABLE) | |||
.setAssignedPartitions(targetAssignment.partitions()) | |||
.setAssignedPartitions(filterAssignedPartitions(targetAssignment.partitions(), member.subscribedTopicNames())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need filterAssignedPartitions here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we ever have client-side assignors, we could have two subscription changes in a row.
The first would trigger the assignor and filtering. Then the second would trigger the filtering. Finally, when the assignment is ready, it would be based on an older subscription and may not be consistent with the latest subscription.
Map<Uuid, Set<Integer>> partitions, | ||
Set<String> subscribedTopicNames | ||
) { | ||
TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(metadataImage.topics()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same question about the cache.
); | ||
|
||
ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1) | ||
.setState(MemberState.STABLE) | ||
.setMemberEpoch(10) | ||
.setPreviousMemberEpoch(0) | ||
.setPreviousMemberEpoch(10) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you elaborate?
Interesting. We should fix this. Could you please file a jira and raise a PR for it? |
I filed this as https://issues.apache.org/jira/browse/KAFKA-19449. |
@@ -2250,13 +2250,15 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> | |||
records | |||
); | |||
|
|||
bumpGroupEpoch |= maybeUpdateRegularExpressions( | |||
UpdateRegularExpressionsResult updateRegularExpressionsResult = maybeUpdateRegularExpressions( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Would it make sense to define the following variables?
- subscribedTopicNamesChanged
- subscribedTopicRegexChanged (already part of updateRegularExpressionsResult)
Then we can define:
- hasSubscriptionChanged = subscribedTopicNamesChanged | subscribedTopicRegexChanged
- bumpGroupEpoch = group.groupEpoch() == 0 | subscribedTopicNamesChanged | updateRegularExpressionsResult.bumpGroupEpoch.
For both, we can put a comment explaining the differences. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I gave that a go. Let me know your thoughts.
Filter out unsubscribed topics during reconciliation.
This eliminates the window where a consumer group assignment could
contain unsubscribed topics when a member unsubscribes from a topic
while it has unrevoked partitions.
We also apply filtering in a few other cases that would arise when
client-side assignors are implemented, since new assignments would no
longer be available immediately. This is important for mixed groups,
since clients on the classic protocol will rejoin if they receive a
topic in their assignment that is no longer in their subscription.
Regex subscriptions have a window where the regex is not resolved and we
cannot know which topics are part of the subscription. We opt to be
conservative and treat unresolved regexes as matching no topics.
The same change is applied to share groups, since the reconciliation
process is similar.
To gauge the performance impact of the change, we add a jmh benchmark.