aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgryzlov-ad <gryzlov-ad@yandex-team.com>2024-08-21 16:53:55 +0300
committergryzlov-ad <gryzlov-ad@yandex-team.com>2024-08-21 17:10:01 +0300
commit94bf8cc8897c1fd643d12329042e99ee96cc69e4 (patch)
tree2a7401fd08076827bacad567e75856b996ecfb04
parent06068c509279029ceaa73fd5f3a0d43e99e16f12 (diff)
downloadydb-94bf8cc8897c1fd643d12329042e99ee96cc69e4.tar.gz
Track job CPU utilization on Flow workers
8fc3af8a8fe764c4c60363a67733aedeb70c9226
-rw-r--r--yt/yt/core/misc/ema_counter-inl.h (renamed from yt/yt/core/misc/ema_counter.cpp)40
-rw-r--r--yt/yt/core/misc/ema_counter.h46
-rw-r--r--yt/yt/core/misc/unittests/ema_counter_ut.cpp6
-rw-r--r--yt/yt/core/ya.make1
4 files changed, 64 insertions, 29 deletions
diff --git a/yt/yt/core/misc/ema_counter.cpp b/yt/yt/core/misc/ema_counter-inl.h
index aba58bf508..05a0aec6ef 100644
--- a/yt/yt/core/misc/ema_counter.cpp
+++ b/yt/yt/core/misc/ema_counter-inl.h
@@ -1,17 +1,23 @@
+#ifndef EMA_COUNTER_INL_H_
+#error "Direct inclusion of this file is not allowed, include transaction.h"
+// For the sake of sane code completion.
#include "ema_counter.h"
-
-#include <cmath>
+#endif
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
-TEmaCounter::TEmaCounter(TWindowDurations windowDurations)
+template <typename T, int WindowCount>
+ requires std::is_arithmetic_v<T>
+TEmaCounter<T, WindowCount>::TEmaCounter(TEmaCounterWindowDurations<WindowCount> windowDurations)
: WindowDurations(std::move(windowDurations))
, WindowRates(WindowDurations.size())
{ }
-void TEmaCounter::Update(i64 newCount, TInstant newTimestamp)
+template <typename T, int WindowCount>
+ requires std::is_arithmetic_v<T>
+void TEmaCounter<T, WindowCount>::Update(T newCount, TInstant newTimestamp)
{
if (!LastTimestamp) {
// Just set current value, we do not know enough information to deal with rates.
@@ -27,7 +33,7 @@ void TEmaCounter::Update(i64 newCount, TInstant newTimestamp)
}
auto timeDelta = (newTimestamp - *LastTimestamp).SecondsFloat();
- i64 countDelta = std::max(Count, newCount) - Count;
+ auto countDelta = std::max(Count, newCount) - Count;
auto newRate = countDelta / timeDelta;
Count = newCount;
@@ -41,7 +47,9 @@ void TEmaCounter::Update(i64 newCount, TInstant newTimestamp)
}
}
-std::optional<double> TEmaCounter::GetRate(int windowIndex, TInstant currentTimestamp) const
+template <typename T, int WindowCount>
+ requires std::is_arithmetic_v<T>
+std::optional<double> TEmaCounter<T, WindowCount>::GetRate(int windowIndex, TInstant currentTimestamp) const
{
if (!StartTimestamp) {
return {};
@@ -54,14 +62,18 @@ std::optional<double> TEmaCounter::GetRate(int windowIndex, TInstant currentTime
return WindowRates[windowIndex];
}
-TEmaCounter operator+(const TEmaCounter& lhs, const TEmaCounter& rhs)
+template <typename T, int WindowCount>
+ requires std::is_arithmetic_v<T>
+TEmaCounter<T, WindowCount> operator+(const TEmaCounter<T, WindowCount>& lhs, const TEmaCounter<T, WindowCount>& rhs)
{
- TEmaCounter result = lhs;
+ auto result = lhs;
result += rhs;
return result;
}
-TEmaCounter& operator+=(TEmaCounter& lhs, const TEmaCounter& rhs)
+template <typename T, int WindowCount>
+ requires std::is_arithmetic_v<T>
+TEmaCounter<T, WindowCount>& operator+=(TEmaCounter<T, WindowCount>& lhs, const TEmaCounter<T, WindowCount>& rhs)
{
YT_VERIFY(lhs.WindowDurations == rhs.WindowDurations);
lhs.LastTimestamp = std::max(lhs.LastTimestamp, rhs.LastTimestamp);
@@ -74,7 +86,9 @@ TEmaCounter& operator+=(TEmaCounter& lhs, const TEmaCounter& rhs)
return lhs;
}
-TEmaCounter& operator*=(TEmaCounter& lhs, double coefficient)
+template <typename T, int WindowCount>
+ requires std::is_arithmetic_v<T>
+TEmaCounter<T, WindowCount>& operator*=(TEmaCounter<T, WindowCount>& lhs, double coefficient)
{
lhs.Count *= coefficient;
lhs.ImmediateRate *= coefficient;
@@ -84,9 +98,11 @@ TEmaCounter& operator*=(TEmaCounter& lhs, double coefficient)
return lhs;
}
-TEmaCounter operator*(const TEmaCounter& lhs, double coefficient)
+template <typename T, int WindowCount>
+ requires std::is_arithmetic_v<T>
+TEmaCounter<T, WindowCount> operator*(const TEmaCounter<T, WindowCount>& lhs, double coefficient)
{
- TEmaCounter result = lhs;
+ auto result = lhs;
result *= coefficient;
return result;
}
diff --git a/yt/yt/core/misc/ema_counter.h b/yt/yt/core/misc/ema_counter.h
index 77e86e49bd..21f96d7775 100644
--- a/yt/yt/core/misc/ema_counter.h
+++ b/yt/yt/core/misc/ema_counter.h
@@ -8,13 +8,26 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
+//! A typical number of configured time windows.
+constexpr int TypicalWindowCount = 2;
+
+template <int WindowCount = TypicalWindowCount>
+using TEmaCounterWindowDurations = TCompactVector<TDuration, WindowCount>;
+
+template <int WindowCount = TypicalWindowCount>
+using TEmaCounterWindowRates = TCompactVector<double, WindowCount>;
+
+////////////////////////////////////////////////////////////////////////////////
+
//! A helper structure for maintaining a monotonic counter and
//! estimating its average rate over a set of configured time windows
//! using EMA (exponential moving average) technique.
+template<typename T, int WindowCount = TypicalWindowCount>
+ requires std::is_arithmetic_v<T>
struct TEmaCounter
{
//! Current value of the counter.
- i64 Count = 0;
+ T Count = 0;
//! Last update time.
std::optional<TInstant> LastTimestamp;
//! First update time.
@@ -24,20 +37,15 @@ struct TEmaCounter
//! according to the last update.
double ImmediateRate = 0.0;
- //! A typical number of configured time windows.
- static constexpr int TypicalWindowCount = 2;
- using TWindowDurations = TCompactVector<TDuration, TypicalWindowCount>;
- using TWindowRates = TCompactVector<double, TypicalWindowCount>;
-
//! Durations of configured time windows.
- TWindowDurations WindowDurations;
+ TEmaCounterWindowDurations<WindowCount> WindowDurations;
//! Estimates of a rate over corresponding time windows.
- TWindowRates WindowRates;
+ TEmaCounterWindowRates<WindowCount> WindowRates;
- explicit TEmaCounter(TWindowDurations windowDurations);
+ explicit TEmaCounter(TEmaCounterWindowDurations<WindowCount> windowDurations);
//! Set new value of counter, optionally providing a current timestamp.
- void Update(i64 newCount, TInstant newTimestamp = TInstant::Now());
+ void Update(T newCount, TInstant newTimestamp = TInstant::Now());
//! Returns the rate for the given window after enough time has passed
//! for the values to be accurate (at least the duration of the window itself).
@@ -47,10 +55,22 @@ struct TEmaCounter
// Operators for linear transformations (addition, scaling) of counters over the fixed set of windows.
-TEmaCounter operator+(const TEmaCounter& lhs, const TEmaCounter& rhs);
-TEmaCounter& operator+=(TEmaCounter& lhs, const TEmaCounter& rhs);
-TEmaCounter& operator*=(TEmaCounter& lhs, double coefficient);
+template <class T, int WindowCount>
+ requires std::is_arithmetic_v<T>
+TEmaCounter<T, WindowCount> operator+(const TEmaCounter<T, WindowCount>& lhs, const TEmaCounter<T, WindowCount>& rhs);
+
+template <class T, int WindowCount>
+ requires std::is_arithmetic_v<T>
+TEmaCounter<T, WindowCount>& operator+=(TEmaCounter<T, WindowCount>& lhs, const TEmaCounter<T, WindowCount>& rhs);
+
+template <class T, int WindowCount>
+ requires std::is_arithmetic_v<T>
+TEmaCounter<T, WindowCount>& operator*=(TEmaCounter<T, WindowCount>& lhs, double coefficient);
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
+
+#define EMA_COUNTER_INL_H_
+#include "ema_counter-inl.h"
+#undef EMA_COUNTER_INL_H_
diff --git a/yt/yt/core/misc/unittests/ema_counter_ut.cpp b/yt/yt/core/misc/unittests/ema_counter_ut.cpp
index 667a9b6c0d..e7fcbb5602 100644
--- a/yt/yt/core/misc/unittests/ema_counter_ut.cpp
+++ b/yt/yt/core/misc/unittests/ema_counter_ut.cpp
@@ -13,7 +13,7 @@ TEST(TEmaCounterTest, Simple)
{
const auto min = TDuration::Minutes(1);
- TEmaCounter counter({min});
+ TEmaCounter<i64> counter({min});
EXPECT_EQ(std::nullopt, counter.LastTimestamp);
EXPECT_EQ(std::nullopt, counter.StartTimestamp);
@@ -44,7 +44,7 @@ TEST(TEmaCounterTest, MockTime)
{
const auto sec = TDuration::Seconds(1), min = TDuration::Minutes(1);
- TEmaCounter counter({min});
+ TEmaCounter<i64> counter({min});
int obsoleteRate = 1;
int actualRate = 10;
@@ -94,7 +94,7 @@ TEST(TEmaCounterTest, RealTime)
{
const auto quant = TDuration::MilliSeconds(10), sec = TDuration::Seconds(1);
- TEmaCounter counter({sec});
+ TEmaCounter<i64> counter({sec});
const int valueCount = 200;
std::mt19937 generator(/*seed*/ 42);
diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make
index 52a7b80653..fd9f4b0792 100644
--- a/yt/yt/core/ya.make
+++ b/yt/yt/core/ya.make
@@ -123,7 +123,6 @@ SRCS(
misc/digest.cpp
misc/error.cpp
misc/error_code.cpp
- misc/ema_counter.cpp
misc/fs.cpp
# NB: it is necessary to prevent linker optimization of
# REGISTER_INTERMEDIATE_PROTO_INTEROP_REPRESENTATION macros for TGuid.