diff options
author | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:17 +0300 |
commit | 4b11037e5a7d071c63e3c966199fe7102e6462e4 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/client/grpc_client_low.h | |
parent | 17e20fa084178ddcb16255f974dbde74fb93608b (diff) | |
download | ydb-4b11037e5a7d071c63e3c966199fe7102e6462e4.tar.gz |
Restoring authorship annotation for Daniil Cherednik <dcherednik@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/client/grpc_client_low.h')
-rw-r--r-- | library/cpp/grpc/client/grpc_client_low.h | 1122 |
1 files changed, 561 insertions, 561 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h index e452be45a1..ab0a0627be 100644 --- a/library/cpp/grpc/client/grpc_client_low.h +++ b/library/cpp/grpc/client/grpc_client_low.h @@ -1,57 +1,57 @@ -#pragma once - -#include "grpc_common.h" - +#pragma once + +#include "grpc_common.h" + #include <util/thread/factory.h> -#include <grpc++/grpc++.h> +#include <grpc++/grpc++.h> #include <grpc++/support/async_stream.h> #include <grpc++/support/async_unary_call.h> - -#include <deque> -#include <typeindex> -#include <typeinfo> + +#include <deque> +#include <typeindex> +#include <typeinfo> #include <variant> -#include <vector> -#include <unordered_map> -#include <unordered_set> -#include <mutex> -#include <shared_mutex> - -/* - * This file contains low level logic for grpc - * This file should not be used in high level code without special reason - */ +#include <vector> +#include <unordered_map> +#include <unordered_set> +#include <mutex> +#include <shared_mutex> + +/* + * This file contains low level logic for grpc + * This file should not be used in high level code without special reason + */ namespace NGrpc { - -const size_t DEFAULT_NUM_THREADS = 2; - -//////////////////////////////////////////////////////////////////////////////// - -void EnableGRpcTracing(); - -//////////////////////////////////////////////////////////////////////////////// - -struct TTcpKeepAliveSettings { - bool Enabled; - size_t Idle; - size_t Count; - size_t Interval; -}; - -//////////////////////////////////////////////////////////////////////////////// - -// Common interface used to execute action from grpc cq routine -class IQueueClientEvent { -public: - virtual ~IQueueClientEvent() = default; - + +const size_t DEFAULT_NUM_THREADS = 2; + +//////////////////////////////////////////////////////////////////////////////// + +void EnableGRpcTracing(); + +//////////////////////////////////////////////////////////////////////////////// + +struct TTcpKeepAliveSettings { + bool Enabled; + size_t Idle; + size_t Count; + size_t Interval; +}; + +//////////////////////////////////////////////////////////////////////////////// + +// Common interface used to execute action from grpc cq routine +class IQueueClientEvent { +public: + virtual ~IQueueClientEvent() = default; + //! Execute an action defined by implementation - virtual bool Execute(bool ok) = 0; - - //! Finish and destroy event - virtual void Destroy() = 0; -}; - + virtual bool Execute(bool ok) = 0; + + //! Finish and destroy event + virtual void Destroy() = 0; +}; + // Implementation of IQueueClientEvent that reduces allocations template<class TSelf> class TQueueClientFixedEvent : private IQueueClientEvent { @@ -123,8 +123,8 @@ public: } }; -// Represents grpc status and error message string -struct TGrpcStatus { +// Represents grpc status and error message string +struct TGrpcStatus { TString Msg; TString Details; int GRpcStatusCode; @@ -167,36 +167,36 @@ struct TGrpcStatus { bool Ok() const { return !InternalError && GRpcStatusCode == grpc::StatusCode::OK; } -}; - -bool inline IsGRpcStatusGood(const TGrpcStatus& status) { +}; + +bool inline IsGRpcStatusGood(const TGrpcStatus& status) { return status.Ok(); -} - +} + // Response callback type - this callback will be called when request is finished -// (or after getting each chunk in case of streaming mode) -template<typename TResponse> -using TResponseCallback = std::function<void (TGrpcStatus&&, TResponse&&)>; - +// (or after getting each chunk in case of streaming mode) +template<typename TResponse> +using TResponseCallback = std::function<void (TGrpcStatus&&, TResponse&&)>; + template<typename TResponse> using TAdvancedResponseCallback = std::function<void (const grpc::ClientContext&, TGrpcStatus&&, TResponse&&)>; -// Call associated metadata -struct TCallMeta { - std::shared_ptr<grpc::CallCredentials> CallCredentials; - std::vector<std::pair<TString, TString>> Aux; +// Call associated metadata +struct TCallMeta { + std::shared_ptr<grpc::CallCredentials> CallCredentials; + std::vector<std::pair<TString, TString>> Aux; std::variant<TDuration, TInstant> Timeout; // timeout as duration from now or time point in future -}; - -class TGRpcRequestProcessorCommon { -protected: - void ApplyMeta(const TCallMeta& meta) { - for (const auto& rec : meta.Aux) { - Context.AddMetadata(rec.first, rec.second); - } - if (meta.CallCredentials) { - Context.set_credentials(meta.CallCredentials); - } +}; + +class TGRpcRequestProcessorCommon { +protected: + void ApplyMeta(const TCallMeta& meta) { + for (const auto& rec : meta.Aux) { + Context.AddMetadata(rec.first, rec.second); + } + if (meta.CallCredentials) { + Context.set_credentials(meta.CallCredentials); + } if (const TDuration* timeout = std::get_if<TDuration>(&meta.Timeout)) { if (*timeout) { auto deadline = gpr_time_add( @@ -208,10 +208,10 @@ protected: if (*deadline) { Context.set_deadline(gpr_time_from_micros(deadline->MicroSeconds(), GPR_CLOCK_MONOTONIC)); } - } - } + } + } - void GetInitialMetadata(std::unordered_multimap<TString, TString>* metadata) { + void GetInitialMetadata(std::unordered_multimap<TString, TString>* metadata) { for (const auto& [key, value] : Context.GetServerInitialMetadata()) { metadata->emplace( TString(key.begin(), key.end()), @@ -220,61 +220,61 @@ protected: } } - grpc::Status Status; - grpc::ClientContext Context; - std::shared_ptr<IQueueClientContext> LocalContext; -}; - -template<typename TStub, typename TRequest, typename TResponse> -class TSimpleRequestProcessor - : public TThrRefBase - , public IQueueClientEvent - , public TGRpcRequestProcessorCommon { - using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>; - template<typename> friend class TServiceConnection; -public: - using TPtr = TIntrusivePtr<TSimpleRequestProcessor>; - using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*); - + grpc::Status Status; + grpc::ClientContext Context; + std::shared_ptr<IQueueClientContext> LocalContext; +}; + +template<typename TStub, typename TRequest, typename TResponse> +class TSimpleRequestProcessor + : public TThrRefBase + , public IQueueClientEvent + , public TGRpcRequestProcessorCommon { + using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>; + template<typename> friend class TServiceConnection; +public: + using TPtr = TIntrusivePtr<TSimpleRequestProcessor>; + using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*); + explicit TSimpleRequestProcessor(TResponseCallback<TResponse>&& callback) : Callback_(std::move(callback)) - { } - - ~TSimpleRequestProcessor() { + { } + + ~TSimpleRequestProcessor() { if (!Replied_ && Callback_) { Callback_(TGrpcStatus::Internal("request left unhandled"), std::move(Reply_)); Callback_ = nullptr; // free resources as early as possible - } - } - - bool Execute(bool ok) override { - { - std::unique_lock<std::mutex> guard(Mutex_); - LocalContext.reset(); + } + } + + bool Execute(bool ok) override { + { + std::unique_lock<std::mutex> guard(Mutex_); + LocalContext.reset(); } TGrpcStatus status; if (ok) { - status = Status; + status = Status; } else { status = TGrpcStatus::Internal("Unexpected error"); - } - Replied_ = true; - Callback_(std::move(status), std::move(Reply_)); + } + Replied_ = true; + Callback_(std::move(status), std::move(Reply_)); Callback_ = nullptr; // free resources as early as possible - return false; - } - - void Destroy() override { + return false; + } + + void Destroy() override { UnRef(); - } - -private: + } + +private: IQueueClientEvent* FinishedEvent() { Ref(); return this; - } - - void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) { + } + + void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) { auto context = provider->CreateContext(); if (!context) { Replied_ = true; @@ -282,11 +282,11 @@ private: Callback_ = nullptr; return; } - { - std::unique_lock<std::mutex> guard(Mutex_); - LocalContext = context; - Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue()); - Reader_->Finish(&Reply_, &Status, FinishedEvent()); + { + std::unique_lock<std::mutex> guard(Mutex_); + LocalContext = context; + Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue()); + Reader_->Finish(&Reply_, &Status, FinishedEvent()); } context->SubscribeStop([self = TPtr(this)] { self->Stop(); @@ -294,17 +294,17 @@ private: } void Stop() { - Context.TryCancel(); + Context.TryCancel(); } TResponseCallback<TResponse> Callback_; - TResponse Reply_; - std::mutex Mutex_; - TAsyncReaderPtr Reader_; - - bool Replied_ = false; -}; - + TResponse Reply_; + std::mutex Mutex_; + TAsyncReaderPtr Reader_; + + bool Replied_ = false; +}; + template<typename TStub, typename TRequest, typename TResponse> class TAdvancedRequestProcessor : public TThrRefBase @@ -328,8 +328,8 @@ public: } bool Execute(bool ok) override { - { - std::unique_lock<std::mutex> guard(Mutex_); + { + std::unique_lock<std::mutex> guard(Mutex_); LocalContext.reset(); } TGrpcStatus status; @@ -362,8 +362,8 @@ private: Callback_ = nullptr; return; } - { - std::unique_lock<std::mutex> guard(Mutex_); + { + std::unique_lock<std::mutex> guard(Mutex_); LocalContext = context; Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue()); Reader_->Finish(&Reply_, &Status, FinishedEvent()); @@ -379,16 +379,16 @@ private: TAdvancedResponseCallback<TResponse> Callback_; TResponse Reply_; - std::mutex Mutex_; + std::mutex Mutex_; TAsyncReaderPtr Reader_; bool Replied_ = false; }; -template<class TResponse> -class IStreamRequestReadProcessor : public TThrRefBase { +template<class TResponse> +class IStreamRequestReadProcessor : public TThrRefBase { public: - using TPtr = TIntrusivePtr<IStreamRequestReadProcessor>; + using TPtr = TIntrusivePtr<IStreamRequestReadProcessor>; using TReadCallback = std::function<void(TGrpcStatus&&)>; /** @@ -399,7 +399,7 @@ public: /** * Scheduled initial server metadata read from the stream */ - virtual void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) = 0; + virtual void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) = 0; /** * Scheduled response read from the stream @@ -420,72 +420,72 @@ public: virtual void AddFinishedCallback(TReadCallback callback) = 0; }; -template<class TRequest, class TResponse> -class IStreamRequestReadWriteProcessor : public IStreamRequestReadProcessor<TResponse> { -public: - using TPtr = TIntrusivePtr<IStreamRequestReadWriteProcessor>; +template<class TRequest, class TResponse> +class IStreamRequestReadWriteProcessor : public IStreamRequestReadProcessor<TResponse> { +public: + using TPtr = TIntrusivePtr<IStreamRequestReadWriteProcessor>; using TWriteCallback = std::function<void(TGrpcStatus&&)>; - - /** - * Scheduled request write to the stream - */ + + /** + * Scheduled request write to the stream + */ virtual void Write(TRequest&& request, TWriteCallback callback = { }) = 0; -}; - -class TGRpcKeepAliveSocketMutator; - -// Class to hold stubs allocated on channel. -// It is poor documented part of grpc. See KIKIMR-6109 and comment to this commit - -// Stub holds shared_ptr<ChannelInterface>, so we can destroy this holder even if -// request processor using stub -class TStubsHolder : public TNonCopyable { - using TypeInfoRef = std::reference_wrapper<const std::type_info>; - - struct THasher { - std::size_t operator()(TypeInfoRef code) const { - return code.get().hash_code(); - } - }; - - struct TEqualTo { - bool operator()(TypeInfoRef lhs, TypeInfoRef rhs) const { - return lhs.get() == rhs.get(); - } - }; -public: - TStubsHolder(std::shared_ptr<grpc::ChannelInterface> channel) - : ChannelInterface_(channel) - {} - - // Returns true if channel can't be used to perform request now - bool IsChannelBroken() const { - auto state = ChannelInterface_->GetState(false); - return state == GRPC_CHANNEL_SHUTDOWN || - state == GRPC_CHANNEL_TRANSIENT_FAILURE; - } - - template<typename TStub> - std::shared_ptr<TStub> GetOrCreateStub() { - const auto& stubId = typeid(TStub); - { - std::shared_lock readGuard(RWMutex_); - const auto it = Stubs_.find(stubId); - if (it != Stubs_.end()) { - return std::static_pointer_cast<TStub>(it->second); - } - } - { - std::unique_lock writeGuard(RWMutex_); - auto it = Stubs_.emplace(stubId, nullptr); - if (!it.second) { - return std::static_pointer_cast<TStub>(it.first->second); - } else { - it.first->second = std::make_shared<TStub>(ChannelInterface_); - return std::static_pointer_cast<TStub>(it.first->second); - } - } - } +}; + +class TGRpcKeepAliveSocketMutator; + +// Class to hold stubs allocated on channel. +// It is poor documented part of grpc. See KIKIMR-6109 and comment to this commit + +// Stub holds shared_ptr<ChannelInterface>, so we can destroy this holder even if +// request processor using stub +class TStubsHolder : public TNonCopyable { + using TypeInfoRef = std::reference_wrapper<const std::type_info>; + + struct THasher { + std::size_t operator()(TypeInfoRef code) const { + return code.get().hash_code(); + } + }; + + struct TEqualTo { + bool operator()(TypeInfoRef lhs, TypeInfoRef rhs) const { + return lhs.get() == rhs.get(); + } + }; +public: + TStubsHolder(std::shared_ptr<grpc::ChannelInterface> channel) + : ChannelInterface_(channel) + {} + + // Returns true if channel can't be used to perform request now + bool IsChannelBroken() const { + auto state = ChannelInterface_->GetState(false); + return state == GRPC_CHANNEL_SHUTDOWN || + state == GRPC_CHANNEL_TRANSIENT_FAILURE; + } + + template<typename TStub> + std::shared_ptr<TStub> GetOrCreateStub() { + const auto& stubId = typeid(TStub); + { + std::shared_lock readGuard(RWMutex_); + const auto it = Stubs_.find(stubId); + if (it != Stubs_.end()) { + return std::static_pointer_cast<TStub>(it->second); + } + } + { + std::unique_lock writeGuard(RWMutex_); + auto it = Stubs_.emplace(stubId, nullptr); + if (!it.second) { + return std::static_pointer_cast<TStub>(it.first->second); + } else { + it.first->second = std::make_shared<TStub>(ChannelInterface_); + return std::static_pointer_cast<TStub>(it.first->second); + } + } + } const TInstant& GetLastUseTime() const { return LastUsed_; @@ -494,76 +494,76 @@ public: void SetLastUseTime(const TInstant& time) { LastUsed_ = time; } -private: +private: TInstant LastUsed_ = Now(); - std::shared_mutex RWMutex_; - std::unordered_map<TypeInfoRef, std::shared_ptr<void>, THasher, TEqualTo> Stubs_; - std::shared_ptr<grpc::ChannelInterface> ChannelInterface_; -}; - -class TChannelPool { -public: + std::shared_mutex RWMutex_; + std::unordered_map<TypeInfoRef, std::shared_ptr<void>, THasher, TEqualTo> Stubs_; + std::shared_ptr<grpc::ChannelInterface> ChannelInterface_; +}; + +class TChannelPool { +public: TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime = TDuration::Minutes(6)); - //Allows to CreateStub from TStubsHolder under lock - //The callback will be called just during GetStubsHolderLocked call - void GetStubsHolderLocked(const TString& channelId, const TGRpcClientConfig& config, std::function<void(TStubsHolder&)> cb); - void DeleteChannel(const TString& channelId); + //Allows to CreateStub from TStubsHolder under lock + //The callback will be called just during GetStubsHolderLocked call + void GetStubsHolderLocked(const TString& channelId, const TGRpcClientConfig& config, std::function<void(TStubsHolder&)> cb); + void DeleteChannel(const TString& channelId); void DeleteExpiredStubsHolders(); -private: - std::shared_mutex RWMutex_; - std::unordered_map<TString, TStubsHolder> Pool_; +private: + std::shared_mutex RWMutex_; + std::unordered_map<TString, TStubsHolder> Pool_; std::multimap<TInstant, TString> LastUsedQueue_; - TTcpKeepAliveSettings TcpKeepAliveSettings_; + TTcpKeepAliveSettings TcpKeepAliveSettings_; TDuration ExpireTime_; TDuration UpdateReUseTime_; void EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId); -}; - -template<class TResponse> -using TStreamReaderCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadProcessor<TResponse>::TPtr)>; - -template<typename TStub, typename TRequest, typename TResponse> -class TStreamRequestReadProcessor - : public IStreamRequestReadProcessor<TResponse> - , public TGRpcRequestProcessorCommon { - template<typename> friend class TServiceConnection; -public: - using TSelf = TStreamRequestReadProcessor; - using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncReader<TResponse>>; - using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*, void*); - using TReaderCallback = TStreamReaderCallback<TResponse>; - using TPtr = TIntrusivePtr<TSelf>; - using TBase = IStreamRequestReadProcessor<TResponse>; - using TReadCallback = typename TBase::TReadCallback; - - explicit TStreamRequestReadProcessor(TReaderCallback&& callback) - : Callback(std::move(callback)) - { - Y_VERIFY(Callback, "Missing connected callback"); - } - - void Cancel() override { - Context.TryCancel(); - - { - std::unique_lock<std::mutex> guard(Mutex); - Cancelled = true; +}; + +template<class TResponse> +using TStreamReaderCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadProcessor<TResponse>::TPtr)>; + +template<typename TStub, typename TRequest, typename TResponse> +class TStreamRequestReadProcessor + : public IStreamRequestReadProcessor<TResponse> + , public TGRpcRequestProcessorCommon { + template<typename> friend class TServiceConnection; +public: + using TSelf = TStreamRequestReadProcessor; + using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncReader<TResponse>>; + using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*, void*); + using TReaderCallback = TStreamReaderCallback<TResponse>; + using TPtr = TIntrusivePtr<TSelf>; + using TBase = IStreamRequestReadProcessor<TResponse>; + using TReadCallback = typename TBase::TReadCallback; + + explicit TStreamRequestReadProcessor(TReaderCallback&& callback) + : Callback(std::move(callback)) + { + Y_VERIFY(Callback, "Missing connected callback"); + } + + void Cancel() override { + Context.TryCancel(); + + { + std::unique_lock<std::mutex> guard(Mutex); + Cancelled = true; if (Started && !ReadFinished) { - if (!ReadActive) { - ReadFinished = true; - } - if (ReadFinished) { - Stream->Finish(&Status, OnFinishedTag.Prepare()); - } - } - } - } - - void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override { + if (!ReadActive) { + ReadFinished = true; + } + if (ReadFinished) { + Stream->Finish(&Status, OnFinishedTag.Prepare()); + } + } + } + } + + void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override { TGrpcStatus status; - { - std::unique_lock<std::mutex> guard(Mutex); + { + std::unique_lock<std::mutex> guard(Mutex); Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); if (!Finished && !HasInitialMetadata) { ReadActive = true; @@ -588,66 +588,66 @@ public: callback(std::move(status)); } - void Read(TResponse* message, TReadCallback callback) override { - TGrpcStatus status; - - { - std::unique_lock<std::mutex> guard(Mutex); - Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); - if (!Finished) { - ReadActive = true; - ReadCallback = std::move(callback); - if (!ReadFinished) { - Stream->Read(message, OnReadDoneTag.Prepare()); - } - return; - } - if (FinishedOk) { - status = Status; - } else { - status = TGrpcStatus::Internal("Unexpected error"); - } - } - - if (status.Ok()) { - status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); - } - - callback(std::move(status)); - } - - void Finish(TReadCallback callback) override { - TGrpcStatus status; - - { - std::unique_lock<std::mutex> guard(Mutex); - Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); - if (!Finished) { - ReadActive = true; - FinishCallback = std::move(callback); - if (!ReadFinished) { - ReadFinished = true; - } - Stream->Finish(&Status, OnFinishedTag.Prepare()); - return; - } - if (FinishedOk) { - status = Status; - } else { - status = TGrpcStatus::Internal("Unexpected error"); - } - } - - callback(std::move(status)); - } + void Read(TResponse* message, TReadCallback callback) override { + TGrpcStatus status; + + { + std::unique_lock<std::mutex> guard(Mutex); + Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); + if (!Finished) { + ReadActive = true; + ReadCallback = std::move(callback); + if (!ReadFinished) { + Stream->Read(message, OnReadDoneTag.Prepare()); + } + return; + } + if (FinishedOk) { + status = Status; + } else { + status = TGrpcStatus::Internal("Unexpected error"); + } + } + + if (status.Ok()) { + status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); + } + + callback(std::move(status)); + } + + void Finish(TReadCallback callback) override { + TGrpcStatus status; + + { + std::unique_lock<std::mutex> guard(Mutex); + Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); + if (!Finished) { + ReadActive = true; + FinishCallback = std::move(callback); + if (!ReadFinished) { + ReadFinished = true; + } + Stream->Finish(&Status, OnFinishedTag.Prepare()); + return; + } + if (FinishedOk) { + status = Status; + } else { + status = TGrpcStatus::Internal("Unexpected error"); + } + } + + callback(std::move(status)); + } void AddFinishedCallback(TReadCallback callback) override { Y_VERIFY(callback, "Unexpected empty callback"); TGrpcStatus status; - { - std::unique_lock<std::mutex> guard(Mutex); + { + std::unique_lock<std::mutex> guard(Mutex); if (!Finished) { FinishedCallbacks.emplace_back().swap(callback); return; @@ -665,103 +665,103 @@ public: callback(std::move(status)); } -private: - void Start(TStub& stub, const TRequest& request, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) { - auto context = provider->CreateContext(); - if (!context) { - auto callback = std::move(Callback); - TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down"); - callback(std::move(status), nullptr); - return; - } - - { - std::unique_lock<std::mutex> guard(Mutex); - LocalContext = context; - Stream = (stub.*asyncRequest)(&Context, request, context->CompletionQueue(), OnStartDoneTag.Prepare()); - } - - context->SubscribeStop([self = TPtr(this)] { - self->Cancel(); - }); - } - - void OnReadDone(bool ok) { - TGrpcStatus status; - TReadCallback callback; - std::unordered_multimap<TString, TString>* initialMetadata = nullptr; - - { - std::unique_lock<std::mutex> guard(Mutex); - Y_VERIFY(ReadActive, "Unexpected Read done callback"); - Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag"); - - if (!ok || Cancelled) { - ReadFinished = true; - - Stream->Finish(&Status, OnFinishedTag.Prepare()); - if (!ok) { - // Keep ReadActive=true, so callback is called - // after the call is finished with an error - return; - } - } - - callback = std::move(ReadCallback); - ReadCallback = nullptr; - ReadActive = false; +private: + void Start(TStub& stub, const TRequest& request, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) { + auto context = provider->CreateContext(); + if (!context) { + auto callback = std::move(Callback); + TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down"); + callback(std::move(status), nullptr); + return; + } + + { + std::unique_lock<std::mutex> guard(Mutex); + LocalContext = context; + Stream = (stub.*asyncRequest)(&Context, request, context->CompletionQueue(), OnStartDoneTag.Prepare()); + } + + context->SubscribeStop([self = TPtr(this)] { + self->Cancel(); + }); + } + + void OnReadDone(bool ok) { + TGrpcStatus status; + TReadCallback callback; + std::unordered_multimap<TString, TString>* initialMetadata = nullptr; + + { + std::unique_lock<std::mutex> guard(Mutex); + Y_VERIFY(ReadActive, "Unexpected Read done callback"); + Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag"); + + if (!ok || Cancelled) { + ReadFinished = true; + + Stream->Finish(&Status, OnFinishedTag.Prepare()); + if (!ok) { + // Keep ReadActive=true, so callback is called + // after the call is finished with an error + return; + } + } + + callback = std::move(ReadCallback); + ReadCallback = nullptr; + ReadActive = false; initialMetadata = InitialMetadata; InitialMetadata = nullptr; HasInitialMetadata = true; - } - + } + if (initialMetadata) { GetInitialMetadata(initialMetadata); } - callback(std::move(status)); - } - - void OnStartDone(bool ok) { - TReaderCallback callback; - - { - std::unique_lock<std::mutex> guard(Mutex); + callback(std::move(status)); + } + + void OnStartDone(bool ok) { + TReaderCallback callback; + + { + std::unique_lock<std::mutex> guard(Mutex); Started = true; if (!ok || Cancelled) { - ReadFinished = true; - Stream->Finish(&Status, OnFinishedTag.Prepare()); - return; - } - callback = std::move(Callback); - Callback = nullptr; - } - - callback({ }, typename TBase::TPtr(this)); - } - - void OnFinished(bool ok) { - TGrpcStatus status; - std::vector<TReadCallback> finishedCallbacks; + ReadFinished = true; + Stream->Finish(&Status, OnFinishedTag.Prepare()); + return; + } + callback = std::move(Callback); + Callback = nullptr; + } + + callback({ }, typename TBase::TPtr(this)); + } + + void OnFinished(bool ok) { + TGrpcStatus status; + std::vector<TReadCallback> finishedCallbacks; TReaderCallback startCallback; - TReadCallback readCallback; - TReadCallback finishCallback; - - { - std::unique_lock<std::mutex> guard(Mutex); - - Finished = true; - FinishedOk = ok; - LocalContext.reset(); - - if (ok) { - status = Status; + TReadCallback readCallback; + TReadCallback finishCallback; + + { + std::unique_lock<std::mutex> guard(Mutex); + + Finished = true; + FinishedOk = ok; + LocalContext.reset(); + + if (ok) { + status = Status; } else if (Cancelled) { status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled"); - } else { - status = TGrpcStatus::Internal("Unexpected error"); - } - + } else { + status = TGrpcStatus::Internal("Unexpected error"); + } + finishedCallbacks.swap(FinishedCallbacks); if (Callback) { @@ -769,68 +769,68 @@ private: startCallback = std::move(Callback); Callback = nullptr; } else if (ReadActive) { - if (ReadCallback) { - readCallback = std::move(ReadCallback); - ReadCallback = nullptr; - } else { - finishCallback = std::move(FinishCallback); - FinishCallback = nullptr; - } - ReadActive = false; - } - } - + if (ReadCallback) { + readCallback = std::move(ReadCallback); + ReadCallback = nullptr; + } else { + finishCallback = std::move(FinishCallback); + FinishCallback = nullptr; + } + ReadActive = false; + } + } + for (auto& finishedCallback : finishedCallbacks) { auto statusCopy = status; finishedCallback(std::move(statusCopy)); } if (startCallback) { - if (status.Ok()) { + if (status.Ok()) { status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure"); } startCallback(std::move(status), nullptr); } else if (readCallback) { if (status.Ok()) { - status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); - } - readCallback(std::move(status)); - } else if (finishCallback) { - finishCallback(std::move(status)); - } - } - - TReaderCallback Callback; - TAsyncReaderPtr Stream; - using TFixedEvent = TQueueClientFixedEvent<TSelf>; - std::mutex Mutex; - TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone }; - TFixedEvent OnStartDoneTag = { this, &TSelf::OnStartDone }; - TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished }; - - TReadCallback ReadCallback; - TReadCallback FinishCallback; - std::vector<TReadCallback> FinishedCallbacks; - std::unordered_multimap<TString, TString>* InitialMetadata = nullptr; + status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); + } + readCallback(std::move(status)); + } else if (finishCallback) { + finishCallback(std::move(status)); + } + } + + TReaderCallback Callback; + TAsyncReaderPtr Stream; + using TFixedEvent = TQueueClientFixedEvent<TSelf>; + std::mutex Mutex; + TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone }; + TFixedEvent OnStartDoneTag = { this, &TSelf::OnStartDone }; + TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished }; + + TReadCallback ReadCallback; + TReadCallback FinishCallback; + std::vector<TReadCallback> FinishedCallbacks; + std::unordered_multimap<TString, TString>* InitialMetadata = nullptr; bool Started = false; bool HasInitialMetadata = false; - bool ReadActive = false; - bool ReadFinished = false; - bool Finished = false; - bool Cancelled = false; - bool FinishedOk = false; -}; - + bool ReadActive = false; + bool ReadFinished = false; + bool Finished = false; + bool Cancelled = false; + bool FinishedOk = false; +}; + template<class TRequest, class TResponse> -using TStreamConnectedCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadWriteProcessor<TRequest, TResponse>::TPtr)>; +using TStreamConnectedCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadWriteProcessor<TRequest, TResponse>::TPtr)>; template<class TStub, class TRequest, class TResponse> -class TStreamRequestReadWriteProcessor - : public IStreamRequestReadWriteProcessor<TRequest, TResponse> - , public TGRpcRequestProcessorCommon { +class TStreamRequestReadWriteProcessor + : public IStreamRequestReadWriteProcessor<TRequest, TResponse> + , public TGRpcRequestProcessorCommon { public: - using TSelf = TStreamRequestReadWriteProcessor; - using TBase = IStreamRequestReadWriteProcessor<TRequest, TResponse>; + using TSelf = TStreamRequestReadWriteProcessor; + using TBase = IStreamRequestReadWriteProcessor<TRequest, TResponse>; using TPtr = TIntrusivePtr<TSelf>; using TConnectedCallback = TStreamConnectedCallback<TRequest, TResponse>; using TReadCallback = typename TBase::TReadCallback; @@ -838,7 +838,7 @@ public: using TAsyncReaderWriterPtr = std::unique_ptr<grpc::ClientAsyncReaderWriter<TRequest, TResponse>>; using TAsyncRequest = TAsyncReaderWriterPtr (TStub::*)(grpc::ClientContext*, grpc::CompletionQueue*, void*); - explicit TStreamRequestReadWriteProcessor(TConnectedCallback&& callback) + explicit TStreamRequestReadWriteProcessor(TConnectedCallback&& callback) : ConnectedCallback(std::move(callback)) { Y_VERIFY(ConnectedCallback, "Missing connected callback"); @@ -847,8 +847,8 @@ public: void Cancel() override { Context.TryCancel(); - { - std::unique_lock<std::mutex> guard(Mutex); + { + std::unique_lock<std::mutex> guard(Mutex); Cancelled = true; if (Started && !(ReadFinished && WriteFinished)) { if (!ReadActive) { @@ -867,8 +867,8 @@ public: void Write(TRequest&& request, TWriteCallback callback) override { TGrpcStatus status; - { - std::unique_lock<std::mutex> guard(Mutex); + { + std::unique_lock<std::mutex> guard(Mutex); if (Cancelled || ReadFinished || WriteFinished) { status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped"); } else if (WriteActive) { @@ -887,11 +887,11 @@ public: } } - void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override { + void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override { TGrpcStatus status; - { - std::unique_lock<std::mutex> guard(Mutex); + { + std::unique_lock<std::mutex> guard(Mutex); Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); if (!Finished && !HasInitialMetadata) { ReadActive = true; @@ -919,8 +919,8 @@ public: void Read(TResponse* message, TReadCallback callback) override { TGrpcStatus status; - { - std::unique_lock<std::mutex> guard(Mutex); + { + std::unique_lock<std::mutex> guard(Mutex); Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); if (!Finished) { ReadActive = true; @@ -947,8 +947,8 @@ public: void Finish(TReadCallback callback) override { TGrpcStatus status; - { - std::unique_lock<std::mutex> guard(Mutex); + { + std::unique_lock<std::mutex> guard(Mutex); Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected"); if (!Finished) { ReadActive = true; @@ -979,8 +979,8 @@ public: TGrpcStatus status; - { - std::unique_lock<std::mutex> guard(Mutex); + { + std::unique_lock<std::mutex> guard(Mutex); if (!Finished) { FinishedCallbacks.emplace_back().swap(callback); return; @@ -1010,8 +1010,8 @@ private: return; } - { - std::unique_lock<std::mutex> guard(Mutex); + { + std::unique_lock<std::mutex> guard(Mutex); LocalContext = context; Stream = (stub.*asyncRequest)(&Context, context->CompletionQueue(), OnConnectedTag.Prepare()); } @@ -1025,8 +1025,8 @@ private: void OnConnected(bool ok) { TConnectedCallback callback; - { - std::unique_lock<std::mutex> guard(Mutex); + { + std::unique_lock<std::mutex> guard(Mutex); Started = true; if (!ok || Cancelled) { ReadFinished = true; @@ -1045,10 +1045,10 @@ private: void OnReadDone(bool ok) { TGrpcStatus status; TReadCallback callback; - std::unordered_multimap<TString, TString>* initialMetadata = nullptr; + std::unordered_multimap<TString, TString>* initialMetadata = nullptr; - { - std::unique_lock<std::mutex> guard(Mutex); + { + std::unique_lock<std::mutex> guard(Mutex); Y_VERIFY(ReadActive, "Unexpected Read done callback"); Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag"); @@ -1085,8 +1085,8 @@ private: void OnWriteDone(bool ok) { TWriteCallback okCallback; - { - std::unique_lock<std::mutex> guard(Mutex); + { + std::unique_lock<std::mutex> guard(Mutex); Y_VERIFY(WriteActive, "Unexpected Write done callback"); Y_VERIFY(!WriteFinished, "Unexpected WriteFinished flag"); @@ -1107,7 +1107,7 @@ private: if (ReadFinished) { Stream->Finish(&Status, OnFinishedTag.Prepare()); } - } else if (!WriteQueue.empty()) { + } else if (!WriteQueue.empty()) { WriteCallback.swap(WriteQueue.front().Callback); Stream->Write(WriteQueue.front().Request, OnWriteDoneTag.Prepare()); WriteQueue.pop_front(); @@ -1127,14 +1127,14 @@ private: void OnFinished(bool ok) { TGrpcStatus status; - std::deque<TWriteItem> writesDropped; - std::vector<TReadCallback> finishedCallbacks; + std::deque<TWriteItem> writesDropped; + std::vector<TReadCallback> finishedCallbacks; TConnectedCallback connectedCallback; TReadCallback readCallback; TReadCallback finishCallback; - { - std::unique_lock<std::mutex> guard(Mutex); + { + std::unique_lock<std::mutex> guard(Mutex); Finished = true; FinishedOk = ok; LocalContext.reset(); @@ -1211,15 +1211,15 @@ private: TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished }; private: - std::mutex Mutex; + std::mutex Mutex; TAsyncReaderWriterPtr Stream; TConnectedCallback ConnectedCallback; TReadCallback ReadCallback; TReadCallback FinishCallback; - std::vector<TReadCallback> FinishedCallbacks; - std::deque<TWriteItem> WriteQueue; + std::vector<TReadCallback> FinishedCallbacks; + std::deque<TWriteItem> WriteQueue; TWriteCallback WriteCallback; - std::unordered_multimap<TString, TString>* InitialMetadata = nullptr; + std::unordered_multimap<TString, TString>* InitialMetadata = nullptr; bool Started = false; bool HasInitialMetadata = false; bool ReadActive = false; @@ -1231,30 +1231,30 @@ private: bool FinishedOk = false; }; -class TGRpcClientLow; - -template<typename TGRpcService> -class TServiceConnection { - using TStub = typename TGRpcService::Stub; - friend class TGRpcClientLow; - -public: - /* - * Start simple request - */ - template<typename TRequest, typename TResponse> - void DoRequest(const TRequest& request, +class TGRpcClientLow; + +template<typename TGRpcService> +class TServiceConnection { + using TStub = typename TGRpcService::Stub; + friend class TGRpcClientLow; + +public: + /* + * Start simple request + */ + template<typename TRequest, typename TResponse> + void DoRequest(const TRequest& request, TResponseCallback<TResponse> callback, typename TSimpleRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest, - const TCallMeta& metas = { }, + const TCallMeta& metas = { }, IQueueClientContextProvider* provider = nullptr) - { - auto processor = MakeIntrusive<TSimpleRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback)); - processor->ApplyMeta(metas); - processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_); - } - - /* + { + auto processor = MakeIntrusive<TSimpleRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback)); + processor->ApplyMeta(metas); + processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_); + } + + /* * Start simple request */ template<typename TRequest, typename TResponse> @@ -1270,58 +1270,58 @@ public: } /* - * Start bidirectional streamming - */ - template<typename TRequest, typename TResponse> - void DoStreamRequest(TStreamConnectedCallback<TRequest, TResponse> callback, - typename TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest, - const TCallMeta& metas = { }, - IQueueClientContextProvider* provider = nullptr) - { - auto processor = MakeIntrusive<TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>>(std::move(callback)); - processor->ApplyMeta(metas); - processor->Start(*Stub_, std::move(asyncRequest), provider ? provider : Provider_); - } - - /* - * Start streaming response reading (one request, many responses) - */ + * Start bidirectional streamming + */ + template<typename TRequest, typename TResponse> + void DoStreamRequest(TStreamConnectedCallback<TRequest, TResponse> callback, + typename TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest, + const TCallMeta& metas = { }, + IQueueClientContextProvider* provider = nullptr) + { + auto processor = MakeIntrusive<TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>>(std::move(callback)); + processor->ApplyMeta(metas); + processor->Start(*Stub_, std::move(asyncRequest), provider ? provider : Provider_); + } + + /* + * Start streaming response reading (one request, many responses) + */ template<typename TRequest, typename TResponse> - void DoStreamRequest(const TRequest& request, - TStreamReaderCallback<TResponse> callback, - typename TStreamRequestReadProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest, + void DoStreamRequest(const TRequest& request, + TStreamReaderCallback<TResponse> callback, + typename TStreamRequestReadProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest, const TCallMeta& metas = { }, IQueueClientContextProvider* provider = nullptr) { - auto processor = MakeIntrusive<TStreamRequestReadProcessor<TStub, TRequest, TResponse>>(std::move(callback)); + auto processor = MakeIntrusive<TStreamRequestReadProcessor<TStub, TRequest, TResponse>>(std::move(callback)); processor->ApplyMeta(metas); - processor->Start(*Stub_, request, std::move(asyncRequest), provider ? provider : Provider_); + processor->Start(*Stub_, request, std::move(asyncRequest), provider ? provider : Provider_); } -private: - TServiceConnection(std::shared_ptr<grpc::ChannelInterface> ci, +private: + TServiceConnection(std::shared_ptr<grpc::ChannelInterface> ci, + IQueueClientContextProvider* provider) + : Stub_(TGRpcService::NewStub(ci)) + , Provider_(provider) + { + Y_VERIFY(Provider_, "Connection does not have a queue provider"); + } + + TServiceConnection(TStubsHolder& holder, IQueueClientContextProvider* provider) - : Stub_(TGRpcService::NewStub(ci)) + : Stub_(holder.GetOrCreateStub<TStub>()) , Provider_(provider) { Y_VERIFY(Provider_, "Connection does not have a queue provider"); } - TServiceConnection(TStubsHolder& holder, - IQueueClientContextProvider* provider) - : Stub_(holder.GetOrCreateStub<TStub>()) - , Provider_(provider) - { - Y_VERIFY(Provider_, "Connection does not have a queue provider"); - } - - std::shared_ptr<TStub> Stub_; + std::shared_ptr<TStub> Stub_; IQueueClientContextProvider* Provider_; -}; - -class TGRpcClientLow +}; + +class TGRpcClientLow : public IQueueClientContextProvider -{ +{ class TContextImpl; friend class TContextImpl; @@ -1331,10 +1331,10 @@ class TGRpcClientLow STOP_EXPLICIT = 2, }; -public: +public: explicit TGRpcClientLow(size_t numWorkerThread = DEFAULT_NUM_THREADS, bool useCompletionQueuePerThread = false); - ~TGRpcClientLow(); - + ~TGRpcClientLow(); + // Tries to stop all currently running requests (via their stop callbacks) // Will shutdown CQ and drain events once all requests have finished // No new requests may be started after this call @@ -1357,24 +1357,24 @@ public: IQueueClientContextPtr CreateContext() override; - template<typename TGRpcService> - std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(const TGRpcClientConfig& config) { - return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(CreateChannelInterface(config), this)); - } - - template<typename TGRpcService> - std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(TStubsHolder& holder) { - return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(holder, this)); - } - + template<typename TGRpcService> + std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(const TGRpcClientConfig& config) { + return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(CreateChannelInterface(config), this)); + } + + template<typename TGRpcService> + std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(TStubsHolder& holder) { + return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(holder, this)); + } + // Tests only, not thread-safe void AddWorkerThreadForTest(); -private: +private: using IThreadRef = std::unique_ptr<IThreadFactory::IThread>; using CompletionQueueRef = std::unique_ptr<grpc::CompletionQueue>; - void Init(size_t numWorkerThread); - + void Init(size_t numWorkerThread); + inline ECqState GetCqState() const { return (ECqState) AtomicGet(CqState_); } inline void SetCqState(ECqState state) { AtomicSet(CqState_, state); } @@ -1385,15 +1385,15 @@ private: private: bool UseCompletionQueuePerThread_; - std::vector<CompletionQueueRef> CQS_; - std::vector<IThreadRef> WorkerThreads_; + std::vector<CompletionQueueRef> CQS_; + std::vector<IThreadRef> WorkerThreads_; TAtomic CqState_ = -1; - std::mutex Mtx_; - std::condition_variable ContextsEmpty_; - std::unordered_set<TContextImpl*> Contexts_; + std::mutex Mtx_; + std::condition_variable ContextsEmpty_; + std::unordered_set<TContextImpl*> Contexts_; + + std::mutex JoinMutex_; +}; - std::mutex JoinMutex_; -}; - } // namespace NGRpc |