Skip to content

Commit ab6755c

Browse files
KAFKA-19815: Implement acquisitionLockTimeoutMs (#20895)
Implement KafkaShareConsumer.acquisitionLockTimeoutMs as part of KIP-1222. Reviewers: Apoorv Mittal <[email protected]>
1 parent 8461ae3 commit ab6755c

File tree

10 files changed

+104
-36
lines changed

10 files changed

+104
-36
lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,8 +260,10 @@ public void testSubscriptionAndPoll() {
260260
producer.send(record);
261261
producer.flush();
262262
shareConsumer.subscribe(Set.of(tp.topic()));
263+
assertEquals(Optional.empty(), shareConsumer.acquisitionLockTimeoutMs());
263264
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
264265
assertEquals(1, records.count());
266+
assertEquals(Optional.of(15000), shareConsumer.acquisitionLockTimeoutMs());
265267
verifyShareGroupStateTopicRecordsProduced();
266268
}
267269
}
@@ -276,8 +278,10 @@ public void testSubscriptionAndPollMultiple() {
276278
producer.send(record);
277279
producer.flush();
278280
shareConsumer.subscribe(Set.of(tp.topic()));
281+
assertEquals(Optional.empty(), shareConsumer.acquisitionLockTimeoutMs());
279282
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
280283
assertEquals(1, records.count());
284+
assertEquals(Optional.of(15000), shareConsumer.acquisitionLockTimeoutMs());
281285
producer.send(record);
282286
records = shareConsumer.poll(Duration.ofMillis(5000));
283287
assertEquals(1, records.count());
@@ -323,11 +327,14 @@ public void testAcknowledgementSentOnSubscriptionChange() throws ExecutionExcept
323327
shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap, partitionExceptionMap));
324328

325329
shareConsumer.subscribe(Set.of(tp.topic()));
330+
assertEquals(Optional.empty(), shareConsumer.acquisitionLockTimeoutMs());
326331

327332
TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
328333
DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for share consumer");
329334

335+
assertEquals(Optional.of(15000), shareConsumer.acquisitionLockTimeoutMs());
330336
shareConsumer.subscribe(Set.of(tp2.topic()));
337+
assertEquals(Optional.of(15000), shareConsumer.acquisitionLockTimeoutMs());
331338

