Skip to content

Commit 50613a5

Browse files
kirillvasilenkoKirill Vasilenko
andauthored
Support relaxed Snapshot isolation in Column Shards (#29190)
Co-authored-by: Kirill Vasilenko <[email protected]>
1 parent 8510207 commit 50613a5

File tree

24 files changed

+179
-53
lines changed

24 files changed

+179
-53
lines changed

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,10 +1140,6 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
11401140
evWrite->SetLockId(LockTxId, LockNodeId);
11411141
evWrite->Record.SetLockMode(LockMode);
11421142

1143-
if (LockMode == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION) {
1144-
YQL_ENSURE(MvccSnapshot);
1145-
}
1146-
11471143
if (MvccSnapshot) {
11481144
*evWrite->Record.MutableMvccSnapshot() = *MvccSnapshot;
11491145
}

ydb/core/kqp/session_actor/kqp_session_actor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1157,6 +1157,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor>, IActorExce
11571157

11581158
if (QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE
11591159
&& QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RO
1160+
&& QueryState->TxCtx->EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SNAPSHOT_RW
11601161
&& QueryState->GetType() != NKikimrKqp::QUERY_TYPE_SQL_SCAN
11611162
&& QueryState->GetType() != NKikimrKqp::QUERY_TYPE_AST_SCAN
11621163
&& QueryState->GetType() != NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT

ydb/core/kqp/ut/tx/kqp_snapshot_isolation_ut.cpp

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ Y_UNIT_TEST_SUITE(KqpSnapshotIsolation) {
6060
}
6161

6262
Y_UNIT_TEST(TSimpleOlap) {
63-
return;
6463
TSimple tester;
6564
tester.SetIsOlap(true);
6665
tester.Execute();
@@ -175,7 +174,6 @@ Y_UNIT_TEST_SUITE(KqpSnapshotIsolation) {
175174
}
176175

177176
Y_UNIT_TEST(TConflictReadWriteOlap) {
178-
return;
179177
TConflictReadWrite tester;
180178
tester.SetIsOlap(true);
181179
tester.Execute();
@@ -233,11 +231,68 @@ Y_UNIT_TEST_SUITE(KqpSnapshotIsolation) {
233231
}
234232

235233
Y_UNIT_TEST(TReadOnlyOlap) {
236-
return;
237234
TReadOnly tester;
238235
tester.SetIsOlap(true);
239236
tester.Execute();
240237
}
238+
239+
class TReadOwnChanges : public TTableDataModificationTester {
240+
protected:
241+
void DoExecute() override {
242+
auto client = Kikimr->GetQueryClient();
243+
auto session1 = client.GetSession().GetValueSync().GetSession();
244+
245+
// tx1 reads KV2
246+
auto result = session1.ExecuteQuery(Q_(R"(
247+
SELECT * FROM `/Root/KV2`;
248+
)"), TTxControl::BeginTx(TTxSettings::SnapshotRW())).ExtractValueSync();
249+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
250+
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
251+
auto tx1 = result.GetTransaction();
252+
UNIT_ASSERT(tx1);
253+
254+
// tx1 upserts a row
255+
result = session1.ExecuteQuery(Q_(R"(
256+
UPSERT INTO `/Root/KV2` (Key, Value)
257+
VALUES (1U, "val1");
258+
)"), TTxControl::Tx(*tx1)).ExtractValueSync();
259+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
260+
tx1 = result.GetTransaction();
261+
262+
// tx1 reads KV2 and sees the row
263+
result = session1.ExecuteQuery(Q_(R"(
264+
SELECT * FROM `/Root/KV2`;
265+
)"), TTxControl::Tx(*tx1)).ExtractValueSync();
266+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
267+
CompareYson(R"([[1u;["val1"]]])", FormatResultSetYson(result.GetResultSet(0)));
268+
tx1 = result.GetTransaction();
269+
270+
// tx1 commits
271+
result = tx1->Commit().ExtractValueSync();
272+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
273+
}
274+
};
275+
276+
Y_UNIT_TEST(TReadOwnChangesOltp) {
277+
return;
278+
TReadOwnChanges tester;
279+
tester.SetIsOlap(false);
280+
tester.Execute();
281+
}
282+
283+
Y_UNIT_TEST(TReadOwnChangesOltpNoSink) {
284+
return;
285+
TReadOwnChanges tester;
286+
tester.SetIsOlap(false);
287+
tester.SetDisableSinks(true);
288+
tester.Execute();
289+
}
290+
291+
Y_UNIT_TEST(TReadOwnChangesOlap) {
292+
TReadOwnChanges tester;
293+
tester.SetIsOlap(true);
294+
tester.Execute();
295+
}
241296
}
242297

243298
} // namespace NKqp

ydb/core/tx/columnshard/engines/column_engine.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ class IColumnEngine {
147147
}
148148
virtual bool IsOverloadedByMetadata(const ui64 limit) const = 0;
149149
virtual std::vector<std::shared_ptr<TPortionInfo>> Select(
150-
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const = 0;
150+
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, const std::optional<THashSet<TInsertWriteId>>& ownPortions) const = 0;
151151
virtual std::vector<std::shared_ptr<TColumnEngineChanges>> StartCompaction(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager) noexcept = 0;
152152
virtual ui64 GetCompactionPriority(const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const std::set<TInternalPathId>& pathIds,
153153
const std::optional<ui64> waitingPriority) const noexcept = 0;

ydb/core/tx/columnshard/engines/column_engine_logs.cpp

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -453,36 +453,53 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool up
453453
}
454454

455455
std::vector<std::shared_ptr<TPortionInfo>> TColumnEngineForLogs::Select(
456-
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const {
456+
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, const std::optional<THashSet<TInsertWriteId>>& ownPortions) const {
457457
std::vector<std::shared_ptr<TPortionInfo>> out;
458458
auto spg = GranulesStorage->GetGranuleOptional(pathId);
459459
if (!spg) {
460460
return out;
461461
}
462462

463-
for (const auto& [_, portionInfo] : spg->GetInsertedPortions()) {
464-
if (!portionInfo->IsVisible(snapshot, !withUncommitted)) {
463+
for (const auto& [writeId, portion] : spg->GetInsertedPortions()) {
464+
if (portion->IsRemovedFor(snapshot)) {
465465
continue;
466466
}
467-
const bool skipPortion = !pkRangesFilter.IsUsed(*portionInfo);
468-
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")("pathId", pathId)(
469-
"portion", portionInfo->DebugString());
470-
if (skipPortion) {
467+
// nonconflicting: visible in the snapshot or own portions
468+
auto nonconflicting = portion->IsVisible(snapshot, true) || (ownPortions.has_value() && ownPortions->contains(writeId));
469+
auto conflicting = !nonconflicting;
470+
471+
if (nonconflicting && !withNonconflicting || conflicting && !withConflicting) {
471472
continue;
472473
}
473-
out.emplace_back(portionInfo);
474+
475+
const bool takePortion = pkRangesFilter.IsUsed(*portion);
476+
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", takePortion ? "portion_selected" : "portion_skipped")("pathId", pathId)("portion", portion->DebugString());
477+
if (takePortion) {
478+
out.emplace_back(portion);
479+
}
474480
}
475-
for (const auto& [_, portionInfo] : spg->GetPortions()) {
476-
if (!portionInfo->IsVisible(snapshot, !withUncommitted)) {
481+
for (const auto& [_, portion] : spg->GetPortions()) {
482+
if (portion->IsRemovedFor(snapshot)) {
483+
continue;
484+
}
485+
486+
auto nonconflicting = portion->IsVisible(snapshot, true);
487+
auto conflicting = !nonconflicting;
488+
489+
// take compacted portions only if all the records are visible in the snapshot
490+
if (conflicting && portion->GetPortionType() == EPortionType::Compacted) {
477491
continue;
478492
}
479-
const bool skipPortion = !pkRangesFilter.IsUsed(*portionInfo);
480-
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")("pathId", pathId)(
481-
"portion", portionInfo->DebugString());
482-
if (skipPortion) {
493+
494+
if (nonconflicting && !withNonconflicting || conflicting && !withConflicting) {
483495
continue;
484496
}
485-
out.emplace_back(portionInfo);
497+
498+
const bool takePortion = pkRangesFilter.IsUsed(*portion);
499+
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", takePortion ? "portion_selected" : "portion_skipped")("pathId", pathId)("portion", portion->DebugString());
500+
if (takePortion) {
501+
out.emplace_back(portion);
502+
}
486503
}
487504

488505
return out;

ydb/core/tx/columnshard/engines/column_engine_logs.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ class TColumnEngineForLogs: public IColumnEngine {
170170
void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override;
171171

172172
std::vector<std::shared_ptr<TPortionInfo>> Select(
173-
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const override;
173+
TInternalPathId pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withNonconflicting, const bool withConflicting, const std::optional<THashSet<TInsertWriteId>>& withUncommittedOnlyForTheseWrites) const override;
174174

175175
bool IsPortionExists(const TInternalPathId pathId, const ui64 portionId) const {
176176
return !!GranulesStorage->GetPortionOptional(pathId, portionId);

ydb/core/tx/columnshard/engines/metadata_accessor.cpp

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,18 @@ TUserTableAccessor::TUserTableAccessor(const TString& tableName, const NColumnSh
3939
}
4040

4141
std::unique_ptr<NReader::NCommon::ISourcesConstructor> TUserTableAccessor::SelectMetadata(const TSelectMetadataContext& context,
42-
const NReader::TReadDescription& readDescription, const bool withUncommitted, const bool isPlain) const {
42+
const NReader::TReadDescription& readDescription, const bool isPlain) const {
4343
AFL_VERIFY(readDescription.PKRangesFilter);
44+
// here we select portions for a read
4445
std::vector<std::shared_ptr<TPortionInfo>> portions =
45-
context.GetEngine().Select(PathId.InternalPathId, readDescription.GetSnapshot(), *readDescription.PKRangesFilter, withUncommitted);
46+
context.GetEngine().Select(
47+
PathId.InternalPathId,
48+
readDescription.GetSnapshot(),
49+
*readDescription.PKRangesFilter,
50+
readDescription.readNonconflictingPortions,
51+
readDescription.readConflictingPortions,
52+
readDescription.ownPortions
53+
);
4654
if (!isPlain) {
4755
std::deque<NReader::NSimple::TSourceConstructor> sources;
4856
for (auto&& i : portions) {
@@ -55,7 +63,7 @@ std::unique_ptr<NReader::NCommon::ISourcesConstructor> TUserTableAccessor::Selec
5563
}
5664

5765
std::unique_ptr<NReader::NCommon::ISourcesConstructor> TAbsentTableAccessor::SelectMetadata(const TSelectMetadataContext& /*context*/,
58-
const NReader::TReadDescription& /*readDescription*/, const bool /*withUncommitted*/, const bool /*isPlain*/) const {
66+
const NReader::TReadDescription& /*readDescription*/, const bool /*isPlain*/) const {
5967
return NReader::NSimple::TPortionsSources::BuildEmpty();
6068
}
6169

ydb/core/tx/columnshard/engines/metadata_accessor.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class ITableMetadataAccessor {
8383
};
8484

8585
virtual std::unique_ptr<NReader::NCommon::ISourcesConstructor> SelectMetadata(const TSelectMetadataContext& context,
86-
const NReader::TReadDescription& readDescription, const bool withUncommitted, const bool isPlain) const = 0;
86+
const NReader::TReadDescription& readDescription, const bool isPlain) const = 0;
8787
virtual std::optional<TGranuleShardingInfo> GetShardingInfo(
8888
const std::shared_ptr<const TVersionedIndex>& indexVersionsPointer, const NOlap::TSnapshot& ss) const = 0;
8989
};
@@ -109,7 +109,7 @@ class TUserTableAccessor: public ITableMetadataAccessor {
109109
}
110110

111111
virtual std::unique_ptr<NReader::NCommon::ISourcesConstructor> SelectMetadata(const TSelectMetadataContext& context,
112-
const NReader::TReadDescription& readDescription, const bool withUncommitted, const bool isPlain) const override;
112+
const NReader::TReadDescription& readDescription, const bool isPlain) const override;
113113
virtual std::optional<TGranuleShardingInfo> GetShardingInfo(
114114
const std::shared_ptr<const TVersionedIndex>& indexVersionsPointer, const NOlap::TSnapshot& ss) const override {
115115
return indexVersionsPointer->GetShardingInfoOptional(PathId.GetInternalPathId(), ss);
@@ -145,7 +145,7 @@ class TAbsentTableAccessor: public ITableMetadataAccessor {
145145
return std::nullopt;
146146
}
147147
virtual std::unique_ptr<NReader::NCommon::ISourcesConstructor> SelectMetadata(const TSelectMetadataContext& context,
148-
const NReader::TReadDescription& readDescription, const bool withUncommitted, const bool isPlain) const override;
148+
const NReader::TReadDescription& readDescription, const bool isPlain) const override;
149149
};
150150

151-
} // namespace NKikimr::NOlap
151+
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class TReadMetadataBase {
4646
std::shared_ptr<ISnapshotSchema> ResultIndexSchema;
4747
ui64 TxId = 0;
4848
std::optional<ui64> LockId;
49+
std::optional<NKikimrDataEvents::ELockMode> LockMode;
4950
EDeduplicationPolicy DeduplicationPolicy = EDeduplicationPolicy::ALLOW_DUPLICATES;
5051

5152
public:
@@ -55,6 +56,11 @@ class TReadMetadataBase {
5556
return TabletId;
5657
}
5758

59+
bool NeedToDetectConflicts() const {
60+
// do not detect conflicts for snapshot isolated transactions or txs with no lock
61+
return LockId.has_value() && LockMode.value_or(NKikimrDataEvents::OPTIMISTIC) != NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION;
62+
}
63+
5864
void SetRequestedLimit(const ui64 value) {
5965
AFL_VERIFY(!RequestedLimit);
6066
if (value == 0 || value >= Max<i64>()) {

ydb/core/tx/columnshard/engines/reader/common/description.h

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <ydb/core/tx/columnshard/common/snapshot.h>
44
#include <ydb/core/tx/columnshard/engines/metadata_accessor.h>
55
#include <ydb/core/tx/columnshard/engines/predicate/filter.h>
6+
#include <ydb/core/tx/columnshard/operations/manager.h>
67
#include <ydb/core/tx/program/program.h>
78

89
#include <ydb/library/yql/dq/actors/protos/dq_stats.pb.h>
@@ -33,10 +34,15 @@ class TReadDescription {
3334
// Table
3435
ui64 TxId = 0;
3536
std::optional<ui64> LockId;
37+
std::optional<NKikimrDataEvents::ELockMode> LockMode;
3638
std::shared_ptr<ITableMetadataAccessor> TableMetadataAccessor;
3739
std::shared_ptr<NOlap::TPKRangesFilter> PKRangesFilter;
3840
NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE;
3941
EDeduplicationPolicy DeduplicationPolicy = EDeduplicationPolicy::ALLOW_DUPLICATES;
42+
bool readNonconflictingPortions;
43+
bool readConflictingPortions;
44+
// portions that the current tx has written
45+
std::optional<THashSet<TInsertWriteId>> ownPortions;
4046

4147
bool IsReverseSort() const {
4248
return Sorting == ERequestSorting::DESC;
@@ -55,6 +61,34 @@ class TReadDescription {
5561
ScanCursor = cursor;
5662
}
5763

64+
void SetLock(std::optional<ui64> lockId, std::optional<NKikimrDataEvents::ELockMode> lockMode, const NColumnShard::TLockFeatures* lock) {
65+
LockId = lockId;
66+
LockMode = lockMode;
67+
68+
auto snapshotIsolation = lockMode.value_or(NKikimrDataEvents::OPTIMISTIC) == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION;
69+
70+
// always true for now, will be false for reads that check only conflicts (comming soon with Snapshot Isolation)
71+
readNonconflictingPortions = true;
72+
73+
// do not check conflicts for Snapshot isolated txs or txs with no lock
74+
readConflictingPortions = LockId.has_value() && !snapshotIsolation;
75+
76+
if (readNonconflictingPortions && !readConflictingPortions && lock != nullptr && lock->GetWriteOperations().size() > 0) {
77+
ownPortions = THashSet<TInsertWriteId>();
78+
for (auto& writeOperation : lock->GetWriteOperations()) {
79+
for (auto insertWriteId : writeOperation->GetInsertWriteIds()) {
80+
ownPortions->emplace(insertWriteId);
81+
}
82+
}
83+
}
84+
85+
// we want to read something, don't we?
86+
AFL_VERIFY(readNonconflictingPortions || readConflictingPortions);
87+
if (ownPortions.has_value() && !ownPortions->empty()) {
88+
AFL_VERIFY(readNonconflictingPortions);
89+
}
90+
}
91+
5892
TReadDescription(const ui64 tabletId, const TSnapshot& snapshot, const ERequestSorting sorting)
5993
: Snapshot(snapshot)
6094
, Sorting(sorting)

0 commit comments

Comments
 (0)