diff options
| author | babenko <[email protected]> | 2024-11-09 12:09:07 +0300 |
|---|---|---|
| committer | babenko <[email protected]> | 2024-11-09 12:20:42 +0300 |
| commit | c351a198d988d7a703e9c8b7dcbb19285a96de5e (patch) | |
| tree | 61796e45d4e117dae6a6abe4b124733f9697f557 | |
| parent | 381a9a45520ba56577e90032e086e0168a03b956 (diff) | |
Refactor and polish TResourceTracker
[nodiff:caesar]
commit_hash:eb743854a8d8c28984b9774cd4ad7774dae69eed
| -rw-r--r-- | yt/yt/library/profiling/resource_tracker/resource_tracker.cpp | 773 | ||||
| -rw-r--r-- | yt/yt/library/profiling/resource_tracker/resource_tracker.h | 132 | ||||
| -rw-r--r-- | yt/yt/library/program/helpers.cpp | 11 |
3 files changed, 454 insertions, 462 deletions
diff --git a/yt/yt/library/profiling/resource_tracker/resource_tracker.cpp b/yt/yt/library/profiling/resource_tracker/resource_tracker.cpp index 09cf67a4825..c2cc5717aeb 100644 --- a/yt/yt/library/profiling/resource_tracker/resource_tracker.cpp +++ b/yt/yt/library/profiling/resource_tracker/resource_tracker.cpp @@ -3,17 +3,26 @@ #include <yt/yt/core/logging/log.h> #include <yt/yt/core/misc/fs.h> -#include <yt/yt/core/misc/proc.h> #include <yt/yt/core/misc/singleton.h> +#include <yt/yt/core/misc/proc.h> #include <yt/yt/core/ypath/token.h> +#include <yt/yt/library/profiling/sensor.h> +#include <yt/yt/library/profiling/producer.h> + +#include <library/cpp/yt/misc/global.h> + +#include <library/cpp/yt/memory/leaky_ref_counted_singleton.h> + #include <util/folder/filelist.h> #include <util/stream/file.h> #include <util/string/vector.h> +#include <mutex> + #if defined(_linux_) #include <unistd.h> #endif @@ -27,446 +36,542 @@ using namespace NYTree; using namespace NProfiling; using namespace NConcurrency; -DEFINE_REFCOUNTED_TYPE(TCpuCgroupTracker) -DEFINE_REFCOUNTED_TYPE(TMemoryCgroupTracker) -DEFINE_REFCOUNTED_TYPE(TResourceTracker) - //////////////////////////////////////////////////////////////////////////////// -static NLogging::TLogger Logger("Profiling"); -static TProfiler Profiler("/resource_tracker"); -static TProfiler MemoryProfiler("/memory/cgroup"); +namespace { + +YT_DEFINE_GLOBAL(const NLogging::TLogger, Logger, "Profiling"); +YT_DEFINE_GLOBAL(const TProfiler, ResourceTrackerProfiler, "/resource_tracker"); +YT_DEFINE_GLOBAL(const TProfiler, MemoryProfiler, "/memory/cgroup"); // Please, refer to /proc documentation to know more about available information. // http://www.kernel.org/doc/Documentation/filesystems/proc.txt -static constexpr auto procPath = "/proc/self/task"; - -//////////////////////////////////////////////////////////////////////////////// - -namespace { +constexpr auto ProcfsCurrentTaskPath = "/proc/self/task"; -i64 GetTicksPerSecond() -{ -#if defined(_linux_) - return sysconf(_SC_CLK_TCK); -#else - return -1; -#endif -} - -#ifdef RESOURCE_TRACKER_ENABLED +} // namespace +//////////////////////////////////////////////////////////////////////////////// -#endif +DECLARE_REFCOUNTED_CLASS(TCpuCgroupTracker) +DECLARE_REFCOUNTED_CLASS(TMemoryCgroupTracker) -} // namespace +//////////////////////////////////////////////////////////////////////////////// -void TCpuCgroupTracker::CollectSensors(ISensorWriter* writer) +class TCpuCgroupTracker + : public NProfiling::ISensorProducer { - try { - auto cgroups = GetProcessCgroups(); - for (const auto& group : cgroups) { - for (const auto& controller : group.Controllers) { - if (controller == "cpu") { - auto stat = GetCgroupCpuStat(group.ControllersName, group.Path); - - if (!FirstCgroupStat_) { - FirstCgroupStat_ = stat; +public: + void CollectSensors(ISensorWriter* writer) override + { + try { + auto cgroups = GetProcessCgroups(); + for (const auto& group : cgroups) { + for (const auto& controller : group.Controllers) { + if (controller == "cpu") { + auto stat = GetCgroupCpuStat(group.ControllersName, group.Path); + + if (!FirstCgroupStat_) { + FirstCgroupStat_ = stat; + } + + writer->AddCounter( + "/cgroup/periods", + stat.NrPeriods - FirstCgroupStat_->NrPeriods); + + writer->AddCounter( + "/cgroup/throttled_periods", + stat.NrThrottled - FirstCgroupStat_->NrThrottled); + + writer->AddCounter( + "/cgroup/throttled_cpu_percent", + (stat.ThrottledTime - FirstCgroupStat_->ThrottledTime) / 10'000'000); + + writer->AddCounter( + "/cgroup/wait_cpu_percent", + (stat.WaitTime - FirstCgroupStat_->WaitTime) / 10'000'000); + + return; } + } + } + } catch (const std::exception& ex) { + if (!CgroupErrorLogged_) { + YT_LOG_INFO(ex, "Failed to collect cgroup cpu statistics"); + CgroupErrorLogged_ = true; + } + } + } - writer->AddCounter( - "/cgroup/periods", - stat.NrPeriods - FirstCgroupStat_->NrPeriods); - - writer->AddCounter( - "/cgroup/throttled_periods", - stat.NrThrottled - FirstCgroupStat_->NrThrottled); +private: + std::optional<TCgroupCpuStat> FirstCgroupStat_; + bool CgroupErrorLogged_ = false; +}; - writer->AddCounter( - "/cgroup/throttled_cpu_percent", - (stat.ThrottledTime - FirstCgroupStat_->ThrottledTime) / 10'000'000); +DEFINE_REFCOUNTED_TYPE(TCpuCgroupTracker) - writer->AddCounter( - "/cgroup/wait_cpu_percent", - (stat.WaitTime - FirstCgroupStat_->WaitTime) / 10'000'000); +//////////////////////////////////////////////////////////////////////////////// - return; +class TMemoryCgroupTracker + : public NProfiling::ISensorProducer +{ +public: + void CollectSensors(ISensorWriter* writer) override + { + try { + auto cgroups = GetProcessCgroups(); + for (const auto& group : cgroups) { + for (const auto& controller : group.Controllers) { + if (controller == "memory") { + auto stat = GetCgroupMemoryStat(group.Path); + + writer->AddGauge("/memory_limit", stat.HierarchicalMemoryLimit); + writer->AddGauge("/cache", stat.Cache); + writer->AddGauge("/rss", stat.Rss); + writer->AddGauge("/rss_huge", stat.RssHuge); + writer->AddGauge("/mapped_file", stat.MappedFile); + writer->AddGauge("/dirty", stat.Dirty); + writer->AddGauge("/writeback", stat.Writeback); + + TotalMemoryLimit_.store(stat.HierarchicalMemoryLimit); + AnonymousMemoryLimit_.store(SafeGetAnonymousMemoryLimit( + group.Path, + stat.HierarchicalMemoryLimit)); + + return; + } } } - } - } catch (const std::exception& ex) { - if (!CgroupErrorLogged_) { - YT_LOG_INFO(ex, "Failed to collect cgroup cpu statistics"); - CgroupErrorLogged_ = true; + } catch (const std::exception& ex) { + if (!CgroupErrorLogged_) { + YT_LOG_INFO(ex, "Failed to collect cgroup memory statistics"); + CgroupErrorLogged_ = true; + } } } -} -void TMemoryCgroupTracker::CollectSensors(ISensorWriter* writer) -{ - try { - auto cgroups = GetProcessCgroups(); - for (const auto& group : cgroups) { - for (const auto& controller : group.Controllers) { - if (controller == "memory") { - auto stat = GetCgroupMemoryStat(group.Path); - - writer->AddGauge("/memory_limit", stat.HierarchicalMemoryLimit); - writer->AddGauge("/cache", stat.Cache); - writer->AddGauge("/rss", stat.Rss); - writer->AddGauge("/rss_huge", stat.RssHuge); - writer->AddGauge("/mapped_file", stat.MappedFile); - writer->AddGauge("/dirty", stat.Dirty); - writer->AddGauge("/writeback", stat.Writeback); - - TotalMemoryLimit_.store(stat.HierarchicalMemoryLimit); - AnonymousMemoryLimit_.store(SafeGetAnonymousMemoryLimit( - group.Path, - stat.HierarchicalMemoryLimit)); - - return; - } + i64 GetTotalMemoryLimit() const + { + return TotalMemoryLimit_.load(); + } + + i64 GetAnonymousMemoryLimit() const + { + return AnonymousMemoryLimit_.load(); + } + +private: + bool CgroupErrorLogged_ = false; + bool AnonymousLimitErrorLogged_ = false; + + std::atomic<i64> TotalMemoryLimit_ = 0; + std::atomic<i64> AnonymousMemoryLimit_ = 0; + + i64 SafeGetAnonymousMemoryLimit(const TString& cgroupPath, i64 totalMemoryLimit) + { + try { + auto anonymousLimit = GetCgroupAnonymousMemoryLimit(cgroupPath); + auto result = anonymousLimit.value_or(totalMemoryLimit); + result = std::min(result, totalMemoryLimit); + return result != 0 ? result : totalMemoryLimit; + } catch (const std::exception& ex) { + if (!AnonymousLimitErrorLogged_) { + YT_LOG_INFO(ex, "Failed to collect cgroup anonymous memory limit"); + AnonymousLimitErrorLogged_ = true; } } - } catch (const std::exception& ex) { - if (!CgroupErrorLogged_) { - YT_LOG_INFO(ex, "Failed to collect cgroup memory statistics"); - CgroupErrorLogged_ = true; - } + + return totalMemoryLimit; } -} +}; -i64 TMemoryCgroupTracker::SafeGetAnonymousMemoryLimit(const TString& cgroupPath, i64 totalMemoryLimit) +DEFINE_REFCOUNTED_TYPE(TMemoryCgroupTracker) + +//////////////////////////////////////////////////////////////////////////////// + +class TResourceTrackerImpl + : public NProfiling::ISensorProducer { - try { - auto anonymousLimit = GetCgroupAnonymousMemoryLimit(cgroupPath); - auto result = anonymousLimit.value_or(totalMemoryLimit); - result = std::min(result, totalMemoryLimit); - return result != 0 ? result : totalMemoryLimit; - } catch (const std::exception& ex) { - if (!AnonymousLimitErrorLogged_) { - YT_LOG_INFO(ex, "Failed to collect cgroup anonymous memory limit"); - AnonymousLimitErrorLogged_ = true; - } +public: + static TResourceTrackerImpl* Get() + { + return LeakyRefCountedSingleton<TResourceTrackerImpl>().Get(); } - return totalMemoryLimit; -} + void Enable() + { + std::call_once(EnabledFlag_, [&] { + ResourceTrackerProfiler().AddFuncGauge("/memory_usage/rss", MakeStrong(this), [] { + return GetProcessMemoryUsage().Rss; + }); -i64 TMemoryCgroupTracker::GetTotalMemoryLimit() const -{ - return TotalMemoryLimit_.load(); -} + ResourceTrackerProfiler().AddFuncGauge("/utilization/max", MakeStrong(this), [this] { + return MaxThreadPoolUtilization_.load(); + }); -i64 TMemoryCgroupTracker::GetAnonymousMemoryLimit() const -{ - return AnonymousMemoryLimit_.load(); -} + ResourceTrackerProfiler().AddProducer("", CpuCgroupTracker_); + MemoryProfiler().AddProducer("", MemoryCgroupTracker_); -TResourceTracker::TTimings TResourceTracker::TTimings::operator-(const TResourceTracker::TTimings& other) const -{ - return {UserJiffies - other.UserJiffies, SystemJiffies - other.SystemJiffies, CpuWaitNsec - other.CpuWaitNsec}; -} + ResourceTrackerProfiler() + .WithProducerRemoveSupport() + .AddProducer("", MakeStrong(this)); + }); + } -TResourceTracker::TTimings& TResourceTracker::TTimings::operator+=(const TResourceTracker::TTimings& other) -{ - UserJiffies += other.UserJiffies; - SystemJiffies += other.SystemJiffies; - CpuWaitNsec += other.CpuWaitNsec; - return *this; -} + double GetUserCpu() + { + return LastUserCpu_.load(); + } -TResourceTracker::TResourceTracker() - // CPU time is measured in jiffies; we need USER_HZ to convert them - // to milliseconds and percentages. - : TicksPerSecond_(GetTicksPerSecond()) - , LastUpdateTime_(TInstant::Now()) - , CpuCgroupTracker_(New<TCpuCgroupTracker>()) - , MemoryCgroupTracker_(New<TMemoryCgroupTracker>()) -{ - Profiler.AddFuncGauge("/memory_usage/rss", MakeStrong(this), [] { - return GetProcessMemoryUsage().Rss; - }); + double GetSystemCpu() + { + return LastSystemCpu_.load(); + } - Profiler.AddFuncGauge("/utilization/max", MakeStrong(this), [this] { - return MaxThreadPoolUtilization_.load(); - }); + double GetCpuWait() + { + return LastCpuWait_.load(); + } - Profiler.AddProducer("", CpuCgroupTracker_); - MemoryProfiler.AddProducer("", MemoryCgroupTracker_); + i64 GetTotalMemoryLimit() + { + return MemoryCgroupTracker_->GetTotalMemoryLimit(); + } - Profiler - .WithProducerRemoveSupport() - .AddProducer("", MakeStrong(this)); -} + i64 GetAnonymousMemoryLimit() + { + return MemoryCgroupTracker_->GetAnonymousMemoryLimit(); + } -void TResourceTracker::CollectSensors(ISensorWriter* writer) -{ - i64 timeDeltaUsec = TInstant::Now().MicroSeconds() - LastUpdateTime_.MicroSeconds(); - if (timeDeltaUsec <= 0) { - return; + void SetVCpuFactor(double factor) + { + VCpuFactor_.store(factor); } - auto tidToInfo = ProcessThreads(); - CollectSensorsAggregatedTimings(writer, TidToInfo_, tidToInfo, timeDeltaUsec); - TidToInfo_ = tidToInfo; +private: - LastUpdateTime_ = TInstant::Now(); -} + const TCpuCgroupTrackerPtr CpuCgroupTracker_ = New<TCpuCgroupTracker>(); + const TMemoryCgroupTrackerPtr MemoryCgroupTracker_ = New<TMemoryCgroupTracker>(); -void TResourceTracker::SetVCpuFactor(double vCpuFactor) -{ - VCpuFactor_ = vCpuFactor; -} + // CPU time is measured in jiffies; we need USER_HZ to convert them + // to milliseconds and percentages. + const i64 TicksPerSecond_ = [] { +#if defined(_linux_) + return sysconf(_SC_CLK_TCK); +#else + return -1; +#endif + }(); -bool TResourceTracker::ProcessThread(TString tid, TResourceTracker::TThreadInfo* info) -{ - auto threadStatPath = NFS::CombinePaths(procPath, tid); - auto statPath = NFS::CombinePaths(threadStatPath, "stat"); - auto statusPath = NFS::CombinePaths(threadStatPath, "status"); - auto schedStatPath = NFS::CombinePaths(threadStatPath, "schedstat"); + std::once_flag EnabledFlag_; - try { - // Parse status. - { - TIFStream file(statusPath); - for (TString line; file.ReadLine(line); ) { - auto tokens = SplitString(line, "\t"); + std::atomic<double> VCpuFactor_; - if (tokens.size() < 2) { - continue; - } + TInstant LastUpdateTime_ = TInstant::Now(); - if (tokens[0] == "Name:") { - info->ThreadName = tokens[1]; - } else if (tokens[0] == "SigBlk:") { - // This is a heuristic way to distinguish YT thread from non-YT threads. - // It is used primarily for CHYT, which links against CH and Poco that - // have their own complicated manner of handling threads. We want to be - // able to visually distinguish them from our threads. - // - // Poco threads always block SIGQUIT, SIGPIPE and SIGTERM; we use the latter - // one presence. Feel free to change this heuristic if needed. - YT_VERIFY(tokens[1].size() == 16); - auto mask = IntFromString<ui64, 16>(tokens[1]); - // Note that signals are 1-based, so 14-th bit is SIGTERM (15). - bool sigtermBlocked = (mask >> 14) & 1ull; - info->IsYtThread = !sigtermBlocked; - } - } + // Values below are in percent. + std::atomic<double> LastUserCpu_; + std::atomic<double> LastSystemCpu_; + std::atomic<double> LastCpuWait_; + + std::atomic<double> MaxThreadPoolUtilization_; + + void CollectSensors(ISensorWriter* writer) override + { + i64 timeDeltaUsec = TInstant::Now().MicroSeconds() - LastUpdateTime_.MicroSeconds(); + if (timeDeltaUsec <= 0) { + return; } - // Parse schedstat. - { - TIFStream file(schedStatPath); - auto tokens = SplitString(file.ReadLine(), " "); - if (tokens.size() < 3) { - return false; - } + auto tidToInfo = ParseThreadInfos(); + CollectSensorsAggregatedTimings(writer, TidToInfo_, tidToInfo, timeDeltaUsec); + TidToInfo_ = tidToInfo; + + LastUpdateTime_ = TInstant::Now(); + } - info->Timings.CpuWaitNsec = FromString<i64>(tokens[1]); + struct TTimings + { + i64 UserJiffies = 0; + i64 SystemJiffies = 0; + i64 CpuWaitNsec = 0; + + TTimings operator-(const TTimings& other) const + { + return {UserJiffies - other.UserJiffies, SystemJiffies - other.SystemJiffies, CpuWaitNsec - other.CpuWaitNsec}; } - // Parse stat. + TTimings& operator+=(const TTimings& other) { - TIFStream file(statPath); - auto tokens = SplitString(file.ReadLine(), " "); - if (tokens.size() < 15) { - return false; + UserJiffies += other.UserJiffies; + SystemJiffies += other.SystemJiffies; + CpuWaitNsec += other.CpuWaitNsec; + return *this; + } + }; + + struct TThreadInfo + { + std::string ThreadName; + TTimings Timings; + bool IsYTThread = true; + //! This key is IsYtThread ? ThreadName : ThreadName + "@". + //! It allows to distinguish YT threads from non-YT threads that + //! inherited parent YT thread name. + std::string ProfilingKey; + }; + + // Thread id -> stats + using TThreadInfoMap = THashMap<NThreading::TThreadId, TThreadInfo>; + TThreadInfoMap TidToInfo_; + + std::optional<TThreadInfo> TryParseThreadInfo(NThreading::TThreadId tid) + { + TThreadInfo info; + + auto threadStatPath = NFS::CombinePaths(ProcfsCurrentTaskPath, ToString(tid)); + auto statPath = NFS::CombinePaths(threadStatPath, "stat"); + auto statusPath = NFS::CombinePaths(threadStatPath, "status"); + auto schedStatPath = NFS::CombinePaths(threadStatPath, "schedstat"); + + try { + // Parse status. + { + TIFStream file(statusPath); + for (TString line; file.ReadLine(line); ) { + auto tokens = SplitString(line, "\t"); + + if (tokens.size() < 2) { + continue; + } + + if (tokens[0] == "Name:") { + info.ThreadName = tokens[1]; + } else if (tokens[0] == "SigBlk:") { + // This is a heuristic way to distinguish YT thread from non-YT threads. + // It is used primarily for CHYT, which links against CH and Poco that + // have their own complicated manner of handling threads. We want to be + // able to visually distinguish them from our threads. + // + // Poco threads always block SIGQUIT, SIGPIPE and SIGTERM; we use the latter + // one presence. Feel free to change this heuristic if needed. + YT_VERIFY(tokens[1].size() == 16); + auto mask = IntFromString<ui64, 16>(tokens[1]); + // Note that signals are 1-based, so 14-th bit is SIGTERM (15). + bool sigtermBlocked = (mask >> 14) & 1ull; + info.IsYTThread = !sigtermBlocked; + } + } } - info->Timings.UserJiffies = FromString<i64>(tokens[13]); - info->Timings.SystemJiffies = FromString<i64>(tokens[14]); - } + // Parse schedstat. + { + TIFStream file(schedStatPath); + auto tokens = SplitString(file.ReadLine(), " "); + if (tokens.size() < 3) { + return std::nullopt; + } - info->ProfilingKey = info->ThreadName; + info.Timings.CpuWaitNsec = FromString<i64>(tokens[1]); + } - if (!TidToInfo_.contains(tid)) { - YT_LOG_TRACE("Thread %v named %v", tid, info->ThreadName); - } + // Parse stat. + { + TIFStream file(statPath); + auto tokens = SplitString(file.ReadLine(), " "); + if (tokens.size() < 15) { + return std::nullopt; + } - // Group threads by thread pool, using YT thread naming convention. - if (auto index = info->ProfilingKey.rfind(':'); index != TString::npos) { - bool isDigit = std::all_of(info->ProfilingKey.cbegin() + index + 1, info->ProfilingKey.cend(), [] (char c) { - return std::isdigit(c); - }); - if (isDigit) { - info->ProfilingKey = info->ProfilingKey.substr(0, index); + info.Timings.UserJiffies = FromString<i64>(tokens[13]); + info.Timings.SystemJiffies = FromString<i64>(tokens[14]); } - } - if (!info->IsYtThread) { - info->ProfilingKey += "@"; - } - } catch (const TIoException&) { - // Ignore all IO exceptions. - return false; - } + info.ProfilingKey = info.ThreadName; - YT_LOG_TRACE("Thread statistics (Tid: %v, ThreadName: %v, IsYtThread: %v, UserJiffies: %v, SystemJiffies: %v, CpuWaitNsec: %v)", - tid, - info->ThreadName, - info->IsYtThread, - info->Timings.UserJiffies, - info->Timings.SystemJiffies, - info->Timings.CpuWaitNsec); + // Group threads by thread pool, using YT thread naming convention. + if (auto index = info.ProfilingKey.rfind(':'); index != TString::npos) { + bool isDigit = std::all_of(info.ProfilingKey.cbegin() + index + 1, info.ProfilingKey.cend(), [] (char c) { + return std::isdigit(c); + }); + if (isDigit) { + info.ProfilingKey = info.ProfilingKey.substr(0, index); + } + } - return true; -} + if (!info.IsYTThread) { + info.ProfilingKey += "@"; + } + } catch (const TIoException&) { + // Ignore all IO exceptions. + return std::nullopt; + } -TResourceTracker::TThreadMap TResourceTracker::ProcessThreads() -{ - TDirsList dirsList; - try { - dirsList.Fill(procPath); - } catch (const TSystemError&) { - // Ignore all exceptions. - return {}; + YT_LOG_TRACE("Thread statistics (Tid: %v, ThreadName: %v, IsYtThread: %v, UserJiffies: %v, SystemJiffies: %v, CpuWaitNsec: %v)", + tid, + info.ThreadName, + info.IsYTThread, + info.Timings.UserJiffies, + info.Timings.SystemJiffies, + info.Timings.CpuWaitNsec); + + return info; } - TThreadMap tidToStats; + TThreadInfoMap ParseThreadInfos() + { + TDirsList dirsList; + try { + dirsList.Fill(ProcfsCurrentTaskPath); + } catch (const TSystemError&) { + // Ignore all exceptions. + return {}; + } - for (int index = 0; index < static_cast<int>(dirsList.Size()); ++index) { - auto tid = TString(dirsList.Next()); - TThreadInfo info; - if (ProcessThread(tid, &info)) { - tidToStats[tid] = info; - } else { - YT_LOG_TRACE("Failed to prepare thread info for thread (Tid: %v)", tid); + TThreadInfoMap tidToStats; + + for (int index = 0; index < static_cast<int>(dirsList.Size()); ++index) { + auto tidStr = TStringBuf(dirsList.Next()); + NThreading::TThreadId tid; + if (!TryFromString(tidStr, tid)) { + continue; + } + auto info = TryParseThreadInfo(tid); + if (info) { + tidToStats[tid] = *info; + } else { + YT_LOG_TRACE("Failed to parse thread info (Tid: %v)", tid); + } } + + return tidToStats; } - return tidToStats; -} + void CollectSensorsAggregatedTimings( + ISensorWriter* writer, + const TThreadInfoMap& oldTidToInfo, + const TThreadInfoMap& newTidToInfo, + i64 timeDeltaUsec) + { + double totalUserCpuTime = 0.0; + double totalSystemCpuTime = 0.0; + double totalCpuWaitTime = 0.0; -void TResourceTracker::CollectSensorsAggregatedTimings( - ISensorWriter* writer, - const TResourceTracker::TThreadMap& oldTidToInfo, - const TResourceTracker::TThreadMap& newTidToInfo, - i64 timeDeltaUsec) -{ - double totalUserCpuTime = 0.0; - double totalSystemCpuTime = 0.0; - double totalCpuWaitTime = 0.0; + THashMap<std::string, TTimings> profilingKeyToAggregatedTimings; + THashMap<std::string, int> profilingKeyToCount; - THashMap<TString, TTimings> profilingKeyToAggregatedTimings; - THashMap<TString, int> profilingKeyToCount; + // Consider only those threads which did not change their thread names. + // In each group of such threads with same thread name, export aggregated timings. - // Consider only those threads which did not change their thread names. - // In each group of such threads with same thread name, export aggregated timings. + for (const auto& [tid, newInfo] : newTidToInfo) { + ++profilingKeyToCount[newInfo.ProfilingKey]; - for (const auto& [tid, newInfo] : newTidToInfo) { - ++profilingKeyToCount[newInfo.ProfilingKey]; + auto it = oldTidToInfo.find(tid); - auto it = oldTidToInfo.find(tid); + if (it == oldTidToInfo.end()) { + continue; + } - if (it == oldTidToInfo.end()) { - continue; - } + const auto& oldInfo = it->second; - const auto& oldInfo = it->second; + if (oldInfo.ProfilingKey != newInfo.ProfilingKey) { + continue; + } - if (oldInfo.ProfilingKey != newInfo.ProfilingKey) { - continue; + profilingKeyToAggregatedTimings[newInfo.ProfilingKey] += newInfo.Timings - oldInfo.Timings; } - profilingKeyToAggregatedTimings[newInfo.ProfilingKey] += newInfo.Timings - oldInfo.Timings; - } + double vCpuFactor = VCpuFactor_; + + double maxUtilization = 0.0; + for (const auto& [profilingKey, aggregatedTimings] : profilingKeyToAggregatedTimings) { + // Multiplier 1e6 / timeDelta is for taking average over time (all values should be "per second"). + // Multiplier 100 for CPU time is for measuring CPU load in percents. It is due to historical reasons. + double userCpuTime = std::max<double>(0.0, 100. * aggregatedTimings.UserJiffies / TicksPerSecond_ * (1e6 / timeDeltaUsec)); + double systemCpuTime = std::max<double>(0.0, 100. * aggregatedTimings.SystemJiffies / TicksPerSecond_ * (1e6 / timeDeltaUsec)); + double waitTime = std::max<double>(0.0, 100 * aggregatedTimings.CpuWaitNsec / 1e9 * (1e6 / timeDeltaUsec)); + + totalUserCpuTime += userCpuTime; + totalSystemCpuTime += systemCpuTime; + totalCpuWaitTime += waitTime; + + auto threadCount = profilingKeyToCount[profilingKey]; + double utilization = (userCpuTime + systemCpuTime) / (100 * threadCount); + + double totalCpuTime = userCpuTime + systemCpuTime; + + TWithTagGuard tagGuard(writer, "thread", profilingKey); + writer->AddGauge("/user_cpu", userCpuTime); + writer->AddGauge("/system_cpu", systemCpuTime); + writer->AddGauge("/total_cpu", totalCpuTime); + writer->AddGauge("/cpu_wait", waitTime); + writer->AddGauge("/thread_count", threadCount); + writer->AddGauge("/utilization", utilization); + if (vCpuFactor != 0.0) { + writer->AddGauge("/user_vcpu", userCpuTime * vCpuFactor); + writer->AddGauge("/system_vcpu", systemCpuTime * vCpuFactor); + writer->AddGauge("/total_vcpu", totalCpuTime * vCpuFactor); + } - double vCpuFactor = VCpuFactor_; - - double maxUtilization = 0.0; - for (const auto& [profilingKey, aggregatedTimings] : profilingKeyToAggregatedTimings) { - // Multiplier 1e6 / timeDelta is for taking average over time (all values should be "per second"). - // Multiplier 100 for CPU time is for measuring CPU load in percents. It is due to historical reasons. - double userCpuTime = std::max<double>(0.0, 100. * aggregatedTimings.UserJiffies / TicksPerSecond_ * (1e6 / timeDeltaUsec)); - double systemCpuTime = std::max<double>(0.0, 100. * aggregatedTimings.SystemJiffies / TicksPerSecond_ * (1e6 / timeDeltaUsec)); - double waitTime = std::max<double>(0.0, 100 * aggregatedTimings.CpuWaitNsec / 1e9 * (1e6 / timeDeltaUsec)); - - totalUserCpuTime += userCpuTime; - totalSystemCpuTime += systemCpuTime; - totalCpuWaitTime += waitTime; - - auto threadCount = profilingKeyToCount[profilingKey]; - double utilization = (userCpuTime + systemCpuTime) / (100 * threadCount); - - double totalCpuTime = userCpuTime + systemCpuTime; - - TWithTagGuard tagGuard(writer, "thread", profilingKey); - writer->AddGauge("/user_cpu", userCpuTime); - writer->AddGauge("/system_cpu", systemCpuTime); - writer->AddGauge("/total_cpu", totalCpuTime); - writer->AddGauge("/cpu_wait", waitTime); - writer->AddGauge("/thread_count", threadCount); - writer->AddGauge("/utilization", utilization); - if (vCpuFactor != 0.0) { - writer->AddGauge("/user_vcpu", userCpuTime * vCpuFactor); - writer->AddGauge("/system_vcpu", systemCpuTime * vCpuFactor); - writer->AddGauge("/total_vcpu", totalCpuTime * vCpuFactor); + maxUtilization = std::max(maxUtilization, utilization); + + YT_LOG_TRACE("Thread CPU timings in percent/sec (ProfilingKey: %v, UserCpu: %v, SystemCpu: %v, CpuWait: %v)", + profilingKey, + userCpuTime, + systemCpuTime, + waitTime); } - maxUtilization = std::max(maxUtilization, utilization); + LastUserCpu_.store(totalUserCpuTime); + LastSystemCpu_.store(totalSystemCpuTime); + LastCpuWait_.store(totalCpuWaitTime); + MaxThreadPoolUtilization_.store(maxUtilization); - YT_LOG_TRACE("Thread CPU timings in percent/sec (ProfilingKey: %v, UserCpu: %v, SystemCpu: %v, CpuWait: %v)", - profilingKey, - userCpuTime, - systemCpuTime, - waitTime); - } + YT_LOG_DEBUG("Total CPU timings in percent/sec (UserCpu: %v, SystemCpu: %v, CpuWait: %v)", + totalUserCpuTime, + totalSystemCpuTime, + totalCpuWaitTime); - LastUserCpu_.store(totalUserCpuTime); - LastSystemCpu_.store(totalSystemCpuTime); - LastCpuWait_.store(totalCpuWaitTime); - MaxThreadPoolUtilization_ = maxUtilization; + int fileDescriptorCount = GetFileDescriptorCount(); + writer->AddGauge("/open_fds", fileDescriptorCount); + YT_LOG_DEBUG("Assessed open file descriptors (Count: %v)", fileDescriptorCount); + } +}; - YT_LOG_DEBUG("Total CPU timings in percent/sec (UserCpu: %v, SystemCpu: %v, CpuWait: %v)", - totalUserCpuTime, - totalSystemCpuTime, - totalCpuWaitTime); +//////////////////////////////////////////////////////////////////////////////// - int fileDescriptorCount = GetFileDescriptorCount(); - writer->AddGauge("/open_fds", fileDescriptorCount); - YT_LOG_DEBUG("Assessed open file descriptors (Count: %v)", fileDescriptorCount); +void TResourceTracker::Enable() +{ + TResourceTrackerImpl::Get()->Enable(); } double TResourceTracker::GetUserCpu() { - return LastUserCpu_.load(); + return TResourceTrackerImpl::Get()->GetUserCpu(); } double TResourceTracker::GetSystemCpu() { - return LastSystemCpu_.load(); + return TResourceTrackerImpl::Get()->GetSystemCpu(); } double TResourceTracker::GetCpuWait() { - return LastCpuWait_.load(); + return TResourceTrackerImpl::Get()->GetCpuWait(); } i64 TResourceTracker::GetTotalMemoryLimit() { - return MemoryCgroupTracker_->GetTotalMemoryLimit(); + return TResourceTrackerImpl::Get()->GetTotalMemoryLimit(); } i64 TResourceTracker::GetAnonymousMemoryLimit() { - return MemoryCgroupTracker_->GetAnonymousMemoryLimit(); -} - -TResourceTrackerPtr GetResourceTracker() -{ - return LeakyRefCountedSingleton<TResourceTracker>(); -} - -void EnableResourceTracker() -{ - GetResourceTracker(); + return TResourceTrackerImpl::Get()->GetAnonymousMemoryLimit(); } -void SetVCpuFactor(double vCpuFactor) +void TResourceTracker::SetVCpuFactor(double factor) { - GetResourceTracker()->SetVCpuFactor(vCpuFactor); + TResourceTrackerImpl::Get()->SetVCpuFactor(factor); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/library/profiling/resource_tracker/resource_tracker.h b/yt/yt/library/profiling/resource_tracker/resource_tracker.h index 424c499ffcd..a68ea551b59 100644 --- a/yt/yt/library/profiling/resource_tracker/resource_tracker.h +++ b/yt/yt/library/profiling/resource_tracker/resource_tracker.h @@ -1,138 +1,30 @@ #pragma once -#include <vector> -#include <yt/yt/core/concurrency/periodic_executor.h> - -#include <yt/yt/library/profiling/producer.h> - -#include <yt/yt/core/misc/proc.h> +#include <yt/yt/core/misc/public.h> namespace NYT::NProfiling { //////////////////////////////////////////////////////////////////////////////// -DECLARE_REFCOUNTED_CLASS(TCpuCgroupTracker) - -class TCpuCgroupTracker - : public NProfiling::ISensorProducer -{ -public: - void CollectSensors(ISensorWriter* writer) override; - -private: - std::optional<TCgroupCpuStat> FirstCgroupStat_; - bool CgroupErrorLogged_ = false; -}; - -//////////////////////////////////////////////////////////////////////////////// - -DECLARE_REFCOUNTED_CLASS(TMemoryCgroupTracker) - -class TMemoryCgroupTracker - : public NProfiling::ISensorProducer -{ -public: - void CollectSensors(ISensorWriter* writer) override; - - i64 GetTotalMemoryLimit() const; - i64 GetAnonymousMemoryLimit() const; - -private: - bool CgroupErrorLogged_ = false; - bool AnonymousLimitErrorLogged_ = false; - - std::atomic<i64> TotalMemoryLimit_ = 0; - std::atomic<i64> AnonymousMemoryLimit_ = 0; - - i64 SafeGetAnonymousMemoryLimit(const TString& cgroupPath, i64 totalMemoryLimit); -}; - -//////////////////////////////////////////////////////////////////////////////// - -DECLARE_REFCOUNTED_CLASS(TResourceTracker) - class TResourceTracker - : public NProfiling::ISensorProducer { public: - explicit TResourceTracker(); - - double GetUserCpu(); - double GetSystemCpu(); - double GetCpuWait(); - - i64 GetTotalMemoryLimit(); - i64 GetAnonymousMemoryLimit(); - - void CollectSensors(ISensorWriter* writer) override; - - void SetVCpuFactor(double factor); - -private: - std::atomic<double> VCpuFactor_{0.0}; - - i64 TicksPerSecond_; - TInstant LastUpdateTime_; - - // Value below are in percents. - std::atomic<double> LastUserCpu_{0.0}; - std::atomic<double> LastSystemCpu_{0.0}; - std::atomic<double> LastCpuWait_{0.0}; - - std::atomic<double> MaxThreadPoolUtilization_ = {0.0}; + //! Enables collecting background statistics and pushing them to profiler. + static void Enable(); - struct TTimings - { - i64 UserJiffies = 0; - i64 SystemJiffies = 0; - i64 CpuWaitNsec = 0; + static double GetUserCpu(); + static double GetSystemCpu(); + static double GetCpuWait(); - TTimings operator-(const TTimings& other) const; - TTimings& operator+=(const TTimings& other); - }; + static i64 GetTotalMemoryLimit(); + static i64 GetAnonymousMemoryLimit(); - struct TThreadInfo - { - TString ThreadName; - TTimings Timings; - bool IsYtThread = true; - //! This key is IsYtThread ? ThreadName : ThreadName + "@". - //! It allows to distinguish YT threads from non-YT threads that - //! inherited parent YT thread name. - TString ProfilingKey; - }; - - // thread id -> stats - using TThreadMap = THashMap<TString, TThreadInfo>; - - TThreadMap TidToInfo_; - - TCpuCgroupTrackerPtr CpuCgroupTracker_; - TMemoryCgroupTrackerPtr MemoryCgroupTracker_; - - void EnqueueUsage(); - - void EnqueueThreadStats(); - - bool ProcessThread(TString tid, TThreadInfo* result); - TThreadMap ProcessThreads(); - - void CollectSensorsAggregatedTimings( - ISensorWriter* writer, - const TThreadMap& oldTidToInfo, - const TThreadMap& newTidToInfo, - i64 timeDeltaUsec); + //! If this factor is set, additional metrics will be reported: + //! user, system, total cpu multiplied by given factor. + //! E.g. |system_vcpu = system_cpu * vcpu_factor|. + static void SetVCpuFactor(double factor); }; -TResourceTrackerPtr GetResourceTracker(); - -void EnableResourceTracker(); - -//! If this vCpuFactor is set, additional metrics will be reported: -//! user, system, total cpu multiplied by given factor. -//! E. g. system_vcpu = system_cpu * vcpu_factor. -void SetVCpuFactor(double vCpuFactor); - //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NProfiling diff --git a/yt/yt/library/program/helpers.cpp b/yt/yt/library/program/helpers.cpp index b4be1fbb0d9..46bd3822f08 100644 --- a/yt/yt/library/program/helpers.cpp +++ b/yt/yt/library/program/helpers.cpp @@ -123,12 +123,7 @@ private: i64 GetAnonymousMemoryLimit() const { - auto resourceTracker = NProfiling::GetResourceTracker(); - if (!resourceTracker) { - return 0; - } - - return resourceTracker->GetAnonymousMemoryLimit(); + return NProfiling::TResourceTracker::GetAnonymousMemoryLimit(); } TAllocatorMemoryLimit ProposeHeapMemoryLimit(i64 totalMemory, const TTCMallocConfigPtr& config) const @@ -251,9 +246,9 @@ void ConfigureSingletons(const TSingletonsConfigPtr& config) } if (config->EnableResourceTracker) { - NProfiling::EnableResourceTracker(); + NProfiling::TResourceTracker::Enable(); if (config->ResourceTrackerVCpuFactor.has_value()) { - NProfiling::SetVCpuFactor(config->ResourceTrackerVCpuFactor.value()); + NProfiling::TResourceTracker::SetVCpuFactor(config->ResourceTrackerVCpuFactor.value()); } } |
