Skip to content

Commit 0f345d3

Browse files
authored
25-3-1: Fix datashards losing lock conflicts during in-memory migration (#29152)
2 parents 46c6634 + 1ae0bfc commit 0f345d3

File tree

4 files changed

+386
-68
lines changed

4 files changed

+386
-68
lines changed

ydb/core/tx/datashard/datashard_ut_read_iterator.cpp

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5282,6 +5282,247 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorConsistency) {
52825282
"ERROR: ABORTED");
52835283
}
52845284

5285+
Y_UNIT_TEST(ReadLockConflictUncommittedRemoveMigration) {
5286+
TPortManager pm;
5287+
TServerSettings serverSettings(pm.GetPort(2134));
5288+
serverSettings.SetDomainName("Root")
5289+
.SetNodeCount(1)
5290+
.SetUseRealThreads(false);
5291+
5292+
// This test requires in-memory state migration to work
5293+
serverSettings.FeatureFlags.SetEnableDataShardInMemoryStateMigration(true);
5294+
5295+
TServer::TPtr server = new TServer(serverSettings);
5296+
5297+
auto& runtime = *server->GetRuntime();
5298+
auto sender = runtime.AllocateEdgeActor();
5299+
5300+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
5301+
5302+
InitRoot(server, sender);
5303+
5304+
TDisableDataShardLogBatching disableDataShardLogBatching;
5305+
5306+
UNIT_ASSERT_VALUES_EQUAL(
5307+
KqpSchemeExec(runtime, R"(
5308+
CREATE TABLE `/Root/table` (key int, index int, value int, PRIMARY KEY (key, index))
5309+
WITH (PARTITION_AT_KEYS = (10));
5310+
)"),
5311+
"SUCCESS"
5312+
);
5313+
5314+
const auto shards = GetTableShards(server, sender, "/Root/table");
5315+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
5316+
5317+
// Populate tables with initial values
5318+
ExecSQL(server, sender, R"(
5319+
UPSERT INTO `/Root/table` (key, index, value) VALUES
5320+
(1, 0, 1001),
5321+
(1, 1, 1002),
5322+
(11, 0, 2001);
5323+
)");
5324+
5325+
ui64 tx1lockId = 0;
5326+
ui32 tx1lockNode = 0;
5327+
auto shard1actor = ResolveTablet(runtime, shards.at(0));
5328+
auto lockStatusObserver = runtime.AddObserver<TEvLongTxService::TEvLockStatus>(
5329+
[&](auto& ev) {
5330+
if (ev->GetRecipientRewrite() == shard1actor) {
5331+
auto* msg = ev->Get();
5332+
tx1lockId = msg->Record.GetLockId();
5333+
tx1lockNode = msg->Record.GetLockNode();
5334+
}
5335+
});
5336+
5337+
// Begin the 1st transaction appending to key 1
5338+
TString session1, tx1;
5339+
UNIT_ASSERT_VALUES_EQUAL(
5340+
KqpSimpleBegin(runtime, session1, tx1, R"(
5341+
SELECT key, index, value FROM `/Root/table` WHERE key = 1 ORDER BY key, index;
5342+
UPSERT INTO `/Root/table` (key, index, value) VALUES (1, 2, 3003);
5343+
SELECT key, index, value FROM `/Root/table` WHERE key = 11 ORDER BY key, index;
5344+
)"),
5345+
"{ items { int32_value: 1 } items { int32_value: 0 } items { int32_value: 1001 } }, "
5346+
"{ items { int32_value: 1 } items { int32_value: 1 } items { int32_value: 1002 } }\n"
5347+
"{ items { int32_value: 11 } items { int32_value: 0 } items { int32_value: 2001 } }");
5348+
5349+
runtime.WaitFor("subscribed lock", [&]{ return tx1lockId != 0; });
5350+
lockStatusObserver.Remove();
5351+
5352+
// Begin the 2nd transaction reading key 1
5353+
TString session2, tx2;
5354+
UNIT_ASSERT_VALUES_EQUAL(
5355+
KqpSimpleBegin(runtime, session2, tx2, R"(
5356+
SELECT key, index, value FROM `/Root/table` WHERE key = 1 ORDER BY key, index;
5357+
)"),
5358+
"{ items { int32_value: 1 } items { int32_value: 0 } items { int32_value: 1001 } }, "
5359+
"{ items { int32_value: 1 } items { int32_value: 1 } items { int32_value: 1002 } }");
5360+
5361+
// Make sure everything settles down
5362+
runtime.SimulateSleep(TDuration::Seconds(1));
5363+
5364+
// Note: originally a graceful restart supposedly caused shards to send
5365+
// restart notifications, which caused an internal error in kqp, which
5366+
// aborted the commit and removed the lock. We send an unavailable lock
5367+
// status here with a blocked and failing commit, which covers a broader
5368+
// range of failures which could happen in practice.
5369+
TBlockEvents<TEvBlobStorage::TEvPut> blockedCommits(runtime, [&](auto& ev) {
5370+
auto* msg = ev->Get();
5371+
if (msg->Id.Channel() == 0 && msg->Id.TabletID() == shards.at(0)) {
5372+
Cerr << "... blocking commit " << msg->Id << Endl;
5373+
return true;
5374+
}
5375+
return false;
5376+
});
5377+
runtime.Send(
5378+
new IEventHandle(shard1actor, {},
5379+
new TEvLongTxService::TEvLockStatus(tx1lockId, tx1lockNode, NKikimrLongTxService::TEvLockStatus::STATUS_UNAVAILABLE)),
5380+
0, true);
5381+
runtime.WaitFor("blocked commits", [&]{ return blockedCommits.size() >= 1; });
5382+
5383+
// Stop blocking commits and make them fail (will cause shards to restart)
5384+
blockedCommits.Stop();
5385+
for (auto& ev : blockedCommits) {
5386+
auto proxy = ev->Recipient;
5387+
ui32 groupId = GroupIDFromBlobStorageProxyID(proxy);
5388+
auto response = ev->Get()->MakeErrorResponse(NKikimrProto::ERROR, "Something went wrong", TGroupId::FromValue(groupId));
5389+
runtime.Send(new IEventHandle(ev->Sender, proxy, response.release()), 0, true);
5390+
}
5391+
runtime.SimulateSleep(TDuration::Seconds(1));
5392+
5393+
// Commit the 1st transaction
5394+
UNIT_ASSERT_VALUES_EQUAL(
5395+
KqpSimpleCommit(runtime, session1, tx1, R"(SELECT 1)"),
5396+
"{ items { int32_value: 1 } }");
5397+
5398+
// Commit the 2nd transaction writing to the same index
5399+
UNIT_ASSERT_VALUES_EQUAL(
5400+
KqpSimpleCommit(runtime, session2, tx2, R"(
5401+
UPSERT INTO `/Root/table` (key, index, value) VALUES (1, 2, 4003);
5402+
)"),
5403+
"ERROR: ABORTED");
5404+
}
5405+
5406+
Y_UNIT_TEST(ReadLockConflictUncommittedBreakMigration) {
5407+
TPortManager pm;
5408+
TServerSettings serverSettings(pm.GetPort(2134));
5409+
serverSettings.SetDomainName("Root")
5410+
.SetNodeCount(1)
5411+
.SetUseRealThreads(false);
5412+
5413+
// This test requires in-memory state migration to work
5414+
serverSettings.FeatureFlags.SetEnableDataShardInMemoryStateMigration(true);
5415+
5416+
TServer::TPtr server = new TServer(serverSettings);
5417+
5418+
auto& runtime = *server->GetRuntime();
5419+
auto sender = runtime.AllocateEdgeActor();
5420+
5421+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
5422+
5423+
InitRoot(server, sender);
5424+
5425+
TDisableDataShardLogBatching disableDataShardLogBatching;
5426+
5427+
UNIT_ASSERT_VALUES_EQUAL(
5428+
KqpSchemeExec(runtime, R"(
5429+
CREATE TABLE `/Root/table` (key int, index int, value int, PRIMARY KEY (key, index))
5430+
WITH (PARTITION_AT_KEYS = (10));
5431+
)"),
5432+
"SUCCESS"
5433+
);
5434+
5435+
const auto shards = GetTableShards(server, sender, "/Root/table");
5436+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
5437+
5438+
// Populate tables with initial values
5439+
ExecSQL(server, sender, R"(
5440+
UPSERT INTO `/Root/table` (key, index, value) VALUES
5441+
(1, 0, 1001),
5442+
(1, 1, 1002),
5443+
(11, 0, 2001);
5444+
)");
5445+
5446+
ui64 tx1lockId = 0;
5447+
ui32 tx1lockNode = 0;
5448+
auto shard1actor = ResolveTablet(runtime, shards.at(0));
5449+
auto lockStatusObserver = runtime.AddObserver<TEvLongTxService::TEvLockStatus>(
5450+
[&](auto& ev) {
5451+
if (ev->GetRecipientRewrite() == shard1actor) {
5452+
auto* msg = ev->Get();
5453+
tx1lockId = msg->Record.GetLockId();
5454+
tx1lockNode = msg->Record.GetLockNode();
5455+
}
5456+
});
5457+
5458+
// Begin the 1st transaction appending to key 1
5459+
TString session1, tx1;
5460+
UNIT_ASSERT_VALUES_EQUAL(
5461+
KqpSimpleBegin(runtime, session1, tx1, R"(
5462+
SELECT key, index, value FROM `/Root/table` WHERE key = 1 ORDER BY key, index;
5463+
UPSERT INTO `/Root/table` (key, index, value) VALUES (1, 2, 3003);
5464+
SELECT key, index, value FROM `/Root/table` WHERE key = 2 ORDER BY key, index;
5465+
SELECT key, index, value FROM `/Root/table` WHERE key = 11 ORDER BY key, index;
5466+
)"),
5467+
"{ items { int32_value: 1 } items { int32_value: 0 } items { int32_value: 1001 } }, "
5468+
"{ items { int32_value: 1 } items { int32_value: 1 } items { int32_value: 1002 } }"
5469+
"\n"
5470+
// empty result set
5471+
"\n"
5472+
"{ items { int32_value: 11 } items { int32_value: 0 } items { int32_value: 2001 } }");
5473+
5474+
runtime.WaitFor("subscribed lock", [&]{ return tx1lockId != 0; });
5475+
lockStatusObserver.Remove();
5476+
5477+
// Begin the 2nd transaction reading key 1
5478+
TString session2, tx2;
5479+
UNIT_ASSERT_VALUES_EQUAL(
5480+
KqpSimpleBegin(runtime, session2, tx2, R"(
5481+
SELECT key, index, value FROM `/Root/table` WHERE key = 1 ORDER BY key, index;
5482+
)"),
5483+
"{ items { int32_value: 1 } items { int32_value: 0 } items { int32_value: 1001 } }, "
5484+
"{ items { int32_value: 1 } items { int32_value: 1 } items { int32_value: 1002 } }");
5485+
5486+
// Make sure everything settles down
5487+
runtime.SimulateSleep(TDuration::Seconds(1));
5488+
5489+
// Block commits and try to break the first transaction by a blind upsert
5490+
TBlockEvents<TEvBlobStorage::TEvPut> blockedCommits(runtime, [&](auto& ev) {
5491+
auto* msg = ev->Get();
5492+
if (msg->Id.Channel() == 0 && msg->Id.TabletID() == shards.at(0)) {
5493+
Cerr << "... blocking commit " << msg->Id << Endl;
5494+
return true;
5495+
}
5496+
return false;
5497+
});
5498+
KqpSimpleSend(runtime, R"(
5499+
UPSERT INTO `/Root/table` (key, index, value) VALUES (2, 0, 4001);
5500+
)");
5501+
runtime.WaitFor("blocked commits", [&]{ return blockedCommits.size() >= 1; });
5502+
5503+
// Stop blocking commits and make them fail (will cause shards to restart)
5504+
blockedCommits.Stop();
5505+
for (auto& ev : blockedCommits) {
5506+
auto proxy = ev->Recipient;
5507+
ui32 groupId = GroupIDFromBlobStorageProxyID(proxy);
5508+
auto response = ev->Get()->MakeErrorResponse(NKikimrProto::ERROR, "Something went wrong", TGroupId::FromValue(groupId));
5509+
runtime.Send(new IEventHandle(ev->Sender, proxy, response.release()), 0, true);
5510+
}
5511+
runtime.SimulateSleep(TDuration::Seconds(1));
5512+
5513+
// Commit the 1st transaction (it must succeed)
5514+
UNIT_ASSERT_VALUES_EQUAL(
5515+
KqpSimpleCommit(runtime, session1, tx1, R"(SELECT 1)"),
5516+
"{ items { int32_value: 1 } }");
5517+
5518+
// Commit the 2nd transaction writing to the same index
5519+
UNIT_ASSERT_VALUES_EQUAL(
5520+
KqpSimpleCommit(runtime, session2, tx2, R"(
5521+
UPSERT INTO `/Root/table` (key, index, value) VALUES (1, 2, 4003);
5522+
)"),
5523+
"ERROR: ABORTED");
5524+
}
5525+
52855526
}
52865527

