diff options
author | babenko <babenko@yandex-team.com> | 2025-01-13 16:06:36 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2025-01-13 16:23:37 +0300 |
commit | 35068345b060c5a136cd6dbf1b9b7052ba6751f9 (patch) | |
tree | 1749c5fa707a4403bb983d9342aae640953979ac | |
parent | e5f98db1a19bd77a3e8647de3c5f409c919014f0 (diff) | |
download | ydb-35068345b060c5a136cd6dbf1b9b7052ba6751f9.tar.gz |
Rework master state thread management
The main goal is to enable running multiple master instances within a single multidaemon process.
Changes include:
* Fair share queues now accept `TThreadOptions` at init-time
* `NObjectServer::TThreadState` is introduced to encapsulate all bits of state (previously stored as individual TLS globals)
* Bootstrap is no longer considered to be unique and is now stored in `TThreadState`
* Automaton thread is initialized with the help of `TThreadOptions::ThreadInitializer`
* Free functions `(Assert|Verify)AutomatonThreadAffinity` and `(Assert|Verify)PersistentStateRead` are now considered public API and are preferred over bootstrap/hydra facade implementations.
commit_hash:e2e84cb02a2cf2fde13105036bd693657f37f1ab
7 files changed, 50 insertions, 11 deletions
diff --git a/yt/yt/core/concurrency/fair_share_action_queue-inl.h b/yt/yt/core/concurrency/fair_share_action_queue-inl.h index 9619f33276..e4a4feeec6 100644 --- a/yt/yt/core/concurrency/fair_share_action_queue-inl.h +++ b/yt/yt/core/concurrency/fair_share_action_queue-inl.h @@ -17,8 +17,14 @@ public: const TString& threadName, const std::vector<TString>& queueNames, const THashMap<TString, std::vector<TString>>& bucketToQueues, + NThreading::TThreadOptions threadOptions, NProfiling::IRegistryPtr registry) - : Queue_(CreateFairShareActionQueue(threadName, queueNames, bucketToQueues, std::move(registry))) + : Queue_(CreateFairShareActionQueue( + threadName, + queueNames, + bucketToQueues, + threadOptions, + std::move(registry))) { } const IInvokerPtr& GetInvoker(EQueue queue) override @@ -41,6 +47,7 @@ template <typename EQueue, typename EBucket> IEnumIndexedFairShareActionQueuePtr<EQueue> CreateEnumIndexedFairShareActionQueue( const TString& threadName, const THashMap<EBucket, std::vector<EQueue>>& bucketToQueues, + NThreading::TThreadOptions threadOptions, NProfiling::IRegistryPtr registry) { std::vector<TString> queueNames; @@ -55,7 +62,12 @@ IEnumIndexedFairShareActionQueuePtr<EQueue> CreateEnumIndexedFairShareActionQueu stringBucket.push_back(ToString(queue)); } } - return New<TEnumIndexedFairShareActionQueue<EQueue>>(threadName, queueNames, stringBuckets, std::move(registry)); + return New<TEnumIndexedFairShareActionQueue<EQueue>>( + threadName, + queueNames, + stringBuckets, + threadOptions, + std::move(registry)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/concurrency/fair_share_action_queue.cpp b/yt/yt/core/concurrency/fair_share_action_queue.cpp index db018103bc..ef00551028 100644 --- a/yt/yt/core/concurrency/fair_share_action_queue.cpp +++ b/yt/yt/core/concurrency/fair_share_action_queue.cpp @@ -19,6 +19,7 @@ namespace NYT::NConcurrency { using namespace NProfiling; using namespace NYPath; using namespace NYTree; +using namespace NThreading; //////////////////////////////////////////////////////////////////////////////// @@ -30,6 +31,7 @@ public: const TString& threadName, const std::vector<TString>& queueNames, const THashMap<TString, std::vector<TString>>& bucketToQueues, + TThreadOptions threadOptions, NProfiling::IRegistryPtr registry) : ShutdownCookie_(RegisterShutdownCallback( Format("FairShareActionQueue(%v)", threadName), @@ -94,8 +96,17 @@ public: YT_VERIFY(QueueIndexToBucketQueueIndex_[queueIndex] != -1); } - Queue_ = New<TFairShareInvokerQueue>(CallbackEventCount_, std::move(bucketDescriptions), std::move(registry)); - Thread_ = New<TFairShareQueueSchedulerThread>(Queue_, CallbackEventCount_, threadName, threadName); + Queue_ = New<TFairShareInvokerQueue>( + CallbackEventCount_, + std::move(bucketDescriptions), + std::move(registry)); + + Thread_ = New<TFairShareQueueSchedulerThread>( + Queue_, + CallbackEventCount_, + threadName, + threadName, + threadOptions); } ~TFairShareActionQueue() @@ -141,7 +152,7 @@ public: } private: - const TIntrusivePtr<NThreading::TEventCount> CallbackEventCount_ = New<NThreading::TEventCount>(); + const TIntrusivePtr<TEventCount> CallbackEventCount_ = New<TEventCount>(); const TShutdownCookie ShutdownCookie_; const IInvokerPtr ShutdownInvoker_ = GetShutdownInvoker(); @@ -171,9 +182,15 @@ IFairShareActionQueuePtr CreateFairShareActionQueue( const TString& threadName, const std::vector<TString>& queueNames, const THashMap<TString, std::vector<TString>>& bucketToQueues, + TThreadOptions threadOptions, NProfiling::IRegistryPtr registry) { - return New<TFairShareActionQueue>(threadName, queueNames, bucketToQueues, std::move(registry)); + return New<TFairShareActionQueue>( + threadName, + queueNames, + bucketToQueues, + threadOptions, + std::move(registry)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/concurrency/fair_share_action_queue.h b/yt/yt/core/concurrency/fair_share_action_queue.h index a1f8d4e2b1..195bde5461 100644 --- a/yt/yt/core/concurrency/fair_share_action_queue.h +++ b/yt/yt/core/concurrency/fair_share_action_queue.h @@ -8,6 +8,8 @@ #include <yt/yt/core/profiling/public.h> +#include <yt/yt/core/threading/thread.h> + namespace NYT::NConcurrency { //////////////////////////////////////////////////////////////////////////////// @@ -28,6 +30,7 @@ IFairShareActionQueuePtr CreateFairShareActionQueue( const TString& threadName, const std::vector<TString>& queueNames, const THashMap<TString, std::vector<TString>>& bucketToQueues = {}, + NThreading::TThreadOptions threadOptions = {}, NProfiling::IRegistryPtr registry = {}); //////////////////////////////////////////////////////////////////////////////// @@ -47,6 +50,7 @@ template <typename EQueue, typename EBucket = EQueue> IEnumIndexedFairShareActionQueuePtr<EQueue> CreateEnumIndexedFairShareActionQueue( const TString& threadName, const THashMap<EBucket, std::vector<EQueue>>& bucketToQueues = {}, + NThreading::TThreadOptions threadOptions = {}, NProfiling::IRegistryPtr registry = {}); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/concurrency/fair_share_queue_scheduler_thread.cpp b/yt/yt/core/concurrency/fair_share_queue_scheduler_thread.cpp index fed943be93..c21ac16eb5 100644 --- a/yt/yt/core/concurrency/fair_share_queue_scheduler_thread.cpp +++ b/yt/yt/core/concurrency/fair_share_queue_scheduler_thread.cpp @@ -10,11 +10,13 @@ TFairShareQueueSchedulerThread::TFairShareQueueSchedulerThread( TFairShareInvokerQueuePtr queue, TIntrusivePtr<NThreading::TEventCount> callbackEventCount, const TString& threadGroupName, - const TString& threadName) + const TString& threadName, + NThreading::TThreadOptions options) : TSchedulerThread( std::move(callbackEventCount), threadGroupName, - threadName) + threadName, + options) , Queue_(std::move(queue)) { } diff --git a/yt/yt/core/concurrency/fair_share_queue_scheduler_thread.h b/yt/yt/core/concurrency/fair_share_queue_scheduler_thread.h index 5716b472ad..7dc802a3a6 100644 --- a/yt/yt/core/concurrency/fair_share_queue_scheduler_thread.h +++ b/yt/yt/core/concurrency/fair_share_queue_scheduler_thread.h @@ -16,7 +16,8 @@ public: TFairShareInvokerQueuePtr queue, TIntrusivePtr<NThreading::TEventCount> callbackEventCount, const TString& threadGroupName, - const TString& threadName); + const TString& threadName, + NThreading::TThreadOptions options = {}); protected: const TFairShareInvokerQueuePtr Queue_; diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp index c0e4612e03..92fa3e1212 100644 --- a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp +++ b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp @@ -1047,7 +1047,6 @@ TFiberSchedulerThread::TFiberSchedulerThread( , ThreadGroupName_(std::move(threadGroupName)) { } - void TFiberSchedulerThread::ThreadMain() { // Hold this strongly. diff --git a/yt/yt/core/concurrency/unittests/fair_share_action_queue_ut.cpp b/yt/yt/core/concurrency/unittests/fair_share_action_queue_ut.cpp index d13026ce8d..955c3b3adc 100644 --- a/yt/yt/core/concurrency/unittests/fair_share_action_queue_ut.cpp +++ b/yt/yt/core/concurrency/unittests/fair_share_action_queue_ut.cpp @@ -144,7 +144,11 @@ TEST_F(TTestFairShareActionQueue, TestProfiling) THashMap<EBuckets, std::vector<EQueues>> bucketToQueues{}; bucketToQueues[EBuckets::Bucket1] = {EQueues::Queue1, EQueues::Queue2}; bucketToQueues[EBuckets::Bucket2] = {EQueues::Queue3}; - auto queue = CreateEnumIndexedFairShareActionQueue<EQueues>("ActionQueue", bucketToQueues, registry); + auto queue = CreateEnumIndexedFairShareActionQueue<EQueues>( + "ActionQueue", + bucketToQueues, + /*threadOptions*/ {}, + registry); auto config = CreateExporterConfig(); auto exporter = New<TSolomonExporter>(config, registry); |