aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-11-30 23:55:24 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-12-01 00:09:15 +0300
commit22f8e802732231ee090cd47a053afd31b32e06bf (patch)
treeb51c7166fed23ed91f3b5154cba45a614eedb49f /library/cpp
parent21adcc74febab524dedf75a02d887e6f507d0b7e (diff)
downloadydb-22f8e802732231ee090cd47a053afd31b32e06bf.tar.gz
Intermediate changes
commit_hash:320ab736dea5dae74e2ec8c344fb915be4c4df99
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/yt/stockpile/stockpile.h10
-rw-r--r--library/cpp/yt/stockpile/stockpile_linux.cpp193
-rw-r--r--library/cpp/yt/stockpile/stockpile_other.cpp5
-rw-r--r--library/cpp/yt/stockpile/ya.make7
4 files changed, 130 insertions, 85 deletions
diff --git a/library/cpp/yt/stockpile/stockpile.h b/library/cpp/yt/stockpile/stockpile.h
index d84763cf28..fae1b3a569 100644
--- a/library/cpp/yt/stockpile/stockpile.h
+++ b/library/cpp/yt/stockpile/stockpile.h
@@ -39,9 +39,13 @@ struct TStockpileOptions
////////////////////////////////////////////////////////////////////////////////
-void RunStockpileThread(TStockpileOptions options, std::atomic<bool>* shouldProceed);
-
-void RunDetachedStockpileThreads(TStockpileOptions options);
+class TStockpileManager
+{
+public:
+ //! Configures the background stockpile threads.
+ //! Safe to call multiple times.
+ static void Reconfigure(TStockpileOptions options);
+};
////////////////////////////////////////////////////////////////////////////////
diff --git a/library/cpp/yt/stockpile/stockpile_linux.cpp b/library/cpp/yt/stockpile/stockpile_linux.cpp
index ef0ad59032..0fc60b94a8 100644
--- a/library/cpp/yt/stockpile/stockpile_linux.cpp
+++ b/library/cpp/yt/stockpile/stockpile_linux.cpp
@@ -1,119 +1,156 @@
#include "stockpile.h"
-#include "library/cpp/yt/logging/logger.h"
+#include <library/cpp/yt/threading/spin_lock.h>
+
+#include <library/cpp/yt/misc/global.h>
+
+#include <library/cpp/yt/memory/leaky_singleton.h>
+
+#include <library/cpp/yt/logging/logger.h>
#include <thread>
-#include <mutex>
#include <sys/mman.h>
#include <util/system/thread.h>
+
#include <string.h>
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
-static const auto Logger = NLogging::TLogger("Stockpile");
+namespace {
+YT_DEFINE_GLOBAL(const NLogging::TLogger, Logger, "Stockpile");
constexpr int MADV_STOCKPILE = 0x59410004;
-////////////////////////////////////////////////////////////////////////////////
-
-namespace {
+} // namespace
-void RunWithFixedBreaks(i64 bufferSize, TDuration period)
+class TStockpileManagerImpl
{
- auto returnCode = -::madvise(nullptr, bufferSize, MADV_STOCKPILE);
- YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" failed: %v", strerror(returnCode));
- Sleep(period);
-}
+public:
+ static TStockpileManagerImpl* Get()
+ {
+ return LeakySingleton<TStockpileManagerImpl>();
+ }
-void RunWithCappedLoad(i64 bufferSize, TDuration period)
-{
- auto started = GetApproximateCpuInstant();
- auto returnCode = -::madvise(nullptr, bufferSize, MADV_STOCKPILE);
- YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" failed: %v", strerror(returnCode));
- auto duration = CpuDurationToDuration(GetApproximateCpuInstant() - started);
+ void Reconfigure(TStockpileOptions options)
+ {
+ auto guard = Guard(SpinLock_);
+
+ Run_.store(false);
+ for (const auto& thread : Threads_) {
+ thread->join();
+ }
+
+ Threads_.clear();
+ Run_.store(true);
- if (duration < period) {
- Sleep(period - duration);
+ Options_ = options;
+
+ for (int threadIndex = 0; threadIndex < Options_.ThreadCount; ++threadIndex) {
+ Threads_.push_back(std::make_unique<std::thread>(&TStockpileManagerImpl::ThreadMain, this));
+ }
}
-}
-std::pair<i64, TDuration> RunWithBackoffs(
- i64 adjustedBufferSize,
- TDuration adjustedPeriod,
- const TStockpileOptions& options,
- i64 pageSize)
-{
- int returnCode = -::madvise(nullptr, adjustedBufferSize, MADV_STOCKPILE);
- YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" failed: %v", strerror(returnCode));
-
- switch(returnCode) {
- case 0:
- Sleep(options.Period);
- return {options.BufferSize, options.Period};
-
- case ENOMEM:
- if (adjustedBufferSize / 2 >= pageSize) {
- // Immediately make an attempt to reclaim half as much.
- adjustedBufferSize = adjustedBufferSize / 2;
- } else {
- // Unless there is not even a single reclaimable page.
- Sleep(options.Period);
- }
- return {adjustedBufferSize, options.Period};
+private:
+ DECLARE_LEAKY_SINGLETON_FRIEND();
+
+ const i64 PageSize_ = sysconf(_SC_PAGESIZE);
+
+ YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
+ std::vector<std::unique_ptr<std::thread>> Threads_;
+ TStockpileOptions Options_;
+ std::atomic<bool> Run_ = false;
+
+ void ThreadMain()
+ {
+ TThread::SetCurrentThreadName("Stockpile");
+
+ auto bufferSize = Options_.BufferSize;
+ auto period = Options_.Period;
- case EAGAIN:
- case EINTR:
- Sleep(adjustedPeriod);
- return {options.BufferSize, adjustedPeriod + options.Period};
+ while (Run_.load()) {
+ switch (Options_.Strategy) {
+ case EStockpileStrategy::FixedBreaks:
+ RunWithFixedBreaks(Options_.BufferSize, Options_.Period);
+ break;
- default:
- Sleep(options.Period);
- return {options.BufferSize, options.Period};
+ case EStockpileStrategy::FlooredLoad:
+ RunWithCappedLoad(Options_.BufferSize, Options_.Period);
+ break;
+
+ case EStockpileStrategy::ProgressiveBackoff:
+ std::tie(bufferSize, period) = RunWithBackoffs(bufferSize, period);
+ break;
+
+ default:
+ YT_ABORT();
+ }
+ }
}
-}
-} // namespace
+ void RunWithFixedBreaks(i64 bufferSize, TDuration period)
+ {
+ auto returnCode = -::madvise(nullptr, bufferSize, MADV_STOCKPILE);
+ YT_LOG_DEBUG_IF(returnCode != 0, "System call \"madvise\" failed: %v", strerror(returnCode));
-void RunStockpileThread(TStockpileOptions options, std::atomic<bool>* shouldProceed)
-{
- TThread::SetCurrentThreadName("Stockpile");
+ Sleep(period);
+ }
- const i64 pageSize = sysconf(_SC_PAGESIZE);
- auto bufferSize = options.BufferSize;
- auto period = options.Period;
+ void RunWithCappedLoad(i64 bufferSize, TDuration period)
+ {
+ auto started = GetApproximateCpuInstant();
- while (!shouldProceed || shouldProceed->load()) {
- switch (options.Strategy) {
- case EStockpileStrategy::FixedBreaks:
- RunWithFixedBreaks(options.BufferSize, options.Period);
- break;
+ auto returnCode = -::madvise(nullptr, bufferSize, MADV_STOCKPILE);
+ YT_LOG_DEBUG_IF(returnCode != 0, "System call \"madvise\" failed: %v", strerror(returnCode));
- case EStockpileStrategy::FlooredLoad:
- RunWithCappedLoad(options.BufferSize, options.Period);
- break;
+ auto duration = CpuDurationToDuration(GetApproximateCpuInstant() - started);
+ if (duration < period) {
+ Sleep(period - duration);
+ }
+ }
- case EStockpileStrategy::ProgressiveBackoff:
- std::tie(bufferSize, period) = RunWithBackoffs(bufferSize, period, options, pageSize);
- break;
+ std::pair<i64, TDuration> RunWithBackoffs(
+ i64 adjustedBufferSize,
+ TDuration adjustedPeriod)
+ {
+ int returnCode = -::madvise(nullptr, adjustedBufferSize, MADV_STOCKPILE);
+ YT_LOG_DEBUG_IF(returnCode != 0, "System call \"madvise\" failed: %v", strerror(returnCode));
+
+ switch(returnCode) {
+ case 0:
+ Sleep(Options_.Period);
+ return {Options_.BufferSize, Options_.Period};
+
+ case ENOMEM:
+ if (adjustedBufferSize / 2 >= PageSize_) {
+ // Immediately make an attempt to reclaim half as much.
+ adjustedBufferSize = adjustedBufferSize / 2;
+ } else {
+ // Unless there is not even a single reclaimable page.
+ Sleep(Options_.Period);
+ }
+ return {adjustedBufferSize, Options_.Period};
+
+ case EAGAIN:
+ case EINTR:
+ Sleep(adjustedPeriod);
+ return {Options_.BufferSize, adjustedPeriod + Options_.Period};
default:
- YT_ABORT();
+ Sleep(Options_.Period);
+ return {Options_.BufferSize, Options_.Period};
}
}
-}
+};
+
+////////////////////////////////////////////////////////////////////////////////
-void RunDetachedStockpileThreads(TStockpileOptions options)
+void TStockpileManager::Reconfigure(TStockpileOptions options)
{
- static std::once_flag OnceFlag;
- std::call_once(OnceFlag, [options = std::move(options)] {
- for (int i = 0; i < options.ThreadCount; ++i) {
- std::thread(RunStockpileThread, options, nullptr).detach();
- }
- });
+ TStockpileManagerImpl::Get()->Reconfigure(std::move(options));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/library/cpp/yt/stockpile/stockpile_other.cpp b/library/cpp/yt/stockpile/stockpile_other.cpp
index ce52a876ac..481b111b56 100644
--- a/library/cpp/yt/stockpile/stockpile_other.cpp
+++ b/library/cpp/yt/stockpile/stockpile_other.cpp
@@ -4,10 +4,7 @@ namespace NYT {
////////////////////////////////////////////////////////////////////////////////
-void RunStockpileThread(TStockpileOptions /*options*/, std::atomic<bool>* /*shouldProceed*/)
-{ }
-
-void RunDetachedStockpileThreads(TStockpileOptions /*options*/)
+void TStockpileManager::Reconfigure(TStockpileOptions /*options*/)
{ }
////////////////////////////////////////////////////////////////////////////////
diff --git a/library/cpp/yt/stockpile/ya.make b/library/cpp/yt/stockpile/ya.make
index 39d51aaf97..36ce673ba6 100644
--- a/library/cpp/yt/stockpile/ya.make
+++ b/library/cpp/yt/stockpile/ya.make
@@ -8,4 +8,11 @@ ELSE()
SRCS(stockpile_other.cpp)
ENDIF()
+PEERDIR(
+ library/cpp/yt/misc
+ library/cpp/yt/threading
+ library/cpp/yt/logging
+ library/cpp/yt/memory
+)
+
END()