52875528
Y_UNIT_TEST_SUITE(DataShardReadIteratorLatency) {

ydb/core/tx/datashard/memory_state_migration.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -622,8 +622,7 @@ TDataShard::TPreservedInMemoryState TDataShard::PreserveInMemoryState() {
622622
InMemoryVarsFrozen = true;
623623
}
624624

625-
for (const auto& pr : SysLocks.GetLocks()) {
626-
const auto& lockInfo = *pr.second;
625+
auto processLock = [&](TLockInfo& lockInfo) {
627626
auto* protoLockInfo = state->AddLocks();
628627
protoLockInfo->SetLockId(lockInfo.GetLockId());
629628
protoLockInfo->SetLockNodeId(lockInfo.GetLockNodeId());
@@ -675,6 +674,14 @@ TDataShard::TPreservedInMemoryState TDataShard::PreserveInMemoryState() {
675674
addedMessage(protoDep->ByteSizeLong());
676675
});
677676
maybeCheckpoint();
677+
};
678+
679+
for (const auto& pr : SysLocks.GetLocks()) {
680+
processLock(*pr.second);
681+
}
682+
683+
for (const auto& pr : SysLocks.GetRemovedLocks()) {
684+
processLock(*pr.second);
678685
}
679686

680687
for (const auto& [txId, op] : TransQueue.GetTxsInFly()) {

0 commit comments

Comments
 (0)