Skip to content

Commit 07a4378

Browse files
authored
Iteration produce actor fix stable 25-3-1 (#29078)
2 parents 5fe73ea + 1efe521 commit 07a4378

File tree

1 file changed

+16
-10
lines changed

1 file changed

+16
-10
lines changed

ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,17 +108,22 @@ void TKafkaProduceActor::CleanWriters(const TActorContext& ctx) {
108108
const auto earliestAllowedTs = ctx.Now() - WRITER_EXPIRATION_INTERVAL;
109109

110110
for (auto& [topicPath, partitionWriters] : NonTransactionalWriters) {
111-
for (auto it = partitionWriters.begin(); it != partitionWriters.end(); ++it) {
112-
if (it->second.LastAccessed < earliestAllowedTs) {
113-
CleanWriter({topicPath, it->first}, it->second.ActorId);
114-
partitionWriters.erase(it);
111+
auto itPartWriters = partitionWriters.begin();
112+
while (itPartWriters != partitionWriters.end()) {
113+
auto itCopy = itPartWriters++;
114+
if (itCopy->second.LastAccessed < earliestAllowedTs) {
115+
CleanWriter({topicPath, itCopy->first}, itCopy->second.ActorId);
116+
partitionWriters.erase(itCopy);
115117
}
116118
}
117119
}
118-
for (auto it = TransactionalWriters.begin(); it != TransactionalWriters.end(); ++it) {
119-
if (it->second.LastAccessed < earliestAllowedTs) {
120-
CleanWriter(it->first, it->second.ActorId);
121-
TransactionalWriters.erase(it);
120+
121+
auto itTransWriters = TransactionalWriters.begin();
122+
while (itTransWriters != TransactionalWriters.end()) {
123+
auto itCopy = itTransWriters++;
124+
if (itCopy->second.LastAccessed < earliestAllowedTs) {
125+
CleanWriter(itCopy->first, itCopy->second.ActorId);
126+
TransactionalWriters.erase(itCopy);
122127
}
123128
}
124129

@@ -184,10 +189,11 @@ void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TP
184189

185190
auto it = NonTransactionalWriters.find(path);
186191
if (it != NonTransactionalWriters.end()) {
187-
for(auto& [_, writer] : it->second) {
192+
auto itCopy = it++;
193+
for(auto& [_, writer] : itCopy->second) {
188194
Send(writer.ActorId, new TEvents::TEvPoison());
189195
}
190-
NonTransactionalWriters.erase(it);
196+
NonTransactionalWriters.erase(itCopy);
191197
}
192198
for (auto& [topicPartition, writer] : TransactionalWriters) {
193199
if (topicPartition.TopicPath == path) {

0 commit comments

Comments
 (0)