diff options
author | Mikhail Surin <ssmike@ydb.tech> | 2024-08-14 19:28:11 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-14 19:28:11 +0300 |
commit | c70d3ebbec403ebd3f095f2a931d8a061589094c (patch) | |
tree | 0d86c2198c00fe67dae6bbd894b11a958c9fd3a9 | |
parent | 17bb20c4cfb2176c56e808f1ca6faf93175b5af9 (diff) | |
download | ydb-c70d3ebbec403ebd3f095f2a931d8a061589094c.tar.gz |
Allow to forget consumption peaks (#7763)
-rw-r--r-- | ydb/core/kqp/node_service/kqp_node_service.cpp | 11 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_compute_scheduler.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_compute_scheduler.h | 1 | ||||
-rw-r--r-- | ydb/core/protos/table_service_config.proto | 11 |
4 files changed, 30 insertions, 7 deletions
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 925f615493a..3c0c90df78c 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -85,6 +85,8 @@ public: SetPriorities(config.GetPoolsConfiguration()); } Scheduler.ReportCounters(counters); + AdvanceTimeInterval = TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec()); + Scheduler.SetForgetInterval(TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetForgetOverflowTimeoutUsec())); } void Bootstrap() { @@ -104,7 +106,7 @@ public: } Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup(WakeCleaunupTag)); - Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(WakeAdvanceTimeTag)); + Schedule(AdvanceTimeInterval, new TEvents::TEvWakeup(WakeAdvanceTimeTag)); Become(&TKqpNodeService::WorkState); } @@ -343,7 +345,7 @@ private: void HandleWork(TEvents::TEvWakeup::TPtr& ev) { if (ev->Get()->Tag == WakeAdvanceTimeTag) { Scheduler.AdvanceTime(TMonotonic::Now()); - Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(WakeAdvanceTimeTag)); + Schedule(AdvanceTimeInterval, new TEvents::TEvWakeup(WakeAdvanceTimeTag)); } if (ev->Get()->Tag == WakeCleaunupTag) { Schedule(TDuration::Seconds(1), ev->Release().Release()); @@ -395,9 +397,11 @@ private: SetPriorities(event.GetConfig().GetTableServiceConfig().GetPoolsConfiguration()); } + AdvanceTimeInterval = TDuration::MicroSeconds(event.GetConfig().GetTableServiceConfig().GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec()); + Scheduler.SetForgetInterval(TDuration::MicroSeconds(event.GetConfig().GetTableServiceConfig().GetComputeSchedulerSettings().GetForgetOverflowTimeoutUsec())); + auto responseEv = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationResponse>(event); Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie); - } void SetIteratorReadsQuotaSettings(const NKikimrConfig::TTableServiceConfig::TIteratorReadQuotaSettings& settings) { @@ -520,6 +524,7 @@ private: const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup; TComputeScheduler Scheduler; + TDuration AdvanceTimeInterval; //state sharded by TxId std::shared_ptr<TNodeServiceState> State_; diff --git a/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp b/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp index 3946f405a64..a5764cfabb6 100644 --- a/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp +++ b/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp @@ -134,6 +134,8 @@ public: static constexpr double BatchCalcDecay = 0; TDuration BatchTime = AvgBatch; + TDuration OverflowToleranceTimeout = TDuration::Seconds(1); + static constexpr TDuration ActivationPenalty = TDuration::MicroSeconds(10); size_t Wakeups = 0; @@ -212,6 +214,7 @@ struct TComputeScheduler::TImpl { TIntrusivePtr<TKqpCounters> Counters; TDuration SmoothPeriod = TDuration::MilliSeconds(100); + TDuration ForgetInteval = TDuration::Seconds(2); TDuration MaxDelay = TDuration::Seconds(10); @@ -361,10 +364,13 @@ void TComputeScheduler::AdvanceTime(TMonotonic now) { } double delta = 0; - v.Next()->TrackedBefore = Impl->Records[i]->TrackedMicroSeconds.load(); + auto tracked = Impl->Records[i]->TrackedMicroSeconds.load(); v.Next()->MaxLimitDeviation = Impl->SmoothPeriod.MicroSeconds() * v.Next()->Weight; v.Next()->LastNowRecalc = now; - v.Next()->TrackedBefore = Min<ssize_t>(group.get()->Limit(now) - group.get()->MaxLimitDeviation, v.Next()->TrackedBefore); + v.Next()->TrackedBefore = + Max<ssize_t>( + tracked - FromDuration(Impl->ForgetInteval) * group.get()->Weight, + Min<ssize_t>(group.get()->Limit(now) - group.get()->MaxLimitDeviation, tracked)); if (!group.get()->Disabled && group.get()->EntitiesWeight > MinEntitiesWeight) { delta = FromDuration(now - group.get()->LastNowRecalc) * group.get()->Weight / group.get()->EntitiesWeight; @@ -425,6 +431,10 @@ void TComputeScheduler::SetMaxDeviation(TDuration period) { Impl->SmoothPeriod = period; } +void TComputeScheduler::SetForgetInterval(TDuration period) { + Impl->ForgetInteval = period; +} + bool TComputeScheduler::Disabled(TString group) { auto ptr = Impl->PoolId.FindPtr(group); return !ptr || Impl->Records[*ptr]->MutableStats.Current().get()->Disabled; diff --git a/ydb/core/kqp/runtime/kqp_compute_scheduler.h b/ydb/core/kqp/runtime/kqp_compute_scheduler.h index 21f7380db17..d2f706ab94a 100644 --- a/ydb/core/kqp/runtime/kqp_compute_scheduler.h +++ b/ydb/core/kqp/runtime/kqp_compute_scheduler.h @@ -74,6 +74,7 @@ public: void SetPriorities(TDistributionRule rootRule, double cores, TMonotonic now); void SetMaxDeviation(TDuration); + void SetForgetInterval(TDuration); ::NMonitoring::TDynamicCounters::TCounterPtr GetGroupUsageCounter(TString group) const; TSchedulerEntityHandle Enroll(TString group, double weight, TMonotonic now); diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index fe2c90ba132..4ee8ec7bf4a 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -305,7 +305,7 @@ message TTableServiceConfig { message TSubPoolsConfiguration { repeated TComputePoolConfiguration SubPools = 1; - }; + } message TComputePoolConfiguration { optional double MaxCpuShare = 1; @@ -313,8 +313,15 @@ message TTableServiceConfig { string Name = 2; TSubPoolsConfiguration SubPoolsConfiguration = 3; } - }; + } + + message TComputeSchedulerSettings { + optional uint64 AdvanceTimeIntervalUsec = 1 [default = 50000]; + optional uint64 ForgetOverflowTimeoutUsec = 2 [default = 2000000]; + } + optional TComputePoolConfiguration PoolsConfiguration = 68; + optional TComputeSchedulerSettings ComputeSchedulerSettings = 70; optional bool EnableRowsDuplicationCheck = 69 [ default = false ]; }; |