aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2025-01-13 16:06:36 +0300
committerbabenko <babenko@yandex-team.com>2025-01-13 16:23:37 +0300
commit35068345b060c5a136cd6dbf1b9b7052ba6751f9 (patch)
tree1749c5fa707a4403bb983d9342aae640953979ac
parente5f98db1a19bd77a3e8647de3c5f409c919014f0 (diff)
downloadydb-35068345b060c5a136cd6dbf1b9b7052ba6751f9.tar.gz
Rework master state thread management
The main goal is to enable running multiple master instances within a single multidaemon process. Changes include: * Fair share queues now accept `TThreadOptions` at init-time * `NObjectServer::TThreadState` is introduced to encapsulate all bits of state (previously stored as individual TLS globals) * Bootstrap is no longer considered to be unique and is now stored in `TThreadState` * Automaton thread is initialized with the help of `TThreadOptions::ThreadInitializer` * Free functions `(Assert|Verify)AutomatonThreadAffinity` and `(Assert|Verify)PersistentStateRead` are now considered public API and are preferred over bootstrap/hydra facade implementations. commit_hash:e2e84cb02a2cf2fde13105036bd693657f37f1ab
-rw-r--r--yt/yt/core/concurrency/fair_share_action_queue-inl.h16
-rw-r--r--yt/yt/core/concurrency/fair_share_action_queue.cpp25
-rw-r--r--yt/yt/core/concurrency/fair_share_action_queue.h4
-rw-r--r--yt/yt/core/concurrency/fair_share_queue_scheduler_thread.cpp6
-rw-r--r--yt/yt/core/concurrency/fair_share_queue_scheduler_thread.h3
-rw-r--r--yt/yt/core/concurrency/fiber_scheduler_thread.cpp1
-rw-r--r--yt/yt/core/concurrency/unittests/fair_share_action_queue_ut.cpp6
7 files changed, 50 insertions, 11 deletions
diff --git a/yt/yt/core/concurrency/fair_share_action_queue-inl.h b/yt/yt/core/concurrency/fair_share_action_queue-inl.h
index 9619f33276..e4a4feeec6 100644
--- a/yt/yt/core/concurrency/fair_share_action_queue-inl.h
+++ b/yt/yt/core/concurrency/fair_share_action_queue-inl.h
@@ -17,8 +17,14 @@ public:
const TString& threadName,
const std::vector<TString>& queueNames,
const THashMap<TString, std::vector<TString>>& bucketToQueues,
+ NThreading::TThreadOptions threadOptions,
NProfiling::IRegistryPtr registry)
- : Queue_(CreateFairShareActionQueue(threadName, queueNames, bucketToQueues, std::move(registry)))
+ : Queue_(CreateFairShareActionQueue(
+ threadName,
+ queueNames,
+ bucketToQueues,
+ threadOptions,
+ std::move(registry)))
{ }
const IInvokerPtr& GetInvoker(EQueue queue) override
@@ -41,6 +47,7 @@ template <typename EQueue, typename EBucket>
IEnumIndexedFairShareActionQueuePtr<EQueue> CreateEnumIndexedFairShareActionQueue(
const TString& threadName,
const THashMap<EBucket, std::vector<EQueue>>& bucketToQueues,
+ NThreading::TThreadOptions threadOptions,
NProfiling::IRegistryPtr registry)
{
std::vector<TString> queueNames;
@@ -55,7 +62,12 @@ IEnumIndexedFairShareActionQueuePtr<EQueue> CreateEnumIndexedFairShareActionQueu
stringBucket.push_back(ToString(queue));
}
}
- return New<TEnumIndexedFairShareActionQueue<EQueue>>(threadName, queueNames, stringBuckets, std::move(registry));
+ return New<TEnumIndexedFairShareActionQueue<EQueue>>(
+ threadName,
+ queueNames,
+ stringBuckets,
+ threadOptions,
+ std::move(registry));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/fair_share_action_queue.cpp b/yt/yt/core/concurrency/fair_share_action_queue.cpp
index db018103bc..ef00551028 100644
--- a/yt/yt/core/concurrency/fair_share_action_queue.cpp
+++ b/yt/yt/core/concurrency/fair_share_action_queue.cpp
@@ -19,6 +19,7 @@ namespace NYT::NConcurrency {
using namespace NProfiling;
using namespace NYPath;
using namespace NYTree;
+using namespace NThreading;
////////////////////////////////////////////////////////////////////////////////
@@ -30,6 +31,7 @@ public:
const TString& threadName,
const std::vector<TString>& queueNames,
const THashMap<TString, std::vector<TString>>& bucketToQueues,
+ TThreadOptions threadOptions,
NProfiling::IRegistryPtr registry)
: ShutdownCookie_(RegisterShutdownCallback(
Format("FairShareActionQueue(%v)", threadName),
@@ -94,8 +96,17 @@ public:
YT_VERIFY(QueueIndexToBucketQueueIndex_[queueIndex] != -1);
}
- Queue_ = New<TFairShareInvokerQueue>(CallbackEventCount_, std::move(bucketDescriptions), std::move(registry));
- Thread_ = New<TFairShareQueueSchedulerThread>(Queue_, CallbackEventCount_, threadName, threadName);
+ Queue_ = New<TFairShareInvokerQueue>(
+ CallbackEventCount_,
+ std::move(bucketDescriptions),
+ std::move(registry));
+
+ Thread_ = New<TFairShareQueueSchedulerThread>(
+ Queue_,
+ CallbackEventCount_,
+ threadName,
+ threadName,
+ threadOptions);
}
~TFairShareActionQueue()
@@ -141,7 +152,7 @@ public:
}
private:
- const TIntrusivePtr<NThreading::TEventCount> CallbackEventCount_ = New<NThreading::TEventCount>();
+ const TIntrusivePtr<TEventCount> CallbackEventCount_ = New<TEventCount>();
const TShutdownCookie ShutdownCookie_;
const IInvokerPtr ShutdownInvoker_ = GetShutdownInvoker();
@@ -171,9 +182,15 @@ IFairShareActionQueuePtr CreateFairShareActionQueue(
const TString& threadName,
const std::vector<TString>& queueNames,
const THashMap<TString, std::vector<TString>>& bucketToQueues,
+ TThreadOptions threadOptions,
NProfiling::IRegistryPtr registry)
{
- return New<TFairShareActionQueue>(threadName, queueNames, bucketToQueues, std::move(registry));
+ return New<TFairShareActionQueue>(
+ threadName,
+ queueNames,
+ bucketToQueues,
+ threadOptions,
+ std::move(registry));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/fair_share_action_queue.h b/yt/yt/core/concurrency/fair_share_action_queue.h
index a1f8d4e2b1..195bde5461 100644
--- a/yt/yt/core/concurrency/fair_share_action_queue.h
+++ b/yt/yt/core/concurrency/fair_share_action_queue.h
@@ -8,6 +8,8 @@
#include <yt/yt/core/profiling/public.h>
+#include <yt/yt/core/threading/thread.h>
+
namespace NYT::NConcurrency {
////////////////////////////////////////////////////////////////////////////////
@@ -28,6 +30,7 @@ IFairShareActionQueuePtr CreateFairShareActionQueue(
const TString& threadName,
const std::vector<TString>& queueNames,
const THashMap<TString, std::vector<TString>>& bucketToQueues = {},
+ NThreading::TThreadOptions threadOptions = {},
NProfiling::IRegistryPtr registry = {});
////////////////////////////////////////////////////////////////////////////////
@@ -47,6 +50,7 @@ template <typename EQueue, typename EBucket = EQueue>
IEnumIndexedFairShareActionQueuePtr<EQueue> CreateEnumIndexedFairShareActionQueue(
const TString& threadName,
const THashMap<EBucket, std::vector<EQueue>>& bucketToQueues = {},
+ NThreading::TThreadOptions threadOptions = {},
NProfiling::IRegistryPtr registry = {});
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/fair_share_queue_scheduler_thread.cpp b/yt/yt/core/concurrency/fair_share_queue_scheduler_thread.cpp
index fed943be93..c21ac16eb5 100644
--- a/yt/yt/core/concurrency/fair_share_queue_scheduler_thread.cpp
+++ b/yt/yt/core/concurrency/fair_share_queue_scheduler_thread.cpp
@@ -10,11 +10,13 @@ TFairShareQueueSchedulerThread::TFairShareQueueSchedulerThread(
TFairShareInvokerQueuePtr queue,
TIntrusivePtr<NThreading::TEventCount> callbackEventCount,
const TString& threadGroupName,
- const TString& threadName)
+ const TString& threadName,
+ NThreading::TThreadOptions options)
: TSchedulerThread(
std::move(callbackEventCount),
threadGroupName,
- threadName)
+ threadName,
+ options)
, Queue_(std::move(queue))
{ }
diff --git a/yt/yt/core/concurrency/fair_share_queue_scheduler_thread.h b/yt/yt/core/concurrency/fair_share_queue_scheduler_thread.h
index 5716b472ad..7dc802a3a6 100644
--- a/yt/yt/core/concurrency/fair_share_queue_scheduler_thread.h
+++ b/yt/yt/core/concurrency/fair_share_queue_scheduler_thread.h
@@ -16,7 +16,8 @@ public:
TFairShareInvokerQueuePtr queue,
TIntrusivePtr<NThreading::TEventCount> callbackEventCount,
const TString& threadGroupName,
- const TString& threadName);
+ const TString& threadName,
+ NThreading::TThreadOptions options = {});
protected:
const TFairShareInvokerQueuePtr Queue_;
diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
index c0e4612e03..92fa3e1212 100644
--- a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
+++ b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
@@ -1047,7 +1047,6 @@ TFiberSchedulerThread::TFiberSchedulerThread(
, ThreadGroupName_(std::move(threadGroupName))
{ }
-
void TFiberSchedulerThread::ThreadMain()
{
// Hold this strongly.
diff --git a/yt/yt/core/concurrency/unittests/fair_share_action_queue_ut.cpp b/yt/yt/core/concurrency/unittests/fair_share_action_queue_ut.cpp
index d13026ce8d..955c3b3adc 100644
--- a/yt/yt/core/concurrency/unittests/fair_share_action_queue_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/fair_share_action_queue_ut.cpp
@@ -144,7 +144,11 @@ TEST_F(TTestFairShareActionQueue, TestProfiling)
THashMap<EBuckets, std::vector<EQueues>> bucketToQueues{};
bucketToQueues[EBuckets::Bucket1] = {EQueues::Queue1, EQueues::Queue2};
bucketToQueues[EBuckets::Bucket2] = {EQueues::Queue3};
- auto queue = CreateEnumIndexedFairShareActionQueue<EQueues>("ActionQueue", bucketToQueues, registry);
+ auto queue = CreateEnumIndexedFairShareActionQueue<EQueues>(
+ "ActionQueue",
+ bucketToQueues,
+ /*threadOptions*/ {},
+ registry);
auto config = CreateExporterConfig();
auto exporter = New<TSolomonExporter>(config, registry);