Skip to content

Commit db7f56a

Browse files
committed
increase and make configurable some slj params (#29319)
1 parent dfe4fbf commit db7f56a

File tree

10 files changed

+65
-14
lines changed

10 files changed

+65
-14
lines changed

ydb/core/kqp/counters/kqp_counters.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,8 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co
815815
DataShardIteratorFails = KqpGroup->GetCounter("IteratorReads/DatashardFails", true);
816816
DataShardIteratorMessages = KqpGroup->GetCounter("IteratorReads/DatashardMessages", true);
817817
IteratorDeliveryProblems = KqpGroup->GetCounter("IteratorReads/DeliveryProblems", true);
818+
StreamLookupIteratorTotalQuotaBytesInFlight = KqpGroup->GetCounter("IteratorReads/StreamLookupIteratorTotalQuotaBytesInFlight", false);
819+
StreamLookupIteratorTotalQuotaBytesExceeded = KqpGroup->GetCounter("IteratorReads/StreamLookupIteratorTotalQuotaBytesExceeded", true);
818820

819821
/* sink writes */
820822
WriteActorsShardResolve = KqpGroup->GetCounter("SinkWrites/WriteActorShardResolve", true);

ydb/core/kqp/counters/kqp_counters.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,8 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter
409409
::NMonitoring::TDynamicCounters::TCounterPtr ReadActorRetries;
410410
::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorFails;
411411
::NMonitoring::TDynamicCounters::TCounterPtr DataShardIteratorMessages;
412+
::NMonitoring::TDynamicCounters::TCounterPtr StreamLookupIteratorTotalQuotaBytesInFlight;
413+
::NMonitoring::TDynamicCounters::TCounterPtr StreamLookupIteratorTotalQuotaBytesExceeded;
412414
::NMonitoring::TDynamicCounters::TCounterPtr IteratorDeliveryProblems;
413415

414416
// Sink write counters

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
434434
ptr->ReadResponseTimeout = TDuration::MilliSeconds(settings.GetIteratorResponseTimeoutMs());
435435
}
436436
ptr->MaxRetryDelay = TDuration::MilliSeconds(settings.GetMaxDelayMs());
437+
ptr->MaxRowsProcessingStreamLookup = settings.GetMaxRowsProcessingStreamLookup();
438+
ptr->MaxTotalBytesQuotaStreamLookup = settings.GetMaxTotalBytesQuotaStreamLookup();
437439
SetReadIteratorBackoffSettings(ptr);
438440
}
439441

ydb/core/kqp/runtime/kqp_read_iterator_common.cpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ size_t MaxShardResolves() {
9191
return Singleton<TBackoffStorage>()->SettingsPtr.AtomicLoad()->MaxShardResolves;
9292
}
9393

94-
size_t MaxShardRetries() {
94+
size_t MaxShardRetries() {
9595
return Singleton<TBackoffStorage>()->SettingsPtr.AtomicLoad()->MaxShardAttempts;
9696
}
9797

@@ -103,5 +103,13 @@ TMaybe<TDuration> ShardTimeout() {
103103
return Singleton<TBackoffStorage>()->SettingsPtr.AtomicLoad()->ReadResponseTimeout;
104104
}
105105

106+
size_t MaxRowsProcessingStreamLookup() {
107+
return Singleton<TBackoffStorage>()->SettingsPtr.AtomicLoad()->MaxRowsProcessingStreamLookup;
108+
}
109+
110+
ui64 MaxTotalBytesQuotaStreamLookup() {
111+
return Singleton<TBackoffStorage>()->SettingsPtr.AtomicLoad()->MaxTotalBytesQuotaStreamLookup;
112+
}
113+
106114
} // namespace NKqp
107115
} // namespace NKikimr

ydb/core/kqp/runtime/kqp_read_iterator_common.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ struct TIteratorReadBackoffSettings : TAtomicRefCount<TIteratorReadBackoffSettin
1717

1818
TMaybe<size_t> MaxTotalRetries;
1919
TMaybe<TDuration> ReadResponseTimeout;
20+
size_t MaxRowsProcessingStreamLookup = 65536;
21+
ui64 MaxTotalBytesQuotaStreamLookup = 5_MB * 512;
2022
};
2123