332339
// Waiting for heartbeat to propagate the subscription change.
333340
TestUtils.waitForCondition(() -> {

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public class ShareCompletedFetch {
6060
final int nodeId;
6161
final TopicIdPartition partition;
6262
final ShareFetchResponseData.PartitionData partitionData;
63+
final Optional<Integer> acquisitionLockTimeoutMs;
6364
final short requestVersion;
6465

6566
private final Logger log;
@@ -77,21 +78,23 @@ public class ShareCompletedFetch {
7778
private final List<OffsetAndDeliveryCount> acquiredRecordList;
7879
private ListIterator<OffsetAndDeliveryCount> acquiredRecordIterator;
7980
private OffsetAndDeliveryCount nextAcquired;
80-
private final ShareFetchMetricsAggregator metricAggregator;
81+
private final ShareFetchMetricsAggregator metricsAggregator;
8182

8283
ShareCompletedFetch(final LogContext logContext,
8384
final BufferSupplier decompressionBufferSupplier,
8485
final int nodeId,
8586
final TopicIdPartition partition,
8687
final ShareFetchResponseData.PartitionData partitionData,
87-
final ShareFetchMetricsAggregator metricAggregator,
88+
final Optional<Integer> acquisitionLockTimeoutMs,
89+
final ShareFetchMetricsAggregator metricsAggregator,
8890
final short requestVersion) {
8991
this.log = logContext.logger(org.apache.kafka.clients.consumer.internals.ShareCompletedFetch.class);
9092
this.decompressionBufferSupplier = decompressionBufferSupplier;
9193
this.nodeId = nodeId;
9294
this.partition = partition;
9395
this.partitionData = partitionData;
94-
this.metricAggregator = metricAggregator;
96+
this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs;
97+
this.metricsAggregator = metricsAggregator;
9598
this.requestVersion = requestVersion;
9699
this.batches = ShareFetchResponse.recordsOrFail(partitionData).batches().iterator();
97100
this.acquiredRecordList = buildAcquiredRecordList(partitionData.acquiredRecords());
@@ -154,7 +157,7 @@ void drain() {
154157
* and number of records parsed. After all partitions have reported, we write the metric.
155158
*/
156159
void recordAggregatedMetrics(int bytes, int records) {
157-
metricAggregator.record(partition.topicPartition(), bytes, records);
160+
metricsAggregator.record(partition.topicPartition(), bytes, records);
158161
}
159162

160163
/**
@@ -173,7 +176,7 @@ <K, V> ShareInFlightBatch<K, V> fetchRecords(final Deserializers<K, V> deseriali
173176
final int maxRecords,
174177
final boolean checkCrcs) {
175178
// Creating an empty ShareInFlightBatch
176-
ShareInFlightBatch<K, V> inFlightBatch = new ShareInFlightBatch<>(nodeId, partition);
179+
ShareInFlightBatch<K, V> inFlightBatch = new ShareInFlightBatch<>(nodeId, partition, acquisitionLockTimeoutMs);
177180

178181
if (cachedBatchException != null) {
179182
// In the event that a CRC check fails, reject the entire record batch because it is corrupt.
@@ -318,13 +321,13 @@ <K, V> ConsumerRecord<K, V> parseRecord(final Deserializers<K, V> deserializers,
318321
try {
319322
key = keyBytes == null ? null : deserializers.keyDeserializer().deserialize(partition.topic(), headers, keyBytes);
320323
} catch (RuntimeException e) {
321-
log.error("Key Deserializers with error: {}", deserializers);
324+
log.error("Key deserializers with error: {}", deserializers);
322325
throw newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.KEY, partition.topicPartition(), timestampType, record, e, headers);
323326
}
324327
try {
325328
value = valueBytes == null ? null : deserializers.valueDeserializer().deserialize(partition.topic(), headers, valueBytes);
326329
} catch (RuntimeException e) {
327-
log.error("Value Deserializers with error: {}", deserializers);
330+
log.error("Value deserializers with error: {}", deserializers);
328331
throw newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.VALUE, partition.topicPartition(), timestampType, record, e, headers);
329332
}
330333
return new ConsumerRecord<>(partition.topic(), partition.partition(), record.offset(),

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -821,6 +821,8 @@ private void handleShareFetchSuccess(Node fetchTarget,
821821
fetchTarget.id(),
822822
tip,
823823
partitionData,
824+
response.data().acquisitionLockTimeoutMs() > 0
825+
? Optional.of(response.data().acquisitionLockTimeoutMs()) : Optional.empty(),
824826
shareFetchMetricsAggregator,
825827
requestVersion)
826828
);

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -892,8 +892,12 @@ public Uuid clientInstanceId(final Duration timeout) {
892892
*/
893893
@Override
894894
public Optional<Integer> acquisitionLockTimeoutMs() {
895-
// To be implemented
896-
return Optional.empty();
895+
acquireAndEnsureOpen();
896+
try {
897+
return currentFetch.acquisitionLockTimeoutMs();
898+
} finally {
899+
release();
900+
}
897901
}
898902

899903
/**

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.List;
3131
import java.util.Map;
3232
import java.util.Objects;
33+
import java.util.Optional;
3334

3435
/**
3536
* {@link ShareFetch} represents the records fetched from the broker to be returned to the consumer
@@ -41,13 +42,15 @@
4142
*/
4243
public class ShareFetch<K, V> {
4344
private final Map<TopicIdPartition, ShareInFlightBatch<K, V>> batches;
45+
private Optional<Integer> acquisitionLockTimeoutMs;
4446

4547
public static <K, V> ShareFetch<K, V> empty() {
46-
return new ShareFetch<>(new HashMap<>());
48+
return new ShareFetch<>(new HashMap<>(), Optional.empty());
4749
}
4850

49-
private ShareFetch(Map<TopicIdPartition, ShareInFlightBatch<K, V>> batches) {
51+
private ShareFetch(Map<TopicIdPartition, ShareInFlightBatch<K, V>> batches, Optional<Integer> acquisitionLockTimeoutMs) {
5052
this.batches = batches;
53+
this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs;
5154
}
5255

5356
/**
@@ -67,6 +70,9 @@ public void add(TopicIdPartition partition, ShareInFlightBatch<K, V> batch) {
6770
// but it might conceivably happen in some rare cases (such as partition leader changes).
6871
currentBatch.merge(batch);
6972
}
73+
if (batch.getAcquisitionLockTimeoutMs().isPresent()) {
74+
acquisitionLockTimeoutMs = batch.getAcquisitionLockTimeoutMs();
75+
}
7076
}
7177

7278
/**
@@ -108,6 +114,13 @@ public boolean isEmpty() {
108114
return numRecords() == 0;
109115
}
110116

117+
/**
118+
* @return The most up-to-date value of acquisition lock timeout, if available
119+
*/
120+
public Optional<Integer> acquisitionLockTimeoutMs() {
121+
return acquisitionLockTimeoutMs;
122+
}
123+
111124
/**
112125
* @return {@code true} if this fetch contains records being renewed
113126
*/

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.HashMap;
2525
import java.util.List;
2626
import java.util.Map;
27+
import java.util.Optional;
2728
import java.util.Set;
2829
import java.util.TreeMap;
2930
import java.util.TreeSet;
@@ -36,16 +37,18 @@ public class ShareInFlightBatch<K, V> {
3637
private Map<Long, ConsumerRecord<K, V>> renewedRecords;
3738
private final Set<Long> acknowledgedRecords;
3839
private Acknowledgements acknowledgements;
40+
private final Optional<Integer> acquisitionLockTimeoutMs;
3941
private ShareInFlightBatchException exception;
4042
private boolean hasCachedException = false;
4143
private boolean checkForRenewAcknowledgements = false;
4244

43-
public ShareInFlightBatch(int nodeId, TopicIdPartition partition) {
45+
public ShareInFlightBatch(int nodeId, TopicIdPartition partition, Optional<Integer> acquisitionLockTimeoutMs) {
4446
this.nodeId = nodeId;
4547
this.partition = partition;
4648
this.inFlightRecords = new TreeMap<>();
4749
this.acknowledgedRecords = new TreeSet<>();
4850
this.acknowledgements = Acknowledgements.empty();
51+
this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs;
4952
}
5053

5154
public void addAcknowledgement(long offset, AcknowledgeType type) {
@@ -182,6 +185,10 @@ Acknowledgements getAcknowledgements() {
182185
return acknowledgements;
183186
}
184187

188+
Optional<Integer> getAcquisitionLockTimeoutMs() {
189+
return acquisitionLockTimeoutMs;
190+
}
191+
185192
public boolean isEmpty() {
186193
return inFlightRecords.isEmpty() && acknowledgements.isEmpty();
187194
}

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
public class ShareCompletedFetchTest {
6666
private static final String TOPIC_NAME = "test";
6767
private static final TopicIdPartition TIP = new TopicIdPartition(Uuid.randomUuid(), 0, TOPIC_NAME);
68+
private static final Optional<Integer> DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS = Optional.of(30000);
6869
private static final long PRODUCER_ID = 1000L;
6970
private static final short PRODUCER_EPOCH = 0;
7071

@@ -89,6 +90,7 @@ public void testSimple() {
8990
assertEquals(Optional.of((short) 1), record.deliveryCount());
9091
Acknowledgements acknowledgements = batch.getAcknowledgements();
9192
assertEquals(0, acknowledgements.size());
93+
assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, batch.getAcquisitionLockTimeoutMs());
9294

9395
batch = completedFetch.fetchRecords(deserializers, 10, true);
9496
records = batch.getInFlightRecords();
@@ -98,12 +100,14 @@ record = records.get(0);
98100
assertEquals(Optional.of((short) 1), record.deliveryCount());
99101
acknowledgements = batch.getAcknowledgements();
100102
assertEquals(0, acknowledgements.size());
103+
assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, batch.getAcquisitionLockTimeoutMs());
101104

102105
batch = completedFetch.fetchRecords(deserializers, 10, true);
103106
records = batch.getInFlightRecords();
104107
assertEquals(0, records.size());
105108
acknowledgements = batch.getAcknowledgements();
106109
assertEquals(0, acknowledgements.size());
110+
assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, batch.getAcquisitionLockTimeoutMs());
107111
}
108112

109113
@Test
@@ -126,12 +130,14 @@ public void testSoftMaxPollRecordLimit() {
126130
assertEquals(Optional.of((short) 1), record.deliveryCount());
127131
Acknowledgements acknowledgements = batch.getAcknowledgements();
128132
assertEquals(0, acknowledgements.size());
133+
assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, batch.getAcquisitionLockTimeoutMs());
129134

130135
batch = completedFetch.fetchRecords(deserializers, 10, true);
131136
records = batch.getInFlightRecords();
132137
assertEquals(0, records.size());
133138
acknowledgements = batch.getAcknowledgements();
134139
assertEquals(0, acknowledgements.size());
140+
assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, batch.getAcquisitionLockTimeoutMs());
135141
}
136142

137143
@Test
@@ -154,12 +160,14 @@ public void testUnaligned() {
154160
assertEquals(Optional.of((short) 1), record.deliveryCount());
155161
Acknowledgements acknowledgements = batch.getAcknowledgements();
156162
assertEquals(0, acknowledgements.size());
163+
assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, batch.getAcquisitionLockTimeoutMs());
157164

158165
batch = completedFetch.fetchRecords(deserializers, 10, true);
159166
records = batch.getInFlightRecords();
160167
assertEquals(0, records.size());
161168
acknowledgements = batch.getAcknowledgements();
162169
assertEquals(0, acknowledgements.size());
170+
assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, batch.getAcquisitionLockTimeoutMs());
163171
}
164172

165173
@Test
@@ -177,6 +185,7 @@ public void testCommittedTransactionRecordsIncluded() {
177185
assertEquals(10, records.size());
178186
Acknowledgements acknowledgements = batch.getAcknowledgements();
179187
assertEquals(0, acknowledgements.size());
188+
assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, batch.getAcquisitionLockTimeoutMs());
180189
}
181190
}
182191

@@ -195,6 +204,7 @@ public void testNegativeFetchCount() {
195204
assertEquals(0, records.size());
196205
Acknowledgements acknowledgements = batch.getAcknowledgements();
197206
assertEquals(0, acknowledgements.size());
207+
assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, batch.getAcquisitionLockTimeoutMs());
198208
}
199209
}
200210

@@ -210,6 +220,7 @@ public void testNoRecordsInFetch() {
210220
assertEquals(0, records.size());
211221
Acknowledgements acknowledgements = batch.getAcknowledgements();
212222
assertEquals(0, acknowledgements.size());
223+
assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, batch.getAcquisitionLockTimeoutMs());
213224
}
214225
}
215226

