summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <[email protected]>2024-11-09 12:09:07 +0300
committerbabenko <[email protected]>2024-11-09 12:20:42 +0300
commitc351a198d988d7a703e9c8b7dcbb19285a96de5e (patch)
tree61796e45d4e117dae6a6abe4b124733f9697f557
parent381a9a45520ba56577e90032e086e0168a03b956 (diff)
Refactor and polish TResourceTracker
[nodiff:caesar] commit_hash:eb743854a8d8c28984b9774cd4ad7774dae69eed
-rw-r--r--yt/yt/library/profiling/resource_tracker/resource_tracker.cpp773
-rw-r--r--yt/yt/library/profiling/resource_tracker/resource_tracker.h132
-rw-r--r--yt/yt/library/program/helpers.cpp11
3 files changed, 454 insertions, 462 deletions
diff --git a/yt/yt/library/profiling/resource_tracker/resource_tracker.cpp b/yt/yt/library/profiling/resource_tracker/resource_tracker.cpp
index 09cf67a4825..c2cc5717aeb 100644
--- a/yt/yt/library/profiling/resource_tracker/resource_tracker.cpp
+++ b/yt/yt/library/profiling/resource_tracker/resource_tracker.cpp
@@ -3,17 +3,26 @@
#include <yt/yt/core/logging/log.h>
#include <yt/yt/core/misc/fs.h>
-#include <yt/yt/core/misc/proc.h>
#include <yt/yt/core/misc/singleton.h>
+#include <yt/yt/core/misc/proc.h>
#include <yt/yt/core/ypath/token.h>
+#include <yt/yt/library/profiling/sensor.h>
+#include <yt/yt/library/profiling/producer.h>
+
+#include <library/cpp/yt/misc/global.h>
+
+#include <library/cpp/yt/memory/leaky_ref_counted_singleton.h>
+
#include <util/folder/filelist.h>
#include <util/stream/file.h>
#include <util/string/vector.h>
+#include <mutex>
+
#if defined(_linux_)
#include <unistd.h>
#endif
@@ -27,446 +36,542 @@ using namespace NYTree;
using namespace NProfiling;
using namespace NConcurrency;
-DEFINE_REFCOUNTED_TYPE(TCpuCgroupTracker)
-DEFINE_REFCOUNTED_TYPE(TMemoryCgroupTracker)
-DEFINE_REFCOUNTED_TYPE(TResourceTracker)
-
////////////////////////////////////////////////////////////////////////////////
-static NLogging::TLogger Logger("Profiling");
-static TProfiler Profiler("/resource_tracker");
-static TProfiler MemoryProfiler("/memory/cgroup");
+namespace {
+
+YT_DEFINE_GLOBAL(const NLogging::TLogger, Logger, "Profiling");
+YT_DEFINE_GLOBAL(const TProfiler, ResourceTrackerProfiler, "/resource_tracker");
+YT_DEFINE_GLOBAL(const TProfiler, MemoryProfiler, "/memory/cgroup");
// Please, refer to /proc documentation to know more about available information.
// http://www.kernel.org/doc/Documentation/filesystems/proc.txt
-static constexpr auto procPath = "/proc/self/task";
-
-////////////////////////////////////////////////////////////////////////////////
-
-namespace {
+constexpr auto ProcfsCurrentTaskPath = "/proc/self/task";
-i64 GetTicksPerSecond()
-{
-#if defined(_linux_)
- return sysconf(_SC_CLK_TCK);
-#else
- return -1;
-#endif
-}
-
-#ifdef RESOURCE_TRACKER_ENABLED
+} // namespace
+////////////////////////////////////////////////////////////////////////////////
-#endif
+DECLARE_REFCOUNTED_CLASS(TCpuCgroupTracker)
+DECLARE_REFCOUNTED_CLASS(TMemoryCgroupTracker)
-} // namespace
+////////////////////////////////////////////////////////////////////////////////
-void TCpuCgroupTracker::CollectSensors(ISensorWriter* writer)
+class TCpuCgroupTracker
+ : public NProfiling::ISensorProducer
{
- try {
- auto cgroups = GetProcessCgroups();
- for (const auto& group : cgroups) {
- for (const auto& controller : group.Controllers) {
- if (controller == "cpu") {
- auto stat = GetCgroupCpuStat(group.ControllersName, group.Path);
-
- if (!FirstCgroupStat_) {
- FirstCgroupStat_ = stat;
+public:
+ void CollectSensors(ISensorWriter* writer) override
+ {
+ try {
+ auto cgroups = GetProcessCgroups();
+ for (const auto& group : cgroups) {
+ for (const auto& controller : group.Controllers) {
+ if (controller == "cpu") {
+ auto stat = GetCgroupCpuStat(group.ControllersName, group.Path);
+
+ if (!FirstCgroupStat_) {
+ FirstCgroupStat_ = stat;
+ }
+
+ writer->AddCounter(
+ "/cgroup/periods",
+ stat.NrPeriods - FirstCgroupStat_->NrPeriods);
+
+ writer->AddCounter(
+ "/cgroup/throttled_periods",
+ stat.NrThrottled - FirstCgroupStat_->NrThrottled);
+
+ writer->AddCounter(
+ "/cgroup/throttled_cpu_percent",
+ (stat.ThrottledTime - FirstCgroupStat_->ThrottledTime) / 10'000'000);
+
+ writer->AddCounter(
+ "/cgroup/wait_cpu_percent",
+ (stat.WaitTime - FirstCgroupStat_->WaitTime) / 10'000'000);
+
+ return;
}
+ }
+ }
+ } catch (const std::exception& ex) {
+ if (!CgroupErrorLogged_) {
+ YT_LOG_INFO(ex, "Failed to collect cgroup cpu statistics");
+ CgroupErrorLogged_ = true;
+ }
+ }
+ }
- writer->AddCounter(
- "/cgroup/periods",
- stat.NrPeriods - FirstCgroupStat_->NrPeriods);
-
- writer->AddCounter(
- "/cgroup/throttled_periods",
- stat.NrThrottled - FirstCgroupStat_->NrThrottled);
+private:
+ std::optional<TCgroupCpuStat> FirstCgroupStat_;
+ bool CgroupErrorLogged_ = false;
+};
- writer->AddCounter(
- "/cgroup/throttled_cpu_percent",
- (stat.ThrottledTime - FirstCgroupStat_->ThrottledTime) / 10'000'000);
+DEFINE_REFCOUNTED_TYPE(TCpuCgroupTracker)
- writer->AddCounter(
- "/cgroup/wait_cpu_percent",
- (stat.WaitTime - FirstCgroupStat_->WaitTime) / 10'000'000);
+////////////////////////////////////////////////////////////////////////////////
- return;
+class TMemoryCgroupTracker
+ : public NProfiling::ISensorProducer
+{
+public:
+ void CollectSensors(ISensorWriter* writer) override
+ {
+ try {
+ auto cgroups = GetProcessCgroups();
+ for (const auto& group : cgroups) {
+ for (const auto& controller : group.Controllers) {
+ if (controller == "memory") {
+ auto stat = GetCgroupMemoryStat(group.Path);
+
+ writer->AddGauge("/memory_limit", stat.HierarchicalMemoryLimit);
+ writer->AddGauge("/cache", stat.Cache);
+ writer->AddGauge("/rss", stat.Rss);
+ writer->AddGauge("/rss_huge", stat.RssHuge);
+ writer->AddGauge("/mapped_file", stat.MappedFile);
+ writer->AddGauge("/dirty", stat.Dirty);
+ writer->AddGauge("/writeback", stat.Writeback);
+
+ TotalMemoryLimit_.store(stat.HierarchicalMemoryLimit);
+ AnonymousMemoryLimit_.store(SafeGetAnonymousMemoryLimit(
+ group.Path,
+ stat.HierarchicalMemoryLimit));
+
+ return;
+ }
}
}
- }
- } catch (const std::exception& ex) {
- if (!CgroupErrorLogged_) {
- YT_LOG_INFO(ex, "Failed to collect cgroup cpu statistics");
- CgroupErrorLogged_ = true;
+ } catch (const std::exception& ex) {
+ if (!CgroupErrorLogged_) {
+ YT_LOG_INFO(ex, "Failed to collect cgroup memory statistics");
+ CgroupErrorLogged_ = true;
+ }
}
}
-}
-void TMemoryCgroupTracker::CollectSensors(ISensorWriter* writer)
-{
- try {
- auto cgroups = GetProcessCgroups();
- for (const auto& group : cgroups) {
- for (const auto& controller : group.Controllers) {
- if (controller == "memory") {
- auto stat = GetCgroupMemoryStat(group.Path);
-
- writer->AddGauge("/memory_limit", stat.HierarchicalMemoryLimit);
- writer->AddGauge("/cache", stat.Cache);
- writer->AddGauge("/rss", stat.Rss);
- writer->AddGauge("/rss_huge", stat.RssHuge);
- writer->AddGauge("/mapped_file", stat.MappedFile);
- writer->AddGauge("/dirty", stat.Dirty);
- writer->AddGauge("/writeback", stat.Writeback);
-
- TotalMemoryLimit_.store(stat.HierarchicalMemoryLimit);
- AnonymousMemoryLimit_.store(SafeGetAnonymousMemoryLimit(
- group.Path,
- stat.HierarchicalMemoryLimit));
-
- return;
- }
+ i64 GetTotalMemoryLimit() const
+ {
+ return TotalMemoryLimit_.load();
+ }
+
+ i64 GetAnonymousMemoryLimit() const
+ {
+ return AnonymousMemoryLimit_.load();
+ }
+
+private:
+ bool CgroupErrorLogged_ = false;
+ bool AnonymousLimitErrorLogged_ = false;
+
+ std::atomic<i64> TotalMemoryLimit_ = 0;
+ std::atomic<i64> AnonymousMemoryLimit_ = 0;
+
+ i64 SafeGetAnonymousMemoryLimit(const TString& cgroupPath, i64 totalMemoryLimit)
+ {
+ try {
+ auto anonymousLimit = GetCgroupAnonymousMemoryLimit(cgroupPath);
+ auto result = anonymousLimit.value_or(totalMemoryLimit);
+ result = std::min(result, totalMemoryLimit);
+ return result != 0 ? result : totalMemoryLimit;
+ } catch (const std::exception& ex) {
+ if (!AnonymousLimitErrorLogged_) {
+ YT_LOG_INFO(ex, "Failed to collect cgroup anonymous memory limit");
+ AnonymousLimitErrorLogged_ = true;
}
}
- } catch (const std::exception& ex) {
- if (!CgroupErrorLogged_) {
- YT_LOG_INFO(ex, "Failed to collect cgroup memory statistics");
- CgroupErrorLogged_ = true;
- }
+
+ return totalMemoryLimit;
}
-}
+};
-i64 TMemoryCgroupTracker::SafeGetAnonymousMemoryLimit(const TString& cgroupPath, i64 totalMemoryLimit)
+DEFINE_REFCOUNTED_TYPE(TMemoryCgroupTracker)
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TResourceTrackerImpl
+ : public NProfiling::ISensorProducer
{
- try {
- auto anonymousLimit = GetCgroupAnonymousMemoryLimit(cgroupPath);
- auto result = anonymousLimit.value_or(totalMemoryLimit);
- result = std::min(result, totalMemoryLimit);
- return result != 0 ? result : totalMemoryLimit;
- } catch (const std::exception& ex) {
- if (!AnonymousLimitErrorLogged_) {
- YT_LOG_INFO(ex, "Failed to collect cgroup anonymous memory limit");
- AnonymousLimitErrorLogged_ = true;
- }
+public:
+ static TResourceTrackerImpl* Get()
+ {
+ return LeakyRefCountedSingleton<TResourceTrackerImpl>().Get();
}
- return totalMemoryLimit;
-}
+ void Enable()
+ {
+ std::call_once(EnabledFlag_, [&] {
+ ResourceTrackerProfiler().AddFuncGauge("/memory_usage/rss", MakeStrong(this), [] {
+ return GetProcessMemoryUsage().Rss;
+ });
-i64 TMemoryCgroupTracker::GetTotalMemoryLimit() const
-{
- return TotalMemoryLimit_.load();
-}
+ ResourceTrackerProfiler().AddFuncGauge("/utilization/max", MakeStrong(this), [this] {
+ return MaxThreadPoolUtilization_.load();
+ });
-i64 TMemoryCgroupTracker::GetAnonymousMemoryLimit() const
-{
- return AnonymousMemoryLimit_.load();
-}
+ ResourceTrackerProfiler().AddProducer("", CpuCgroupTracker_);
+ MemoryProfiler().AddProducer("", MemoryCgroupTracker_);
-TResourceTracker::TTimings TResourceTracker::TTimings::operator-(const TResourceTracker::TTimings& other) const
-{
- return {UserJiffies - other.UserJiffies, SystemJiffies - other.SystemJiffies, CpuWaitNsec - other.CpuWaitNsec};
-}
+ ResourceTrackerProfiler()
+ .WithProducerRemoveSupport()
+ .AddProducer("", MakeStrong(this));
+ });
+ }
-TResourceTracker::TTimings& TResourceTracker::TTimings::operator+=(const TResourceTracker::TTimings& other)
-{
- UserJiffies += other.UserJiffies;
- SystemJiffies += other.SystemJiffies;
- CpuWaitNsec += other.CpuWaitNsec;
- return *this;
-}
+ double GetUserCpu()
+ {
+ return LastUserCpu_.load();
+ }
-TResourceTracker::TResourceTracker()
- // CPU time is measured in jiffies; we need USER_HZ to convert them
- // to milliseconds and percentages.
- : TicksPerSecond_(GetTicksPerSecond())
- , LastUpdateTime_(TInstant::Now())
- , CpuCgroupTracker_(New<TCpuCgroupTracker>())
- , MemoryCgroupTracker_(New<TMemoryCgroupTracker>())
-{
- Profiler.AddFuncGauge("/memory_usage/rss", MakeStrong(this), [] {
- return GetProcessMemoryUsage().Rss;
- });
+ double GetSystemCpu()
+ {
+ return LastSystemCpu_.load();
+ }
- Profiler.AddFuncGauge("/utilization/max", MakeStrong(this), [this] {
- return MaxThreadPoolUtilization_.load();
- });
+ double GetCpuWait()
+ {
+ return LastCpuWait_.load();
+ }
- Profiler.AddProducer("", CpuCgroupTracker_);
- MemoryProfiler.AddProducer("", MemoryCgroupTracker_);
+ i64 GetTotalMemoryLimit()
+ {
+ return MemoryCgroupTracker_->GetTotalMemoryLimit();
+ }
- Profiler
- .WithProducerRemoveSupport()
- .AddProducer("", MakeStrong(this));
-}
+ i64 GetAnonymousMemoryLimit()
+ {
+ return MemoryCgroupTracker_->GetAnonymousMemoryLimit();
+ }
-void TResourceTracker::CollectSensors(ISensorWriter* writer)
-{
- i64 timeDeltaUsec = TInstant::Now().MicroSeconds() - LastUpdateTime_.MicroSeconds();
- if (timeDeltaUsec <= 0) {
- return;
+ void SetVCpuFactor(double factor)
+ {
+ VCpuFactor_.store(factor);
}
- auto tidToInfo = ProcessThreads();
- CollectSensorsAggregatedTimings(writer, TidToInfo_, tidToInfo, timeDeltaUsec);
- TidToInfo_ = tidToInfo;
+private:
- LastUpdateTime_ = TInstant::Now();
-}
+ const TCpuCgroupTrackerPtr CpuCgroupTracker_ = New<TCpuCgroupTracker>();
+ const TMemoryCgroupTrackerPtr MemoryCgroupTracker_ = New<TMemoryCgroupTracker>();
-void TResourceTracker::SetVCpuFactor(double vCpuFactor)
-{
- VCpuFactor_ = vCpuFactor;
-}
+ // CPU time is measured in jiffies; we need USER_HZ to convert them
+ // to milliseconds and percentages.
+ const i64 TicksPerSecond_ = [] {
+#if defined(_linux_)
+ return sysconf(_SC_CLK_TCK);
+#else
+ return -1;
+#endif
+ }();
-bool TResourceTracker::ProcessThread(TString tid, TResourceTracker::TThreadInfo* info)
-{
- auto threadStatPath = NFS::CombinePaths(procPath, tid);
- auto statPath = NFS::CombinePaths(threadStatPath, "stat");
- auto statusPath = NFS::CombinePaths(threadStatPath, "status");
- auto schedStatPath = NFS::CombinePaths(threadStatPath, "schedstat");
+ std::once_flag EnabledFlag_;
- try {
- // Parse status.
- {
- TIFStream file(statusPath);
- for (TString line; file.ReadLine(line); ) {
- auto tokens = SplitString(line, "\t");
+ std::atomic<double> VCpuFactor_;
- if (tokens.size() < 2) {
- continue;
- }
+ TInstant LastUpdateTime_ = TInstant::Now();
- if (tokens[0] == "Name:") {
- info->ThreadName = tokens[1];
- } else if (tokens[0] == "SigBlk:") {
- // This is a heuristic way to distinguish YT thread from non-YT threads.
- // It is used primarily for CHYT, which links against CH and Poco that
- // have their own complicated manner of handling threads. We want to be
- // able to visually distinguish them from our threads.
- //
- // Poco threads always block SIGQUIT, SIGPIPE and SIGTERM; we use the latter
- // one presence. Feel free to change this heuristic if needed.
- YT_VERIFY(tokens[1].size() == 16);
- auto mask = IntFromString<ui64, 16>(tokens[1]);
- // Note that signals are 1-based, so 14-th bit is SIGTERM (15).
- bool sigtermBlocked = (mask >> 14) & 1ull;
- info->IsYtThread = !sigtermBlocked;
- }
- }
+ // Values below are in percent.
+ std::atomic<double> LastUserCpu_;
+ std::atomic<double> LastSystemCpu_;
+ std::atomic<double> LastCpuWait_;
+
+ std::atomic<double> MaxThreadPoolUtilization_;
+
+ void CollectSensors(ISensorWriter* writer) override
+ {
+ i64 timeDeltaUsec = TInstant::Now().MicroSeconds() - LastUpdateTime_.MicroSeconds();
+ if (timeDeltaUsec <= 0) {
+ return;
}
- // Parse schedstat.
- {
- TIFStream file(schedStatPath);
- auto tokens = SplitString(file.ReadLine(), " ");
- if (tokens.size() < 3) {
- return false;
- }
+ auto tidToInfo = ParseThreadInfos();
+ CollectSensorsAggregatedTimings(writer, TidToInfo_, tidToInfo, timeDeltaUsec);
+ TidToInfo_ = tidToInfo;
+
+ LastUpdateTime_ = TInstant::Now();
+ }
- info->Timings.CpuWaitNsec = FromString<i64>(tokens[1]);
+ struct TTimings
+ {
+ i64 UserJiffies = 0;
+ i64 SystemJiffies = 0;
+ i64 CpuWaitNsec = 0;
+
+ TTimings operator-(const TTimings& other) const
+ {
+ return {UserJiffies - other.UserJiffies, SystemJiffies - other.SystemJiffies, CpuWaitNsec - other.CpuWaitNsec};
}
- // Parse stat.
+ TTimings& operator+=(const TTimings& other)
{
- TIFStream file(statPath);
- auto tokens = SplitString(file.ReadLine(), " ");
- if (tokens.size() < 15) {
- return false;
+ UserJiffies += other.UserJiffies;
+ SystemJiffies += other.SystemJiffies;
+ CpuWaitNsec += other.CpuWaitNsec;
+ return *this;
+ }
+ };
+
+ struct TThreadInfo
+ {
+ std::string ThreadName;
+ TTimings Timings;
+ bool IsYTThread = true;
+ //! This key is IsYtThread ? ThreadName : ThreadName + "@".
+ //! It allows to distinguish YT threads from non-YT threads that
+ //! inherited parent YT thread name.
+ std::string ProfilingKey;
+ };
+
+ // Thread id -> stats
+ using TThreadInfoMap = THashMap<NThreading::TThreadId, TThreadInfo>;
+ TThreadInfoMap TidToInfo_;
+
+ std::optional<TThreadInfo> TryParseThreadInfo(NThreading::TThreadId tid)
+ {
+ TThreadInfo info;
+
+ auto threadStatPath = NFS::CombinePaths(ProcfsCurrentTaskPath, ToString(tid));
+ auto statPath = NFS::CombinePaths(threadStatPath, "stat");
+ auto statusPath = NFS::CombinePaths(threadStatPath, "status");
+ auto schedStatPath = NFS::CombinePaths(threadStatPath, "schedstat");
+
+ try {
+ // Parse status.
+ {
+ TIFStream file(statusPath);
+ for (TString line; file.ReadLine(line); ) {
+ auto tokens = SplitString(line, "\t");
+
+ if (tokens.size() < 2) {
+ continue;
+ }
+
+ if (tokens[0] == "Name:") {
+ info.ThreadName = tokens[1];
+ } else if (tokens[0] == "SigBlk:") {
+ // This is a heuristic way to distinguish YT thread from non-YT threads.
+ // It is used primarily for CHYT, which links against CH and Poco that
+ // have their own complicated manner of handling threads. We want to be
+ // able to visually distinguish them from our threads.
+ //
+ // Poco threads always block SIGQUIT, SIGPIPE and SIGTERM; we use the latter
+ // one presence. Feel free to change this heuristic if needed.
+ YT_VERIFY(tokens[1].size() == 16);
+ auto mask = IntFromString<ui64, 16>(tokens[1]);
+ // Note that signals are 1-based, so 14-th bit is SIGTERM (15).
+ bool sigtermBlocked = (mask >> 14) & 1ull;
+ info.IsYTThread = !sigtermBlocked;
+ }
+ }
}
- info->Timings.UserJiffies = FromString<i64>(tokens[13]);
- info->Timings.SystemJiffies = FromString<i64>(tokens[14]);
- }
+ // Parse schedstat.
+ {
+ TIFStream file(schedStatPath);
+ auto tokens = SplitString(file.ReadLine(), " ");
+ if (tokens.size() < 3) {
+ return std::nullopt;
+ }
- info->ProfilingKey = info->ThreadName;
+ info.Timings.CpuWaitNsec = FromString<i64>(tokens[1]);
+ }
- if (!TidToInfo_.contains(tid)) {
- YT_LOG_TRACE("Thread %v named %v", tid, info->ThreadName);
- }
+ // Parse stat.
+ {
+ TIFStream file(statPath);
+ auto tokens = SplitString(file.ReadLine(), " ");
+ if (tokens.size() < 15) {
+ return std::nullopt;
+ }
- // Group threads by thread pool, using YT thread naming convention.
- if (auto index = info->ProfilingKey.rfind(':'); index != TString::npos) {
- bool isDigit = std::all_of(info->ProfilingKey.cbegin() + index + 1, info->ProfilingKey.cend(), [] (char c) {
- return std::isdigit(c);
- });
- if (isDigit) {
- info->ProfilingKey = info->ProfilingKey.substr(0, index);
+ info.Timings.UserJiffies = FromString<i64>(tokens[13]);
+ info.Timings.SystemJiffies = FromString<i64>(tokens[14]);
}
- }
- if (!info->IsYtThread) {
- info->ProfilingKey += "@";
- }
- } catch (const TIoException&) {
- // Ignore all IO exceptions.
- return false;
- }
+ info.ProfilingKey = info.ThreadName;
- YT_LOG_TRACE("Thread statistics (Tid: %v, ThreadName: %v, IsYtThread: %v, UserJiffies: %v, SystemJiffies: %v, CpuWaitNsec: %v)",
- tid,
- info->ThreadName,
- info->IsYtThread,
- info->Timings.UserJiffies,
- info->Timings.SystemJiffies,
- info->Timings.CpuWaitNsec);
+ // Group threads by thread pool, using YT thread naming convention.
+ if (auto index = info.ProfilingKey.rfind(':'); index != TString::npos) {
+ bool isDigit = std::all_of(info.ProfilingKey.cbegin() + index + 1, info.ProfilingKey.cend(), [] (char c) {
+ return std::isdigit(c);
+ });
+ if (isDigit) {
+ info.ProfilingKey = info.ProfilingKey.substr(0, index);
+ }
+ }
- return true;
-}
+ if (!info.IsYTThread) {
+ info.ProfilingKey += "@";
+ }
+ } catch (const TIoException&) {
+ // Ignore all IO exceptions.
+ return std::nullopt;
+ }
-TResourceTracker::TThreadMap TResourceTracker::ProcessThreads()
-{
- TDirsList dirsList;
- try {
- dirsList.Fill(procPath);
- } catch (const TSystemError&) {
- // Ignore all exceptions.
- return {};
+ YT_LOG_TRACE("Thread statistics (Tid: %v, ThreadName: %v, IsYtThread: %v, UserJiffies: %v, SystemJiffies: %v, CpuWaitNsec: %v)",
+ tid,
+ info.ThreadName,
+ info.IsYTThread,
+ info.Timings.UserJiffies,
+ info.Timings.SystemJiffies,
+ info.Timings.CpuWaitNsec);
+
+ return info;
}
- TThreadMap tidToStats;
+ TThreadInfoMap ParseThreadInfos()
+ {
+ TDirsList dirsList;
+ try {
+ dirsList.Fill(ProcfsCurrentTaskPath);
+ } catch (const TSystemError&) {
+ // Ignore all exceptions.
+ return {};
+ }
- for (int index = 0; index < static_cast<int>(dirsList.Size()); ++index) {
- auto tid = TString(dirsList.Next());
- TThreadInfo info;
- if (ProcessThread(tid, &info)) {
- tidToStats[tid] = info;
- } else {
- YT_LOG_TRACE("Failed to prepare thread info for thread (Tid: %v)", tid);
+ TThreadInfoMap tidToStats;
+
+ for (int index = 0; index < static_cast<int>(dirsList.Size()); ++index) {
+ auto tidStr = TStringBuf(dirsList.Next());
+ NThreading::TThreadId tid;
+ if (!TryFromString(tidStr, tid)) {
+ continue;
+ }
+ auto info = TryParseThreadInfo(tid);
+ if (info) {
+ tidToStats[tid] = *info;
+ } else {
+ YT_LOG_TRACE("Failed to parse thread info (Tid: %v)", tid);
+ }
}
+
+ return tidToStats;
}
- return tidToStats;
-}
+ void CollectSensorsAggregatedTimings(
+ ISensorWriter* writer,
+ const TThreadInfoMap& oldTidToInfo,
+ const TThreadInfoMap& newTidToInfo,
+ i64 timeDeltaUsec)
+ {
+ double totalUserCpuTime = 0.0;
+ double totalSystemCpuTime = 0.0;
+ double totalCpuWaitTime = 0.0;
-void TResourceTracker::CollectSensorsAggregatedTimings(
- ISensorWriter* writer,
- const TResourceTracker::TThreadMap& oldTidToInfo,
- const TResourceTracker::TThreadMap& newTidToInfo,
- i64 timeDeltaUsec)
-{
- double totalUserCpuTime = 0.0;
- double totalSystemCpuTime = 0.0;
- double totalCpuWaitTime = 0.0;
+ THashMap<std::string, TTimings> profilingKeyToAggregatedTimings;
+ THashMap<std::string, int> profilingKeyToCount;
- THashMap<TString, TTimings> profilingKeyToAggregatedTimings;
- THashMap<TString, int> profilingKeyToCount;
+ // Consider only those threads which did not change their thread names.
+ // In each group of such threads with same thread name, export aggregated timings.
- // Consider only those threads which did not change their thread names.
- // In each group of such threads with same thread name, export aggregated timings.
+ for (const auto& [tid, newInfo] : newTidToInfo) {
+ ++profilingKeyToCount[newInfo.ProfilingKey];
- for (const auto& [tid, newInfo] : newTidToInfo) {
- ++profilingKeyToCount[newInfo.ProfilingKey];
+ auto it = oldTidToInfo.find(tid);
- auto it = oldTidToInfo.find(tid);
+ if (it == oldTidToInfo.end()) {
+ continue;
+ }
- if (it == oldTidToInfo.end()) {
- continue;
- }
+ const auto& oldInfo = it->second;
- const auto& oldInfo = it->second;
+ if (oldInfo.ProfilingKey != newInfo.ProfilingKey) {
+ continue;
+ }
- if (oldInfo.ProfilingKey != newInfo.ProfilingKey) {
- continue;
+ profilingKeyToAggregatedTimings[newInfo.ProfilingKey] += newInfo.Timings - oldInfo.Timings;
}
- profilingKeyToAggregatedTimings[newInfo.ProfilingKey] += newInfo.Timings - oldInfo.Timings;
- }
+ double vCpuFactor = VCpuFactor_;
+
+ double maxUtilization = 0.0;
+ for (const auto& [profilingKey, aggregatedTimings] : profilingKeyToAggregatedTimings) {
+ // Multiplier 1e6 / timeDelta is for taking average over time (all values should be "per second").
+ // Multiplier 100 for CPU time is for measuring CPU load in percents. It is due to historical reasons.
+ double userCpuTime = std::max<double>(0.0, 100. * aggregatedTimings.UserJiffies / TicksPerSecond_ * (1e6 / timeDeltaUsec));
+ double systemCpuTime = std::max<double>(0.0, 100. * aggregatedTimings.SystemJiffies / TicksPerSecond_ * (1e6 / timeDeltaUsec));
+ double waitTime = std::max<double>(0.0, 100 * aggregatedTimings.CpuWaitNsec / 1e9 * (1e6 / timeDeltaUsec));
+
+ totalUserCpuTime += userCpuTime;
+ totalSystemCpuTime += systemCpuTime;
+ totalCpuWaitTime += waitTime;
+
+ auto threadCount = profilingKeyToCount[profilingKey];
+ double utilization = (userCpuTime + systemCpuTime) / (100 * threadCount);
+
+ double totalCpuTime = userCpuTime + systemCpuTime;
+
+ TWithTagGuard tagGuard(writer, "thread", profilingKey);
+ writer->AddGauge("/user_cpu", userCpuTime);
+ writer->AddGauge("/system_cpu", systemCpuTime);
+ writer->AddGauge("/total_cpu", totalCpuTime);
+ writer->AddGauge("/cpu_wait", waitTime);
+ writer->AddGauge("/thread_count", threadCount);
+ writer->AddGauge("/utilization", utilization);
+ if (vCpuFactor != 0.0) {
+ writer->AddGauge("/user_vcpu", userCpuTime * vCpuFactor);
+ writer->AddGauge("/system_vcpu", systemCpuTime * vCpuFactor);
+ writer->AddGauge("/total_vcpu", totalCpuTime * vCpuFactor);
+ }
- double vCpuFactor = VCpuFactor_;
-
- double maxUtilization = 0.0;
- for (const auto& [profilingKey, aggregatedTimings] : profilingKeyToAggregatedTimings) {
- // Multiplier 1e6 / timeDelta is for taking average over time (all values should be "per second").
- // Multiplier 100 for CPU time is for measuring CPU load in percents. It is due to historical reasons.
- double userCpuTime = std::max<double>(0.0, 100. * aggregatedTimings.UserJiffies / TicksPerSecond_ * (1e6 / timeDeltaUsec));
- double systemCpuTime = std::max<double>(0.0, 100. * aggregatedTimings.SystemJiffies / TicksPerSecond_ * (1e6 / timeDeltaUsec));
- double waitTime = std::max<double>(0.0, 100 * aggregatedTimings.CpuWaitNsec / 1e9 * (1e6 / timeDeltaUsec));
-
- totalUserCpuTime += userCpuTime;
- totalSystemCpuTime += systemCpuTime;
- totalCpuWaitTime += waitTime;
-
- auto threadCount = profilingKeyToCount[profilingKey];
- double utilization = (userCpuTime + systemCpuTime) / (100 * threadCount);
-
- double totalCpuTime = userCpuTime + systemCpuTime;
-
- TWithTagGuard tagGuard(writer, "thread", profilingKey);
- writer->AddGauge("/user_cpu", userCpuTime);
- writer->AddGauge("/system_cpu", systemCpuTime);
- writer->AddGauge("/total_cpu", totalCpuTime);
- writer->AddGauge("/cpu_wait", waitTime);
- writer->AddGauge("/thread_count", threadCount);
- writer->AddGauge("/utilization", utilization);
- if (vCpuFactor != 0.0) {
- writer->AddGauge("/user_vcpu", userCpuTime * vCpuFactor);
- writer->AddGauge("/system_vcpu", systemCpuTime * vCpuFactor);
- writer->AddGauge("/total_vcpu", totalCpuTime * vCpuFactor);
+ maxUtilization = std::max(maxUtilization, utilization);
+
+ YT_LOG_TRACE("Thread CPU timings in percent/sec (ProfilingKey: %v, UserCpu: %v, SystemCpu: %v, CpuWait: %v)",
+ profilingKey,
+ userCpuTime,
+ systemCpuTime,
+ waitTime);
}
- maxUtilization = std::max(maxUtilization, utilization);
+ LastUserCpu_.store(totalUserCpuTime);
+ LastSystemCpu_.store(totalSystemCpuTime);
+ LastCpuWait_.store(totalCpuWaitTime);
+ MaxThreadPoolUtilization_.store(maxUtilization);
- YT_LOG_TRACE("Thread CPU timings in percent/sec (ProfilingKey: %v, UserCpu: %v, SystemCpu: %v, CpuWait: %v)",
- profilingKey,
- userCpuTime,
- systemCpuTime,
- waitTime);
- }
+ YT_LOG_DEBUG("Total CPU timings in percent/sec (UserCpu: %v, SystemCpu: %v, CpuWait: %v)",
+ totalUserCpuTime,
+ totalSystemCpuTime,
+ totalCpuWaitTime);
- LastUserCpu_.store(totalUserCpuTime);
- LastSystemCpu_.store(totalSystemCpuTime);
- LastCpuWait_.store(totalCpuWaitTime);
- MaxThreadPoolUtilization_ = maxUtilization;
+ int fileDescriptorCount = GetFileDescriptorCount();
+ writer->AddGauge("/open_fds", fileDescriptorCount);
+ YT_LOG_DEBUG("Assessed open file descriptors (Count: %v)", fileDescriptorCount);
+ }
+};
- YT_LOG_DEBUG("Total CPU timings in percent/sec (UserCpu: %v, SystemCpu: %v, CpuWait: %v)",
- totalUserCpuTime,
- totalSystemCpuTime,
- totalCpuWaitTime);
+////////////////////////////////////////////////////////////////////////////////
- int fileDescriptorCount = GetFileDescriptorCount();
- writer->AddGauge("/open_fds", fileDescriptorCount);
- YT_LOG_DEBUG("Assessed open file descriptors (Count: %v)", fileDescriptorCount);
+void TResourceTracker::Enable()
+{
+ TResourceTrackerImpl::Get()->Enable();
}
double TResourceTracker::GetUserCpu()
{
- return LastUserCpu_.load();
+ return TResourceTrackerImpl::Get()->GetUserCpu();
}
double TResourceTracker::GetSystemCpu()
{
- return LastSystemCpu_.load();
+ return TResourceTrackerImpl::Get()->GetSystemCpu();
}
double TResourceTracker::GetCpuWait()
{
- return LastCpuWait_.load();
+ return TResourceTrackerImpl::Get()->GetCpuWait();
}
i64 TResourceTracker::GetTotalMemoryLimit()
{
- return MemoryCgroupTracker_->GetTotalMemoryLimit();
+ return TResourceTrackerImpl::Get()->GetTotalMemoryLimit();
}
i64 TResourceTracker::GetAnonymousMemoryLimit()
{
- return MemoryCgroupTracker_->GetAnonymousMemoryLimit();
-}
-
-TResourceTrackerPtr GetResourceTracker()
-{
- return LeakyRefCountedSingleton<TResourceTracker>();
-}
-
-void EnableResourceTracker()
-{
- GetResourceTracker();
+ return TResourceTrackerImpl::Get()->GetAnonymousMemoryLimit();
}
-void SetVCpuFactor(double vCpuFactor)
+void TResourceTracker::SetVCpuFactor(double factor)
{
- GetResourceTracker()->SetVCpuFactor(vCpuFactor);
+ TResourceTrackerImpl::Get()->SetVCpuFactor(factor);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/library/profiling/resource_tracker/resource_tracker.h b/yt/yt/library/profiling/resource_tracker/resource_tracker.h
index 424c499ffcd..a68ea551b59 100644
--- a/yt/yt/library/profiling/resource_tracker/resource_tracker.h
+++ b/yt/yt/library/profiling/resource_tracker/resource_tracker.h
@@ -1,138 +1,30 @@
#pragma once
-#include <vector>
-#include <yt/yt/core/concurrency/periodic_executor.h>
-
-#include <yt/yt/library/profiling/producer.h>
-
-#include <yt/yt/core/misc/proc.h>
+#include <yt/yt/core/misc/public.h>
namespace NYT::NProfiling {
////////////////////////////////////////////////////////////////////////////////
-DECLARE_REFCOUNTED_CLASS(TCpuCgroupTracker)
-
-class TCpuCgroupTracker
- : public NProfiling::ISensorProducer
-{
-public:
- void CollectSensors(ISensorWriter* writer) override;
-
-private:
- std::optional<TCgroupCpuStat> FirstCgroupStat_;
- bool CgroupErrorLogged_ = false;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-DECLARE_REFCOUNTED_CLASS(TMemoryCgroupTracker)
-
-class TMemoryCgroupTracker
- : public NProfiling::ISensorProducer
-{
-public:
- void CollectSensors(ISensorWriter* writer) override;
-
- i64 GetTotalMemoryLimit() const;
- i64 GetAnonymousMemoryLimit() const;
-
-private:
- bool CgroupErrorLogged_ = false;
- bool AnonymousLimitErrorLogged_ = false;
-
- std::atomic<i64> TotalMemoryLimit_ = 0;
- std::atomic<i64> AnonymousMemoryLimit_ = 0;
-
- i64 SafeGetAnonymousMemoryLimit(const TString& cgroupPath, i64 totalMemoryLimit);
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-DECLARE_REFCOUNTED_CLASS(TResourceTracker)
-
class TResourceTracker
- : public NProfiling::ISensorProducer
{
public:
- explicit TResourceTracker();
-
- double GetUserCpu();
- double GetSystemCpu();
- double GetCpuWait();
-
- i64 GetTotalMemoryLimit();
- i64 GetAnonymousMemoryLimit();
-
- void CollectSensors(ISensorWriter* writer) override;
-
- void SetVCpuFactor(double factor);
-
-private:
- std::atomic<double> VCpuFactor_{0.0};
-
- i64 TicksPerSecond_;
- TInstant LastUpdateTime_;
-
- // Value below are in percents.
- std::atomic<double> LastUserCpu_{0.0};
- std::atomic<double> LastSystemCpu_{0.0};
- std::atomic<double> LastCpuWait_{0.0};
-
- std::atomic<double> MaxThreadPoolUtilization_ = {0.0};
+ //! Enables collecting background statistics and pushing them to profiler.
+ static void Enable();
- struct TTimings
- {
- i64 UserJiffies = 0;
- i64 SystemJiffies = 0;
- i64 CpuWaitNsec = 0;
+ static double GetUserCpu();
+ static double GetSystemCpu();
+ static double GetCpuWait();
- TTimings operator-(const TTimings& other) const;
- TTimings& operator+=(const TTimings& other);
- };
+ static i64 GetTotalMemoryLimit();
+ static i64 GetAnonymousMemoryLimit();
- struct TThreadInfo
- {
- TString ThreadName;
- TTimings Timings;
- bool IsYtThread = true;
- //! This key is IsYtThread ? ThreadName : ThreadName + "@".
- //! It allows to distinguish YT threads from non-YT threads that
- //! inherited parent YT thread name.
- TString ProfilingKey;
- };
-
- // thread id -> stats
- using TThreadMap = THashMap<TString, TThreadInfo>;
-
- TThreadMap TidToInfo_;
-
- TCpuCgroupTrackerPtr CpuCgroupTracker_;
- TMemoryCgroupTrackerPtr MemoryCgroupTracker_;
-
- void EnqueueUsage();
-
- void EnqueueThreadStats();
-
- bool ProcessThread(TString tid, TThreadInfo* result);
- TThreadMap ProcessThreads();
-
- void CollectSensorsAggregatedTimings(
- ISensorWriter* writer,
- const TThreadMap& oldTidToInfo,
- const TThreadMap& newTidToInfo,
- i64 timeDeltaUsec);
+ //! If this factor is set, additional metrics will be reported:
+ //! user, system, total cpu multiplied by given factor.
+ //! E.g. |system_vcpu = system_cpu * vcpu_factor|.
+ static void SetVCpuFactor(double factor);
};
-TResourceTrackerPtr GetResourceTracker();
-
-void EnableResourceTracker();
-
-//! If this vCpuFactor is set, additional metrics will be reported:
-//! user, system, total cpu multiplied by given factor.
-//! E. g. system_vcpu = system_cpu * vcpu_factor.
-void SetVCpuFactor(double vCpuFactor);
-
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NProfiling
diff --git a/yt/yt/library/program/helpers.cpp b/yt/yt/library/program/helpers.cpp
index b4be1fbb0d9..46bd3822f08 100644
--- a/yt/yt/library/program/helpers.cpp
+++ b/yt/yt/library/program/helpers.cpp
@@ -123,12 +123,7 @@ private:
i64 GetAnonymousMemoryLimit() const
{
- auto resourceTracker = NProfiling::GetResourceTracker();
- if (!resourceTracker) {
- return 0;
- }
-
- return resourceTracker->GetAnonymousMemoryLimit();
+ return NProfiling::TResourceTracker::GetAnonymousMemoryLimit();
}
TAllocatorMemoryLimit ProposeHeapMemoryLimit(i64 totalMemory, const TTCMallocConfigPtr& config) const
@@ -251,9 +246,9 @@ void ConfigureSingletons(const TSingletonsConfigPtr& config)
}
if (config->EnableResourceTracker) {
- NProfiling::EnableResourceTracker();
+ NProfiling::TResourceTracker::Enable();
if (config->ResourceTrackerVCpuFactor.has_value()) {
- NProfiling::SetVCpuFactor(config->ResourceTrackerVCpuFactor.value());
+ NProfiling::TResourceTracker::SetVCpuFactor(config->ResourceTrackerVCpuFactor.value());
}
}