aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMikhail Surin <ssmike@ydb.tech>2024-08-14 19:28:11 +0300
committerGitHub <noreply@github.com>2024-08-14 19:28:11 +0300
commitc70d3ebbec403ebd3f095f2a931d8a061589094c (patch)
tree0d86c2198c00fe67dae6bbd894b11a958c9fd3a9
parent17bb20c4cfb2176c56e808f1ca6faf93175b5af9 (diff)
downloadydb-c70d3ebbec403ebd3f095f2a931d8a061589094c.tar.gz
Allow to forget consumption peaks (#7763)
-rw-r--r--ydb/core/kqp/node_service/kqp_node_service.cpp11
-rw-r--r--ydb/core/kqp/runtime/kqp_compute_scheduler.cpp14
-rw-r--r--ydb/core/kqp/runtime/kqp_compute_scheduler.h1
-rw-r--r--ydb/core/protos/table_service_config.proto11
4 files changed, 30 insertions, 7 deletions
diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp
index 925f615493a..3c0c90df78c 100644
--- a/ydb/core/kqp/node_service/kqp_node_service.cpp
+++ b/ydb/core/kqp/node_service/kqp_node_service.cpp
@@ -85,6 +85,8 @@ public:
SetPriorities(config.GetPoolsConfiguration());
}
Scheduler.ReportCounters(counters);
+ AdvanceTimeInterval = TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec());
+ Scheduler.SetForgetInterval(TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetForgetOverflowTimeoutUsec()));
}
void Bootstrap() {
@@ -104,7 +106,7 @@ public:
}
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup(WakeCleaunupTag));
- Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(WakeAdvanceTimeTag));
+ Schedule(AdvanceTimeInterval, new TEvents::TEvWakeup(WakeAdvanceTimeTag));
Become(&TKqpNodeService::WorkState);
}
@@ -343,7 +345,7 @@ private:
void HandleWork(TEvents::TEvWakeup::TPtr& ev) {
if (ev->Get()->Tag == WakeAdvanceTimeTag) {
Scheduler.AdvanceTime(TMonotonic::Now());
- Schedule(TDuration::MilliSeconds(50), new TEvents::TEvWakeup(WakeAdvanceTimeTag));
+ Schedule(AdvanceTimeInterval, new TEvents::TEvWakeup(WakeAdvanceTimeTag));
}
if (ev->Get()->Tag == WakeCleaunupTag) {
Schedule(TDuration::Seconds(1), ev->Release().Release());
@@ -395,9 +397,11 @@ private:
SetPriorities(event.GetConfig().GetTableServiceConfig().GetPoolsConfiguration());
}
+ AdvanceTimeInterval = TDuration::MicroSeconds(event.GetConfig().GetTableServiceConfig().GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec());
+ Scheduler.SetForgetInterval(TDuration::MicroSeconds(event.GetConfig().GetTableServiceConfig().GetComputeSchedulerSettings().GetForgetOverflowTimeoutUsec()));
+
auto responseEv = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationResponse>(event);
Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
-
}
void SetIteratorReadsQuotaSettings(const NKikimrConfig::TTableServiceConfig::TIteratorReadQuotaSettings& settings) {
@@ -520,6 +524,7 @@ private:
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
TComputeScheduler Scheduler;
+ TDuration AdvanceTimeInterval;
//state sharded by TxId
std::shared_ptr<TNodeServiceState> State_;
diff --git a/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp b/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp
index 3946f405a64..a5764cfabb6 100644
--- a/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp
+++ b/ydb/core/kqp/runtime/kqp_compute_scheduler.cpp
@@ -134,6 +134,8 @@ public:
static constexpr double BatchCalcDecay = 0;
TDuration BatchTime = AvgBatch;
+ TDuration OverflowToleranceTimeout = TDuration::Seconds(1);
+
static constexpr TDuration ActivationPenalty = TDuration::MicroSeconds(10);
size_t Wakeups = 0;
@@ -212,6 +214,7 @@ struct TComputeScheduler::TImpl {
TIntrusivePtr<TKqpCounters> Counters;
TDuration SmoothPeriod = TDuration::MilliSeconds(100);
+ TDuration ForgetInteval = TDuration::Seconds(2);
TDuration MaxDelay = TDuration::Seconds(10);
@@ -361,10 +364,13 @@ void TComputeScheduler::AdvanceTime(TMonotonic now) {
}
double delta = 0;
- v.Next()->TrackedBefore = Impl->Records[i]->TrackedMicroSeconds.load();
+ auto tracked = Impl->Records[i]->TrackedMicroSeconds.load();
v.Next()->MaxLimitDeviation = Impl->SmoothPeriod.MicroSeconds() * v.Next()->Weight;
v.Next()->LastNowRecalc = now;
- v.Next()->TrackedBefore = Min<ssize_t>(group.get()->Limit(now) - group.get()->MaxLimitDeviation, v.Next()->TrackedBefore);
+ v.Next()->TrackedBefore =
+ Max<ssize_t>(
+ tracked - FromDuration(Impl->ForgetInteval) * group.get()->Weight,
+ Min<ssize_t>(group.get()->Limit(now) - group.get()->MaxLimitDeviation, tracked));
if (!group.get()->Disabled && group.get()->EntitiesWeight > MinEntitiesWeight) {
delta = FromDuration(now - group.get()->LastNowRecalc) * group.get()->Weight / group.get()->EntitiesWeight;
@@ -425,6 +431,10 @@ void TComputeScheduler::SetMaxDeviation(TDuration period) {
Impl->SmoothPeriod = period;
}
+void TComputeScheduler::SetForgetInterval(TDuration period) {
+ Impl->ForgetInteval = period;
+}
+
bool TComputeScheduler::Disabled(TString group) {
auto ptr = Impl->PoolId.FindPtr(group);
return !ptr || Impl->Records[*ptr]->MutableStats.Current().get()->Disabled;
diff --git a/ydb/core/kqp/runtime/kqp_compute_scheduler.h b/ydb/core/kqp/runtime/kqp_compute_scheduler.h
index 21f7380db17..d2f706ab94a 100644
--- a/ydb/core/kqp/runtime/kqp_compute_scheduler.h
+++ b/ydb/core/kqp/runtime/kqp_compute_scheduler.h
@@ -74,6 +74,7 @@ public:
void SetPriorities(TDistributionRule rootRule, double cores, TMonotonic now);
void SetMaxDeviation(TDuration);
+ void SetForgetInterval(TDuration);
::NMonitoring::TDynamicCounters::TCounterPtr GetGroupUsageCounter(TString group) const;
TSchedulerEntityHandle Enroll(TString group, double weight, TMonotonic now);
diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto
index fe2c90ba132..4ee8ec7bf4a 100644
--- a/ydb/core/protos/table_service_config.proto
+++ b/ydb/core/protos/table_service_config.proto
@@ -305,7 +305,7 @@ message TTableServiceConfig {
message TSubPoolsConfiguration {
repeated TComputePoolConfiguration SubPools = 1;
- };
+ }
message TComputePoolConfiguration {
optional double MaxCpuShare = 1;
@@ -313,8 +313,15 @@ message TTableServiceConfig {
string Name = 2;
TSubPoolsConfiguration SubPoolsConfiguration = 3;
}
- };
+ }
+
+ message TComputeSchedulerSettings {
+ optional uint64 AdvanceTimeIntervalUsec = 1 [default = 50000];
+ optional uint64 ForgetOverflowTimeoutUsec = 2 [default = 2000000];
+ }
+
optional TComputePoolConfiguration PoolsConfiguration = 68;
+ optional TComputeSchedulerSettings ComputeSchedulerSettings = 70;
optional bool EnableRowsDuplicationCheck = 69 [ default = false ];
};