2224
struct TEvReadSettings : public TAtomicRefCount<TEvReadSettings> {
@@ -38,6 +40,8 @@ size_t MaxShardResolves();
3840
size_t MaxShardRetries();
3941
TMaybe<size_t> MaxTotalRetries();
4042
TMaybe<TDuration> ShardTimeout();
43+
size_t MaxRowsProcessingStreamLookup();
44+
ui64 MaxTotalBytesQuotaStreamLookup();
4145

4246
void SetDefaultIteratorQuotaSettings(ui32 rows, ui32 bytes);
4347
THolder<NKikimr::TEvDataShard::TEvRead> GetDefaultReadSettings();

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
4747
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
4848
, LookupStrategy(settings.GetLookupStrategy())
4949
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TaskId, args.TypeEnv, args.HolderFactory, args.InputDesc))
50+
, MaxTotalBytesQuota(MaxTotalBytesQuotaStreamLookup())
51+
, MaxRowsProcessing(MaxRowsProcessingStreamLookup())
5052
, Counters(counters)
5153
, LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), "LookupActor")
5254
{
@@ -305,7 +307,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
305307
ReadRowsCount += replyResultStats.ReadRowsCount;
306308
ReadBytesCount += replyResultStats.ReadBytesCount;
307309

308-
auto overloaded = StreamLookupWorker->IsOverloaded();
310+
auto overloaded = StreamLookupWorker->IsOverloaded(MaxRowsProcessing);
309311
if (!overloaded.has_value()) {
310312
FetchInputRows();
311313
} else {
@@ -445,6 +447,9 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
445447
}
446448
}
447449

450+
TotalBytesQuota -= MaxBytesDefaultQuota;
451+
Counters->StreamLookupIteratorTotalQuotaBytesInFlight->Sub(MaxBytesDefaultQuota);
452+
448453
if (!Snapshot.IsValid()) {
449454
Snapshot = IKqpGateway::TKqpSnapshot(record.GetSnapshot().GetStep(), record.GetSnapshot().GetTxId());
450455
}
@@ -502,6 +507,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
502507
}
503508
}
504509

510+
505511
YQL_ENSURE(read.LastSeqNo < record.GetSeqNo());
506512
read.LastSeqNo = record.GetSeqNo();
507513

@@ -514,8 +520,14 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
514520
request->Record.SetSeqNo(record.GetSeqNo());
515521

516522
auto defaultSettings = GetDefaultReadAckSettings()->Record;
517-
request->Record.SetMaxRows(defaultSettings.GetMaxRows());
518-
request->Record.SetMaxBytes(defaultSettings.GetMaxBytes());
523+
request->Record.SetMaxRows(MaxRowsDefaultQuota);
524+
request->Record.SetMaxBytes(MaxBytesDefaultQuota);
525+
526+
TotalBytesQuota += MaxBytesDefaultQuota;
527+
Counters->StreamLookupIteratorTotalQuotaBytesInFlight->Add(MaxBytesDefaultQuota);
528+
if (TotalBytesQuota > MaxTotalBytesQuota) {
529+
Counters->StreamLookupIteratorTotalQuotaBytesExceeded->Inc();
530+
}
519531

520532
const bool needToCreatePipe = Reads.NeedToCreatePipe(read.ShardId);
521533

@@ -657,10 +669,22 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
657669
}
658670

659671
auto defaultSettings = GetDefaultReadSettings()->Record;
660-
record.SetMaxRows(defaultSettings.GetMaxRows());
661-
record.SetMaxBytes(defaultSettings.GetMaxBytes());
672+
if (!MaxRowsDefaultQuota || !MaxBytesDefaultQuota) {
673+
MaxRowsDefaultQuota = defaultSettings.GetMaxRows();
674+
MaxBytesDefaultQuota = defaultSettings.GetMaxBytes();
675+
}
676+
677+
record.SetMaxRows(MaxRowsDefaultQuota);
678+
record.SetMaxBytes(MaxBytesDefaultQuota);
662679
record.SetResultFormat(NKikimrDataEvents::FORMAT_CELLVEC);
663680

