Skip to content

Commit 6dd09f0

Browse files
Entries remain in the TxWrites after the immediate transaction (#29285) (#29373)
2 parents 98d3da2 + 548706b commit 6dd09f0

File tree

3 files changed

+93
-15
lines changed

3 files changed

+93
-15
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3322,7 +3322,7 @@ void TPersQueue::ScheduleDeleteExpiredKafkaTransactions() {
33223322
for (auto& pair : TxWrites) {
33233323
if (txnExpired(pair.second)) {
33243324
PQ_LOG_D("Transaction for Kafka producer " << pair.first.KafkaProducerInstanceId << " is expired");
3325-
BeginDeletePartitions(pair.second);
3325+
BeginDeletePartitions(pair.first, pair.second);
33263326
}
33273327
}
33283328
}
@@ -5420,7 +5420,7 @@ void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& e
54205420
}
54215421

54225422
PQ_LOG_TX_I("delete partitions for WriteId " << writeId << " (longTxService lost tx)");
5423-
BeginDeletePartitions(writeInfo);
5423+
BeginDeletePartitions(writeId, writeInfo);
54245424
}
54255425

54265426
void TPersQueue::Handle(TEvPQ::TEvReadingPartitionStatusRequest::TPtr& ev, const TActorContext& ctx)
@@ -5470,6 +5470,14 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
54705470
DeletePartition(partitionId, ctx);
54715471

54725472
writeInfo.Partitions.erase(partitionId.OriginalPartitionId);
5473+
TryDeleteWriteId(writeId, writeInfo, ctx);
5474+
TxWritesChanged = true;
5475+
5476+
TryWriteTxs(ctx);
5477+
}
5478+
5479+
void TPersQueue::TryDeleteWriteId(const TWriteId& writeId, const TTxWriteInfo& writeInfo, const TActorContext& ctx)
5480+
{
54735481
if (writeInfo.Partitions.empty()) {
54745482
if (!writeInfo.KafkaTransaction) {
54755483
UnsubscribeWriteId(writeId, ctx);
@@ -5481,14 +5489,15 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
54815489
(tx->State == NKikimrPQ::TTransaction::CANCELED)) {
54825490
TryExecuteTxs(ctx, *tx);
54835491
}
5492+
} else {
5493+
// if the transaction is not in Txs, then it is an immediate transaction
5494+
DeleteWriteId(writeId);
54845495
}
5485-
} else if (writeInfo.KafkaTransaction) { // case when kafka transaction haven't even started in KQP, but data for it was already written in partition
5496+
} else {
5497+
// this is kafka transaction or immediate transaction
54865498
DeleteWriteId(writeId);
54875499
}
54885500
}
5489-
TxWritesChanged = true;
5490-
5491-
TryWriteTxs(ctx);
54925501
}
54935502

54945503
void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorContext&)
@@ -5508,20 +5517,24 @@ void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorCo
55085517
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
55095518
Y_ABORT_UNLESS(writeInfo.Partitions.size() == 1);
55105519

5511-
BeginDeletePartitions(writeInfo);
5520+
BeginDeletePartitions(writeId, writeInfo);
55125521
}
55135522

5514-
void TPersQueue::BeginDeletePartitions(TTxWriteInfo& writeInfo)
5523+
void TPersQueue::BeginDeletePartitions(const TWriteId& writeId, TTxWriteInfo& writeInfo)
55155524
{
55165525
if (writeInfo.Deleting) {
55175526
PQ_LOG_TX_D("Already deleting WriteInfo");
55185527
return;
55195528
}
5520-
for (auto& [_, partitionId] : writeInfo.Partitions) {
5521-
Y_ABORT_UNLESS(Partitions.contains(partitionId));
5522-
const TPartitionInfo& partition = Partitions.at(partitionId);
5523-
PQ_LOG_TX_D("send TEvPQ::TEvDeletePartition to partition " << partitionId);
5524-
Send(partition.Actor, new TEvPQ::TEvDeletePartition);
5529+
if (writeInfo.Partitions.empty()) {
5530+
TryDeleteWriteId(writeId, writeInfo, ActorContext());
5531+
} else {
5532+
for (auto& [_, partitionId] : writeInfo.Partitions) {
5533+
Y_ABORT_UNLESS(Partitions.contains(partitionId));
5534+
const TPartitionInfo& partition = Partitions.at(partitionId);
5535+
PQ_LOG_TX_D("send TEvPQ::TEvDeletePartition to partition " << partitionId);
5536+
Send(partition.Actor, new TEvPQ::TEvDeletePartition);
5537+
}
55255538
}
55265539
writeInfo.Deleting = true;
55275540
}
@@ -5533,7 +5546,7 @@ void TPersQueue::BeginDeletePartitions(const TDistributedTransaction& tx)
55335546
}
55345547

55355548
TTxWriteInfo& writeInfo = TxWrites.at(*tx.WriteId);
5536-
BeginDeletePartitions(writeInfo);
5549+
BeginDeletePartitions(*tx.WriteId, writeInfo);
55375550
}
55385551

