Skip to content

Commit 169e211

Browse files
authored
KAFKA-19892: Add acq lock timeout ms field to share ack response. (#20901)
* Add field `AcquisitionLockTImeoutMs` to `ShareAcknowledgeResponse` to communicate the timeout to the share consumer. * This becomes useful with addition of RENEW ack type in KIP-1222. * Tests have been updated. Reviewers: Andrew Schofield <[email protected]>
1 parent ab6755c commit 169e211

File tree

5 files changed

+20
-9
lines changed

5 files changed

+20
-9
lines changed

clients/src/main/java/org/apache/kafka/common/requests/ShareAcknowledgeResponse.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,13 @@ public static ShareAcknowledgeResponseData.PartitionData partitionResponse(int p
107107
public static ShareAcknowledgeResponse of(Errors error,
108108
int throttleTimeMs,
109109
LinkedHashMap<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> responseData,
110-
List<Node> nodeEndpoints) {
111-
return new ShareAcknowledgeResponse(toMessage(error, throttleTimeMs, responseData.entrySet().iterator(), nodeEndpoints));
110+
List<Node> nodeEndpoints, int acquisitionLockTimeout) {
111+
return new ShareAcknowledgeResponse(toMessage(error, throttleTimeMs, responseData.entrySet().iterator(), nodeEndpoints, acquisitionLockTimeout));
112112
}
113113

114114
public static ShareAcknowledgeResponseData toMessage(Errors error, int throttleTimeMs,
115115
Iterator<Map.Entry<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> partIterator,
116-
List<Node> nodeEndpoints) {
116+
List<Node> nodeEndpoints, int acquisitionLockTimeout) {
117117
ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponseCollection topicResponses = new ShareAcknowledgeResponseData.ShareAcknowledgeTopicResponseCollection();
118118
while (partIterator.hasNext()) {
119119
Map.Entry<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> entry = partIterator.next();
@@ -140,6 +140,7 @@ public static ShareAcknowledgeResponseData toMessage(Errors error, int throttleT
140140
.setRack(endpoint.rack())));
141141
return data.setThrottleTimeMs(throttleTimeMs)
142142
.setErrorCode(error.code())
143+
.setAcquisitionLockTimeoutMs(acquisitionLockTimeout)
143144
.setResponses(topicResponses);
144145
}
145146
}

clients/src/main/resources/common/message/ShareAcknowledgeResponse.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
"about": "The top level response error code." },
4444
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
4545
"about": "The top-level error message, or null if there was no error." },
46+
{ "name": "AcquisitionLockTimeoutMs", "type": "int32", "versions": "2+",
47+
"about": "The time in milliseconds for which the acquired records are locked." },
4648
{ "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
4749
"about": "The response topics.", "fields": [
4850
{ "name": "TopicId", "type": "uuid", "versions": "0+", "mapKey": true,

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2626,25 +2626,25 @@ private ShareFetchResponse fullFetchResponse(TopicIdPartition tp,
26262626

26272627
private ShareAcknowledgeResponse emptyAcknowledgeResponse() {
26282628
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> partitions = Collections.emptyMap();
2629-
return ShareAcknowledgeResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), Collections.emptyList());
2629+
return ShareAcknowledgeResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), Collections.emptyList(), 0);
26302630
}
26312631

26322632
private ShareAcknowledgeResponse acknowledgeResponseWithTopLevelError(TopicIdPartition tp, Errors error) {
26332633
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> partitions = Map.of(tp,
26342634
partitionDataForAcknowledge(tp, Errors.NONE));
2635-
return ShareAcknowledgeResponse.of(error, 0, new LinkedHashMap<>(partitions), Collections.emptyList());
2635+
return ShareAcknowledgeResponse.of(error, 0, new LinkedHashMap<>(partitions), Collections.emptyList(), 0);
26362636
}
26372637

26382638
private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition tp, Errors error) {
26392639
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> partitions = Map.of(tp,
26402640
partitionDataForAcknowledge(tp, error));
2641-
return ShareAcknowledgeResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), Collections.emptyList());
2641+
return ShareAcknowledgeResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), Collections.emptyList(), 0);
26422642
}
26432643

