aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc
diff options
context:
space:
mode:
authorDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 10:54:08 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 12:28:07 +0300
commitfc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch)
treec15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library/cpp/grpc
parent8a749596d40e91c896a1907afcd108d9221fbde1 (diff)
downloadydb-e9cbe5c5cf67db853d223fd365c9f05b695f7b96.tar.gz
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library/cpp/grpc')
-rw-r--r--library/cpp/grpc/server/grpc_request.h6
-rw-r--r--library/cpp/grpc/server/grpc_request_base.h3
-rw-r--r--library/cpp/grpc/server/grpc_server.cpp37
-rw-r--r--library/cpp/grpc/server/grpc_server.h80
4 files changed, 87 insertions, 39 deletions
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h
index c4b7e9c040..4e869ef5f6 100644
--- a/library/cpp/grpc/server/grpc_request.h
+++ b/library/cpp/grpc/server/grpc_request.h
@@ -113,6 +113,10 @@ public:
return FinishPromise_.GetFuture();
}
+ bool IsClientLost() const override {
+ return ClientLost_.load();
+ }
+
TString GetPeer() const override {
return TString(this->Context.peer());
}
@@ -496,6 +500,7 @@ private:
void OnFinish(EQueueEventStatus evStatus) {
if (this->Context.IsCancelled()) {
+ ClientLost_.store(true);
FinishPromise_.SetValue(EFinishStatus::CANCEL);
} else {
FinishPromise_.SetValue(evStatus == EQueueEventStatus::OK ? EFinishStatus::OK : EFinishStatus::ERROR);
@@ -556,6 +561,7 @@ private:
NThreading::TPromise<EFinishStatus> FinishPromise_;
bool SkipUpdateCountersOnError = false;
IStreamAdaptor::TPtr StreamAdaptor_;
+ std::atomic<bool> ClientLost_ = false;
};
template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter=google::protobuf::TextFormat::Printer, typename TOutProtoPrinter=google::protobuf::TextFormat::Printer>
diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h
index 42b78ed7df..60b38805ed 100644
--- a/library/cpp/grpc/server/grpc_request_base.h
+++ b/library/cpp/grpc/server/grpc_request_base.h
@@ -116,6 +116,9 @@ public:
//! Returns true if server is using ssl
virtual bool SslServer() const = 0;
+
+ //! Returns true if client was not interested in result (but we still must send response to make grpc happy)
+ virtual bool IsClientLost() const = 0;
};
} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
index 97472206e2..0c05c7404e 100644
--- a/library/cpp/grpc/server/grpc_server.cpp
+++ b/library/cpp/grpc/server/grpc_server.cpp
@@ -130,11 +130,16 @@ void TGRpcServer::Start() {
builder.SetOption(std::make_unique<TKeepAliveOption>());
}
- if (Options_.UseCompletionQueuePerThread) {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- CQS_.push_back(builder.AddCompletionQueue());
- }
- } else {
+ size_t completionQueueCount = 1;
+ if (Options_.WorkersPerCompletionQueue) {
+ size_t threadsPerQueue = Max(std::size_t{1}, Options_.WorkersPerCompletionQueue);
+ completionQueueCount = (Options_.WorkerThreads + threadsPerQueue - 1) / threadsPerQueue; // ceiling
+ } else if (Options_.UseCompletionQueuePerThread) {
+ completionQueueCount = Options_.WorkerThreads;
+ }
+
+ CQS_.reserve(completionQueueCount);
+ for (size_t i = 0; i < completionQueueCount; ++i) {
CQS_.push_back(builder.AddCompletionQueue());
}
@@ -159,23 +164,15 @@ void TGRpcServer::Start() {
size_t index = 0;
for (IGRpcServicePtr service : Services_) {
// TODO: provide something else for services instead of ServerCompletionQueue
- service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger);
+ service->InitService(CQS_, Options_.Logger, index++);
}
- if (Options_.UseCompletionQueuePerThread) {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- auto* cq = &CQS_[i];
- Ts.push_back(SystemThreadFactory()->Run([cq] {
- PullEvents(cq->get());
- }));
- }
- } else {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- auto* cq = &CQS_[0];
- Ts.push_back(SystemThreadFactory()->Run([cq] {
- PullEvents(cq->get());
- }));
- }
+ Ts.reserve(Options_.WorkerThreads);
+ for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
+ auto* cq = &CQS_[i % CQS_.size()];
+ Ts.push_back(SystemThreadFactory()->Run([cq] {
+ PullEvents(cq->get());
+ }));
}
if (Options_.ExternalListener) {
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h
index c9b48a6676..6da5076046 100644
--- a/library/cpp/grpc/server/grpc_server.h
+++ b/library/cpp/grpc/server/grpc_server.h
@@ -54,7 +54,14 @@ struct TServerOptions {
//! Number of worker threads.
DECLARE_FIELD(WorkerThreads, size_t, 2);
- //! Create one completion queue per thread
+ //! Number of workers per completion queue, i.e. when
+ // WorkerThreads=8 and PriorityWorkersPerCompletionQueue=2
+ // there will be 4 completion queues. When set to 0 then
+ // only UseCompletionQueuePerThread affects number of CQ.
+ DECLARE_FIELD(WorkersPerCompletionQueue, size_t, 0);
+
+ //! Obsolete. Create one completion queue per thread.
+ // Setting true equals to the WorkersPerCompletionQueue=1
DECLARE_FIELD(UseCompletionQueuePerThread, bool, false);
//! Memory quota size for grpc server in bytes. Zero means unlimited.
@@ -122,6 +129,15 @@ class ICancelableContext {
public:
virtual void Shutdown() = 0;
virtual ~ICancelableContext() = default;
+
+private:
+ template<class T>
+ friend class TGrpcServiceBase;
+
+ // Shard assigned by RegisterRequestCtx. This field is not thread-safe
+ // because RegisterRequestCtx may only be called once for a single service,
+ // so it's only assigned once.
+ size_t ShardIndex = size_t(-1);
};
template <class TLimit>
@@ -166,7 +182,17 @@ class IGRpcService: public TThrRefBase {
public:
virtual grpc::Service* GetService() = 0;
virtual void StopService() noexcept = 0;
+
virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0;
+
+ virtual void InitService(
+ const std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>& cqs,
+ TLoggerPtr logger,
+ size_t index)
+ {
+ InitService(cqs[index % cqs.size()].get(), logger);
+ }
+
virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0;
virtual bool IsUnsafeToShutdown() const = 0;
virtual size_t RequestsInProgress() const = 0;
@@ -236,13 +262,15 @@ public:
using TCurrentGRpcService = T;
void StopService() noexcept override {
- with_lock(Lock_) {
- AtomicSet(ShuttingDown_, 1);
-
- // Send TryCansel to event (can be send after finishing).
- // Actual dtors will be called from grpc thread, so deadlock impossible
- for (auto* request : Requests_) {
- request->Shutdown();
+ AtomicSet(ShuttingDown_, 1);
+
+ for (auto& shard : Shards_) {
+ with_lock(shard.Lock_) {
+ // Send TryCansel to event (can be send after finishing).
+ // Actual dtors will be called from grpc thread, so deadlock impossible
+ for (auto* request : shard.Requests_) {
+ request->Shutdown();
+ }
}
}
}
@@ -263,8 +291,10 @@ public:
size_t RequestsInProgress() const override {
size_t c = 0;
- with_lock(Lock_) {
- c = Requests_.size();
+ for (auto& shard : Shards_) {
+ with_lock(shard.Lock_) {
+ c += shard.Requests_.size();
+ }
}
return c;
}
@@ -290,23 +320,29 @@ public:
}
bool RegisterRequestCtx(ICancelableContext* req) {
- with_lock(Lock_) {
- auto r = Requests_.emplace(req);
- Y_VERIFY(r.second, "Ctx already registered");
+ if (Y_LIKELY(req->ShardIndex == size_t(-1))) {
+ req->ShardIndex = NextShard_.fetch_add(1, std::memory_order_relaxed) % Shards_.size();
+ }
+ auto& shard = Shards_[req->ShardIndex];
+ with_lock(shard.Lock_) {
if (IsShuttingDown()) {
- // Server is already shutting down
- Requests_.erase(r.first);
return false;
}
+
+ auto r = shard.Requests_.emplace(req);
+ Y_VERIFY(r.second, "Ctx already registered");
}
return true;
}
void DeregisterRequestCtx(ICancelableContext* req) {
- with_lock(Lock_) {
- Y_VERIFY(Requests_.erase(req), "Ctx is not registered");
+ Y_VERIFY(req->ShardIndex != size_t(-1), "Ctx does not have an assigned shard index");
+
+ auto& shard = Shards_[req->ShardIndex];
+ with_lock(shard.Lock_) {
+ Y_VERIFY(shard.Requests_.erase(req), "Ctx is not registered");
}
}
@@ -325,8 +361,14 @@ private:
bool SslServer_ = false;
bool NeedAuth_ = false;
- THashSet<ICancelableContext*> Requests_;
- TAdaptiveLock Lock_;
+ struct TShard {
+ TAdaptiveLock Lock_;
+ THashSet<ICancelableContext*> Requests_;
+ };
+
+ // Note: benchmarks showed 4 shards is enough to scale to ~30 threads
+ TVector<TShard> Shards_{ size_t(4) };
+ std::atomic<size_t> NextShard_{ 0 };
};
class TGRpcServer {