diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-11-25 21:36:47 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-11-25 21:45:20 +0300 |
commit | c12e478730358c41a33da954f06655d323e7654a (patch) | |
tree | 422c1ccc49d716c9a79588e6d57a9c83b249226c | |
parent | e7ba55e67a7c19abf00ab200a28d7e0177e7fa7c (diff) | |
download | ydb-c12e478730358c41a33da954f06655d323e7654a.tar.gz |
Intermediate changes
commit_hash:72dc8207c3da659b90c6183ed9bee39d9fbb7cf8
-rw-r--r-- | yt/yt/core/rpc/grpc/dispatcher.cpp | 49 | ||||
-rw-r--r-- | yt/yt/core/rpc/grpc/dispatcher.h | 7 |
2 files changed, 30 insertions, 26 deletions
diff --git a/yt/yt/core/rpc/grpc/dispatcher.cpp b/yt/yt/core/rpc/grpc/dispatcher.cpp index a658619856..6d72c6b6b7 100644 --- a/yt/yt/core/rpc/grpc/dispatcher.cpp +++ b/yt/yt/core/rpc/grpc/dispatcher.cpp @@ -50,25 +50,25 @@ TGrpcLibraryLock::~TGrpcLibraryLock() class TDispatcher::TImpl { public: - [[nodiscard]] bool IsConfigured() const noexcept + [[nodiscard]] bool IsInitialized() const noexcept { - return Configured_.load(); + return Initialized_.load(); } void Configure(const TDispatcherConfigPtr& config) { - auto guard = Guard(ConfigureLock_); + auto guard = Guard(ConfigLock_); - if (IsConfigured()) { - THROW_ERROR_EXCEPTION("GRPC dispatcher is already configured"); + if (IsInitialized()) { + THROW_ERROR_EXCEPTION("GRPC dispatcher is already initialized and cannot be reconfigured"); } - DoConfigure(config); + Config_ = config; } TGrpcLibraryLockPtr GetLibraryLock() { - EnsureConfigured(); + EnsureInitialized(); auto grpcLock = LibraryLock_.Lock(); YT_VERIFY(grpcLock); return grpcLock; @@ -76,7 +76,7 @@ public: TGuardedGrpcCompletionQueue* PickRandomGuardedCompletionQueue() { - EnsureConfigured(); + EnsureInitialized(); return Threads_[RandomNumber<size_t>() % Threads_.size()]->GetGuardedCompletionQueue(); } @@ -153,40 +153,43 @@ private: using TDispatcherThreadPtr = TIntrusivePtr<TDispatcherThread>; - void EnsureConfigured() + void EnsureInitialized() { - if (IsConfigured()) { + if (IsInitialized()) { return; } - auto guard = Guard(ConfigureLock_); + auto guard = Guard(ConfigLock_); - if (IsConfigured()) { + if (IsInitialized()) { return; } - DoConfigure(New<TDispatcherConfig>()); + DoInitialize(); } - void DoConfigure(const TDispatcherConfigPtr& config) + void DoInitialize() { - VERIFY_SPINLOCK_AFFINITY(ConfigureLock_); - YT_VERIFY(!IsConfigured()); + VERIFY_SPINLOCK_AFFINITY(ConfigLock_); + YT_VERIFY(!IsInitialized()); - grpc_core::Executor::SetThreadsLimit(config->GrpcThreadCount); + grpc_core::Executor::SetThreadsLimit(Config_->GrpcThreadCount); // Initialize grpc only after configuration is done. auto grpcLock = New<TGrpcLibraryLock>(); - for (int index = 0; index < config->DispatcherThreadCount; ++index) { + for (int index = 0; index < Config_->DispatcherThreadCount; ++index) { Threads_.push_back(New<TDispatcherThread>(grpcLock, index)); } LibraryLock_ = grpcLock; - Configured_.store(true); + Initialized_.store(true); } - std::atomic<bool> Configured_ = false; - YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, ConfigureLock_); + std::atomic<bool> Initialized_ = false; + + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, ConfigLock_); + TDispatcherConfigPtr Config_ = New<TDispatcherConfig>(); + TWeakPtr<TGrpcLibraryLock> LibraryLock_; std::vector<TDispatcherThreadPtr> Threads_; }; @@ -209,9 +212,9 @@ void TDispatcher::Configure(const TDispatcherConfigPtr& config) Impl_->Configure(config); } -bool TDispatcher::IsConfigured() const noexcept +bool TDispatcher::IsInitialized() const noexcept { - return Impl_->IsConfigured(); + return Impl_->IsInitialized(); } TGrpcLibraryLockPtr TDispatcher::GetLibraryLock() diff --git a/yt/yt/core/rpc/grpc/dispatcher.h b/yt/yt/core/rpc/grpc/dispatcher.h index a184074d3a..34ddb9f6ad 100644 --- a/yt/yt/core/rpc/grpc/dispatcher.h +++ b/yt/yt/core/rpc/grpc/dispatcher.h @@ -28,11 +28,12 @@ public: //! Configures the dispatcher. /*! - * Can only can called once; subsequent calls will throw. - * The call must be done prior to any GRPC client or server is created. + * The call must be done prior to any GRPC client or server is created. + * Can only be called before initialization, the future calls will throw. */ void Configure(const TDispatcherConfigPtr& config); - [[nodiscard]] bool IsConfigured() const noexcept; + + [[nodiscard]] bool IsInitialized() const noexcept; TGrpcLibraryLockPtr GetLibraryLock(); TGuardedGrpcCompletionQueue* PickRandomGuardedCompletionQueue(); |