@@ -47,6 +47,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
4747 , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
4848 , LookupStrategy(settings.GetLookupStrategy())
4949 , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TaskId, args.TypeEnv, args.HolderFactory, args.InputDesc))
50+ , MaxTotalBytesQuota(MaxTotalBytesQuotaStreamLookup())
51+ , MaxRowsProcessing(MaxRowsProcessingStreamLookup())
5052 , Counters(counters)
5153 , LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), " LookupActor" )
5254 {
@@ -305,7 +307,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
305307 ReadRowsCount += replyResultStats.ReadRowsCount ;
306308 ReadBytesCount += replyResultStats.ReadBytesCount ;
307309
308- auto overloaded = StreamLookupWorker->IsOverloaded ();
310+ auto overloaded = StreamLookupWorker->IsOverloaded (MaxRowsProcessing );
309311 if (!overloaded.has_value ()) {
310312 FetchInputRows ();
311313 } else {
@@ -445,6 +447,9 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
445447 }
446448 }
447449
450+ TotalBytesQuota -= MaxBytesDefaultQuota;
451+ Counters->StreamLookupIteratorTotalQuotaBytesInFlight ->Sub (MaxBytesDefaultQuota);
452+
448453 if (!Snapshot.IsValid ()) {
449454 Snapshot = IKqpGateway::TKqpSnapshot (record.GetSnapshot ().GetStep (), record.GetSnapshot ().GetTxId ());
450455 }
@@ -502,6 +507,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
502507 }
503508 }
504509
510+
505511 YQL_ENSURE (read.LastSeqNo < record.GetSeqNo ());
506512 read.LastSeqNo = record.GetSeqNo ();
507513
@@ -514,8 +520,14 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
514520 request->Record .SetSeqNo (record.GetSeqNo ());
515521
516522 auto defaultSettings = GetDefaultReadAckSettings ()->Record ;
517- request->Record .SetMaxRows (defaultSettings.GetMaxRows ());
518- request->Record .SetMaxBytes (defaultSettings.GetMaxBytes ());
523+ request->Record .SetMaxRows (MaxRowsDefaultQuota);
524+ request->Record .SetMaxBytes (MaxBytesDefaultQuota);
525+
526+ TotalBytesQuota += MaxBytesDefaultQuota;
527+ Counters->StreamLookupIteratorTotalQuotaBytesInFlight ->Add (MaxBytesDefaultQuota);
528+ if (TotalBytesQuota > MaxTotalBytesQuota) {
529+ Counters->StreamLookupIteratorTotalQuotaBytesExceeded ->Inc ();
530+ }
519531
520532 const bool needToCreatePipe = Reads.NeedToCreatePipe (read.ShardId );
521533
@@ -657,10 +669,22 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
657669 }
658670
659671 auto defaultSettings = GetDefaultReadSettings ()->Record ;
660- record.SetMaxRows (defaultSettings.GetMaxRows ());
661- record.SetMaxBytes (defaultSettings.GetMaxBytes ());
672+ if (!MaxRowsDefaultQuota || !MaxBytesDefaultQuota) {
673+ MaxRowsDefaultQuota = defaultSettings.GetMaxRows ();
674+ MaxBytesDefaultQuota = defaultSettings.GetMaxBytes ();
675+ }
676+
677+ record.SetMaxRows (MaxRowsDefaultQuota);
678+ record.SetMaxBytes (MaxBytesDefaultQuota);
662679 record.SetResultFormat (NKikimrDataEvents::FORMAT_CELLVEC);
663680
681+ TotalBytesQuota += MaxBytesDefaultQuota;
682+ Counters->StreamLookupIteratorTotalQuotaBytesInFlight ->Add (MaxBytesDefaultQuota);
683+
684+ if (TotalBytesQuota > MaxTotalBytesQuota) {
685+ Counters->StreamLookupIteratorTotalQuotaBytesExceeded ->Inc ();
686+ }
687+
664688 CA_LOG_D (TStringBuilder () << " Send EvRead (stream lookup) to shardId=" << shardId
665689 << " , readId = " << record.GetReadId ()
666690 << " , tablePath: " << StreamLookupWorker->GetTablePath ()
@@ -828,6 +852,12 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
828852 ui64 ReadRowsCount = 0 ;
829853 ui64 ReadBytesCount = 0 ;
830854
855+ size_t TotalBytesQuota = 0 ;
856+ ui64 MaxTotalBytesQuota = 0 ;
857+ size_t MaxRowsProcessing = 0 ;
858+ size_t MaxBytesDefaultQuota = 0 ;
859+ size_t MaxRowsDefaultQuota = 0 ;
860+
831861 TIntrusivePtr<TKqpCounters> Counters;
832862 NWilson::TSpan LookupActorSpan;
833863 NWilson::TSpan LookupActorStateSpan;
0 commit comments