Skip to content

Commit cb2edfd

Browse files
authored
KAFKA-19720: Regex subscription should be empty for classic members joining mixed group (4.1) (#20904)
We don't recompute the assignment on consumer -> classic member replacement when the consumer member had a regex subscription and the classic member does not. This PR sets regex subscription to empty during the replacement. Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
1 parent 77dd47d commit cb2edfd

File tree

2 files changed

+310
-0
lines changed

2 files changed

+310
-0
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2422,6 +2422,7 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
24222422
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
24232423
.maybeUpdateServerAssignorName(Optional.empty())
24242424
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
2425+
.setSubscribedTopicRegex("") // Regex subscription is not supported for classic member.
24252426
.setClientId(context.clientId())
24262427
.setClientHost(context.clientAddress().toString())
24272428
.setClassicMemberMetadata(
@@ -2437,6 +2438,18 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
24372438
records
24382439
);
24392440

2441+
bumpGroupEpoch |= maybeUpdateRegularExpressions(
2442+
context,
2443+
group,
2444+
member,
2445+
updatedMember,
2446+
records
2447+
);
2448+
2449+
// We bump the group epoch if the updated member replaces a static member
2450+
// with regex subscription to trigger a new assignment computation.
2451+
bumpGroupEpoch |= !member.subscribedTopicRegex().isEmpty();
2452+
24402453
if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
24412454
// The subscription metadata is updated in two cases:
24422455
// 1) The member has updated its subscriptions;

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java

Lines changed: 297 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20790,6 +20790,303 @@ foooTopicName, computeTopicHash(foooTopicName, newImage)
2079020790
);
2079120791
}
2079220792

