aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/cpulimit
diff options
context:
space:
mode:
authorqrort <qrort@yandex-team.com>2022-11-30 23:47:12 +0300
committerqrort <qrort@yandex-team.com>2022-11-30 23:47:12 +0300
commit22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch)
treebffa27765faf54126ad44bcafa89fadecb7a73d7 /library/cpp/cpulimit
parent332b99e2173f0425444abb759eebcb2fafaa9209 (diff)
downloadydb-22f8ae0e3f5d68b92aecccdf96c1d841a0334311.tar.gz
validate canons without yatest_common
Diffstat (limited to 'library/cpp/cpulimit')
-rw-r--r--library/cpp/cpulimit/cpu.cpp91
-rw-r--r--library/cpp/cpulimit/cpu.h59
2 files changed, 150 insertions, 0 deletions
diff --git a/library/cpp/cpulimit/cpu.cpp b/library/cpp/cpulimit/cpu.cpp
new file mode 100644
index 0000000000..583b38055e
--- /dev/null
+++ b/library/cpp/cpulimit/cpu.cpp
@@ -0,0 +1,91 @@
+#include "cpu.h"
+
+#include <util/datetime/cputimer.h>
+#include <util/generic/yexception.h>
+#include <util/generic/ylimits.h>
+#include <util/random/random.h>
+#include <util/system/rusage.h>
+#include <util/thread/pool.h>
+
+namespace {
+ TDuration GetUsage() {
+ const auto rusage = TRusage::Get();
+ return rusage.Utime + rusage.Stime;
+ }
+
+ TInstant MonotonicNow() {
+ return TInstant::Zero() + CyclesToDuration(GetCycleCount());
+ }
+}
+
+namespace NCpuLimit {
+ TCpuMeasurer::TCpuMeasurer(TDuration probePeriod)
+ : FastProbe_{.Period = probePeriod, .Usage = 0}
+ , SlowProbe_{.Period = TDuration::MilliSeconds(100), .Usage = 0}
+ {
+ const auto now = MonotonicNow();
+ const auto usageNow = GetUsage();
+ for (auto [probe, size] : {std::pair(&FastProbe_, 5), std::pair(&SlowProbe_, 3)}) {
+ probe->Window.assign(size, {now, usageNow});
+ probe->MeasurerThread = SystemThreadFactory()->Run([this, probe = probe] {
+ UpdateProbeThread(*probe);
+ });
+ }
+ }
+
+ TCpuMeasurer::~TCpuMeasurer() {
+ Finished_ = true;
+ for (auto* probe : {&FastProbe_, &SlowProbe_}) {
+ probe->MeasurerThread->Join();
+ }
+ }
+
+ void TCpuMeasurer::UpdateProbeThread(TProbe& probe) {
+ const ui64 numberOfMeasurments = 10;
+ const TDuration checkPeriod = probe.Period / numberOfMeasurments;
+ while (!Finished_.load()) {
+ Sleep(checkPeriod);
+
+ const auto now = MonotonicNow();
+ const auto usageNow = GetUsage();
+
+ const auto [windowStartTime, windowStartUsage] = probe.Window.front();
+
+ const auto windowDuration = now - windowStartTime;
+ const auto windowUsage = usageNow - windowStartUsage;
+
+ probe.Usage.store(windowUsage / windowDuration);
+ probe.Window.emplace_back(now, usageNow);
+
+ if (probe.Window.size() > numberOfMeasurments) {
+ probe.Window.pop_front();
+ }
+ }
+ }
+
+ TCpuLimiter::TCpuLimiter(double slowThreshold, double fastThresholdBegin, double fastThresholdEnd)
+ : SlowThreshold_(slowThreshold)
+ , FastThresholdBegin_(fastThresholdBegin)
+ , FastThresholdEnd_(fastThresholdEnd)
+ {
+ Y_ENSURE(FastThresholdBegin_ <= FastThresholdEnd_);
+ }
+
+ bool TCpuLimiter::ThrottleHard(double slowUsage, double fastUsage) const {
+ if (slowUsage <= SlowThreshold_) {
+ return false;
+ }
+ if (fastUsage > FastThresholdEnd_) {
+ return true;
+ } else if (fastUsage > FastThresholdBegin_) {
+ return RandomNumber<ui32>() * (FastThresholdEnd_ - FastThresholdBegin_) <
+ Max<ui32>() * (fastUsage - FastThresholdBegin_);
+ } else {
+ return false;
+ }
+ }
+
+ bool TCpuLimiter::ThrottleSoft(double slowUsage, double fastUsage) const {
+ return slowUsage > SlowThreshold_ && fastUsage > FastThresholdEnd_;
+ }
+}
diff --git a/library/cpp/cpulimit/cpu.h b/library/cpp/cpulimit/cpu.h
new file mode 100644
index 0000000000..eb5ee9b011
--- /dev/null
+++ b/library/cpp/cpulimit/cpu.h
@@ -0,0 +1,59 @@
+#pragma once
+
+#include "cpu.h"
+
+#include <util/datetime/base.h>
+#include <util/system/types.h>
+#include <util/thread/factory.h>
+
+#include <deque>
+
+namespace NCpuLimit {
+ class TCpuMeasurer {
+ struct TProbe {
+ TDuration Period;
+ std::atomic<double> Usage;
+ THolder<IThreadFactory::IThread> MeasurerThread;
+ std::deque<std::pair<TInstant, TDuration>> Window;
+ TDuration WindowDuration{};
+ TDuration WindowUsage{};
+ };
+
+ public:
+ explicit TCpuMeasurer(TDuration probePeriod);
+ ~TCpuMeasurer();
+
+ double CpuUsageFast() const {
+ return FastProbe_.Usage.load();
+ }
+
+ double CpuUsageSlow() const {
+ return SlowProbe_.Usage.load();
+ }
+
+ private:
+ void UpdateProbeThread(TProbe& probe);
+
+ std::atomic<bool> Finished_ = false;
+
+ TProbe FastProbe_;
+ TProbe SlowProbe_;
+ };
+
+ class TCpuLimiter {
+ public:
+ TCpuLimiter(double slowThreshold, double fastThresholdBegin, double fastThresholdEnd);
+
+ // Only throttle (all requests) when LA is greater than FastThresholdEnd_
+ bool ThrottleSoft(double slowUsage, double fastUsage) const;
+
+ // Throttle requests with some probability when LA is greater than
+ // FastThresholdBegin_
+ bool ThrottleHard(double slowUsage, double fastUsage) const;
+
+ private:
+ double SlowThreshold_;
+ double FastThresholdBegin_;
+ double FastThresholdEnd_;
+ };
+}