55395552
TString TPersQueue::LogPrefix() const {

ydb/core/persqueue/pq_impl.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
547547
void Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorContext& ctx);
548548
void Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorContext& ctx);
549549

550-
void BeginDeletePartitions(TTxWriteInfo& writeInfo);
550+
void BeginDeletePartitions(const TWriteId& writeId, TTxWriteInfo& writeInfo);
551551
void BeginDeletePartitions(const TDistributedTransaction& tx);
552552

553553
bool CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation,
@@ -580,6 +580,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
580580

581581
bool AllSupportivePartitionsHaveBeenDeleted(const TMaybe<TWriteId>& writeId) const;
582582
void DeleteWriteId(const TMaybe<TWriteId>& writeId);
583+
void TryDeleteWriteId(const TWriteId& writeId, const TTxWriteInfo& writeInfo, const TActorContext& ctx);
583584

584585
void UpdateReadRuleGenerations(NKikimrPQ::TPQTabletConfig& cfg) const;
585586

ydb/public/sdk/cpp/src/client/topic/ut/topic_to_table_ut.cpp

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ class TFixture : public NUnitTest::TBaseFixture {
158158

159159
void DeleteSupportivePartition(const TString& topicName,
160160
ui32 partition);
161+
void WaitForTheTabletToDeleteTheWriteInfo(const std::string& topicName,
162+
std::uint32_t partition);
161163

162164
struct TTableRecord {
163165
TTableRecord() = default;
@@ -1741,6 +1743,42 @@ void TFixture::SendLongTxLockStatus(const TActorId& actorId,
17411743
runtime.SendToPipe(tabletId, actorId, event.release());
17421744
}
17431745

1746+
void TFixture::WaitForTheTabletToDeleteTheWriteInfo(const std::string& topicName,
1747+
std::uint32_t partition)
1748+
{
1749+
auto& runtime = Setup->GetRuntime();
1750+
NActors::TActorId edge = runtime.AllocateEdgeActor();
1751+
std::uint64_t tabletId = GetTopicTabletId(edge, "/Root/" + topicName, partition);
1752+
1753+
for (int i = 0; i < 20; ++i) {
1754+
auto request = std::make_unique<NKikimr::TEvKeyValue::TEvRequest>();
1755+
request->Record.SetCookie(12345);
1756+
request->Record.AddCmdRead()->SetKey("_txinfo");
1757+
1758+
auto& runtime = Setup->GetRuntime();
1759+
1760+
runtime.SendToPipe(tabletId, edge, request.release());
1761+
auto response = runtime.GrabEdgeEvent<NKikimr::TEvKeyValue::TEvResponse>();
1762+
1763+
UNIT_ASSERT(response->Record.HasCookie());
1764+
UNIT_ASSERT_VALUES_EQUAL(response->Record.GetCookie(), 12345);
1765+
UNIT_ASSERT_VALUES_EQUAL(response->Record.ReadResultSize(), 1);
1766+
1767+
auto& read = response->Record.GetReadResult(0);
1768+
1769+
NKikimrPQ::TTabletTxInfo info;
1770+
UNIT_ASSERT(info.ParseFromString(read.GetValue()));
1771+
1772+
if (info.TxWritesSize() == 0) {
1773+
return;
1774+
}
1775+
1776+
Sleep(TDuration::MilliSeconds(100));
1777+
}
1778+
1779+
UNIT_FAIL("TTabletTxInfo.TxWrites is expected to be empty");
1780+
}
1781+
17441782
void TFixture::WaitForTheTabletToDeleteTheWriteInfo(const TActorId& actorId,
17451783
ui64 tabletId,
17461784
const NPQ::TWriteId& writeId)
@@ -2301,6 +2339,32 @@ size_t TFixture::GetTableRecordsCount(const TString& tablePath)
23012339
return parser.ColumnParser(0).GetUint64();
23022340
}
23032341

2342+
Y_UNIT_TEST_F(The_TxWriteInfo_Is_Deleted_After_The_Immediate_Transaction, TFixture)
2343+
{
2344+
CreateTopic("topic_A");
2345+
2346+
NTable::TSession session = CreateTableSession();
2347+
2348+
NTable::TTransaction tx = BeginTx(session);
2349+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx);
2350+
RestartPQTablet("topic_A", 0);
2351+
CommitTx(tx, EStatus::SUCCESS);
2352+
2353+
tx = BeginTx(session);
2354+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #2", &tx);
2355+
RestartPQTablet("topic_A", 0);
2356+
CommitTx(tx, EStatus::SUCCESS);
2357+
2358+
auto messages = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2));
2359+
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
2360+
UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1");
2361+
UNIT_ASSERT_VALUES_EQUAL(messages[1], "message #2");
2362+
2363+
CheckTabletKeys("topic_A");
2364+
2365+
WaitForTheTabletToDeleteTheWriteInfo("topic_A", 0);
2366+
}
2367+
23042368
Y_UNIT_TEST_F(WriteToTopic_Demo_24, TFixture)
23052369
{
23062370
TestWriteToTopic24();

0 commit comments

Comments
 (0)