summaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2024-10-08 17:56:22 +0300
committerrobot-piglet <[email protected]>2024-10-08 18:07:23 +0300
commitd17ef5729f3885185bdb7aa459fc4d3bada0c94d (patch)
tree31f8c229410b09032912843fdd86c9717b623f39 /library/cpp
parent37e325a9a8628ece2764f0f26b09ebc09ca39814 (diff)
Intermediate changes
commit_hash:8b7eb71badc9f2fcd168ee34e8c379b35577eccb
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/containers/dense_hash/dense_hash.h1
-rw-r--r--library/cpp/yt/stockpile/stockpile.h21
-rw-r--r--library/cpp/yt/stockpile/stockpile_linux.cpp99
-rw-r--r--library/cpp/yt/stockpile/stockpile_other.cpp5
4 files changed, 114 insertions, 12 deletions
diff --git a/library/cpp/containers/dense_hash/dense_hash.h b/library/cpp/containers/dense_hash/dense_hash.h
index 5dae8487397..739479c25a3 100644
--- a/library/cpp/containers/dense_hash/dense_hash.h
+++ b/library/cpp/containers/dense_hash/dense_hash.h
@@ -2,6 +2,7 @@
#include "fwd.h"
+#include <util/generic/bitops.h>
#include <util/generic/utility.h>
#include <util/generic/vector.h>
#include <util/generic/mapfindptr.h>
diff --git a/library/cpp/yt/stockpile/stockpile.h b/library/cpp/yt/stockpile/stockpile.h
index 1df9591de40..d84763cf284 100644
--- a/library/cpp/yt/stockpile/stockpile.h
+++ b/library/cpp/yt/stockpile/stockpile.h
@@ -1,5 +1,9 @@
#pragma once
+#include <library/cpp/yt/cpu_clock/clock.h>
+
+#include <library/cpp/yt/misc/enum.h>
+
#include <util/system/types.h>
#include <util/generic/size_literals.h>
@@ -10,6 +14,14 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
+DEFINE_ENUM(EStockpileStrategy,
+ ((FixedBreaks) (0))
+ ((FlooredLoad) (1))
+ ((ProgressiveBackoff) (2))
+);
+
+////////////////////////////////////////////////////////////////////////////////
+
struct TStockpileOptions
{
static constexpr i64 DefaultBufferSize = 4_GBs;
@@ -18,11 +30,18 @@ struct TStockpileOptions
static constexpr int DefaultThreadCount = 4;
int ThreadCount = DefaultThreadCount;
+ static constexpr EStockpileStrategy DefaultStrategy = EStockpileStrategy::FixedBreaks;
+ EStockpileStrategy Strategy = DefaultStrategy;
+
static constexpr TDuration DefaultPeriod = TDuration::MilliSeconds(10);
TDuration Period = DefaultPeriod;
};
-void ConfigureStockpile(const TStockpileOptions& options);
+////////////////////////////////////////////////////////////////////////////////
+
+void RunStockpileThread(TStockpileOptions options, std::atomic<bool>* shouldProceed);
+
+void RunDetachedStockpileThreads(TStockpileOptions options);
////////////////////////////////////////////////////////////////////////////////
diff --git a/library/cpp/yt/stockpile/stockpile_linux.cpp b/library/cpp/yt/stockpile/stockpile_linux.cpp
index 3ee83d93341..0cf5ea0a8bb 100644
--- a/library/cpp/yt/stockpile/stockpile_linux.cpp
+++ b/library/cpp/yt/stockpile/stockpile_linux.cpp
@@ -1,38 +1,117 @@
#include "stockpile.h"
+#include "library/cpp/yt/logging/logger.h"
+
#include <thread>
#include <mutex>
#include <sys/mman.h>
#include <util/system/thread.h>
+#include <string.h>
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
+static const auto Logger = NLogging::TLogger("Stockpile");
+
+constexpr int MADV_STOCKPILE = 0x59410004;
+
+////////////////////////////////////////////////////////////////////////////////
+
namespace {
-void RunStockpile(const TStockpileOptions& options)
+void RunWithFixedBreaks(i64 bufferSize, TDuration period)
{
- TThread::SetCurrentThreadName("Stockpile");
+ auto returnCode = ::madvise(nullptr, bufferSize, MADV_STOCKPILE);
+ YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" returned %v", strerror(returnCode));
+ Sleep(period);
+}
- constexpr int MADV_STOCKPILE = 0x59410004;
+void RunWithCappedLoad(i64 bufferSize, TDuration period)
+{
+ auto started = GetApproximateCpuInstant();
+ auto returnCode = ::madvise(nullptr, bufferSize, MADV_STOCKPILE);
+ YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" returned %v", strerror(returnCode));
+ auto duration = CpuDurationToDuration(GetApproximateCpuInstant() - started);
- while (true) {
- ::madvise(nullptr, options.BufferSize, MADV_STOCKPILE);
- Sleep(options.Period);
+ if (duration < period) {
+ Sleep(period - duration);
+ }
+}
+
+std::pair<i64, TDuration> RunWithBackoffs(
+ i64 adjustedBufferSize,
+ TDuration adjustedPeriod,
+ const TStockpileOptions& options,
+ i64 pageSize)
+{
+ int returnCode = ::madvise(nullptr, adjustedBufferSize, MADV_STOCKPILE);
+ YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" returned %v", strerror(returnCode));
+
+ switch(returnCode) {
+ case 0:
+ Sleep(options.Period);
+ return {options.BufferSize, options.Period};
+
+ case ENOMEM:
+ if (adjustedBufferSize / 2 >= pageSize) {
+ // Immediately make an attempt to reclaim half as much.
+ adjustedBufferSize = adjustedBufferSize / 2;
+ } else {
+ // Unless there is not even a single reclaimable page.
+ Sleep(options.Period);
+ }
+ return {adjustedBufferSize, options.Period};
+
+ case EAGAIN:
+ case EINTR:
+ Sleep(adjustedPeriod);
+ return {options.BufferSize, adjustedPeriod + options.Period};
+
+ default:
+ Sleep(options.Period);
+ return {options.BufferSize, options.Period};
}
}
} // namespace
-void ConfigureStockpile(const TStockpileOptions& options)
+void RunStockpileThread(TStockpileOptions options, std::atomic<bool>* shouldProceed)
+{
+ TThread::SetCurrentThreadName("Stockpile");
+
+ const i64 pageSize = sysconf(_SC_PAGESIZE);
+ auto bufferSize = options.BufferSize;
+ auto period = options.Period;
+
+ while (!shouldProceed || shouldProceed->load()) {
+ switch (options.Strategy) {
+ case EStockpileStrategy::FixedBreaks:
+ RunWithFixedBreaks(options.BufferSize, options.Period);
+ break;
+
+ case EStockpileStrategy::FlooredLoad:
+ RunWithCappedLoad(options.BufferSize, options.Period);
+ break;
+
+ case EStockpileStrategy::ProgressiveBackoff:
+ std::tie(bufferSize, period) = RunWithBackoffs(bufferSize, period, options, pageSize);
+ break;
+
+ default:
+ YT_ABORT();
+ }
+ }
+}
+
+void RunDetachedStockpileThreads(TStockpileOptions options)
{
static std::once_flag OnceFlag;
- std::call_once(OnceFlag, [options] {
- for (int i = 0; i < options.ThreadCount; i++) {
- std::thread(RunStockpile, options).detach();
+ std::call_once(OnceFlag, [options = std::move(options)] {
+ for (int i = 0; i < options.ThreadCount; ++i) {
+ std::thread(RunStockpileThread, options, nullptr).detach();
}
});
}
diff --git a/library/cpp/yt/stockpile/stockpile_other.cpp b/library/cpp/yt/stockpile/stockpile_other.cpp
index 3495d9c1cb6..ce52a876aca 100644
--- a/library/cpp/yt/stockpile/stockpile_other.cpp
+++ b/library/cpp/yt/stockpile/stockpile_other.cpp
@@ -4,7 +4,10 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
-void ConfigureStockpile(const TStockpileOptions& /*options*/)
+void RunStockpileThread(TStockpileOptions /*options*/, std::atomic<bool>* /*shouldProceed*/)
+{ }
+
+void RunDetachedStockpileThreads(TStockpileOptions /*options*/)
{ }
////////////////////////////////////////////////////////////////////////////////