
KAFKA-19431: Ensure consumer and share assignment consistency with subscriptions #20055


Open
squah-confluent wants to merge 19 commits into base: trunk

@squah-confluent (Contributor) commented Jun 27, 2025

Filter out unsubscribed topics during reconciliation.

This eliminates the window where a consumer group assignment could
contain unsubscribed topics when a member unsubscribes from a topic
while it has unrevoked partitions.

We also apply filtering in a few other cases that would arise when
client-side assignors are implemented, since new assignments would no
longer be available immediately. This is important for mixed groups,
since clients on the classic protocol will rejoin if they receive a
topic in their assignment that is no longer in their subscription.

Regex subscriptions have a window where the regex is not resolved and we
cannot know which topics are part of the subscription. We opt to be
conservative and treat unresolved regexes as matching no topics.

The same change is applied to share groups, since the reconciliation
process is similar.

To gauge the performance impact of the change, we add a jmh benchmark.
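
A minimal sketch of the filtering idea described above, not the code in this PR. The class and method names are hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid`. A `null` subscription represents an unresolved regex and, following the conservative choice above, matches no topics.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

final class SubscriptionFilterSketch {
    // Keeps only the partitions whose topic is still part of the subscription.
    // Assumption: a null subscription means "regex not resolved yet" and is
    // treated as matching no topics.
    static Map<UUID, Set<Integer>> filterBySubscription(
        Map<UUID, Set<Integer>> assignedPartitions,
        Set<UUID> subscribedTopicIds
    ) {
        Map<UUID, Set<Integer>> filtered = new HashMap<>();
        if (subscribedTopicIds == null) {
            return filtered;
        }
        assignedPartitions.forEach((topicId, partitions) -> {
            if (subscribedTopicIds.contains(topicId)) {
                filtered.put(topicId, Set.copyOf(partitions));
            }
        });
        return filtered;
    }

    public static void main(String[] args) {
        UUID foo = UUID.randomUUID();
        UUID bar = UUID.randomUUID();
        Map<UUID, Set<Integer>> assigned = Map.of(foo, Set.of(0, 1, 2), bar, Set.of(0));
        // The member unsubscribed from bar, so only foo's partitions remain.
        System.out.println(filterBySubscription(assigned, Set.of(foo)));
    }
}
```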

Filter out unsubscribed topics during reconciliation.

This eliminates the window where an assignment could contain
unsubscribed topics when a member unsubscribes from a topic while it has
unrevoked partitions.

We also apply filtering in a few other cases that would arise when
client-side assignors are implemented, since new assignments would no
longer be available immediately. This is important for mixed groups,
since clients on the classic protocol will rejoin if they receive a
topic in their assignment that is no longer in their subscription.

This change does not address regex subscriptions. When regex
subscriptions are in use, we do no filtering when the regex is not
resolved, since we do not know which topics are part of the
subscription.
Filter out unsubscribed topics during reconciliation.

This has no impact currently.
@squah-confluent
Contributor Author

Benchmark results:

Without filter

Benchmark                                                  (partitionsPerTopic)  (topicCount)  Mode  Cnt   Score   Error  Units
CurrentAssignmentBuilderBenchmark.stableToStableNextEpoch                     5            10  avgt    2   0.004          ms/op
CurrentAssignmentBuilderBenchmark.stableToStableNextEpoch                     5           100  avgt    2   0.042          ms/op
CurrentAssignmentBuilderBenchmark.stableToStableNextEpoch                     5          1000  avgt    2   0.408          ms/op
CurrentAssignmentBuilderBenchmark.stableToStableNextEpoch                    50            10  avgt    2   0.025          ms/op
CurrentAssignmentBuilderBenchmark.stableToStableNextEpoch                    50           100  avgt    2   0.254          ms/op
CurrentAssignmentBuilderBenchmark.stableToStableNextEpoch                    50          1000  avgt    2   2.688          ms/op
JMH benchmarks done

With filter

Benchmark                                                  (partitionsPerTopic)  (topicCount)  Mode  Cnt   Score   Error  Units
CurrentAssignmentBuilderBenchmark.stableToStableNextEpoch                     5            10  avgt    2   0.005          ms/op
CurrentAssignmentBuilderBenchmark.stableToStableNextEpoch                     5           100  avgt    2   0.046          ms/op
CurrentAssignmentBuilderBenchmark.stableToStableNextEpoch                     5          1000  avgt    2   0.510          ms/op
CurrentAssignmentBuilderBenchmark.stableToStableNextEpoch                    50            10  avgt    2   0.025          ms/op
CurrentAssignmentBuilderBenchmark.stableToStableNextEpoch                    50           100  avgt    2   0.269          ms/op
CurrentAssignmentBuilderBenchmark.stableToStableNextEpoch                    50          1000  avgt    2   2.783          ms/op
CurrentAssignmentBuilderBenchmark.stableToStableSameEpoch                     5            10  avgt    2  ≈ 10⁻⁴          ms/op
CurrentAssignmentBuilderBenchmark.stableToStableSameEpoch                     5           100  avgt    2   0.003          ms/op
CurrentAssignmentBuilderBenchmark.stableToStableSameEpoch                     5          1000  avgt    2   0.046          ms/op
CurrentAssignmentBuilderBenchmark.stableToStableSameEpoch                    50            10  avgt    2  ≈ 10⁻⁴          ms/op
CurrentAssignmentBuilderBenchmark.stableToStableSameEpoch                    50           100  avgt    2   0.003          ms/op
CurrentAssignmentBuilderBenchmark.stableToStableSameEpoch                    50          1000  avgt    2   0.046          ms/op
JMH benchmarks done

stableToStableSameEpoch exercises the computeCurrentAssignment code path that only filters the current assignment.

At 1,000 topics, the impact of this change is about 0.1 ms per reconcile (0.408 ms to 0.510 ms with 5 partitions per topic, 2.688 ms to 2.783 ms with 50).

@squah-confluent
Contributor Author

I can split this change into three PRs if needed.

Comment on lines 257 to 265
/**
* Updates the current assignment, removing any partitions that are not part of the subscribed topics.
*
* @param memberAssignedPartitions The assigned partitions of the member to use.
* @return A new ConsumerGroupMember.
*/
private ConsumerGroupMember computeCurrentAssignment(
Map<Uuid, Set<Integer>> memberAssignedPartitions
) {
Contributor Author

This differs from computeNextAssignment when there are unrevoked partitions.

  • computeNextAssignment will replace the partitions pending revocation instead of adding to them (which can be fixed)
  • computeNextAssignment will add partitions from the latest target assignment to the current assignment even when there are unrevoked partitions. This is not easy to fix without breaking the stable -> unrevoked partitions state transition.
Comment on lines 267 to 271
if (subscribedTopicIds == null) {
// The member's subscribed topic regex is unresolved, so we cannot tell whether topics
// are part of its subscription.
return member;
}
Member

To be consistent, I think that we should rather consider unresolved regex as empty set. Don't you think?

Contributor Author

I guess that would be better for consistency with the client's subscription. I've made the change and added tests for unresolved vs resolved regexes at the CurrentAssignmentBuilder level. There's an existing GroupMetadataManager test that runs through the regex update flow.
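
To make the "unresolved regex as empty set" choice concrete, here is a small sketch. The names are hypothetical, and it assumes a member's effective subscription is its explicit topic names plus whatever its regex has resolved to so far.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

final class EffectiveSubscriptionSketch {
    // Hypothetical helper: combines explicit topic names with the topics the
    // regex resolved to. An empty Optional means the regex is not resolved
    // yet, so it contributes no topics.
    static Set<String> effectiveTopics(
        Set<String> subscribedTopicNames,
        Optional<Set<String>> resolvedRegexTopics
    ) {
        Set<String> topics = new HashSet<>(subscribedTopicNames);
        resolvedRegexTopics.ifPresent(topics::addAll);
        return topics;
    }

    public static void main(String[] args) {
        // Regex still unresolved: only the explicit names count.
        System.out.println(effectiveTopics(Set.of("foo"), Optional.empty()));
        // Regex resolved to bar: both topics count.
        System.out.println(effectiveTopics(Set.of("foo"), Optional.of(Set.of("bar"))));
    }
}
```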

Comment on lines 335 to 338
// If the member is no longer subscribed to the topic, treat its target assignment as empty.
if (subscribedTopicIds != null && !subscribedTopicIds.contains(topicId)) {
target = Set.of();
}
Member

This seems to be inconsistent with what you do in computeCurrentAssignment.

* @param memberAssignedPartitions The assigned partitions of the member to use.
* @return A new ConsumerGroupMember.
*/
private ConsumerGroupMember computeCurrentAssignment(
Member

nit: updateCurrentAssignment?

@@ -3463,15 +3473,19 @@ private ConsumerGroupMember maybeReconcile(
BiFunction<Uuid, Integer, Integer> currentPartitionEpoch,
int targetAssignmentEpoch,
Assignment targetAssignment,
Map<String, ResolvedRegularExpression> resolvedRegularExpressions,
boolean forceSubscriptionConsistency,
Member

nit: hasUpdatedSubscription?

Contributor Author

Renamed it to hasSubscriptionChanged, to be similar to the hasMemberSubscriptionChanged method.

@@ -139,7 +178,7 @@ public ConsumerGroupMember build() {
member.assignedPartitions()
);
} else {
- return member;
+ return computeCurrentAssignment(member.assignedPartitions());
Member

I wonder whether we should push the boolean flag down into this class to properly model the state machine here. The bypass in maybeReconcile is more of an optimization from my point of view.

Contributor Author (squah-confluent, Jun 30, 2025)

I moved the check and added a few more test cases to cover it.

@dajac added the ci-approved label and removed the triage (PRs from the community) label on Jun 27, 2025
@squah-confluent
Contributor Author

squah-confluent commented Jun 30, 2025

@dajac Thanks for taking the time to review the patch.

  • I "optimized" away a transition to UNREVOKED_PARTITIONS when the subscription has changed but the client doesn't own the revoked partitions anyway. This makes the client re-enter the STABLE state one heartbeat sooner. It adds an ownsRevokedPartitions call, which has a small performance hit (compare stableToUnrevokedPartitionsWithSubscriptionChange and stableToStableWithSubscriptionChange in the benchmarks below).

  • I discovered an interesting behavior when transitioning away from UNREVOKED_PARTITIONS. When the target assignment contains a partition that was previously pending revocation, the member enters the UNRELEASED_PARTITIONS state, as if another member was holding the partition. On the next reconcile, I think there is no issue. I haven't tried to fix this behavior since it existed before this PR.

I reran the benchmarks with the largest parameter sizes:

Benchmark                                                                            (partitionsPerTopic)  (topicCount)  Mode  Cnt   Score   Error  Units
CurrentAssignmentBuilderBenchmark.stableToStableWithNewTargetAssignment                                50          1000  avgt        2.771          ms/op    
CurrentAssignmentBuilderBenchmark.stableToStableWithNoChange                                           50          1000  avgt       ≈ 10⁻⁶          ms/op    
CurrentAssignmentBuilderBenchmark.stableToStableWithSubscriptionChange                                 50          1000  avgt        0.046          ms/op
CurrentAssignmentBuilderBenchmark.stableToUnrevokedPartitionsWithSubscriptionChange                    50          1000  avgt        0.059          ms/op
JMH benchmarks done                                                                                                                              
@squah-confluent requested a review from dajac June 30, 2025 05:07
);

ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(10)
- .setPreviousMemberEpoch(0)
+ .setPreviousMemberEpoch(10)
Contributor Author (squah-confluent, Jun 30, 2025)

This looks like a mistake in the existing code.

Member

Could you elaborate?

Contributor Author

The member epoch and previous member epoch before the heartbeat are both 10. After the heartbeat, the previous member epoch should still be 10, not 0. This wasn't an issue previously because newConsumerGroupMemberSubscriptionRecord and newConsumerGroupCurrentAssignmentRecord don't put the previous member epoch into any records.

Comment on lines +20439 to +20496
assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
memberId1, new MemberAssignmentImpl(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
))
)));

// Member heartbeats again with the same regex.
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result2 = context.consumerGroupHeartbeat(
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId1)
.setMemberEpoch(10)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignor("range")
.setTopicPartitions(List.of()));

assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId1)
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
.setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(List.of(
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(fooTopicId)
.setPartitions(List.of(0, 1, 2, 3, 4, 5)),
new ConsumerGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(barTopicId)
.setPartitions(List.of(0, 1, 2))))),
result2.response()
);

ConsumerGroupMember expectedMember2 = new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(11)
.setPreviousMemberEpoch(10)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicRegex("foo*|bar*")
.setServerAssignorName("range")
.setAssignedPartitions(mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)))
.build();

expectedRecords = List.of(
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment(
mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
mkTopicAssignment(barTopicId, 0, 1, 2)
)),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember2)
);

assertRecordsEquals(expectedRecords, result2.records());
Contributor Author

I added testing for completion of the regex update, from the next assignment computation to the end of the heartbeat with the updated assignment.

Comment on lines +126 to +135
@Benchmark
@Threads(1)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public ConsumerGroupMember stableToStableWithNoChange() {
return new CurrentAssignmentBuilder(member)
.withMetadataImage(metadataImage)
.withTargetAssignment(member.memberEpoch(), targetAssignment)
.withCurrentPartitionEpoch((topicId, partitionId) -> -1)
.build();
}
Contributor Author

This benchmark case should be a no-op and pretty fast.

@@ -2305,6 +2305,9 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
group::currentPartitionEpoch,
targetAssignmentEpoch,
targetAssignment,
group.resolvedRegularExpressions(),
// Force consistency with the subscription when the subscription has changed.
bumpGroupEpoch || hasMemberRegularExpressionChanged(member, updatedMember),
Member

I am not really happy with this. Could we somehow make hasMemberRegularExpressionChanged part of the checks that we already do in the beginning of this method? We may be able to refactor it.

Contributor Author

What do you have in mind? I wanted to avoid bumping the epoch because it would trigger an assignor run before the regex is resolved.

Member

I agree that we should not trigger an assignment. My point is more about the structure of the code. We have all the subscription checks over there except this one. It would be better if we could define a boolean over there too and combine checks if possible. For instance, the method checking the regex could return a record with two booleans or we could refactor it differently.

Contributor Author

I see, I've updated maybeUpdateRegularExpressions to return a (bumpGroupEpoch, subscribedTopicRegexChanged) record.
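
A rough sketch of what returning the two flags as a record could look like. Only the record's two components come from this thread. The method's parameters and its decision logic are hypothetical, chosen to illustrate the point above that a changed but unresolved regex should not bump the group epoch.

```java
import java.util.Objects;

final class RegexUpdateSketch {
    // Mirrors the (bumpGroupEpoch, subscribedTopicRegexChanged) pair mentioned above.
    record UpdateRegularExpressionsResult(
        boolean bumpGroupEpoch,
        boolean subscribedTopicRegexChanged
    ) { }

    // Hypothetical logic: the regex counts as changed whenever the string
    // differs, but the group epoch is only bumped once the new regex is
    // resolved, so the assignor does not run against an unresolved regex.
    static UpdateRegularExpressionsResult maybeUpdateRegularExpressions(
        String oldRegex,
        String newRegex,
        boolean newRegexResolved
    ) {
        boolean regexChanged = !Objects.equals(oldRegex, newRegex);
        boolean bumpGroupEpoch = regexChanged && newRegexResolved;
        return new UpdateRegularExpressionsResult(bumpGroupEpoch, regexChanged);
    }

    public static void main(String[] args) {
        UpdateRegularExpressionsResult result =
            maybeUpdateRegularExpressions("foo*", "foo*|bar*", false);
        // The caller can force subscription consistency without bumping the epoch.
        System.out.println(result);
    }
}
```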

* @param updatedMember The new member.
* @return A boolean indicating whether the subscribed topic regular expression has changed.
*/
private boolean hasMemberRegularExpressionChanged(
Member

nit: static?

Comment on lines +198 to +199
} else if (hasSubscriptionChanged) {
return updateCurrentAssignment(member.assignedPartitions());
Member

nit: Should we update the top level comment of the states?

}

// When the member has revoked all the pending partitions, it can
// transition to the next epoch (current + 1) and we can reconcile
// its state towards the latest target assignment.
return computeNextAssignment(
- member.memberEpoch() + 1,
+ Math.min(member.memberEpoch() + 1, targetAssignmentEpoch),
Member

Why do we need this change? Is it to be more defensive?

Contributor Author

If we have client-side assignors, we could transition from STABLE to UNREVOKED_PARTITIONS at epoch N when unsubscribing from a topic. Then the client could revoke the partitions and transition back to STABLE before the epoch N+1 target assignment is ready.
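
The effect of the `Math.min` clamp can be seen in isolation with a tiny sketch. The helper name is hypothetical; only the expression itself comes from the diff.

```java
final class NextEpochSketch {
    // Hypothetical helper wrapping the expression from the diff: the member
    // advances by one epoch, but never past the target assignment epoch.
    static int nextMemberEpoch(int memberEpoch, int targetAssignmentEpoch) {
        return Math.min(memberEpoch + 1, targetAssignmentEpoch);
    }

    public static void main(String[] args) {
        // Target for epoch 11 is not ready yet: the member stays at epoch 10.
        System.out.println(nextMemberEpoch(10, 10));
        // Target for epoch 11 is ready: the member moves to epoch 11.
        System.out.println(nextMemberEpoch(10, 11));
    }
}
```

Without the clamp, the first case would move the member to epoch 11 while the target assignment is still at epoch 10.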

return member;
}

if (ownsRevokedPartitions(newPartitionsPendingRevocation)) {
Member

I wonder if we really need this check here or if we could just transition to UNREVOKED_PARTITIONS if we revoked at least one partition above. I may be missing something though.

Contributor Author

If we don't check this, then we run into the UNREVOKED_PARTITIONS -> UNRELEASED_PARTITIONS bug in the regex tests and have to heartbeat an extra time. It just makes the tests worse is all.

Instead of transitioning

STABLE --subscription update--> STABLE --regex and assignor update--> STABLE

it goes

STABLE --subscription update--> UNREVOKED_PARTITIONS --regex and assignor update--> UNRELEASED_PARTITIONS --> STABLE
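
The two paths above can be sketched as a simplified decision. This is not the builder's real logic: the two-argument `ownsRevokedPartitions`, the `nextState` helper, and the use of topic names as keys are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.Set;

final class RevocationStateSketch {
    enum MemberState { STABLE, UNREVOKED_PARTITIONS }

    // Simplified stand-in: true if the client still reports owning at least
    // one of the partitions that are pending revocation.
    static boolean ownsRevokedPartitions(
        Map<String, Set<Integer>> pendingRevocation,
        Map<String, Set<Integer>> ownedByClient
    ) {
        for (Map.Entry<String, Set<Integer>> entry : pendingRevocation.entrySet()) {
            Set<Integer> owned = ownedByClient.getOrDefault(entry.getKey(), Set.of());
            for (Integer partition : entry.getValue()) {
                if (owned.contains(partition)) {
                    return true;
                }
            }
        }
        return false;
    }

    // If nothing pending revocation is actually owned, the member can stay
    // STABLE instead of waiting a heartbeat in UNREVOKED_PARTITIONS.
    static MemberState nextState(
        Map<String, Set<Integer>> pendingRevocation,
        Map<String, Set<Integer>> ownedByClient
    ) {
        return ownsRevokedPartitions(pendingRevocation, ownedByClient)
            ? MemberState.UNREVOKED_PARTITIONS
            : MemberState.STABLE;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> revoked = Map.of("bar", Set.of(0, 1));
        System.out.println(nextState(revoked, Map.of("foo", Set.of(0))));
        System.out.println(nextState(revoked, Map.of("bar", Set.of(1))));
    }
}
```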
}
}

TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(metadataImage.topics());
Member

Do we need the cache in this case? It seems that we will check every topic id once so the cache does not bring any benefits, does it?

Contributor Author

Good point, we can remove the cache.

@@ -83,11 +125,35 @@ public ShareGroupMember build() {
// when the member is updated.
return new ShareGroupMember.Builder(member)
.setState(MemberState.STABLE)
- .setAssignedPartitions(targetAssignment.partitions())
+ .setAssignedPartitions(filterAssignedPartitions(targetAssignment.partitions(), member.subscribedTopicNames()))
Member

Do we really need filterAssignedPartitions here?

Contributor Author

If we ever have client-side assignors, we could have two subscription changes in a row.
The first would trigger the assignor and filtering. Then the second would trigger the filtering. Finally, when the assignment is ready, it would be based on an older subscription and may not be consistent with the latest subscription.
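
A sketch of that stale-target scenario, under stated assumptions: the extra `topicNamesById` map is a stand-in for the topic lookup against the metadata image, `java.util.UUID` replaces Kafka's `Uuid`, and dropping topics with unknown IDs is a choice made for this example only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

final class ShareFilterSketch {
    // Keeps only the partitions whose topic name is in the latest subscription.
    // Assumption: topic IDs that cannot be resolved to a name are dropped.
    static Map<UUID, Set<Integer>> filterAssignedPartitions(
        Map<UUID, Set<Integer>> partitions,
        Set<String> subscribedTopicNames,
        Map<UUID, String> topicNamesById
    ) {
        Map<UUID, Set<Integer>> filtered = new HashMap<>();
        partitions.forEach((topicId, topicPartitions) -> {
            String topicName = topicNamesById.get(topicId);
            if (topicName != null && subscribedTopicNames.contains(topicName)) {
                filtered.put(topicId, topicPartitions);
            }
        });
        return filtered;
    }

    public static void main(String[] args) {
        UUID foo = UUID.randomUUID();
        UUID bar = UUID.randomUUID();
        Map<UUID, String> names = Map.of(foo, "foo", bar, "bar");
        // Target computed while the member was still subscribed to foo and bar.
        Map<UUID, Set<Integer>> staleTarget = Map.of(foo, Set.of(0, 1), bar, Set.of(0));
        // The member has since unsubscribed from bar, so bar is filtered out.
        System.out.println(filterAssignedPartitions(staleTarget, Set.of("foo"), names));
    }
}
```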

Map<Uuid, Set<Integer>> partitions,
Set<String> subscribedTopicNames
) {
TopicIds.TopicResolver topicResolver = new TopicIds.CachedTopicResolver(metadataImage.topics());
Member

Same question about the cache.

@dajac
Member

dajac commented Jun 30, 2025

> • I discovered an interesting behavior when transitioning away from UNREVOKED_PARTITIONS. When the target assignment contains a partition that was previously pending revocation, the member enters the UNRELEASED_PARTITIONS state, as if another member was holding the partition. On the next reconcile, I think there is no issue. I haven't tried to fix this behavior since it existed before this PR.

Interesting. We should fix this. Could you please file a jira and raise a PR for it?

@squah-confluent
Contributor Author

> • I discovered an interesting behavior when transitioning away from UNREVOKED_PARTITIONS. When the target assignment contains a partition that was previously pending revocation, the member enters the UNRELEASED_PARTITIONS state, as if another member was holding the partition. On the next reconcile, I think there is no issue. I haven't tried to fix this behavior since it existed before this PR.
>
> Interesting. We should fix this. Could you please file a jira and raise a PR for it?

I filed this as https://issues.apache.org/jira/browse/KAFKA-19449.

@@ -2250,13 +2250,15 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
records
);

- bumpGroupEpoch |= maybeUpdateRegularExpressions(
+ UpdateRegularExpressionsResult updateRegularExpressionsResult = maybeUpdateRegularExpressions(
Member

nit: Would it make sense to define the following variables?

  • subscribedTopicNamesChanged
  • subscribedTopicRegexChanged (already part of updateRegularExpressionsResult)

Then we can define:

  • hasSubscriptionChanged = subscribedTopicNamesChanged | subscribedTopicRegexChanged
  • bumpGroupEpoch = group.groupEpoch() == 0 | subscribedTopicNamesChanged | updateRegularExpressionsResult.bumpGroupEpoch.

For both, we can put a comment explaining the differences. What do you think?
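
The suggested variables could be derived roughly as follows. This is a self-contained sketch: the `Flags` record and the `derive` helper are hypothetical, and `regexBumpGroupEpoch` stands for `updateRegularExpressionsResult.bumpGroupEpoch`.

```java
final class SubscriptionFlagsSketch {
    // Hypothetical holder for the two derived flags.
    record Flags(boolean hasSubscriptionChanged, boolean bumpGroupEpoch) { }

    static Flags derive(
        int groupEpoch,
        boolean subscribedTopicNamesChanged,
        boolean subscribedTopicRegexChanged,
        boolean regexBumpGroupEpoch
    ) {
        // Any change to names or regex forces consistency with the subscription.
        boolean hasSubscriptionChanged =
            subscribedTopicNamesChanged || subscribedTopicRegexChanged;
        // A regex change alone does not bump the epoch; that waits for the
        // regex update result to ask for it.
        boolean bumpGroupEpoch =
            groupEpoch == 0 || subscribedTopicNamesChanged || regexBumpGroupEpoch;
        return new Flags(hasSubscriptionChanged, bumpGroupEpoch);
    }

    public static void main(String[] args) {
        // Regex changed but is not resolved yet: filter, but do not bump.
        System.out.println(derive(5, false, true, false));
    }
}
```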

Contributor Author

I gave that a go. Let me know your thoughts.