20793+
@Test
20794+
public void testConsumerMemberWithRegexReplacedByClassicMemberWithSameSubscription() {
20795+
String groupId = "fooup";
20796+
String instanceId = "instance-id";
20797+
String memberId1 = Uuid.randomUuid().toString();
20798+
String memberId2 = Uuid.randomUuid().toString();
20799+
20800+
Uuid fooTopicId = Uuid.randomUuid();
20801+
String fooTopicName = "foo";
20802+
Uuid barTopicId = Uuid.randomUuid();
20803+
String barTopicName = "bar";
20804+
20805+
MetadataImage metadataImage = new MetadataImageBuilder()
20806+
.addTopic(fooTopicId, fooTopicName, 6)
20807+
.addTopic(barTopicId, barTopicName, 1)
20808+
.build(12345L);
20809+
20810+
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
20811+
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
20812+
20813+
// Member 1 is a static member with both regex and topic name subscription
20814+
// Member 2 uses topic name subscription.
20815+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
20816+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
20817+
.withMetadataImage(metadataImage)
20818+
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
20819+
.withMember(new ConsumerGroupMember.Builder(memberId1)
20820+
.setInstanceId(instanceId)
20821+
.setState(MemberState.STABLE)
20822+
.setMemberEpoch(10)
20823+
.setPreviousMemberEpoch(10)
20824+
.setClientId(DEFAULT_CLIENT_ID)
20825+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
20826+
.setRebalanceTimeoutMs(5000)
20827+
.setSubscribedTopicRegex("bar*")
20828+
.setSubscribedTopicNames(List.of(fooTopicName))
20829+
.setServerAssignorName("range")
20830+
.setAssignedPartitions(mkAssignment(
20831+
mkTopicAssignment(fooTopicId, 0, 1, 2),
20832+
mkTopicAssignment(barTopicId, 0)))
20833+
.build())
20834+
.withMember(new ConsumerGroupMember.Builder(memberId2)
20835+
.setState(MemberState.STABLE)
20836+
.setMemberEpoch(10)
20837+
.setPreviousMemberEpoch(10)
20838+
.setClientId(DEFAULT_CLIENT_ID)
20839+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
20840+
.setRebalanceTimeoutMs(5000)
20841+
.setSubscribedTopicNames(List.of(fooTopicName))
20842+
.setServerAssignorName("range")
20843+
.setAssignedPartitions(mkAssignment(
20844+
mkTopicAssignment(fooTopicId, 3, 4, 5)))
20845+
.build())
20846+
.withAssignment(memberId1, mkAssignment(
20847+
mkTopicAssignment(fooTopicId, 0, 1, 2),
20848+
mkTopicAssignment(barTopicId, 0)))
20849+
.withAssignment(memberId2, mkAssignment(
20850+
mkTopicAssignment(fooTopicId, 3, 4, 5)))
20851+
.withAssignmentEpoch(10)
20852+
.withResolvedRegularExpression("bar*", new ResolvedRegularExpression(
20853+
Set.of(barTopicName), 0L, 0L)))
20854+
.build();
20855+
ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
20856+
group.setMetadataRefreshDeadline(Long.MAX_VALUE, 10);
20857+
20858+
// Member 1 is replaced by a classic member with the same instance id.
20859+
JoinGroupRequestProtocolCollection joinProtocols = new JoinGroupRequestProtocolCollection();
20860+
joinProtocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
20861+
.setName("range")
20862+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
20863+
List.of(fooTopicName)
20864+
))))
20865+
);
20866+
JoinGroupRequestData joinRequest = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
20867+
.withGroupId(groupId)
20868+
.withMemberId(UNKNOWN_MEMBER_ID)
20869+
.withGroupInstanceId(instanceId)
20870+
.withRebalanceTimeoutMs(5000)
20871+
.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
20872+
.withProtocols(joinProtocols)
20873+
.build();
20874+
GroupMetadataManagerTestContext.JoinResult result = context.sendClassicGroupJoin(joinRequest);
20875+
20876+
ConsumerGroupMember newMember1 = group.staticMember(instanceId);
20877+
20878+
ConsumerGroupMember expectedCopiedMember = new ConsumerGroupMember.Builder(newMember1.memberId())
20879+
.setState(MemberState.STABLE)
20880+
.setInstanceId(instanceId)
20881+
.setMemberEpoch(0)
20882+
.setPreviousMemberEpoch(0)
20883+
.setClientId(DEFAULT_CLIENT_ID)
20884+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
20885+
.setRebalanceTimeoutMs(5000)
20886+
.setSubscribedTopicRegex("bar*") // Still uses regex subscription.
20887+
.setSubscribedTopicNames(List.of(fooTopicName))
20888+
.setServerAssignorName("range")
20889+
.setAssignedPartitions(mkAssignment(
20890+
mkTopicAssignment(fooTopicId, 0, 1, 2),
20891+
mkTopicAssignment(barTopicId, 0)))
20892+
.build();
20893+
20894+
ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(newMember1.memberId())
20895+
.setState(MemberState.STABLE)
20896+
.setInstanceId(instanceId)
20897+
.setMemberEpoch(11)
20898+
.setPreviousMemberEpoch(0)
20899+
.setClientId(DEFAULT_CLIENT_ID)
20900+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
20901+
.setRebalanceTimeoutMs(5000)
20902+
.setSubscribedTopicRegex("") // empty regex subscription
20903+
.setSubscribedTopicNames(List.of(fooTopicName))
20904+
.setServerAssignorName("range")
20905+
.setAssignedPartitions(Map.of()) // empty assignment
20906+
.setClassicMemberMetadata(
20907+
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
20908+
.setSessionTimeoutMs(500)
20909+
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(joinRequest.protocols()))
20910+
)
20911+
.build();
20912+
20913+
List<List<CoordinatorRecord>> expectedRecords = List.of(
20914+
// The previous member is deleted.
20915+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1)),
20916+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId1)),
20917+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId1)),
20918+
// The previous member is replaced by the new one.
20919+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedCopiedMember)),
20920+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, expectedCopiedMember.memberId(), expectedCopiedMember.assignedPartitions())),
20921+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedCopiedMember)),
20922+
// The member subscription is updated.
20923+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1)),
20924+
// The regex is tombstoned.
20925+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "bar*")),
20926+
// The group epoch is bumped.
20927+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
20928+
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
20929+
)))),
20930+
// The target assignment is updated.
20931+
List.of(
20932+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, expectedMember1.memberId(), Map.of()),
20933+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Map.of())
20934+
),
20935+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11)),
20936+
// The member assignment is updated.
20937+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1))
20938+
);
20939+
20940+
assertUnorderedRecordsEquals(
20941+
expectedRecords,
20942+
result.records
20943+
);
20944+
}
20945+
20946+
@Test
20947+
public void testConsumerMemberWithRegexReplacedByClassicMemberWithChangedSubscription() {
20948+
String groupId = "fooup";
20949+
String instanceId = "instance-id";
20950+
String memberId1 = Uuid.randomUuid().toString();
20951+
String memberId2 = Uuid.randomUuid().toString();
20952+
20953+
Uuid fooTopicId = Uuid.randomUuid();
20954+
String fooTopicName = "foo";
20955+
20956+
MetadataImage metadataImage = new MetadataImageBuilder()
20957+
.addTopic(fooTopicId, fooTopicName, 6)
20958+
.build(12345L);
20959+
20960+
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
20961+
assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
20962+
20963+
// Member 1 is a static member with regex subscription and
20964+
// Member 2 uses topic name subscription.
20965+
GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
20966+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
20967+
.withMetadataImage(metadataImage)
20968+
.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
20969+
.withMember(new ConsumerGroupMember.Builder(memberId1)
20970+
.setInstanceId(instanceId)
20971+
.setState(MemberState.STABLE)
20972+
.setMemberEpoch(10)
20973+
.setPreviousMemberEpoch(10)
20974+
.setClientId(DEFAULT_CLIENT_ID)
20975+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
20976+
.setRebalanceTimeoutMs(5000)
20977+
.setSubscribedTopicRegex("foo*")
20978+
.setServerAssignorName("range")
20979+
.setAssignedPartitions(mkAssignment(
20980+
mkTopicAssignment(fooTopicId, 0, 1, 2)))
20981+
.build())
20982+
.withMember(new ConsumerGroupMember.Builder(memberId2)
20983+
.setState(MemberState.STABLE)
20984+
.setMemberEpoch(10)
20985+
.setPreviousMemberEpoch(10)
20986+
.setClientId(DEFAULT_CLIENT_ID)
20987+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
20988+
.setRebalanceTimeoutMs(5000)
20989+
.setSubscribedTopicNames(List.of(fooTopicName))
20990+
.setServerAssignorName("range")
20991+
.setAssignedPartitions(mkAssignment(
20992+
mkTopicAssignment(fooTopicId, 3, 4, 5)))
20993+
.build())
20994+
.withAssignment(memberId1, mkAssignment(
20995+
mkTopicAssignment(fooTopicId, 0, 1, 2)))
20996+
.withAssignment(memberId2, mkAssignment(
20997+
mkTopicAssignment(fooTopicId, 3, 4, 5)))
20998+
.withAssignmentEpoch(10)
20999+
.withResolvedRegularExpression("foo*", new ResolvedRegularExpression(
21000+
Set.of(fooTopicName), 0L, 0L)))
21001+
.build();
21002+
ConsumerGroup group = context.groupMetadataManager.consumerGroup(groupId);
21003+
group.setMetadataRefreshDeadline(Long.MAX_VALUE, 10);
21004+
21005+
// Member 1 is replaced by a classic member with the same instance id.
21006+
JoinGroupRequestProtocolCollection joinProtocols = new JoinGroupRequestProtocolCollection();
21007+
joinProtocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
21008+
.setName("range")
21009+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(
21010+
List.of()
21011+
))))
21012+
);
21013+
JoinGroupRequestData joinRequest = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
21014+
.withGroupId(groupId)
21015+
.withMemberId(UNKNOWN_MEMBER_ID)
21016+
.withGroupInstanceId(instanceId)
21017+
.withRebalanceTimeoutMs(5000)
21018+
.withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
21019+
.withProtocols(joinProtocols)
21020+
.build();
21021+
GroupMetadataManagerTestContext.JoinResult result = context.sendClassicGroupJoin(joinRequest);
21022+
21023+
ConsumerGroupMember newMember1 = group.staticMember(instanceId);
21024+
21025+
ConsumerGroupMember expectedCopiedMember = new ConsumerGroupMember.Builder(newMember1.memberId())
21026+
.setState(MemberState.STABLE)
21027+
.setInstanceId(instanceId)
21028+
.setMemberEpoch(0)
21029+
.setPreviousMemberEpoch(0)
21030+
.setClientId(DEFAULT_CLIENT_ID)
21031+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
21032+
.setRebalanceTimeoutMs(5000)
21033+
.setSubscribedTopicRegex("foo*") // Still uses regex subscription.
21034+
.setServerAssignorName("range")
21035+
.setAssignedPartitions(mkAssignment(
21036+
mkTopicAssignment(fooTopicId, 0, 1, 2)))
21037+
.build();
21038+
21039+
ConsumerGroupMember expectedMember1 = new ConsumerGroupMember.Builder(newMember1.memberId())
21040+
.setState(MemberState.STABLE)
21041+
.setInstanceId(instanceId)
21042+
.setMemberEpoch(11)
21043+
.setPreviousMemberEpoch(0)
21044+
.setClientId(DEFAULT_CLIENT_ID)
21045+
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
21046+
.setRebalanceTimeoutMs(5000)
21047+
.setSubscribedTopicRegex("") // empty regex subscription
21048+
.setServerAssignorName("range")
21049+
.setAssignedPartitions(Map.of()) // empty assignment
21050+
.setClassicMemberMetadata(
21051+
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
21052+
.setSessionTimeoutMs(500)
21053+
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(joinRequest.protocols()))
21054+
)
21055+
.build();
21056+
21057+
List<List<CoordinatorRecord>> expectedRecords = List.of(
21058+
// The previous member is deleted.
21059+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId1)),
21060+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId1)),
21061+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId1)),
21062+
// The previous member is replaced by the new one.
21063+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedCopiedMember)),
21064+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, expectedCopiedMember.memberId(), expectedCopiedMember.assignedPartitions())),
21065+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedCopiedMember)),
21066+
// The member subscription is updated.
21067+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1)),
21068+
// The regex is tombstoned.
21069+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")),
21070+
// The group epoch is bumped.
21071+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of(
21072+
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
21073+
)))),
21074+
// The target assignment is updated.
21075+
List.of(
21076+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, expectedMember1.memberId(), Map.of()),
21077+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Map.of())
21078+
),
21079+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 11)),
21080+
// The member assignment is updated.
21081+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedMember1))
21082+
);
21083+
21084+
assertUnorderedRecordsEquals(
21085+
expectedRecords,
21086+
result.records
21087+
);
21088+
}
21089+
2079321090
@Test
2079421091
public void testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() {
2079521092
String groupId = "fooup";

0 commit comments

Comments
 (0)