Skip to content

Commit 94aa925

Browse files
committed
Fix offsets not being committed bug
1 parent 524961e commit 94aa925

File tree

2 files changed

+8
-3
lines changed

2 files changed

+8
-3
lines changed

Confluent.Kafka.Dataflow/Confluent.Kafka.Dataflow.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
<PackageTags>kafka;confluent;dataflow</PackageTags>
2323
<RepositoryType>git</RepositoryType>
2424
<RepositoryUrl>https://github.com/kmcclellan/kafka-dataflow.git</RepositoryUrl>
25+
<IncludeSymbols>true</IncludeSymbols>
26+
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
2527
</PropertyGroup>
2628

2729
<ItemGroup>

Confluent.Kafka.Dataflow/Internal/ProducerBlockFactory.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,16 @@ public ITargetBlock<IEnumerable<TopicPartitionOffset>> GetOffsetTarget(
5656
return this.GetBatchTarget<TopicPartitionOffset>(
5757
offsets =>
5858
{
59-
var commitOffsets = new List<TopicPartitionOffset>();
59+
// Track the latest offset per partition.
60+
// (Client seems to unable to handle a large number of offsets).
61+
var commitOffsets = new Dictionary<TopicPartition, TopicPartitionOffset>();
6062
foreach (var offset in offsets)
6163
{
62-
commitOffsets.Add(new TopicPartitionOffset(offset.TopicPartition, offset.Offset + 1));
64+
commitOffsets[offset.TopicPartition] =
65+
new TopicPartitionOffset(offset.TopicPartition, offset.Offset + 1);
6366
}
6467

65-
this.producer.SendOffsetsToTransaction(commitOffsets, consumerGroup, Timeout.InfiniteTimeSpan);
68+
this.producer.SendOffsetsToTransaction(commitOffsets.Values, consumerGroup, Timeout.InfiniteTimeSpan);
6669
},
6770
options);
6871
}

0 commit comments

Comments
 (0)