aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-10-08 17:56:22 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-10-08 18:07:23 +0300
commitd17ef5729f3885185bdb7aa459fc4d3bada0c94d (patch)
tree31f8c229410b09032912843fdd86c9717b623f39 /yt
parent37e325a9a8628ece2764f0f26b09ebc09ca39814 (diff)
downloadydb-d17ef5729f3885185bdb7aa459fc4d3bada0c94d.tar.gz
Intermediate changes
commit_hash:8b7eb71badc9f2fcd168ee34e8c379b35577eccb
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/library/program/config.cpp41
-rw-r--r--yt/yt/library/program/config.h21
-rw-r--r--yt/yt/library/program/helpers.cpp7
-rw-r--r--yt/yt/library/program/public.h1
-rw-r--r--yt/yt/library/program/stockpile.cpp44
-rw-r--r--yt/yt/library/program/stockpile.h45
-rw-r--r--yt/yt/library/program/ya.make1
7 files changed, 158 insertions, 2 deletions
diff --git a/yt/yt/library/program/config.cpp b/yt/yt/library/program/config.cpp
index a8765946b8..52aa9e5d8f 100644
--- a/yt/yt/library/program/config.cpp
+++ b/yt/yt/library/program/config.cpp
@@ -54,13 +54,52 @@ void TTCMallocConfig::Register(TRegistrar registrar)
void TStockpileConfig::Register(TRegistrar registrar)
{
registrar.BaseClassParameter("buffer_size", &TThis::BufferSize)
- .Default(DefaultBufferSize);
+ .Default(DefaultBufferSize)
+ .GreaterThan(0);
registrar.BaseClassParameter("thread_count", &TThis::ThreadCount)
.Default(DefaultThreadCount);
+ registrar.BaseClassParameter("strategy", &TThis::Strategy)
+ .Default(DefaultStrategy);
registrar.BaseClassParameter("period", &TThis::Period)
.Default(DefaultPeriod);
}
+TStockpileConfigPtr TStockpileConfig::ApplyDynamic(const TStockpileDynamicConfigPtr& dynamicConfig) const
+{
+ auto mergedConfig = CloneYsonStruct(MakeStrong(this));
+
+ if (dynamicConfig->BufferSize) {
+ mergedConfig->BufferSize = *dynamicConfig->BufferSize;
+ }
+ if (dynamicConfig->ThreadCount) {
+ mergedConfig->ThreadCount = *dynamicConfig->ThreadCount;
+ }
+ if (dynamicConfig->Strategy) {
+ mergedConfig->Strategy = *dynamicConfig->Strategy;
+ }
+ if (dynamicConfig->Period) {
+ mergedConfig->Period = *dynamicConfig->Period;
+ }
+
+ return mergedConfig;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TStockpileDynamicConfig::Register(TRegistrar registrar)
+{
+ registrar.BaseClassParameter("buffer_size", &TThis::BufferSize)
+ .Optional()
+ .GreaterThan(0);
+ registrar.BaseClassParameter("thread_count", &TThis::ThreadCount)
+ .Optional()
+ .GreaterThan(0);
+ registrar.BaseClassParameter("strategy", &TThis::Strategy)
+ .Optional();
+ registrar.BaseClassParameter("period", &TThis::Period)
+ .Optional();
+}
+
////////////////////////////////////////////////////////////////////////////////
void THeapProfilerConfig::Register(TRegistrar registrar)
diff --git a/yt/yt/library/program/config.h b/yt/yt/library/program/config.h
index 5e3aea6b40..5490c37d54 100644
--- a/yt/yt/library/program/config.h
+++ b/yt/yt/library/program/config.h
@@ -101,6 +101,8 @@ class TStockpileConfig
, public NYTree::TYsonStruct
{
public:
+ TStockpileConfigPtr ApplyDynamic(const TStockpileDynamicConfigPtr& dynamicConfig) const;
+
REGISTER_YSON_STRUCT(TStockpileConfig);
static void Register(TRegistrar registrar);
@@ -110,6 +112,24 @@ DEFINE_REFCOUNTED_TYPE(TStockpileConfig)
////////////////////////////////////////////////////////////////////////////////
+class TStockpileDynamicConfig
+ : public NYTree::TYsonStruct
+{
+public:
+ std::optional<i64> BufferSize;
+ std::optional<int> ThreadCount;
+ std::optional<EStockpileStrategy> Strategy;
+ std::optional<TDuration> Period;
+
+ REGISTER_YSON_STRUCT(TStockpileDynamicConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TStockpileDynamicConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
class THeapProfilerConfig
: public NYTree::TYsonStruct
{
@@ -178,6 +198,7 @@ public:
NTracing::TJaegerTracerDynamicConfigPtr Jaeger;
NTracing::TTracingTransportConfigPtr TracingTransport;
TTCMallocConfigPtr TCMalloc;
+ TStockpileDynamicConfigPtr Stockpile;
NYson::TProtobufInteropDynamicConfigPtr ProtobufInterop;
REGISTER_YSON_STRUCT(TSingletonsDynamicConfig);
diff --git a/yt/yt/library/program/helpers.cpp b/yt/yt/library/program/helpers.cpp
index 3b11d8bcb1..b4be1fbb0d 100644
--- a/yt/yt/library/program/helpers.cpp
+++ b/yt/yt/library/program/helpers.cpp
@@ -1,6 +1,7 @@
#include "helpers.h"
#include "config.h"
#include "private.h"
+#include "stockpile.h"
#include <yt/yt/core/ytalloc/bindings.h>
@@ -243,7 +244,7 @@ void ConfigureSingletons(const TSingletonsConfigPtr& config)
ConfigureTCMalloc(config->TCMalloc);
- ConfigureStockpile(*config->Stockpile);
+ TStockpileManager::Get()->Reconfigure(*config->Stockpile);
if (config->EnableRefCountedTrackerProfiling) {
EnableRefCountedTrackerProfiling();
@@ -305,6 +306,10 @@ void ReconfigureSingletons(const TSingletonsConfigPtr& config, const TSingletons
ConfigureTCMalloc(config->TCMalloc);
}
+ if (dynamicConfig->Stockpile) {
+ TStockpileManager::Get()->Reconfigure(*config->Stockpile->ApplyDynamic(dynamicConfig->Stockpile));
+ }
+
NYson::SetProtobufInteropConfig(config->ProtobufInterop->ApplyDynamic(dynamicConfig->ProtobufInterop));
}
diff --git a/yt/yt/library/program/public.h b/yt/yt/library/program/public.h
index 8b2a2321d7..116fa157be 100644
--- a/yt/yt/library/program/public.h
+++ b/yt/yt/library/program/public.h
@@ -10,6 +10,7 @@ DECLARE_REFCOUNTED_CLASS(TBuildInfo)
DECLARE_REFCOUNTED_CLASS(TRpcConfig)
DECLARE_REFCOUNTED_CLASS(TTCMallocConfig)
DECLARE_REFCOUNTED_CLASS(TStockpileConfig)
+DECLARE_REFCOUNTED_CLASS(TStockpileDynamicConfig)
DECLARE_REFCOUNTED_CLASS(TSingletonsConfig)
DECLARE_REFCOUNTED_CLASS(TSingletonsDynamicConfig)
DECLARE_REFCOUNTED_CLASS(TDiagnosticDumpConfig)
diff --git a/yt/yt/library/program/stockpile.cpp b/yt/yt/library/program/stockpile.cpp
new file mode 100644
index 0000000000..f269398469
--- /dev/null
+++ b/yt/yt/library/program/stockpile.cpp
@@ -0,0 +1,44 @@
+#include "stockpile.h"
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TStockpileManager* TStockpileManager::Get()
+{
+ return Singleton<TStockpileManager>();
+}
+
+void TStockpileManager::Reconfigure(TStockpileOptions options)
+{
+ auto guard = Guard(SpinLock_);
+
+ ThreadState_->ShouldProceed.store(false);
+
+ for (auto& thread : Threads_) {
+ thread->join();
+ }
+
+ Threads_.clear();
+ ThreadState_ = New<TStockpileThreadState>();
+ ThreadState_->ShouldProceed.store(true, std::memory_order_release);
+
+ for (int threadIndex = 0; threadIndex < options.ThreadCount; ++threadIndex) {
+ Threads_.push_back(std::make_unique<std::thread>([options, state = ThreadState_] {
+ RunStockpileThread(options, &state->ShouldProceed);
+ }));
+ }
+}
+
+TStockpileManager::~TStockpileManager()
+{
+ ThreadState_->ShouldProceed.store(false);
+
+ for (auto& thread : Threads_) {
+ thread->detach();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/yt/library/program/stockpile.h b/yt/yt/library/program/stockpile.h
new file mode 100644
index 0000000000..bf4cd37bab
--- /dev/null
+++ b/yt/yt/library/program/stockpile.h
@@ -0,0 +1,45 @@
+#pragma once
+
+#include <library/cpp/yt/memory/intrusive_ptr.h>
+#include "library/cpp/yt/memory/new.h"
+
+#include <library/cpp/yt/stockpile/stockpile.h>
+
+#include <library/cpp/yt/threading/spin_lock.h>
+
+#include <thread>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+DECLARE_REFCOUNTED_STRUCT(TStockpileThreadState)
+
+struct TStockpileThreadState final
+{
+ std::atomic<bool> ShouldProceed;
+};
+
+DEFINE_REFCOUNTED_TYPE(TStockpileThreadState)
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TStockpileManager
+{
+public:
+ static TStockpileManager* Get();
+
+ void Reconfigure(TStockpileOptions options);
+
+ ~TStockpileManager();
+
+private:
+ YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, SpinLock_);
+ std::vector<std::unique_ptr<std::thread>> Threads_;
+
+ TStockpileThreadStatePtr ThreadState_ = New<TStockpileThreadState>();
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/yt/library/program/ya.make b/yt/yt/library/program/ya.make
index 4c0604f04f..e234eacaab 100644
--- a/yt/yt/library/program/ya.make
+++ b/yt/yt/library/program/ya.make
@@ -10,6 +10,7 @@ SRCS(
program_config_mixin.cpp
program_pdeathsig_mixin.cpp
program_setsid_mixin.cpp
+ stockpile.cpp
)
PEERDIR(