@@ -263,6 +274,7 @@ public void testCorruptedMessage() {
263274
acknowledgements = batch.getAcknowledgements();
264275
assertEquals(1, acknowledgements.size());
265276
assertEquals(AcknowledgeType.RELEASE, acknowledgements.get(1L));
277+
assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, batch.getAcquisitionLockTimeoutMs());
266278

267279
// Record 2 then results in an empty batch, because record 1 has now been skipped
268280
batch = completedFetch.fetchRecords(deserializers, 10, false);
@@ -280,6 +292,7 @@ public void testCorruptedMessage() {
280292
acknowledgements = batch.getAcknowledgements();
281293
assertEquals(1, acknowledgements.size());
282294
assertEquals(AcknowledgeType.RELEASE, acknowledgements.get(2L));
295+
assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, batch.getAcquisitionLockTimeoutMs());
283296

284297
// Record 3 is returned in the next batch, because record 2 has now been skipped
285298
batch = completedFetch.fetchRecords(deserializers, 10, false);
@@ -289,6 +302,7 @@ public void testCorruptedMessage() {
289302
assertEquals(3L, fetchedRecords.get(0).offset());
290303
acknowledgements = batch.getAcknowledgements();
291304
assertEquals(0, acknowledgements.size());
305+
assertEquals(DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS, batch.getAcquisitionLockTimeoutMs());
292306
}
293307
}
294308
}
@@ -428,6 +442,7 @@ private ShareCompletedFetch newShareCompletedFetch(ShareFetchResponseData.Partit
428442
0,
429443
TIP,
430444
partitionData,
445+
DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS,
431446
shareFetchMetricsAggregator,
432447
ApiKeys.SHARE_FETCH.latestVersion());
433448
}

0 commit comments

Comments
 (0)