diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 10:54:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 12:28:07 +0300 |
commit | fc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch) | |
tree | c15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library/cpp/grpc | |
parent | 8a749596d40e91c896a1907afcd108d9221fbde1 (diff) | |
download | ydb-e9cbe5c5cf67db853d223fd365c9f05b695f7b96.tar.gz |
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library/cpp/grpc')
-rw-r--r-- | library/cpp/grpc/server/grpc_request.h | 6 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_request_base.h | 3 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_server.cpp | 37 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_server.h | 80 |
4 files changed, 87 insertions, 39 deletions
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h index c4b7e9c040..4e869ef5f6 100644 --- a/library/cpp/grpc/server/grpc_request.h +++ b/library/cpp/grpc/server/grpc_request.h @@ -113,6 +113,10 @@ public: return FinishPromise_.GetFuture(); } + bool IsClientLost() const override { + return ClientLost_.load(); + } + TString GetPeer() const override { return TString(this->Context.peer()); } @@ -496,6 +500,7 @@ private: void OnFinish(EQueueEventStatus evStatus) { if (this->Context.IsCancelled()) { + ClientLost_.store(true); FinishPromise_.SetValue(EFinishStatus::CANCEL); } else { FinishPromise_.SetValue(evStatus == EQueueEventStatus::OK ? EFinishStatus::OK : EFinishStatus::ERROR); @@ -556,6 +561,7 @@ private: NThreading::TPromise<EFinishStatus> FinishPromise_; bool SkipUpdateCountersOnError = false; IStreamAdaptor::TPtr StreamAdaptor_; + std::atomic<bool> ClientLost_ = false; }; template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter=google::protobuf::TextFormat::Printer, typename TOutProtoPrinter=google::protobuf::TextFormat::Printer> diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h index 42b78ed7df..60b38805ed 100644 --- a/library/cpp/grpc/server/grpc_request_base.h +++ b/library/cpp/grpc/server/grpc_request_base.h @@ -116,6 +116,9 @@ public: //! Returns true if server is using ssl virtual bool SslServer() const = 0; + + //! Returns true if client was not interested in result (but we still must send response to make grpc happy) + virtual bool IsClientLost() const = 0; }; } // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp index 97472206e2..0c05c7404e 100644 --- a/library/cpp/grpc/server/grpc_server.cpp +++ b/library/cpp/grpc/server/grpc_server.cpp @@ -130,11 +130,16 @@ void TGRpcServer::Start() { builder.SetOption(std::make_unique<TKeepAliveOption>()); } - if (Options_.UseCompletionQueuePerThread) { - for (size_t i = 0; i < Options_.WorkerThreads; ++i) { - CQS_.push_back(builder.AddCompletionQueue()); - } - } else { + size_t completionQueueCount = 1; + if (Options_.WorkersPerCompletionQueue) { + size_t threadsPerQueue = Max(std::size_t{1}, Options_.WorkersPerCompletionQueue); + completionQueueCount = (Options_.WorkerThreads + threadsPerQueue - 1) / threadsPerQueue; // ceiling + } else if (Options_.UseCompletionQueuePerThread) { + completionQueueCount = Options_.WorkerThreads; + } + + CQS_.reserve(completionQueueCount); + for (size_t i = 0; i < completionQueueCount; ++i) { CQS_.push_back(builder.AddCompletionQueue()); } @@ -159,23 +164,15 @@ void TGRpcServer::Start() { size_t index = 0; for (IGRpcServicePtr service : Services_) { // TODO: provide something else for services instead of ServerCompletionQueue - service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger); + service->InitService(CQS_, Options_.Logger, index++); } - if (Options_.UseCompletionQueuePerThread) { - for (size_t i = 0; i < Options_.WorkerThreads; ++i) { - auto* cq = &CQS_[i]; - Ts.push_back(SystemThreadFactory()->Run([cq] { - PullEvents(cq->get()); - })); - } - } else { - for (size_t i = 0; i < Options_.WorkerThreads; ++i) { - auto* cq = &CQS_[0]; - Ts.push_back(SystemThreadFactory()->Run([cq] { - PullEvents(cq->get()); - })); - } + Ts.reserve(Options_.WorkerThreads); + for (size_t i = 0; i < Options_.WorkerThreads; ++i) { + auto* cq = &CQS_[i % CQS_.size()]; + Ts.push_back(SystemThreadFactory()->Run([cq] { + PullEvents(cq->get()); + })); } if (Options_.ExternalListener) { diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h index c9b48a6676..6da5076046 100644 --- a/library/cpp/grpc/server/grpc_server.h +++ b/library/cpp/grpc/server/grpc_server.h @@ -54,7 +54,14 @@ struct TServerOptions { //! Number of worker threads. DECLARE_FIELD(WorkerThreads, size_t, 2); - //! Create one completion queue per thread + //! Number of workers per completion queue, i.e. when + // WorkerThreads=8 and PriorityWorkersPerCompletionQueue=2 + // there will be 4 completion queues. When set to 0 then + // only UseCompletionQueuePerThread affects number of CQ. + DECLARE_FIELD(WorkersPerCompletionQueue, size_t, 0); + + //! Obsolete. Create one completion queue per thread. + // Setting true equals to the WorkersPerCompletionQueue=1 DECLARE_FIELD(UseCompletionQueuePerThread, bool, false); //! Memory quota size for grpc server in bytes. Zero means unlimited. @@ -122,6 +129,15 @@ class ICancelableContext { public: virtual void Shutdown() = 0; virtual ~ICancelableContext() = default; + +private: + template<class T> + friend class TGrpcServiceBase; + + // Shard assigned by RegisterRequestCtx. This field is not thread-safe + // because RegisterRequestCtx may only be called once for a single service, + // so it's only assigned once. + size_t ShardIndex = size_t(-1); }; template <class TLimit> @@ -166,7 +182,17 @@ class IGRpcService: public TThrRefBase { public: virtual grpc::Service* GetService() = 0; virtual void StopService() noexcept = 0; + virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0; + + virtual void InitService( + const std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>& cqs, + TLoggerPtr logger, + size_t index) + { + InitService(cqs[index % cqs.size()].get(), logger); + } + virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0; virtual bool IsUnsafeToShutdown() const = 0; virtual size_t RequestsInProgress() const = 0; @@ -236,13 +262,15 @@ public: using TCurrentGRpcService = T; void StopService() noexcept override { - with_lock(Lock_) { - AtomicSet(ShuttingDown_, 1); - - // Send TryCansel to event (can be send after finishing). - // Actual dtors will be called from grpc thread, so deadlock impossible - for (auto* request : Requests_) { - request->Shutdown(); + AtomicSet(ShuttingDown_, 1); + + for (auto& shard : Shards_) { + with_lock(shard.Lock_) { + // Send TryCansel to event (can be send after finishing). + // Actual dtors will be called from grpc thread, so deadlock impossible + for (auto* request : shard.Requests_) { + request->Shutdown(); + } } } } @@ -263,8 +291,10 @@ public: size_t RequestsInProgress() const override { size_t c = 0; - with_lock(Lock_) { - c = Requests_.size(); + for (auto& shard : Shards_) { + with_lock(shard.Lock_) { + c += shard.Requests_.size(); + } } return c; } @@ -290,23 +320,29 @@ public: } bool RegisterRequestCtx(ICancelableContext* req) { - with_lock(Lock_) { - auto r = Requests_.emplace(req); - Y_VERIFY(r.second, "Ctx already registered"); + if (Y_LIKELY(req->ShardIndex == size_t(-1))) { + req->ShardIndex = NextShard_.fetch_add(1, std::memory_order_relaxed) % Shards_.size(); + } + auto& shard = Shards_[req->ShardIndex]; + with_lock(shard.Lock_) { if (IsShuttingDown()) { - // Server is already shutting down - Requests_.erase(r.first); return false; } + + auto r = shard.Requests_.emplace(req); + Y_VERIFY(r.second, "Ctx already registered"); } return true; } void DeregisterRequestCtx(ICancelableContext* req) { - with_lock(Lock_) { - Y_VERIFY(Requests_.erase(req), "Ctx is not registered"); + Y_VERIFY(req->ShardIndex != size_t(-1), "Ctx does not have an assigned shard index"); + + auto& shard = Shards_[req->ShardIndex]; + with_lock(shard.Lock_) { + Y_VERIFY(shard.Requests_.erase(req), "Ctx is not registered"); } } @@ -325,8 +361,14 @@ private: bool SslServer_ = false; bool NeedAuth_ = false; - THashSet<ICancelableContext*> Requests_; - TAdaptiveLock Lock_; + struct TShard { + TAdaptiveLock Lock_; + THashSet<ICancelableContext*> Requests_; + }; + + // Note: benchmarks showed 4 shards is enough to scale to ~30 threads + TVector<TShard> Shards_{ size_t(4) }; + std::atomic<size_t> NextShard_{ 0 }; }; class TGRpcServer { |