author    Daniil Cherednik <dcherednik@yandex-team.ru>    2022-02-10 16:50:16 +0300
committer Daniil Cherednik <dcherednik@yandex-team.ru>    2022-02-10 16:50:16 +0300
commit    17e20fa084178ddcb16255f974dbde74fb93608b (patch)
tree      39605336c0b4d33928df69a256102c515fdf6ff5 /library/cpp/grpc/client
parent    97df5ca7413550bf233fc6c7210e292fca0a51af (diff)
download  ydb-17e20fa084178ddcb16255f974dbde74fb93608b.tar.gz
Restoring authorship annotation for Daniil Cherednik <dcherednik@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/grpc/client')
-rw-r--r--  library/cpp/grpc/client/grpc_client_low.cpp   414
-rw-r--r--  library/cpp/grpc/client/grpc_client_low.h    1122
-rw-r--r--  library/cpp/grpc/client/grpc_common.h           88
-rw-r--r--  library/cpp/grpc/client/ya.make                  4

4 files changed, 814 insertions, 814 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp
index 73cc908ef8..ee9e997fa7 100644
--- a/library/cpp/grpc/client/grpc_client_low.cpp
+++ b/library/cpp/grpc/client/grpc_client_low.cpp
@@ -1,164 +1,164 @@
-#include "grpc_client_low.h"
-#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
-#include <contrib/libs/grpc/include/grpc/support/log.h>
-
-#include <library/cpp/containers/stack_vector/stack_vec.h>
-
-#include <util/string/printf.h>
+#include "grpc_client_low.h"
+#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
+#include <contrib/libs/grpc/include/grpc/support/log.h>
+
+#include <library/cpp/containers/stack_vector/stack_vec.h>
+
+#include <util/string/printf.h>
 #include <util/system/thread.h>
 #include <util/random/random.h>
-
-#if !defined(_WIN32) && !defined(_WIN64)
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#endif
-
+
+#if !defined(_WIN32) && !defined(_WIN64)
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#endif
+
 namespace NGrpc {
-
-void EnableGRpcTracing() {
-    grpc_tracer_set_enabled("tcp", true);
-    grpc_tracer_set_enabled("client_channel", true);
-    grpc_tracer_set_enabled("channel", true);
-    grpc_tracer_set_enabled("api", true);
-    grpc_tracer_set_enabled("connectivity_state", true);
-    grpc_tracer_set_enabled("handshaker", true);
-    grpc_tracer_set_enabled("http", true);
-    grpc_tracer_set_enabled("http2_stream_state", true);
-    grpc_tracer_set_enabled("op_failure", true);
-    grpc_tracer_set_enabled("timer", true);
-    gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
-}
-
-class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator {
-public:
-    TGRpcKeepAliveSocketMutator(int idle, int count, int interval)
-        : Idle_(idle)
-        , Count_(count)
-        , Interval_(interval)
-    {
-        grpc_socket_mutator_init(this, &VTable);
-    }
-private:
-    static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) {
-        return static_cast<TGRpcKeepAliveSocketMutator*>(mutator);
-    }
-
-    template<typename TVal>
-    bool SetOption(int fd, int level, int optname, const TVal& value) {
-        return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0;
-    }
-    bool SetOption(int fd) {
-        if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) {
-            Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl;
-            return false;
-        }
-#ifdef _linux_
-        if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) {
-            Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl;
-            return false;
-        }
-        if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) {
-            Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl;
-            return false;
-        }
-        if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) {
-            Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl;
-            return false;
-        }
-#endif
-        return true;
-    }
-    static bool Mutate(int fd, grpc_socket_mutator* mutator) {
-        auto self = Cast(mutator);
-        return self->SetOption(fd);
-    }
-    static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) {
-        const auto* selfA = Cast(a);
-        const auto* selfB = Cast(b);
-        auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_);
-        auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_);
-        return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0;
-    }
-    static void Destroy(grpc_socket_mutator* mutator) {
-        delete Cast(mutator);
-    }
-
-    static grpc_socket_mutator_vtable VTable;
-    const int Idle_;
-    const int Count_;
-    const int Interval_;
-};
-
-grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =
-    {
-        &TGRpcKeepAliveSocketMutator::Mutate,
-        &TGRpcKeepAliveSocketMutator::Compare,
-        &TGRpcKeepAliveSocketMutator::Destroy
-    };
-
+
+void EnableGRpcTracing() {
+    grpc_tracer_set_enabled("tcp", true);
+    grpc_tracer_set_enabled("client_channel", true);
+    grpc_tracer_set_enabled("channel", true);
+    grpc_tracer_set_enabled("api", true);
+    grpc_tracer_set_enabled("connectivity_state", true);
+    grpc_tracer_set_enabled("handshaker", true);
+    grpc_tracer_set_enabled("http", true);
+    grpc_tracer_set_enabled("http2_stream_state", true);
+    grpc_tracer_set_enabled("op_failure", true);
+    grpc_tracer_set_enabled("timer", true);
+    gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
+}
+
+class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator {
+public:
+    TGRpcKeepAliveSocketMutator(int idle, int count, int interval)
+        : Idle_(idle)
+        , Count_(count)
+        , Interval_(interval)
+    {
+        grpc_socket_mutator_init(this, &VTable);
+    }
+private:
+    static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) {
+        return static_cast<TGRpcKeepAliveSocketMutator*>(mutator);
+    }
+
+    template<typename TVal>
+    bool SetOption(int fd, int level, int optname, const TVal& value) {
+        return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0;
+    }
+    bool SetOption(int fd) {
+        if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) {
+            Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl;
+            return false;
+        }
+#ifdef _linux_
+        if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) {
+            Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl;
+            return false;
+        }
+        if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) {
+            Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl;
+            return false;
+        }
+        if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) {
+            Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl;
+            return false;
+        }
+#endif
+        return true;
+    }
+    static bool Mutate(int fd, grpc_socket_mutator* mutator) {
+        auto self = Cast(mutator);
+        return self->SetOption(fd);
+    }
+    static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) {
+        const auto* selfA = Cast(a);
+        const auto* selfB = Cast(b);
+        auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_);
+        auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_);
+        return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0;
+    }
+    static void Destroy(grpc_socket_mutator* mutator) {
+        delete Cast(mutator);
+    }
+
+    static grpc_socket_mutator_vtable VTable;
+    const int Idle_;
+    const int Count_;
+    const int Interval_;
+};
+
+grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =
+    {
+        &TGRpcKeepAliveSocketMutator::Mutate,
+        &TGRpcKeepAliveSocketMutator::Compare,
+        &TGRpcKeepAliveSocketMutator::Destroy
+    };
+
 TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime)
-    : TcpKeepAliveSettings_(tcpKeepAliveSettings)
+    : TcpKeepAliveSettings_(tcpKeepAliveSettings)
     , ExpireTime_(expireTime)
     , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20))
-{}
-
-void TChannelPool::GetStubsHolderLocked(
-    const TString& channelId,
-    const TGRpcClientConfig& config,
-    std::function<void(TStubsHolder&)> cb)
-{
-    {
-        std::shared_lock readGuard(RWMutex_);
-        const auto it = Pool_.find(channelId);
-        if (it != Pool_.end()) {
+{}
+
+void TChannelPool::GetStubsHolderLocked(
+    const TString& channelId,
+    const TGRpcClientConfig& config,
+    std::function<void(TStubsHolder&)> cb)
+{
+    {
+        std::shared_lock readGuard(RWMutex_);
+        const auto it = Pool_.find(channelId);
+        if (it != Pool_.end()) {
            if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) {
-                return cb(it->second);
-            }
-        }
-    }
-    {
-        std::unique_lock writeGuard(RWMutex_);
-        {
-            auto it = Pool_.find(channelId);
-            if (it != Pool_.end()) {
-                if (!it->second.IsChannelBroken()) {
+                return cb(it->second);
+            }
+        }
+    }
+    {
+        std::unique_lock writeGuard(RWMutex_);
+        {
+            auto it = Pool_.find(channelId);
+            if (it != Pool_.end()) {
+                if (!it->second.IsChannelBroken()) {
                     EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
                     auto now = Now();
                     LastUsedQueue_.emplace(now, channelId);
                     it->second.SetLastUseTime(now);
-                    return cb(it->second);
-                } else {
-                    // This channel can't be used. Remove from pool to create new one
+                    return cb(it->second);
+                } else {
+                    // This channel can't be used. Remove from pool to create new one
                     EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
-                    Pool_.erase(it);
-                }
-            }
-        }
-        TGRpcKeepAliveSocketMutator* mutator = nullptr;
-        // will be destroyed inside grpc
-        if (TcpKeepAliveSettings_.Enabled) {
-            mutator = new TGRpcKeepAliveSocketMutator(
-                TcpKeepAliveSettings_.Idle,
-                TcpKeepAliveSettings_.Count,
-                TcpKeepAliveSettings_.Interval
-            );
-        }
-        cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second);
+                    Pool_.erase(it);
+                }
+            }
+        }
+        TGRpcKeepAliveSocketMutator* mutator = nullptr;
+        // will be destroyed inside grpc
+        if (TcpKeepAliveSettings_.Enabled) {
+            mutator = new TGRpcKeepAliveSocketMutator(
+                TcpKeepAliveSettings_.Idle,
+                TcpKeepAliveSettings_.Count,
+                TcpKeepAliveSettings_.Interval
+            );
+        }
+        cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second);
         LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId);
-    }
-}
-
-void TChannelPool::DeleteChannel(const TString& channelId) {
-    std::unique_lock writeLock(RWMutex_);
+    }
+}
+
+void TChannelPool::DeleteChannel(const TString& channelId) {
+    std::unique_lock writeLock(RWMutex_);
     auto poolIt = Pool_.find(channelId);
     if (poolIt != Pool_.end()) {
         EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId);
         Pool_.erase(poolIt);
     }
-}
-
+}
+
 void TChannelPool::DeleteExpiredStubsHolders() {
     std::unique_lock writeLock(RWMutex_);
     auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_);
@@ -392,7 +392,7 @@ private:
     // Some children are stored inline, others are in a set
     std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } };
-    std::unordered_set<TContextImpl*> Children;
+    std::unordered_set<TContextImpl*> Children;
 
     // Single callback is stored without extra allocations
     TStackVec<TCallback, 1> Callbacks;
@@ -404,10 +404,10 @@ private:
 TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread)
     : UseCompletionQueuePerThread_(useCompletionQueuePerThread)
 {
-    Init(numWorkerThread);
-}
-
-void TGRpcClientLow::Init(size_t numWorkerThread) {
+    Init(numWorkerThread);
+}
+
+void TGRpcClientLow::Init(size_t numWorkerThread) {
     SetCqState(WORKING);
     if (UseCompletionQueuePerThread_) {
         for (size_t i = 0; i < numWorkerThread; i++) {
@@ -425,9 +425,9 @@ void TGRpcClientLow::Init(size_t numWorkerThread) {
             PullEvents(cq);
         }).Release());
     }
-}
-
+}
+
 void TGRpcClientLow::AddWorkerThreadForTest() {
     if (UseCompletionQueuePerThread_) {
         CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
@@ -453,17 +453,17 @@ void TGRpcClientLow::Stop(bool wait) {
 
     if (wait) {
         WaitInternal();
-    }
-}
-
+    }
+}
+
 void TGRpcClientLow::StopInternal(bool silent) {
     bool shutdown;
 
     TVector<TContextImpl::TContextPtr> cancelQueue;
-    {
-        std::unique_lock<std::mutex> guard(Mtx_);
-
+    {
+        std::unique_lock<std::mutex> guard(Mtx_);
+
         auto allowStateChange = [&]() {
             switch (GetCqState()) {
                 case WORKING:
@@ -484,7 +484,7 @@ void TGRpcClientLow::StopInternal(bool silent) {
 
         SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT);
 
-        if (!silent && !Contexts_.empty()) {
+        if (!silent && !Contexts_.empty()) {
             cancelQueue.reserve(Contexts_.size());
             for (auto* ptr : Contexts_) {
                 // N.B. some contexts may be stuck in destructors
@@ -494,7 +494,7 @@ void TGRpcClientLow::StopInternal(bool silent) {
             }
         }
 
-        shutdown = Contexts_.empty();
+        shutdown = Contexts_.empty();
     }
 
     for (auto& context : cancelQueue) {
@@ -506,62 +506,62 @@ void TGRpcClientLow::StopInternal(bool silent) {
         for (auto& cq : CQS_) {
             cq->Shutdown();
         }
-    }
+    }
 }
 
 void TGRpcClientLow::WaitInternal() {
-    std::unique_lock<std::mutex> guard(JoinMutex_);
-
-    for (auto& ti : WorkerThreads_) {
-        ti->Join();
-    }
-}
-
+    std::unique_lock<std::mutex> guard(JoinMutex_);
+
+    for (auto& ti : WorkerThreads_) {
+        ti->Join();
+    }
+}
+
 void TGRpcClientLow::WaitIdle() {
-    std::unique_lock<std::mutex> guard(Mtx_);
-
-    while (!Contexts_.empty()) {
-        ContextsEmpty_.wait(guard);
-    }
-}
-
+    std::unique_lock<std::mutex> guard(Mtx_);
+
+    while (!Contexts_.empty()) {
+        ContextsEmpty_.wait(guard);
+    }
+}
+
 std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() {
-    std::unique_lock<std::mutex> guard(Mtx_);
-
-    auto allowCreateContext = [&]() {
-        switch (GetCqState()) {
-            case WORKING:
-                return true;
-            case STOP_SILENT:
-            case STOP_EXPLICIT:
-                return false;
-        }
-        Y_UNREACHABLE();
-    };
-
-    if (!allowCreateContext()) {
-        // New context creation is forbidden
-        return nullptr;
-    }
-
-    auto context = std::make_shared<TContextImpl>();
-    Contexts_.insert(context.get());
-    context->Owner = this;
-    if (UseCompletionQueuePerThread_) {
-        context->CQ = CQS_[RandomNumber(CQS_.size())].get();
-    } else {
-        context->CQ = CQS_[0].get();
-    }
-    return context;
+    std::unique_lock<std::mutex> guard(Mtx_);
+
+    auto allowCreateContext = [&]() {
+        switch (GetCqState()) {
+            case WORKING:
+                return true;
+            case STOP_SILENT:
+            case STOP_EXPLICIT:
+                return false;
+        }
+        Y_UNREACHABLE();
+    };
+
+    if (!allowCreateContext()) {
+        // New context creation is forbidden
+        return nullptr;
+    }
+
+    auto context = std::make_shared<TContextImpl>();
+    Contexts_.insert(context.get());
+    context->Owner = this;
+    if (UseCompletionQueuePerThread_) {
+        context->CQ = CQS_[RandomNumber(CQS_.size())].get();
+    } else {
+        context->CQ = CQS_[0].get();
+    }
+    return context;
 }
 
 void TGRpcClientLow::ForgetContext(TContextImpl* context) {
     bool shutdown = false;
-    {
-        std::unique_lock<std::mutex> guard(Mtx_);
-
+    {
+        std::unique_lock<std::mutex> guard(Mtx_);
+
         if (!Contexts_.erase(context)) {
             Y_FAIL("Unexpected ForgetContext(%p)", context);
         }
@@ -571,7 +571,7 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) {
             shutdown = true;
         }
 
-        ContextsEmpty_.notify_all();
+        ContextsEmpty_.notify_all();
     }
 }
 
@@ -580,7 +580,7 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) {
         for (auto& cq : CQS_) {
             cq->Shutdown();
        }
-    }
-}
-
+    }
+}
+
 } // namespace NGRpc
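The grpc_client_low.cpp diff above carries the TCP keep-alive socket mutator and the channel pool. As a usage illustration only — a minimal sketch of configuring keep-alive probing through TChannelPool; the endpoint string and the MyService stub type are hypothetical stand-ins for a protobuf-generated service:

    #include <library/cpp/grpc/client/grpc_client_low.h>

    void KeepAliveExample() {
        NGrpc::TTcpKeepAliveSettings keepAlive;
        keepAlive.Enabled = true;
        keepAlive.Idle = 30;     // seconds of inactivity before the first probe
        keepAlive.Count = 5;     // unanswered probes before the kernel drops the connection
        keepAlive.Interval = 10; // seconds between probes

        NGrpc::TChannelPool pool(keepAlive); // channels expire after the default 6 minutes

        NGrpc::TGRpcClientConfig config("localhost:2135"); // hypothetical endpoint
        pool.GetStubsHolderLocked("localhost:2135", config, [](NGrpc::TStubsHolder& holder) {
            // MyService is a hypothetical generated service; the stub is cached per channel.
            auto stub = holder.GetOrCreateStub<MyService::Stub>();
            // ... issue calls on *stub; the pool lock is held only for this callback ...
        });
    }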
diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h
index ab0a0627be..e452be45a1 100644
--- a/library/cpp/grpc/client/grpc_client_low.h
+++ b/library/cpp/grpc/client/grpc_client_low.h
@@ -1,57 +1,57 @@
-#pragma once
-
-#include "grpc_common.h"
-
+#pragma once
+
+#include "grpc_common.h"
+
 #include <util/thread/factory.h>
 
-#include <grpc++/grpc++.h>
+#include <grpc++/grpc++.h>
 #include <grpc++/support/async_stream.h>
 #include <grpc++/support/async_unary_call.h>
-
-#include <deque>
-#include <typeindex>
-#include <typeinfo>
+
+#include <deque>
+#include <typeindex>
+#include <typeinfo>
 #include <variant>
-#include <vector>
-#include <unordered_map>
-#include <unordered_set>
-#include <mutex>
-#include <shared_mutex>
-
-/*
- * This file contains low level logic for grpc
- * This file should not be used in high level code without special reason
- */
+#include <vector>
+#include <unordered_map>
+#include <unordered_set>
+#include <mutex>
+#include <shared_mutex>
+
+/*
+ * This file contains low level logic for grpc
+ * This file should not be used in high level code without special reason
+ */
 namespace NGrpc {
-
-const size_t DEFAULT_NUM_THREADS = 2;
-
-////////////////////////////////////////////////////////////////////////////////
-
-void EnableGRpcTracing();
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TTcpKeepAliveSettings {
-    bool Enabled;
-    size_t Idle;
-    size_t Count;
-    size_t Interval;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-// Common interface used to execute action from grpc cq routine
-class IQueueClientEvent {
-public:
-    virtual ~IQueueClientEvent() = default;
-
+
+const size_t DEFAULT_NUM_THREADS = 2;
+
+////////////////////////////////////////////////////////////////////////////////
+
+void EnableGRpcTracing();
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TTcpKeepAliveSettings {
+    bool Enabled;
+    size_t Idle;
+    size_t Count;
+    size_t Interval;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Common interface used to execute action from grpc cq routine
+class IQueueClientEvent {
+public:
+    virtual ~IQueueClientEvent() = default;
+
     //! Execute an action defined by implementation
-    virtual bool Execute(bool ok) = 0;
-
-    //! Finish and destroy event
-    virtual void Destroy() = 0;
-};
-
+    virtual bool Execute(bool ok) = 0;
+
+    //! Finish and destroy event
+    virtual void Destroy() = 0;
+};
+
 // Implementation of IQueueClientEvent that reduces allocations
 template<class TSelf>
 class TQueueClientFixedEvent : private IQueueClientEvent {
@@ -123,8 +123,8 @@ public:
     }
 };
 
-// Represents grpc status and error message string
-struct TGrpcStatus {
+// Represents grpc status and error message string
+struct TGrpcStatus {
     TString Msg;
     TString Details;
     int GRpcStatusCode;
@@ -167,36 +167,36 @@ struct TGrpcStatus {
     bool Ok() const {
         return !InternalError && GRpcStatusCode == grpc::StatusCode::OK;
     }
-};
-
-bool inline IsGRpcStatusGood(const TGrpcStatus& status) {
+};
+
+bool inline IsGRpcStatusGood(const TGrpcStatus& status) {
     return status.Ok();
-}
-
+}
+
 // Response callback type - this callback will be called when request is finished
-// (or after getting each chunk in case of streaming mode)
-template<typename TResponse>
-using TResponseCallback = std::function<void (TGrpcStatus&&, TResponse&&)>;
-
+// (or after getting each chunk in case of streaming mode)
+template<typename TResponse>
+using TResponseCallback = std::function<void (TGrpcStatus&&, TResponse&&)>;
+
 template<typename TResponse>
 using TAdvancedResponseCallback = std::function<void (const grpc::ClientContext&, TGrpcStatus&&, TResponse&&)>;
 
-// Call associated metadata
-struct TCallMeta {
-    std::shared_ptr<grpc::CallCredentials> CallCredentials;
-    std::vector<std::pair<TString, TString>> Aux;
+// Call associated metadata
+struct TCallMeta {
+    std::shared_ptr<grpc::CallCredentials> CallCredentials;
+    std::vector<std::pair<TString, TString>> Aux;
     std::variant<TDuration, TInstant> Timeout; // timeout as duration from now or time point in future
-};
-
-class TGRpcRequestProcessorCommon {
-protected:
-    void ApplyMeta(const TCallMeta& meta) {
-        for (const auto& rec : meta.Aux) {
-            Context.AddMetadata(rec.first, rec.second);
-        }
-        if (meta.CallCredentials) {
-            Context.set_credentials(meta.CallCredentials);
-        }
+};
+
+class TGRpcRequestProcessorCommon {
+protected:
+    void ApplyMeta(const TCallMeta& meta) {
+        for (const auto& rec : meta.Aux) {
+            Context.AddMetadata(rec.first, rec.second);
+        }
+        if (meta.CallCredentials) {
+            Context.set_credentials(meta.CallCredentials);
+        }
         if (const TDuration* timeout = std::get_if<TDuration>(&meta.Timeout)) {
             if (*timeout) {
                 auto deadline = gpr_time_add(
@@ -208,10 +208,10 @@ protected:
             if (*deadline) {
                 Context.set_deadline(gpr_time_from_micros(deadline->MicroSeconds(), GPR_CLOCK_MONOTONIC));
             }
-        }
-    }
+        }
+    }
 
-    void GetInitialMetadata(std::unordered_multimap<TString, TString>* metadata) {
+    void GetInitialMetadata(std::unordered_multimap<TString, TString>* metadata) {
         for (const auto& [key, value] : Context.GetServerInitialMetadata()) {
             metadata->emplace(
                 TString(key.begin(), key.end()),
@@ -220,61 +220,61 @@ protected:
         }
     }
 
-    grpc::Status Status;
-    grpc::ClientContext Context;
-    std::shared_ptr<IQueueClientContext> LocalContext;
-};
-
-template<typename TStub, typename TRequest, typename TResponse>
-class TSimpleRequestProcessor
-    : public TThrRefBase
-    , public IQueueClientEvent
-    , public TGRpcRequestProcessorCommon {
-    using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>;
-    template<typename> friend class TServiceConnection;
-public:
-    using TPtr = TIntrusivePtr<TSimpleRequestProcessor>;
-    using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*);
-
+    grpc::Status Status;
+    grpc::ClientContext Context;
+    std::shared_ptr<IQueueClientContext> LocalContext;
+};
+
+template<typename TStub, typename TRequest, typename TResponse>
+class TSimpleRequestProcessor
+    : public TThrRefBase
+    , public IQueueClientEvent
+    , public TGRpcRequestProcessorCommon {
+    using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>;
+    template<typename> friend class TServiceConnection;
+public:
+    using TPtr = TIntrusivePtr<TSimpleRequestProcessor>;
+    using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*);
+
     explicit TSimpleRequestProcessor(TResponseCallback<TResponse>&& callback)
         : Callback_(std::move(callback))
-    { }
-
-    ~TSimpleRequestProcessor() {
+    { }
+
+    ~TSimpleRequestProcessor() {
         if (!Replied_ && Callback_) {
             Callback_(TGrpcStatus::Internal("request left unhandled"), std::move(Reply_));
             Callback_ = nullptr; // free resources as early as possible
-        }
-    }
-
-    bool Execute(bool ok) override {
-        {
-            std::unique_lock<std::mutex> guard(Mutex_);
-            LocalContext.reset();
         }
 
         TGrpcStatus status;
         if (ok) {
-            status = Status;
         } else {
            status = TGrpcStatus::Internal("Unexpected error");
-        }
-        Replied_ = true;
-        Callback_(std::move(status), std::move(Reply_));
+        }
+    }
+
+    bool Execute(bool ok) override {
+        {
+            std::unique_lock<std::mutex> guard(Mutex_);
+            LocalContext.reset();
        }
        TGrpcStatus status;
        if (ok) {
+            status = Status;
        } else {
            status = TGrpcStatus::Internal("Unexpected error");
+        }
+        Replied_ = true;
+        Callback_(std::move(status), std::move(Reply_));
         Callback_ = nullptr; // free resources as early as possible
-        return false;
-    }
-
-    void Destroy() override {
+        return false;
+    }
+
+    void Destroy() override {
         UnRef();
-    }
-
-private:
+    }
+
+private:
     IQueueClientEvent* FinishedEvent() {
         Ref();
         return this;
-    }
-
-    void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) {
+    }
+
+    void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) {
         auto context = provider->CreateContext();
         if (!context) {
             Replied_ = true;
@@ -282,11 +282,11 @@ private:
             Callback_ = nullptr;
             return;
         }
-        {
-            std::unique_lock<std::mutex> guard(Mutex_);
-            LocalContext = context;
-            Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());
-            Reader_->Finish(&Reply_, &Status, FinishedEvent());
+        {
+            std::unique_lock<std::mutex> guard(Mutex_);
+            LocalContext = context;
+            Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());
+            Reader_->Finish(&Reply_, &Status, FinishedEvent());
         }
         context->SubscribeStop([self = TPtr(this)] {
             self->Stop();
@@ -294,17 +294,17 @@ private:
     }
 
     void Stop() {
-        Context.TryCancel();
+        Context.TryCancel();
     }
 
     TResponseCallback<TResponse> Callback_;
-    TResponse Reply_;
-    std::mutex Mutex_;
-    TAsyncReaderPtr Reader_;
-
-    bool Replied_ = false;
-};
-
+    TResponse Reply_;
+    std::mutex Mutex_;
+    TAsyncReaderPtr Reader_;
+
+    bool Replied_ = false;
+};
+
 template<typename TStub, typename TRequest, typename TResponse>
 class TAdvancedRequestProcessor
     : public TThrRefBase
@@ -328,8 +328,8 @@ public:
     }
 
     bool Execute(bool ok) override {
-        {
-            std::unique_lock<std::mutex> guard(Mutex_);
+        {
+            std::unique_lock<std::mutex> guard(Mutex_);
             LocalContext.reset();
         }
         TGrpcStatus status;
@@ -362,8 +362,8 @@ private:
             Callback_ = nullptr;
             return;
         }
-        {
-            std::unique_lock<std::mutex> guard(Mutex_);
+        {
+            std::unique_lock<std::mutex> guard(Mutex_);
             LocalContext = context;
             Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());
             Reader_->Finish(&Reply_, &Status, FinishedEvent());
@@ -379,16 +379,16 @@ private:
     TAdvancedResponseCallback<TResponse> Callback_;
     TResponse Reply_;
-    std::mutex Mutex_;
+    std::mutex Mutex_;
     TAsyncReaderPtr Reader_;
 
     bool Replied_ = false;
 };
 
-template<class TResponse>
-class IStreamRequestReadProcessor : public TThrRefBase {
+template<class TResponse>
+class IStreamRequestReadProcessor : public TThrRefBase {
 public:
-    using TPtr = TIntrusivePtr<IStreamRequestReadProcessor>;
+    using TPtr = TIntrusivePtr<IStreamRequestReadProcessor>;
     using TReadCallback = std::function<void(TGrpcStatus&&)>;
 
     /**
@@ -399,7 +399,7 @@ public:
     /**
      * Scheduled initial server metadata read from the stream
      */
-    virtual void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) = 0;
+    virtual void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) = 0;
 
     /**
      * Scheduled response read from the stream
@@ -420,72 +420,72 @@ public:
     virtual void AddFinishedCallback(TReadCallback callback) = 0;
 };
 
-template<class TRequest, class TResponse>
-class IStreamRequestReadWriteProcessor : public IStreamRequestReadProcessor<TResponse> {
-public:
-    using TPtr = TIntrusivePtr<IStreamRequestReadWriteProcessor>;
+template<class TRequest, class TResponse>
+class IStreamRequestReadWriteProcessor : public IStreamRequestReadProcessor<TResponse> {
+public:
+    using TPtr = TIntrusivePtr<IStreamRequestReadWriteProcessor>;
     using TWriteCallback = std::function<void(TGrpcStatus&&)>;
-
-    /**
-     * Scheduled request write to the stream
-     */
+
+    /**
+     * Scheduled request write to the stream
+     */
     virtual void Write(TRequest&& request, TWriteCallback callback = { }) = 0;
-};
-
-class TGRpcKeepAliveSocketMutator;
-
-// Class to hold stubs allocated on channel.
-// It is poor documented part of grpc. See KIKIMR-6109 and comment to this commit
-
-// Stub holds shared_ptr<ChannelInterface>, so we can destroy this holder even if
-// request processor using stub
-class TStubsHolder : public TNonCopyable {
-    using TypeInfoRef = std::reference_wrapper<const std::type_info>;
-
-    struct THasher {
-        std::size_t operator()(TypeInfoRef code) const {
-            return code.get().hash_code();
-        }
-    };
-
-    struct TEqualTo {
-        bool operator()(TypeInfoRef lhs, TypeInfoRef rhs) const {
-            return lhs.get() == rhs.get();
-        }
-    };
-public:
-    TStubsHolder(std::shared_ptr<grpc::ChannelInterface> channel)
-        : ChannelInterface_(channel)
-    {}
-
-    // Returns true if channel can't be used to perform request now
-    bool IsChannelBroken() const {
-        auto state = ChannelInterface_->GetState(false);
-        return state == GRPC_CHANNEL_SHUTDOWN ||
-            state == GRPC_CHANNEL_TRANSIENT_FAILURE;
-    }
-
-    template<typename TStub>
-    std::shared_ptr<TStub> GetOrCreateStub() {
-        const auto& stubId = typeid(TStub);
-        {
-            std::shared_lock readGuard(RWMutex_);
-            const auto it = Stubs_.find(stubId);
-            if (it != Stubs_.end()) {
-                return std::static_pointer_cast<TStub>(it->second);
-            }
-        }
-        {
-            std::unique_lock writeGuard(RWMutex_);
-            auto it = Stubs_.emplace(stubId, nullptr);
-            if (!it.second) {
-                return std::static_pointer_cast<TStub>(it.first->second);
-            } else {
-                it.first->second = std::make_shared<TStub>(ChannelInterface_);
-                return std::static_pointer_cast<TStub>(it.first->second);
-            }
-        }
-    }
+};
+
+class TGRpcKeepAliveSocketMutator;
+
+// Class to hold stubs allocated on channel.
+// It is poor documented part of grpc. See KIKIMR-6109 and comment to this commit
+
+// Stub holds shared_ptr<ChannelInterface>, so we can destroy this holder even if
+// request processor using stub
+class TStubsHolder : public TNonCopyable {
+    using TypeInfoRef = std::reference_wrapper<const std::type_info>;
+
+    struct THasher {
+        std::size_t operator()(TypeInfoRef code) const {
+            return code.get().hash_code();
+        }
+    };
+
+    struct TEqualTo {
+        bool operator()(TypeInfoRef lhs, TypeInfoRef rhs) const {
+            return lhs.get() == rhs.get();
+        }
+    };
+public:
+    TStubsHolder(std::shared_ptr<grpc::ChannelInterface> channel)
+        : ChannelInterface_(channel)
+    {}
+
+    // Returns true if channel can't be used to perform request now
+    bool IsChannelBroken() const {
+        auto state = ChannelInterface_->GetState(false);
+        return state == GRPC_CHANNEL_SHUTDOWN ||
+            state == GRPC_CHANNEL_TRANSIENT_FAILURE;
+    }
+
+    template<typename TStub>
+    std::shared_ptr<TStub> GetOrCreateStub() {
+        const auto& stubId = typeid(TStub);
+        {
+            std::shared_lock readGuard(RWMutex_);
+            const auto it = Stubs_.find(stubId);
+            if (it != Stubs_.end()) {
+                return std::static_pointer_cast<TStub>(it->second);
+            }
+        }
+        {
+            std::unique_lock writeGuard(RWMutex_);
+            auto it = Stubs_.emplace(stubId, nullptr);
+            if (!it.second) {
+                return std::static_pointer_cast<TStub>(it.first->second);
+            } else {
+                it.first->second = std::make_shared<TStub>(ChannelInterface_);
+                return std::static_pointer_cast<TStub>(it.first->second);
+            }
+        }
+    }
 
     const TInstant& GetLastUseTime() const {
         return LastUsed_;
@@ -494,76 +494,76 @@ public:
     void SetLastUseTime(const TInstant& time) {
         LastUsed_ = time;
     }
-private:
+private:
     TInstant LastUsed_ = Now();
-    std::shared_mutex RWMutex_;
-    std::unordered_map<TypeInfoRef, std::shared_ptr<void>, THasher, TEqualTo> Stubs_;
-    std::shared_ptr<grpc::ChannelInterface> ChannelInterface_;
-};
-
-class TChannelPool {
-public:
+    std::shared_mutex RWMutex_;
+    std::unordered_map<TypeInfoRef, std::shared_ptr<void>, THasher, TEqualTo> Stubs_;
+    std::shared_ptr<grpc::ChannelInterface> ChannelInterface_;
+};
+
+class TChannelPool {
+public:
     TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime = TDuration::Minutes(6));
-    //Allows to CreateStub from TStubsHolder under lock
-    //The callback will be called just during GetStubsHolderLocked call
-    void GetStubsHolderLocked(const TString& channelId, const TGRpcClientConfig& config, std::function<void(TStubsHolder&)> cb);
-    void DeleteChannel(const TString& channelId);
+    //Allows to CreateStub from TStubsHolder under lock
+    //The callback will be called just during GetStubsHolderLocked call
+    void GetStubsHolderLocked(const TString& channelId, const TGRpcClientConfig& config, std::function<void(TStubsHolder&)> cb);
+    void DeleteChannel(const TString& channelId);
     void DeleteExpiredStubsHolders();
-private:
-    std::shared_mutex RWMutex_;
-    std::unordered_map<TString, TStubsHolder> Pool_;
+private:
+    std::shared_mutex RWMutex_;
+    std::unordered_map<TString, TStubsHolder> Pool_;
     std::multimap<TInstant, TString> LastUsedQueue_;
-    TTcpKeepAliveSettings TcpKeepAliveSettings_;
+    TTcpKeepAliveSettings TcpKeepAliveSettings_;
     TDuration ExpireTime_;
     TDuration UpdateReUseTime_;
     void EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId);
-};
-
-template<class TResponse>
-using TStreamReaderCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadProcessor<TResponse>::TPtr)>;
-
-template<typename TStub, typename TRequest, typename TResponse>
-class TStreamRequestReadProcessor
-    : public IStreamRequestReadProcessor<TResponse>
-    , public TGRpcRequestProcessorCommon {
-    template<typename> friend class TServiceConnection;
-public:
-    using TSelf = TStreamRequestReadProcessor;
-    using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncReader<TResponse>>;
-    using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*, void*);
-    using TReaderCallback = TStreamReaderCallback<TResponse>;
-    using TPtr = TIntrusivePtr<TSelf>;
-    using TBase = IStreamRequestReadProcessor<TResponse>;
-    using TReadCallback = typename TBase::TReadCallback;
-
-    explicit TStreamRequestReadProcessor(TReaderCallback&& callback)
-        : Callback(std::move(callback))
-    {
-        Y_VERIFY(Callback, "Missing connected callback");
-    }
-
-    void Cancel() override {
-        Context.TryCancel();
-
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
-            Cancelled = true;
            if (Started && !ReadFinished) {
-                if (!ReadActive) {
-                    ReadFinished = true;
-                }
-                if (ReadFinished) {
-                    Stream->Finish(&Status, OnFinishedTag.Prepare());
-                }
-            }
-        }
-    }
-
-    void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {
+};
+
+template<class TResponse>
+using TStreamReaderCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadProcessor<TResponse>::TPtr)>;
+
+template<typename TStub, typename TRequest, typename TResponse>
+class TStreamRequestReadProcessor
+    : public IStreamRequestReadProcessor<TResponse>
+    , public TGRpcRequestProcessorCommon {
+    template<typename> friend class TServiceConnection;
+public:
+    using TSelf = TStreamRequestReadProcessor;
+    using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncReader<TResponse>>;
+    using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*, void*);
+    using TReaderCallback = TStreamReaderCallback<TResponse>;
+    using TPtr = TIntrusivePtr<TSelf>;
+    using TBase = IStreamRequestReadProcessor<TResponse>;
+    using TReadCallback = typename TBase::TReadCallback;
+
+    explicit TStreamRequestReadProcessor(TReaderCallback&& callback)
+        : Callback(std::move(callback))
+    {
+        Y_VERIFY(Callback, "Missing connected callback");
+    }
+
+    void Cancel() override {
+        Context.TryCancel();
+
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
+            Cancelled = true;
            if (Started && !ReadFinished) {
+                if (!ReadActive) {
+                    ReadFinished = true;
+                }
+                if (ReadFinished) {
+                    Stream->Finish(&Status, OnFinishedTag.Prepare());
+                }
+            }
+        }
+    }
+
+    void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {
         TGrpcStatus status;
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
             Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
             if (!Finished && !HasInitialMetadata) {
                 ReadActive = true;
@@ -588,66 +588,66 @@ public:
         callback(std::move(status));
     }
 
-    void Read(TResponse* message, TReadCallback callback) override {
-        TGrpcStatus status;
-
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
-            Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
-            if (!Finished) {
-                ReadActive = true;
-                ReadCallback = std::move(callback);
-                if (!ReadFinished) {
-                    Stream->Read(message, OnReadDoneTag.Prepare());
-                }
-                return;
-            }
-            if (FinishedOk) {
-                status = Status;
-            } else {
-                status = TGrpcStatus::Internal("Unexpected error");
-            }
-        }
-
-        if (status.Ok()) {
-            status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
-        }
-
-        callback(std::move(status));
-    }
-
-    void Finish(TReadCallback callback) override {
-        TGrpcStatus status;
-
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
-            Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
-            if (!Finished) {
-                ReadActive = true;
-                FinishCallback = std::move(callback);
-                if (!ReadFinished) {
-                    ReadFinished = true;
-                }
-                Stream->Finish(&Status, OnFinishedTag.Prepare());
-                return;
-            }
-            if (FinishedOk) {
-                status = Status;
-            } else {
-                status = TGrpcStatus::Internal("Unexpected error");
-            }
-        }
-
-        callback(std::move(status));
-    }
+    void Read(TResponse* message, TReadCallback callback) override {
+        TGrpcStatus status;
+
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
+            Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
+            if (!Finished) {
+                ReadActive = true;
+                ReadCallback = std::move(callback);
+                if (!ReadFinished) {
+                    Stream->Read(message, OnReadDoneTag.Prepare());
+                }
+                return;
+            }
+            if (FinishedOk) {
+                status = Status;
+            } else {
+                status = TGrpcStatus::Internal("Unexpected error");
+            }
+        }
+
+        if (status.Ok()) {
+            status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
+        }
+
+        callback(std::move(status));
+    }
+
+    void Finish(TReadCallback callback) override {
+        TGrpcStatus status;
+
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
+            Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
+            if (!Finished) {
+                ReadActive = true;
+                FinishCallback = std::move(callback);
+                if (!ReadFinished) {
+                    ReadFinished = true;
+                }
+                Stream->Finish(&Status, OnFinishedTag.Prepare());
+                return;
+            }
+            if (FinishedOk) {
+                status = Status;
+            } else {
+                status = TGrpcStatus::Internal("Unexpected error");
+            }
+        }
+
+        callback(std::move(status));
+    }
 
     void AddFinishedCallback(TReadCallback callback) override {
         Y_VERIFY(callback, "Unexpected empty callback");
 
         TGrpcStatus status;
 
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
             if (!Finished) {
                 FinishedCallbacks.emplace_back().swap(callback);
                 return;
@@ -665,103 +665,103 @@ public:
         callback(std::move(status));
     }
 
-private:
-    void Start(TStub& stub, const TRequest& request, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) {
-        auto context = provider->CreateContext();
-        if (!context) {
-            auto callback = std::move(Callback);
-            TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down");
-            callback(std::move(status), nullptr);
-            return;
-        }
-
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
-            LocalContext = context;
-            Stream = (stub.*asyncRequest)(&Context, request, context->CompletionQueue(), OnStartDoneTag.Prepare());
-        }
-
-        context->SubscribeStop([self = TPtr(this)] {
-            self->Cancel();
-        });
-    }
-
-    void OnReadDone(bool ok) {
-        TGrpcStatus status;
-        TReadCallback callback;
-        std::unordered_multimap<TString, TString>* initialMetadata = nullptr;
-
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
-            Y_VERIFY(ReadActive, "Unexpected Read done callback");
-            Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag");
-
-            if (!ok || Cancelled) {
-                ReadFinished = true;
-
-                Stream->Finish(&Status, OnFinishedTag.Prepare());
-                if (!ok) {
-                    // Keep ReadActive=true, so callback is called
-                    // after the call is finished with an error
-                    return;
-                }
-            }
-
-            callback = std::move(ReadCallback);
-            ReadCallback = nullptr;
-            ReadActive = false;
            initialMetadata = InitialMetadata;
            InitialMetadata = nullptr;
            HasInitialMetadata = true;
-        }
-
+private:
+    void Start(TStub& stub, const TRequest& request, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) {
+        auto context = provider->CreateContext();
+        if (!context) {
+            auto callback = std::move(Callback);
+            TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down");
+            callback(std::move(status), nullptr);
+            return;
+        }
+
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
+            LocalContext = context;
+            Stream = (stub.*asyncRequest)(&Context, request, context->CompletionQueue(), OnStartDoneTag.Prepare());
+        }
+
+        context->SubscribeStop([self = TPtr(this)] {
+            self->Cancel();
+        });
+    }
+
+    void OnReadDone(bool ok) {
+        TGrpcStatus status;
+        TReadCallback callback;
+        std::unordered_multimap<TString, TString>* initialMetadata = nullptr;
+
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
+            Y_VERIFY(ReadActive, "Unexpected Read done callback");
+            Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag");
+
+            if (!ok || Cancelled) {
+                ReadFinished = true;
+
+                Stream->Finish(&Status, OnFinishedTag.Prepare());
+                if (!ok) {
+                    // Keep ReadActive=true, so callback is called
+                    // after the call is finished with an error
+                    return;
+                }
+            }
+
+            callback = std::move(ReadCallback);
+            ReadCallback = nullptr;
+            ReadActive = false;
            initialMetadata = InitialMetadata;
            InitialMetadata = nullptr;
            HasInitialMetadata = true;
+        }
+
        if (initialMetadata) {
            GetInitialMetadata(initialMetadata);
        }
 
-        callback(std::move(status));
-    }
-
-    void OnStartDone(bool ok) {
-        TReaderCallback callback;
-
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
            Started = true;
            if (!ok || Cancelled) {
-                ReadFinished = true;
-                Stream->Finish(&Status, OnFinishedTag.Prepare());
-                return;
-            }
-            callback = std::move(Callback);
-            Callback = nullptr;
-        }
-
-        callback({ }, typename TBase::TPtr(this));
-    }
-
-    void OnFinished(bool ok) {
-        TGrpcStatus status;
-        std::vector<TReadCallback> finishedCallbacks;
        TReaderCallback startCallback;
-        TReadCallback readCallback;
-        TReadCallback finishCallback;
-
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
-
-            Finished = true;
-            FinishedOk = ok;
-            LocalContext.reset();
-
-            if (ok) {
-                status = Status;
            } else if (Cancelled) {
                status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
-            } else {
-                status = TGrpcStatus::Internal("Unexpected error");
-            }
-
+        callback(std::move(status));
+    }
+
+    void OnStartDone(bool ok) {
+        TReaderCallback callback;
+
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
            Started = true;
            if (!ok || Cancelled) {
+                ReadFinished = true;
+                Stream->Finish(&Status, OnFinishedTag.Prepare());
+                return;
+            }
+            callback = std::move(Callback);
+            Callback = nullptr;
+        }
+
+        callback({ }, typename TBase::TPtr(this));
+    }
+
+    void OnFinished(bool ok) {
+        TGrpcStatus status;
+        std::vector<TReadCallback> finishedCallbacks;
        TReaderCallback startCallback;
+        TReadCallback readCallback;
+        TReadCallback finishCallback;
+
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
+
+            Finished = true;
+            FinishedOk = ok;
+            LocalContext.reset();
+
+            if (ok) {
+                status = Status;
            } else if (Cancelled) {
                status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
+            } else {
+                status = TGrpcStatus::Internal("Unexpected error");
+            }
+
            finishedCallbacks.swap(FinishedCallbacks);
 
            if (Callback) {
@@ -769,68 +769,68 @@ private:
                startCallback = std::move(Callback);
                Callback = nullptr;
            } else if (ReadActive) {
-                if (ReadCallback) {
-                    readCallback = std::move(ReadCallback);
-                    ReadCallback = nullptr;
-                } else {
-                    finishCallback = std::move(FinishCallback);
-                    FinishCallback = nullptr;
-                }
-                ReadActive = false;
-            }
-        }
-
+                if (ReadCallback) {
+                    readCallback = std::move(ReadCallback);
+                    ReadCallback = nullptr;
+                } else {
+                    finishCallback = std::move(FinishCallback);
+                    FinishCallback = nullptr;
+                }
+                ReadActive = false;
+            }
+        }
+
        for (auto& finishedCallback : finishedCallbacks) {
            auto statusCopy = status;
            finishedCallback(std::move(statusCopy));
        }
 
        if (startCallback) {
-            if (status.Ok()) {
+            if (status.Ok()) {
                status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure");
            }
            startCallback(std::move(status), nullptr);
        } else if (readCallback) {
            if (status.Ok()) {
-                status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
-            }
-            readCallback(std::move(status));
-        } else if (finishCallback) {
-            finishCallback(std::move(status));
-        }
-    }
-
-    TReaderCallback Callback;
-    TAsyncReaderPtr Stream;
-    using TFixedEvent = TQueueClientFixedEvent<TSelf>;
-    std::mutex Mutex;
-    TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone };
-    TFixedEvent OnStartDoneTag = { this, &TSelf::OnStartDone };
-    TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };
-
-    TReadCallback ReadCallback;
-    TReadCallback FinishCallback;
-    std::vector<TReadCallback> FinishedCallbacks;
-    std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
+                status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
+            }
+            readCallback(std::move(status));
+        } else if (finishCallback) {
+            finishCallback(std::move(status));
+        }
+    }
+
+    TReaderCallback Callback;
+    TAsyncReaderPtr Stream;
+    using TFixedEvent = TQueueClientFixedEvent<TSelf>;
+    std::mutex Mutex;
+    TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone };
+    TFixedEvent OnStartDoneTag = { this, &TSelf::OnStartDone };
+    TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };
+
+    TReadCallback ReadCallback;
+    TReadCallback FinishCallback;
+    std::vector<TReadCallback> FinishedCallbacks;
+    std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
     bool Started = false;
     bool HasInitialMetadata = false;
-    bool ReadActive = false;
-    bool ReadFinished = false;
-    bool Finished = false;
-    bool Cancelled = false;
-    bool FinishedOk = false;
-};
-
+    bool ReadActive = false;
+    bool ReadFinished = false;
+    bool Finished = false;
+    bool Cancelled = false;
+    bool FinishedOk = false;
+};
+
 template<class TRequest, class TResponse>
-using TStreamConnectedCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadWriteProcessor<TRequest, TResponse>::TPtr)>;
+using TStreamConnectedCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadWriteProcessor<TRequest, TResponse>::TPtr)>;
 
 template<class TStub, class TRequest, class TResponse>
-class TStreamRequestReadWriteProcessor
-    : public IStreamRequestReadWriteProcessor<TRequest, TResponse>
-    , public TGRpcRequestProcessorCommon {
+class TStreamRequestReadWriteProcessor
+    : public IStreamRequestReadWriteProcessor<TRequest, TResponse>
+    , public TGRpcRequestProcessorCommon {
 public:
-    using TSelf = TStreamRequestReadWriteProcessor;
-    using TBase = IStreamRequestReadWriteProcessor<TRequest, TResponse>;
+    using TSelf = TStreamRequestReadWriteProcessor;
+    using TBase = IStreamRequestReadWriteProcessor<TRequest, TResponse>;
     using TPtr = TIntrusivePtr<TSelf>;
     using TConnectedCallback = TStreamConnectedCallback<TRequest, TResponse>;
     using TReadCallback = typename TBase::TReadCallback;
@@ -838,7 +838,7 @@ public:
     using TAsyncReaderWriterPtr = std::unique_ptr<grpc::ClientAsyncReaderWriter<TRequest, TResponse>>;
     using TAsyncRequest = TAsyncReaderWriterPtr (TStub::*)(grpc::ClientContext*, grpc::CompletionQueue*, void*);
 
-    explicit TStreamRequestReadWriteProcessor(TConnectedCallback&& callback)
+    explicit TStreamRequestReadWriteProcessor(TConnectedCallback&& callback)
         : ConnectedCallback(std::move(callback))
     {
         Y_VERIFY(ConnectedCallback, "Missing connected callback");
@@ -847,8 +847,8 @@ public:
     void Cancel() override {
         Context.TryCancel();
 
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
             Cancelled = true;
             if (Started && !(ReadFinished && WriteFinished)) {
                 if (!ReadActive) {
@@ -867,8 +867,8 @@ public:
     void Write(TRequest&& request, TWriteCallback callback) override {
         TGrpcStatus status;
 
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
             if (Cancelled || ReadFinished || WriteFinished) {
                 status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped");
             } else if (WriteActive) {
@@ -887,11 +887,11 @@ public:
         }
     }
 
-    void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {
+    void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {
         TGrpcStatus status;
 
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
             Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
             if (!Finished && !HasInitialMetadata) {
                 ReadActive = true;
@@ -919,8 +919,8 @@ public:
     void Read(TResponse* message, TReadCallback callback) override {
         TGrpcStatus status;
 
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
             Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
             if (!Finished) {
                 ReadActive = true;
@@ -947,8 +947,8 @@ public:
     void Finish(TReadCallback callback) override {
         TGrpcStatus status;
 
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
             Y_VERIFY(!ReadActive, "Multiple Read/Finish calls detected");
             if (!Finished) {
                 ReadActive = true;
@@ -979,8 +979,8 @@ public:
 
         TGrpcStatus status;
 
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
             if (!Finished) {
                 FinishedCallbacks.emplace_back().swap(callback);
                 return;
@@ -1010,8 +1010,8 @@ private:
             return;
         }
 
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
             LocalContext = context;
             Stream = (stub.*asyncRequest)(&Context, context->CompletionQueue(), OnConnectedTag.Prepare());
         }
@@ -1025,8 +1025,8 @@ private:
     void OnConnected(bool ok) {
         TConnectedCallback callback;
 
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
             Started = true;
             if (!ok || Cancelled) {
                 ReadFinished = true;
@@ -1045,10 +1045,10 @@ private:
     void OnReadDone(bool ok) {
         TGrpcStatus status;
         TReadCallback callback;
-        std::unordered_multimap<TString, TString>* initialMetadata = nullptr;
+        std::unordered_multimap<TString, TString>* initialMetadata = nullptr;
 
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
             Y_VERIFY(ReadActive, "Unexpected Read done callback");
             Y_VERIFY(!ReadFinished, "Unexpected ReadFinished flag");
 
@@ -1085,8 +1085,8 @@ private:
     void OnWriteDone(bool ok) {
         TWriteCallback okCallback;
 
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
             Y_VERIFY(WriteActive, "Unexpected Write done callback");
             Y_VERIFY(!WriteFinished, "Unexpected WriteFinished flag");
 
@@ -1107,7 +1107,7 @@ private:
                 if (ReadFinished) {
                     Stream->Finish(&Status, OnFinishedTag.Prepare());
                 }
-            } else if (!WriteQueue.empty()) {
+            } else if (!WriteQueue.empty()) {
                 WriteCallback.swap(WriteQueue.front().Callback);
                 Stream->Write(WriteQueue.front().Request, OnWriteDoneTag.Prepare());
                 WriteQueue.pop_front();
@@ -1127,14 +1127,14 @@ private:
     void OnFinished(bool ok) {
         TGrpcStatus status;
-        std::deque<TWriteItem> writesDropped;
-        std::vector<TReadCallback> finishedCallbacks;
+        std::deque<TWriteItem> writesDropped;
+        std::vector<TReadCallback> finishedCallbacks;
         TConnectedCallback connectedCallback;
         TReadCallback readCallback;
         TReadCallback finishCallback;
 
-        {
-            std::unique_lock<std::mutex> guard(Mutex);
+        {
+            std::unique_lock<std::mutex> guard(Mutex);
             Finished = true;
             FinishedOk = ok;
             LocalContext.reset();
@@ -1211,15 +1211,15 @@ private:
     TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };
 
 private:
-    std::mutex Mutex;
+    std::mutex Mutex;
     TAsyncReaderWriterPtr Stream;
     TConnectedCallback ConnectedCallback;
     TReadCallback ReadCallback;
     TReadCallback FinishCallback;
-    std::vector<TReadCallback> FinishedCallbacks;
-    std::deque<TWriteItem> WriteQueue;
+    std::vector<TReadCallback> FinishedCallbacks;
+    std::deque<TWriteItem> WriteQueue;
     TWriteCallback WriteCallback;
-    std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
+    std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
     bool Started = false;
     bool HasInitialMetadata = false;
     bool ReadActive = false;
@@ -1231,30 +1231,30 @@ private:
     bool FinishedOk = false;
 };
 
-class TGRpcClientLow;
-
-template<typename TGRpcService>
-class TServiceConnection {
-    using TStub = typename TGRpcService::Stub;
-    friend class TGRpcClientLow;
-
-public:
-    /*
-     * Start simple request
-     */
-    template<typename TRequest, typename TResponse>
-    void DoRequest(const TRequest& request,
+class TGRpcClientLow;
+
+template<typename TGRpcService>
+class TServiceConnection {
+    using TStub = typename TGRpcService::Stub;
+    friend class TGRpcClientLow;
+
+public:
+    /*
+     * Start simple request
+     */
+    template<typename TRequest, typename TResponse>
+    void DoRequest(const TRequest& request,
                    TResponseCallback<TResponse> callback,
                    typename TSimpleRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
-                   const TCallMeta& metas = { },
+                   const TCallMeta& metas = { },
                    IQueueClientContextProvider* provider = nullptr)
-    {
-        auto processor = MakeIntrusive<TSimpleRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback));
-        processor->ApplyMeta(metas);
-        processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_);
-    }
-
-    /*
+    {
+        auto processor = MakeIntrusive<TSimpleRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback));
+        processor->ApplyMeta(metas);
+        processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_);
+    }
+
+    /*
      * Start simple request
      */
     template<typename TRequest, typename TResponse>
@@ -1270,58 +1270,58 @@ public:
     }
 
     /*
-     * Start bidirectional streamming
-     */
-    template<typename TRequest, typename TResponse>
-    void DoStreamRequest(TStreamConnectedCallback<TRequest, TResponse> callback,
-                         typename TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
-                         const TCallMeta& metas = { },
-                         IQueueClientContextProvider* provider = nullptr)
-    {
-        auto processor = MakeIntrusive<TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>>(std::move(callback));
-        processor->ApplyMeta(metas);
-        processor->Start(*Stub_, std::move(asyncRequest), provider ? provider : Provider_);
-    }
-
-    /*
-     * Start streaming response reading (one request, many responses)
-     */
+     * Start bidirectional streamming
+     */
+    template<typename TRequest, typename TResponse>
+    void DoStreamRequest(TStreamConnectedCallback<TRequest, TResponse> callback,
+                         typename TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
+                         const TCallMeta& metas = { },
+                         IQueueClientContextProvider* provider = nullptr)
+    {
+        auto processor = MakeIntrusive<TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>>(std::move(callback));
+        processor->ApplyMeta(metas);
+        processor->Start(*Stub_, std::move(asyncRequest), provider ? provider : Provider_);
+    }
+
+    /*
+     * Start streaming response reading (one request, many responses)
+     */
     template<typename TRequest, typename TResponse>
-    void DoStreamRequest(const TRequest& request,
-                         TStreamReaderCallback<TResponse> callback,
-                         typename TStreamRequestReadProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
+    void DoStreamRequest(const TRequest& request,
+                         TStreamReaderCallback<TResponse> callback,
+                         typename TStreamRequestReadProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
                          const TCallMeta& metas = { },
                          IQueueClientContextProvider* provider = nullptr)
     {
-        auto processor = MakeIntrusive<TStreamRequestReadProcessor<TStub, TRequest, TResponse>>(std::move(callback));
+        auto processor = MakeIntrusive<TStreamRequestReadProcessor<TStub, TRequest, TResponse>>(std::move(callback));
         processor->ApplyMeta(metas);
-        processor->Start(*Stub_, request, std::move(asyncRequest), provider ? provider : Provider_);
+        processor->Start(*Stub_, request, std::move(asyncRequest), provider ? provider : Provider_);
     }
 
-private:
-    TServiceConnection(std::shared_ptr<grpc::ChannelInterface> ci,
-                       IQueueClientContextProvider* provider)
-        : Stub_(TGRpcService::NewStub(ci))
-        , Provider_(provider)
-    {
-        Y_VERIFY(Provider_, "Connection does not have a queue provider");
-    }
-
-    TServiceConnection(TStubsHolder& holder,
+private:
+    TServiceConnection(std::shared_ptr<grpc::ChannelInterface> ci,
                        IQueueClientContextProvider* provider)
-        : Stub_(holder.GetOrCreateStub<TStub>())
+        : Stub_(TGRpcService::NewStub(ci))
         , Provider_(provider)
     {
         Y_VERIFY(Provider_, "Connection does not have a queue provider");
     }
 
-    std::shared_ptr<TStub> Stub_;
+    TServiceConnection(TStubsHolder& holder,
+                       IQueueClientContextProvider* provider)
+        : Stub_(holder.GetOrCreateStub<TStub>())
+        , Provider_(provider)
+    {
+        Y_VERIFY(Provider_, "Connection does not have a queue provider");
+    }
+
+    std::shared_ptr<TStub> Stub_;
     IQueueClientContextProvider* Provider_;
-};
-
-class TGRpcClientLow
+};
+
+class TGRpcClientLow
     : public IQueueClientContextProvider
-{
+{
     class TContextImpl;
     friend class TContextImpl;
 
@@ -1331,10 +1331,10 @@ class TGRpcClientLow
         STOP_EXPLICIT = 2,
     };
 
-public:
+public:
     explicit TGRpcClientLow(size_t numWorkerThread = DEFAULT_NUM_THREADS, bool useCompletionQueuePerThread = false);
-    ~TGRpcClientLow();
-
+    ~TGRpcClientLow();
+
     // Tries to stop all currently running requests (via their stop callbacks)
     // Will shutdown CQ and drain events once all requests have finished
     // No new requests may be started after this call
@@ -1357,24 +1357,24 @@ public:
 
     IQueueClientContextPtr CreateContext() override;
 
-    template<typename TGRpcService>
-    std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(const TGRpcClientConfig& config) {
-        return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(CreateChannelInterface(config), this));
-    }
-
-    template<typename TGRpcService>
-    std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(TStubsHolder& holder) {
-        return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(holder, this));
-    }
-
+    template<typename TGRpcService>
+    std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(const TGRpcClientConfig& config) {
+        return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(CreateChannelInterface(config), this));
+    }
+
+    template<typename TGRpcService>
+    std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(TStubsHolder& holder) {
+        return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(holder, this));
+    }
+
     // Tests only, not thread-safe
     void AddWorkerThreadForTest();
 
-private:
+private:
     using IThreadRef = std::unique_ptr<IThreadFactory::IThread>;
     using CompletionQueueRef = std::unique_ptr<grpc::CompletionQueue>;
-    void Init(size_t numWorkerThread);
-
+    void Init(size_t numWorkerThread);
+
     inline ECqState GetCqState() const { return (ECqState) AtomicGet(CqState_); }
     inline void SetCqState(ECqState state) { AtomicSet(CqState_, state); }
@@ -1385,15 +1385,15 @@ private:
 
 private:
     bool UseCompletionQueuePerThread_;
-    std::vector<CompletionQueueRef> CQS_;
-    std::vector<IThreadRef> WorkerThreads_;
+    std::vector<CompletionQueueRef> CQS_;
+    std::vector<IThreadRef> WorkerThreads_;
     TAtomic CqState_ = -1;
 
-    std::mutex Mtx_;
-    std::condition_variable ContextsEmpty_;
-    std::unordered_set<TContextImpl*> Contexts_;
-
-    std::mutex JoinMutex_;
-};
+    std::mutex Mtx_;
+    std::condition_variable ContextsEmpty_;
+    std::unordered_set<TContextImpl*> Contexts_;
+    std::mutex JoinMutex_;
+};
+
 } // namespace NGRpc
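grpc_client_low.h above is the whole async client surface: TGRpcClientLow owns the completion-queue worker threads, and TServiceConnection::DoRequest drives one async unary call through a TSimpleRequestProcessor. A minimal sketch, assuming a protobuf-generated MyService with an AsyncPing method and TPingRequest/TPingResponse messages (all hypothetical names, not part of this library):

    #include <library/cpp/grpc/client/grpc_client_low.h>

    void UnaryCallExample() {
        NGrpc::TGRpcClientLow client(/* numWorkerThread = */ 2);
        NGrpc::TGRpcClientConfig config("localhost:2135"); // hypothetical endpoint

        auto connection = client.CreateGRpcServiceConnection<MyService>(config);

        TPingRequest request;
        connection->DoRequest<TPingRequest, TPingResponse>(
            request,
            [](NGrpc::TGrpcStatus&& status, TPingResponse&& response) {
                // Invoked from a completion-queue worker thread.
                if (status.Ok()) {
                    // consume response
                }
            },
            &MyService::Stub::AsyncPing);

        client.Stop(/* wait = */ true); // cancels in-flight calls, then joins workers
    }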
diff --git a/library/cpp/grpc/client/grpc_common.h b/library/cpp/grpc/client/grpc_common.h
index ffcdafe045..ac62e8b331 100644
--- a/library/cpp/grpc/client/grpc_common.h
+++ b/library/cpp/grpc/client/grpc_common.h
@@ -1,53 +1,53 @@
-#pragma once
-
-#include <grpc++/grpc++.h>
+#pragma once
+
+#include <grpc++/grpc++.h>
 #include <grpc++/resource_quota.h>
-
+
 #include <util/datetime/base.h>
-#include <unordered_map>
-#include <util/generic/string.h>
-
-constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000;
-
+#include <unordered_map>
+#include <util/generic/string.h>
+
+constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000;
+
 namespace NGrpc {
-
-struct TGRpcClientConfig {
-    TString Locator; // format host:port
-    TDuration Timeout = TDuration::Max(); // request timeout
-    ui64 MaxMessageSize = DEFAULT_GRPC_MESSAGE_SIZE_LIMIT; // Max request and response size
+
+struct TGRpcClientConfig {
+    TString Locator; // format host:port
+    TDuration Timeout = TDuration::Max(); // request timeout
+    ui64 MaxMessageSize = DEFAULT_GRPC_MESSAGE_SIZE_LIMIT; // Max request and response size
     ui64 MaxInboundMessageSize = 0; // overrides MaxMessageSize for incoming requests
     ui64 MaxOutboundMessageSize = 0; // overrides MaxMessageSize for outgoing requests
-    ui32 MaxInFlight = 0;
+    ui32 MaxInFlight = 0;
     bool EnableSsl = false;
     TString SslCaCert; //Implicitly enables Ssl if not empty
     grpc_compression_algorithm CompressionAlgoritm = GRPC_COMPRESS_NONE;
     ui64 MemQuota = 0;
-    std::unordered_map<TString, TString> StringChannelParams;
-    std::unordered_map<TString, int> IntChannelParams;
+    std::unordered_map<TString, TString> StringChannelParams;
+    std::unordered_map<TString, int> IntChannelParams;
     TString LoadBalancingPolicy = { };
     TString SslTargetNameOverride = { };
-
-    TGRpcClientConfig() = default;
-    TGRpcClientConfig(const TGRpcClientConfig&) = default;
-    TGRpcClientConfig(TGRpcClientConfig&&) = default;
-    TGRpcClientConfig& operator=(const TGRpcClientConfig&) = default;
-    TGRpcClientConfig& operator=(TGRpcClientConfig&&) = default;
-
-    TGRpcClientConfig(const TString& locator, TDuration timeout = TDuration::Max(),
+
+    TGRpcClientConfig() = default;
+    TGRpcClientConfig(const TGRpcClientConfig&) = default;
+    TGRpcClientConfig(TGRpcClientConfig&&) = default;
+    TGRpcClientConfig& operator=(const TGRpcClientConfig&) = default;
+    TGRpcClientConfig& operator=(TGRpcClientConfig&&) = default;
+
+    TGRpcClientConfig(const TString& locator, TDuration timeout = TDuration::Max(),
             ui64 maxMessageSize = DEFAULT_GRPC_MESSAGE_SIZE_LIMIT, ui32 maxInFlight = 0, TString caCert = "",
             grpc_compression_algorithm compressionAlgorithm = GRPC_COMPRESS_NONE, bool enableSsl = false)
-        : Locator(locator)
-        , Timeout(timeout)
-        , MaxMessageSize(maxMessageSize)
-        , MaxInFlight(maxInFlight)
+        : Locator(locator)
+        , Timeout(timeout)
+        , MaxMessageSize(maxMessageSize)
+        , MaxInFlight(maxInFlight)
         , EnableSsl(enableSsl)
-        , SslCaCert(caCert)
+        , SslCaCert(caCert)
         , CompressionAlgoritm(compressionAlgorithm)
-    {}
-};
-
-inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRpcClientConfig& config, grpc_socket_mutator* mutator = nullptr){
-    grpc::ChannelArguments args;
+    {}
+};
+
+inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRpcClientConfig& config, grpc_socket_mutator* mutator = nullptr){
+    grpc::ChannelArguments args;
     args.SetMaxReceiveMessageSize(config.MaxInboundMessageSize ? config.MaxInboundMessageSize : config.MaxMessageSize);
     args.SetMaxSendMessageSize(config.MaxOutboundMessageSize ? config.MaxOutboundMessageSize : config.MaxMessageSize);
     args.SetCompressionAlgorithm(config.CompressionAlgoritm);
@@ -65,9 +65,9 @@ inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRp
         quota.Resize(config.MemQuota);
         args.SetResourceQuota(quota);
     }
-    if (mutator) {
-        args.SetSocketMutator(mutator);
-    }
+    if (mutator) {
+        args.SetSocketMutator(mutator);
+    }
     if (!config.LoadBalancingPolicy.empty()) {
         args.SetLoadBalancingPolicyName(config.LoadBalancingPolicy);
     }
@@ -75,10 +75,10 @@ inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRp
         args.SetSslTargetNameOverride(config.SslTargetNameOverride);
     }
     if (config.EnableSsl || config.SslCaCert) {
-        return grpc::CreateCustomChannel(config.Locator, grpc::SslCredentials(grpc::SslCredentialsOptions{config.SslCaCert, "", ""}), args);
-    } else {
-        return grpc::CreateCustomChannel(config.Locator, grpc::InsecureChannelCredentials(), args);
-    }
-}
-
+        return grpc::CreateCustomChannel(config.Locator, grpc::SslCredentials(grpc::SslCredentialsOptions{config.SslCaCert, "", ""}), args);
+    } else {
+        return grpc::CreateCustomChannel(config.Locator, grpc::InsecureChannelCredentials(), args);
+    }
+}
+
 } // namespace NGRpc

diff --git a/library/cpp/grpc/client/ya.make b/library/cpp/grpc/client/ya.make
index a4e74b067c..11f36aa94f 100644
--- a/library/cpp/grpc/client/ya.make
+++ b/library/cpp/grpc/client/ya.make
@@ -6,11 +6,11 @@ OWNER(
 )
 
 SRCS(
-    grpc_client_low.cpp
+    grpc_client_low.cpp
 )
 
 PEERDIR(
-    contrib/libs/grpc
+    contrib/libs/grpc
 )
 
 END()
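For reference, a sketch of building a channel straight from the grpc_common.h helpers in the diff above. The endpoint and parameter values are illustrative, and it assumes the elided part of CreateChannelInterface forwards StringChannelParams/IntChannelParams into grpc::ChannelArguments:

    #include <library/cpp/grpc/client/grpc_common.h>

    std::shared_ptr<grpc::ChannelInterface> MakeExampleChannel() {
        NGrpc::TGRpcClientConfig config("localhost:2135"); // hypothetical endpoint
        config.MaxMessageSize = 32 * 1024 * 1024;          // caps both directions unless overridden
        config.IntChannelParams[GRPC_ARG_KEEPALIVE_TIME_MS] = 10000; // standard grpc channel arg
        config.LoadBalancingPolicy = "round_robin";

        // nullptr mutator: no custom socket options, config values only;
        // an insecure channel is created because neither EnableSsl nor SslCaCert is set
        return NGrpc::CreateChannelInterface(config);
    }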