681+
TotalBytesQuota += MaxBytesDefaultQuota;
682+
Counters->StreamLookupIteratorTotalQuotaBytesInFlight->Add(MaxBytesDefaultQuota);
683+
684+
if (TotalBytesQuota > MaxTotalBytesQuota) {
685+
Counters->StreamLookupIteratorTotalQuotaBytesExceeded->Inc();
686+
}
687+
664688
CA_LOG_D(TStringBuilder() << "Send EvRead (stream lookup) to shardId=" << shardId
665689
<< ", readId = " << record.GetReadId()
666690
<< ", tablePath: " << StreamLookupWorker->GetTablePath()
@@ -828,6 +852,12 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
828852
ui64 ReadRowsCount = 0;
829853
ui64 ReadBytesCount = 0;
830854

855+
size_t TotalBytesQuota = 0;
856+
ui64 MaxTotalBytesQuota = 0;
857+
size_t MaxRowsProcessing = 0;
858+
size_t MaxBytesDefaultQuota = 0;
859+
size_t MaxRowsDefaultQuota = 0;
860+
831861
TIntrusivePtr<TKqpCounters> Counters;
832862
NWilson::TSpan LookupActorSpan;
833863
NWilson::TSpan LookupActorStateSpan;

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
namespace NKikimr {
1616
namespace NKqp {
1717

18-
constexpr ui64 MAX_IN_FLIGHT_LIMIT = 500;
1918
constexpr ui64 SEQNO_SPACE = 40;
2019
constexpr ui64 MaxTaskId = (1ULL << (64 - SEQNO_SPACE));
2120

@@ -323,7 +322,7 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
323322
ReadResults.emplace_back(std::move(result));
324323
}
325324

326-
std::optional<TString> IsOverloaded() final {
325+
std::optional<TString> IsOverloaded(size_t) final {
327326
return std::nullopt;
328327
}
329328

@@ -596,10 +595,10 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
596595
YQL_ENSURE(false);
597596
}
598597

599-
std::optional<TString> IsOverloaded() final {
600-
if (UnprocessedRows.size() >= MAX_IN_FLIGHT_LIMIT ||
601-
PendingLeftRowsByKey.size() >= MAX_IN_FLIGHT_LIMIT ||
602-
ResultRowsBySeqNo.size() >= MAX_IN_FLIGHT_LIMIT)
598+
std::optional<TString> IsOverloaded(size_t maxRowsProcessing) final {
599+
if (UnprocessedRows.size() >= maxRowsProcessing ||
600+
PendingLeftRowsByKey.size() >= maxRowsProcessing ||
601+
ResultRowsBySeqNo.size() >= maxRowsProcessing)
603602
{
604603
TStringBuilder overloadDescriptor;
605604
overloadDescriptor << "unprocessed rows: " << UnprocessedRows.size()

ydb/core/kqp/runtime/kqp_stream_lookup_worker.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class TKqpStreamLookupWorker {
8080
virtual bool AllRowsProcessed() = 0;
8181
virtual bool HasPendingResults() = 0;
8282
virtual void ResetRowsProcessing(ui64 readId) = 0;
83-
virtual std::optional<TString> IsOverloaded() = 0;
83+
virtual std::optional<TString> IsOverloaded(size_t maxRowsProcessing) = 0;
8484

8585
protected:
8686
const NMiniKQL::TTypeEnvironment& TypeEnv;

ydb/core/kqp/ut/scan/kqp_scan_ut.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2579,7 +2579,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
25792579
appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetUnsertaintyRatio(0.5);
25802580
appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMultiplier(2.0);
25812581
appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMaxTotalRetries(100);
2582-
2582+
appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMaxRowsProcessingStreamLookup(500);
2583+
appConfig.MutableTableServiceConfig()->MutableIteratorReadsRetrySettings()->SetMaxTotalBytesQuotaStreamLookup(100);
25832584

25842585
TPortManager tp;
25852586
ui16 mbusport = tp.GetPort(2134);

ydb/core/protos/table_service_config.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,9 @@ message TTableServiceConfig {
206206
optional double Multiplier = 5;
207207
optional uint32 IteratorResponseTimeoutMs = 6;
208208
optional uint32 MaxTotalRetries = 7;
209+
210+
optional uint64 MaxRowsProcessingStreamLookup = 9 [default = 65536];
211+
optional uint64 MaxTotalBytesQuotaStreamLookup = 10 [default = 2684354560]; // 5_MB * 512
209212
}
210213

211214
message TIteratorReadQuotaSettings {

0 commit comments

Comments
 (0)