diff options
author | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:17 +0300 |
commit | 4b11037e5a7d071c63e3c966199fe7102e6462e4 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/client/grpc_client_low.cpp | |
parent | 17e20fa084178ddcb16255f974dbde74fb93608b (diff) | |
download | ydb-4b11037e5a7d071c63e3c966199fe7102e6462e4.tar.gz |
Restoring authorship annotation for Daniil Cherednik <dcherednik@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/client/grpc_client_low.cpp')
-rw-r--r-- | library/cpp/grpc/client/grpc_client_low.cpp | 414 |
1 files changed, 207 insertions, 207 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp index ee9e997fa7..73cc908ef8 100644 --- a/library/cpp/grpc/client/grpc_client_low.cpp +++ b/library/cpp/grpc/client/grpc_client_low.cpp @@ -1,164 +1,164 @@ -#include "grpc_client_low.h" -#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h> -#include <contrib/libs/grpc/include/grpc/support/log.h> - -#include <library/cpp/containers/stack_vector/stack_vec.h> - -#include <util/string/printf.h> +#include "grpc_client_low.h" +#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h> +#include <contrib/libs/grpc/include/grpc/support/log.h> + +#include <library/cpp/containers/stack_vector/stack_vec.h> + +#include <util/string/printf.h> #include <util/system/thread.h> #include <util/random/random.h> - -#if !defined(_WIN32) && !defined(_WIN64) -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#endif - + +#if !defined(_WIN32) && !defined(_WIN64) +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#endif + namespace NGrpc { - -void EnableGRpcTracing() { - grpc_tracer_set_enabled("tcp", true); - grpc_tracer_set_enabled("client_channel", true); - grpc_tracer_set_enabled("channel", true); - grpc_tracer_set_enabled("api", true); - grpc_tracer_set_enabled("connectivity_state", true); - grpc_tracer_set_enabled("handshaker", true); - grpc_tracer_set_enabled("http", true); - grpc_tracer_set_enabled("http2_stream_state", true); - grpc_tracer_set_enabled("op_failure", true); - grpc_tracer_set_enabled("timer", true); - gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); -} - -class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator { -public: - TGRpcKeepAliveSocketMutator(int idle, int count, int interval) - : Idle_(idle) - , Count_(count) - , Interval_(interval) - { - grpc_socket_mutator_init(this, &VTable); - } -private: - static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) { - return static_cast<TGRpcKeepAliveSocketMutator*>(mutator); - } - - template<typename TVal> - bool SetOption(int fd, int level, int optname, const TVal& value) { - return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0; - } - bool SetOption(int fd) { - if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) { - Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl; - return false; - } -#ifdef _linux_ - if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) { - Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl; - return false; - } - if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) { - Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl; - return false; - } - if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) { - Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl; - return false; - } -#endif - return true; - } - static bool Mutate(int fd, grpc_socket_mutator* mutator) { - auto self = Cast(mutator); - return self->SetOption(fd); - } - static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) { - const auto* selfA = Cast(a); - const auto* selfB = Cast(b); - auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_); - auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_); - return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0; - } - static void Destroy(grpc_socket_mutator* mutator) { - delete Cast(mutator); - } - - static grpc_socket_mutator_vtable VTable; - const int Idle_; - const int Count_; - const int Interval_; -}; - -grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable = - { - &TGRpcKeepAliveSocketMutator::Mutate, - &TGRpcKeepAliveSocketMutator::Compare, - &TGRpcKeepAliveSocketMutator::Destroy - }; - + +void EnableGRpcTracing() { + grpc_tracer_set_enabled("tcp", true); + grpc_tracer_set_enabled("client_channel", true); + grpc_tracer_set_enabled("channel", true); + grpc_tracer_set_enabled("api", true); + grpc_tracer_set_enabled("connectivity_state", true); + grpc_tracer_set_enabled("handshaker", true); + grpc_tracer_set_enabled("http", true); + grpc_tracer_set_enabled("http2_stream_state", true); + grpc_tracer_set_enabled("op_failure", true); + grpc_tracer_set_enabled("timer", true); + gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); +} + +class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator { +public: + TGRpcKeepAliveSocketMutator(int idle, int count, int interval) + : Idle_(idle) + , Count_(count) + , Interval_(interval) + { + grpc_socket_mutator_init(this, &VTable); + } +private: + static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) { + return static_cast<TGRpcKeepAliveSocketMutator*>(mutator); + } + + template<typename TVal> + bool SetOption(int fd, int level, int optname, const TVal& value) { + return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0; + } + bool SetOption(int fd) { + if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) { + Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl; + return false; + } +#ifdef _linux_ + if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) { + Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl; + return false; + } + if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) { + Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl; + return false; + } + if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) { + Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl; + return false; + } +#endif + return true; + } + static bool Mutate(int fd, grpc_socket_mutator* mutator) { + auto self = Cast(mutator); + return self->SetOption(fd); + } + static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) { + const auto* selfA = Cast(a); + const auto* selfB = Cast(b); + auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_); + auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_); + return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0; + } + static void Destroy(grpc_socket_mutator* mutator) { + delete Cast(mutator); + } + + static grpc_socket_mutator_vtable VTable; + const int Idle_; + const int Count_; + const int Interval_; +}; + +grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable = + { + &TGRpcKeepAliveSocketMutator::Mutate, + &TGRpcKeepAliveSocketMutator::Compare, + &TGRpcKeepAliveSocketMutator::Destroy + }; + TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime) - : TcpKeepAliveSettings_(tcpKeepAliveSettings) + : TcpKeepAliveSettings_(tcpKeepAliveSettings) , ExpireTime_(expireTime) , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20)) -{} - -void TChannelPool::GetStubsHolderLocked( - const TString& channelId, - const TGRpcClientConfig& config, - std::function<void(TStubsHolder&)> cb) -{ - { - std::shared_lock readGuard(RWMutex_); - const auto it = Pool_.find(channelId); - if (it != Pool_.end()) { +{} + +void TChannelPool::GetStubsHolderLocked( + const TString& channelId, + const TGRpcClientConfig& config, + std::function<void(TStubsHolder&)> cb) +{ + { + std::shared_lock readGuard(RWMutex_); + const auto it = Pool_.find(channelId); + if (it != Pool_.end()) { if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) { - return cb(it->second); - } - } - } - { - std::unique_lock writeGuard(RWMutex_); - { - auto it = Pool_.find(channelId); - if (it != Pool_.end()) { - if (!it->second.IsChannelBroken()) { + return cb(it->second); + } + } + } + { + std::unique_lock writeGuard(RWMutex_); + { + auto it = Pool_.find(channelId); + if (it != Pool_.end()) { + if (!it->second.IsChannelBroken()) { EraseFromQueueByTime(it->second.GetLastUseTime(), channelId); auto now = Now(); LastUsedQueue_.emplace(now, channelId); it->second.SetLastUseTime(now); - return cb(it->second); - } else { - // This channel can't be used. Remove from pool to create new one + return cb(it->second); + } else { + // This channel can't be used. Remove from pool to create new one EraseFromQueueByTime(it->second.GetLastUseTime(), channelId); - Pool_.erase(it); - } - } - } - TGRpcKeepAliveSocketMutator* mutator = nullptr; - // will be destroyed inside grpc - if (TcpKeepAliveSettings_.Enabled) { - mutator = new TGRpcKeepAliveSocketMutator( - TcpKeepAliveSettings_.Idle, - TcpKeepAliveSettings_.Count, - TcpKeepAliveSettings_.Interval - ); - } - cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second); + Pool_.erase(it); + } + } + } + TGRpcKeepAliveSocketMutator* mutator = nullptr; + // will be destroyed inside grpc + if (TcpKeepAliveSettings_.Enabled) { + mutator = new TGRpcKeepAliveSocketMutator( + TcpKeepAliveSettings_.Idle, + TcpKeepAliveSettings_.Count, + TcpKeepAliveSettings_.Interval + ); + } + cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second); LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId); - } -} - -void TChannelPool::DeleteChannel(const TString& channelId) { - std::unique_lock writeLock(RWMutex_); + } +} + +void TChannelPool::DeleteChannel(const TString& channelId) { + std::unique_lock writeLock(RWMutex_); auto poolIt = Pool_.find(channelId); if (poolIt != Pool_.end()) { EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId); Pool_.erase(poolIt); } -} - +} + void TChannelPool::DeleteExpiredStubsHolders() { std::unique_lock writeLock(RWMutex_); auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_); @@ -392,7 +392,7 @@ private: // Some children are stored inline, others are in a set std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } }; - std::unordered_set<TContextImpl*> Children; + std::unordered_set<TContextImpl*> Children; // Single callback is stored without extra allocations TStackVec<TCallback, 1> Callbacks; @@ -404,10 +404,10 @@ private: TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread) : UseCompletionQueuePerThread_(useCompletionQueuePerThread) { - Init(numWorkerThread); -} - -void TGRpcClientLow::Init(size_t numWorkerThread) { + Init(numWorkerThread); +} + +void TGRpcClientLow::Init(size_t numWorkerThread) { SetCqState(WORKING); if (UseCompletionQueuePerThread_) { for (size_t i = 0; i < numWorkerThread; i++) { @@ -425,9 +425,9 @@ void TGRpcClientLow::Init(size_t numWorkerThread) { PullEvents(cq); }).Release()); } - } -} - + } +} + void TGRpcClientLow::AddWorkerThreadForTest() { if (UseCompletionQueuePerThread_) { CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); @@ -453,17 +453,17 @@ void TGRpcClientLow::Stop(bool wait) { if (wait) { WaitInternal(); - } -} - + } +} + void TGRpcClientLow::StopInternal(bool silent) { bool shutdown; TVector<TContextImpl::TContextPtr> cancelQueue; - { - std::unique_lock<std::mutex> guard(Mtx_); - + { + std::unique_lock<std::mutex> guard(Mtx_); + auto allowStateChange = [&]() { switch (GetCqState()) { case WORKING: @@ -484,7 +484,7 @@ void TGRpcClientLow::StopInternal(bool silent) { SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT); - if (!silent && !Contexts_.empty()) { + if (!silent && !Contexts_.empty()) { cancelQueue.reserve(Contexts_.size()); for (auto* ptr : Contexts_) { // N.B. some contexts may be stuck in destructors @@ -494,7 +494,7 @@ void TGRpcClientLow::StopInternal(bool silent) { } } - shutdown = Contexts_.empty(); + shutdown = Contexts_.empty(); } for (auto& context : cancelQueue) { @@ -506,62 +506,62 @@ void TGRpcClientLow::StopInternal(bool silent) { for (auto& cq : CQS_) { cq->Shutdown(); } - } + } } void TGRpcClientLow::WaitInternal() { - std::unique_lock<std::mutex> guard(JoinMutex_); - - for (auto& ti : WorkerThreads_) { - ti->Join(); - } -} - + std::unique_lock<std::mutex> guard(JoinMutex_); + + for (auto& ti : WorkerThreads_) { + ti->Join(); + } +} + void TGRpcClientLow::WaitIdle() { - std::unique_lock<std::mutex> guard(Mtx_); - - while (!Contexts_.empty()) { - ContextsEmpty_.wait(guard); - } -} - + std::unique_lock<std::mutex> guard(Mtx_); + + while (!Contexts_.empty()) { + ContextsEmpty_.wait(guard); + } +} + std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() { - std::unique_lock<std::mutex> guard(Mtx_); - - auto allowCreateContext = [&]() { - switch (GetCqState()) { - case WORKING: - return true; - case STOP_SILENT: - case STOP_EXPLICIT: - return false; + std::unique_lock<std::mutex> guard(Mtx_); + + auto allowCreateContext = [&]() { + switch (GetCqState()) { + case WORKING: + return true; + case STOP_SILENT: + case STOP_EXPLICIT: + return false; } - Y_UNREACHABLE(); - }; - - if (!allowCreateContext()) { - // New context creation is forbidden - return nullptr; - } - - auto context = std::make_shared<TContextImpl>(); - Contexts_.insert(context.get()); - context->Owner = this; - if (UseCompletionQueuePerThread_) { - context->CQ = CQS_[RandomNumber(CQS_.size())].get(); - } else { - context->CQ = CQS_[0].get(); - } - return context; + Y_UNREACHABLE(); + }; + + if (!allowCreateContext()) { + // New context creation is forbidden + return nullptr; + } + + auto context = std::make_shared<TContextImpl>(); + Contexts_.insert(context.get()); + context->Owner = this; + if (UseCompletionQueuePerThread_) { + context->CQ = CQS_[RandomNumber(CQS_.size())].get(); + } else { + context->CQ = CQS_[0].get(); + } + return context; } void TGRpcClientLow::ForgetContext(TContextImpl* context) { bool shutdown = false; - { - std::unique_lock<std::mutex> guard(Mtx_); - + { + std::unique_lock<std::mutex> guard(Mtx_); + if (!Contexts_.erase(context)) { Y_FAIL("Unexpected ForgetContext(%p)", context); } @@ -571,7 +571,7 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) { shutdown = true; } - ContextsEmpty_.notify_all(); + ContextsEmpty_.notify_all(); } } @@ -580,7 +580,7 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) { for (auto& cq : CQS_) { cq->Shutdown(); } - } -} - + } +} + } // namespace NGRpc |