aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorsnaury <snaury@ydb.tech>2023-02-02 15:38:47 +0300
committersnaury <snaury@ydb.tech>2023-02-02 15:38:47 +0300
commitd356fcb037bfbba8740bde2db0f51a8b4aaa9128 (patch)
treec39c6a955daa46064270c2c9d00b83e0d54f98e5 /library/cpp
parent2a727f613dac73e5ad1e408c0d631860db8d448a (diff)
downloadydb-d356fcb037bfbba8740bde2db0f51a8b4aaa9128.tar.gz
Use several shards in grpc services for registered requests
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/grpc/server/grpc_server.h61
1 files changed, 43 insertions, 18 deletions
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h
index 1f31f7166a..6da5076046 100644
--- a/library/cpp/grpc/server/grpc_server.h
+++ b/library/cpp/grpc/server/grpc_server.h
@@ -129,6 +129,15 @@ class ICancelableContext {
public:
virtual void Shutdown() = 0;
virtual ~ICancelableContext() = default;
+
+private:
+ template<class T>
+ friend class TGrpcServiceBase;
+
+ // Shard assigned by RegisterRequestCtx. This field is not thread-safe
+ // because RegisterRequestCtx may only be called once for a single service,
+ // so it's only assigned once.
+ size_t ShardIndex = size_t(-1);
};
template <class TLimit>
@@ -253,13 +262,15 @@ public:
using TCurrentGRpcService = T;
void StopService() noexcept override {
- with_lock(Lock_) {
- AtomicSet(ShuttingDown_, 1);
-
- // Send TryCansel to event (can be send after finishing).
- // Actual dtors will be called from grpc thread, so deadlock impossible
- for (auto* request : Requests_) {
- request->Shutdown();
+ AtomicSet(ShuttingDown_, 1);
+
+ for (auto& shard : Shards_) {
+ with_lock(shard.Lock_) {
+ // Send TryCansel to event (can be send after finishing).
+ // Actual dtors will be called from grpc thread, so deadlock impossible
+ for (auto* request : shard.Requests_) {
+ request->Shutdown();
+ }
}
}
}
@@ -280,8 +291,10 @@ public:
size_t RequestsInProgress() const override {
size_t c = 0;
- with_lock(Lock_) {
- c = Requests_.size();
+ for (auto& shard : Shards_) {
+ with_lock(shard.Lock_) {
+ c += shard.Requests_.size();
+ }
}
return c;
}
@@ -307,23 +320,29 @@ public:
}
bool RegisterRequestCtx(ICancelableContext* req) {
- with_lock(Lock_) {
- auto r = Requests_.emplace(req);
- Y_VERIFY(r.second, "Ctx already registered");
+ if (Y_LIKELY(req->ShardIndex == size_t(-1))) {
+ req->ShardIndex = NextShard_.fetch_add(1, std::memory_order_relaxed) % Shards_.size();
+ }
+ auto& shard = Shards_[req->ShardIndex];
+ with_lock(shard.Lock_) {
if (IsShuttingDown()) {
- // Server is already shutting down
- Requests_.erase(r.first);
return false;
}
+
+ auto r = shard.Requests_.emplace(req);
+ Y_VERIFY(r.second, "Ctx already registered");
}
return true;
}
void DeregisterRequestCtx(ICancelableContext* req) {
- with_lock(Lock_) {
- Y_VERIFY(Requests_.erase(req), "Ctx is not registered");
+ Y_VERIFY(req->ShardIndex != size_t(-1), "Ctx does not have an assigned shard index");
+
+ auto& shard = Shards_[req->ShardIndex];
+ with_lock(shard.Lock_) {
+ Y_VERIFY(shard.Requests_.erase(req), "Ctx is not registered");
}
}
@@ -342,8 +361,14 @@ private:
bool SslServer_ = false;
bool NeedAuth_ = false;
- THashSet<ICancelableContext*> Requests_;
- TAdaptiveLock Lock_;
+ struct TShard {
+ TAdaptiveLock Lock_;
+ THashSet<ICancelableContext*> Requests_;
+ };
+
+ // Note: benchmarks showed 4 shards is enough to scale to ~30 threads
+ TVector<TShard> Shards_{ size_t(4) };
+ std::atomic<size_t> NextShard_{ 0 };
};
class TGRpcServer {