author | snaury <snaury@ydb.tech> | 2023-02-02 15:38:47 +0300 |
---|---|---|
committer | snaury <snaury@ydb.tech> | 2023-02-02 15:38:47 +0300 |
commit | d356fcb037bfbba8740bde2db0f51a8b4aaa9128 (patch) | |
tree | c39c6a955daa46064270c2c9d00b83e0d54f98e5 /library/cpp | |
parent | 2a727f613dac73e5ad1e408c0d631860db8d448a (diff) | |
download | ydb-d356fcb037bfbba8740bde2db0f51a8b4aaa9128.tar.gz | |
Use several shards in grpc services for registered requests
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/grpc/server/grpc_server.h | 61 |
1 file changed, 43 insertions, 18 deletions
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h
index 1f31f7166a..6da5076046 100644
--- a/library/cpp/grpc/server/grpc_server.h
+++ b/library/cpp/grpc/server/grpc_server.h
@@ -129,6 +129,15 @@ class ICancelableContext {
 public:
     virtual void Shutdown() = 0;
     virtual ~ICancelableContext() = default;
+
+private:
+    template<class T>
+    friend class TGrpcServiceBase;
+
+    // Shard assigned by RegisterRequestCtx. This field is not thread-safe
+    // because RegisterRequestCtx may only be called once for a single service,
+    // so it's only assigned once.
+    size_t ShardIndex = size_t(-1);
 };
 
 template <class TLimit>
@@ -253,13 +262,15 @@ public:
     using TCurrentGRpcService = T;
 
     void StopService() noexcept override {
-        with_lock(Lock_) {
-            AtomicSet(ShuttingDown_, 1);
-
-            // Send TryCansel to event (can be send after finishing).
-            // Actual dtors will be called from grpc thread, so deadlock impossible
-            for (auto* request : Requests_) {
-                request->Shutdown();
+        AtomicSet(ShuttingDown_, 1);
+
+        for (auto& shard : Shards_) {
+            with_lock(shard.Lock_) {
+                // Send TryCansel to event (can be send after finishing).
+                // Actual dtors will be called from grpc thread, so deadlock impossible
+                for (auto* request : shard.Requests_) {
+                    request->Shutdown();
+                }
             }
         }
     }
@@ -280,8 +291,10 @@ public:
 
     size_t RequestsInProgress() const override {
         size_t c = 0;
-        with_lock(Lock_) {
-            c = Requests_.size();
+        for (auto& shard : Shards_) {
+            with_lock(shard.Lock_) {
+                c += shard.Requests_.size();
+            }
         }
         return c;
     }
@@ -307,23 +320,29 @@ public:
     }
 
     bool RegisterRequestCtx(ICancelableContext* req) {
-        with_lock(Lock_) {
-            auto r = Requests_.emplace(req);
-            Y_VERIFY(r.second, "Ctx already registered");
+        if (Y_LIKELY(req->ShardIndex == size_t(-1))) {
+            req->ShardIndex = NextShard_.fetch_add(1, std::memory_order_relaxed) % Shards_.size();
+        }
 
+        auto& shard = Shards_[req->ShardIndex];
+        with_lock(shard.Lock_) {
             if (IsShuttingDown()) {
-                // Server is already shutting down
-                Requests_.erase(r.first);
                 return false;
             }
+
+            auto r = shard.Requests_.emplace(req);
+            Y_VERIFY(r.second, "Ctx already registered");
         }
 
         return true;
     }
 
     void DeregisterRequestCtx(ICancelableContext* req) {
-        with_lock(Lock_) {
-            Y_VERIFY(Requests_.erase(req), "Ctx is not registered");
+        Y_VERIFY(req->ShardIndex != size_t(-1), "Ctx does not have an assigned shard index");
+
+        auto& shard = Shards_[req->ShardIndex];
+        with_lock(shard.Lock_) {
+            Y_VERIFY(shard.Requests_.erase(req), "Ctx is not registered");
         }
     }
 
@@ -342,8 +361,14 @@ private:
     bool SslServer_ = false;
    bool NeedAuth_ = false;
 
-    THashSet<ICancelableContext*> Requests_;
-    TAdaptiveLock Lock_;
+    struct TShard {
+        TAdaptiveLock Lock_;
+        THashSet<ICancelableContext*> Requests_;
+    };
+
+    // Note: benchmarks showed 4 shards is enough to scale to ~30 threads
+    TVector<TShard> Shards_{ size_t(4) };
+    std::atomic<size_t> NextShard_{ 0 };
 };
 
 class TGRpcServer {
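The commit replaces a single lock-protected `THashSet` of registered request contexts with a small fixed number of independently locked shards, so concurrent `RegisterRequestCtx`/`DeregisterRequestCtx` calls from different gRPC threads mostly contend only within their own shard. The sketch below restates that pattern in plain standard C++ as an illustration only: it is not the grpc_server.h API, and the names `TShardedRegistry` and `TContext`, as well as the use of `std::mutex`/`std::unordered_set` in place of the repo's `TAdaptiveLock`/`THashSet`/`with_lock`/`Y_VERIFY`, are assumptions made to keep the example self-contained.

```cpp
#include <array>
#include <atomic>
#include <cstddef>
#include <mutex>
#include <unordered_set>

// Hypothetical stand-in for ICancelableContext: it only needs to remember
// which shard it was registered in.
struct TContext {
    size_t ShardIndex = size_t(-1);
};

class TShardedRegistry {
public:
    // Assigns a shard round-robin on first registration, then inserts the
    // context under that shard's lock only.
    bool Register(TContext* ctx) {
        if (ctx->ShardIndex == size_t(-1)) {
            // Relaxed order is enough: the counter only spreads load and
            // carries no synchronization meaning.
            ctx->ShardIndex = NextShard_.fetch_add(1, std::memory_order_relaxed) % Shards_.size();
        }
        TShard& shard = Shards_[ctx->ShardIndex];
        std::lock_guard<std::mutex> guard(shard.Lock);
        if (ShuttingDown_.load(std::memory_order_acquire)) {
            return false; // refuse registrations once shutdown has started
        }
        shard.Requests.insert(ctx); // the real code Y_VERIFYs against double registration
        return true;
    }

    // Goes straight to the shard recorded in the context; no global lock.
    void Deregister(TContext* ctx) {
        TShard& shard = Shards_[ctx->ShardIndex];
        std::lock_guard<std::mutex> guard(shard.Lock);
        shard.Requests.erase(ctx);
    }

    // Whole-registry operations take each shard lock in turn, mirroring
    // RequestsInProgress() in the patch.
    size_t Size() const {
        size_t total = 0;
        for (const TShard& shard : Shards_) {
            std::lock_guard<std::mutex> guard(shard.Lock);
            total += shard.Requests.size();
        }
        return total;
    }

    // Mirrors StopService(): set the flag first, then sweep every shard.
    void Shutdown() {
        ShuttingDown_.store(true, std::memory_order_release);
        for (TShard& shard : Shards_) {
            std::lock_guard<std::mutex> guard(shard.Lock);
            shard.Requests.clear(); // the real code calls request->Shutdown() on each entry
        }
    }

private:
    struct TShard {
        mutable std::mutex Lock;
        std::unordered_set<TContext*> Requests;
    };

    // Four shards, matching the commit's note that 4 shards were enough to
    // scale to ~30 threads; the right count is workload-dependent.
    std::array<TShard, 4> Shards_;
    std::atomic<size_t> NextShard_{0};
    std::atomic<bool> ShuttingDown_{false};
};
```

Whole-registry operations (shutdown, size) still visit every shard, but they are rare compared to per-request registration and deregistration, which is why a handful of shards is enough to remove most contention, per the commit's own benchmark note.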