diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-11-30 23:55:24 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-12-01 00:09:15 +0300 |
commit | 22f8e802732231ee090cd47a053afd31b32e06bf (patch) | |
tree | b51c7166fed23ed91f3b5154cba45a614eedb49f | |
parent | 21adcc74febab524dedf75a02d887e6f507d0b7e (diff) | |
download | ydb-22f8e802732231ee090cd47a053afd31b32e06bf.tar.gz |
Intermediate changes
commit_hash:320ab736dea5dae74e2ec8c344fb915be4c4df99
-rw-r--r-- | library/cpp/yt/stockpile/stockpile.h | 10 | ||||
-rw-r--r-- | library/cpp/yt/stockpile/stockpile_linux.cpp | 193 | ||||
-rw-r--r-- | library/cpp/yt/stockpile/stockpile_other.cpp | 5 | ||||
-rw-r--r-- | library/cpp/yt/stockpile/ya.make | 7 | ||||
-rw-r--r-- | yt/yt/library/program/helpers.cpp | 7 | ||||
-rw-r--r-- | yt/yt/library/program/program.cpp | 10 | ||||
-rw-r--r-- | yt/yt/library/program/program.h | 2 | ||||
-rw-r--r-- | yt/yt/library/program/stockpile.cpp | 36 | ||||
-rw-r--r-- | yt/yt/library/program/stockpile.h | 29 | ||||
-rw-r--r-- | yt/yt/library/program/ya.make | 1 |
10 files changed, 135 insertions, 165 deletions
diff --git a/library/cpp/yt/stockpile/stockpile.h b/library/cpp/yt/stockpile/stockpile.h index d84763cf28..fae1b3a569 100644 --- a/library/cpp/yt/stockpile/stockpile.h +++ b/library/cpp/yt/stockpile/stockpile.h @@ -39,9 +39,13 @@ struct TStockpileOptions //////////////////////////////////////////////////////////////////////////////// -void RunStockpileThread(TStockpileOptions options, std::atomic<bool>* shouldProceed); - -void RunDetachedStockpileThreads(TStockpileOptions options); +class TStockpileManager +{ +public: + //! Configures the background stockpile threads. + //! Safe to call multiple times. + static void Reconfigure(TStockpileOptions options); +}; //////////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/yt/stockpile/stockpile_linux.cpp b/library/cpp/yt/stockpile/stockpile_linux.cpp index ef0ad59032..0fc60b94a8 100644 --- a/library/cpp/yt/stockpile/stockpile_linux.cpp +++ b/library/cpp/yt/stockpile/stockpile_linux.cpp @@ -1,119 +1,156 @@ #include "stockpile.h" -#include "library/cpp/yt/logging/logger.h" +#include <library/cpp/yt/threading/spin_lock.h> + +#include <library/cpp/yt/misc/global.h> + +#include <library/cpp/yt/memory/leaky_singleton.h> + +#include <library/cpp/yt/logging/logger.h> #include <thread> -#include <mutex> #include <sys/mman.h> #include <util/system/thread.h> + #include <string.h> namespace NYT { //////////////////////////////////////////////////////////////////////////////// -static const auto Logger = NLogging::TLogger("Stockpile"); +namespace { +YT_DEFINE_GLOBAL(const NLogging::TLogger, Logger, "Stockpile"); constexpr int MADV_STOCKPILE = 0x59410004; -//////////////////////////////////////////////////////////////////////////////// - -namespace { +} // namespace -void RunWithFixedBreaks(i64 bufferSize, TDuration period) +class TStockpileManagerImpl { - auto returnCode = -::madvise(nullptr, bufferSize, MADV_STOCKPILE); - YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" failed: %v", strerror(returnCode)); - Sleep(period); -} +public: + static TStockpileManagerImpl* Get() + { + return LeakySingleton<TStockpileManagerImpl>(); + } -void RunWithCappedLoad(i64 bufferSize, TDuration period) -{ - auto started = GetApproximateCpuInstant(); - auto returnCode = -::madvise(nullptr, bufferSize, MADV_STOCKPILE); - YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" failed: %v", strerror(returnCode)); - auto duration = CpuDurationToDuration(GetApproximateCpuInstant() - started); + void Reconfigure(TStockpileOptions options) + { + auto guard = Guard(SpinLock_); + + Run_.store(false); + for (const auto& thread : Threads_) { + thread->join(); + } + + Threads_.clear(); + Run_.store(true); - if (duration < period) { - Sleep(period - duration); + Options_ = options; + + for (int threadIndex = 0; threadIndex < Options_.ThreadCount; ++threadIndex) { + Threads_.push_back(std::make_unique<std::thread>(&TStockpileManagerImpl::ThreadMain, this)); + } } -} -std::pair<i64, TDuration> RunWithBackoffs( - i64 adjustedBufferSize, - TDuration adjustedPeriod, - const TStockpileOptions& options, - i64 pageSize) -{ - int returnCode = -::madvise(nullptr, adjustedBufferSize, MADV_STOCKPILE); - YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" failed: %v", strerror(returnCode)); - - switch(returnCode) { - case 0: - Sleep(options.Period); - return {options.BufferSize, options.Period}; - - case ENOMEM: - if (adjustedBufferSize / 2 >= pageSize) { - // Immediately make an attempt to reclaim half as much. - adjustedBufferSize = adjustedBufferSize / 2; - } else { - // Unless there is not even a single reclaimable page. - Sleep(options.Period); - } - return {adjustedBufferSize, options.Period}; +private: + DECLARE_LEAKY_SINGLETON_FRIEND(); + + const i64 PageSize_ = sysconf(_SC_PAGESIZE); + + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_); + std::vector<std::unique_ptr<std::thread>> Threads_; + TStockpileOptions Options_; + std::atomic<bool> Run_ = false; + + void ThreadMain() + { + TThread::SetCurrentThreadName("Stockpile"); + + auto bufferSize = Options_.BufferSize; + auto period = Options_.Period; - case EAGAIN: - case EINTR: - Sleep(adjustedPeriod); - return {options.BufferSize, adjustedPeriod + options.Period}; + while (Run_.load()) { + switch (Options_.Strategy) { + case EStockpileStrategy::FixedBreaks: + RunWithFixedBreaks(Options_.BufferSize, Options_.Period); + break; - default: - Sleep(options.Period); - return {options.BufferSize, options.Period}; + case EStockpileStrategy::FlooredLoad: + RunWithCappedLoad(Options_.BufferSize, Options_.Period); + break; + + case EStockpileStrategy::ProgressiveBackoff: + std::tie(bufferSize, period) = RunWithBackoffs(bufferSize, period); + break; + + default: + YT_ABORT(); + } + } } -} -} // namespace + void RunWithFixedBreaks(i64 bufferSize, TDuration period) + { + auto returnCode = -::madvise(nullptr, bufferSize, MADV_STOCKPILE); + YT_LOG_DEBUG_IF(returnCode != 0, "System call \"madvise\" failed: %v", strerror(returnCode)); -void RunStockpileThread(TStockpileOptions options, std::atomic<bool>* shouldProceed) -{ - TThread::SetCurrentThreadName("Stockpile"); + Sleep(period); + } - const i64 pageSize = sysconf(_SC_PAGESIZE); - auto bufferSize = options.BufferSize; - auto period = options.Period; + void RunWithCappedLoad(i64 bufferSize, TDuration period) + { + auto started = GetApproximateCpuInstant(); - while (!shouldProceed || shouldProceed->load()) { - switch (options.Strategy) { - case EStockpileStrategy::FixedBreaks: - RunWithFixedBreaks(options.BufferSize, options.Period); - break; + auto returnCode = -::madvise(nullptr, bufferSize, MADV_STOCKPILE); + YT_LOG_DEBUG_IF(returnCode != 0, "System call \"madvise\" failed: %v", strerror(returnCode)); - case EStockpileStrategy::FlooredLoad: - RunWithCappedLoad(options.BufferSize, options.Period); - break; + auto duration = CpuDurationToDuration(GetApproximateCpuInstant() - started); + if (duration < period) { + Sleep(period - duration); + } + } - case EStockpileStrategy::ProgressiveBackoff: - std::tie(bufferSize, period) = RunWithBackoffs(bufferSize, period, options, pageSize); - break; + std::pair<i64, TDuration> RunWithBackoffs( + i64 adjustedBufferSize, + TDuration adjustedPeriod) + { + int returnCode = -::madvise(nullptr, adjustedBufferSize, MADV_STOCKPILE); + YT_LOG_DEBUG_IF(returnCode != 0, "System call \"madvise\" failed: %v", strerror(returnCode)); + + switch(returnCode) { + case 0: + Sleep(Options_.Period); + return {Options_.BufferSize, Options_.Period}; + + case ENOMEM: + if (adjustedBufferSize / 2 >= PageSize_) { + // Immediately make an attempt to reclaim half as much. + adjustedBufferSize = adjustedBufferSize / 2; + } else { + // Unless there is not even a single reclaimable page. + Sleep(Options_.Period); + } + return {adjustedBufferSize, Options_.Period}; + + case EAGAIN: + case EINTR: + Sleep(adjustedPeriod); + return {Options_.BufferSize, adjustedPeriod + Options_.Period}; default: - YT_ABORT(); + Sleep(Options_.Period); + return {Options_.BufferSize, Options_.Period}; } } -} +}; + +//////////////////////////////////////////////////////////////////////////////// -void RunDetachedStockpileThreads(TStockpileOptions options) +void TStockpileManager::Reconfigure(TStockpileOptions options) { - static std::once_flag OnceFlag; - std::call_once(OnceFlag, [options = std::move(options)] { - for (int i = 0; i < options.ThreadCount; ++i) { - std::thread(RunStockpileThread, options, nullptr).detach(); - } - }); + TStockpileManagerImpl::Get()->Reconfigure(std::move(options)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/yt/stockpile/stockpile_other.cpp b/library/cpp/yt/stockpile/stockpile_other.cpp index ce52a876ac..481b111b56 100644 --- a/library/cpp/yt/stockpile/stockpile_other.cpp +++ b/library/cpp/yt/stockpile/stockpile_other.cpp @@ -4,10 +4,7 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// -void RunStockpileThread(TStockpileOptions /*options*/, std::atomic<bool>* /*shouldProceed*/) -{ } - -void RunDetachedStockpileThreads(TStockpileOptions /*options*/) +void TStockpileManager::Reconfigure(TStockpileOptions /*options*/) { } //////////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/yt/stockpile/ya.make b/library/cpp/yt/stockpile/ya.make index 39d51aaf97..36ce673ba6 100644 --- a/library/cpp/yt/stockpile/ya.make +++ b/library/cpp/yt/stockpile/ya.make @@ -8,4 +8,11 @@ ELSE() SRCS(stockpile_other.cpp) ENDIF() +PEERDIR( + library/cpp/yt/misc + library/cpp/yt/threading + library/cpp/yt/logging + library/cpp/yt/memory +) + END() diff --git a/yt/yt/library/program/helpers.cpp b/yt/yt/library/program/helpers.cpp index 46bd3822f0..adb2c8effd 100644 --- a/yt/yt/library/program/helpers.cpp +++ b/yt/yt/library/program/helpers.cpp @@ -1,7 +1,6 @@ #include "helpers.h" #include "config.h" #include "private.h" -#include "stockpile.h" #include <yt/yt/core/ytalloc/bindings.h> @@ -40,6 +39,8 @@ #include <library/cpp/yt/memory/atomic_intrusive_ptr.h> +#include <library/cpp/yt/stockpile/stockpile.h> + #include <util/string/split.h> #include <util/system/thread.h> @@ -239,7 +240,7 @@ void ConfigureSingletons(const TSingletonsConfigPtr& config) ConfigureTCMalloc(config->TCMalloc); - TStockpileManager::Get()->Reconfigure(*config->Stockpile); + TStockpileManager::Reconfigure(*config->Stockpile); if (config->EnableRefCountedTrackerProfiling) { EnableRefCountedTrackerProfiling(); @@ -302,7 +303,7 @@ void ReconfigureSingletons(const TSingletonsConfigPtr& config, const TSingletons } if (dynamicConfig->Stockpile) { - TStockpileManager::Get()->Reconfigure(*config->Stockpile->ApplyDynamic(dynamicConfig->Stockpile)); + TStockpileManager::Reconfigure(*config->Stockpile->ApplyDynamic(dynamicConfig->Stockpile)); } NYson::SetProtobufInteropConfig(config->ProtobufInterop->ApplyDynamic(dynamicConfig->ProtobufInterop)); diff --git a/yt/yt/library/program/program.cpp b/yt/yt/library/program/program.cpp index 79f0bdb7fb..7cb2e0a6c5 100644 --- a/yt/yt/library/program/program.cpp +++ b/yt/yt/library/program/program.cpp @@ -9,8 +9,6 @@ #include <yt/yt/core/misc/fs.h> #include <yt/yt/core/misc/shutdown.h> -#include <yt/yt/core/ytalloc/bindings.h> - #include <yt/yt/core/yson/writer.h> #include <yt/yt/core/yson/null_consumer.h> @@ -20,9 +18,8 @@ #include <yt/yt/library/profiling/tcmalloc/profiler.h> -#include <library/cpp/ytalloc/api/ytalloc.h> - #include <library/cpp/yt/mlock/mlock.h> + #include <library/cpp/yt/stockpile/stockpile.h> #include <library/cpp/yt/system/exit.h> @@ -337,11 +334,6 @@ void ConfigureAllocator(const TAllocatorOptions& options) NYT::MlockFileMappings(); #ifdef _linux_ - NYTAlloc::EnableYTLogging(); - NYTAlloc::EnableYTProfiling(); - NYTAlloc::InitializeLibunwindInterop(); - NYTAlloc::SetEnableEagerMemoryRelease(options.YTAllocEagerMemoryRelease); - if (tcmalloc::MallocExtension::NeedsProcessBackgroundActions()) { std::thread backgroundThread([] { TThread::SetCurrentThreadName("TCAllocBack"); diff --git a/yt/yt/library/program/program.h b/yt/yt/library/program/program.h index f4c8babb50..5ef17adb50 100644 --- a/yt/yt/library/program/program.h +++ b/yt/yt/library/program/program.h @@ -128,8 +128,6 @@ void ConfigureExitZeroOnSigterm(); struct TAllocatorOptions { - bool YTAllocEagerMemoryRelease = false; - bool TCMallocOptimizeSize = false; std::optional<i64> TCMallocGuardedSamplingRate = 128_MB; diff --git a/yt/yt/library/program/stockpile.cpp b/yt/yt/library/program/stockpile.cpp deleted file mode 100644 index 864574eab3..0000000000 --- a/yt/yt/library/program/stockpile.cpp +++ /dev/null @@ -1,36 +0,0 @@ -#include "stockpile.h" - -#include <library/cpp/yt/memory/leaky_singleton.h> - -namespace NYT { - -//////////////////////////////////////////////////////////////////////////////// - -TStockpileManager* TStockpileManager::Get() -{ - return LeakySingleton<TStockpileManager>(); -} - -void TStockpileManager::Reconfigure(TStockpileOptions options) -{ - auto guard = Guard(SpinLock_); - - ShouldProceed_.store(false); - - for (const auto& thread : Threads_) { - thread->join(); - } - - Threads_.clear(); - ShouldProceed_.store(true); - - for (int threadIndex = 0; threadIndex < options.ThreadCount; ++threadIndex) { - Threads_.push_back(std::make_unique<std::thread>([options, this] { - RunStockpileThread(options, &ShouldProceed_); - })); - } -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT diff --git a/yt/yt/library/program/stockpile.h b/yt/yt/library/program/stockpile.h deleted file mode 100644 index 68c198b2e4..0000000000 --- a/yt/yt/library/program/stockpile.h +++ /dev/null @@ -1,29 +0,0 @@ -#pragma once - -#include <library/cpp/yt/stockpile/stockpile.h> - -#include <library/cpp/yt/threading/spin_lock.h> - -#include <thread> - -namespace NYT { - -//////////////////////////////////////////////////////////////////////////////// - -class TStockpileManager -{ -public: - static TStockpileManager* Get(); - - void Reconfigure(TStockpileOptions options); - -private: - YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_); - std::vector<std::unique_ptr<std::thread>> Threads_; - - std::atomic<bool> ShouldProceed_{true}; -}; - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT diff --git a/yt/yt/library/program/ya.make b/yt/yt/library/program/ya.make index e234eacaab..4c0604f04f 100644 --- a/yt/yt/library/program/ya.make +++ b/yt/yt/library/program/ya.make @@ -10,7 +10,6 @@ SRCS( program_config_mixin.cpp program_pdeathsig_mixin.cpp program_setsid_mixin.cpp - stockpile.cpp ) PEERDIR( |