aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc
diff options
context:
space:
mode:
authorAlexey Borzenkov <snaury@yandex-team.ru>2022-02-10 16:47:43 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:47:43 +0300
commit330c83f8c116bd45316397b179275e9d87007e7d (patch)
treec0748b5dcbade83af788c0abfa89c0383d6b779c /library/cpp/grpc
parent22d92781ba2a10b7fb5b977b7d1a5c40ff53885f (diff)
downloadydb-330c83f8c116bd45316397b179275e9d87007e7d.tar.gz
Restoring authorship annotation for Alexey Borzenkov <snaury@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc')
-rw-r--r--library/cpp/grpc/client/grpc_client_low.cpp574
-rw-r--r--library/cpp/grpc/client/grpc_client_low.h1234
-rw-r--r--library/cpp/grpc/client/grpc_common.h16
-rw-r--r--library/cpp/grpc/server/grpc_counters.h12
-rw-r--r--library/cpp/grpc/server/grpc_request.h56
-rw-r--r--library/cpp/grpc/server/grpc_request_base.h6
-rw-r--r--library/cpp/grpc/server/grpc_server.cpp12
-rw-r--r--library/cpp/grpc/server/grpc_server.h198
8 files changed, 1054 insertions, 1054 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp
index bba83aee02..73cc908ef8 100644
--- a/library/cpp/grpc/client/grpc_client_low.cpp
+++ b/library/cpp/grpc/client/grpc_client_low.cpp
@@ -193,214 +193,214 @@ static void PullEvents(grpc::CompletionQueue* cq) {
}
}
-class TGRpcClientLow::TContextImpl final
- : public std::enable_shared_from_this<TContextImpl>
- , public IQueueClientContext
-{
- friend class TGRpcClientLow;
-
- using TCallback = std::function<void()>;
- using TContextPtr = std::shared_ptr<TContextImpl>;
-
-public:
+class TGRpcClientLow::TContextImpl final
+ : public std::enable_shared_from_this<TContextImpl>
+ , public IQueueClientContext
+{
+ friend class TGRpcClientLow;
+
+ using TCallback = std::function<void()>;
+ using TContextPtr = std::shared_ptr<TContextImpl>;
+
+public:
~TContextImpl() override {
- Y_VERIFY(CountChildren() == 0,
- "Destructor called with non-empty children");
-
- if (Parent) {
- Parent->ForgetContext(this);
- } else if (Y_LIKELY(Owner)) {
- Owner->ForgetContext(this);
- }
- }
-
- /**
- * Helper for locking child pointer from a parent container
- */
- static TContextPtr LockChildPtr(TContextImpl* ptr) {
- if (ptr) {
- // N.B. it is safe to do as long as it's done under a mutex and
- // pointer is among valid children. When that's the case we
- // know that TContextImpl destructor has not finished yet, so
- // the object is valid. The lock() method may return nullptr
- // though, if the object is being destructed right now.
- return ptr->weak_from_this().lock();
- } else {
- return nullptr;
- }
- }
-
- void ForgetContext(TContextImpl* child) {
- std::unique_lock<std::mutex> guard(Mutex);
-
- auto removed = RemoveChild(child);
- Y_VERIFY(removed, "Unexpected ForgetContext(%p)", child);
- }
-
- IQueueClientContextPtr CreateContext() override {
- auto self = shared_from_this();
- auto child = std::make_shared<TContextImpl>();
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
-
- AddChild(child.get());
-
- // It's now safe to initialize parent and owner
- child->Parent = std::move(self);
- child->Owner = Owner;
+ Y_VERIFY(CountChildren() == 0,
+ "Destructor called with non-empty children");
+
+ if (Parent) {
+ Parent->ForgetContext(this);
+ } else if (Y_LIKELY(Owner)) {
+ Owner->ForgetContext(this);
+ }
+ }
+
+ /**
+ * Helper for locking child pointer from a parent container
+ */
+ static TContextPtr LockChildPtr(TContextImpl* ptr) {
+ if (ptr) {
+ // N.B. it is safe to do as long as it's done under a mutex and
+ // pointer is among valid children. When that's the case we
+ // know that TContextImpl destructor has not finished yet, so
+ // the object is valid. The lock() method may return nullptr
+ // though, if the object is being destructed right now.
+ return ptr->weak_from_this().lock();
+ } else {
+ return nullptr;
+ }
+ }
+
+ void ForgetContext(TContextImpl* child) {
+ std::unique_lock<std::mutex> guard(Mutex);
+
+ auto removed = RemoveChild(child);
+ Y_VERIFY(removed, "Unexpected ForgetContext(%p)", child);
+ }
+
+ IQueueClientContextPtr CreateContext() override {
+ auto self = shared_from_this();
+ auto child = std::make_shared<TContextImpl>();
+
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
+
+ AddChild(child.get());
+
+ // It's now safe to initialize parent and owner
+ child->Parent = std::move(self);
+ child->Owner = Owner;
child->CQ = CQ;
-
- // Propagate cancellation to a child context
- if (Cancelled.load(std::memory_order_relaxed)) {
- child->Cancelled.store(true, std::memory_order_relaxed);
- }
- }
-
- return child;
- }
-
- grpc::CompletionQueue* CompletionQueue() override {
- Y_VERIFY(Owner, "Uninitialized context");
+
+ // Propagate cancellation to a child context
+ if (Cancelled.load(std::memory_order_relaxed)) {
+ child->Cancelled.store(true, std::memory_order_relaxed);
+ }
+ }
+
+ return child;
+ }
+
+ grpc::CompletionQueue* CompletionQueue() override {
+ Y_VERIFY(Owner, "Uninitialized context");
return CQ;
- }
-
- bool IsCancelled() const override {
- return Cancelled.load(std::memory_order_acquire);
- }
-
- bool Cancel() override {
- TStackVec<TCallback, 1> callbacks;
- TStackVec<TContextPtr, 2> children;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
-
- if (Cancelled.load(std::memory_order_relaxed)) {
- // Already cancelled in another thread
- return false;
- }
-
- callbacks.reserve(Callbacks.size());
- children.reserve(CountChildren());
-
- for (auto& callback : Callbacks) {
- callbacks.emplace_back().swap(callback);
- }
- Callbacks.clear();
-
- // Collect all children we need to cancel
- // N.B. we don't clear children links (cleared by destructors)
- // N.B. some children may be stuck in destructors at the moment
- for (TContextImpl* ptr : InlineChildren) {
- if (auto child = LockChildPtr(ptr)) {
- children.emplace_back(std::move(child));
- }
- }
- for (auto* ptr : Children) {
- if (auto child = LockChildPtr(ptr)) {
- children.emplace_back(std::move(child));
- }
- }
-
- Cancelled.store(true, std::memory_order_release);
- }
-
- // Call directly subscribed callbacks
- if (callbacks) {
- RunCallbacksNoExcept(callbacks);
- }
-
- // Cancel all children
- for (auto& child : children) {
- child->Cancel();
- child.reset();
- }
-
- return true;
- }
-
- void SubscribeCancel(TCallback callback) override {
- Y_VERIFY(callback, "SubscribeCancel called with an empty callback");
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
-
- if (!Cancelled.load(std::memory_order_relaxed)) {
- Callbacks.emplace_back().swap(callback);
- return;
- }
- }
-
- // Already cancelled, run immediately
- callback();
- }
-
-private:
- void AddChild(TContextImpl* child) {
- for (TContextImpl*& slot : InlineChildren) {
- if (!slot) {
- slot = child;
- return;
- }
- }
-
- Children.insert(child);
- }
-
- bool RemoveChild(TContextImpl* child) {
- for (TContextImpl*& slot : InlineChildren) {
- if (slot == child) {
- slot = nullptr;
- return true;
- }
- }
-
- return Children.erase(child);
- }
-
- size_t CountChildren() {
- size_t count = 0;
-
- for (TContextImpl* ptr : InlineChildren) {
- if (ptr) {
- ++count;
- }
- }
-
- return count + Children.size();
- }
-
- template<class TCallbacks>
- static void RunCallbacksNoExcept(TCallbacks& callbacks) noexcept {
- for (auto& callback : callbacks) {
- if (callback) {
- callback();
- callback = nullptr;
- }
- }
- }
-
-private:
- // We want a simple lock here, without extra memory allocations
- std::mutex Mutex;
-
- // These fields are initialized on successful registration
- TContextPtr Parent;
- TGRpcClientLow* Owner = nullptr;
+ }
+
+ bool IsCancelled() const override {
+ return Cancelled.load(std::memory_order_acquire);
+ }
+
+ bool Cancel() override {
+ TStackVec<TCallback, 1> callbacks;
+ TStackVec<TContextPtr, 2> children;
+
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
+
+ if (Cancelled.load(std::memory_order_relaxed)) {
+ // Already cancelled in another thread
+ return false;
+ }
+
+ callbacks.reserve(Callbacks.size());
+ children.reserve(CountChildren());
+
+ for (auto& callback : Callbacks) {
+ callbacks.emplace_back().swap(callback);
+ }
+ Callbacks.clear();
+
+ // Collect all children we need to cancel
+ // N.B. we don't clear children links (cleared by destructors)
+ // N.B. some children may be stuck in destructors at the moment
+ for (TContextImpl* ptr : InlineChildren) {
+ if (auto child = LockChildPtr(ptr)) {
+ children.emplace_back(std::move(child));
+ }
+ }
+ for (auto* ptr : Children) {
+ if (auto child = LockChildPtr(ptr)) {
+ children.emplace_back(std::move(child));
+ }
+ }
+
+ Cancelled.store(true, std::memory_order_release);
+ }
+
+ // Call directly subscribed callbacks
+ if (callbacks) {
+ RunCallbacksNoExcept(callbacks);
+ }
+
+ // Cancel all children
+ for (auto& child : children) {
+ child->Cancel();
+ child.reset();
+ }
+
+ return true;
+ }
+
+ void SubscribeCancel(TCallback callback) override {
+ Y_VERIFY(callback, "SubscribeCancel called with an empty callback");
+
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
+
+ if (!Cancelled.load(std::memory_order_relaxed)) {
+ Callbacks.emplace_back().swap(callback);
+ return;
+ }
+ }
+
+ // Already cancelled, run immediately
+ callback();
+ }
+
+private:
+ void AddChild(TContextImpl* child) {
+ for (TContextImpl*& slot : InlineChildren) {
+ if (!slot) {
+ slot = child;
+ return;
+ }
+ }
+
+ Children.insert(child);
+ }
+
+ bool RemoveChild(TContextImpl* child) {
+ for (TContextImpl*& slot : InlineChildren) {
+ if (slot == child) {
+ slot = nullptr;
+ return true;
+ }
+ }
+
+ return Children.erase(child);
+ }
+
+ size_t CountChildren() {
+ size_t count = 0;
+
+ for (TContextImpl* ptr : InlineChildren) {
+ if (ptr) {
+ ++count;
+ }
+ }
+
+ return count + Children.size();
+ }
+
+ template<class TCallbacks>
+ static void RunCallbacksNoExcept(TCallbacks& callbacks) noexcept {
+ for (auto& callback : callbacks) {
+ if (callback) {
+ callback();
+ callback = nullptr;
+ }
+ }
+ }
+
+private:
+ // We want a simple lock here, without extra memory allocations
+ std::mutex Mutex;
+
+ // These fields are initialized on successful registration
+ TContextPtr Parent;
+ TGRpcClientLow* Owner = nullptr;
grpc::CompletionQueue* CQ = nullptr;
-
- // Some children are stored inline, others are in a set
- std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } };
+
+ // Some children are stored inline, others are in a set
+ std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } };
std::unordered_set<TContextImpl*> Children;
-
- // Single callback is stored without extra allocations
- TStackVec<TCallback, 1> Callbacks;
-
- // Atomic flag for a faster IsCancelled() implementation
- std::atomic<bool> Cancelled;
-};
-
+
+ // Single callback is stored without extra allocations
+ TStackVec<TCallback, 1> Callbacks;
+
+ // Atomic flag for a faster IsCancelled() implementation
+ std::atomic<bool> Cancelled;
+};
+
TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread)
: UseCompletionQueuePerThread_(useCompletionQueuePerThread)
{
@@ -408,7 +408,7 @@ TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePe
}
void TGRpcClientLow::Init(size_t numWorkerThread) {
- SetCqState(WORKING);
+ SetCqState(WORKING);
if (UseCompletionQueuePerThread_) {
for (size_t i = 0; i < numWorkerThread; i++) {
CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
@@ -428,7 +428,7 @@ void TGRpcClientLow::Init(size_t numWorkerThread) {
}
}
-void TGRpcClientLow::AddWorkerThreadForTest() {
+void TGRpcClientLow::AddWorkerThreadForTest() {
if (UseCompletionQueuePerThread_) {
CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
auto* cq = CQS_.back().get();
@@ -441,75 +441,75 @@ void TGRpcClientLow::AddWorkerThreadForTest() {
PullEvents(cq);
}).Release());
}
-}
-
-TGRpcClientLow::~TGRpcClientLow() {
- StopInternal(true);
- WaitInternal();
-}
-
-void TGRpcClientLow::Stop(bool wait) {
- StopInternal(false);
-
- if (wait) {
- WaitInternal();
+}
+
+TGRpcClientLow::~TGRpcClientLow() {
+ StopInternal(true);
+ WaitInternal();
+}
+
+void TGRpcClientLow::Stop(bool wait) {
+ StopInternal(false);
+
+ if (wait) {
+ WaitInternal();
}
}
-void TGRpcClientLow::StopInternal(bool silent) {
- bool shutdown;
-
- TVector<TContextImpl::TContextPtr> cancelQueue;
-
+void TGRpcClientLow::StopInternal(bool silent) {
+ bool shutdown;
+
+ TVector<TContextImpl::TContextPtr> cancelQueue;
+
{
std::unique_lock<std::mutex> guard(Mtx_);
- auto allowStateChange = [&]() {
- switch (GetCqState()) {
- case WORKING:
- return true;
- case STOP_SILENT:
- return !silent;
- case STOP_EXPLICIT:
- return false;
- }
-
- Y_UNREACHABLE();
- };
-
- if (!allowStateChange()) {
- // Completion queue is already stopping
- return;
- }
-
- SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT);
-
+ auto allowStateChange = [&]() {
+ switch (GetCqState()) {
+ case WORKING:
+ return true;
+ case STOP_SILENT:
+ return !silent;
+ case STOP_EXPLICIT:
+ return false;
+ }
+
+ Y_UNREACHABLE();
+ };
+
+ if (!allowStateChange()) {
+ // Completion queue is already stopping
+ return;
+ }
+
+ SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT);
+
if (!silent && !Contexts_.empty()) {
- cancelQueue.reserve(Contexts_.size());
- for (auto* ptr : Contexts_) {
- // N.B. some contexts may be stuck in destructors
- if (auto context = TContextImpl::LockChildPtr(ptr)) {
- cancelQueue.emplace_back(std::move(context));
- }
- }
- }
-
+ cancelQueue.reserve(Contexts_.size());
+ for (auto* ptr : Contexts_) {
+ // N.B. some contexts may be stuck in destructors
+ if (auto context = TContextImpl::LockChildPtr(ptr)) {
+ cancelQueue.emplace_back(std::move(context));
+ }
+ }
+ }
+
shutdown = Contexts_.empty();
- }
-
- for (auto& context : cancelQueue) {
- context->Cancel();
- context.reset();
- }
-
- if (shutdown) {
+ }
+
+ for (auto& context : cancelQueue) {
+ context->Cancel();
+ context.reset();
+ }
+
+ if (shutdown) {
for (auto& cq : CQS_) {
cq->Shutdown();
}
}
-}
-
-void TGRpcClientLow::WaitInternal() {
+}
+
+void TGRpcClientLow::WaitInternal() {
std::unique_lock<std::mutex> guard(JoinMutex_);
for (auto& ti : WorkerThreads_) {
@@ -517,7 +517,7 @@ void TGRpcClientLow::WaitInternal() {
}
}
-void TGRpcClientLow::WaitIdle() {
+void TGRpcClientLow::WaitIdle() {
std::unique_lock<std::mutex> guard(Mtx_);
while (!Contexts_.empty()) {
@@ -525,7 +525,7 @@ void TGRpcClientLow::WaitIdle() {
}
}
-std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() {
+std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() {
std::unique_lock<std::mutex> guard(Mtx_);
auto allowCreateContext = [&]() {
@@ -535,15 +535,15 @@ std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() {
case STOP_SILENT:
case STOP_EXPLICIT:
return false;
- }
-
+ }
+
Y_UNREACHABLE();
};
if (!allowCreateContext()) {
// New context creation is forbidden
return nullptr;
- }
+ }
auto context = std::make_shared<TContextImpl>();
Contexts_.insert(context.get());
@@ -554,32 +554,32 @@ std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() {
context->CQ = CQS_[0].get();
}
return context;
-}
-
-void TGRpcClientLow::ForgetContext(TContextImpl* context) {
- bool shutdown = false;
-
+}
+
+void TGRpcClientLow::ForgetContext(TContextImpl* context) {
+ bool shutdown = false;
+
{
std::unique_lock<std::mutex> guard(Mtx_);
- if (!Contexts_.erase(context)) {
- Y_FAIL("Unexpected ForgetContext(%p)", context);
- }
-
- if (Contexts_.empty()) {
- if (IsStopping()) {
- shutdown = true;
- }
-
+ if (!Contexts_.erase(context)) {
+ Y_FAIL("Unexpected ForgetContext(%p)", context);
+ }
+
+ if (Contexts_.empty()) {
+ if (IsStopping()) {
+ shutdown = true;
+ }
+
ContextsEmpty_.notify_all();
- }
- }
-
- if (shutdown) {
- // This was the last context, shutdown CQ
+ }
+ }
+
+ if (shutdown) {
+ // This was the last context, shutdown CQ
for (auto& cq : CQS_) {
cq->Shutdown();
- }
+ }
}
}
diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h
index 5d0983f804..ab0a0627be 100644
--- a/library/cpp/grpc/client/grpc_client_low.h
+++ b/library/cpp/grpc/client/grpc_client_low.h
@@ -52,125 +52,125 @@ public:
virtual void Destroy() = 0;
};
-// Implementation of IQueueClientEvent that reduces allocations
-template<class TSelf>
-class TQueueClientFixedEvent : private IQueueClientEvent {
- using TCallback = void (TSelf::*)(bool);
-
-public:
- TQueueClientFixedEvent(TSelf* self, TCallback callback)
- : Self(self)
- , Callback(callback)
- { }
-
- IQueueClientEvent* Prepare() {
- Self->Ref();
- return this;
- }
-
-private:
- bool Execute(bool ok) override {
- ((*Self).*Callback)(ok);
- return false;
- }
-
- void Destroy() override {
- Self->UnRef();
- }
-
-private:
- TSelf* const Self;
- TCallback const Callback;
-};
-
-class IQueueClientContext;
-using IQueueClientContextPtr = std::shared_ptr<IQueueClientContext>;
-
-// Provider of IQueueClientContext instances
-class IQueueClientContextProvider {
-public:
- virtual ~IQueueClientContextProvider() = default;
-
- virtual IQueueClientContextPtr CreateContext() = 0;
-};
-
-// Activity context for a low-level client
-class IQueueClientContext : public IQueueClientContextProvider {
-public:
- virtual ~IQueueClientContext() = default;
-
- //! Returns CompletionQueue associated with the client
- virtual grpc::CompletionQueue* CompletionQueue() = 0;
-
- //! Returns true if context has been cancelled
- virtual bool IsCancelled() const = 0;
-
- //! Tries to cancel context, calling all registered callbacks
- virtual bool Cancel() = 0;
-
- //! Subscribes callback to cancellation
- //
- // Note there's no way to unsubscribe, if subscription is temporary
- // make sure you create a new context with CreateContext and release
- // it as soon as it's no longer needed.
- virtual void SubscribeCancel(std::function<void()> callback) = 0;
-
- //! Subscribes callback to cancellation
- //
- // This alias is for compatibility with older code.
- void SubscribeStop(std::function<void()> callback) {
- SubscribeCancel(std::move(callback));
- }
-};
-
+// Implementation of IQueueClientEvent that reduces allocations
+template<class TSelf>
+class TQueueClientFixedEvent : private IQueueClientEvent {
+ using TCallback = void (TSelf::*)(bool);
+
+public:
+ TQueueClientFixedEvent(TSelf* self, TCallback callback)
+ : Self(self)
+ , Callback(callback)
+ { }
+
+ IQueueClientEvent* Prepare() {
+ Self->Ref();
+ return this;
+ }
+
+private:
+ bool Execute(bool ok) override {
+ ((*Self).*Callback)(ok);
+ return false;
+ }
+
+ void Destroy() override {
+ Self->UnRef();
+ }
+
+private:
+ TSelf* const Self;
+ TCallback const Callback;
+};
+
+class IQueueClientContext;
+using IQueueClientContextPtr = std::shared_ptr<IQueueClientContext>;
+
+// Provider of IQueueClientContext instances
+class IQueueClientContextProvider {
+public:
+ virtual ~IQueueClientContextProvider() = default;
+
+ virtual IQueueClientContextPtr CreateContext() = 0;
+};
+
+// Activity context for a low-level client
+class IQueueClientContext : public IQueueClientContextProvider {
+public:
+ virtual ~IQueueClientContext() = default;
+
+ //! Returns CompletionQueue associated with the client
+ virtual grpc::CompletionQueue* CompletionQueue() = 0;
+
+ //! Returns true if context has been cancelled
+ virtual bool IsCancelled() const = 0;
+
+ //! Tries to cancel context, calling all registered callbacks
+ virtual bool Cancel() = 0;
+
+ //! Subscribes callback to cancellation
+ //
+ // Note there's no way to unsubscribe, if subscription is temporary
+ // make sure you create a new context with CreateContext and release
+ // it as soon as it's no longer needed.
+ virtual void SubscribeCancel(std::function<void()> callback) = 0;
+
+ //! Subscribes callback to cancellation
+ //
+ // This alias is for compatibility with older code.
+ void SubscribeStop(std::function<void()> callback) {
+ SubscribeCancel(std::move(callback));
+ }
+};
+
// Represents grpc status and error message string
struct TGrpcStatus {
- TString Msg;
+ TString Msg;
TString Details;
- int GRpcStatusCode;
- bool InternalError;
-
- TGrpcStatus()
- : GRpcStatusCode(grpc::StatusCode::OK)
- , InternalError(false)
- { }
-
- TGrpcStatus(TString msg, int statusCode, bool internalError)
- : Msg(std::move(msg))
- , GRpcStatusCode(statusCode)
- , InternalError(internalError)
- { }
-
+ int GRpcStatusCode;
+ bool InternalError;
+
+ TGrpcStatus()
+ : GRpcStatusCode(grpc::StatusCode::OK)
+ , InternalError(false)
+ { }
+
+ TGrpcStatus(TString msg, int statusCode, bool internalError)
+ : Msg(std::move(msg))
+ , GRpcStatusCode(statusCode)
+ , InternalError(internalError)
+ { }
+
TGrpcStatus(grpc::StatusCode status, TString msg, TString details = {})
- : Msg(std::move(msg))
+ : Msg(std::move(msg))
, Details(std::move(details))
- , GRpcStatusCode(status)
- , InternalError(false)
- { }
-
- TGrpcStatus(const grpc::Status& status)
+ , GRpcStatusCode(status)
+ , InternalError(false)
+ { }
+
+ TGrpcStatus(const grpc::Status& status)
: TGrpcStatus(status.error_code(), TString(status.error_message()), TString(status.error_details()))
- { }
-
- TGrpcStatus& operator=(const grpc::Status& status) {
- Msg = TString(status.error_message());
+ { }
+
+ TGrpcStatus& operator=(const grpc::Status& status) {
+ Msg = TString(status.error_message());
Details = TString(status.error_details());
- GRpcStatusCode = status.error_code();
- InternalError = false;
- return *this;
- }
-
- static TGrpcStatus Internal(TString msg) {
- return { std::move(msg), -1, true };
- }
-
- bool Ok() const {
- return !InternalError && GRpcStatusCode == grpc::StatusCode::OK;
- }
+ GRpcStatusCode = status.error_code();
+ InternalError = false;
+ return *this;
+ }
+
+ static TGrpcStatus Internal(TString msg) {
+ return { std::move(msg), -1, true };
+ }
+
+ bool Ok() const {
+ return !InternalError && GRpcStatusCode == grpc::StatusCode::OK;
+ }
};
bool inline IsGRpcStatusGood(const TGrpcStatus& status) {
- return status.Ok();
+ return status.Ok();
}
// Response callback type - this callback will be called when request is finished
@@ -241,9 +241,9 @@ public:
{ }
~TSimpleRequestProcessor() {
- if (!Replied_ && Callback_) {
- Callback_(TGrpcStatus::Internal("request left unhandled"), std::move(Reply_));
- Callback_ = nullptr; // free resources as early as possible
+ if (!Replied_ && Callback_) {
+ Callback_(TGrpcStatus::Internal("request left unhandled"), std::move(Reply_));
+ Callback_ = nullptr; // free resources as early as possible
}
}
@@ -251,53 +251,53 @@ public:
{
std::unique_lock<std::mutex> guard(Mutex_);
LocalContext.reset();
- }
- TGrpcStatus status;
- if (ok) {
+ }
+ TGrpcStatus status;
+ if (ok) {
status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
+ } else {
+ status = TGrpcStatus::Internal("Unexpected error");
}
Replied_ = true;
Callback_(std::move(status), std::move(Reply_));
- Callback_ = nullptr; // free resources as early as possible
+ Callback_ = nullptr; // free resources as early as possible
return false;
}
void Destroy() override {
- UnRef();
+ UnRef();
}
private:
- IQueueClientEvent* FinishedEvent() {
- Ref();
- return this;
+ IQueueClientEvent* FinishedEvent() {
+ Ref();
+ return this;
}
void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) {
- auto context = provider->CreateContext();
- if (!context) {
- Replied_ = true;
- Callback_(TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_));
- Callback_ = nullptr;
- return;
- }
+ auto context = provider->CreateContext();
+ if (!context) {
+ Replied_ = true;
+ Callback_(TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_));
+ Callback_ = nullptr;
+ return;
+ }
{
std::unique_lock<std::mutex> guard(Mutex_);
LocalContext = context;
Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());
Reader_->Finish(&Reply_, &Status, FinishedEvent());
- }
- context->SubscribeStop([self = TPtr(this)] {
- self->Stop();
- });
- }
-
- void Stop() {
+ }
+ context->SubscribeStop([self = TPtr(this)] {
+ self->Stop();
+ });
+ }
+
+ void Stop() {
Context.TryCancel();
- }
-
- TResponseCallback<TResponse> Callback_;
+ }
+
+ TResponseCallback<TResponse> Callback_;
TResponse Reply_;
std::mutex Mutex_;
TAsyncReaderPtr Reader_;
@@ -387,49 +387,49 @@ private:
template<class TResponse>
class IStreamRequestReadProcessor : public TThrRefBase {
-public:
+public:
using TPtr = TIntrusivePtr<IStreamRequestReadProcessor>;
- using TReadCallback = std::function<void(TGrpcStatus&&)>;
-
- /**
- * Asynchronously cancel the request
- */
- virtual void Cancel() = 0;
-
- /**
+ using TReadCallback = std::function<void(TGrpcStatus&&)>;
+
+ /**
+ * Asynchronously cancel the request
+ */
+ virtual void Cancel() = 0;
+
+ /**
* Scheduled initial server metadata read from the stream
*/
virtual void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) = 0;
/**
- * Scheduled response read from the stream
- * Callback will be called with the status if it failed
- * Only one Read or Finish call may be active at a time
- */
- virtual void Read(TResponse* response, TReadCallback callback) = 0;
-
- /**
- * Stop reading and gracefully finish the stream
- * Only one Read or Finish call may be active at a time
- */
- virtual void Finish(TReadCallback callback) = 0;
-
- /**
- * Additional callback to be called when stream has finished
- */
- virtual void AddFinishedCallback(TReadCallback callback) = 0;
-};
-
+ * Scheduled response read from the stream
+ * Callback will be called with the status if it failed
+ * Only one Read or Finish call may be active at a time
+ */
+ virtual void Read(TResponse* response, TReadCallback callback) = 0;
+
+ /**
+ * Stop reading and gracefully finish the stream
+ * Only one Read or Finish call may be active at a time
+ */
+ virtual void Finish(TReadCallback callback) = 0;
+
+ /**
+ * Additional callback to be called when stream has finished
+ */
+ virtual void AddFinishedCallback(TReadCallback callback) = 0;
+};
+
template<class TRequest, class TResponse>
class IStreamRequestReadWriteProcessor : public IStreamRequestReadProcessor<TResponse> {
public:
using TPtr = TIntrusivePtr<IStreamRequestReadWriteProcessor>;
- using TWriteCallback = std::function<void(TGrpcStatus&&)>;
+ using TWriteCallback = std::function<void(TGrpcStatus&&)>;
/**
* Scheduled request write to the stream
*/
- virtual void Write(TRequest&& request, TWriteCallback callback = { }) = 0;
+ virtual void Write(TRequest&& request, TWriteCallback callback = { }) = 0;
};
class TGRpcKeepAliveSocketMutator;
@@ -548,7 +548,7 @@ public:
{
std::unique_lock<std::mutex> guard(Mutex);
Cancelled = true;
- if (Started && !ReadFinished) {
+ if (Started && !ReadFinished) {
if (!ReadActive) {
ReadFinished = true;
}
@@ -640,31 +640,31 @@ public:
callback(std::move(status));
}
-
- void AddFinishedCallback(TReadCallback callback) override {
- Y_VERIFY(callback, "Unexpected empty callback");
-
- TGrpcStatus status;
-
+
+ void AddFinishedCallback(TReadCallback callback) override {
+ Y_VERIFY(callback, "Unexpected empty callback");
+
+ TGrpcStatus status;
+
{
std::unique_lock<std::mutex> guard(Mutex);
- if (!Finished) {
- FinishedCallbacks.emplace_back().swap(callback);
- return;
- }
-
- if (FinishedOk) {
- status = Status;
- } else if (Cancelled) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
-
- callback(std::move(status));
- }
-
+ if (!Finished) {
+ FinishedCallbacks.emplace_back().swap(callback);
+ return;
+ }
+
+ if (FinishedOk) {
+ status = Status;
+ } else if (Cancelled) {
+ status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
+ } else {
+ status = TGrpcStatus::Internal("Unexpected error");
+ }
+ }
+
+ callback(std::move(status));
+ }
+
private:
void Start(TStub& stub, const TRequest& request, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) {
auto context = provider->CreateContext();
@@ -727,8 +727,8 @@ private:
{
std::unique_lock<std::mutex> guard(Mutex);
- Started = true;
- if (!ok || Cancelled) {
+ Started = true;
+ if (!ok || Cancelled) {
ReadFinished = true;
Stream->Finish(&Status, OnFinishedTag.Prepare());
return;
@@ -743,7 +743,7 @@ private:
void OnFinished(bool ok) {
TGrpcStatus status;
std::vector<TReadCallback> finishedCallbacks;
- TReaderCallback startCallback;
+ TReaderCallback startCallback;
TReadCallback readCallback;
TReadCallback finishCallback;
@@ -756,19 +756,19 @@ private:
if (ok) {
status = Status;
- } else if (Cancelled) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
+ } else if (Cancelled) {
+ status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
} else {
status = TGrpcStatus::Internal("Unexpected error");
}
- finishedCallbacks.swap(FinishedCallbacks);
-
- if (Callback) {
- Y_VERIFY(!ReadActive);
- startCallback = std::move(Callback);
- Callback = nullptr;
- } else if (ReadActive) {
+ finishedCallbacks.swap(FinishedCallbacks);
+
+ if (Callback) {
+ Y_VERIFY(!ReadActive);
+ startCallback = std::move(Callback);
+ Callback = nullptr;
+ } else if (ReadActive) {
if (ReadCallback) {
readCallback = std::move(ReadCallback);
ReadCallback = nullptr;
@@ -780,18 +780,18 @@ private:
}
}
- for (auto& finishedCallback : finishedCallbacks) {
- auto statusCopy = status;
- finishedCallback(std::move(statusCopy));
- }
-
- if (startCallback) {
+ for (auto& finishedCallback : finishedCallbacks) {
+ auto statusCopy = status;
+ finishedCallback(std::move(statusCopy));
+ }
+
+ if (startCallback) {
+ if (status.Ok()) {
+ status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure");
+ }
+ startCallback(std::move(status), nullptr);
+ } else if (readCallback) {
if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure");
- }
- startCallback(std::move(status), nullptr);
- } else if (readCallback) {
- if (status.Ok()) {
status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
}
readCallback(std::move(status));
@@ -812,7 +812,7 @@ private:
TReadCallback FinishCallback;
std::vector<TReadCallback> FinishedCallbacks;
std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
- bool Started = false;
+ bool Started = false;
bool HasInitialMetadata = false;
bool ReadActive = false;
bool ReadFinished = false;
@@ -821,72 +821,72 @@ private:
bool FinishedOk = false;
};
-template<class TRequest, class TResponse>
+template<class TRequest, class TResponse>
using TStreamConnectedCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadWriteProcessor<TRequest, TResponse>::TPtr)>;
-
-template<class TStub, class TRequest, class TResponse>
+
+template<class TStub, class TRequest, class TResponse>
class TStreamRequestReadWriteProcessor
: public IStreamRequestReadWriteProcessor<TRequest, TResponse>
, public TGRpcRequestProcessorCommon {
-public:
+public:
using TSelf = TStreamRequestReadWriteProcessor;
using TBase = IStreamRequestReadWriteProcessor<TRequest, TResponse>;
- using TPtr = TIntrusivePtr<TSelf>;
- using TConnectedCallback = TStreamConnectedCallback<TRequest, TResponse>;
- using TReadCallback = typename TBase::TReadCallback;
- using TWriteCallback = typename TBase::TWriteCallback;
- using TAsyncReaderWriterPtr = std::unique_ptr<grpc::ClientAsyncReaderWriter<TRequest, TResponse>>;
- using TAsyncRequest = TAsyncReaderWriterPtr (TStub::*)(grpc::ClientContext*, grpc::CompletionQueue*, void*);
-
+ using TPtr = TIntrusivePtr<TSelf>;
+ using TConnectedCallback = TStreamConnectedCallback<TRequest, TResponse>;
+ using TReadCallback = typename TBase::TReadCallback;
+ using TWriteCallback = typename TBase::TWriteCallback;
+ using TAsyncReaderWriterPtr = std::unique_ptr<grpc::ClientAsyncReaderWriter<TRequest, TResponse>>;
+ using TAsyncRequest = TAsyncReaderWriterPtr (TStub::*)(grpc::ClientContext*, grpc::CompletionQueue*, void*);
+
explicit TStreamRequestReadWriteProcessor(TConnectedCallback&& callback)
- : ConnectedCallback(std::move(callback))
- {
- Y_VERIFY(ConnectedCallback, "Missing connected callback");
- }
-
- void Cancel() override {
- Context.TryCancel();
-
+ : ConnectedCallback(std::move(callback))
+ {
+ Y_VERIFY(ConnectedCallback, "Missing connected callback");
+ }
+
+ void Cancel() override {
+ Context.TryCancel();
+
{
std::unique_lock<std::mutex> guard(Mutex);
- Cancelled = true;
- if (Started && !(ReadFinished && WriteFinished)) {
- if (!ReadActive) {
- ReadFinished = true;
- }
- if (!WriteActive) {
- WriteFinished = true;
- }
- if (ReadFinished && WriteFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- }
- }
- }
-
- void Write(TRequest&& request, TWriteCallback callback) override {
- TGrpcStatus status;
-
+ Cancelled = true;
+ if (Started && !(ReadFinished && WriteFinished)) {
+ if (!ReadActive) {
+ ReadFinished = true;
+ }
+ if (!WriteActive) {
+ WriteFinished = true;
+ }
+ if (ReadFinished && WriteFinished) {
+ Stream->Finish(&Status, OnFinishedTag.Prepare());
+ }
+ }
+ }
+ }
+
+ void Write(TRequest&& request, TWriteCallback callback) override {
+ TGrpcStatus status;
+
{
std::unique_lock<std::mutex> guard(Mutex);
- if (Cancelled || ReadFinished || WriteFinished) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped");
- } else if (WriteActive) {
- auto& item = WriteQueue.emplace_back();
- item.Callback.swap(callback);
- item.Request.Swap(&request);
- } else {
- WriteActive = true;
- WriteCallback.swap(callback);
- Stream->Write(request, OnWriteDoneTag.Prepare());
- }
- }
-
- if (!status.Ok() && callback) {
- callback(std::move(status));
- }
- }
-
+ if (Cancelled || ReadFinished || WriteFinished) {
+ status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped");
+ } else if (WriteActive) {
+ auto& item = WriteQueue.emplace_back();
+ item.Callback.swap(callback);
+ item.Request.Swap(&request);
+ } else {
+ WriteActive = true;
+ WriteCallback.swap(callback);
+ Stream->Write(request, OnWriteDoneTag.Prepare());
+ }
+ }
+
+ if (!status.Ok() && callback) {
+ callback(std::move(status));
+ }
+ }
+
void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {
TGrpcStatus status;
@@ -916,321 +916,321 @@ public:
callback(std::move(status));
}
- void Read(TResponse* message, TReadCallback callback) override {
- TGrpcStatus status;
-
+ void Read(TResponse* message, TReadCallback callback) override {
+ TGrpcStatus status;
+
{
std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished) {
- ReadActive = true;
- ReadCallback = std::move(callback);
- if (!ReadFinished) {
- Stream->Read(message, OnReadDoneTag.Prepare());
- }
- return;
- }
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
-
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
- }
-
- callback(std::move(status));
- }
-
- void Finish(TReadCallback callback) override {
- TGrpcStatus status;
-
+ Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
+ if (!Finished) {
+ ReadActive = true;
+ ReadCallback = std::move(callback);
+ if (!ReadFinished) {
+ Stream->Read(message, OnReadDoneTag.Prepare());
+ }
+ return;
+ }
+ if (FinishedOk) {
+ status = Status;
+ } else {
+ status = TGrpcStatus::Internal("Unexpected error");
+ }
+ }
+
+ if (status.Ok()) {
+ status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
+ }
+
+ callback(std::move(status));
+ }
+
+ void Finish(TReadCallback callback) override {
+ TGrpcStatus status;
+
{
std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished) {
- ReadActive = true;
- FinishCallback = std::move(callback);
- if (!ReadFinished) {
- ReadFinished = true;
- if (!WriteActive) {
- WriteFinished = true;
- }
- if (WriteFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- }
- return;
- }
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
-
- callback(std::move(status));
- }
-
- void AddFinishedCallback(TReadCallback callback) override {
- Y_VERIFY(callback, "Unexpected empty callback");
-
- TGrpcStatus status;
-
+ Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
+ if (!Finished) {
+ ReadActive = true;
+ FinishCallback = std::move(callback);
+ if (!ReadFinished) {
+ ReadFinished = true;
+ if (!WriteActive) {
+ WriteFinished = true;
+ }
+ if (WriteFinished) {
+ Stream->Finish(&Status, OnFinishedTag.Prepare());
+ }
+ }
+ return;
+ }
+ if (FinishedOk) {
+ status = Status;
+ } else {
+ status = TGrpcStatus::Internal("Unexpected error");
+ }
+ }
+
+ callback(std::move(status));
+ }
+
+ void AddFinishedCallback(TReadCallback callback) override {
+ Y_VERIFY(callback, "Unexpected empty callback");
+
+ TGrpcStatus status;
+
{
std::unique_lock<std::mutex> guard(Mutex);
- if (!Finished) {
- FinishedCallbacks.emplace_back().swap(callback);
- return;
- }
-
- if (FinishedOk) {
- status = Status;
- } else if (Cancelled) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
-
- callback(std::move(status));
- }
-
-private:
- template<typename> friend class TServiceConnection;
-
- void Start(TStub& stub, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) {
- auto context = provider->CreateContext();
- if (!context) {
- auto callback = std::move(ConnectedCallback);
- TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down");
- callback(std::move(status), nullptr);
- return;
- }
-
+ if (!Finished) {
+ FinishedCallbacks.emplace_back().swap(callback);
+ return;
+ }
+
+ if (FinishedOk) {
+ status = Status;
+ } else if (Cancelled) {
+ status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
+ } else {
+ status = TGrpcStatus::Internal("Unexpected error");
+ }
+ }
+
+ callback(std::move(status));
+ }
+
+private:
+ template<typename> friend class TServiceConnection;
+
+ void Start(TStub& stub, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) {
+ auto context = provider->CreateContext();
+ if (!context) {
+ auto callback = std::move(ConnectedCallback);
+ TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down");
+ callback(std::move(status), nullptr);
+ return;
+ }
+
{
std::unique_lock<std::mutex> guard(Mutex);
- LocalContext = context;
- Stream = (stub.*asyncRequest)(&Context, context->CompletionQueue(), OnConnectedTag.Prepare());
- }
-
- context->SubscribeStop([self = TPtr(this)] {
- self->Cancel();
- });
- }
-
-private:
- void OnConnected(bool ok) {
- TConnectedCallback callback;
-
+ LocalContext = context;
+ Stream = (stub.*asyncRequest)(&Context, context->CompletionQueue(), OnConnectedTag.Prepare());
+ }
+
+ context->SubscribeStop([self = TPtr(this)] {
+ self->Cancel();
+ });
+ }
+
+private:
+ void OnConnected(bool ok) {
+ TConnectedCallback callback;
+
{
std::unique_lock<std::mutex> guard(Mutex);
- Started = true;
- if (!ok || Cancelled) {
- ReadFinished = true;
- WriteFinished = true;
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- return;
- }
-
- callback = std::move(ConnectedCallback);
- ConnectedCallback = nullptr;
- }
-
- callback({ }, typename TBase::TPtr(this));
- }
-
- void OnReadDone(bool ok) {
- TGrpcStatus status;
- TReadCallback callback;
+ Started = true;
+ if (!ok || Cancelled) {
+ ReadFinished = true;
+ WriteFinished = true;
+ Stream->Finish(&Status, OnFinishedTag.Prepare());
+ return;
+ }
+
+ callback = std::move(ConnectedCallback);
+ ConnectedCallback = nullptr;
+ }
+
+ callback({ }, typename TBase::TPtr(this));
+ }
+
+ void OnReadDone(bool ok) {
+ TGrpcStatus status;
+ TReadCallback callback;
std::unordered_multimap<TString, TString>* initialMetadata = nullptr;
-
+
{
std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(ReadActive, "Unexpected Read done callback");
- Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag");
-
- if (!ok || Cancelled || WriteFinished) {
- ReadFinished = true;
- if (!WriteActive) {
- WriteFinished = true;
- }
- if (WriteFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- if (!ok) {
- // Keep ReadActive=true, so callback is called
- // after the call is finished with an error
- return;
- }
- }
-
- callback = std::move(ReadCallback);
- ReadCallback = nullptr;
- ReadActive = false;
+ Y_VERIFY(ReadActive, "Unexpected Read done callback");
+ Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag");
+
+ if (!ok || Cancelled || WriteFinished) {
+ ReadFinished = true;
+ if (!WriteActive) {
+ WriteFinished = true;
+ }
+ if (WriteFinished) {
+ Stream->Finish(&Status, OnFinishedTag.Prepare());
+ }
+ if (!ok) {
+ // Keep ReadActive=true, so callback is called
+ // after the call is finished with an error
+ return;
+ }
+ }
+
+ callback = std::move(ReadCallback);
+ ReadCallback = nullptr;
+ ReadActive = false;
initialMetadata = InitialMetadata;
InitialMetadata = nullptr;
HasInitialMetadata = true;
- }
-
+ }
+
if (initialMetadata) {
GetInitialMetadata(initialMetadata);
}
- callback(std::move(status));
- }
-
- void OnWriteDone(bool ok) {
- TWriteCallback okCallback;
-
+ callback(std::move(status));
+ }
+
+ void OnWriteDone(bool ok) {
+ TWriteCallback okCallback;
+
{
std::unique_lock<std::mutex> guard(Mutex);
- Y_VERIFY(WriteActive, "Unexpected Write done callback");
- Y_VERIFY(!WriteFinished, "Unexpected WriteFinished flag");
-
- if (ok) {
- okCallback.swap(WriteCallback);
- } else if (WriteCallback) {
- // Put callback back on the queue until OnFinished
- auto& item = WriteQueue.emplace_front();
- item.Callback.swap(WriteCallback);
- }
-
- if (!ok || Cancelled) {
- WriteActive = false;
- WriteFinished = true;
- if (!ReadActive) {
- ReadFinished = true;
- }
- if (ReadFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
+ Y_VERIFY(WriteActive, "Unexpected Write done callback");
+ Y_VERIFY(!WriteFinished, "Unexpected WriteFinished flag");
+
+ if (ok) {
+ okCallback.swap(WriteCallback);
+ } else if (WriteCallback) {
+ // Put callback back on the queue until OnFinished
+ auto& item = WriteQueue.emplace_front();
+ item.Callback.swap(WriteCallback);
+ }
+
+ if (!ok || Cancelled) {
+ WriteActive = false;
+ WriteFinished = true;
+ if (!ReadActive) {
+ ReadFinished = true;
+ }
+ if (ReadFinished) {
+ Stream->Finish(&Status, OnFinishedTag.Prepare());
+ }
} else if (!WriteQueue.empty()) {
- WriteCallback.swap(WriteQueue.front().Callback);
- Stream->Write(WriteQueue.front().Request, OnWriteDoneTag.Prepare());
- WriteQueue.pop_front();
- } else {
- WriteActive = false;
- if (ReadFinished) {
- WriteFinished = true;
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- }
- }
-
- if (okCallback) {
- okCallback(TGrpcStatus());
- }
- }
-
- void OnFinished(bool ok) {
- TGrpcStatus status;
+ WriteCallback.swap(WriteQueue.front().Callback);
+ Stream->Write(WriteQueue.front().Request, OnWriteDoneTag.Prepare());
+ WriteQueue.pop_front();
+ } else {
+ WriteActive = false;
+ if (ReadFinished) {
+ WriteFinished = true;
+ Stream->Finish(&Status, OnFinishedTag.Prepare());
+ }
+ }
+ }
+
+ if (okCallback) {
+ okCallback(TGrpcStatus());
+ }
+ }
+
+ void OnFinished(bool ok) {
+ TGrpcStatus status;
std::deque<TWriteItem> writesDropped;
std::vector<TReadCallback> finishedCallbacks;
- TConnectedCallback connectedCallback;
- TReadCallback readCallback;
- TReadCallback finishCallback;
-
+ TConnectedCallback connectedCallback;
+ TReadCallback readCallback;
+ TReadCallback finishCallback;
+
{
std::unique_lock<std::mutex> guard(Mutex);
- Finished = true;
- FinishedOk = ok;
- LocalContext.reset();
-
- if (ok) {
- status = Status;
- } else if (Cancelled) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
-
- writesDropped.swap(WriteQueue);
- finishedCallbacks.swap(FinishedCallbacks);
-
- if (ConnectedCallback) {
- Y_VERIFY(!ReadActive);
- connectedCallback = std::move(ConnectedCallback);
- ConnectedCallback = nullptr;
- } else if (ReadActive) {
- if (ReadCallback) {
- readCallback = std::move(ReadCallback);
- ReadCallback = nullptr;
- } else {
- finishCallback = std::move(FinishCallback);
- FinishCallback = nullptr;
- }
- ReadActive = false;
- }
- }
-
- for (auto& item : writesDropped) {
- if (item.Callback) {
- TGrpcStatus writeStatus = status;
- if (writeStatus.Ok()) {
- writeStatus = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped");
- }
- item.Callback(std::move(writeStatus));
- }
- }
-
- for (auto& finishedCallback : finishedCallbacks) {
- TGrpcStatus statusCopy = status;
- finishedCallback(std::move(statusCopy));
- }
-
- if (connectedCallback) {
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure");
- }
- connectedCallback(std::move(status), nullptr);
- } else if (readCallback) {
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
- }
- readCallback(std::move(status));
- } else if (finishCallback) {
- finishCallback(std::move(status));
- }
- }
-
-private:
- struct TWriteItem {
- TWriteCallback Callback;
- TRequest Request;
- };
-
-private:
- using TFixedEvent = TQueueClientFixedEvent<TSelf>;
-
- TFixedEvent OnConnectedTag = { this, &TSelf::OnConnected };
- TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone };
- TFixedEvent OnWriteDoneTag = { this, &TSelf::OnWriteDone };
- TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };
-
-private:
+ Finished = true;
+ FinishedOk = ok;
+ LocalContext.reset();
+
+ if (ok) {
+ status = Status;
+ } else if (Cancelled) {
+ status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
+ } else {
+ status = TGrpcStatus::Internal("Unexpected error");
+ }
+
+ writesDropped.swap(WriteQueue);
+ finishedCallbacks.swap(FinishedCallbacks);
+
+ if (ConnectedCallback) {
+ Y_VERIFY(!ReadActive);
+ connectedCallback = std::move(ConnectedCallback);
+ ConnectedCallback = nullptr;
+ } else if (ReadActive) {
+ if (ReadCallback) {
+ readCallback = std::move(ReadCallback);
+ ReadCallback = nullptr;
+ } else {
+ finishCallback = std::move(FinishCallback);
+ FinishCallback = nullptr;
+ }
+ ReadActive = false;
+ }
+ }
+
+ for (auto& item : writesDropped) {
+ if (item.Callback) {
+ TGrpcStatus writeStatus = status;
+ if (writeStatus.Ok()) {
+ writeStatus = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped");
+ }
+ item.Callback(std::move(writeStatus));
+ }
+ }
+
+ for (auto& finishedCallback : finishedCallbacks) {
+ TGrpcStatus statusCopy = status;
+ finishedCallback(std::move(statusCopy));
+ }
+
+ if (connectedCallback) {
+ if (status.Ok()) {
+ status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure");
+ }
+ connectedCallback(std::move(status), nullptr);
+ } else if (readCallback) {
+ if (status.Ok()) {
+ status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
+ }
+ readCallback(std::move(status));
+ } else if (finishCallback) {
+ finishCallback(std::move(status));
+ }
+ }
+
+private:
+ struct TWriteItem {
+ TWriteCallback Callback;
+ TRequest Request;
+ };
+
+private:
+ using TFixedEvent = TQueueClientFixedEvent<TSelf>;
+
+ TFixedEvent OnConnectedTag = { this, &TSelf::OnConnected };
+ TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone };
+ TFixedEvent OnWriteDoneTag = { this, &TSelf::OnWriteDone };
+ TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };
+
+private:
std::mutex Mutex;
- TAsyncReaderWriterPtr Stream;
- TConnectedCallback ConnectedCallback;
- TReadCallback ReadCallback;
- TReadCallback FinishCallback;
+ TAsyncReaderWriterPtr Stream;
+ TConnectedCallback ConnectedCallback;
+ TReadCallback ReadCallback;
+ TReadCallback FinishCallback;
std::vector<TReadCallback> FinishedCallbacks;
std::deque<TWriteItem> WriteQueue;
- TWriteCallback WriteCallback;
+ TWriteCallback WriteCallback;
std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
- bool Started = false;
+ bool Started = false;
bool HasInitialMetadata = false;
- bool ReadActive = false;
- bool ReadFinished = false;
- bool WriteActive = false;
- bool WriteFinished = false;
- bool Finished = false;
- bool Cancelled = false;
- bool FinishedOk = false;
-};
-
+ bool ReadActive = false;
+ bool ReadFinished = false;
+ bool WriteActive = false;
+ bool WriteFinished = false;
+ bool Finished = false;
+ bool Cancelled = false;
+ bool FinishedOk = false;
+};
+
class TGRpcClientLow;
template<typename TGRpcService>
@@ -1245,9 +1245,9 @@ public:
template<typename TRequest, typename TResponse>
void DoRequest(const TRequest& request,
TResponseCallback<TResponse> callback,
- typename TSimpleRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
+ typename TSimpleRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
const TCallMeta& metas = { },
- IQueueClientContextProvider* provider = nullptr)
+ IQueueClientContextProvider* provider = nullptr)
{
auto processor = MakeIntrusive<TSimpleRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback));
processor->ApplyMeta(metas);
@@ -1282,31 +1282,31 @@ public:
processor->ApplyMeta(metas);
processor->Start(*Stub_, std::move(asyncRequest), provider ? provider : Provider_);
}
-
+
/*
* Start streaming response reading (one request, many responses)
*/
- template<typename TRequest, typename TResponse>
+ template<typename TRequest, typename TResponse>
void DoStreamRequest(const TRequest& request,
TStreamReaderCallback<TResponse> callback,
typename TStreamRequestReadProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
- const TCallMeta& metas = { },
- IQueueClientContextProvider* provider = nullptr)
- {
+ const TCallMeta& metas = { },
+ IQueueClientContextProvider* provider = nullptr)
+ {
auto processor = MakeIntrusive<TStreamRequestReadProcessor<TStub, TRequest, TResponse>>(std::move(callback));
- processor->ApplyMeta(metas);
+ processor->ApplyMeta(metas);
processor->Start(*Stub_, request, std::move(asyncRequest), provider ? provider : Provider_);
- }
-
+ }
+
private:
TServiceConnection(std::shared_ptr<grpc::ChannelInterface> ci,
- IQueueClientContextProvider* provider)
+ IQueueClientContextProvider* provider)
: Stub_(TGRpcService::NewStub(ci))
- , Provider_(provider)
- {
- Y_VERIFY(Provider_, "Connection does not have a queue provider");
- }
-
+ , Provider_(provider)
+ {
+ Y_VERIFY(Provider_, "Connection does not have a queue provider");
+ }
+
TServiceConnection(TStubsHolder& holder,
IQueueClientContextProvider* provider)
: Stub_(holder.GetOrCreateStub<TStub>())
@@ -1316,47 +1316,47 @@ private:
}
std::shared_ptr<TStub> Stub_;
- IQueueClientContextProvider* Provider_;
+ IQueueClientContextProvider* Provider_;
};
class TGRpcClientLow
: public IQueueClientContextProvider
{
- class TContextImpl;
- friend class TContextImpl;
-
- enum ECqState : TAtomicBase {
- WORKING = 0,
- STOP_SILENT = 1,
- STOP_EXPLICIT = 2,
- };
-
+ class TContextImpl;
+ friend class TContextImpl;
+
+ enum ECqState : TAtomicBase {
+ WORKING = 0,
+ STOP_SILENT = 1,
+ STOP_EXPLICIT = 2,
+ };
+
public:
explicit TGRpcClientLow(size_t numWorkerThread = DEFAULT_NUM_THREADS, bool useCompletionQueuePerThread = false);
~TGRpcClientLow();
- // Tries to stop all currently running requests (via their stop callbacks)
- // Will shutdown CQ and drain events once all requests have finished
- // No new requests may be started after this call
- void Stop(bool wait = false);
-
- // Waits until all currently running requests finish execution
- void WaitIdle();
-
- inline bool IsStopping() const {
- switch (GetCqState()) {
- case WORKING:
- return false;
- case STOP_SILENT:
- case STOP_EXPLICIT:
- return true;
- }
-
- Y_UNREACHABLE();
- }
-
- IQueueClientContextPtr CreateContext() override;
-
+ // Tries to stop all currently running requests (via their stop callbacks)
+ // Will shutdown CQ and drain events once all requests have finished
+ // No new requests may be started after this call
+ void Stop(bool wait = false);
+
+ // Waits until all currently running requests finish execution
+ void WaitIdle();
+
+ inline bool IsStopping() const {
+ switch (GetCqState()) {
+ case WORKING:
+ return false;
+ case STOP_SILENT:
+ case STOP_EXPLICIT:
+ return true;
+ }
+
+ Y_UNREACHABLE();
+ }
+
+ IQueueClientContextPtr CreateContext() override;
+
template<typename TGRpcService>
std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(const TGRpcClientConfig& config) {
return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(CreateChannelInterface(config), this));
@@ -1367,32 +1367,32 @@ public:
return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(holder, this));
}
- // Tests only, not thread-safe
- void AddWorkerThreadForTest();
-
+ // Tests only, not thread-safe
+ void AddWorkerThreadForTest();
+
private:
using IThreadRef = std::unique_ptr<IThreadFactory::IThread>;
using CompletionQueueRef = std::unique_ptr<grpc::CompletionQueue>;
void Init(size_t numWorkerThread);
- inline ECqState GetCqState() const { return (ECqState) AtomicGet(CqState_); }
- inline void SetCqState(ECqState state) { AtomicSet(CqState_, state); }
-
- void StopInternal(bool silent);
- void WaitInternal();
-
- void ForgetContext(TContextImpl* context);
-
-private:
+ inline ECqState GetCqState() const { return (ECqState) AtomicGet(CqState_); }
+ inline void SetCqState(ECqState state) { AtomicSet(CqState_, state); }
+
+ void StopInternal(bool silent);
+ void WaitInternal();
+
+ void ForgetContext(TContextImpl* context);
+
+private:
bool UseCompletionQueuePerThread_;
std::vector<CompletionQueueRef> CQS_;
std::vector<IThreadRef> WorkerThreads_;
- TAtomic CqState_ = -1;
-
+ TAtomic CqState_ = -1;
+
std::mutex Mtx_;
std::condition_variable ContextsEmpty_;
std::unordered_set<TContextImpl*> Contexts_;
-
+
std::mutex JoinMutex_;
};
diff --git a/library/cpp/grpc/client/grpc_common.h b/library/cpp/grpc/client/grpc_common.h
index 65f74cda2f..ffcdafe045 100644
--- a/library/cpp/grpc/client/grpc_common.h
+++ b/library/cpp/grpc/client/grpc_common.h
@@ -24,8 +24,8 @@ struct TGRpcClientConfig {
ui64 MemQuota = 0;
std::unordered_map<TString, TString> StringChannelParams;
std::unordered_map<TString, int> IntChannelParams;
- TString LoadBalancingPolicy = { };
- TString SslTargetNameOverride = { };
+ TString LoadBalancingPolicy = { };
+ TString SslTargetNameOverride = { };
TGRpcClientConfig() = default;
TGRpcClientConfig(const TGRpcClientConfig&) = default;
@@ -68,12 +68,12 @@ inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRp
if (mutator) {
args.SetSocketMutator(mutator);
}
- if (!config.LoadBalancingPolicy.empty()) {
- args.SetLoadBalancingPolicyName(config.LoadBalancingPolicy);
- }
- if (!config.SslTargetNameOverride.empty()) {
- args.SetSslTargetNameOverride(config.SslTargetNameOverride);
- }
+ if (!config.LoadBalancingPolicy.empty()) {
+ args.SetLoadBalancingPolicyName(config.LoadBalancingPolicy);
+ }
+ if (!config.SslTargetNameOverride.empty()) {
+ args.SetSslTargetNameOverride(config.SslTargetNameOverride);
+ }
if (config.EnableSsl || config.SslCaCert) {
return grpc::CreateCustomChannel(config.Locator, grpc::SslCredentials(grpc::SslCredentialsOptions{config.SslCaCert, "", ""}), args);
} else {
diff --git a/library/cpp/grpc/server/grpc_counters.h b/library/cpp/grpc/server/grpc_counters.h
index e86299cbd9..0b6c36c84c 100644
--- a/library/cpp/grpc/server/grpc_counters.h
+++ b/library/cpp/grpc/server/grpc_counters.h
@@ -83,13 +83,13 @@ public:
}
void CountRequestBytes(ui32 requestSize) override {
- *RequestBytes += requestSize;
- }
-
+ *RequestBytes += requestSize;
+ }
+
void CountResponseBytes(ui32 responseSize) override {
- *ResponseBytes += responseSize;
- }
-
+ *ResponseBytes += responseSize;
+ }
+
void StartProcessing(ui32 requestSize) override {
TotalCounter->Inc();
InflyCounter->Inc();
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h
index 1fcd8b6655..5bd8d3902b 100644
--- a/library/cpp/grpc/server/grpc_request.h
+++ b/library/cpp/grpc/server/grpc_request.h
@@ -117,24 +117,24 @@ public:
return TString(this->Context.peer());
}
- bool SslServer() const override {
- return Server_->SslServer();
- }
-
+ bool SslServer() const override {
+ return Server_->SslServer();
+ }
+
void Run() {
- // Start request unless server is shutting down
- if (auto guard = Server_->ProtectShutdown()) {
- Ref(); //For grpc c runtime
- this->Context.AsyncNotifyWhenDone(OnFinishTag.Prepare());
- if (RequestCallback_) {
- (this->Service->*RequestCallback_)
- (&this->Context, Request_,
- reinterpret_cast<grpc::ServerAsyncResponseWriter<TOut>*>(Writer_.Get()), this->CQ, this->CQ, GetGRpcTag());
- } else {
- (this->Service->*StreamRequestCallback_)
- (&this->Context, Request_,
- reinterpret_cast<grpc::ServerAsyncWriter<TOut>*>(StreamWriter_.Get()), this->CQ, this->CQ, GetGRpcTag());
- }
+ // Start request unless server is shutting down
+ if (auto guard = Server_->ProtectShutdown()) {
+ Ref(); //For grpc c runtime
+ this->Context.AsyncNotifyWhenDone(OnFinishTag.Prepare());
+ if (RequestCallback_) {
+ (this->Service->*RequestCallback_)
+ (&this->Context, Request_,
+ reinterpret_cast<grpc::ServerAsyncResponseWriter<TOut>*>(Writer_.Get()), this->CQ, this->CQ, GetGRpcTag());
+ } else {
+ (this->Service->*StreamRequestCallback_)
+ (&this->Context, Request_,
+ reinterpret_cast<grpc::ServerAsyncWriter<TOut>*>(StreamWriter_.Get()), this->CQ, this->CQ, GetGRpcTag());
+ }
}
}
@@ -148,10 +148,10 @@ public:
}
void DestroyRequest() override {
- if (RequestRegistered_) {
- Server_->DeregisterRequestCtx(this);
- RequestRegistered_ = false;
- }
+ if (RequestRegistered_) {
+ Server_->DeregisterRequestCtx(this);
+ RequestRegistered_ = false;
+ }
UnRef();
}
@@ -346,15 +346,15 @@ private:
ok ? "true" : "false", makeRequestString().data(), this->Context.peer().c_str());
if (this->Context.c_call() == nullptr) {
- Y_VERIFY(!ok);
+ Y_VERIFY(!ok);
// One ref by OnFinishTag, grpc will not call this tag if no request received
UnRef();
- } else if (!(RequestRegistered_ = Server_->RegisterRequestCtx(this))) {
- // Request cannot be registered due to shutdown
- // It's unsafe to continue, so drop this request without processing
+ } else if (!(RequestRegistered_ = Server_->RegisterRequestCtx(this))) {
+ // Request cannot be registered due to shutdown
+ // It's unsafe to continue, so drop this request without processing
GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_);
- this->Context.TryCancel();
- return false;
+ this->Context.TryCancel();
+ return false;
}
Clone(); // TODO: Request pool?
@@ -501,7 +501,7 @@ private:
ui32 ResponseStatus = 0;
THPTimer RequestTimer;
TAuthState AuthState_ = 0;
- bool RequestRegistered_ = false;
+ bool RequestRegistered_ = false;
using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>;
TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish };
diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h
index dc293ec01a..fcfce1c181 100644
--- a/library/cpp/grpc/server/grpc_request_base.h
+++ b/library/cpp/grpc/server/grpc_request_base.h
@@ -108,9 +108,9 @@ public:
//! Returns peer address
virtual TString GetPeer() const = 0;
-
- //! Returns true if server is using ssl
- virtual bool SslServer() const = 0;
+
+ //! Returns true if server is using ssl
+ virtual bool SslServer() const = 0;
};
} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
index 9bc8305390..7437b7a8f5 100644
--- a/library/cpp/grpc/server/grpc_server.cpp
+++ b/library/cpp/grpc/server/grpc_server.cpp
@@ -77,7 +77,7 @@ void TGRpcServer::Start() {
builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize);
builder.SetMaxSendMessageSize(Options_.MaxMessageSize);
for (IGRpcServicePtr service : Services_) {
- service->SetServerOptions(Options_);
+ service->SetServerOptions(Options_);
builder.RegisterService(service->GetService());
service->SetGlobalLimiterHandle(&Limiter_);
}
@@ -192,14 +192,14 @@ void TGRpcServer::Stop() {
}
for (ui64 attempt = 0; ; ++attempt) {
- bool unsafe = false;
+ bool unsafe = false;
size_t infly = 0;
for (auto& service : Services_) {
- unsafe |= service->IsUnsafeToShutdown();
- infly += service->RequestsInProgress();
+ unsafe |= service->IsUnsafeToShutdown();
+ infly += service->RequestsInProgress();
}
- if (!unsafe && !infly)
+ if (!unsafe && !infly)
break;
auto spent = (TInstant::Now() - now).SecondsFloat();
@@ -208,7 +208,7 @@ void TGRpcServer::Stop() {
Cerr << "GRpc shutdown warning: left infly: " << infly << ", spent: " << spent << " sec" << Endl;
}
- if (!unsafe && spent > Options_.GRpcShutdownDeadline.SecondsFloat())
+ if (!unsafe && spent > Options_.GRpcShutdownDeadline.SecondsFloat())
break;
Sleep(TDuration::MilliSeconds(10));
}
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h
index 59ed364bc9..d6814a90a0 100644
--- a/library/cpp/grpc/server/grpc_server.h
+++ b/library/cpp/grpc/server/grpc_server.h
@@ -167,77 +167,77 @@ public:
virtual void StopService() noexcept = 0;
virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0;
virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0;
- virtual bool IsUnsafeToShutdown() const = 0;
- virtual size_t RequestsInProgress() const = 0;
-
- /**
- * Called before service is added to the server builder. This allows
- * service to inspect server options and initialize accordingly.
- */
- virtual void SetServerOptions(const TServerOptions& options) = 0;
+ virtual bool IsUnsafeToShutdown() const = 0;
+ virtual size_t RequestsInProgress() const = 0;
+
+ /**
+ * Called before service is added to the server builder. This allows
+ * service to inspect server options and initialize accordingly.
+ */
+ virtual void SetServerOptions(const TServerOptions& options) = 0;
};
template<typename T>
class TGrpcServiceBase: public IGRpcService {
public:
- class TShutdownGuard {
- using TOwner = TGrpcServiceBase<T>;
- friend class TGrpcServiceBase<T>;
-
- public:
- TShutdownGuard()
- : Owner(nullptr)
- { }
-
- ~TShutdownGuard() {
- Release();
- }
-
- TShutdownGuard(TShutdownGuard&& other)
- : Owner(other.Owner)
- {
- other.Owner = nullptr;
- }
-
- TShutdownGuard& operator=(TShutdownGuard&& other) {
- if (Y_LIKELY(this != &other)) {
- Release();
- Owner = other.Owner;
- other.Owner = nullptr;
- }
- return *this;
- }
-
- explicit operator bool() const {
- return bool(Owner);
- }
-
- void Release() {
- if (Owner) {
- AtomicDecrement(Owner->GuardCount_);
- Owner = nullptr;
- }
- }
-
- TShutdownGuard(const TShutdownGuard&) = delete;
- TShutdownGuard& operator=(const TShutdownGuard&) = delete;
-
- private:
- explicit TShutdownGuard(TOwner* owner)
- : Owner(owner)
- { }
-
- private:
- TOwner* Owner;
- };
-
-public:
+ class TShutdownGuard {
+ using TOwner = TGrpcServiceBase<T>;
+ friend class TGrpcServiceBase<T>;
+
+ public:
+ TShutdownGuard()
+ : Owner(nullptr)
+ { }
+
+ ~TShutdownGuard() {
+ Release();
+ }
+
+ TShutdownGuard(TShutdownGuard&& other)
+ : Owner(other.Owner)
+ {
+ other.Owner = nullptr;
+ }
+
+ TShutdownGuard& operator=(TShutdownGuard&& other) {
+ if (Y_LIKELY(this != &other)) {
+ Release();
+ Owner = other.Owner;
+ other.Owner = nullptr;
+ }
+ return *this;
+ }
+
+ explicit operator bool() const {
+ return bool(Owner);
+ }
+
+ void Release() {
+ if (Owner) {
+ AtomicDecrement(Owner->GuardCount_);
+ Owner = nullptr;
+ }
+ }
+
+ TShutdownGuard(const TShutdownGuard&) = delete;
+ TShutdownGuard& operator=(const TShutdownGuard&) = delete;
+
+ private:
+ explicit TShutdownGuard(TOwner* owner)
+ : Owner(owner)
+ { }
+
+ private:
+ TOwner* Owner;
+ };
+
+public:
using TCurrentGRpcService = T;
void StopService() noexcept override {
with_lock(Lock_) {
- AtomicSet(ShuttingDown_, 1);
-
+ AtomicSet(ShuttingDown_, 1);
+
// Send TryCansel to event (can be send after finishing).
// Actual dtors will be called from grpc thread, so deadlock impossible
for (auto* request : Requests_) {
@@ -246,21 +246,21 @@ public:
}
}
- TShutdownGuard ProtectShutdown() noexcept {
- AtomicIncrement(GuardCount_);
- if (IsShuttingDown()) {
- AtomicDecrement(GuardCount_);
- return { };
- }
-
- return TShutdownGuard(this);
- };
-
- bool IsUnsafeToShutdown() const override {
- return AtomicGet(GuardCount_) > 0;
- }
-
- size_t RequestsInProgress() const override {
+ TShutdownGuard ProtectShutdown() noexcept {
+ AtomicIncrement(GuardCount_);
+ if (IsShuttingDown()) {
+ AtomicDecrement(GuardCount_);
+ return { };
+ }
+
+ return TShutdownGuard(this);
+ };
+
+ bool IsUnsafeToShutdown() const override {
+ return AtomicGet(GuardCount_) > 0;
+ }
+
+ size_t RequestsInProgress() const override {
size_t c = 0;
with_lock(Lock_) {
c = Requests_.size();
@@ -268,9 +268,9 @@ public:
return c;
}
- void SetServerOptions(const TServerOptions& options) override {
- SslServer_ = bool(options.SslData);
- NeedAuth_ = options.UseAuth;
+ void SetServerOptions(const TServerOptions& options) override {
+ SslServer_ = bool(options.SslData);
+ NeedAuth_ = options.UseAuth;
}
void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {}
@@ -280,32 +280,32 @@ public:
return AtomicGet(ShuttingDown_);
}
- bool SslServer() const {
- return SslServer_;
- }
-
+ bool SslServer() const {
+ return SslServer_;
+ }
+
bool NeedAuth() const {
return NeedAuth_;
}
- bool RegisterRequestCtx(ICancelableContext* req) {
+ bool RegisterRequestCtx(ICancelableContext* req) {
with_lock(Lock_) {
- auto r = Requests_.emplace(req);
- Y_VERIFY(r.second, "Ctx already registered");
-
- if (IsShuttingDown()) {
- // Server is already shutting down
- Requests_.erase(r.first);
- return false;
- }
+ auto r = Requests_.emplace(req);
+ Y_VERIFY(r.second, "Ctx already registered");
+
+ if (IsShuttingDown()) {
+ // Server is already shutting down
+ Requests_.erase(r.first);
+ return false;
+ }
}
-
- return true;
+
+ return true;
}
void DeregisterRequestCtx(ICancelableContext* req) {
with_lock(Lock_) {
- Y_VERIFY(Requests_.erase(req), "Ctx is not registered");
+ Y_VERIFY(Requests_.erase(req), "Ctx is not registered");
}
}
@@ -313,15 +313,15 @@ protected:
using TGrpcAsyncService = typename TCurrentGRpcService::AsyncService;
TGrpcAsyncService Service_;
- TGrpcAsyncService* GetService() override {
+ TGrpcAsyncService* GetService() override {
return &Service_;
}
private:
TAtomic ShuttingDown_ = 0;
- TAtomic GuardCount_ = 0;
+ TAtomic GuardCount_ = 0;
- bool SslServer_ = false;
+ bool SslServer_ = false;
bool NeedAuth_ = false;
THashSet<ICancelableContext*> Requests_;