Skip to content

Commit 268c926

Browse files
author
Kyle McClellan
committed
Fix heartbeat erroneous state
* Withstands reassignment better
1 parent 45054d4 commit 268c926

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

Confluent.Kafka.Dataflow/Internal/ConsumerBlockFactory.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,13 @@ public async Task<TopicPartitionOffset> Consume(CancellationToken cancellationTo
128128
var task = this.buffer.SendAsync(kvp, cancellationToken);
129129
while (await Task.WhenAny(task, Task.Delay(HEARTBEAT_INTERVAL, cancellationToken)) != task)
130130
{
131-
// Reconsume to stay in consumer group.
132-
this.consumer.Seek(result.TopicPartitionOffset);
133-
this.consumer.Consume(cancellationToken);
131+
// Poll for message to stay in consumer group.
132+
var next = this.consumer.Consume(TimeSpan.Zero);
133+
if (next != null)
134+
{
135+
// Go back to where we were.
136+
this.consumer.Seek(next.TopicPartitionOffset);
137+
}
134138
}
135139

136140
if (!task.Result)

0 commit comments

Comments
 (0)