diff options
author | dgolear <dgolear@yandex-team.com> | 2025-06-20 19:32:27 +0300 |
---|---|---|
committer | dgolear <dgolear@yandex-team.com> | 2025-06-20 19:56:56 +0300 |
commit | caec1b0ede2b901bcf006a403d15e38e1a94b659 (patch) | |
tree | 154f961ba752ad72d16b264643709e8d5d3f3a55 | |
parent | 56da7b93f3951cc9270c8f80e6b7789f77b189ab (diff) | |
download | ydb-caec1b0ede2b901bcf006a403d15e38e1a94b659.tar.gz |
YT-19545: Move overload controller to core/rpc from server/node; move percpu counters from profiling/solomon
\* Changelog entry
Type: feature
Component: misc-server
Generalize overload controller and allow other server components to use it.
commit_hash:ff310ddce3b8ee9353746019f1bc404ad9695a90
18 files changed, 1444 insertions, 20 deletions
diff --git a/yt/yt/core/rpc/config.cpp b/yt/yt/core/rpc/config.cpp index 8fca9174023..e43e9a3166a 100644 --- a/yt/yt/core/rpc/config.cpp +++ b/yt/yt/core/rpc/config.cpp @@ -376,4 +376,66 @@ void TDispatcherDynamicConfig::Register(TRegistrar registrar) //////////////////////////////////////////////////////////////////////////////// +void TServiceMethod::Register(TRegistrar registrar) +{ + registrar.Parameter("service", &TThis::Service) + .Default(); + registrar.Parameter("method", &TThis::Method) + .Default(); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TServiceMethodConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("service", &TThis::Service) + .Default(); + registrar.Parameter("method", &TThis::Method) + .Default(); + registrar.Parameter("max_window", &TThis::MaxWindow) + .Default(1'024); + registrar.Parameter("waiting_timeout_fraction", &TThis::WaitingTimeoutFraction) + .Default(0.5); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TOverloadTrackerConfigBase::Register(TRegistrar registrar) +{ + registrar.Parameter("methods_to_throttle", &TThis::MethodsToThrottle) + .Default(); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TOverloadTrackerMeanWaitTimeConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("mean_wait_time_threshold", &TThis::MeanWaitTimeThreshold) + .Default(TDuration::MilliSeconds(20)); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TOverloadTrackerBacklogQueueFillFractionConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("backlog_queue_fill_fraction_threshold", &TThis::BacklogQueueFillFractionThreshold) + .Default(0.9); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TOverloadControllerConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("enabled", &TThis::Enabled) + .Default(false); + registrar.Parameter("trackers", &TThis::Trackers) + .Default(); + registrar.Parameter("methods", &TThis::Methods) + .Default(); + registrar.Parameter("load_adjusting_period", &TThis::LoadAdjustingPeriod) + .Default(TDuration::MilliSeconds(100)); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/config.h b/yt/yt/core/rpc/config.h index f0ddcdb3c6a..4f4734faa26 100644 --- a/yt/yt/core/rpc/config.h +++ b/yt/yt/core/rpc/config.h @@ -5,6 +5,7 @@ #include <yt/yt/core/compression/public.h> #include <yt/yt/core/ytree/yson_struct.h> +#include <yt/yt/core/ytree/polymorphic_yson_struct.h> #include <yt/yt/core/concurrency/config.h> @@ -462,4 +463,106 @@ DEFINE_REFCOUNTED_TYPE(TDispatcherDynamicConfig) //////////////////////////////////////////////////////////////////////////////// +struct TServiceMethod + : public NYTree::TYsonStructLite +{ + std::string Service; + std::string Method; + + REGISTER_YSON_STRUCT_LITE(TServiceMethod); + + static void Register(TRegistrar registrar); +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TServiceMethodConfig + : public NYTree::TYsonStruct +{ + std::string Service; + std::string Method; + + int MaxWindow; + double WaitingTimeoutFraction; + + REGISTER_YSON_STRUCT(TServiceMethodConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TServiceMethodConfig) + +//////////////////////////////////////////////////////////////////////////////// + +struct TOverloadTrackerConfigBase + : public NYTree::TYsonStruct +{ + std::vector<TServiceMethod> MethodsToThrottle; + + REGISTER_YSON_STRUCT(TOverloadTrackerConfigBase); + + static void Register(TRegistrar registrar); +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TOverloadTrackerMeanWaitTimeConfig + : public TOverloadTrackerConfigBase +{ + TDuration MeanWaitTimeThreshold; + + REGISTER_YSON_STRUCT(TOverloadTrackerMeanWaitTimeConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TOverloadTrackerMeanWaitTimeConfig) + +//////////////////////////////////////////////////////////////////////////////// + +struct TOverloadTrackerBacklogQueueFillFractionConfig + : public TOverloadTrackerConfigBase +{ + double BacklogQueueFillFractionThreshold; + + REGISTER_YSON_STRUCT(TOverloadTrackerBacklogQueueFillFractionConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TOverloadTrackerBacklogQueueFillFractionConfig) + +//////////////////////////////////////////////////////////////////////////////// + +DEFINE_ENUM(EOverloadTrackerConfigType, + (Base) + (MeanWaitTime) + (BacklogQueueFillFraction) +); + +DEFINE_POLYMORPHIC_YSON_STRUCT_FOR_ENUM_WITH_DEFAULT(OverloadTrackerConfig, EOverloadTrackerConfigType, MeanWaitTime, + ((Base) (TOverloadTrackerConfigBase)) + ((MeanWaitTime) (TOverloadTrackerMeanWaitTimeConfig)) + ((BacklogQueueFillFraction) (TOverloadTrackerBacklogQueueFillFractionConfig)) +); + +//////////////////////////////////////////////////////////////////////////////// + +struct TOverloadControllerConfig + : public NYTree::TYsonStruct +{ + bool Enabled; + THashMap<std::string, TOverloadTrackerConfig> Trackers; + std::vector<TServiceMethodConfigPtr> Methods; + TDuration LoadAdjustingPeriod; + + REGISTER_YSON_STRUCT(TOverloadControllerConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TOverloadControllerConfig) + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/overload_controller.cpp b/yt/yt/core/rpc/overload_controller.cpp new file mode 100644 index 00000000000..87484a7b405 --- /dev/null +++ b/yt/yt/core/rpc/overload_controller.cpp @@ -0,0 +1,602 @@ +#include "overload_controller.h" + +#include "config.h" +#include "private.h" + +#include <yt/yt/library/profiling/percpu.h> + +#include <yt/yt/core/concurrency/action_queue.h> +#include <yt/yt/core/concurrency/periodic_executor.h> +#include <yt/yt/core/concurrency/two_level_fair_share_thread_pool.h> + +#include <yt/yt/core/logging/log_manager.h> + +#include <yt/yt/core/misc/proc.h> + +#include <yt/yt/core/profiling/timing.h> + +#include <library/cpp/yt/threading/spin_lock.h> + +namespace NYT::NRpc { + +using namespace NThreading; +using namespace NConcurrency; +using namespace NLogging; +using namespace NProfiling; + +//////////////////////////////////////////////////////////////////////////////// + +static auto Logger = RpcServerLogger().WithTag("OverloadController"); +static const std::string CpuThrottlingTrackerName = "CpuThrottling"; +static const std::string LogDropTrackerName = "LogDrop"; +static const std::string ControlGroupCpuName = "cpu"; + +//////////////////////////////////////////////////////////////////////////////// + +DECLARE_REFCOUNTED_CLASS(TOverloadTracker) + +class TOverloadTracker + : public TRefCounted +{ +public: + TOverloadTracker(TStringBuf type, TStringBuf id) + : Type_(type) + , Id_(id) + { } + + virtual bool CalculateIsOverloaded(const TOverloadTrackerConfig& config) = 0; + + const std::string& GetType() const + { + return Type_; + } + +protected: + const std::string Type_; + const std::string Id_; + + TCounter Overloaded_; +}; + +DEFINE_REFCOUNTED_TYPE(TOverloadTracker); + +//////////////////////////////////////////////////////////////////////////////// + +class TMeanWaitTimeTracker + : public TOverloadTracker +{ +public: + TMeanWaitTimeTracker(TStringBuf type, TStringBuf id, const TProfiler& profiler) + : TOverloadTracker(type, id) + , Counter_(New<TPerCpuDurationSummary>()) + { + auto taggedProfiler = profiler.WithTag("tracker", std::string(Id_)); + + Overloaded_ = taggedProfiler.Counter("/overloaded"); + MeanWaitTime_ = profiler.Timer("/mean_wait_time"); + MeanWaitTimeThreshold_ = profiler.TimeGauge("/mean_wait_time_threshold"); + } + + void Record(TDuration waitTime) + { + Counter_->Record(waitTime); + } + + virtual bool CalculateIsOverloaded(const TOverloadTrackerConfig& config) override + { + auto summary = Counter_->GetSummaryAndReset(); + TDuration meanValue; + + if (summary.Count()) { + meanValue = summary.Sum() / summary.Count(); + } + + YT_LOG_DEBUG("Reporting mean wait time for tracker " + "(Tracker: %v, TotalWaitTime: %v, TotalCount: %v, MeanValue: %v)", + Id_, + summary.Sum(), + summary.Count(), + meanValue); + + return ProfileAndGetOverloaded(meanValue, config.TryGetConcrete<TOverloadTrackerMeanWaitTimeConfig>()); + } + +protected: + bool ProfileAndGetOverloaded(TDuration meanValue, const TOverloadTrackerMeanWaitTimeConfigPtr& config) + { + bool overloaded = meanValue > config->MeanWaitTimeThreshold; + + Overloaded_.Increment(static_cast<int>(overloaded)); + MeanWaitTime_.Record(meanValue); + MeanWaitTimeThreshold_.Update(config->MeanWaitTimeThreshold); + + return overloaded; + } + +private: + using TPerCpuDurationSummary = TPerCpuSummary<TDuration>; + + TIntrusivePtr<TPerCpuDurationSummary> Counter_; + + TEventTimer MeanWaitTime_; + TTimeGauge MeanWaitTimeThreshold_; +}; + +DEFINE_REFCOUNTED_TYPE(TMeanWaitTimeTracker); + +//////////////////////////////////////////////////////////////////////////////// + +class TContainerCpuThrottlingTracker + : public TMeanWaitTimeTracker +{ +public: + using TMeanWaitTimeTracker::TMeanWaitTimeTracker; + + bool CalculateIsOverloaded(const TOverloadTrackerConfig& config) override + { + auto cpuStats = GetStats(); + if (!cpuStats) { + return {}; + } + + TDuration throttlingTime; + + if (LastCpuStats_) { + auto throttlingDelta = cpuStats->ThrottledTime - LastCpuStats_->ThrottledTime; + throttlingTime = TDuration::MicroSeconds(throttlingDelta / 1000); + + YT_LOG_DEBUG("Reporting container CPU throttling time " + "(LastCpuThrottlingTime: %v, CpuThrottlingTime: %v, ThrottlingDelta: %v, ThrottlingTime: %v)", + LastCpuStats_->ThrottledTime, + cpuStats->ThrottledTime, + throttlingDelta, + throttlingTime); + } + + LastCpuStats_ = cpuStats; + + return ProfileAndGetOverloaded(throttlingTime, config.TryGetConcrete<TOverloadTrackerMeanWaitTimeConfig>()); + } + +private: + std::optional<TCgroupCpuStat> LastCpuStats_; + bool CgroupErrorLogged_ = false; + + std::optional<TCgroupCpuStat> GetStats() + { + try { + auto cgroups = GetProcessCgroups(); + for (const auto& group : cgroups) { + for (const auto& controller : group.Controllers) { + if (controller == ControlGroupCpuName) { + return GetCgroupCpuStat(group.ControllersName, group.Path); + } + } + } + } catch (const std::exception& ex) { + if (!CgroupErrorLogged_) { + YT_LOG_INFO(ex, "Failed to collect cgroup CPU statistics"); + CgroupErrorLogged_ = true; + } + } + + return {}; + } +}; + +DEFINE_REFCOUNTED_TYPE(TContainerCpuThrottlingTracker); + +//////////////////////////////////////////////////////////////////////////////// + +class TLogDropTracker + : public TOverloadTracker +{ +public: + TLogDropTracker(TStringBuf type, TStringBuf id, const TProfiler& profiler) + : TOverloadTracker(type, id) + { + auto taggedProfiler = profiler.WithTag("tracker", Id_); + + Overloaded_ = taggedProfiler.Counter("/overloaded"); + BacklogQueueFillFraction_ = profiler.Gauge("/backlog_queue_fill_fraction"); + BacklogQueueFillFractionThreshold_ = profiler.Gauge("/backlog_queue_fill_fraction_threshold"); + } + + bool CalculateIsOverloaded(const TOverloadTrackerConfig& config) override + { + double BacklogQueueFillFraction = TLogManager::Get()->GetBacklogQueueFillFraction(); + + YT_LOG_DEBUG("Reporting logging queue filling fraction " + "(BacklogQueueFillFraction: %v)", + BacklogQueueFillFraction); + + const auto& logDropConfig = config.TryGetConcrete<TOverloadTrackerBacklogQueueFillFractionConfig>(); + + bool overloaded = BacklogQueueFillFraction > logDropConfig->BacklogQueueFillFractionThreshold; + + Overloaded_.Increment(static_cast<int>(overloaded)); + BacklogQueueFillFraction_.Update(BacklogQueueFillFraction); + BacklogQueueFillFractionThreshold_.Update(logDropConfig->BacklogQueueFillFractionThreshold); + + return overloaded; + } + +private: + TGauge BacklogQueueFillFraction_; + TGauge BacklogQueueFillFractionThreshold_; +}; + +DEFINE_REFCOUNTED_TYPE(TLogDropTracker); + +//////////////////////////////////////////////////////////////////////////////// + +bool ShouldThrottleCall(const TCongestionState& congestionState) +{ + if (!congestionState.MaxWindow || !congestionState.CurrentWindow) { + return false; + } + + return static_cast<int>(RandomNumber<ui32>(*congestionState.MaxWindow)) + 1 > *congestionState.CurrentWindow; +} + +//////////////////////////////////////////////////////////////////////////////// + +class TCongestionController + : public TRefCounted +{ +public: + TCongestionController(TServiceMethodConfigPtr methodConfig, TOverloadControllerConfigPtr config, TProfiler profiler) + : MethodConfig_(std::move(methodConfig)) + , Config_(std::move(config)) + , MaxWindow_(MethodConfig_->MaxWindow) + , Window_(MaxWindow_) + , WindowGauge_(profiler.Gauge("/window")) + , SlowStartThresholdGauge_(profiler.Gauge("/slow_start_threshold_gauge")) + , MaxWindowGauge_(profiler.Gauge("/max_window")) + { } + + TCongestionState GetCongestionState() + { + auto result = TCongestionState{ + .CurrentWindow = Window_.load(std::memory_order::relaxed), + .MaxWindow = MaxWindow_, + .WaitingTimeoutFraction = MethodConfig_->WaitingTimeoutFraction, + }; + + auto now = NProfiling::GetCpuInstant(); + auto recentlyOverloadedThreshold = Config_->LoadAdjustingPeriod * MethodConfig_->MaxWindow; + + for (const auto& [trackerType, lastOverloaded] : OverloadedTrackers_) { + auto sinceLastOverloaded = CpuDurationToDuration(now - lastOverloaded.load(std::memory_order::relaxed)); + if (sinceLastOverloaded < recentlyOverloadedThreshold) { + result.OverloadedTrackers.push_back(trackerType); + } + } + + return result; + } + + void Adjust(const THashSet<std::string>& overloadedTrackers, TCpuInstant timestamp) + { + auto window = Window_.load(std::memory_order::relaxed); + + // NB. We reporting here slightly outdated values but this makes code simpler a bit. + WindowGauge_.Update(window); + MaxWindowGauge_.Update(MaxWindow_); + SlowStartThresholdGauge_.Update(SlowStartThreshold_); + + for (const auto& tracker : overloadedTrackers) { + auto it = OverloadedTrackers_.find(tracker); + if (it != OverloadedTrackers_.end()) { + it->second.store(timestamp, std::memory_order::relaxed); + } + } + + auto overloaded = !overloadedTrackers.empty(); + + if (overloaded) { + SlowStartThreshold_ = window > 0 ? window / 2 : SlowStartThreshold_; + Window_.store(0, std::memory_order::relaxed); + + YT_LOG_WARNING("System is overloaded (SlowStartThreshold: %v, Window: %v, OverloadedTrackers: %v)", + SlowStartThreshold_, + window, + overloadedTrackers); + return; + } + + if (window >= SlowStartThreshold_) { + ++window; + } else { + window *= 2; + window = std::min(SlowStartThreshold_, window); + } + + // Keeping window in sane limits. + window = std::min(MaxWindow_, window); + window = std::max(1, window); + + YT_LOG_DEBUG("Adjusting system load up (SlowStartThreshold: %v, CurrentWindow: %v)", + SlowStartThreshold_, + window); + + Window_.store(window, std::memory_order::relaxed); + } + + void AddTrackerType(std::string trackerType) + { + // Called only during initial construction of controllers, + // so we do not have to serialize here. + OverloadedTrackers_[trackerType] = {}; + } + +private: + const TServiceMethodConfigPtr MethodConfig_; + const TOverloadControllerConfigPtr Config_; + const int MaxWindow_; + + std::atomic<int> Window_; + int SlowStartThreshold_ = 0; + THashMap<std::string, std::atomic<TCpuInstant>> OverloadedTrackers_; + + TCounter SkippedRequestCount_; + TGauge WindowGauge_; + TGauge SlowStartThresholdGauge_; + TGauge MaxWindowGauge_; +}; + +DEFINE_REFCOUNTED_TYPE(TCongestionController); + +//////////////////////////////////////////////////////////////////////////////// + +class TOverloadController + : public IOverloadController +{ +public: + DEFINE_SIGNAL_OVERRIDE(void(), LoadAdjusted); + + TOverloadController(TOverloadControllerConfigPtr config, NProfiling::TProfiler profiler) + : ControlThread_(New<TActionQueue>("OverloadCtl")) + , Invoker_(ControlThread_->GetInvoker()) + , Periodic_(New<TPeriodicExecutor>( + Invoker_, + BIND(&TOverloadController::Adjust, MakeWeak(this)), + config->LoadAdjustingPeriod)) + , Profiler_(std::move(profiler)) + { + State_.Config = std::move(config); + CreateGenericTracker<TContainerCpuThrottlingTracker>(CpuThrottlingTrackerName); + CreateGenericTracker<TLogDropTracker>(LogDropTrackerName); + UpdateStateSnapshot(State_, Guard(SpinLock_)); + } + + void Start() override + { + Periodic_->Start(); + } + + void TrackInvoker( + TStringBuf name, + const IInvokerPtr& invoker) override + { + invoker->SubscribeWaitTimeObserved(CreateGenericWaitTimeObserver(name)); + } + + void TrackFSHThreadPool( + TStringBuf name, + const NConcurrency::ITwoLevelFairShareThreadPoolPtr& threadPool) override + { + threadPool->SubscribeWaitTimeObserved(CreateGenericWaitTimeObserver(name)); + } + + IInvoker::TWaitTimeObserver CreateGenericWaitTimeObserver( + TStringBuf trackerType, + std::optional<TStringBuf> id = {}) override + { + auto tracker = CreateGenericTracker<TMeanWaitTimeTracker>(std::move(trackerType), std::move(id)); + + return BIND([tracker] (TDuration waitTime) { + tracker->Record(waitTime); + }); + } + + TCongestionState GetCongestionState(TStringBuf service, TStringBuf method) const override + { + auto snapshot = GetStateSnapshot(); + if (!snapshot->Config->Enabled) { + return {}; + } + + const auto& controllers = snapshot->CongestionControllers; + if (auto it = controllers.find(std::pair(service, method)); it != controllers.end()) { + return it->second->GetCongestionState(); + } + + return {}; + } + + void Reconfigure(TOverloadControllerConfigPtr config) override + { + Periodic_->SetPeriod(config->LoadAdjustingPeriod); + + auto guard = Guard(SpinLock_); + State_.CongestionControllers = CreateCongestionControllers(config, Profiler_); + State_.Config = std::move(config); + + UpdateStateSnapshot(State_, std::move(guard)); + } + +private: + using TMethodIndex = std::pair<TString, TString>; + using TMethodsCongestionControllers = THashMap<TMethodIndex, TCongestionControllerPtr>; + + struct TState final + { + static constexpr bool EnableHazard = true; + + TOverloadControllerConfigPtr Config; + TMethodsCongestionControllers CongestionControllers; + THashMap<TString, TOverloadTrackerPtr> Trackers; + }; + + using TSpinLockGuard = TGuard<NThreading::TSpinLock>; + + const NConcurrency::TActionQueuePtr ControlThread_; + const IInvokerPtr Invoker_; + const NConcurrency::TPeriodicExecutorPtr Periodic_; + const NProfiling::TProfiler Profiler_; + + TAtomicPtr<TState, /*EnableAcquireHazard*/ true> StateSnapshot_; + + NThreading::TSpinLock SpinLock_; + TState State_; + + void Adjust() + { + DoAdjust(GetStateSnapshot()); + LoadAdjusted_.Fire(); + } + + void DoAdjust(const THazardPtr<TState>& state) + { + YT_ASSERT_INVOKER_AFFINITY(Invoker_); + + auto now = NProfiling::GetCpuInstant(); + + using TOverloadedTrackers = THashSet<std::string>; + THashMap<TMethodIndex, TOverloadedTrackers> methodOverloaded; + + const auto& config = state->Config; + + for (const auto& [_, trackerConfig] : config->Trackers) { + for (const auto& method : trackerConfig->MethodsToThrottle) { + methodOverloaded[std::pair(method.Service, method.Method)] = {}; + } + } + + for (const auto& [trackerId, tracker] : state->Trackers) { + auto trackerIt = config->Trackers.find(tracker->GetType()); + if (trackerIt == config->Trackers.end()) { + continue; + } + + const auto& trackerConfig = trackerIt->second; + + auto trackerOverloaded = tracker->CalculateIsOverloaded(trackerConfig); + + if (!trackerOverloaded) { + continue; + } + + for (const auto& method : trackerIt->second->MethodsToThrottle) { + auto& overloadedTrackers = methodOverloaded[std::pair(method.Service, method.Method)]; + overloadedTrackers.insert(tracker->GetType()); + } + } + + for (const auto& [method, overloadedTrackers] : methodOverloaded) { + auto it = state->CongestionControllers.find(method); + if (it == state->CongestionControllers.end()) { + YT_LOG_WARNING("Cannot find congestion controller for method (Service: %v, Method: %v)", + method.first, + method.second); + + continue; + } + + it->second->Adjust(overloadedTrackers, now); + } + } + + THazardPtr<TOverloadController::TState> GetStateSnapshot() const + { + YT_VERIFY(StateSnapshot_); + + return StateSnapshot_.AcquireHazard(); + } + + void UpdateStateSnapshot(const TState& state, TSpinLockGuard guard) + { + auto snapshot = New<TState>(state); + guard.Release(); + + StateSnapshot_.Store(std::move(snapshot)); + ReclaimHazardPointers(); + } + + template <class TTracker> + TIntrusivePtr<TTracker> CreateGenericTracker(TStringBuf trackerType, std::optional<TStringBuf> id = {}) + { + YT_LOG_DEBUG("Creating overload tracker (TrackerType: %v, Id: %v)", + trackerType, + id); + + auto trackerId = id.value_or(trackerType); + + auto profiler = Profiler_.WithTag("tracker", std::string(trackerId)); + + auto tracker = New<TTracker>(trackerType, trackerId, profiler); + + auto guard = Guard(SpinLock_); + + State_.Trackers[trackerId] = tracker; + + UpdateStateSnapshot(State_, std::move(guard)); + + return tracker; + } + + static TMethodsCongestionControllers CreateCongestionControllers( + const TOverloadControllerConfigPtr& config, + NProfiling::TProfiler profiler) + { + TMethodsCongestionControllers controllers; + + THashMap<TMethodIndex, TServiceMethodConfigPtr> configIndex; + for (const auto& methodConfig : config->Methods) { + configIndex[std::pair(methodConfig->Service, methodConfig->Method)] = methodConfig; + } + + auto getConfig = [&configIndex] (TStringBuf service, TStringBuf method) { + auto it = configIndex.find(std::pair(service, method)); + if (it != configIndex.end()) { + return it->second; + } + + auto defaultConfig = New<TServiceMethodConfig>(); + defaultConfig->Service = service; + defaultConfig->Method = method; + return defaultConfig; + }; + + for (const auto& [trackerType, tracker] : config->Trackers) { + for (const auto& method : tracker->MethodsToThrottle) { + auto& controller = controllers[std::pair(method.Service, method.Method)]; + + if (!controller) { + auto methodConfig = getConfig(method.Service, method.Method); + auto controllerProfiler = profiler + .WithTag("yt_service", method.Service) + .WithTag("method", method.Method); + + controller = New<TCongestionController>( + std::move(methodConfig), + config, std::move(controllerProfiler)); + } + + controller->AddTrackerType(trackerType); + } + } + + return controllers; + } +}; + +IOverloadControllerPtr CreateOverloadController(TOverloadControllerConfigPtr config, NProfiling::TProfiler profiler) +{ + return New<TOverloadController>(std::move(config), std::move(profiler)); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/overload_controller.h b/yt/yt/core/rpc/overload_controller.h new file mode 100644 index 00000000000..708ec833271 --- /dev/null +++ b/yt/yt/core/rpc/overload_controller.h @@ -0,0 +1,67 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/library/profiling/sensor.h> + +#include <yt/yt/core/concurrency/public.h> + +#include <yt/yt/core/misc/atomic_ptr.h> + +#include <library/cpp/yt/compact_containers/compact_vector.h> + +#include <library/cpp/yt/threading/public.h> + +namespace NYT::NRpc { + +//////////////////////////////////////////////////////////////////////////////// + +// Overall rpc-method congestion state. +// Can be a a measure of allowed concurrency or probability of acceptance more calls. +struct TCongestionState +{ + std::optional<int> CurrentWindow; + std::optional<int> MaxWindow; + double WaitingTimeoutFraction = 0; + + using TTrackersList = TCompactVector<std::string, 4>; + TTrackersList OverloadedTrackers; +}; + +// Probabilistic predicate based on congestion state of the the method. +bool ShouldThrottleCall(const TCongestionState& congestionState); + +//////////////////////////////////////////////////////////////////////////////// + +struct IOverloadController + : public TRefCounted +{ +public: + DECLARE_INTERFACE_SIGNAL(void(), LoadAdjusted); + + virtual void Start() = 0; + virtual void Reconfigure(TOverloadControllerConfigPtr config) = 0; + + virtual void TrackInvoker( + TStringBuf name, + const IInvokerPtr& invoker) = 0; + virtual void TrackFSHThreadPool( + TStringBuf name, + const NConcurrency::ITwoLevelFairShareThreadPoolPtr& threadPool) = 0; + + virtual IInvoker::TWaitTimeObserver CreateGenericWaitTimeObserver( + TStringBuf trackerType, + std::optional<TStringBuf> id = {}) = 0; + + virtual TCongestionState GetCongestionState(TStringBuf service, TStringBuf method) const = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IOverloadController); + +IOverloadControllerPtr CreateOverloadController( + TOverloadControllerConfigPtr config, + NProfiling::TProfiler profiler = {}); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/overload_controlling_service_base-inl.h b/yt/yt/core/rpc/overload_controlling_service_base-inl.h new file mode 100644 index 00000000000..2a7a1b1b9dc --- /dev/null +++ b/yt/yt/core/rpc/overload_controlling_service_base-inl.h @@ -0,0 +1,96 @@ +#ifndef OVERLOAD_CONTROLLING_SERVICE_BASE_INL_H_ +#error "Direct inclusion of this file is not allowed, include overload_controlling_service_base.h" +// For the sake of sane code completion. +#include "overload_controlling_service_base.h" +#endif + +#include "overload_controller.h" + +#include <yt/yt/core/concurrency/delayed_executor.h> + +namespace NYT::NRpc { + +using namespace NConcurrency; + +//////////////////////////////////////////////////////////////////////////////// + +template <class TBaseService> +template <typename... TArgs> +TOverloadControllingServiceBase<TBaseService>::TOverloadControllingServiceBase( + IOverloadControllerPtr controller, + TArgs&&... args) + : TBaseService(std::forward<TArgs>(args)...) + , Controller_(std::move(controller)) +{ + YT_VERIFY(Controller_); +} + +template <class TBaseService> +void TOverloadControllingServiceBase<TBaseService>::SubscribeLoadAdjusted() +{ + Controller_->SubscribeLoadAdjusted(BIND( + &TOverloadControllingServiceBase::HandleLoadAdjusted, + MakeWeak(this))); +} + +template <class TBaseService> +auto TOverloadControllingServiceBase<TBaseService>::RegisterMethod( + const TMethodDescriptor& descriptor) -> TRuntimeMethodInfoPtr +{ + Methods_.insert(descriptor.Method); + return TBaseService::RegisterMethod(descriptor); +} + +template <class TBaseService> +void TOverloadControllingServiceBase<TBaseService>::HandleLoadAdjusted() +{ + const auto& serviceName = TBaseService::GetServiceId().ServiceName; + + for (const auto& method : Methods_) { + auto* runtimeInfo = TBaseService::FindMethodInfo(method); + YT_VERIFY(runtimeInfo); + + auto congestionState = Controller_->GetCongestionState(serviceName, method); + runtimeInfo->ConcurrencyLimit.SetDynamicLimit(congestionState.CurrentWindow); + runtimeInfo->WaitingTimeoutFraction.store( + congestionState.WaitingTimeoutFraction, + std::memory_order::relaxed); + } +} + +template <class TBaseService> +std::optional<TError> TOverloadControllingServiceBase<TBaseService>::GetThrottledError( + const NRpc::NProto::TRequestHeader& requestHeader) +{ + auto congestionState = Controller_->GetCongestionState(requestHeader.service(), requestHeader.method()); + const auto& overloadedTrackers = congestionState.OverloadedTrackers; + + if (!overloadedTrackers.empty()) { + return TError(NRpc::EErrorCode::Overloaded, "Instance is overloaded") + << TErrorAttribute("overloaded_trackers", overloadedTrackers); + } + + return TBaseService::GetThrottledError(requestHeader); +} + +template <class TBaseService> +void TOverloadControllingServiceBase<TBaseService>::HandleRequest( + std::unique_ptr<NRpc::NProto::TRequestHeader> header, + TSharedRefArray message, + NBus::IBusPtr replyBus) +{ + auto congestionState = Controller_->GetCongestionState( + header->service(), + header->method()); + + if (ShouldThrottleCall(congestionState)) { + // Give other handling routines chance to execute. + NConcurrency::Yield(); + } + + TBaseService::HandleRequest(std::move(header), std::move(message), std::move(replyBus)); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/overload_controlling_service_base.cpp b/yt/yt/core/rpc/overload_controlling_service_base.cpp new file mode 100644 index 00000000000..76b559bc726 --- /dev/null +++ b/yt/yt/core/rpc/overload_controlling_service_base.cpp @@ -0,0 +1,9 @@ +#include "overload_controlling_service_base.h" + +namespace NYT::NRpc { + +//////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/overload_controlling_service_base.h b/yt/yt/core/rpc/overload_controlling_service_base.h new file mode 100644 index 00000000000..7b8c95c007e --- /dev/null +++ b/yt/yt/core/rpc/overload_controlling_service_base.h @@ -0,0 +1,45 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/core/rpc/service_detail.h> + +namespace NYT::NRpc { + +//////////////////////////////////////////////////////////////////////////////// + +template <class TBaseService> +class TOverloadControllingServiceBase + : public TBaseService +{ +public: + template <typename... TArgs> + explicit TOverloadControllingServiceBase(IOverloadControllerPtr controller, TArgs&&... args); + + using TRuntimeMethodInfoPtr = NRpc::TServiceBase::TRuntimeMethodInfoPtr; + using TMethodDescriptor = NRpc::TServiceBase::TMethodDescriptor; + + TRuntimeMethodInfoPtr RegisterMethod(const TMethodDescriptor& descriptor) override; + void SubscribeLoadAdjusted(); + +protected: + std::optional<TError> GetThrottledError(const NRpc::NProto::TRequestHeader& requestHeader) override; + void HandleRequest( + std::unique_ptr<NRpc::NProto::TRequestHeader> header, + TSharedRefArray message, + NBus::IBusPtr replyBus) override; + +private: + IOverloadControllerPtr Controller_; + THashSet<std::string> Methods_; + + void HandleLoadAdjusted(); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NRpc + +#define OVERLOAD_CONTROLLING_SERVICE_BASE_INL_H_ +#include "overload_controlling_service_base-inl.h" +#undef OVERLOAD_CONTROLLING_SERVICE_BASE_INL_H_ diff --git a/yt/yt/core/rpc/public.h b/yt/yt/core/rpc/public.h index 5955185d368..5f89abd52d9 100644 --- a/yt/yt/core/rpc/public.h +++ b/yt/yt/core/rpc/public.h @@ -70,6 +70,7 @@ DECLARE_REFCOUNTED_STRUCT(IChannelFactory) DECLARE_REFCOUNTED_STRUCT(IRoamingChannelProvider) DECLARE_REFCOUNTED_STRUCT(IAuthenticator) DECLARE_REFCOUNTED_STRUCT(IResponseKeeper) +DECLARE_REFCOUNTED_STRUCT(IOverloadController) DECLARE_REFCOUNTED_CLASS(TClientContext) DECLARE_REFCOUNTED_CLASS(TServiceBase) @@ -77,6 +78,7 @@ DECLARE_REFCOUNTED_CLASS(TChannelWrapper) DECLARE_REFCOUNTED_CLASS(TStaticChannelFactory) DECLARE_REFCOUNTED_CLASS(TClientRequestControlThunk) DECLARE_REFCOUNTED_CLASS(TCachingChannelFactory) +DECLARE_REFCOUNTED_CLASS(TCongestionController) DECLARE_REFCOUNTED_CLASS(TAttachmentsInputStream) DECLARE_REFCOUNTED_CLASS(TAttachmentsOutputStream) @@ -127,6 +129,10 @@ DECLARE_REFCOUNTED_STRUCT(TThrottlingChannelDynamicConfig) DECLARE_REFCOUNTED_STRUCT(TResponseKeeperConfig) DECLARE_REFCOUNTED_STRUCT(TDispatcherConfig) DECLARE_REFCOUNTED_STRUCT(TDispatcherDynamicConfig) +DECLARE_REFCOUNTED_STRUCT(TServiceMethodConfig) +DECLARE_REFCOUNTED_STRUCT(TOverloadTrackerMeanWaitTimeConfig) +DECLARE_REFCOUNTED_STRUCT(TOverloadTrackerBacklogQueueFillFractionConfig) +DECLARE_REFCOUNTED_STRUCT(TOverloadControllerConfig) struct TRequestQueueThrottlerConfigs { diff --git a/yt/yt/core/rpc/unittests/main/ya.make b/yt/yt/core/rpc/unittests/main/ya.make index 335a91fa4aa..226e2da0215 100644 --- a/yt/yt/core/rpc/unittests/main/ya.make +++ b/yt/yt/core/rpc/unittests/main/ya.make @@ -5,6 +5,7 @@ INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) PROTO_NAMESPACE(yt) SRCS( + yt/yt/core/rpc/unittests/overload_controller_ut.cpp yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp yt/yt/core/rpc/unittests/roaming_channel_ut.cpp yt/yt/core/rpc/unittests/rpc_ut.cpp diff --git a/yt/yt/core/rpc/unittests/overload_controller_ut.cpp b/yt/yt/core/rpc/unittests/overload_controller_ut.cpp new file mode 100644 index 00000000000..8a6e39aeb43 --- /dev/null +++ b/yt/yt/core/rpc/unittests/overload_controller_ut.cpp @@ -0,0 +1,431 @@ +#include <yt/yt/core/rpc/config.h> +#include <yt/yt/core/rpc/overload_controller.h> + +#include <yt/yt/core/concurrency/action_queue.h> +#include <yt/yt/core/concurrency/two_level_fair_share_thread_pool.h> + +#include <yt/yt/core/test_framework/framework.h> + +namespace NYT::NRpc { +namespace { + +using namespace NConcurrency; + +//////////////////////////////////////////////////////////////////////////////// + +class TMockInvoker + : public IInvoker +{ +public: + DEFINE_SIGNAL_OVERRIDE(TWaitTimeObserver::TSignature, WaitTimeObserved); + +public: + void Invoke(TClosure /*callback*/) override + { } + + void Invoke(TMutableRange<TClosure> /*callbacks*/) override + { } + + bool CheckAffinity(const IInvokerPtr& /*invoker*/) const override + { + return false; + } + + bool IsSerialized() const override + { + return true; + } + + NThreading::TThreadId GetThreadId() const override + { + return {}; + } + + void FireWaitTimeObserved(TDuration waitTime) + { + WaitTimeObserved_.Fire(waitTime); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TMethodInfo +{ + TString Service; + TString Method; + double WaitingTimeoutFraction = 0; +}; + +using TMethodInfoList = std::vector<TMethodInfo>; + +constexpr auto MeanWaitTimeThreshold = TDuration::MilliSeconds(20); + +TOverloadControllerConfigPtr CreateConfig(const THashMap<TString, TMethodInfoList>& schema) +{ + auto config = New<TOverloadControllerConfig>(); + config->Enabled = true; + + for (const auto& [trackerName, methods] : schema) { + TOverloadTrackerConfig trackerConfig(EOverloadTrackerConfigType::MeanWaitTime); + auto trackerMeanWaitTimeConfig = trackerConfig.TryGetConcrete<TOverloadTrackerMeanWaitTimeConfig>(); + + for (const auto& methodInfo : methods) { + { + TServiceMethod serviceMethod; + serviceMethod.Service = methodInfo.Service; + serviceMethod.Method = methodInfo.Method; + trackerMeanWaitTimeConfig->MethodsToThrottle.push_back(std::move(serviceMethod)); + } + { + auto serviceMethodConfig = New<TServiceMethodConfig>(); + serviceMethodConfig->Service = methodInfo.Service; + serviceMethodConfig->Method = methodInfo.Method; + serviceMethodConfig->WaitingTimeoutFraction = methodInfo.WaitingTimeoutFraction; + config->Methods.push_back(std::move(serviceMethodConfig)); + } + trackerMeanWaitTimeConfig->MeanWaitTimeThreshold = MeanWaitTimeThreshold; + } + + config->Trackers[trackerName] = trackerConfig; + } + + return config; +} + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TOverloadControllerTest, TestOverloadsRequests) +{ + auto controller = CreateOverloadController(New<TOverloadControllerConfig>()); + auto mockInvoker = New<TMockInvoker>(); + auto mockInvoker2 = New<TMockInvoker>(); + + controller->TrackInvoker("Mock", mockInvoker); + controller->TrackInvoker("Mock2", mockInvoker2); + + auto config = CreateConfig({ + {"Mock", {{"MockService", "MockMethod"}}}, + {"Mock2", {{"MockService", "MockMethod2"}}}, + }); + config->LoadAdjustingPeriod = TDuration::MilliSeconds(1); + controller->Reconfigure(config); + controller->Start(); + + // Simulate overload + for (int i = 0; i < 5000; ++i) { + mockInvoker->FireWaitTimeObserved(MeanWaitTimeThreshold * 2); + } + + // Check overload incoming requests + int remainsCount = 1000; + while (remainsCount > 0) { + EXPECT_FALSE(ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod2"))); + + auto overloaded = ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod")); + if (overloaded) { + --remainsCount; + } else { + Sleep(TDuration::MicroSeconds(10)); + } + } + + // Check recovering even if no calls + while (remainsCount < 1000) { + auto overloaded = ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod")); + if (!overloaded) { + ++remainsCount; + } else { + Sleep(TDuration::MicroSeconds(1)); + } + } +} + +TEST(TOverloadControllerTest, TestNoOverloads) +{ + auto controller = CreateOverloadController(New<TOverloadControllerConfig>()); + auto mockInvoker = New<TMockInvoker>(); + + controller->TrackInvoker("Mock", mockInvoker); + + auto config = CreateConfig({ + {"Mock", {{"MockService", "MockMethod"}}} + }); + config->LoadAdjustingPeriod = TDuration::MilliSeconds(1); + + controller->Reconfigure(config); + controller->Start(); + + // Simulate overload + for (int i = 0; i < 5000; ++i) { + mockInvoker->FireWaitTimeObserved(MeanWaitTimeThreshold / 2); + } + + for (int i = 0; i < 10000; ++i) { + EXPECT_FALSE(ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod"))); + mockInvoker->FireWaitTimeObserved(MeanWaitTimeThreshold / 2); + + Sleep(TDuration::MicroSeconds(10)); + } +} + +TEST(TOverloadControllerTest, TestTwoInvokersSameMethod) +{ + auto controller = CreateOverloadController(New<TOverloadControllerConfig>()); + auto mockInvoker1 = New<TMockInvoker>(); + auto mockInvoker2 = New<TMockInvoker>(); + + controller->TrackInvoker("Mock1", mockInvoker1); + controller->TrackInvoker("Mock2", mockInvoker2); + + auto config = CreateConfig({ + {"Mock1", {{"MockService", "MockMethod"}}}, + {"Mock2", {{"MockService", "MockMethod"}}}, + }); + config->LoadAdjustingPeriod = TDuration::MilliSeconds(1); + + controller->Reconfigure(config); + controller->Start(); + + // Simulate overload + for (int i = 0; i < 5000; ++i) { + mockInvoker1->FireWaitTimeObserved(MeanWaitTimeThreshold * 2); + mockInvoker2->FireWaitTimeObserved(MeanWaitTimeThreshold / 2); + } + + // Check overloading incoming requests + int remainsCount = 1000; + while (remainsCount > 0) { + auto overloaded = ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod")); + if (overloaded) { + --remainsCount; + } else { + Sleep(TDuration::MicroSeconds(10)); + } + } + + // Check recovering even if no calls + while (remainsCount < 1000) { + auto overloaded = ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod")); + if (!overloaded) { + ++remainsCount; + } else { + Sleep(TDuration::MicroSeconds(1)); + } + } +} + +TEST(TOverloadControllerTest, TestCongestionWindow) +{ + auto controller = CreateOverloadController(New<TOverloadControllerConfig>()); + auto mockInvoker = New<TMockInvoker>(); + auto mockInvoker2 = New<TMockInvoker>(); + + controller->TrackInvoker("Mock", mockInvoker); + controller->TrackInvoker("Mock2", mockInvoker2); + + auto config = CreateConfig({ + {"Mock", {{"MockService", "MockMethod", 0.3}}}, + {"Mock2", {{"MockService", "MockMethod2", 0.3}}}, + }); + config->LoadAdjustingPeriod = TDuration::MilliSeconds(1); + controller->Reconfigure(config); + controller->Start(); + + // Simulate overload + for (int i = 0; i < 5000; ++i) { + mockInvoker->FireWaitTimeObserved(MeanWaitTimeThreshold * 2); + } + + // Check overload incoming requests + int remainsCount = 1000; + while (remainsCount > 0) { + mockInvoker->FireWaitTimeObserved(MeanWaitTimeThreshold * 2); + { + auto window2 = controller->GetCongestionState("MockService", "MockMethod2"); + EXPECT_EQ(window2.MaxWindow, window2.CurrentWindow); + } + + auto congestionState = controller->GetCongestionState("MockService", "MockMethod"); + bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow; + if (overloaded) { + --remainsCount; + EXPECT_EQ(0.3, congestionState.WaitingTimeoutFraction); + EXPECT_EQ(congestionState.OverloadedTrackers, TCongestionState::TTrackersList{"Mock"}); + } else { + Sleep(TDuration::MicroSeconds(10)); + } + } + + // Check recovering even if no calls + while (remainsCount < 1000) { + auto congestionState = controller->GetCongestionState("MockService", "MockMethod"); + bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow; + + if (!overloaded) { + ++remainsCount; + } else { + Sleep(TDuration::MicroSeconds(1)); + } + } +} + +TEST(TOverloadControllerTest, TestCongestionWindowTwoTrackers) +{ + auto controller = CreateOverloadController(New<TOverloadControllerConfig>()); + auto mockInvoker1 = New<TMockInvoker>(); + auto mockInvoker2 = New<TMockInvoker>(); + + controller->TrackInvoker("Mock", mockInvoker1); + controller->TrackInvoker("Mock2", mockInvoker2); + + auto config = CreateConfig({ + {"Mock", {{"MockService", "MockMethod", 0.3}}}, + {"Mock2", {{"MockService", "MockMethod", 0.3}}}, + }); + config->LoadAdjustingPeriod = TDuration::MilliSeconds(1); + controller->Reconfigure(config); + controller->Start(); + + // Simulate overload + for (int i = 0; i < 5000; ++i) { + mockInvoker1->FireWaitTimeObserved(MeanWaitTimeThreshold * 2); + mockInvoker2->FireWaitTimeObserved(MeanWaitTimeThreshold * 2); + } + + // Check overload incoming requests + int remainsCount = 10; + while (remainsCount > 0) { + auto congestionState = controller->GetCongestionState("MockService", "MockMethod"); + bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow; + if (overloaded) { + --remainsCount; + auto trackers = controller->GetCongestionState("MockService", "MockMethod").OverloadedTrackers; + std::sort(trackers.begin(), trackers.end()); + EXPECT_EQ(trackers, TCongestionState::TTrackersList({"Mock", "Mock2"})); + } else { + Sleep(TDuration::MicroSeconds(10)); + } + } +} + +TEST(TOverloadControllerTest, TestCongestionWindowTwoInstancies) +{ + auto controller = CreateOverloadController(New<TOverloadControllerConfig>()); + auto observer1 = controller->CreateGenericWaitTimeObserver("Mock", "Mock.1"); + auto observer2 = controller->CreateGenericWaitTimeObserver("Mock", "Mock.2"); + + auto config = CreateConfig({ + {"Mock", {{"MockService", "MockMethod", 0.3}}}, + }); + config->LoadAdjustingPeriod = TDuration::MilliSeconds(1); + controller->Reconfigure(config); + controller->Start(); + + // Simulate overload + for (int i = 0; i < 5000; ++i) { + observer1(MeanWaitTimeThreshold * 2); + } + + // Check overload incoming requests + int remainsCount = 10; + while (remainsCount > 0) { + auto congestionState = controller->GetCongestionState("MockService", "MockMethod"); + bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow; + if (overloaded) { + --remainsCount; + auto trackers = controller->GetCongestionState("MockService", "MockMethod").OverloadedTrackers; + EXPECT_EQ(trackers, TCongestionState::TTrackersList({"Mock"})); + } else { + Sleep(TDuration::MicroSeconds(10)); + } + } + + Sleep(TDuration::MicroSeconds(10)); + + for (int i = 0; i < 5000; ++i) { + observer1(MeanWaitTimeThreshold * 2); + observer2(MeanWaitTimeThreshold * 2); + } + + remainsCount = 10; + while (remainsCount > 0) { + auto congestionState = controller->GetCongestionState("MockService", "MockMethod"); + bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow; + if (overloaded) { + --remainsCount; + auto trackers = controller->GetCongestionState("MockService", "MockMethod").OverloadedTrackers; + EXPECT_EQ(trackers, TCongestionState::TTrackersList({"Mock"})); + } else { + Sleep(TDuration::MicroSeconds(10)); + } + } + + Sleep(TDuration::MicroSeconds(10)); + for (int i = 0; i < 5000; ++i) { + observer1(MeanWaitTimeThreshold / 2); + observer2(MeanWaitTimeThreshold * 2); + } + + remainsCount = 10; + while (remainsCount > 0) { + auto congestionState = controller->GetCongestionState("MockService", "MockMethod"); + bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow; + if (overloaded) { + --remainsCount; + auto trackers = controller->GetCongestionState("MockService", "MockMethod").OverloadedTrackers; + EXPECT_EQ(trackers, TCongestionState::TTrackersList({"Mock"})); + } else { + Sleep(TDuration::MicroSeconds(10)); + } + } +} + +//////////////////////////////////////////////////////////////////////////////// + +template <class TExecutorPtr> +void ExecuteWaitTimeTest(const TExecutorPtr& executor, const IInvokerPtr& invoker) +{ + static constexpr int DesiredActionsCount = 27; + + TDuration totalWaitTime; + int actionsCount = 0; + + executor->SubscribeWaitTimeObserved(BIND([&] (TDuration waitTime) { + totalWaitTime += waitTime; + ++actionsCount; + })); + + std::vector<TFuture<void>> futures; + for (int i = 0; i < DesiredActionsCount; ++i) { + auto future = BIND([] { + Sleep(TDuration::MilliSeconds(1)); + }).AsyncVia(invoker) + .Run(); + + futures.push_back(std::move(future)); + } + + WaitFor(AllSucceeded(std::move(futures))) + .ThrowOnError(); + + EXPECT_EQ(DesiredActionsCount, actionsCount); + EXPECT_GE(totalWaitTime, TDuration::MilliSeconds(DesiredActionsCount - 1)); +} + +TEST(TOverloadControllerTest, WaitTimeObserver) +{ + { + auto actionQueue = New<TActionQueue>("TestActionQueue"); + ExecuteWaitTimeTest(actionQueue->GetInvoker(), actionQueue->GetInvoker()); + } + + { + auto fshThreadPool = CreateTwoLevelFairShareThreadPool(1, "TestNewFsh"); + ExecuteWaitTimeTest(fshThreadPool, fshThreadPool->GetInvoker("test-pool", "fsh-tag")); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT::NRpc diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make index 61c92191dbd..5dfb9c00bf7 100644 --- a/yt/yt/core/ya.make +++ b/yt/yt/core/ya.make @@ -200,9 +200,11 @@ SRCS( rpc/helpers.cpp rpc/local_channel.cpp rpc/local_server.cpp - rpc/message.cpp rpc/message_format.cpp + rpc/message.cpp rpc/null_channel.cpp + rpc/overload_controller.cpp + rpc/overload_controlling_service_base.cpp rpc/peer_discovery.cpp rpc/per_key_request_queue_provider.cpp rpc/protocol_version.cpp @@ -213,8 +215,8 @@ SRCS( rpc/roaming_channel.cpp rpc/serialized_channel.cpp rpc/server_detail.cpp - rpc/service.cpp rpc/service_detail.cpp + rpc/service.cpp rpc/static_channel_factory.cpp rpc/stream.cpp rpc/throttling_channel.cpp diff --git a/yt/yt/library/profiling/solomon/percpu.cpp b/yt/yt/library/profiling/percpu.cpp index 0f146f0921d..d69686f7c30 100644 --- a/yt/yt/library/profiling/solomon/percpu.cpp +++ b/yt/yt/library/profiling/percpu.cpp @@ -1,5 +1,6 @@ #include "percpu.h" -#include "yt/yt/library/profiling/summary.h" + +#include "summary.h" #include <yt/yt/core/profiling/tscp.h> diff --git a/yt/yt/library/profiling/solomon/percpu.h b/yt/yt/library/profiling/percpu.h index 544d806e11d..e1766da58ab 100644 --- a/yt/yt/library/profiling/solomon/percpu.h +++ b/yt/yt/library/profiling/percpu.h @@ -1,7 +1,7 @@ #pragma once -#include <yt/yt/library/profiling/impl.h> -#include <yt/yt/library/profiling/summary.h> +#include "impl.h" +#include "summary.h" #include <yt/yt/core/profiling/tscp.h> diff --git a/yt/yt/library/profiling/solomon/helpers.cpp b/yt/yt/library/profiling/solomon/helpers.cpp index 7f4be734a5e..b5458fb4535 100644 --- a/yt/yt/library/profiling/solomon/helpers.cpp +++ b/yt/yt/library/profiling/solomon/helpers.cpp @@ -1,9 +1,10 @@ #include "helpers.h" -#include "percpu.h" #include "private.h" #include "producer.h" #include "sensor_set.h" +#include <yt/yt/library/profiling/percpu.h> + #include <yt/yt/core/http/http.h> #include <yt/yt/core/misc/ref_counted_tracker.h> diff --git a/yt/yt/library/profiling/solomon/registry.cpp b/yt/yt/library/profiling/solomon/registry.cpp index b8b45442902..d9e6d5b7628 100644 --- a/yt/yt/library/profiling/solomon/registry.cpp +++ b/yt/yt/library/profiling/solomon/registry.cpp @@ -1,12 +1,10 @@ #include "registry.h" #include "sensor.h" -#include "percpu.h" -#include <yt/yt/core/misc/protobuf_helpers.h> +#include <yt/yt/library/profiling/percpu.h> -#include <yt/yt/library/profiling/impl.h> -#include <yt/yt/library/profiling/sensor.h> +#include <yt/yt/core/misc/protobuf_helpers.h> #include <library/cpp/yt/assert/assert.h> diff --git a/yt/yt/library/profiling/solomon/registry.h b/yt/yt/library/profiling/solomon/registry.h index 7a53df82e1e..86ef47f5117 100644 --- a/yt/yt/library/profiling/solomon/registry.h +++ b/yt/yt/library/profiling/solomon/registry.h @@ -5,6 +5,11 @@ #include "producer.h" #include "tag_registry.h" +#include <yt/yt/library/profiling/sensor.h> +#include <yt/yt/library/profiling/impl.h> + +#include <yt/yt/library/profiling/solomon/sensor_dump.pb.h> + #include <yt/yt/core/actions/invoker_util.h> #include <yt/yt/core/misc/mpsc_stack.h> @@ -13,11 +18,6 @@ #include <yt/yt/core/ytree/fluent.h> -#include <yt/yt/library/profiling/sensor.h> -#include <yt/yt/library/profiling/impl.h> - -#include <yt/yt/library/profiling/solomon/sensor_dump.pb.h> - #include <library/cpp/yt/threading/spin_lock.h> namespace NYT::NProfiling { diff --git a/yt/yt/library/profiling/solomon/ya.make b/yt/yt/library/profiling/solomon/ya.make index 3d53febe4f9..5d4ddcff946 100644 --- a/yt/yt/library/profiling/solomon/ya.make +++ b/yt/yt/library/profiling/solomon/ya.make @@ -7,14 +7,13 @@ SRCS( cube.cpp exporter.cpp helpers.cpp - percpu.cpp producer.cpp proxy.cpp GLOBAL registry.cpp remote.cpp - sensor.cpp sensor_service.cpp sensor_set.cpp + sensor.cpp tag_registry.cpp sensor_dump.proto diff --git a/yt/yt/library/profiling/ya.make b/yt/yt/library/profiling/ya.make index e302d65ef41..e0f44fd5fb2 100644 --- a/yt/yt/library/profiling/ya.make +++ b/yt/yt/library/profiling/ya.make @@ -3,12 +3,13 @@ LIBRARY() INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) SRCS( - sensor.cpp - producer.cpp + histogram_snapshot.cpp impl.cpp + percpu.cpp + producer.cpp + sensor.cpp tag.cpp testing.cpp - histogram_snapshot.cpp ) PEERDIR( |