diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-11-30 23:55:24 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-12-01 00:09:15 +0300 |
commit | 22f8e802732231ee090cd47a053afd31b32e06bf (patch) | |
tree | b51c7166fed23ed91f3b5154cba45a614eedb49f /library/cpp | |
parent | 21adcc74febab524dedf75a02d887e6f507d0b7e (diff) | |
download | ydb-22f8e802732231ee090cd47a053afd31b32e06bf.tar.gz |
Intermediate changes
commit_hash:320ab736dea5dae74e2ec8c344fb915be4c4df99
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/yt/stockpile/stockpile.h | 10 | ||||
-rw-r--r-- | library/cpp/yt/stockpile/stockpile_linux.cpp | 193 | ||||
-rw-r--r-- | library/cpp/yt/stockpile/stockpile_other.cpp | 5 | ||||
-rw-r--r-- | library/cpp/yt/stockpile/ya.make | 7 |
4 files changed, 130 insertions, 85 deletions
diff --git a/library/cpp/yt/stockpile/stockpile.h b/library/cpp/yt/stockpile/stockpile.h index d84763cf28..fae1b3a569 100644 --- a/library/cpp/yt/stockpile/stockpile.h +++ b/library/cpp/yt/stockpile/stockpile.h @@ -39,9 +39,13 @@ struct TStockpileOptions //////////////////////////////////////////////////////////////////////////////// -void RunStockpileThread(TStockpileOptions options, std::atomic<bool>* shouldProceed); - -void RunDetachedStockpileThreads(TStockpileOptions options); +class TStockpileManager +{ +public: + //! Configures the background stockpile threads. + //! Safe to call multiple times. + static void Reconfigure(TStockpileOptions options); +}; //////////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/yt/stockpile/stockpile_linux.cpp b/library/cpp/yt/stockpile/stockpile_linux.cpp index ef0ad59032..0fc60b94a8 100644 --- a/library/cpp/yt/stockpile/stockpile_linux.cpp +++ b/library/cpp/yt/stockpile/stockpile_linux.cpp @@ -1,119 +1,156 @@ #include "stockpile.h" -#include "library/cpp/yt/logging/logger.h" +#include <library/cpp/yt/threading/spin_lock.h> + +#include <library/cpp/yt/misc/global.h> + +#include <library/cpp/yt/memory/leaky_singleton.h> + +#include <library/cpp/yt/logging/logger.h> #include <thread> -#include <mutex> #include <sys/mman.h> #include <util/system/thread.h> + #include <string.h> namespace NYT { //////////////////////////////////////////////////////////////////////////////// -static const auto Logger = NLogging::TLogger("Stockpile"); +namespace { +YT_DEFINE_GLOBAL(const NLogging::TLogger, Logger, "Stockpile"); constexpr int MADV_STOCKPILE = 0x59410004; -//////////////////////////////////////////////////////////////////////////////// - -namespace { +} // namespace -void RunWithFixedBreaks(i64 bufferSize, TDuration period) +class TStockpileManagerImpl { - auto returnCode = -::madvise(nullptr, bufferSize, MADV_STOCKPILE); - YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" failed: %v", strerror(returnCode)); - Sleep(period); -} +public: + static TStockpileManagerImpl* Get() + { + return LeakySingleton<TStockpileManagerImpl>(); + } -void RunWithCappedLoad(i64 bufferSize, TDuration period) -{ - auto started = GetApproximateCpuInstant(); - auto returnCode = -::madvise(nullptr, bufferSize, MADV_STOCKPILE); - YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" failed: %v", strerror(returnCode)); - auto duration = CpuDurationToDuration(GetApproximateCpuInstant() - started); + void Reconfigure(TStockpileOptions options) + { + auto guard = Guard(SpinLock_); + + Run_.store(false); + for (const auto& thread : Threads_) { + thread->join(); + } + + Threads_.clear(); + Run_.store(true); - if (duration < period) { - Sleep(period - duration); + Options_ = options; + + for (int threadIndex = 0; threadIndex < Options_.ThreadCount; ++threadIndex) { + Threads_.push_back(std::make_unique<std::thread>(&TStockpileManagerImpl::ThreadMain, this)); + } } -} -std::pair<i64, TDuration> RunWithBackoffs( - i64 adjustedBufferSize, - TDuration adjustedPeriod, - const TStockpileOptions& options, - i64 pageSize) -{ - int returnCode = -::madvise(nullptr, adjustedBufferSize, MADV_STOCKPILE); - YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" failed: %v", strerror(returnCode)); - - switch(returnCode) { - case 0: - Sleep(options.Period); - return {options.BufferSize, options.Period}; - - case ENOMEM: - if (adjustedBufferSize / 2 >= pageSize) { - // Immediately make an attempt to reclaim half as much. - adjustedBufferSize = adjustedBufferSize / 2; - } else { - // Unless there is not even a single reclaimable page. - Sleep(options.Period); - } - return {adjustedBufferSize, options.Period}; +private: + DECLARE_LEAKY_SINGLETON_FRIEND(); + + const i64 PageSize_ = sysconf(_SC_PAGESIZE); + + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_); + std::vector<std::unique_ptr<std::thread>> Threads_; + TStockpileOptions Options_; + std::atomic<bool> Run_ = false; + + void ThreadMain() + { + TThread::SetCurrentThreadName("Stockpile"); + + auto bufferSize = Options_.BufferSize; + auto period = Options_.Period; - case EAGAIN: - case EINTR: - Sleep(adjustedPeriod); - return {options.BufferSize, adjustedPeriod + options.Period}; + while (Run_.load()) { + switch (Options_.Strategy) { + case EStockpileStrategy::FixedBreaks: + RunWithFixedBreaks(Options_.BufferSize, Options_.Period); + break; - default: - Sleep(options.Period); - return {options.BufferSize, options.Period}; + case EStockpileStrategy::FlooredLoad: + RunWithCappedLoad(Options_.BufferSize, Options_.Period); + break; + + case EStockpileStrategy::ProgressiveBackoff: + std::tie(bufferSize, period) = RunWithBackoffs(bufferSize, period); + break; + + default: + YT_ABORT(); + } + } } -} -} // namespace + void RunWithFixedBreaks(i64 bufferSize, TDuration period) + { + auto returnCode = -::madvise(nullptr, bufferSize, MADV_STOCKPILE); + YT_LOG_DEBUG_IF(returnCode != 0, "System call \"madvise\" failed: %v", strerror(returnCode)); -void RunStockpileThread(TStockpileOptions options, std::atomic<bool>* shouldProceed) -{ - TThread::SetCurrentThreadName("Stockpile"); + Sleep(period); + } - const i64 pageSize = sysconf(_SC_PAGESIZE); - auto bufferSize = options.BufferSize; - auto period = options.Period; + void RunWithCappedLoad(i64 bufferSize, TDuration period) + { + auto started = GetApproximateCpuInstant(); - while (!shouldProceed || shouldProceed->load()) { - switch (options.Strategy) { - case EStockpileStrategy::FixedBreaks: - RunWithFixedBreaks(options.BufferSize, options.Period); - break; + auto returnCode = -::madvise(nullptr, bufferSize, MADV_STOCKPILE); + YT_LOG_DEBUG_IF(returnCode != 0, "System call \"madvise\" failed: %v", strerror(returnCode)); - case EStockpileStrategy::FlooredLoad: - RunWithCappedLoad(options.BufferSize, options.Period); - break; + auto duration = CpuDurationToDuration(GetApproximateCpuInstant() - started); + if (duration < period) { + Sleep(period - duration); + } + } - case EStockpileStrategy::ProgressiveBackoff: - std::tie(bufferSize, period) = RunWithBackoffs(bufferSize, period, options, pageSize); - break; + std::pair<i64, TDuration> RunWithBackoffs( + i64 adjustedBufferSize, + TDuration adjustedPeriod) + { + int returnCode = -::madvise(nullptr, adjustedBufferSize, MADV_STOCKPILE); + YT_LOG_DEBUG_IF(returnCode != 0, "System call \"madvise\" failed: %v", strerror(returnCode)); + + switch(returnCode) { + case 0: + Sleep(Options_.Period); + return {Options_.BufferSize, Options_.Period}; + + case ENOMEM: + if (adjustedBufferSize / 2 >= PageSize_) { + // Immediately make an attempt to reclaim half as much. + adjustedBufferSize = adjustedBufferSize / 2; + } else { + // Unless there is not even a single reclaimable page. + Sleep(Options_.Period); + } + return {adjustedBufferSize, Options_.Period}; + + case EAGAIN: + case EINTR: + Sleep(adjustedPeriod); + return {Options_.BufferSize, adjustedPeriod + Options_.Period}; default: - YT_ABORT(); + Sleep(Options_.Period); + return {Options_.BufferSize, Options_.Period}; } } -} +}; + +//////////////////////////////////////////////////////////////////////////////// -void RunDetachedStockpileThreads(TStockpileOptions options) +void TStockpileManager::Reconfigure(TStockpileOptions options) { - static std::once_flag OnceFlag; - std::call_once(OnceFlag, [options = std::move(options)] { - for (int i = 0; i < options.ThreadCount; ++i) { - std::thread(RunStockpileThread, options, nullptr).detach(); - } - }); + TStockpileManagerImpl::Get()->Reconfigure(std::move(options)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/yt/stockpile/stockpile_other.cpp b/library/cpp/yt/stockpile/stockpile_other.cpp index ce52a876ac..481b111b56 100644 --- a/library/cpp/yt/stockpile/stockpile_other.cpp +++ b/library/cpp/yt/stockpile/stockpile_other.cpp @@ -4,10 +4,7 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// -void RunStockpileThread(TStockpileOptions /*options*/, std::atomic<bool>* /*shouldProceed*/) -{ } - -void RunDetachedStockpileThreads(TStockpileOptions /*options*/) +void TStockpileManager::Reconfigure(TStockpileOptions /*options*/) { } //////////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/yt/stockpile/ya.make b/library/cpp/yt/stockpile/ya.make index 39d51aaf97..36ce673ba6 100644 --- a/library/cpp/yt/stockpile/ya.make +++ b/library/cpp/yt/stockpile/ya.make @@ -8,4 +8,11 @@ ELSE() SRCS(stockpile_other.cpp) ENDIF() +PEERDIR( + library/cpp/yt/misc + library/cpp/yt/threading + library/cpp/yt/logging + library/cpp/yt/memory +) + END() |