aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-11-25 21:36:47 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-11-25 21:45:20 +0300
commitc12e478730358c41a33da954f06655d323e7654a (patch)
tree422c1ccc49d716c9a79588e6d57a9c83b249226c
parente7ba55e67a7c19abf00ab200a28d7e0177e7fa7c (diff)
downloadydb-c12e478730358c41a33da954f06655d323e7654a.tar.gz
Intermediate changes
commit_hash:72dc8207c3da659b90c6183ed9bee39d9fbb7cf8
-rw-r--r--yt/yt/core/rpc/grpc/dispatcher.cpp49
-rw-r--r--yt/yt/core/rpc/grpc/dispatcher.h7
2 files changed, 30 insertions, 26 deletions
diff --git a/yt/yt/core/rpc/grpc/dispatcher.cpp b/yt/yt/core/rpc/grpc/dispatcher.cpp
index a658619856..6d72c6b6b7 100644
--- a/yt/yt/core/rpc/grpc/dispatcher.cpp
+++ b/yt/yt/core/rpc/grpc/dispatcher.cpp
@@ -50,25 +50,25 @@ TGrpcLibraryLock::~TGrpcLibraryLock()
class TDispatcher::TImpl
{
public:
- [[nodiscard]] bool IsConfigured() const noexcept
+ [[nodiscard]] bool IsInitialized() const noexcept
{
- return Configured_.load();
+ return Initialized_.load();
}
void Configure(const TDispatcherConfigPtr& config)
{
- auto guard = Guard(ConfigureLock_);
+ auto guard = Guard(ConfigLock_);
- if (IsConfigured()) {
- THROW_ERROR_EXCEPTION("GRPC dispatcher is already configured");
+ if (IsInitialized()) {
+ THROW_ERROR_EXCEPTION("GRPC dispatcher is already initialized and cannot be reconfigured");
}
- DoConfigure(config);
+ Config_ = config;
}
TGrpcLibraryLockPtr GetLibraryLock()
{
- EnsureConfigured();
+ EnsureInitialized();
auto grpcLock = LibraryLock_.Lock();
YT_VERIFY(grpcLock);
return grpcLock;
@@ -76,7 +76,7 @@ public:
TGuardedGrpcCompletionQueue* PickRandomGuardedCompletionQueue()
{
- EnsureConfigured();
+ EnsureInitialized();
return Threads_[RandomNumber<size_t>() % Threads_.size()]->GetGuardedCompletionQueue();
}
@@ -153,40 +153,43 @@ private:
using TDispatcherThreadPtr = TIntrusivePtr<TDispatcherThread>;
- void EnsureConfigured()
+ void EnsureInitialized()
{
- if (IsConfigured()) {
+ if (IsInitialized()) {
return;
}
- auto guard = Guard(ConfigureLock_);
+ auto guard = Guard(ConfigLock_);
- if (IsConfigured()) {
+ if (IsInitialized()) {
return;
}
- DoConfigure(New<TDispatcherConfig>());
+ DoInitialize();
}
- void DoConfigure(const TDispatcherConfigPtr& config)
+ void DoInitialize()
{
- VERIFY_SPINLOCK_AFFINITY(ConfigureLock_);
- YT_VERIFY(!IsConfigured());
+ VERIFY_SPINLOCK_AFFINITY(ConfigLock_);
+ YT_VERIFY(!IsInitialized());
- grpc_core::Executor::SetThreadsLimit(config->GrpcThreadCount);
+ grpc_core::Executor::SetThreadsLimit(Config_->GrpcThreadCount);
// Initialize grpc only after configuration is done.
auto grpcLock = New<TGrpcLibraryLock>();
- for (int index = 0; index < config->DispatcherThreadCount; ++index) {
+ for (int index = 0; index < Config_->DispatcherThreadCount; ++index) {
Threads_.push_back(New<TDispatcherThread>(grpcLock, index));
}
LibraryLock_ = grpcLock;
- Configured_.store(true);
+ Initialized_.store(true);
}
- std::atomic<bool> Configured_ = false;
- YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, ConfigureLock_);
+ std::atomic<bool> Initialized_ = false;
+
+ YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, ConfigLock_);
+ TDispatcherConfigPtr Config_ = New<TDispatcherConfig>();
+
TWeakPtr<TGrpcLibraryLock> LibraryLock_;
std::vector<TDispatcherThreadPtr> Threads_;
};
@@ -209,9 +212,9 @@ void TDispatcher::Configure(const TDispatcherConfigPtr& config)
Impl_->Configure(config);
}
-bool TDispatcher::IsConfigured() const noexcept
+bool TDispatcher::IsInitialized() const noexcept
{
- return Impl_->IsConfigured();
+ return Impl_->IsInitialized();
}
TGrpcLibraryLockPtr TDispatcher::GetLibraryLock()
diff --git a/yt/yt/core/rpc/grpc/dispatcher.h b/yt/yt/core/rpc/grpc/dispatcher.h
index a184074d3a..34ddb9f6ad 100644
--- a/yt/yt/core/rpc/grpc/dispatcher.h
+++ b/yt/yt/core/rpc/grpc/dispatcher.h
@@ -28,11 +28,12 @@ public:
//! Configures the dispatcher.
/*!
- * Can only can called once; subsequent calls will throw.
- * The call must be done prior to any GRPC client or server is created.
+ * The call must be done prior to any GRPC client or server is created.
+ * Can only be called before initialization, the future calls will throw.
*/
void Configure(const TDispatcherConfigPtr& config);
- [[nodiscard]] bool IsConfigured() const noexcept;
+
+ [[nodiscard]] bool IsInitialized() const noexcept;
TGrpcLibraryLockPtr GetLibraryLock();
TGuardedGrpcCompletionQueue* PickRandomGuardedCompletionQueue();