diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-10-08 17:56:22 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-10-08 18:07:23 +0300 |
commit | d17ef5729f3885185bdb7aa459fc4d3bada0c94d (patch) | |
tree | 31f8c229410b09032912843fdd86c9717b623f39 /yt | |
parent | 37e325a9a8628ece2764f0f26b09ebc09ca39814 (diff) | |
download | ydb-d17ef5729f3885185bdb7aa459fc4d3bada0c94d.tar.gz |
Intermediate changes
commit_hash:8b7eb71badc9f2fcd168ee34e8c379b35577eccb
Diffstat (limited to 'yt')
-rw-r--r-- | yt/yt/library/program/config.cpp | 41 | ||||
-rw-r--r-- | yt/yt/library/program/config.h | 21 | ||||
-rw-r--r-- | yt/yt/library/program/helpers.cpp | 7 | ||||
-rw-r--r-- | yt/yt/library/program/public.h | 1 | ||||
-rw-r--r-- | yt/yt/library/program/stockpile.cpp | 44 | ||||
-rw-r--r-- | yt/yt/library/program/stockpile.h | 45 | ||||
-rw-r--r-- | yt/yt/library/program/ya.make | 1 |
7 files changed, 158 insertions, 2 deletions
diff --git a/yt/yt/library/program/config.cpp b/yt/yt/library/program/config.cpp index a8765946b8..52aa9e5d8f 100644 --- a/yt/yt/library/program/config.cpp +++ b/yt/yt/library/program/config.cpp @@ -54,13 +54,52 @@ void TTCMallocConfig::Register(TRegistrar registrar) void TStockpileConfig::Register(TRegistrar registrar) { registrar.BaseClassParameter("buffer_size", &TThis::BufferSize) - .Default(DefaultBufferSize); + .Default(DefaultBufferSize) + .GreaterThan(0); registrar.BaseClassParameter("thread_count", &TThis::ThreadCount) .Default(DefaultThreadCount); + registrar.BaseClassParameter("strategy", &TThis::Strategy) + .Default(DefaultStrategy); registrar.BaseClassParameter("period", &TThis::Period) .Default(DefaultPeriod); } +TStockpileConfigPtr TStockpileConfig::ApplyDynamic(const TStockpileDynamicConfigPtr& dynamicConfig) const +{ + auto mergedConfig = CloneYsonStruct(MakeStrong(this)); + + if (dynamicConfig->BufferSize) { + mergedConfig->BufferSize = *dynamicConfig->BufferSize; + } + if (dynamicConfig->ThreadCount) { + mergedConfig->ThreadCount = *dynamicConfig->ThreadCount; + } + if (dynamicConfig->Strategy) { + mergedConfig->Strategy = *dynamicConfig->Strategy; + } + if (dynamicConfig->Period) { + mergedConfig->Period = *dynamicConfig->Period; + } + + return mergedConfig; +} + +//////////////////////////////////////////////////////////////////////////////// + +void TStockpileDynamicConfig::Register(TRegistrar registrar) +{ + registrar.BaseClassParameter("buffer_size", &TThis::BufferSize) + .Optional() + .GreaterThan(0); + registrar.BaseClassParameter("thread_count", &TThis::ThreadCount) + .Optional() + .GreaterThan(0); + registrar.BaseClassParameter("strategy", &TThis::Strategy) + .Optional(); + registrar.BaseClassParameter("period", &TThis::Period) + .Optional(); +} + //////////////////////////////////////////////////////////////////////////////// void THeapProfilerConfig::Register(TRegistrar registrar) diff --git a/yt/yt/library/program/config.h b/yt/yt/library/program/config.h index 5e3aea6b40..5490c37d54 100644 --- a/yt/yt/library/program/config.h +++ b/yt/yt/library/program/config.h @@ -101,6 +101,8 @@ class TStockpileConfig , public NYTree::TYsonStruct { public: + TStockpileConfigPtr ApplyDynamic(const TStockpileDynamicConfigPtr& dynamicConfig) const; + REGISTER_YSON_STRUCT(TStockpileConfig); static void Register(TRegistrar registrar); @@ -110,6 +112,24 @@ DEFINE_REFCOUNTED_TYPE(TStockpileConfig) //////////////////////////////////////////////////////////////////////////////// +class TStockpileDynamicConfig + : public NYTree::TYsonStruct +{ +public: + std::optional<i64> BufferSize; + std::optional<int> ThreadCount; + std::optional<EStockpileStrategy> Strategy; + std::optional<TDuration> Period; + + REGISTER_YSON_STRUCT(TStockpileDynamicConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TStockpileDynamicConfig) + +//////////////////////////////////////////////////////////////////////////////// + class THeapProfilerConfig : public NYTree::TYsonStruct { @@ -178,6 +198,7 @@ public: NTracing::TJaegerTracerDynamicConfigPtr Jaeger; NTracing::TTracingTransportConfigPtr TracingTransport; TTCMallocConfigPtr TCMalloc; + TStockpileDynamicConfigPtr Stockpile; NYson::TProtobufInteropDynamicConfigPtr ProtobufInterop; REGISTER_YSON_STRUCT(TSingletonsDynamicConfig); diff --git a/yt/yt/library/program/helpers.cpp b/yt/yt/library/program/helpers.cpp index 3b11d8bcb1..b4be1fbb0d 100644 --- a/yt/yt/library/program/helpers.cpp +++ b/yt/yt/library/program/helpers.cpp @@ -1,6 +1,7 @@ #include "helpers.h" #include "config.h" #include "private.h" +#include "stockpile.h" #include <yt/yt/core/ytalloc/bindings.h> @@ -243,7 +244,7 @@ void ConfigureSingletons(const TSingletonsConfigPtr& config) ConfigureTCMalloc(config->TCMalloc); - ConfigureStockpile(*config->Stockpile); + TStockpileManager::Get()->Reconfigure(*config->Stockpile); if (config->EnableRefCountedTrackerProfiling) { EnableRefCountedTrackerProfiling(); @@ -305,6 +306,10 @@ void ReconfigureSingletons(const TSingletonsConfigPtr& config, const TSingletons ConfigureTCMalloc(config->TCMalloc); } + if (dynamicConfig->Stockpile) { + TStockpileManager::Get()->Reconfigure(*config->Stockpile->ApplyDynamic(dynamicConfig->Stockpile)); + } + NYson::SetProtobufInteropConfig(config->ProtobufInterop->ApplyDynamic(dynamicConfig->ProtobufInterop)); } diff --git a/yt/yt/library/program/public.h b/yt/yt/library/program/public.h index 8b2a2321d7..116fa157be 100644 --- a/yt/yt/library/program/public.h +++ b/yt/yt/library/program/public.h @@ -10,6 +10,7 @@ DECLARE_REFCOUNTED_CLASS(TBuildInfo) DECLARE_REFCOUNTED_CLASS(TRpcConfig) DECLARE_REFCOUNTED_CLASS(TTCMallocConfig) DECLARE_REFCOUNTED_CLASS(TStockpileConfig) +DECLARE_REFCOUNTED_CLASS(TStockpileDynamicConfig) DECLARE_REFCOUNTED_CLASS(TSingletonsConfig) DECLARE_REFCOUNTED_CLASS(TSingletonsDynamicConfig) DECLARE_REFCOUNTED_CLASS(TDiagnosticDumpConfig) diff --git a/yt/yt/library/program/stockpile.cpp b/yt/yt/library/program/stockpile.cpp new file mode 100644 index 0000000000..f269398469 --- /dev/null +++ b/yt/yt/library/program/stockpile.cpp @@ -0,0 +1,44 @@ +#include "stockpile.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TStockpileManager* TStockpileManager::Get() +{ + return Singleton<TStockpileManager>(); +} + +void TStockpileManager::Reconfigure(TStockpileOptions options) +{ + auto guard = Guard(SpinLock_); + + ThreadState_->ShouldProceed.store(false); + + for (auto& thread : Threads_) { + thread->join(); + } + + Threads_.clear(); + ThreadState_ = New<TStockpileThreadState>(); + ThreadState_->ShouldProceed.store(true, std::memory_order_release); + + for (int threadIndex = 0; threadIndex < options.ThreadCount; ++threadIndex) { + Threads_.push_back(std::make_unique<std::thread>([options, state = ThreadState_] { + RunStockpileThread(options, &state->ShouldProceed); + })); + } +} + +TStockpileManager::~TStockpileManager() +{ + ThreadState_->ShouldProceed.store(false); + + for (auto& thread : Threads_) { + thread->detach(); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/yt/library/program/stockpile.h b/yt/yt/library/program/stockpile.h new file mode 100644 index 0000000000..bf4cd37bab --- /dev/null +++ b/yt/yt/library/program/stockpile.h @@ -0,0 +1,45 @@ +#pragma once + +#include <library/cpp/yt/memory/intrusive_ptr.h> +#include "library/cpp/yt/memory/new.h" + +#include <library/cpp/yt/stockpile/stockpile.h> + +#include <library/cpp/yt/threading/spin_lock.h> + +#include <thread> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +DECLARE_REFCOUNTED_STRUCT(TStockpileThreadState) + +struct TStockpileThreadState final +{ + std::atomic<bool> ShouldProceed; +}; + +DEFINE_REFCOUNTED_TYPE(TStockpileThreadState) + +//////////////////////////////////////////////////////////////////////////////// + +class TStockpileManager +{ +public: + static TStockpileManager* Get(); + + void Reconfigure(TStockpileOptions options); + + ~TStockpileManager(); + +private: + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_); + std::vector<std::unique_ptr<std::thread>> Threads_; + + TStockpileThreadStatePtr ThreadState_ = New<TStockpileThreadState>(); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/yt/library/program/ya.make b/yt/yt/library/program/ya.make index 4c0604f04f..e234eacaab 100644 --- a/yt/yt/library/program/ya.make +++ b/yt/yt/library/program/ya.make @@ -10,6 +10,7 @@ SRCS( program_config_mixin.cpp program_pdeathsig_mixin.cpp program_setsid_mixin.cpp + stockpile.cpp ) PEERDIR( |