aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordgolear <dgolear@yandex-team.com>2025-06-20 19:32:27 +0300
committerdgolear <dgolear@yandex-team.com>2025-06-20 19:56:56 +0300
commitcaec1b0ede2b901bcf006a403d15e38e1a94b659 (patch)
tree154f961ba752ad72d16b264643709e8d5d3f3a55
parent56da7b93f3951cc9270c8f80e6b7789f77b189ab (diff)
downloadydb-caec1b0ede2b901bcf006a403d15e38e1a94b659.tar.gz
YT-19545: Move overload controller to core/rpc from server/node; move percpu counters from profiling/solomon
\* Changelog entry Type: feature Component: misc-server Generalize overload controller and allow other server components to use it. commit_hash:ff310ddce3b8ee9353746019f1bc404ad9695a90
-rw-r--r--yt/yt/core/rpc/config.cpp62
-rw-r--r--yt/yt/core/rpc/config.h103
-rw-r--r--yt/yt/core/rpc/overload_controller.cpp602
-rw-r--r--yt/yt/core/rpc/overload_controller.h67
-rw-r--r--yt/yt/core/rpc/overload_controlling_service_base-inl.h96
-rw-r--r--yt/yt/core/rpc/overload_controlling_service_base.cpp9
-rw-r--r--yt/yt/core/rpc/overload_controlling_service_base.h45
-rw-r--r--yt/yt/core/rpc/public.h6
-rw-r--r--yt/yt/core/rpc/unittests/main/ya.make1
-rw-r--r--yt/yt/core/rpc/unittests/overload_controller_ut.cpp431
-rw-r--r--yt/yt/core/ya.make6
-rw-r--r--yt/yt/library/profiling/percpu.cpp (renamed from yt/yt/library/profiling/solomon/percpu.cpp)3
-rw-r--r--yt/yt/library/profiling/percpu.h (renamed from yt/yt/library/profiling/solomon/percpu.h)4
-rw-r--r--yt/yt/library/profiling/solomon/helpers.cpp3
-rw-r--r--yt/yt/library/profiling/solomon/registry.cpp6
-rw-r--r--yt/yt/library/profiling/solomon/registry.h10
-rw-r--r--yt/yt/library/profiling/solomon/ya.make3
-rw-r--r--yt/yt/library/profiling/ya.make7
18 files changed, 1444 insertions, 20 deletions
diff --git a/yt/yt/core/rpc/config.cpp b/yt/yt/core/rpc/config.cpp
index 8fca9174023..e43e9a3166a 100644
--- a/yt/yt/core/rpc/config.cpp
+++ b/yt/yt/core/rpc/config.cpp
@@ -376,4 +376,66 @@ void TDispatcherDynamicConfig::Register(TRegistrar registrar)
////////////////////////////////////////////////////////////////////////////////
+void TServiceMethod::Register(TRegistrar registrar)
+{
+ registrar.Parameter("service", &TThis::Service)
+ .Default();
+ registrar.Parameter("method", &TThis::Method)
+ .Default();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TServiceMethodConfig::Register(TRegistrar registrar)
+{
+ registrar.Parameter("service", &TThis::Service)
+ .Default();
+ registrar.Parameter("method", &TThis::Method)
+ .Default();
+ registrar.Parameter("max_window", &TThis::MaxWindow)
+ .Default(1'024);
+ registrar.Parameter("waiting_timeout_fraction", &TThis::WaitingTimeoutFraction)
+ .Default(0.5);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TOverloadTrackerConfigBase::Register(TRegistrar registrar)
+{
+ registrar.Parameter("methods_to_throttle", &TThis::MethodsToThrottle)
+ .Default();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TOverloadTrackerMeanWaitTimeConfig::Register(TRegistrar registrar)
+{
+ registrar.Parameter("mean_wait_time_threshold", &TThis::MeanWaitTimeThreshold)
+ .Default(TDuration::MilliSeconds(20));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TOverloadTrackerBacklogQueueFillFractionConfig::Register(TRegistrar registrar)
+{
+ registrar.Parameter("backlog_queue_fill_fraction_threshold", &TThis::BacklogQueueFillFractionThreshold)
+ .Default(0.9);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TOverloadControllerConfig::Register(TRegistrar registrar)
+{
+ registrar.Parameter("enabled", &TThis::Enabled)
+ .Default(false);
+ registrar.Parameter("trackers", &TThis::Trackers)
+ .Default();
+ registrar.Parameter("methods", &TThis::Methods)
+ .Default();
+ registrar.Parameter("load_adjusting_period", &TThis::LoadAdjustingPeriod)
+ .Default(TDuration::MilliSeconds(100));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NRpc
diff --git a/yt/yt/core/rpc/config.h b/yt/yt/core/rpc/config.h
index f0ddcdb3c6a..4f4734faa26 100644
--- a/yt/yt/core/rpc/config.h
+++ b/yt/yt/core/rpc/config.h
@@ -5,6 +5,7 @@
#include <yt/yt/core/compression/public.h>
#include <yt/yt/core/ytree/yson_struct.h>
+#include <yt/yt/core/ytree/polymorphic_yson_struct.h>
#include <yt/yt/core/concurrency/config.h>
@@ -462,4 +463,106 @@ DEFINE_REFCOUNTED_TYPE(TDispatcherDynamicConfig)
////////////////////////////////////////////////////////////////////////////////
+struct TServiceMethod
+ : public NYTree::TYsonStructLite
+{
+ std::string Service;
+ std::string Method;
+
+ REGISTER_YSON_STRUCT_LITE(TServiceMethod);
+
+ static void Register(TRegistrar registrar);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TServiceMethodConfig
+ : public NYTree::TYsonStruct
+{
+ std::string Service;
+ std::string Method;
+
+ int MaxWindow;
+ double WaitingTimeoutFraction;
+
+ REGISTER_YSON_STRUCT(TServiceMethodConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TServiceMethodConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TOverloadTrackerConfigBase
+ : public NYTree::TYsonStruct
+{
+ std::vector<TServiceMethod> MethodsToThrottle;
+
+ REGISTER_YSON_STRUCT(TOverloadTrackerConfigBase);
+
+ static void Register(TRegistrar registrar);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TOverloadTrackerMeanWaitTimeConfig
+ : public TOverloadTrackerConfigBase
+{
+ TDuration MeanWaitTimeThreshold;
+
+ REGISTER_YSON_STRUCT(TOverloadTrackerMeanWaitTimeConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TOverloadTrackerMeanWaitTimeConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TOverloadTrackerBacklogQueueFillFractionConfig
+ : public TOverloadTrackerConfigBase
+{
+ double BacklogQueueFillFractionThreshold;
+
+ REGISTER_YSON_STRUCT(TOverloadTrackerBacklogQueueFillFractionConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TOverloadTrackerBacklogQueueFillFractionConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
+DEFINE_ENUM(EOverloadTrackerConfigType,
+ (Base)
+ (MeanWaitTime)
+ (BacklogQueueFillFraction)
+);
+
+DEFINE_POLYMORPHIC_YSON_STRUCT_FOR_ENUM_WITH_DEFAULT(OverloadTrackerConfig, EOverloadTrackerConfigType, MeanWaitTime,
+ ((Base) (TOverloadTrackerConfigBase))
+ ((MeanWaitTime) (TOverloadTrackerMeanWaitTimeConfig))
+ ((BacklogQueueFillFraction) (TOverloadTrackerBacklogQueueFillFractionConfig))
+);
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TOverloadControllerConfig
+ : public NYTree::TYsonStruct
+{
+ bool Enabled;
+ THashMap<std::string, TOverloadTrackerConfig> Trackers;
+ std::vector<TServiceMethodConfigPtr> Methods;
+ TDuration LoadAdjustingPeriod;
+
+ REGISTER_YSON_STRUCT(TOverloadControllerConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TOverloadControllerConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NRpc
diff --git a/yt/yt/core/rpc/overload_controller.cpp b/yt/yt/core/rpc/overload_controller.cpp
new file mode 100644
index 00000000000..87484a7b405
--- /dev/null
+++ b/yt/yt/core/rpc/overload_controller.cpp
@@ -0,0 +1,602 @@
+#include "overload_controller.h"
+
+#include "config.h"
+#include "private.h"
+
+#include <yt/yt/library/profiling/percpu.h>
+
+#include <yt/yt/core/concurrency/action_queue.h>
+#include <yt/yt/core/concurrency/periodic_executor.h>
+#include <yt/yt/core/concurrency/two_level_fair_share_thread_pool.h>
+
+#include <yt/yt/core/logging/log_manager.h>
+
+#include <yt/yt/core/misc/proc.h>
+
+#include <yt/yt/core/profiling/timing.h>
+
+#include <library/cpp/yt/threading/spin_lock.h>
+
+namespace NYT::NRpc {
+
+using namespace NThreading;
+using namespace NConcurrency;
+using namespace NLogging;
+using namespace NProfiling;
+
+////////////////////////////////////////////////////////////////////////////////
+
+static auto Logger = RpcServerLogger().WithTag("OverloadController");
+static const std::string CpuThrottlingTrackerName = "CpuThrottling";
+static const std::string LogDropTrackerName = "LogDrop";
+static const std::string ControlGroupCpuName = "cpu";
+
+////////////////////////////////////////////////////////////////////////////////
+
+DECLARE_REFCOUNTED_CLASS(TOverloadTracker)
+
+class TOverloadTracker
+ : public TRefCounted
+{
+public:
+ TOverloadTracker(TStringBuf type, TStringBuf id)
+ : Type_(type)
+ , Id_(id)
+ { }
+
+ virtual bool CalculateIsOverloaded(const TOverloadTrackerConfig& config) = 0;
+
+ const std::string& GetType() const
+ {
+ return Type_;
+ }
+
+protected:
+ const std::string Type_;
+ const std::string Id_;
+
+ TCounter Overloaded_;
+};
+
+DEFINE_REFCOUNTED_TYPE(TOverloadTracker);
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TMeanWaitTimeTracker
+ : public TOverloadTracker
+{
+public:
+ TMeanWaitTimeTracker(TStringBuf type, TStringBuf id, const TProfiler& profiler)
+ : TOverloadTracker(type, id)
+ , Counter_(New<TPerCpuDurationSummary>())
+ {
+ auto taggedProfiler = profiler.WithTag("tracker", std::string(Id_));
+
+ Overloaded_ = taggedProfiler.Counter("/overloaded");
+ MeanWaitTime_ = profiler.Timer("/mean_wait_time");
+ MeanWaitTimeThreshold_ = profiler.TimeGauge("/mean_wait_time_threshold");
+ }
+
+ void Record(TDuration waitTime)
+ {
+ Counter_->Record(waitTime);
+ }
+
+ virtual bool CalculateIsOverloaded(const TOverloadTrackerConfig& config) override
+ {
+ auto summary = Counter_->GetSummaryAndReset();
+ TDuration meanValue;
+
+ if (summary.Count()) {
+ meanValue = summary.Sum() / summary.Count();
+ }
+
+ YT_LOG_DEBUG("Reporting mean wait time for tracker "
+ "(Tracker: %v, TotalWaitTime: %v, TotalCount: %v, MeanValue: %v)",
+ Id_,
+ summary.Sum(),
+ summary.Count(),
+ meanValue);
+
+ return ProfileAndGetOverloaded(meanValue, config.TryGetConcrete<TOverloadTrackerMeanWaitTimeConfig>());
+ }
+
+protected:
+ bool ProfileAndGetOverloaded(TDuration meanValue, const TOverloadTrackerMeanWaitTimeConfigPtr& config)
+ {
+ bool overloaded = meanValue > config->MeanWaitTimeThreshold;
+
+ Overloaded_.Increment(static_cast<int>(overloaded));
+ MeanWaitTime_.Record(meanValue);
+ MeanWaitTimeThreshold_.Update(config->MeanWaitTimeThreshold);
+
+ return overloaded;
+ }
+
+private:
+ using TPerCpuDurationSummary = TPerCpuSummary<TDuration>;
+
+ TIntrusivePtr<TPerCpuDurationSummary> Counter_;
+
+ TEventTimer MeanWaitTime_;
+ TTimeGauge MeanWaitTimeThreshold_;
+};
+
+DEFINE_REFCOUNTED_TYPE(TMeanWaitTimeTracker);
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TContainerCpuThrottlingTracker
+ : public TMeanWaitTimeTracker
+{
+public:
+ using TMeanWaitTimeTracker::TMeanWaitTimeTracker;
+
+ bool CalculateIsOverloaded(const TOverloadTrackerConfig& config) override
+ {
+ auto cpuStats = GetStats();
+ if (!cpuStats) {
+ return {};
+ }
+
+ TDuration throttlingTime;
+
+ if (LastCpuStats_) {
+ auto throttlingDelta = cpuStats->ThrottledTime - LastCpuStats_->ThrottledTime;
+ throttlingTime = TDuration::MicroSeconds(throttlingDelta / 1000);
+
+ YT_LOG_DEBUG("Reporting container CPU throttling time "
+ "(LastCpuThrottlingTime: %v, CpuThrottlingTime: %v, ThrottlingDelta: %v, ThrottlingTime: %v)",
+ LastCpuStats_->ThrottledTime,
+ cpuStats->ThrottledTime,
+ throttlingDelta,
+ throttlingTime);
+ }
+
+ LastCpuStats_ = cpuStats;
+
+ return ProfileAndGetOverloaded(throttlingTime, config.TryGetConcrete<TOverloadTrackerMeanWaitTimeConfig>());
+ }
+
+private:
+ std::optional<TCgroupCpuStat> LastCpuStats_;
+ bool CgroupErrorLogged_ = false;
+
+ std::optional<TCgroupCpuStat> GetStats()
+ {
+ try {
+ auto cgroups = GetProcessCgroups();
+ for (const auto& group : cgroups) {
+ for (const auto& controller : group.Controllers) {
+ if (controller == ControlGroupCpuName) {
+ return GetCgroupCpuStat(group.ControllersName, group.Path);
+ }
+ }
+ }
+ } catch (const std::exception& ex) {
+ if (!CgroupErrorLogged_) {
+ YT_LOG_INFO(ex, "Failed to collect cgroup CPU statistics");
+ CgroupErrorLogged_ = true;
+ }
+ }
+
+ return {};
+ }
+};
+
+DEFINE_REFCOUNTED_TYPE(TContainerCpuThrottlingTracker);
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TLogDropTracker
+ : public TOverloadTracker
+{
+public:
+ TLogDropTracker(TStringBuf type, TStringBuf id, const TProfiler& profiler)
+ : TOverloadTracker(type, id)
+ {
+ auto taggedProfiler = profiler.WithTag("tracker", Id_);
+
+ Overloaded_ = taggedProfiler.Counter("/overloaded");
+ BacklogQueueFillFraction_ = profiler.Gauge("/backlog_queue_fill_fraction");
+ BacklogQueueFillFractionThreshold_ = profiler.Gauge("/backlog_queue_fill_fraction_threshold");
+ }
+
+ bool CalculateIsOverloaded(const TOverloadTrackerConfig& config) override
+ {
+ double BacklogQueueFillFraction = TLogManager::Get()->GetBacklogQueueFillFraction();
+
+ YT_LOG_DEBUG("Reporting logging queue filling fraction "
+ "(BacklogQueueFillFraction: %v)",
+ BacklogQueueFillFraction);
+
+ const auto& logDropConfig = config.TryGetConcrete<TOverloadTrackerBacklogQueueFillFractionConfig>();
+
+ bool overloaded = BacklogQueueFillFraction > logDropConfig->BacklogQueueFillFractionThreshold;
+
+ Overloaded_.Increment(static_cast<int>(overloaded));
+ BacklogQueueFillFraction_.Update(BacklogQueueFillFraction);
+ BacklogQueueFillFractionThreshold_.Update(logDropConfig->BacklogQueueFillFractionThreshold);
+
+ return overloaded;
+ }
+
+private:
+ TGauge BacklogQueueFillFraction_;
+ TGauge BacklogQueueFillFractionThreshold_;
+};
+
+DEFINE_REFCOUNTED_TYPE(TLogDropTracker);
+
+////////////////////////////////////////////////////////////////////////////////
+
+bool ShouldThrottleCall(const TCongestionState& congestionState)
+{
+ if (!congestionState.MaxWindow || !congestionState.CurrentWindow) {
+ return false;
+ }
+
+ return static_cast<int>(RandomNumber<ui32>(*congestionState.MaxWindow)) + 1 > *congestionState.CurrentWindow;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TCongestionController
+ : public TRefCounted
+{
+public:
+ TCongestionController(TServiceMethodConfigPtr methodConfig, TOverloadControllerConfigPtr config, TProfiler profiler)
+ : MethodConfig_(std::move(methodConfig))
+ , Config_(std::move(config))
+ , MaxWindow_(MethodConfig_->MaxWindow)
+ , Window_(MaxWindow_)
+ , WindowGauge_(profiler.Gauge("/window"))
+ , SlowStartThresholdGauge_(profiler.Gauge("/slow_start_threshold_gauge"))
+ , MaxWindowGauge_(profiler.Gauge("/max_window"))
+ { }
+
+ TCongestionState GetCongestionState()
+ {
+ auto result = TCongestionState{
+ .CurrentWindow = Window_.load(std::memory_order::relaxed),
+ .MaxWindow = MaxWindow_,
+ .WaitingTimeoutFraction = MethodConfig_->WaitingTimeoutFraction,
+ };
+
+ auto now = NProfiling::GetCpuInstant();
+ auto recentlyOverloadedThreshold = Config_->LoadAdjustingPeriod * MethodConfig_->MaxWindow;
+
+ for (const auto& [trackerType, lastOverloaded] : OverloadedTrackers_) {
+ auto sinceLastOverloaded = CpuDurationToDuration(now - lastOverloaded.load(std::memory_order::relaxed));
+ if (sinceLastOverloaded < recentlyOverloadedThreshold) {
+ result.OverloadedTrackers.push_back(trackerType);
+ }
+ }
+
+ return result;
+ }
+
+ void Adjust(const THashSet<std::string>& overloadedTrackers, TCpuInstant timestamp)
+ {
+ auto window = Window_.load(std::memory_order::relaxed);
+
+ // NB. We reporting here slightly outdated values but this makes code simpler a bit.
+ WindowGauge_.Update(window);
+ MaxWindowGauge_.Update(MaxWindow_);
+ SlowStartThresholdGauge_.Update(SlowStartThreshold_);
+
+ for (const auto& tracker : overloadedTrackers) {
+ auto it = OverloadedTrackers_.find(tracker);
+ if (it != OverloadedTrackers_.end()) {
+ it->second.store(timestamp, std::memory_order::relaxed);
+ }
+ }
+
+ auto overloaded = !overloadedTrackers.empty();
+
+ if (overloaded) {
+ SlowStartThreshold_ = window > 0 ? window / 2 : SlowStartThreshold_;
+ Window_.store(0, std::memory_order::relaxed);
+
+ YT_LOG_WARNING("System is overloaded (SlowStartThreshold: %v, Window: %v, OverloadedTrackers: %v)",
+ SlowStartThreshold_,
+ window,
+ overloadedTrackers);
+ return;
+ }
+
+ if (window >= SlowStartThreshold_) {
+ ++window;
+ } else {
+ window *= 2;
+ window = std::min(SlowStartThreshold_, window);
+ }
+
+ // Keeping window in sane limits.
+ window = std::min(MaxWindow_, window);
+ window = std::max(1, window);
+
+ YT_LOG_DEBUG("Adjusting system load up (SlowStartThreshold: %v, CurrentWindow: %v)",
+ SlowStartThreshold_,
+ window);
+
+ Window_.store(window, std::memory_order::relaxed);
+ }
+
+ void AddTrackerType(std::string trackerType)
+ {
+ // Called only during initial construction of controllers,
+ // so we do not have to serialize here.
+ OverloadedTrackers_[trackerType] = {};
+ }
+
+private:
+ const TServiceMethodConfigPtr MethodConfig_;
+ const TOverloadControllerConfigPtr Config_;
+ const int MaxWindow_;
+
+ std::atomic<int> Window_;
+ int SlowStartThreshold_ = 0;
+ THashMap<std::string, std::atomic<TCpuInstant>> OverloadedTrackers_;
+
+ TCounter SkippedRequestCount_;
+ TGauge WindowGauge_;
+ TGauge SlowStartThresholdGauge_;
+ TGauge MaxWindowGauge_;
+};
+
+DEFINE_REFCOUNTED_TYPE(TCongestionController);
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TOverloadController
+ : public IOverloadController
+{
+public:
+ DEFINE_SIGNAL_OVERRIDE(void(), LoadAdjusted);
+
+ TOverloadController(TOverloadControllerConfigPtr config, NProfiling::TProfiler profiler)
+ : ControlThread_(New<TActionQueue>("OverloadCtl"))
+ , Invoker_(ControlThread_->GetInvoker())
+ , Periodic_(New<TPeriodicExecutor>(
+ Invoker_,
+ BIND(&TOverloadController::Adjust, MakeWeak(this)),
+ config->LoadAdjustingPeriod))
+ , Profiler_(std::move(profiler))
+ {
+ State_.Config = std::move(config);
+ CreateGenericTracker<TContainerCpuThrottlingTracker>(CpuThrottlingTrackerName);
+ CreateGenericTracker<TLogDropTracker>(LogDropTrackerName);
+ UpdateStateSnapshot(State_, Guard(SpinLock_));
+ }
+
+ void Start() override
+ {
+ Periodic_->Start();
+ }
+
+ void TrackInvoker(
+ TStringBuf name,
+ const IInvokerPtr& invoker) override
+ {
+ invoker->SubscribeWaitTimeObserved(CreateGenericWaitTimeObserver(name));
+ }
+
+ void TrackFSHThreadPool(
+ TStringBuf name,
+ const NConcurrency::ITwoLevelFairShareThreadPoolPtr& threadPool) override
+ {
+ threadPool->SubscribeWaitTimeObserved(CreateGenericWaitTimeObserver(name));
+ }
+
+ IInvoker::TWaitTimeObserver CreateGenericWaitTimeObserver(
+ TStringBuf trackerType,
+ std::optional<TStringBuf> id = {}) override
+ {
+ auto tracker = CreateGenericTracker<TMeanWaitTimeTracker>(std::move(trackerType), std::move(id));
+
+ return BIND([tracker] (TDuration waitTime) {
+ tracker->Record(waitTime);
+ });
+ }
+
+ TCongestionState GetCongestionState(TStringBuf service, TStringBuf method) const override
+ {
+ auto snapshot = GetStateSnapshot();
+ if (!snapshot->Config->Enabled) {
+ return {};
+ }
+
+ const auto& controllers = snapshot->CongestionControllers;
+ if (auto it = controllers.find(std::pair(service, method)); it != controllers.end()) {
+ return it->second->GetCongestionState();
+ }
+
+ return {};
+ }
+
+ void Reconfigure(TOverloadControllerConfigPtr config) override
+ {
+ Periodic_->SetPeriod(config->LoadAdjustingPeriod);
+
+ auto guard = Guard(SpinLock_);
+ State_.CongestionControllers = CreateCongestionControllers(config, Profiler_);
+ State_.Config = std::move(config);
+
+ UpdateStateSnapshot(State_, std::move(guard));
+ }
+
+private:
+ using TMethodIndex = std::pair<TString, TString>;
+ using TMethodsCongestionControllers = THashMap<TMethodIndex, TCongestionControllerPtr>;
+
+ struct TState final
+ {
+ static constexpr bool EnableHazard = true;
+
+ TOverloadControllerConfigPtr Config;
+ TMethodsCongestionControllers CongestionControllers;
+ THashMap<TString, TOverloadTrackerPtr> Trackers;
+ };
+
+ using TSpinLockGuard = TGuard<NThreading::TSpinLock>;
+
+ const NConcurrency::TActionQueuePtr ControlThread_;
+ const IInvokerPtr Invoker_;
+ const NConcurrency::TPeriodicExecutorPtr Periodic_;
+ const NProfiling::TProfiler Profiler_;
+
+ TAtomicPtr<TState, /*EnableAcquireHazard*/ true> StateSnapshot_;
+
+ NThreading::TSpinLock SpinLock_;
+ TState State_;
+
+ void Adjust()
+ {
+ DoAdjust(GetStateSnapshot());
+ LoadAdjusted_.Fire();
+ }
+
+ void DoAdjust(const THazardPtr<TState>& state)
+ {
+ YT_ASSERT_INVOKER_AFFINITY(Invoker_);
+
+ auto now = NProfiling::GetCpuInstant();
+
+ using TOverloadedTrackers = THashSet<std::string>;
+ THashMap<TMethodIndex, TOverloadedTrackers> methodOverloaded;
+
+ const auto& config = state->Config;
+
+ for (const auto& [_, trackerConfig] : config->Trackers) {
+ for (const auto& method : trackerConfig->MethodsToThrottle) {
+ methodOverloaded[std::pair(method.Service, method.Method)] = {};
+ }
+ }
+
+ for (const auto& [trackerId, tracker] : state->Trackers) {
+ auto trackerIt = config->Trackers.find(tracker->GetType());
+ if (trackerIt == config->Trackers.end()) {
+ continue;
+ }
+
+ const auto& trackerConfig = trackerIt->second;
+
+ auto trackerOverloaded = tracker->CalculateIsOverloaded(trackerConfig);
+
+ if (!trackerOverloaded) {
+ continue;
+ }
+
+ for (const auto& method : trackerIt->second->MethodsToThrottle) {
+ auto& overloadedTrackers = methodOverloaded[std::pair(method.Service, method.Method)];
+ overloadedTrackers.insert(tracker->GetType());
+ }
+ }
+
+ for (const auto& [method, overloadedTrackers] : methodOverloaded) {
+ auto it = state->CongestionControllers.find(method);
+ if (it == state->CongestionControllers.end()) {
+ YT_LOG_WARNING("Cannot find congestion controller for method (Service: %v, Method: %v)",
+ method.first,
+ method.second);
+
+ continue;
+ }
+
+ it->second->Adjust(overloadedTrackers, now);
+ }
+ }
+
+ THazardPtr<TOverloadController::TState> GetStateSnapshot() const
+ {
+ YT_VERIFY(StateSnapshot_);
+
+ return StateSnapshot_.AcquireHazard();
+ }
+
+ void UpdateStateSnapshot(const TState& state, TSpinLockGuard guard)
+ {
+ auto snapshot = New<TState>(state);
+ guard.Release();
+
+ StateSnapshot_.Store(std::move(snapshot));
+ ReclaimHazardPointers();
+ }
+
+ template <class TTracker>
+ TIntrusivePtr<TTracker> CreateGenericTracker(TStringBuf trackerType, std::optional<TStringBuf> id = {})
+ {
+ YT_LOG_DEBUG("Creating overload tracker (TrackerType: %v, Id: %v)",
+ trackerType,
+ id);
+
+ auto trackerId = id.value_or(trackerType);
+
+ auto profiler = Profiler_.WithTag("tracker", std::string(trackerId));
+
+ auto tracker = New<TTracker>(trackerType, trackerId, profiler);
+
+ auto guard = Guard(SpinLock_);
+
+ State_.Trackers[trackerId] = tracker;
+
+ UpdateStateSnapshot(State_, std::move(guard));
+
+ return tracker;
+ }
+
+ static TMethodsCongestionControllers CreateCongestionControllers(
+ const TOverloadControllerConfigPtr& config,
+ NProfiling::TProfiler profiler)
+ {
+ TMethodsCongestionControllers controllers;
+
+ THashMap<TMethodIndex, TServiceMethodConfigPtr> configIndex;
+ for (const auto& methodConfig : config->Methods) {
+ configIndex[std::pair(methodConfig->Service, methodConfig->Method)] = methodConfig;
+ }
+
+ auto getConfig = [&configIndex] (TStringBuf service, TStringBuf method) {
+ auto it = configIndex.find(std::pair(service, method));
+ if (it != configIndex.end()) {
+ return it->second;
+ }
+
+ auto defaultConfig = New<TServiceMethodConfig>();
+ defaultConfig->Service = service;
+ defaultConfig->Method = method;
+ return defaultConfig;
+ };
+
+ for (const auto& [trackerType, tracker] : config->Trackers) {
+ for (const auto& method : tracker->MethodsToThrottle) {
+ auto& controller = controllers[std::pair(method.Service, method.Method)];
+
+ if (!controller) {
+ auto methodConfig = getConfig(method.Service, method.Method);
+ auto controllerProfiler = profiler
+ .WithTag("yt_service", method.Service)
+ .WithTag("method", method.Method);
+
+ controller = New<TCongestionController>(
+ std::move(methodConfig),
+ config, std::move(controllerProfiler));
+ }
+
+ controller->AddTrackerType(trackerType);
+ }
+ }
+
+ return controllers;
+ }
+};
+
+IOverloadControllerPtr CreateOverloadController(TOverloadControllerConfigPtr config, NProfiling::TProfiler profiler)
+{
+ return New<TOverloadController>(std::move(config), std::move(profiler));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NRpc
diff --git a/yt/yt/core/rpc/overload_controller.h b/yt/yt/core/rpc/overload_controller.h
new file mode 100644
index 00000000000..708ec833271
--- /dev/null
+++ b/yt/yt/core/rpc/overload_controller.h
@@ -0,0 +1,67 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/library/profiling/sensor.h>
+
+#include <yt/yt/core/concurrency/public.h>
+
+#include <yt/yt/core/misc/atomic_ptr.h>
+
+#include <library/cpp/yt/compact_containers/compact_vector.h>
+
+#include <library/cpp/yt/threading/public.h>
+
+namespace NYT::NRpc {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Overall rpc-method congestion state.
+// Can be a a measure of allowed concurrency or probability of acceptance more calls.
+struct TCongestionState
+{
+ std::optional<int> CurrentWindow;
+ std::optional<int> MaxWindow;
+ double WaitingTimeoutFraction = 0;
+
+ using TTrackersList = TCompactVector<std::string, 4>;
+ TTrackersList OverloadedTrackers;
+};
+
+// Probabilistic predicate based on congestion state of the the method.
+bool ShouldThrottleCall(const TCongestionState& congestionState);
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct IOverloadController
+ : public TRefCounted
+{
+public:
+ DECLARE_INTERFACE_SIGNAL(void(), LoadAdjusted);
+
+ virtual void Start() = 0;
+ virtual void Reconfigure(TOverloadControllerConfigPtr config) = 0;
+
+ virtual void TrackInvoker(
+ TStringBuf name,
+ const IInvokerPtr& invoker) = 0;
+ virtual void TrackFSHThreadPool(
+ TStringBuf name,
+ const NConcurrency::ITwoLevelFairShareThreadPoolPtr& threadPool) = 0;
+
+ virtual IInvoker::TWaitTimeObserver CreateGenericWaitTimeObserver(
+ TStringBuf trackerType,
+ std::optional<TStringBuf> id = {}) = 0;
+
+ virtual TCongestionState GetCongestionState(TStringBuf service, TStringBuf method) const = 0;
+};
+
+DEFINE_REFCOUNTED_TYPE(IOverloadController);
+
+IOverloadControllerPtr CreateOverloadController(
+ TOverloadControllerConfigPtr config,
+ NProfiling::TProfiler profiler = {});
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NRpc
diff --git a/yt/yt/core/rpc/overload_controlling_service_base-inl.h b/yt/yt/core/rpc/overload_controlling_service_base-inl.h
new file mode 100644
index 00000000000..2a7a1b1b9dc
--- /dev/null
+++ b/yt/yt/core/rpc/overload_controlling_service_base-inl.h
@@ -0,0 +1,96 @@
+#ifndef OVERLOAD_CONTROLLING_SERVICE_BASE_INL_H_
+#error "Direct inclusion of this file is not allowed, include overload_controlling_service_base.h"
+// For the sake of sane code completion.
+#include "overload_controlling_service_base.h"
+#endif
+
+#include "overload_controller.h"
+
+#include <yt/yt/core/concurrency/delayed_executor.h>
+
+namespace NYT::NRpc {
+
+using namespace NConcurrency;
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class TBaseService>
+template <typename... TArgs>
+TOverloadControllingServiceBase<TBaseService>::TOverloadControllingServiceBase(
+ IOverloadControllerPtr controller,
+ TArgs&&... args)
+ : TBaseService(std::forward<TArgs>(args)...)
+ , Controller_(std::move(controller))
+{
+ YT_VERIFY(Controller_);
+}
+
+template <class TBaseService>
+void TOverloadControllingServiceBase<TBaseService>::SubscribeLoadAdjusted()
+{
+ Controller_->SubscribeLoadAdjusted(BIND(
+ &TOverloadControllingServiceBase::HandleLoadAdjusted,
+ MakeWeak(this)));
+}
+
+template <class TBaseService>
+auto TOverloadControllingServiceBase<TBaseService>::RegisterMethod(
+ const TMethodDescriptor& descriptor) -> TRuntimeMethodInfoPtr
+{
+ Methods_.insert(descriptor.Method);
+ return TBaseService::RegisterMethod(descriptor);
+}
+
+template <class TBaseService>
+void TOverloadControllingServiceBase<TBaseService>::HandleLoadAdjusted()
+{
+ const auto& serviceName = TBaseService::GetServiceId().ServiceName;
+
+ for (const auto& method : Methods_) {
+ auto* runtimeInfo = TBaseService::FindMethodInfo(method);
+ YT_VERIFY(runtimeInfo);
+
+ auto congestionState = Controller_->GetCongestionState(serviceName, method);
+ runtimeInfo->ConcurrencyLimit.SetDynamicLimit(congestionState.CurrentWindow);
+ runtimeInfo->WaitingTimeoutFraction.store(
+ congestionState.WaitingTimeoutFraction,
+ std::memory_order::relaxed);
+ }
+}
+
+template <class TBaseService>
+std::optional<TError> TOverloadControllingServiceBase<TBaseService>::GetThrottledError(
+ const NRpc::NProto::TRequestHeader& requestHeader)
+{
+ auto congestionState = Controller_->GetCongestionState(requestHeader.service(), requestHeader.method());
+ const auto& overloadedTrackers = congestionState.OverloadedTrackers;
+
+ if (!overloadedTrackers.empty()) {
+ return TError(NRpc::EErrorCode::Overloaded, "Instance is overloaded")
+ << TErrorAttribute("overloaded_trackers", overloadedTrackers);
+ }
+
+ return TBaseService::GetThrottledError(requestHeader);
+}
+
+template <class TBaseService>
+void TOverloadControllingServiceBase<TBaseService>::HandleRequest(
+ std::unique_ptr<NRpc::NProto::TRequestHeader> header,
+ TSharedRefArray message,
+ NBus::IBusPtr replyBus)
+{
+ auto congestionState = Controller_->GetCongestionState(
+ header->service(),
+ header->method());
+
+ if (ShouldThrottleCall(congestionState)) {
+ // Give other handling routines chance to execute.
+ NConcurrency::Yield();
+ }
+
+ TBaseService::HandleRequest(std::move(header), std::move(message), std::move(replyBus));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NRpc
diff --git a/yt/yt/core/rpc/overload_controlling_service_base.cpp b/yt/yt/core/rpc/overload_controlling_service_base.cpp
new file mode 100644
index 00000000000..76b559bc726
--- /dev/null
+++ b/yt/yt/core/rpc/overload_controlling_service_base.cpp
@@ -0,0 +1,9 @@
+#include "overload_controlling_service_base.h"
+
+namespace NYT::NRpc {
+
+////////////////////////////////////////////////////////////////////////////////
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NRpc
diff --git a/yt/yt/core/rpc/overload_controlling_service_base.h b/yt/yt/core/rpc/overload_controlling_service_base.h
new file mode 100644
index 00000000000..7b8c95c007e
--- /dev/null
+++ b/yt/yt/core/rpc/overload_controlling_service_base.h
@@ -0,0 +1,45 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/core/rpc/service_detail.h>
+
+namespace NYT::NRpc {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class TBaseService>
+class TOverloadControllingServiceBase
+ : public TBaseService
+{
+public:
+ template <typename... TArgs>
+ explicit TOverloadControllingServiceBase(IOverloadControllerPtr controller, TArgs&&... args);
+
+ using TRuntimeMethodInfoPtr = NRpc::TServiceBase::TRuntimeMethodInfoPtr;
+ using TMethodDescriptor = NRpc::TServiceBase::TMethodDescriptor;
+
+ TRuntimeMethodInfoPtr RegisterMethod(const TMethodDescriptor& descriptor) override;
+ void SubscribeLoadAdjusted();
+
+protected:
+ std::optional<TError> GetThrottledError(const NRpc::NProto::TRequestHeader& requestHeader) override;
+ void HandleRequest(
+ std::unique_ptr<NRpc::NProto::TRequestHeader> header,
+ TSharedRefArray message,
+ NBus::IBusPtr replyBus) override;
+
+private:
+ IOverloadControllerPtr Controller_;
+ THashSet<std::string> Methods_;
+
+ void HandleLoadAdjusted();
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NRpc
+
+#define OVERLOAD_CONTROLLING_SERVICE_BASE_INL_H_
+#include "overload_controlling_service_base-inl.h"
+#undef OVERLOAD_CONTROLLING_SERVICE_BASE_INL_H_
diff --git a/yt/yt/core/rpc/public.h b/yt/yt/core/rpc/public.h
index 5955185d368..5f89abd52d9 100644
--- a/yt/yt/core/rpc/public.h
+++ b/yt/yt/core/rpc/public.h
@@ -70,6 +70,7 @@ DECLARE_REFCOUNTED_STRUCT(IChannelFactory)
DECLARE_REFCOUNTED_STRUCT(IRoamingChannelProvider)
DECLARE_REFCOUNTED_STRUCT(IAuthenticator)
DECLARE_REFCOUNTED_STRUCT(IResponseKeeper)
+DECLARE_REFCOUNTED_STRUCT(IOverloadController)
DECLARE_REFCOUNTED_CLASS(TClientContext)
DECLARE_REFCOUNTED_CLASS(TServiceBase)
@@ -77,6 +78,7 @@ DECLARE_REFCOUNTED_CLASS(TChannelWrapper)
DECLARE_REFCOUNTED_CLASS(TStaticChannelFactory)
DECLARE_REFCOUNTED_CLASS(TClientRequestControlThunk)
DECLARE_REFCOUNTED_CLASS(TCachingChannelFactory)
+DECLARE_REFCOUNTED_CLASS(TCongestionController)
DECLARE_REFCOUNTED_CLASS(TAttachmentsInputStream)
DECLARE_REFCOUNTED_CLASS(TAttachmentsOutputStream)
@@ -127,6 +129,10 @@ DECLARE_REFCOUNTED_STRUCT(TThrottlingChannelDynamicConfig)
DECLARE_REFCOUNTED_STRUCT(TResponseKeeperConfig)
DECLARE_REFCOUNTED_STRUCT(TDispatcherConfig)
DECLARE_REFCOUNTED_STRUCT(TDispatcherDynamicConfig)
+DECLARE_REFCOUNTED_STRUCT(TServiceMethodConfig)
+DECLARE_REFCOUNTED_STRUCT(TOverloadTrackerMeanWaitTimeConfig)
+DECLARE_REFCOUNTED_STRUCT(TOverloadTrackerBacklogQueueFillFractionConfig)
+DECLARE_REFCOUNTED_STRUCT(TOverloadControllerConfig)
struct TRequestQueueThrottlerConfigs
{
diff --git a/yt/yt/core/rpc/unittests/main/ya.make b/yt/yt/core/rpc/unittests/main/ya.make
index 335a91fa4aa..226e2da0215 100644
--- a/yt/yt/core/rpc/unittests/main/ya.make
+++ b/yt/yt/core/rpc/unittests/main/ya.make
@@ -5,6 +5,7 @@ INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
PROTO_NAMESPACE(yt)
SRCS(
+ yt/yt/core/rpc/unittests/overload_controller_ut.cpp
yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp
yt/yt/core/rpc/unittests/roaming_channel_ut.cpp
yt/yt/core/rpc/unittests/rpc_ut.cpp
diff --git a/yt/yt/core/rpc/unittests/overload_controller_ut.cpp b/yt/yt/core/rpc/unittests/overload_controller_ut.cpp
new file mode 100644
index 00000000000..8a6e39aeb43
--- /dev/null
+++ b/yt/yt/core/rpc/unittests/overload_controller_ut.cpp
@@ -0,0 +1,431 @@
+#include <yt/yt/core/rpc/config.h>
+#include <yt/yt/core/rpc/overload_controller.h>
+
+#include <yt/yt/core/concurrency/action_queue.h>
+#include <yt/yt/core/concurrency/two_level_fair_share_thread_pool.h>
+
+#include <yt/yt/core/test_framework/framework.h>
+
+namespace NYT::NRpc {
+namespace {
+
+using namespace NConcurrency;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TMockInvoker
+ : public IInvoker
+{
+public:
+ DEFINE_SIGNAL_OVERRIDE(TWaitTimeObserver::TSignature, WaitTimeObserved);
+
+public:
+ void Invoke(TClosure /*callback*/) override
+ { }
+
+ void Invoke(TMutableRange<TClosure> /*callbacks*/) override
+ { }
+
+ bool CheckAffinity(const IInvokerPtr& /*invoker*/) const override
+ {
+ return false;
+ }
+
+ bool IsSerialized() const override
+ {
+ return true;
+ }
+
+ NThreading::TThreadId GetThreadId() const override
+ {
+ return {};
+ }
+
+ void FireWaitTimeObserved(TDuration waitTime)
+ {
+ WaitTimeObserved_.Fire(waitTime);
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TMethodInfo
+{
+ TString Service;
+ TString Method;
+ double WaitingTimeoutFraction = 0;
+};
+
+using TMethodInfoList = std::vector<TMethodInfo>;
+
+constexpr auto MeanWaitTimeThreshold = TDuration::MilliSeconds(20);
+
+TOverloadControllerConfigPtr CreateConfig(const THashMap<TString, TMethodInfoList>& schema)
+{
+ auto config = New<TOverloadControllerConfig>();
+ config->Enabled = true;
+
+ for (const auto& [trackerName, methods] : schema) {
+ TOverloadTrackerConfig trackerConfig(EOverloadTrackerConfigType::MeanWaitTime);
+ auto trackerMeanWaitTimeConfig = trackerConfig.TryGetConcrete<TOverloadTrackerMeanWaitTimeConfig>();
+
+ for (const auto& methodInfo : methods) {
+ {
+ TServiceMethod serviceMethod;
+ serviceMethod.Service = methodInfo.Service;
+ serviceMethod.Method = methodInfo.Method;
+ trackerMeanWaitTimeConfig->MethodsToThrottle.push_back(std::move(serviceMethod));
+ }
+ {
+ auto serviceMethodConfig = New<TServiceMethodConfig>();
+ serviceMethodConfig->Service = methodInfo.Service;
+ serviceMethodConfig->Method = methodInfo.Method;
+ serviceMethodConfig->WaitingTimeoutFraction = methodInfo.WaitingTimeoutFraction;
+ config->Methods.push_back(std::move(serviceMethodConfig));
+ }
+ trackerMeanWaitTimeConfig->MeanWaitTimeThreshold = MeanWaitTimeThreshold;
+ }
+
+ config->Trackers[trackerName] = trackerConfig;
+ }
+
+ return config;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TOverloadControllerTest, TestOverloadsRequests)
+{
+ auto controller = CreateOverloadController(New<TOverloadControllerConfig>());
+ auto mockInvoker = New<TMockInvoker>();
+ auto mockInvoker2 = New<TMockInvoker>();
+
+ controller->TrackInvoker("Mock", mockInvoker);
+ controller->TrackInvoker("Mock2", mockInvoker2);
+
+ auto config = CreateConfig({
+ {"Mock", {{"MockService", "MockMethod"}}},
+ {"Mock2", {{"MockService", "MockMethod2"}}},
+ });
+ config->LoadAdjustingPeriod = TDuration::MilliSeconds(1);
+ controller->Reconfigure(config);
+ controller->Start();
+
+ // Simulate overload
+ for (int i = 0; i < 5000; ++i) {
+ mockInvoker->FireWaitTimeObserved(MeanWaitTimeThreshold * 2);
+ }
+
+ // Check overload incoming requests
+ int remainsCount = 1000;
+ while (remainsCount > 0) {
+ EXPECT_FALSE(ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod2")));
+
+ auto overloaded = ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod"));
+ if (overloaded) {
+ --remainsCount;
+ } else {
+ Sleep(TDuration::MicroSeconds(10));
+ }
+ }
+
+ // Check recovering even if no calls
+ while (remainsCount < 1000) {
+ auto overloaded = ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod"));
+ if (!overloaded) {
+ ++remainsCount;
+ } else {
+ Sleep(TDuration::MicroSeconds(1));
+ }
+ }
+}
+
+TEST(TOverloadControllerTest, TestNoOverloads)
+{
+ auto controller = CreateOverloadController(New<TOverloadControllerConfig>());
+ auto mockInvoker = New<TMockInvoker>();
+
+ controller->TrackInvoker("Mock", mockInvoker);
+
+ auto config = CreateConfig({
+ {"Mock", {{"MockService", "MockMethod"}}}
+ });
+ config->LoadAdjustingPeriod = TDuration::MilliSeconds(1);
+
+ controller->Reconfigure(config);
+ controller->Start();
+
+ // Simulate overload
+ for (int i = 0; i < 5000; ++i) {
+ mockInvoker->FireWaitTimeObserved(MeanWaitTimeThreshold / 2);
+ }
+
+ for (int i = 0; i < 10000; ++i) {
+ EXPECT_FALSE(ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod")));
+ mockInvoker->FireWaitTimeObserved(MeanWaitTimeThreshold / 2);
+
+ Sleep(TDuration::MicroSeconds(10));
+ }
+}
+
+TEST(TOverloadControllerTest, TestTwoInvokersSameMethod)
+{
+ auto controller = CreateOverloadController(New<TOverloadControllerConfig>());
+ auto mockInvoker1 = New<TMockInvoker>();
+ auto mockInvoker2 = New<TMockInvoker>();
+
+ controller->TrackInvoker("Mock1", mockInvoker1);
+ controller->TrackInvoker("Mock2", mockInvoker2);
+
+ auto config = CreateConfig({
+ {"Mock1", {{"MockService", "MockMethod"}}},
+ {"Mock2", {{"MockService", "MockMethod"}}},
+ });
+ config->LoadAdjustingPeriod = TDuration::MilliSeconds(1);
+
+ controller->Reconfigure(config);
+ controller->Start();
+
+ // Simulate overload
+ for (int i = 0; i < 5000; ++i) {
+ mockInvoker1->FireWaitTimeObserved(MeanWaitTimeThreshold * 2);
+ mockInvoker2->FireWaitTimeObserved(MeanWaitTimeThreshold / 2);
+ }
+
+ // Check overloading incoming requests
+ int remainsCount = 1000;
+ while (remainsCount > 0) {
+ auto overloaded = ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod"));
+ if (overloaded) {
+ --remainsCount;
+ } else {
+ Sleep(TDuration::MicroSeconds(10));
+ }
+ }
+
+ // Check recovering even if no calls
+ while (remainsCount < 1000) {
+ auto overloaded = ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod"));
+ if (!overloaded) {
+ ++remainsCount;
+ } else {
+ Sleep(TDuration::MicroSeconds(1));
+ }
+ }
+}
+
+TEST(TOverloadControllerTest, TestCongestionWindow)
+{
+ auto controller = CreateOverloadController(New<TOverloadControllerConfig>());
+ auto mockInvoker = New<TMockInvoker>();
+ auto mockInvoker2 = New<TMockInvoker>();
+
+ controller->TrackInvoker("Mock", mockInvoker);
+ controller->TrackInvoker("Mock2", mockInvoker2);
+
+ auto config = CreateConfig({
+ {"Mock", {{"MockService", "MockMethod", 0.3}}},
+ {"Mock2", {{"MockService", "MockMethod2", 0.3}}},
+ });
+ config->LoadAdjustingPeriod = TDuration::MilliSeconds(1);
+ controller->Reconfigure(config);
+ controller->Start();
+
+ // Simulate overload
+ for (int i = 0; i < 5000; ++i) {
+ mockInvoker->FireWaitTimeObserved(MeanWaitTimeThreshold * 2);
+ }
+
+ // Check overload incoming requests
+ int remainsCount = 1000;
+ while (remainsCount > 0) {
+ mockInvoker->FireWaitTimeObserved(MeanWaitTimeThreshold * 2);
+ {
+ auto window2 = controller->GetCongestionState("MockService", "MockMethod2");
+ EXPECT_EQ(window2.MaxWindow, window2.CurrentWindow);
+ }
+
+ auto congestionState = controller->GetCongestionState("MockService", "MockMethod");
+ bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow;
+ if (overloaded) {
+ --remainsCount;
+ EXPECT_EQ(0.3, congestionState.WaitingTimeoutFraction);
+ EXPECT_EQ(congestionState.OverloadedTrackers, TCongestionState::TTrackersList{"Mock"});
+ } else {
+ Sleep(TDuration::MicroSeconds(10));
+ }
+ }
+
+ // Check recovering even if no calls
+ while (remainsCount < 1000) {
+ auto congestionState = controller->GetCongestionState("MockService", "MockMethod");
+ bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow;
+
+ if (!overloaded) {
+ ++remainsCount;
+ } else {
+ Sleep(TDuration::MicroSeconds(1));
+ }
+ }
+}
+
+TEST(TOverloadControllerTest, TestCongestionWindowTwoTrackers)
+{
+ auto controller = CreateOverloadController(New<TOverloadControllerConfig>());
+ auto mockInvoker1 = New<TMockInvoker>();
+ auto mockInvoker2 = New<TMockInvoker>();
+
+ controller->TrackInvoker("Mock", mockInvoker1);
+ controller->TrackInvoker("Mock2", mockInvoker2);
+
+ auto config = CreateConfig({
+ {"Mock", {{"MockService", "MockMethod", 0.3}}},
+ {"Mock2", {{"MockService", "MockMethod", 0.3}}},
+ });
+ config->LoadAdjustingPeriod = TDuration::MilliSeconds(1);
+ controller->Reconfigure(config);
+ controller->Start();
+
+ // Simulate overload
+ for (int i = 0; i < 5000; ++i) {
+ mockInvoker1->FireWaitTimeObserved(MeanWaitTimeThreshold * 2);
+ mockInvoker2->FireWaitTimeObserved(MeanWaitTimeThreshold * 2);
+ }
+
+ // Check overload incoming requests
+ int remainsCount = 10;
+ while (remainsCount > 0) {
+ auto congestionState = controller->GetCongestionState("MockService", "MockMethod");
+ bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow;
+ if (overloaded) {
+ --remainsCount;
+ auto trackers = controller->GetCongestionState("MockService", "MockMethod").OverloadedTrackers;
+ std::sort(trackers.begin(), trackers.end());
+ EXPECT_EQ(trackers, TCongestionState::TTrackersList({"Mock", "Mock2"}));
+ } else {
+ Sleep(TDuration::MicroSeconds(10));
+ }
+ }
+}
+
+TEST(TOverloadControllerTest, TestCongestionWindowTwoInstancies)
+{
+ auto controller = CreateOverloadController(New<TOverloadControllerConfig>());
+ auto observer1 = controller->CreateGenericWaitTimeObserver("Mock", "Mock.1");
+ auto observer2 = controller->CreateGenericWaitTimeObserver("Mock", "Mock.2");
+
+ auto config = CreateConfig({
+ {"Mock", {{"MockService", "MockMethod", 0.3}}},
+ });
+ config->LoadAdjustingPeriod = TDuration::MilliSeconds(1);
+ controller->Reconfigure(config);
+ controller->Start();
+
+ // Simulate overload
+ for (int i = 0; i < 5000; ++i) {
+ observer1(MeanWaitTimeThreshold * 2);
+ }
+
+ // Check overload incoming requests
+ int remainsCount = 10;
+ while (remainsCount > 0) {
+ auto congestionState = controller->GetCongestionState("MockService", "MockMethod");
+ bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow;
+ if (overloaded) {
+ --remainsCount;
+ auto trackers = controller->GetCongestionState("MockService", "MockMethod").OverloadedTrackers;
+ EXPECT_EQ(trackers, TCongestionState::TTrackersList({"Mock"}));
+ } else {
+ Sleep(TDuration::MicroSeconds(10));
+ }
+ }
+
+ Sleep(TDuration::MicroSeconds(10));
+
+ for (int i = 0; i < 5000; ++i) {
+ observer1(MeanWaitTimeThreshold * 2);
+ observer2(MeanWaitTimeThreshold * 2);
+ }
+
+ remainsCount = 10;
+ while (remainsCount > 0) {
+ auto congestionState = controller->GetCongestionState("MockService", "MockMethod");
+ bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow;
+ if (overloaded) {
+ --remainsCount;
+ auto trackers = controller->GetCongestionState("MockService", "MockMethod").OverloadedTrackers;
+ EXPECT_EQ(trackers, TCongestionState::TTrackersList({"Mock"}));
+ } else {
+ Sleep(TDuration::MicroSeconds(10));
+ }
+ }
+
+ Sleep(TDuration::MicroSeconds(10));
+ for (int i = 0; i < 5000; ++i) {
+ observer1(MeanWaitTimeThreshold / 2);
+ observer2(MeanWaitTimeThreshold * 2);
+ }
+
+ remainsCount = 10;
+ while (remainsCount > 0) {
+ auto congestionState = controller->GetCongestionState("MockService", "MockMethod");
+ bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow;
+ if (overloaded) {
+ --remainsCount;
+ auto trackers = controller->GetCongestionState("MockService", "MockMethod").OverloadedTrackers;
+ EXPECT_EQ(trackers, TCongestionState::TTrackersList({"Mock"}));
+ } else {
+ Sleep(TDuration::MicroSeconds(10));
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class TExecutorPtr>
+void ExecuteWaitTimeTest(const TExecutorPtr& executor, const IInvokerPtr& invoker)
+{
+ static constexpr int DesiredActionsCount = 27;
+
+ TDuration totalWaitTime;
+ int actionsCount = 0;
+
+ executor->SubscribeWaitTimeObserved(BIND([&] (TDuration waitTime) {
+ totalWaitTime += waitTime;
+ ++actionsCount;
+ }));
+
+ std::vector<TFuture<void>> futures;
+ for (int i = 0; i < DesiredActionsCount; ++i) {
+ auto future = BIND([] {
+ Sleep(TDuration::MilliSeconds(1));
+ }).AsyncVia(invoker)
+ .Run();
+
+ futures.push_back(std::move(future));
+ }
+
+ WaitFor(AllSucceeded(std::move(futures)))
+ .ThrowOnError();
+
+ EXPECT_EQ(DesiredActionsCount, actionsCount);
+ EXPECT_GE(totalWaitTime, TDuration::MilliSeconds(DesiredActionsCount - 1));
+}
+
+TEST(TOverloadControllerTest, WaitTimeObserver)
+{
+ {
+ auto actionQueue = New<TActionQueue>("TestActionQueue");
+ ExecuteWaitTimeTest(actionQueue->GetInvoker(), actionQueue->GetInvoker());
+ }
+
+ {
+ auto fshThreadPool = CreateTwoLevelFairShareThreadPool(1, "TestNewFsh");
+ ExecuteWaitTimeTest(fshThreadPool, fshThreadPool->GetInvoker("test-pool", "fsh-tag"));
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT::NRpc
diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make
index 61c92191dbd..5dfb9c00bf7 100644
--- a/yt/yt/core/ya.make
+++ b/yt/yt/core/ya.make
@@ -200,9 +200,11 @@ SRCS(
rpc/helpers.cpp
rpc/local_channel.cpp
rpc/local_server.cpp
- rpc/message.cpp
rpc/message_format.cpp
+ rpc/message.cpp
rpc/null_channel.cpp
+ rpc/overload_controller.cpp
+ rpc/overload_controlling_service_base.cpp
rpc/peer_discovery.cpp
rpc/per_key_request_queue_provider.cpp
rpc/protocol_version.cpp
@@ -213,8 +215,8 @@ SRCS(
rpc/roaming_channel.cpp
rpc/serialized_channel.cpp
rpc/server_detail.cpp
- rpc/service.cpp
rpc/service_detail.cpp
+ rpc/service.cpp
rpc/static_channel_factory.cpp
rpc/stream.cpp
rpc/throttling_channel.cpp
diff --git a/yt/yt/library/profiling/solomon/percpu.cpp b/yt/yt/library/profiling/percpu.cpp
index 0f146f0921d..d69686f7c30 100644
--- a/yt/yt/library/profiling/solomon/percpu.cpp
+++ b/yt/yt/library/profiling/percpu.cpp
@@ -1,5 +1,6 @@
#include "percpu.h"
-#include "yt/yt/library/profiling/summary.h"
+
+#include "summary.h"
#include <yt/yt/core/profiling/tscp.h>
diff --git a/yt/yt/library/profiling/solomon/percpu.h b/yt/yt/library/profiling/percpu.h
index 544d806e11d..e1766da58ab 100644
--- a/yt/yt/library/profiling/solomon/percpu.h
+++ b/yt/yt/library/profiling/percpu.h
@@ -1,7 +1,7 @@
#pragma once
-#include <yt/yt/library/profiling/impl.h>
-#include <yt/yt/library/profiling/summary.h>
+#include "impl.h"
+#include "summary.h"
#include <yt/yt/core/profiling/tscp.h>
diff --git a/yt/yt/library/profiling/solomon/helpers.cpp b/yt/yt/library/profiling/solomon/helpers.cpp
index 7f4be734a5e..b5458fb4535 100644
--- a/yt/yt/library/profiling/solomon/helpers.cpp
+++ b/yt/yt/library/profiling/solomon/helpers.cpp
@@ -1,9 +1,10 @@
#include "helpers.h"
-#include "percpu.h"
#include "private.h"
#include "producer.h"
#include "sensor_set.h"
+#include <yt/yt/library/profiling/percpu.h>
+
#include <yt/yt/core/http/http.h>
#include <yt/yt/core/misc/ref_counted_tracker.h>
diff --git a/yt/yt/library/profiling/solomon/registry.cpp b/yt/yt/library/profiling/solomon/registry.cpp
index b8b45442902..d9e6d5b7628 100644
--- a/yt/yt/library/profiling/solomon/registry.cpp
+++ b/yt/yt/library/profiling/solomon/registry.cpp
@@ -1,12 +1,10 @@
#include "registry.h"
#include "sensor.h"
-#include "percpu.h"
-#include <yt/yt/core/misc/protobuf_helpers.h>
+#include <yt/yt/library/profiling/percpu.h>
-#include <yt/yt/library/profiling/impl.h>
-#include <yt/yt/library/profiling/sensor.h>
+#include <yt/yt/core/misc/protobuf_helpers.h>
#include <library/cpp/yt/assert/assert.h>
diff --git a/yt/yt/library/profiling/solomon/registry.h b/yt/yt/library/profiling/solomon/registry.h
index 7a53df82e1e..86ef47f5117 100644
--- a/yt/yt/library/profiling/solomon/registry.h
+++ b/yt/yt/library/profiling/solomon/registry.h
@@ -5,6 +5,11 @@
#include "producer.h"
#include "tag_registry.h"
+#include <yt/yt/library/profiling/sensor.h>
+#include <yt/yt/library/profiling/impl.h>
+
+#include <yt/yt/library/profiling/solomon/sensor_dump.pb.h>
+
#include <yt/yt/core/actions/invoker_util.h>
#include <yt/yt/core/misc/mpsc_stack.h>
@@ -13,11 +18,6 @@
#include <yt/yt/core/ytree/fluent.h>
-#include <yt/yt/library/profiling/sensor.h>
-#include <yt/yt/library/profiling/impl.h>
-
-#include <yt/yt/library/profiling/solomon/sensor_dump.pb.h>
-
#include <library/cpp/yt/threading/spin_lock.h>
namespace NYT::NProfiling {
diff --git a/yt/yt/library/profiling/solomon/ya.make b/yt/yt/library/profiling/solomon/ya.make
index 3d53febe4f9..5d4ddcff946 100644
--- a/yt/yt/library/profiling/solomon/ya.make
+++ b/yt/yt/library/profiling/solomon/ya.make
@@ -7,14 +7,13 @@ SRCS(
cube.cpp
exporter.cpp
helpers.cpp
- percpu.cpp
producer.cpp
proxy.cpp
GLOBAL registry.cpp
remote.cpp
- sensor.cpp
sensor_service.cpp
sensor_set.cpp
+ sensor.cpp
tag_registry.cpp
sensor_dump.proto
diff --git a/yt/yt/library/profiling/ya.make b/yt/yt/library/profiling/ya.make
index e302d65ef41..e0f44fd5fb2 100644
--- a/yt/yt/library/profiling/ya.make
+++ b/yt/yt/library/profiling/ya.make
@@ -3,12 +3,13 @@ LIBRARY()
INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
SRCS(
- sensor.cpp
- producer.cpp
+ histogram_snapshot.cpp
impl.cpp
+ percpu.cpp
+ producer.cpp
+ sensor.cpp
tag.cpp
testing.cpp
- histogram_snapshot.cpp
)
PEERDIR(