Skip to content

Commit 4a8ad8e

Browse files
authored
KAFKA-19781: Fix to not update positions for partitions being revoked (#20914)
Fix to avoid initializing positions for partitions being revoked, as it is unneeded (we do not allow fetching from partitions being revoked), and could lead to NoOffsetForPartitionException on a partition that is already being revoked (this is confusing for applications). This is the behaviour we already had for fetch, just applying it to update positions to align. This gap was surfaced on edge cases of partitions being assigned and revoked right away. Reviewers: Andrew Schofield <[email protected]>, Lucas Brutschy <[email protected]>
1 parent 679854c commit 4a8ad8e

File tree

2 files changed

+15
-4
lines changed

2 files changed

+15
-4
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1202,14 +1202,16 @@ private void resume() {
12021202
}
12031203

12041204
/**
1205-
* True if the partition is in {@link FetchStates#INITIALIZING} state. While in this
1206-
* state, a position for the partition can be retrieved (based on committed offsets or
1205+
* Check if we need to retrieve a fetch position for the given partition.
1206+
* True if the partition state is {@link FetchStates#INITIALIZING}, and the partition is not being revoked.
1207+
* <p/>
1208+
* While in this state, a position for the partition will be retrieved (based on committed offsets or
12071209
* partitions offsets).
12081210
* Note that retrieving a position does not mean that we can start fetching from the
12091211
* partition (see {@link #isFetchable()})
12101212
*/
12111213
private boolean shouldInitialize() {
1212-
return fetchState.equals(FetchStates.INITIALIZING);
1214+
return fetchState.equals(FetchStates.INITIALIZING) && !pendingRevocation;
12131215
}
12141216

12151217
private boolean isFetchable() {

clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,15 +306,24 @@ public void partitionPause() {
306306
}
307307

308308
@Test
309-
public void testMarkingPartitionPending() {
309+
public void testMarkingPendingRevocation() {
310310
state.assignFromUser(Set.of(tp0));
311311
state.seek(tp0, 100);
312312
assertTrue(state.isFetchable(tp0));
313+
assertFalse(state.isPaused(tp0));
313314
state.markPendingRevocation(Set.of(tp0));
314315
assertFalse(state.isFetchable(tp0));
315316
assertFalse(state.isPaused(tp0));
316317
}
317318

319+
@Test
320+
public void testMarkingPendingRevocationPreventsInitializingPosition() {
321+
state.assignFromUser(Set.of(tp0));
322+
assertTrue(state.initializingPartitions().contains(tp0));
323+
state.markPendingRevocation(Set.of(tp0));
324+
assertFalse(state.initializingPartitions().contains(tp0));
325+
}
326+
318327
@Test
319328
public void testAssignedPartitionsAwaitingCallbackKeepPositionDefinedInCallback() {
320329
// New partition assigned. Should not be fetchable or initializing positions.

0 commit comments

Comments
 (0)