diff options
| author | dgolear <[email protected]> | 2025-06-20 19:32:27 +0300 | 
|---|---|---|
| committer | dgolear <[email protected]> | 2025-06-20 19:56:56 +0300 | 
| commit | caec1b0ede2b901bcf006a403d15e38e1a94b659 (patch) | |
| tree | 154f961ba752ad72d16b264643709e8d5d3f3a55 | |
| parent | 56da7b93f3951cc9270c8f80e6b7789f77b189ab (diff) | |
YT-19545: Move overload controller to core/rpc from server/node; move percpu counters from profiling/solomon
\* Changelog entry
Type: feature
Component: misc-server
Generalize overload controller and allow other server components to use it.
commit_hash:ff310ddce3b8ee9353746019f1bc404ad9695a90
18 files changed, 1444 insertions, 20 deletions
| diff --git a/yt/yt/core/rpc/config.cpp b/yt/yt/core/rpc/config.cpp index 8fca9174023..e43e9a3166a 100644 --- a/yt/yt/core/rpc/config.cpp +++ b/yt/yt/core/rpc/config.cpp @@ -376,4 +376,66 @@ void TDispatcherDynamicConfig::Register(TRegistrar registrar)  //////////////////////////////////////////////////////////////////////////////// +void TServiceMethod::Register(TRegistrar registrar) +{ +    registrar.Parameter("service", &TThis::Service) +        .Default(); +    registrar.Parameter("method", &TThis::Method) +        .Default(); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TServiceMethodConfig::Register(TRegistrar registrar) +{ +    registrar.Parameter("service", &TThis::Service) +        .Default(); +    registrar.Parameter("method", &TThis::Method) +        .Default(); +    registrar.Parameter("max_window", &TThis::MaxWindow) +        .Default(1'024); +    registrar.Parameter("waiting_timeout_fraction", &TThis::WaitingTimeoutFraction) +        .Default(0.5); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TOverloadTrackerConfigBase::Register(TRegistrar registrar) +{ +    registrar.Parameter("methods_to_throttle", &TThis::MethodsToThrottle) +        .Default(); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TOverloadTrackerMeanWaitTimeConfig::Register(TRegistrar registrar) +{ +    registrar.Parameter("mean_wait_time_threshold", &TThis::MeanWaitTimeThreshold) +        .Default(TDuration::MilliSeconds(20)); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TOverloadTrackerBacklogQueueFillFractionConfig::Register(TRegistrar registrar) +{ +    registrar.Parameter("backlog_queue_fill_fraction_threshold", &TThis::BacklogQueueFillFractionThreshold) +        .Default(0.9); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TOverloadControllerConfig::Register(TRegistrar registrar) +{ +    registrar.Parameter("enabled", &TThis::Enabled) +        .Default(false); +    registrar.Parameter("trackers", &TThis::Trackers) +        .Default(); +    registrar.Parameter("methods", &TThis::Methods) +        .Default(); +    registrar.Parameter("load_adjusting_period", &TThis::LoadAdjustingPeriod) +        .Default(TDuration::MilliSeconds(100)); +} + +//////////////////////////////////////////////////////////////////////////////// +  } // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/config.h b/yt/yt/core/rpc/config.h index f0ddcdb3c6a..4f4734faa26 100644 --- a/yt/yt/core/rpc/config.h +++ b/yt/yt/core/rpc/config.h @@ -5,6 +5,7 @@  #include <yt/yt/core/compression/public.h>  #include <yt/yt/core/ytree/yson_struct.h> +#include <yt/yt/core/ytree/polymorphic_yson_struct.h>  #include <yt/yt/core/concurrency/config.h> @@ -462,4 +463,106 @@ DEFINE_REFCOUNTED_TYPE(TDispatcherDynamicConfig)  //////////////////////////////////////////////////////////////////////////////// +struct TServiceMethod +    : public NYTree::TYsonStructLite +{ +    std::string Service; +    std::string Method; + +    REGISTER_YSON_STRUCT_LITE(TServiceMethod); + +    static void Register(TRegistrar registrar); +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TServiceMethodConfig +    : public NYTree::TYsonStruct +{ +    std::string Service; +    std::string Method; + +    int MaxWindow; +    double WaitingTimeoutFraction; + +    REGISTER_YSON_STRUCT(TServiceMethodConfig); + +    static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TServiceMethodConfig) + +//////////////////////////////////////////////////////////////////////////////// + +struct TOverloadTrackerConfigBase +    : public NYTree::TYsonStruct +{ +    std::vector<TServiceMethod> MethodsToThrottle; + +    REGISTER_YSON_STRUCT(TOverloadTrackerConfigBase); + +    static void Register(TRegistrar registrar); +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TOverloadTrackerMeanWaitTimeConfig +    : public TOverloadTrackerConfigBase +{ +    TDuration MeanWaitTimeThreshold; + +    REGISTER_YSON_STRUCT(TOverloadTrackerMeanWaitTimeConfig); + +    static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TOverloadTrackerMeanWaitTimeConfig) + +//////////////////////////////////////////////////////////////////////////////// + +struct TOverloadTrackerBacklogQueueFillFractionConfig +    : public TOverloadTrackerConfigBase +{ +    double BacklogQueueFillFractionThreshold; + +    REGISTER_YSON_STRUCT(TOverloadTrackerBacklogQueueFillFractionConfig); + +    static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TOverloadTrackerBacklogQueueFillFractionConfig) + +//////////////////////////////////////////////////////////////////////////////// + +DEFINE_ENUM(EOverloadTrackerConfigType, +    (Base) +    (MeanWaitTime) +    (BacklogQueueFillFraction) +); + +DEFINE_POLYMORPHIC_YSON_STRUCT_FOR_ENUM_WITH_DEFAULT(OverloadTrackerConfig, EOverloadTrackerConfigType, MeanWaitTime, +    ((Base)                     (TOverloadTrackerConfigBase)) +    ((MeanWaitTime)             (TOverloadTrackerMeanWaitTimeConfig)) +    ((BacklogQueueFillFraction) (TOverloadTrackerBacklogQueueFillFractionConfig)) +); + +//////////////////////////////////////////////////////////////////////////////// + +struct TOverloadControllerConfig +    : public NYTree::TYsonStruct +{ +    bool Enabled; +    THashMap<std::string, TOverloadTrackerConfig> Trackers; +    std::vector<TServiceMethodConfigPtr> Methods; +    TDuration LoadAdjustingPeriod; + +    REGISTER_YSON_STRUCT(TOverloadControllerConfig); + +    static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TOverloadControllerConfig) + +//////////////////////////////////////////////////////////////////////////////// +  } // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/overload_controller.cpp b/yt/yt/core/rpc/overload_controller.cpp new file mode 100644 index 00000000000..87484a7b405 --- /dev/null +++ b/yt/yt/core/rpc/overload_controller.cpp @@ -0,0 +1,602 @@ +#include "overload_controller.h" + +#include "config.h" +#include "private.h" + +#include <yt/yt/library/profiling/percpu.h> + +#include <yt/yt/core/concurrency/action_queue.h> +#include <yt/yt/core/concurrency/periodic_executor.h> +#include <yt/yt/core/concurrency/two_level_fair_share_thread_pool.h> + +#include <yt/yt/core/logging/log_manager.h> + +#include <yt/yt/core/misc/proc.h> + +#include <yt/yt/core/profiling/timing.h> + +#include <library/cpp/yt/threading/spin_lock.h> + +namespace NYT::NRpc { + +using namespace NThreading; +using namespace NConcurrency; +using namespace NLogging; +using namespace NProfiling; + +//////////////////////////////////////////////////////////////////////////////// + +static auto Logger = RpcServerLogger().WithTag("OverloadController"); +static const std::string CpuThrottlingTrackerName = "CpuThrottling"; +static const std::string LogDropTrackerName = "LogDrop"; +static const std::string ControlGroupCpuName = "cpu"; + +//////////////////////////////////////////////////////////////////////////////// + +DECLARE_REFCOUNTED_CLASS(TOverloadTracker) + +class TOverloadTracker +    : public TRefCounted +{ +public: +    TOverloadTracker(TStringBuf type, TStringBuf id) +        : Type_(type) +        , Id_(id) +    { } + +    virtual bool CalculateIsOverloaded(const TOverloadTrackerConfig& config) = 0; + +    const std::string& GetType() const +    { +        return Type_; +    } + +protected: +    const std::string Type_; +    const std::string Id_; + +    TCounter Overloaded_; +}; + +DEFINE_REFCOUNTED_TYPE(TOverloadTracker); + +//////////////////////////////////////////////////////////////////////////////// + +class TMeanWaitTimeTracker +    : public TOverloadTracker +{ +public: +    TMeanWaitTimeTracker(TStringBuf type, TStringBuf id, const TProfiler& profiler) +        : TOverloadTracker(type, id) +        , Counter_(New<TPerCpuDurationSummary>()) +    { +        auto taggedProfiler = profiler.WithTag("tracker", std::string(Id_)); + +        Overloaded_ = taggedProfiler.Counter("/overloaded"); +        MeanWaitTime_ = profiler.Timer("/mean_wait_time"); +        MeanWaitTimeThreshold_ = profiler.TimeGauge("/mean_wait_time_threshold"); +    } + +    void Record(TDuration waitTime) +    { +        Counter_->Record(waitTime); +    } + +    virtual bool CalculateIsOverloaded(const TOverloadTrackerConfig& config) override +    { +        auto summary = Counter_->GetSummaryAndReset(); +        TDuration meanValue; + +        if (summary.Count()) { +            meanValue = summary.Sum() / summary.Count(); +        } + +        YT_LOG_DEBUG("Reporting mean wait time for tracker " +            "(Tracker: %v, TotalWaitTime: %v, TotalCount: %v, MeanValue: %v)", +            Id_, +            summary.Sum(), +            summary.Count(), +            meanValue); + +        return ProfileAndGetOverloaded(meanValue, config.TryGetConcrete<TOverloadTrackerMeanWaitTimeConfig>()); +    } + +protected: +    bool ProfileAndGetOverloaded(TDuration meanValue, const TOverloadTrackerMeanWaitTimeConfigPtr& config) +    { +        bool overloaded = meanValue > config->MeanWaitTimeThreshold; + +        Overloaded_.Increment(static_cast<int>(overloaded)); +        MeanWaitTime_.Record(meanValue); +        MeanWaitTimeThreshold_.Update(config->MeanWaitTimeThreshold); + +        return overloaded; +    } + +private: +    using TPerCpuDurationSummary = TPerCpuSummary<TDuration>; + +    TIntrusivePtr<TPerCpuDurationSummary> Counter_; + +    TEventTimer MeanWaitTime_; +    TTimeGauge MeanWaitTimeThreshold_; +}; + +DEFINE_REFCOUNTED_TYPE(TMeanWaitTimeTracker); + +//////////////////////////////////////////////////////////////////////////////// + +class TContainerCpuThrottlingTracker +    : public TMeanWaitTimeTracker +{ +public: +    using TMeanWaitTimeTracker::TMeanWaitTimeTracker; + +    bool CalculateIsOverloaded(const TOverloadTrackerConfig& config) override +    { +        auto cpuStats = GetStats(); +        if (!cpuStats) { +            return {}; +        } + +        TDuration throttlingTime; + +        if (LastCpuStats_) { +            auto throttlingDelta = cpuStats->ThrottledTime - LastCpuStats_->ThrottledTime; +            throttlingTime = TDuration::MicroSeconds(throttlingDelta / 1000); + +            YT_LOG_DEBUG("Reporting container CPU throttling time " +                "(LastCpuThrottlingTime: %v, CpuThrottlingTime: %v, ThrottlingDelta: %v, ThrottlingTime: %v)", +                LastCpuStats_->ThrottledTime, +                cpuStats->ThrottledTime, +                throttlingDelta, +                throttlingTime); +        } + +        LastCpuStats_ = cpuStats; + +        return ProfileAndGetOverloaded(throttlingTime, config.TryGetConcrete<TOverloadTrackerMeanWaitTimeConfig>()); +    } + +private: +    std::optional<TCgroupCpuStat> LastCpuStats_; +    bool CgroupErrorLogged_ = false; + +    std::optional<TCgroupCpuStat> GetStats() +    { +        try { +            auto cgroups = GetProcessCgroups(); +            for (const auto& group : cgroups) { +                for (const auto& controller : group.Controllers) { +                    if (controller == ControlGroupCpuName) { +                        return GetCgroupCpuStat(group.ControllersName, group.Path); +                    } +                } +            } +        } catch (const std::exception& ex) { +            if (!CgroupErrorLogged_) { +                YT_LOG_INFO(ex, "Failed to collect cgroup CPU statistics"); +                CgroupErrorLogged_ = true; +            } +        } + +        return {}; +    } +}; + +DEFINE_REFCOUNTED_TYPE(TContainerCpuThrottlingTracker); + +//////////////////////////////////////////////////////////////////////////////// + +class TLogDropTracker +    : public TOverloadTracker +{ +public: +    TLogDropTracker(TStringBuf type, TStringBuf id, const TProfiler& profiler) +        : TOverloadTracker(type, id) +    { +        auto taggedProfiler = profiler.WithTag("tracker", Id_); + +        Overloaded_ = taggedProfiler.Counter("/overloaded"); +        BacklogQueueFillFraction_ = profiler.Gauge("/backlog_queue_fill_fraction"); +        BacklogQueueFillFractionThreshold_ = profiler.Gauge("/backlog_queue_fill_fraction_threshold"); +    } + +    bool CalculateIsOverloaded(const TOverloadTrackerConfig& config) override +    { +        double BacklogQueueFillFraction = TLogManager::Get()->GetBacklogQueueFillFraction(); + +        YT_LOG_DEBUG("Reporting logging queue filling fraction " +            "(BacklogQueueFillFraction: %v)", +            BacklogQueueFillFraction); + +        const auto& logDropConfig = config.TryGetConcrete<TOverloadTrackerBacklogQueueFillFractionConfig>(); + +        bool overloaded = BacklogQueueFillFraction > logDropConfig->BacklogQueueFillFractionThreshold; + +        Overloaded_.Increment(static_cast<int>(overloaded)); +        BacklogQueueFillFraction_.Update(BacklogQueueFillFraction); +        BacklogQueueFillFractionThreshold_.Update(logDropConfig->BacklogQueueFillFractionThreshold); + +        return overloaded; +    } + +private: +    TGauge BacklogQueueFillFraction_; +    TGauge BacklogQueueFillFractionThreshold_; +}; + +DEFINE_REFCOUNTED_TYPE(TLogDropTracker); + +//////////////////////////////////////////////////////////////////////////////// + +bool ShouldThrottleCall(const TCongestionState& congestionState) +{ +    if (!congestionState.MaxWindow || !congestionState.CurrentWindow) { +        return false; +    } + +    return static_cast<int>(RandomNumber<ui32>(*congestionState.MaxWindow)) + 1 > *congestionState.CurrentWindow; +} + +//////////////////////////////////////////////////////////////////////////////// + +class TCongestionController +    : public TRefCounted +{ +public: +    TCongestionController(TServiceMethodConfigPtr methodConfig, TOverloadControllerConfigPtr config, TProfiler profiler) +        : MethodConfig_(std::move(methodConfig)) +        , Config_(std::move(config)) +        , MaxWindow_(MethodConfig_->MaxWindow) +        , Window_(MaxWindow_) +        , WindowGauge_(profiler.Gauge("/window")) +        , SlowStartThresholdGauge_(profiler.Gauge("/slow_start_threshold_gauge")) +        , MaxWindowGauge_(profiler.Gauge("/max_window")) +    { } + +    TCongestionState GetCongestionState() +    { +        auto result = TCongestionState{ +            .CurrentWindow = Window_.load(std::memory_order::relaxed), +            .MaxWindow = MaxWindow_, +            .WaitingTimeoutFraction = MethodConfig_->WaitingTimeoutFraction, +        }; + +        auto now = NProfiling::GetCpuInstant(); +        auto recentlyOverloadedThreshold = Config_->LoadAdjustingPeriod * MethodConfig_->MaxWindow; + +        for (const auto& [trackerType, lastOverloaded] : OverloadedTrackers_) { +            auto sinceLastOverloaded = CpuDurationToDuration(now - lastOverloaded.load(std::memory_order::relaxed)); +            if (sinceLastOverloaded < recentlyOverloadedThreshold) { +                result.OverloadedTrackers.push_back(trackerType); +            } +        } + +        return result; +    } + +    void Adjust(const THashSet<std::string>& overloadedTrackers, TCpuInstant timestamp) +    { +        auto window = Window_.load(std::memory_order::relaxed); + +        // NB. We reporting here slightly outdated values but this makes code simpler a bit. +        WindowGauge_.Update(window); +        MaxWindowGauge_.Update(MaxWindow_); +        SlowStartThresholdGauge_.Update(SlowStartThreshold_); + +        for (const auto& tracker : overloadedTrackers) { +            auto it = OverloadedTrackers_.find(tracker); +            if (it != OverloadedTrackers_.end()) { +                it->second.store(timestamp, std::memory_order::relaxed); +            } +        } + +        auto overloaded = !overloadedTrackers.empty(); + +        if (overloaded) { +            SlowStartThreshold_ = window > 0 ? window / 2 : SlowStartThreshold_; +            Window_.store(0, std::memory_order::relaxed); + +            YT_LOG_WARNING("System is overloaded (SlowStartThreshold: %v, Window: %v, OverloadedTrackers: %v)", +                SlowStartThreshold_, +                window, +                overloadedTrackers); +            return; +        } + +        if (window >= SlowStartThreshold_) { +            ++window; +        } else { +            window *= 2; +            window = std::min(SlowStartThreshold_, window); +        } + +        // Keeping window in sane limits. +        window = std::min(MaxWindow_, window); +        window = std::max(1, window); + +        YT_LOG_DEBUG("Adjusting system load up (SlowStartThreshold: %v, CurrentWindow: %v)", +            SlowStartThreshold_, +            window); + +        Window_.store(window, std::memory_order::relaxed); +    } + +    void AddTrackerType(std::string trackerType) +    { +        // Called only during initial construction of controllers, +        // so we do not have to serialize here. +        OverloadedTrackers_[trackerType] = {}; +    } + +private: +    const TServiceMethodConfigPtr MethodConfig_; +    const TOverloadControllerConfigPtr Config_; +    const int MaxWindow_; + +    std::atomic<int> Window_; +    int SlowStartThreshold_ = 0; +    THashMap<std::string, std::atomic<TCpuInstant>> OverloadedTrackers_; + +    TCounter SkippedRequestCount_; +    TGauge WindowGauge_; +    TGauge SlowStartThresholdGauge_; +    TGauge MaxWindowGauge_; +}; + +DEFINE_REFCOUNTED_TYPE(TCongestionController); + +//////////////////////////////////////////////////////////////////////////////// + +class TOverloadController +    : public IOverloadController +{ +public: +    DEFINE_SIGNAL_OVERRIDE(void(), LoadAdjusted); + +    TOverloadController(TOverloadControllerConfigPtr config, NProfiling::TProfiler profiler) +        : ControlThread_(New<TActionQueue>("OverloadCtl")) +        , Invoker_(ControlThread_->GetInvoker()) +        , Periodic_(New<TPeriodicExecutor>( +            Invoker_, +            BIND(&TOverloadController::Adjust, MakeWeak(this)), +            config->LoadAdjustingPeriod)) +        , Profiler_(std::move(profiler)) +    { +        State_.Config = std::move(config); +        CreateGenericTracker<TContainerCpuThrottlingTracker>(CpuThrottlingTrackerName); +        CreateGenericTracker<TLogDropTracker>(LogDropTrackerName); +        UpdateStateSnapshot(State_, Guard(SpinLock_)); +    } + +    void Start() override +    { +        Periodic_->Start(); +    } + +    void TrackInvoker( +        TStringBuf name, +        const IInvokerPtr& invoker) override +    { +        invoker->SubscribeWaitTimeObserved(CreateGenericWaitTimeObserver(name)); +    } + +    void TrackFSHThreadPool( +        TStringBuf name, +        const NConcurrency::ITwoLevelFairShareThreadPoolPtr& threadPool) override +    { +        threadPool->SubscribeWaitTimeObserved(CreateGenericWaitTimeObserver(name)); +    } + +    IInvoker::TWaitTimeObserver CreateGenericWaitTimeObserver( +        TStringBuf trackerType, +        std::optional<TStringBuf> id = {}) override +    { +        auto tracker = CreateGenericTracker<TMeanWaitTimeTracker>(std::move(trackerType), std::move(id)); + +        return BIND([tracker] (TDuration waitTime) { +            tracker->Record(waitTime); +        }); +    } + +    TCongestionState GetCongestionState(TStringBuf service, TStringBuf method) const override +    { +        auto snapshot = GetStateSnapshot(); +        if (!snapshot->Config->Enabled) { +            return {}; +        } + +        const auto& controllers = snapshot->CongestionControllers; +        if (auto it = controllers.find(std::pair(service, method)); it != controllers.end()) { +            return it->second->GetCongestionState(); +        } + +        return {}; +    } + +    void Reconfigure(TOverloadControllerConfigPtr config) override +    { +        Periodic_->SetPeriod(config->LoadAdjustingPeriod); + +        auto guard = Guard(SpinLock_); +        State_.CongestionControllers = CreateCongestionControllers(config, Profiler_); +        State_.Config = std::move(config); + +        UpdateStateSnapshot(State_, std::move(guard)); +    } + +private: +    using TMethodIndex = std::pair<TString, TString>; +    using TMethodsCongestionControllers = THashMap<TMethodIndex, TCongestionControllerPtr>; + +    struct TState final +    { +        static constexpr bool EnableHazard = true; + +        TOverloadControllerConfigPtr Config; +        TMethodsCongestionControllers CongestionControllers; +        THashMap<TString, TOverloadTrackerPtr> Trackers; +    }; + +    using TSpinLockGuard = TGuard<NThreading::TSpinLock>; + +    const NConcurrency::TActionQueuePtr ControlThread_; +    const IInvokerPtr Invoker_; +    const NConcurrency::TPeriodicExecutorPtr Periodic_; +    const NProfiling::TProfiler Profiler_; + +    TAtomicPtr<TState, /*EnableAcquireHazard*/ true> StateSnapshot_; + +    NThreading::TSpinLock SpinLock_; +    TState State_; + +    void Adjust() +    { +        DoAdjust(GetStateSnapshot()); +        LoadAdjusted_.Fire(); +    } + +    void DoAdjust(const THazardPtr<TState>& state) +    { +        YT_ASSERT_INVOKER_AFFINITY(Invoker_); + +        auto now = NProfiling::GetCpuInstant(); + +        using TOverloadedTrackers = THashSet<std::string>; +        THashMap<TMethodIndex, TOverloadedTrackers> methodOverloaded; + +        const auto& config = state->Config; + +        for (const auto& [_, trackerConfig] : config->Trackers) { +            for (const auto& method : trackerConfig->MethodsToThrottle) { +                methodOverloaded[std::pair(method.Service, method.Method)] = {}; +            } +        } + +        for (const auto& [trackerId, tracker] : state->Trackers) { +            auto trackerIt = config->Trackers.find(tracker->GetType()); +            if (trackerIt == config->Trackers.end()) { +                continue; +            } + +            const auto& trackerConfig = trackerIt->second; + +            auto trackerOverloaded = tracker->CalculateIsOverloaded(trackerConfig); + +            if (!trackerOverloaded) { +                continue; +            } + +            for (const auto& method : trackerIt->second->MethodsToThrottle) { +                auto& overloadedTrackers = methodOverloaded[std::pair(method.Service, method.Method)]; +                overloadedTrackers.insert(tracker->GetType()); +            } +        } + +        for (const auto& [method, overloadedTrackers] : methodOverloaded) { +            auto it = state->CongestionControllers.find(method); +            if (it == state->CongestionControllers.end()) { +                YT_LOG_WARNING("Cannot find congestion controller for method (Service: %v, Method: %v)", +                    method.first, +                    method.second); + +                continue; +            } + +            it->second->Adjust(overloadedTrackers, now); +        } +    } + +    THazardPtr<TOverloadController::TState> GetStateSnapshot() const +    { +        YT_VERIFY(StateSnapshot_); + +        return StateSnapshot_.AcquireHazard(); +    } + +    void UpdateStateSnapshot(const TState& state, TSpinLockGuard guard) +    { +        auto snapshot = New<TState>(state); +        guard.Release(); + +        StateSnapshot_.Store(std::move(snapshot)); +        ReclaimHazardPointers(); +    } + +    template <class TTracker> +    TIntrusivePtr<TTracker> CreateGenericTracker(TStringBuf trackerType, std::optional<TStringBuf> id = {}) +    { +        YT_LOG_DEBUG("Creating overload tracker (TrackerType: %v, Id: %v)", +            trackerType, +            id); + +        auto trackerId = id.value_or(trackerType); + +        auto profiler = Profiler_.WithTag("tracker", std::string(trackerId)); + +        auto tracker = New<TTracker>(trackerType, trackerId, profiler); + +        auto guard = Guard(SpinLock_); + +        State_.Trackers[trackerId] = tracker; + +        UpdateStateSnapshot(State_, std::move(guard)); + +        return tracker; +    } + +    static TMethodsCongestionControllers CreateCongestionControllers( +        const TOverloadControllerConfigPtr& config, +        NProfiling::TProfiler profiler) +    { +        TMethodsCongestionControllers controllers; + +        THashMap<TMethodIndex, TServiceMethodConfigPtr> configIndex; +        for (const auto& methodConfig : config->Methods) { +            configIndex[std::pair(methodConfig->Service, methodConfig->Method)] = methodConfig; +        } + +        auto getConfig = [&configIndex] (TStringBuf service, TStringBuf method) { +            auto it = configIndex.find(std::pair(service, method)); +            if (it != configIndex.end()) { +                return it->second; +            } + +            auto defaultConfig = New<TServiceMethodConfig>(); +            defaultConfig->Service = service; +            defaultConfig->Method = method; +            return defaultConfig; +        }; + +        for (const auto& [trackerType, tracker] : config->Trackers) { +            for (const auto& method : tracker->MethodsToThrottle) { +                auto& controller = controllers[std::pair(method.Service, method.Method)]; + +                if (!controller) { +                    auto methodConfig = getConfig(method.Service, method.Method); +                    auto controllerProfiler = profiler +                        .WithTag("yt_service", method.Service) +                        .WithTag("method", method.Method); + +                    controller = New<TCongestionController>( +                        std::move(methodConfig), +                        config, std::move(controllerProfiler)); +                } + +                controller->AddTrackerType(trackerType); +            } +        } + +        return controllers; +    } +}; + +IOverloadControllerPtr CreateOverloadController(TOverloadControllerConfigPtr config, NProfiling::TProfiler profiler) +{ +    return New<TOverloadController>(std::move(config), std::move(profiler)); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/overload_controller.h b/yt/yt/core/rpc/overload_controller.h new file mode 100644 index 00000000000..708ec833271 --- /dev/null +++ b/yt/yt/core/rpc/overload_controller.h @@ -0,0 +1,67 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/library/profiling/sensor.h> + +#include <yt/yt/core/concurrency/public.h> + +#include <yt/yt/core/misc/atomic_ptr.h> + +#include <library/cpp/yt/compact_containers/compact_vector.h> + +#include <library/cpp/yt/threading/public.h> + +namespace NYT::NRpc { + +//////////////////////////////////////////////////////////////////////////////// + +// Overall rpc-method congestion state. +// Can be a a measure of allowed concurrency or probability of acceptance more calls. +struct TCongestionState +{ +    std::optional<int> CurrentWindow; +    std::optional<int> MaxWindow; +    double WaitingTimeoutFraction = 0; + +    using TTrackersList = TCompactVector<std::string, 4>; +    TTrackersList OverloadedTrackers; +}; + +// Probabilistic predicate based on congestion state of the the method. +bool ShouldThrottleCall(const TCongestionState& congestionState); + +//////////////////////////////////////////////////////////////////////////////// + +struct IOverloadController +    : public TRefCounted +{ +public: +    DECLARE_INTERFACE_SIGNAL(void(), LoadAdjusted); + +    virtual void Start() = 0; +    virtual void Reconfigure(TOverloadControllerConfigPtr config) = 0; + +    virtual void TrackInvoker( +        TStringBuf name, +        const IInvokerPtr& invoker) = 0; +    virtual void TrackFSHThreadPool( +        TStringBuf name, +        const NConcurrency::ITwoLevelFairShareThreadPoolPtr& threadPool) = 0; + +    virtual IInvoker::TWaitTimeObserver CreateGenericWaitTimeObserver( +        TStringBuf trackerType, +        std::optional<TStringBuf> id = {}) = 0; + +    virtual TCongestionState GetCongestionState(TStringBuf service, TStringBuf method) const = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IOverloadController); + +IOverloadControllerPtr CreateOverloadController( +    TOverloadControllerConfigPtr config, +    NProfiling::TProfiler profiler = {}); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/overload_controlling_service_base-inl.h b/yt/yt/core/rpc/overload_controlling_service_base-inl.h new file mode 100644 index 00000000000..2a7a1b1b9dc --- /dev/null +++ b/yt/yt/core/rpc/overload_controlling_service_base-inl.h @@ -0,0 +1,96 @@ +#ifndef OVERLOAD_CONTROLLING_SERVICE_BASE_INL_H_ +#error "Direct inclusion of this file is not allowed, include overload_controlling_service_base.h" +// For the sake of sane code completion. +#include "overload_controlling_service_base.h" +#endif + +#include "overload_controller.h" + +#include <yt/yt/core/concurrency/delayed_executor.h> + +namespace NYT::NRpc { + +using namespace NConcurrency; + +//////////////////////////////////////////////////////////////////////////////// + +template <class TBaseService> +template <typename... TArgs> +TOverloadControllingServiceBase<TBaseService>::TOverloadControllingServiceBase( +    IOverloadControllerPtr controller, +    TArgs&&... args) +    : TBaseService(std::forward<TArgs>(args)...) +    , Controller_(std::move(controller)) +{ +    YT_VERIFY(Controller_); +} + +template <class TBaseService> +void TOverloadControllingServiceBase<TBaseService>::SubscribeLoadAdjusted() +{ +    Controller_->SubscribeLoadAdjusted(BIND( +        &TOverloadControllingServiceBase::HandleLoadAdjusted, +        MakeWeak(this))); +} + +template <class TBaseService> +auto TOverloadControllingServiceBase<TBaseService>::RegisterMethod( +    const TMethodDescriptor& descriptor) -> TRuntimeMethodInfoPtr +{ +    Methods_.insert(descriptor.Method); +    return TBaseService::RegisterMethod(descriptor); +} + +template <class TBaseService> +void TOverloadControllingServiceBase<TBaseService>::HandleLoadAdjusted() +{ +    const auto& serviceName = TBaseService::GetServiceId().ServiceName; + +    for (const auto& method : Methods_) { +        auto* runtimeInfo = TBaseService::FindMethodInfo(method); +        YT_VERIFY(runtimeInfo); + +        auto congestionState = Controller_->GetCongestionState(serviceName, method); +        runtimeInfo->ConcurrencyLimit.SetDynamicLimit(congestionState.CurrentWindow); +        runtimeInfo->WaitingTimeoutFraction.store( +            congestionState.WaitingTimeoutFraction, +            std::memory_order::relaxed); +    } +} + +template <class TBaseService> +std::optional<TError> TOverloadControllingServiceBase<TBaseService>::GetThrottledError( +    const NRpc::NProto::TRequestHeader& requestHeader) +{ +    auto congestionState = Controller_->GetCongestionState(requestHeader.service(), requestHeader.method()); +    const auto& overloadedTrackers = congestionState.OverloadedTrackers; + +    if (!overloadedTrackers.empty()) { +        return TError(NRpc::EErrorCode::Overloaded, "Instance is overloaded") +            << TErrorAttribute("overloaded_trackers", overloadedTrackers); +    } + +    return TBaseService::GetThrottledError(requestHeader); +} + +template <class TBaseService> +void TOverloadControllingServiceBase<TBaseService>::HandleRequest( +    std::unique_ptr<NRpc::NProto::TRequestHeader> header, +    TSharedRefArray message, +    NBus::IBusPtr replyBus) +{ +    auto congestionState = Controller_->GetCongestionState( +        header->service(), +        header->method()); + +    if (ShouldThrottleCall(congestionState)) { +        // Give other handling routines chance to execute. +        NConcurrency::Yield(); +    } + +    TBaseService::HandleRequest(std::move(header), std::move(message), std::move(replyBus)); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/overload_controlling_service_base.cpp b/yt/yt/core/rpc/overload_controlling_service_base.cpp new file mode 100644 index 00000000000..76b559bc726 --- /dev/null +++ b/yt/yt/core/rpc/overload_controlling_service_base.cpp @@ -0,0 +1,9 @@ +#include "overload_controlling_service_base.h" + +namespace NYT::NRpc { + +//////////////////////////////////////////////////////////////////////////////// + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/overload_controlling_service_base.h b/yt/yt/core/rpc/overload_controlling_service_base.h new file mode 100644 index 00000000000..7b8c95c007e --- /dev/null +++ b/yt/yt/core/rpc/overload_controlling_service_base.h @@ -0,0 +1,45 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/core/rpc/service_detail.h> + +namespace NYT::NRpc { + +//////////////////////////////////////////////////////////////////////////////// + +template <class TBaseService> +class TOverloadControllingServiceBase +    : public TBaseService +{ +public: +    template <typename... TArgs> +    explicit TOverloadControllingServiceBase(IOverloadControllerPtr controller, TArgs&&... args); + +    using TRuntimeMethodInfoPtr = NRpc::TServiceBase::TRuntimeMethodInfoPtr; +    using TMethodDescriptor = NRpc::TServiceBase::TMethodDescriptor; + +    TRuntimeMethodInfoPtr RegisterMethod(const TMethodDescriptor& descriptor) override; +    void SubscribeLoadAdjusted(); + +protected: +    std::optional<TError> GetThrottledError(const NRpc::NProto::TRequestHeader& requestHeader) override; +    void HandleRequest( +        std::unique_ptr<NRpc::NProto::TRequestHeader> header, +        TSharedRefArray message, +        NBus::IBusPtr replyBus) override; + +private: +    IOverloadControllerPtr Controller_; +    THashSet<std::string> Methods_; + +    void HandleLoadAdjusted(); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NRpc + +#define OVERLOAD_CONTROLLING_SERVICE_BASE_INL_H_ +#include "overload_controlling_service_base-inl.h" +#undef OVERLOAD_CONTROLLING_SERVICE_BASE_INL_H_ diff --git a/yt/yt/core/rpc/public.h b/yt/yt/core/rpc/public.h index 5955185d368..5f89abd52d9 100644 --- a/yt/yt/core/rpc/public.h +++ b/yt/yt/core/rpc/public.h @@ -70,6 +70,7 @@ DECLARE_REFCOUNTED_STRUCT(IChannelFactory)  DECLARE_REFCOUNTED_STRUCT(IRoamingChannelProvider)  DECLARE_REFCOUNTED_STRUCT(IAuthenticator)  DECLARE_REFCOUNTED_STRUCT(IResponseKeeper) +DECLARE_REFCOUNTED_STRUCT(IOverloadController)  DECLARE_REFCOUNTED_CLASS(TClientContext)  DECLARE_REFCOUNTED_CLASS(TServiceBase) @@ -77,6 +78,7 @@ DECLARE_REFCOUNTED_CLASS(TChannelWrapper)  DECLARE_REFCOUNTED_CLASS(TStaticChannelFactory)  DECLARE_REFCOUNTED_CLASS(TClientRequestControlThunk)  DECLARE_REFCOUNTED_CLASS(TCachingChannelFactory) +DECLARE_REFCOUNTED_CLASS(TCongestionController)  DECLARE_REFCOUNTED_CLASS(TAttachmentsInputStream)  DECLARE_REFCOUNTED_CLASS(TAttachmentsOutputStream) @@ -127,6 +129,10 @@ DECLARE_REFCOUNTED_STRUCT(TThrottlingChannelDynamicConfig)  DECLARE_REFCOUNTED_STRUCT(TResponseKeeperConfig)  DECLARE_REFCOUNTED_STRUCT(TDispatcherConfig)  DECLARE_REFCOUNTED_STRUCT(TDispatcherDynamicConfig) +DECLARE_REFCOUNTED_STRUCT(TServiceMethodConfig) +DECLARE_REFCOUNTED_STRUCT(TOverloadTrackerMeanWaitTimeConfig) +DECLARE_REFCOUNTED_STRUCT(TOverloadTrackerBacklogQueueFillFractionConfig) +DECLARE_REFCOUNTED_STRUCT(TOverloadControllerConfig)  struct TRequestQueueThrottlerConfigs  { diff --git a/yt/yt/core/rpc/unittests/main/ya.make b/yt/yt/core/rpc/unittests/main/ya.make index 335a91fa4aa..226e2da0215 100644 --- a/yt/yt/core/rpc/unittests/main/ya.make +++ b/yt/yt/core/rpc/unittests/main/ya.make @@ -5,6 +5,7 @@ INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)  PROTO_NAMESPACE(yt)  SRCS( +    yt/yt/core/rpc/unittests/overload_controller_ut.cpp      yt/yt/core/rpc/unittests/handle_channel_failure_ut.cpp      yt/yt/core/rpc/unittests/roaming_channel_ut.cpp      yt/yt/core/rpc/unittests/rpc_ut.cpp diff --git a/yt/yt/core/rpc/unittests/overload_controller_ut.cpp b/yt/yt/core/rpc/unittests/overload_controller_ut.cpp new file mode 100644 index 00000000000..8a6e39aeb43 --- /dev/null +++ b/yt/yt/core/rpc/unittests/overload_controller_ut.cpp @@ -0,0 +1,431 @@ +#include <yt/yt/core/rpc/config.h> +#include <yt/yt/core/rpc/overload_controller.h> + +#include <yt/yt/core/concurrency/action_queue.h> +#include <yt/yt/core/concurrency/two_level_fair_share_thread_pool.h> + +#include <yt/yt/core/test_framework/framework.h> + +namespace NYT::NRpc { +namespace { + +using namespace NConcurrency; + +//////////////////////////////////////////////////////////////////////////////// + +class TMockInvoker +    : public IInvoker +{ +public: +    DEFINE_SIGNAL_OVERRIDE(TWaitTimeObserver::TSignature, WaitTimeObserved); + +public: +    void Invoke(TClosure /*callback*/) override +    { } + +    void Invoke(TMutableRange<TClosure> /*callbacks*/) override +    { } + +    bool CheckAffinity(const IInvokerPtr& /*invoker*/) const override +    { +        return false; +    } + +    bool IsSerialized() const override +    { +        return true; +    } + +    NThreading::TThreadId GetThreadId() const override +    { +        return {}; +    } + +    void FireWaitTimeObserved(TDuration waitTime) +    { +        WaitTimeObserved_.Fire(waitTime); +    } +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TMethodInfo +{ +    TString Service; +    TString Method; +    double WaitingTimeoutFraction = 0; +}; + +using TMethodInfoList = std::vector<TMethodInfo>; + +constexpr auto MeanWaitTimeThreshold = TDuration::MilliSeconds(20); + +TOverloadControllerConfigPtr CreateConfig(const THashMap<TString, TMethodInfoList>& schema) +{ +    auto config = New<TOverloadControllerConfig>(); +    config->Enabled = true; + +    for (const auto& [trackerName, methods] : schema) { +        TOverloadTrackerConfig trackerConfig(EOverloadTrackerConfigType::MeanWaitTime); +        auto trackerMeanWaitTimeConfig = trackerConfig.TryGetConcrete<TOverloadTrackerMeanWaitTimeConfig>(); + +        for (const auto& methodInfo : methods) { +            { +                TServiceMethod serviceMethod; +                serviceMethod.Service = methodInfo.Service; +                serviceMethod.Method = methodInfo.Method; +                trackerMeanWaitTimeConfig->MethodsToThrottle.push_back(std::move(serviceMethod)); +            } +            { +                auto serviceMethodConfig = New<TServiceMethodConfig>(); +                serviceMethodConfig->Service = methodInfo.Service; +                serviceMethodConfig->Method = methodInfo.Method; +                serviceMethodConfig->WaitingTimeoutFraction = methodInfo.WaitingTimeoutFraction; +                config->Methods.push_back(std::move(serviceMethodConfig)); +            } +            trackerMeanWaitTimeConfig->MeanWaitTimeThreshold = MeanWaitTimeThreshold; +        } + +        config->Trackers[trackerName] = trackerConfig; +    } + +    return config; +} + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TOverloadControllerTest, TestOverloadsRequests) +{ +    auto controller = CreateOverloadController(New<TOverloadControllerConfig>()); +    auto mockInvoker = New<TMockInvoker>(); +    auto mockInvoker2 = New<TMockInvoker>(); + +    controller->TrackInvoker("Mock", mockInvoker); +    controller->TrackInvoker("Mock2", mockInvoker2); + +    auto config = CreateConfig({ +        {"Mock", {{"MockService", "MockMethod"}}}, +        {"Mock2", {{"MockService", "MockMethod2"}}}, +    }); +    config->LoadAdjustingPeriod = TDuration::MilliSeconds(1); +    controller->Reconfigure(config); +    controller->Start(); + +    // Simulate overload +    for (int i = 0; i < 5000; ++i) { +        mockInvoker->FireWaitTimeObserved(MeanWaitTimeThreshold * 2); +    } + +    // Check overload incoming requests +    int remainsCount = 1000; +    while (remainsCount > 0) { +        EXPECT_FALSE(ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod2"))); + +        auto overloaded = ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod")); +        if (overloaded) { +            --remainsCount; +        } else { +            Sleep(TDuration::MicroSeconds(10)); +        } +    } + +    // Check recovering even if no calls +    while (remainsCount < 1000) { +        auto overloaded = ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod")); +        if (!overloaded) { +            ++remainsCount; +        } else { +            Sleep(TDuration::MicroSeconds(1)); +        } +    } +} + +TEST(TOverloadControllerTest, TestNoOverloads) +{ +    auto controller = CreateOverloadController(New<TOverloadControllerConfig>()); +    auto mockInvoker = New<TMockInvoker>(); + +    controller->TrackInvoker("Mock", mockInvoker); + +    auto config = CreateConfig({ +        {"Mock", {{"MockService", "MockMethod"}}} +    }); +    config->LoadAdjustingPeriod = TDuration::MilliSeconds(1); + +    controller->Reconfigure(config); +    controller->Start(); + +    // Simulate overload +    for (int i = 0; i < 5000; ++i) { +        mockInvoker->FireWaitTimeObserved(MeanWaitTimeThreshold / 2); +    } + +    for (int i = 0; i < 10000; ++i) { +        EXPECT_FALSE(ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod"))); +        mockInvoker->FireWaitTimeObserved(MeanWaitTimeThreshold / 2); + +        Sleep(TDuration::MicroSeconds(10)); +    } +} + +TEST(TOverloadControllerTest, TestTwoInvokersSameMethod) +{ +    auto controller = CreateOverloadController(New<TOverloadControllerConfig>()); +    auto mockInvoker1 = New<TMockInvoker>(); +    auto mockInvoker2 = New<TMockInvoker>(); + +    controller->TrackInvoker("Mock1", mockInvoker1); +    controller->TrackInvoker("Mock2", mockInvoker2); + +    auto config = CreateConfig({ +        {"Mock1", {{"MockService", "MockMethod"}}}, +        {"Mock2", {{"MockService", "MockMethod"}}}, +    }); +    config->LoadAdjustingPeriod = TDuration::MilliSeconds(1); + +    controller->Reconfigure(config); +    controller->Start(); + +    // Simulate overload +    for (int i = 0; i < 5000; ++i) { +        mockInvoker1->FireWaitTimeObserved(MeanWaitTimeThreshold * 2); +        mockInvoker2->FireWaitTimeObserved(MeanWaitTimeThreshold / 2); +    } + +    // Check overloading incoming requests +    int remainsCount = 1000; +    while (remainsCount > 0) { +        auto overloaded = ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod")); +        if (overloaded) { +            --remainsCount; +        } else { +            Sleep(TDuration::MicroSeconds(10)); +        } +    } + +    // Check recovering even if no calls +    while (remainsCount < 1000) { +        auto overloaded = ShouldThrottleCall(controller->GetCongestionState("MockService", "MockMethod")); +        if (!overloaded) { +            ++remainsCount; +        } else { +            Sleep(TDuration::MicroSeconds(1)); +        } +    } +} + +TEST(TOverloadControllerTest, TestCongestionWindow) +{ +    auto controller = CreateOverloadController(New<TOverloadControllerConfig>()); +    auto mockInvoker = New<TMockInvoker>(); +    auto mockInvoker2 = New<TMockInvoker>(); + +    controller->TrackInvoker("Mock", mockInvoker); +    controller->TrackInvoker("Mock2", mockInvoker2); + +    auto config = CreateConfig({ +        {"Mock", {{"MockService", "MockMethod", 0.3}}}, +        {"Mock2", {{"MockService", "MockMethod2", 0.3}}}, +    }); +    config->LoadAdjustingPeriod = TDuration::MilliSeconds(1); +    controller->Reconfigure(config); +    controller->Start(); + +    // Simulate overload +    for (int i = 0; i < 5000; ++i) { +        mockInvoker->FireWaitTimeObserved(MeanWaitTimeThreshold * 2); +    } + +    // Check overload incoming requests +    int remainsCount = 1000; +    while (remainsCount > 0) { +        mockInvoker->FireWaitTimeObserved(MeanWaitTimeThreshold * 2); +        { +            auto window2 = controller->GetCongestionState("MockService", "MockMethod2"); +            EXPECT_EQ(window2.MaxWindow, window2.CurrentWindow); +        } + +        auto congestionState = controller->GetCongestionState("MockService", "MockMethod"); +        bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow; +        if (overloaded) { +            --remainsCount; +            EXPECT_EQ(0.3, congestionState.WaitingTimeoutFraction); +            EXPECT_EQ(congestionState.OverloadedTrackers, TCongestionState::TTrackersList{"Mock"}); +        } else { +            Sleep(TDuration::MicroSeconds(10)); +        } +    } + +    // Check recovering even if no calls +    while (remainsCount < 1000) { +        auto congestionState = controller->GetCongestionState("MockService", "MockMethod"); +        bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow; + +        if (!overloaded) { +            ++remainsCount; +        } else { +            Sleep(TDuration::MicroSeconds(1)); +        } +    } +} + +TEST(TOverloadControllerTest, TestCongestionWindowTwoTrackers) +{ +    auto controller = CreateOverloadController(New<TOverloadControllerConfig>()); +    auto mockInvoker1 = New<TMockInvoker>(); +    auto mockInvoker2 = New<TMockInvoker>(); + +    controller->TrackInvoker("Mock", mockInvoker1); +    controller->TrackInvoker("Mock2", mockInvoker2); + +    auto config = CreateConfig({ +        {"Mock", {{"MockService", "MockMethod", 0.3}}}, +        {"Mock2", {{"MockService", "MockMethod", 0.3}}}, +    }); +    config->LoadAdjustingPeriod = TDuration::MilliSeconds(1); +    controller->Reconfigure(config); +    controller->Start(); + +    // Simulate overload +    for (int i = 0; i < 5000; ++i) { +        mockInvoker1->FireWaitTimeObserved(MeanWaitTimeThreshold * 2); +        mockInvoker2->FireWaitTimeObserved(MeanWaitTimeThreshold * 2); +    } + +    // Check overload incoming requests +    int remainsCount = 10; +    while (remainsCount > 0) { +        auto congestionState = controller->GetCongestionState("MockService", "MockMethod"); +        bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow; +        if (overloaded) { +            --remainsCount; +            auto trackers = controller->GetCongestionState("MockService", "MockMethod").OverloadedTrackers; +            std::sort(trackers.begin(), trackers.end()); +            EXPECT_EQ(trackers, TCongestionState::TTrackersList({"Mock", "Mock2"})); +        } else { +            Sleep(TDuration::MicroSeconds(10)); +        } +    } +} + +TEST(TOverloadControllerTest, TestCongestionWindowTwoInstancies) +{ +    auto controller = CreateOverloadController(New<TOverloadControllerConfig>()); +    auto observer1 = controller->CreateGenericWaitTimeObserver("Mock", "Mock.1"); +    auto observer2 = controller->CreateGenericWaitTimeObserver("Mock", "Mock.2"); + +    auto config = CreateConfig({ +        {"Mock", {{"MockService", "MockMethod", 0.3}}}, +    }); +    config->LoadAdjustingPeriod = TDuration::MilliSeconds(1); +    controller->Reconfigure(config); +    controller->Start(); + +    // Simulate overload +    for (int i = 0; i < 5000; ++i) { +        observer1(MeanWaitTimeThreshold * 2); +    } + +    // Check overload incoming requests +    int remainsCount = 10; +    while (remainsCount > 0) { +        auto congestionState = controller->GetCongestionState("MockService", "MockMethod"); +        bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow; +        if (overloaded) { +            --remainsCount; +            auto trackers = controller->GetCongestionState("MockService", "MockMethod").OverloadedTrackers; +            EXPECT_EQ(trackers, TCongestionState::TTrackersList({"Mock"})); +        } else { +            Sleep(TDuration::MicroSeconds(10)); +        } +    } + +    Sleep(TDuration::MicroSeconds(10)); + +    for (int i = 0; i < 5000; ++i) { +        observer1(MeanWaitTimeThreshold * 2); +        observer2(MeanWaitTimeThreshold * 2); +    } + +    remainsCount = 10; +    while (remainsCount > 0) { +        auto congestionState = controller->GetCongestionState("MockService", "MockMethod"); +        bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow; +        if (overloaded) { +            --remainsCount; +            auto trackers = controller->GetCongestionState("MockService", "MockMethod").OverloadedTrackers; +            EXPECT_EQ(trackers, TCongestionState::TTrackersList({"Mock"})); +        } else { +            Sleep(TDuration::MicroSeconds(10)); +        } +    } + +    Sleep(TDuration::MicroSeconds(10)); +    for (int i = 0; i < 5000; ++i) { +        observer1(MeanWaitTimeThreshold / 2); +        observer2(MeanWaitTimeThreshold * 2); +    } + +    remainsCount = 10; +    while (remainsCount > 0) { +        auto congestionState = controller->GetCongestionState("MockService", "MockMethod"); +        bool overloaded = congestionState.MaxWindow != congestionState.CurrentWindow; +        if (overloaded) { +            --remainsCount; +            auto trackers = controller->GetCongestionState("MockService", "MockMethod").OverloadedTrackers; +            EXPECT_EQ(trackers, TCongestionState::TTrackersList({"Mock"})); +        } else { +            Sleep(TDuration::MicroSeconds(10)); +        } +    } +} + +//////////////////////////////////////////////////////////////////////////////// + +template <class TExecutorPtr> +void ExecuteWaitTimeTest(const TExecutorPtr& executor, const IInvokerPtr& invoker) +{ +    static constexpr int DesiredActionsCount = 27; + +    TDuration totalWaitTime; +    int actionsCount = 0; + +    executor->SubscribeWaitTimeObserved(BIND([&] (TDuration waitTime) { +        totalWaitTime += waitTime; +        ++actionsCount; +    })); + +    std::vector<TFuture<void>> futures; +    for (int i = 0; i < DesiredActionsCount; ++i) { +        auto future = BIND([] { +            Sleep(TDuration::MilliSeconds(1)); +        }).AsyncVia(invoker) +        .Run(); + +        futures.push_back(std::move(future)); +    } + +    WaitFor(AllSucceeded(std::move(futures))) +        .ThrowOnError(); + +    EXPECT_EQ(DesiredActionsCount, actionsCount); +    EXPECT_GE(totalWaitTime, TDuration::MilliSeconds(DesiredActionsCount - 1)); +} + +TEST(TOverloadControllerTest, WaitTimeObserver) +{ +    { +        auto actionQueue = New<TActionQueue>("TestActionQueue"); +        ExecuteWaitTimeTest(actionQueue->GetInvoker(), actionQueue->GetInvoker()); +    } + +    { +        auto fshThreadPool = CreateTwoLevelFairShareThreadPool(1, "TestNewFsh"); +        ExecuteWaitTimeTest(fshThreadPool, fshThreadPool->GetInvoker("test-pool", "fsh-tag")); +    } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT::NRpc diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make index 61c92191dbd..5dfb9c00bf7 100644 --- a/yt/yt/core/ya.make +++ b/yt/yt/core/ya.make @@ -200,9 +200,11 @@ SRCS(      rpc/helpers.cpp      rpc/local_channel.cpp      rpc/local_server.cpp -    rpc/message.cpp      rpc/message_format.cpp +    rpc/message.cpp      rpc/null_channel.cpp +    rpc/overload_controller.cpp +    rpc/overload_controlling_service_base.cpp      rpc/peer_discovery.cpp      rpc/per_key_request_queue_provider.cpp      rpc/protocol_version.cpp @@ -213,8 +215,8 @@ SRCS(      rpc/roaming_channel.cpp      rpc/serialized_channel.cpp      rpc/server_detail.cpp -    rpc/service.cpp      rpc/service_detail.cpp +    rpc/service.cpp      rpc/static_channel_factory.cpp      rpc/stream.cpp      rpc/throttling_channel.cpp diff --git a/yt/yt/library/profiling/solomon/percpu.cpp b/yt/yt/library/profiling/percpu.cpp index 0f146f0921d..d69686f7c30 100644 --- a/yt/yt/library/profiling/solomon/percpu.cpp +++ b/yt/yt/library/profiling/percpu.cpp @@ -1,5 +1,6 @@  #include "percpu.h" -#include "yt/yt/library/profiling/summary.h" + +#include "summary.h"  #include <yt/yt/core/profiling/tscp.h> diff --git a/yt/yt/library/profiling/solomon/percpu.h b/yt/yt/library/profiling/percpu.h index 544d806e11d..e1766da58ab 100644 --- a/yt/yt/library/profiling/solomon/percpu.h +++ b/yt/yt/library/profiling/percpu.h @@ -1,7 +1,7 @@  #pragma once -#include <yt/yt/library/profiling/impl.h> -#include <yt/yt/library/profiling/summary.h> +#include "impl.h" +#include "summary.h"  #include <yt/yt/core/profiling/tscp.h> diff --git a/yt/yt/library/profiling/solomon/helpers.cpp b/yt/yt/library/profiling/solomon/helpers.cpp index 7f4be734a5e..b5458fb4535 100644 --- a/yt/yt/library/profiling/solomon/helpers.cpp +++ b/yt/yt/library/profiling/solomon/helpers.cpp @@ -1,9 +1,10 @@  #include "helpers.h" -#include "percpu.h"  #include "private.h"  #include "producer.h"  #include "sensor_set.h" +#include <yt/yt/library/profiling/percpu.h> +  #include <yt/yt/core/http/http.h>  #include <yt/yt/core/misc/ref_counted_tracker.h> diff --git a/yt/yt/library/profiling/solomon/registry.cpp b/yt/yt/library/profiling/solomon/registry.cpp index b8b45442902..d9e6d5b7628 100644 --- a/yt/yt/library/profiling/solomon/registry.cpp +++ b/yt/yt/library/profiling/solomon/registry.cpp @@ -1,12 +1,10 @@  #include "registry.h"  #include "sensor.h" -#include "percpu.h" -#include <yt/yt/core/misc/protobuf_helpers.h> +#include <yt/yt/library/profiling/percpu.h> -#include <yt/yt/library/profiling/impl.h> -#include <yt/yt/library/profiling/sensor.h> +#include <yt/yt/core/misc/protobuf_helpers.h>  #include <library/cpp/yt/assert/assert.h> diff --git a/yt/yt/library/profiling/solomon/registry.h b/yt/yt/library/profiling/solomon/registry.h index 7a53df82e1e..86ef47f5117 100644 --- a/yt/yt/library/profiling/solomon/registry.h +++ b/yt/yt/library/profiling/solomon/registry.h @@ -5,6 +5,11 @@  #include "producer.h"  #include "tag_registry.h" +#include <yt/yt/library/profiling/sensor.h> +#include <yt/yt/library/profiling/impl.h> + +#include <yt/yt/library/profiling/solomon/sensor_dump.pb.h> +  #include <yt/yt/core/actions/invoker_util.h>  #include <yt/yt/core/misc/mpsc_stack.h> @@ -13,11 +18,6 @@  #include <yt/yt/core/ytree/fluent.h> -#include <yt/yt/library/profiling/sensor.h> -#include <yt/yt/library/profiling/impl.h> - -#include <yt/yt/library/profiling/solomon/sensor_dump.pb.h> -  #include <library/cpp/yt/threading/spin_lock.h>  namespace NYT::NProfiling { diff --git a/yt/yt/library/profiling/solomon/ya.make b/yt/yt/library/profiling/solomon/ya.make index 3d53febe4f9..5d4ddcff946 100644 --- a/yt/yt/library/profiling/solomon/ya.make +++ b/yt/yt/library/profiling/solomon/ya.make @@ -7,14 +7,13 @@ SRCS(      cube.cpp      exporter.cpp      helpers.cpp -    percpu.cpp      producer.cpp      proxy.cpp      GLOBAL registry.cpp      remote.cpp -    sensor.cpp      sensor_service.cpp      sensor_set.cpp +    sensor.cpp      tag_registry.cpp      sensor_dump.proto diff --git a/yt/yt/library/profiling/ya.make b/yt/yt/library/profiling/ya.make index e302d65ef41..e0f44fd5fb2 100644 --- a/yt/yt/library/profiling/ya.make +++ b/yt/yt/library/profiling/ya.make @@ -3,12 +3,13 @@ LIBRARY()  INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)  SRCS( -    sensor.cpp -    producer.cpp +    histogram_snapshot.cpp      impl.cpp +    percpu.cpp +    producer.cpp +    sensor.cpp      tag.cpp      testing.cpp -    histogram_snapshot.cpp  )  PEERDIR( | 
