From b13ea97e2cd7e9e40abfe0bb0b38b656689a5347 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Fri, 21 Nov 2025 14:29:29 +0300 Subject: [PATCH 1/3] Stats for CTAS fix (#29269) --- ydb/core/kqp/runtime/kqp_write_actor.cpp | 19 +++- ydb/core/kqp/ut/cost/kqp_cost_ut.cpp | 129 +++++++++++++++++++++-- 2 files changed, 140 insertions(+), 8 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 45d392081219..762814b4fbc0 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -706,7 +706,6 @@ class TKqpTableWriteActor : public TActorBootstrapped { return builder; }() << ", Cookie=" << ev->Cookie); - UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->AddParticipantNode(ev->Sender.NodeId()); @@ -735,6 +734,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::UNSPECIFIED, @@ -758,6 +758,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::ABORTED, @@ -776,6 +777,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { ResetShardRetries(ev->Get()->Record.GetOrigin(), ev->Cookie); RetryResolve(); } else { + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::UNAVAILABLE, @@ -791,6 +793,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::INTERNAL_ERROR, @@ -805,6 +808,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::UNAVAILABLE, @@ -823,6 +827,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << getIssues().ToOneLineString()); // TODO: support waiting if (!InconsistentTx) { + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::OVERLOADED, @@ -842,6 +847,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << getIssues().ToOneLineString()); // TODO: support waiting if (!InconsistentTx) { + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::OVERLOADED, @@ -859,6 +865,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::CANCELLED, @@ -873,6 +880,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::BAD_REQUEST, @@ -892,6 +900,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { ResetShardRetries(ev->Get()->Record.GetOrigin(), ev->Cookie); RetryResolve(); } else { + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::SCHEME_ERROR, @@ -909,6 +918,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->BreakLock(ev->Get()->Record.GetOrigin()); YQL_ENSURE(TxManager->BrokenLocks()); TxManager->SetError(ev->Get()->Record.GetOrigin()); @@ -925,6 +935,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::PRECONDITION_FAILED, @@ -942,6 +953,8 @@ class TKqpTableWriteActor : public TActorBootstrapped { const auto& record = ev->Get()->Record; AFL_ENSURE(record.GetTxLocks().empty()); + UpdateStats(record.GetTxStats()); + IKqpTransactionManager::TPrepareResult preparedInfo; preparedInfo.ShardId = record.GetOrigin(); preparedInfo.MinStep = record.GetMinStep(); @@ -979,6 +992,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { if (Mode == EMode::WRITE) { for (const auto& lock : ev->Get()->Record.GetTxLocks()) { if (!TxManager->AddLock(ev->Get()->Record.GetOrigin(), lock)) { + UpdateStats(ev->Get()->Record.GetTxStats()); YQL_ENSURE(TxManager->BrokenLocks()); NYql::TIssues issues; issues.AddIssue(*TxManager->GetLockIssue()); @@ -991,6 +1005,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { } if (Mode == EMode::COMMIT) { + UpdateStats(ev->Get()->Record.GetTxStats()); Callbacks->OnCommitted(ev->Get()->Record.GetOrigin(), 0); return; } @@ -999,9 +1014,11 @@ class TKqpTableWriteActor : public TActorBootstrapped { const auto result = ShardedWriteController->OnMessageAcknowledged( ev->Get()->Record.GetOrigin(), ev->Cookie); if (result && result->IsShardEmpty && Mode == EMode::IMMEDIATE_COMMIT) { + UpdateStats(ev->Get()->Record.GetTxStats()); Callbacks->OnCommitted(ev->Get()->Record.GetOrigin(), result->DataSize); } else if (result) { AFL_ENSURE(Mode == EMode::WRITE); + UpdateStats(ev->Get()->Record.GetTxStats()); Callbacks->OnMessageAcknowledged(result->DataSize); } } diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp index 40418bb28bd4..3e66ab3ebf00 100644 --- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp +++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp @@ -1122,12 +1122,16 @@ Y_UNIT_TEST_SUITE(KqpCost) { } - Y_UNIT_TEST_TWIN(OltpWriteRow, isSink) { + Y_UNIT_TEST_QUAD(WriteRow, isSink, isOlap) { + if (isOlap) { + // TODO: same stats for olap? + return; + } TKikimrRunner kikimr(GetAppConfig(false, false, isSink)); auto db = kikimr.GetQueryClient(); auto session = db.GetSession().GetValueSync().GetSession(); - CreateTestTable(session, false); + CreateTestTable(session, isOlap); { auto query = Q_(R"( @@ -1143,7 +1147,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); Cerr << stats.DebugString() << Endl; - size_t phase = stats.query_phases_size() - 1; + size_t phase = isOlap ? 0 : stats.query_phases_size() - 1; UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20); @@ -1176,7 +1180,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); Cerr << stats.DebugString() << Endl; - size_t phase = stats.query_phases_size() - 1; + size_t phase = isOlap ? 0 : stats.query_phases_size() - 1; UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20); @@ -1387,13 +1391,17 @@ Y_UNIT_TEST_SUITE(KqpCost) { } } - Y_UNIT_TEST_TWIN(OltpWriteRowInsertFails, isSink) { + Y_UNIT_TEST_QUAD(WriteRowInsertFails, isSink, isOlap) { + if (isOlap) { + // TODO: same stats for olap? + return; + } TKikimrRunner kikimr(GetAppConfig(false, false, isSink)); auto db = kikimr.GetQueryClient(); auto session = db.GetSession().GetValueSync().GetSession(); - CreateTestTable(session, false); - CreateTestTable(session, false, "2"); + CreateTestTable(session, isOlap); + CreateTestTable(session, isOlap, "2"); { // Three inserts @@ -1744,6 +1752,113 @@ Y_UNIT_TEST_SUITE(KqpCost) { } } + Y_UNIT_TEST_TWIN(CTAS, isOlap) { + TKikimrRunner kikimr(GetAppConfig(false, false, true)); + auto db = kikimr.GetQueryClient(); + auto session = db.GetSession().GetValueSync().GetSession(); + + CreateTestTable(session, isOlap); + + { + auto query = std::format(R"( + CREATE TABLE `/Root/TestTable2` (PRIMARY KEY (Group, Name)) WITH (STORE={}) AS SELECT * FROM `/Root/TestTable`; + )", isOlap ? "COLUMN" : "ROW"); + + auto txControl = NYdb::NQuery::TTxControl::NoTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases_size(), 1); + size_t phase = 0; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), isOlap ? 1472 : 80); + + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().rows(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().bytes(), isOlap ? 144 : 80); + + Check( + FromProto(stats), + TTotalStats{ + .Writes = 4, + .Reads = 4, + .Deletes = 0, + }); + } + } + + Y_UNIT_TEST_TWIN(CTASWithRetry, isOlap) { + auto appConfig = GetAppConfig(false, false, true); + appConfig.MutableTableServiceConfig()->MutableWriteActorSettings()->SetInFlightMemoryLimitPerActorBytes(40); + // For executing REPLACE + appConfig.MutableTableServiceConfig()->SetEnableStreamWrite(true); + TKikimrSettings settings(appConfig); + settings.SetUseRealThreads(false); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetQueryClient(); + auto session = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); }); + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + + kikimr.RunCall([&] { + CreateTestTable(session, isOlap); + }); + + size_t messages = 0; + + auto grab = [&](TAutoPtr &ev) -> auto { + if (ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWriteResult::EventType) { + ++messages; + auto* msg = ev->Get(); + for (size_t index = 0; index < 3; ++index) { + // Send several duplicates + auto copy = std::make_unique(); + copy->Record = msg->Record; + runtime.Send(new IEventHandle(ev->Recipient, ev->Sender, copy.release(), ev->Flags, ev->Cookie)); + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + runtime.SetObserverFunc(grab); + + { + auto query = std::format(R"( + CREATE TABLE `/Root/TestTable2` (PRIMARY KEY (Group, Name)) WITH (STORE={}) AS SELECT * FROM `/Root/TestTable`; + )", isOlap ? "COLUMN" : "ROW"); + + auto txControl = NYdb::NQuery::TTxControl::NoTx(); + + auto result = kikimr.RunCall([&] { return session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); }); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases_size(), 1); + size_t phase = 0; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), isOlap ? 1472 : 80); + + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().rows(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().bytes(), isOlap ? 144 : 80); + + Check( + FromProto(stats), + TTotalStats{ + .Writes = 4, + .Reads = 4, + .Deletes = 0, + }); + } + + UNIT_ASSERT_EQUAL(messages, isOlap ? 4 : 1); + } + } } From 311e2da1d9d8db2a1c37df4a0605b0046a1421cd Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Mon, 24 Nov 2025 11:14:44 +0300 Subject: [PATCH 2/3] fix --- ydb/core/kqp/ut/cost/kqp_cost_ut.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp index 3e66ab3ebf00..e934d1a8c5a0 100644 --- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp +++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp @@ -1753,7 +1753,9 @@ Y_UNIT_TEST_SUITE(KqpCost) { } Y_UNIT_TEST_TWIN(CTAS, isOlap) { - TKikimrRunner kikimr(GetAppConfig(false, false, true)); + auto appConfig = GetAppConfig(false, false, true); + appConfig.MutableTableServiceConfig()->SetEnableDataShardCreateTableAs(true); + TKikimrRunner kikimr(appConfig); auto db = kikimr.GetQueryClient(); auto session = db.GetSession().GetValueSync().GetSession(); @@ -1793,6 +1795,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { Y_UNIT_TEST_TWIN(CTASWithRetry, isOlap) { auto appConfig = GetAppConfig(false, false, true); + appConfig.MutableTableServiceConfig()->SetEnableDataShardCreateTableAs(true); appConfig.MutableTableServiceConfig()->MutableWriteActorSettings()->SetInFlightMemoryLimitPerActorBytes(40); // For executing REPLACE appConfig.MutableTableServiceConfig()->SetEnableStreamWrite(true); From 25c2828f4f814111361160d0cc729c6738b9fd76 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Tue, 25 Nov 2025 16:41:28 +0300 Subject: [PATCH 3/3] fix --- ydb/core/kqp/ut/cost/kqp_cost_ut.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp index e934d1a8c5a0..ec14b701b521 100644 --- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp +++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp @@ -1778,7 +1778,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { UNIT_ASSERT_VALUES_EQUAL(stats.query_phases_size(), 1); size_t phase = 0; UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 4); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), isOlap ? 1472 : 80); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), isOlap ? 1568 : 80); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().rows(), 4); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().bytes(), isOlap ? 144 : 80); @@ -1845,7 +1845,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { UNIT_ASSERT_VALUES_EQUAL(stats.query_phases_size(), 1); size_t phase = 0; UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 4); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), isOlap ? 1472 : 80); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), isOlap ? 1568 : 80); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().rows(), 4); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().bytes(), isOlap ? 144 : 80);