26442644
private ShareAcknowledgeResponse fullAcknowledgeResponse(Map<TopicIdPartition, Errors> partitionErrorsMap) {
26452645
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> partitions = new HashMap<>();
26462646
partitionErrorsMap.forEach((tip, error) -> partitions.put(tip, partitionDataForAcknowledge(tip, error)));
2647-
return ShareAcknowledgeResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), Collections.emptyList());
2647+
return ShareAcknowledgeResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), Collections.emptyList(), 0);
26482648
}
26492649

26502650
private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition tp,
@@ -2653,7 +2653,7 @@ private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition tp,
26532653
List<Node> nodeEndpoints) {
26542654
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> partitions = Map.of(tp,
26552655
partitionDataForAcknowledge(tp, error, currentLeader));
2656-
return ShareAcknowledgeResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), nodeEndpoints);
2656+
return ShareAcknowledgeResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), nodeEndpoints, 0);
26572657
}
26582658

26592659
private ShareFetchResponseData.PartitionData partitionDataForFetch(TopicIdPartition tp,

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4035,7 +4035,8 @@ class KafkaApis(val requestChannel: RequestChannel,
40354035
Errors.NONE,
40364036
0,
40374037
partitions,
4038-
nodeEndpoints.values.toList.asJava
4038+
nodeEndpoints.values.toList.asJava,
4039+
config.shareGroupConfig.shareGroupRecordLockDurationMs
40394040
)
40404041
}
40414042

core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
8787
val shareAcknowledgeResponse = IntegrationTestUtils.sendAndReceive[ShareAcknowledgeResponse](shareAcknowledgeRequest, socket)
8888

8989
assertEquals(Errors.UNSUPPORTED_VERSION.code, shareAcknowledgeResponse.data.errorCode)
90+
assertEquals(0, shareAcknowledgeResponse.data.acquisitionLockTimeoutMs)
9091
}
9192

9293
@ClusterTests(
@@ -505,6 +506,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
505506
.setErrorCode(Errors.NONE.code())
506507

507508
val acknowledgePartitionData = shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().get(0)
509+
assertEquals(30000, shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
508510
compareAcknowledgeResponsePartitions(expectedAcknowledgePartitionData, acknowledgePartitionData)
509511

510512
// Producing 10 more records to the topic
@@ -730,6 +732,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
730732
assertEquals(1, shareAcknowledgeResponseData.responses().size())
731733
assertEquals(topicId, shareAcknowledgeResponseData.responses().stream().findFirst().get().topicId())
732734
assertEquals(1, shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().size())
735+
assertEquals(30000, shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
733736

734737
val expectedAcknowledgePartitionData = new ShareAcknowledgeResponseData.PartitionData()
735738
.setPartitionIndex(PARTITION)
@@ -952,6 +955,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
952955
assertEquals(1, shareAcknowledgeResponseData.responses().size())
953956
assertEquals(topicId, shareAcknowledgeResponseData.responses().stream().findFirst().get().topicId())
954957
assertEquals(1, shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().size())
958+
assertEquals(30000, shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
955959

956960
val expectedAcknowledgePartitionData = new ShareAcknowledgeResponseData.PartitionData()
957961
.setPartitionIndex(PARTITION)
@@ -1183,6 +1187,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
11831187
assertEquals(1, shareAcknowledgeResponseData.responses().size())
11841188
assertEquals(topicId, shareAcknowledgeResponseData.responses().stream().findFirst().get().topicId())
11851189
assertEquals(1, shareAcknowledgeResponseData.responses().stream().findFirst().get().partitions().size())
1190+
assertEquals(30000, shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
11861191

11871192
var expectedAcknowledgePartitionData = new ShareAcknowledgeResponseData.PartitionData()
11881193
.setPartitionIndex(PARTITION)
@@ -1766,6 +1771,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
17661771

17671772
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
17681773
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareAcknowledgeResponseData.errorCode)
1774+
assertEquals(0, shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
17691775
}
17701776

17711777
@ClusterTests(
@@ -1913,6 +1919,7 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
19131919

19141920
val shareAcknowledgeResponseData = shareAcknowledgeResponse.data()
19151921
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, shareAcknowledgeResponseData.errorCode)
1922+
assertEquals(0, shareAcknowledgeResponseData.acquisitionLockTimeoutMs())
19161923
}
19171924

19181925
@ClusterTests(

0 commit comments

Comments
 (0)