diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/grpc/client/grpc_client_low.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/grpc/client/grpc_client_low.cpp')
-rw-r--r-- | library/cpp/grpc/client/grpc_client_low.cpp | 586 |
1 files changed, 586 insertions, 0 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp new file mode 100644 index 0000000000..73cc908ef8 --- /dev/null +++ b/library/cpp/grpc/client/grpc_client_low.cpp @@ -0,0 +1,586 @@ +#include "grpc_client_low.h" +#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h> +#include <contrib/libs/grpc/include/grpc/support/log.h> + +#include <library/cpp/containers/stack_vector/stack_vec.h> + +#include <util/string/printf.h> +#include <util/system/thread.h> +#include <util/random/random.h> + +#if !defined(_WIN32) && !defined(_WIN64) +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#endif + +namespace NGrpc { + +void EnableGRpcTracing() { + grpc_tracer_set_enabled("tcp", true); + grpc_tracer_set_enabled("client_channel", true); + grpc_tracer_set_enabled("channel", true); + grpc_tracer_set_enabled("api", true); + grpc_tracer_set_enabled("connectivity_state", true); + grpc_tracer_set_enabled("handshaker", true); + grpc_tracer_set_enabled("http", true); + grpc_tracer_set_enabled("http2_stream_state", true); + grpc_tracer_set_enabled("op_failure", true); + grpc_tracer_set_enabled("timer", true); + gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); +} + +class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator { +public: + TGRpcKeepAliveSocketMutator(int idle, int count, int interval) + : Idle_(idle) + , Count_(count) + , Interval_(interval) + { + grpc_socket_mutator_init(this, &VTable); + } +private: + static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) { + return static_cast<TGRpcKeepAliveSocketMutator*>(mutator); + } + + template<typename TVal> + bool SetOption(int fd, int level, int optname, const TVal& value) { + return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0; + } + bool SetOption(int fd) { + if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) { + Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl; + return false; + } +#ifdef _linux_ + if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) { + Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl; + return false; + } + if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) { + Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl; + return false; + } + if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) { + Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl; + return false; + } +#endif + return true; + } + static bool Mutate(int fd, grpc_socket_mutator* mutator) { + auto self = Cast(mutator); + return self->SetOption(fd); + } + static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) { + const auto* selfA = Cast(a); + const auto* selfB = Cast(b); + auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_); + auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_); + return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0; + } + static void Destroy(grpc_socket_mutator* mutator) { + delete Cast(mutator); + } + + static grpc_socket_mutator_vtable VTable; + const int Idle_; + const int Count_; + const int Interval_; +}; + +grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable = + { + &TGRpcKeepAliveSocketMutator::Mutate, + &TGRpcKeepAliveSocketMutator::Compare, + &TGRpcKeepAliveSocketMutator::Destroy + }; + +TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime) + : TcpKeepAliveSettings_(tcpKeepAliveSettings) + , ExpireTime_(expireTime) + , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20)) +{} + +void TChannelPool::GetStubsHolderLocked( + const TString& channelId, + const TGRpcClientConfig& config, + std::function<void(TStubsHolder&)> cb) +{ + { + std::shared_lock readGuard(RWMutex_); + const auto it = Pool_.find(channelId); + if (it != Pool_.end()) { + if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) { + return cb(it->second); + } + } + } + { + std::unique_lock writeGuard(RWMutex_); + { + auto it = Pool_.find(channelId); + if (it != Pool_.end()) { + if (!it->second.IsChannelBroken()) { + EraseFromQueueByTime(it->second.GetLastUseTime(), channelId); + auto now = Now(); + LastUsedQueue_.emplace(now, channelId); + it->second.SetLastUseTime(now); + return cb(it->second); + } else { + // This channel can't be used. Remove from pool to create new one + EraseFromQueueByTime(it->second.GetLastUseTime(), channelId); + Pool_.erase(it); + } + } + } + TGRpcKeepAliveSocketMutator* mutator = nullptr; + // will be destroyed inside grpc + if (TcpKeepAliveSettings_.Enabled) { + mutator = new TGRpcKeepAliveSocketMutator( + TcpKeepAliveSettings_.Idle, + TcpKeepAliveSettings_.Count, + TcpKeepAliveSettings_.Interval + ); + } + cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second); + LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId); + } +} + +void TChannelPool::DeleteChannel(const TString& channelId) { + std::unique_lock writeLock(RWMutex_); + auto poolIt = Pool_.find(channelId); + if (poolIt != Pool_.end()) { + EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId); + Pool_.erase(poolIt); + } +} + +void TChannelPool::DeleteExpiredStubsHolders() { + std::unique_lock writeLock(RWMutex_); + auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_); + for (auto i = LastUsedQueue_.begin(); i != lastExpired; ++i){ + Pool_.erase(i->second); + } + LastUsedQueue_.erase(LastUsedQueue_.begin(), lastExpired); +} + +void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId) { + auto [begin, end] = LastUsedQueue_.equal_range(lastUseTime); + auto pos = std::find_if(begin, end, [&](auto a){return a.second == channelId;}); + Y_VERIFY(pos != LastUsedQueue_.end(), "data corruption at TChannelPool"); + LastUsedQueue_.erase(pos); +} + +static void PullEvents(grpc::CompletionQueue* cq) { + TThread::SetCurrentThreadName("grpc_client"); + while (true) { + void* tag; + bool ok; + + if (!cq->Next(&tag, &ok)) { + break; + } + + if (auto* ev = static_cast<IQueueClientEvent*>(tag)) { + if (!ev->Execute(ok)) { + ev->Destroy(); + } + } + } +} + +class TGRpcClientLow::TContextImpl final + : public std::enable_shared_from_this<TContextImpl> + , public IQueueClientContext +{ + friend class TGRpcClientLow; + + using TCallback = std::function<void()>; + using TContextPtr = std::shared_ptr<TContextImpl>; + +public: + ~TContextImpl() override { + Y_VERIFY(CountChildren() == 0, + "Destructor called with non-empty children"); + + if (Parent) { + Parent->ForgetContext(this); + } else if (Y_LIKELY(Owner)) { + Owner->ForgetContext(this); + } + } + + /** + * Helper for locking child pointer from a parent container + */ + static TContextPtr LockChildPtr(TContextImpl* ptr) { + if (ptr) { + // N.B. it is safe to do as long as it's done under a mutex and + // pointer is among valid children. When that's the case we + // know that TContextImpl destructor has not finished yet, so + // the object is valid. The lock() method may return nullptr + // though, if the object is being destructed right now. + return ptr->weak_from_this().lock(); + } else { + return nullptr; + } + } + + void ForgetContext(TContextImpl* child) { + std::unique_lock<std::mutex> guard(Mutex); + + auto removed = RemoveChild(child); + Y_VERIFY(removed, "Unexpected ForgetContext(%p)", child); + } + + IQueueClientContextPtr CreateContext() override { + auto self = shared_from_this(); + auto child = std::make_shared<TContextImpl>(); + + { + std::unique_lock<std::mutex> guard(Mutex); + + AddChild(child.get()); + + // It's now safe to initialize parent and owner + child->Parent = std::move(self); + child->Owner = Owner; + child->CQ = CQ; + + // Propagate cancellation to a child context + if (Cancelled.load(std::memory_order_relaxed)) { + child->Cancelled.store(true, std::memory_order_relaxed); + } + } + + return child; + } + + grpc::CompletionQueue* CompletionQueue() override { + Y_VERIFY(Owner, "Uninitialized context"); + return CQ; + } + + bool IsCancelled() const override { + return Cancelled.load(std::memory_order_acquire); + } + + bool Cancel() override { + TStackVec<TCallback, 1> callbacks; + TStackVec<TContextPtr, 2> children; + + { + std::unique_lock<std::mutex> guard(Mutex); + + if (Cancelled.load(std::memory_order_relaxed)) { + // Already cancelled in another thread + return false; + } + + callbacks.reserve(Callbacks.size()); + children.reserve(CountChildren()); + + for (auto& callback : Callbacks) { + callbacks.emplace_back().swap(callback); + } + Callbacks.clear(); + + // Collect all children we need to cancel + // N.B. we don't clear children links (cleared by destructors) + // N.B. some children may be stuck in destructors at the moment + for (TContextImpl* ptr : InlineChildren) { + if (auto child = LockChildPtr(ptr)) { + children.emplace_back(std::move(child)); + } + } + for (auto* ptr : Children) { + if (auto child = LockChildPtr(ptr)) { + children.emplace_back(std::move(child)); + } + } + + Cancelled.store(true, std::memory_order_release); + } + + // Call directly subscribed callbacks + if (callbacks) { + RunCallbacksNoExcept(callbacks); + } + + // Cancel all children + for (auto& child : children) { + child->Cancel(); + child.reset(); + } + + return true; + } + + void SubscribeCancel(TCallback callback) override { + Y_VERIFY(callback, "SubscribeCancel called with an empty callback"); + + { + std::unique_lock<std::mutex> guard(Mutex); + + if (!Cancelled.load(std::memory_order_relaxed)) { + Callbacks.emplace_back().swap(callback); + return; + } + } + + // Already cancelled, run immediately + callback(); + } + +private: + void AddChild(TContextImpl* child) { + for (TContextImpl*& slot : InlineChildren) { + if (!slot) { + slot = child; + return; + } + } + + Children.insert(child); + } + + bool RemoveChild(TContextImpl* child) { + for (TContextImpl*& slot : InlineChildren) { + if (slot == child) { + slot = nullptr; + return true; + } + } + + return Children.erase(child); + } + + size_t CountChildren() { + size_t count = 0; + + for (TContextImpl* ptr : InlineChildren) { + if (ptr) { + ++count; + } + } + + return count + Children.size(); + } + + template<class TCallbacks> + static void RunCallbacksNoExcept(TCallbacks& callbacks) noexcept { + for (auto& callback : callbacks) { + if (callback) { + callback(); + callback = nullptr; + } + } + } + +private: + // We want a simple lock here, without extra memory allocations + std::mutex Mutex; + + // These fields are initialized on successful registration + TContextPtr Parent; + TGRpcClientLow* Owner = nullptr; + grpc::CompletionQueue* CQ = nullptr; + + // Some children are stored inline, others are in a set + std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } }; + std::unordered_set<TContextImpl*> Children; + + // Single callback is stored without extra allocations + TStackVec<TCallback, 1> Callbacks; + + // Atomic flag for a faster IsCancelled() implementation + std::atomic<bool> Cancelled; +}; + +TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread) + : UseCompletionQueuePerThread_(useCompletionQueuePerThread) +{ + Init(numWorkerThread); +} + +void TGRpcClientLow::Init(size_t numWorkerThread) { + SetCqState(WORKING); + if (UseCompletionQueuePerThread_) { + for (size_t i = 0; i < numWorkerThread; i++) { + CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); + auto* cq = CQS_.back().get(); + WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { + PullEvents(cq); + }).Release()); + } + } else { + CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); + auto* cq = CQS_.back().get(); + for (size_t i = 0; i < numWorkerThread; i++) { + WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { + PullEvents(cq); + }).Release()); + } + } +} + +void TGRpcClientLow::AddWorkerThreadForTest() { + if (UseCompletionQueuePerThread_) { + CQS_.push_back(std::make_unique<grpc::CompletionQueue>()); + auto* cq = CQS_.back().get(); + WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { + PullEvents(cq); + }).Release()); + } else { + auto* cq = CQS_.back().get(); + WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() { + PullEvents(cq); + }).Release()); + } +} + +TGRpcClientLow::~TGRpcClientLow() { + StopInternal(true); + WaitInternal(); +} + +void TGRpcClientLow::Stop(bool wait) { + StopInternal(false); + + if (wait) { + WaitInternal(); + } +} + +void TGRpcClientLow::StopInternal(bool silent) { + bool shutdown; + + TVector<TContextImpl::TContextPtr> cancelQueue; + + { + std::unique_lock<std::mutex> guard(Mtx_); + + auto allowStateChange = [&]() { + switch (GetCqState()) { + case WORKING: + return true; + case STOP_SILENT: + return !silent; + case STOP_EXPLICIT: + return false; + } + + Y_UNREACHABLE(); + }; + + if (!allowStateChange()) { + // Completion queue is already stopping + return; + } + + SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT); + + if (!silent && !Contexts_.empty()) { + cancelQueue.reserve(Contexts_.size()); + for (auto* ptr : Contexts_) { + // N.B. some contexts may be stuck in destructors + if (auto context = TContextImpl::LockChildPtr(ptr)) { + cancelQueue.emplace_back(std::move(context)); + } + } + } + + shutdown = Contexts_.empty(); + } + + for (auto& context : cancelQueue) { + context->Cancel(); + context.reset(); + } + + if (shutdown) { + for (auto& cq : CQS_) { + cq->Shutdown(); + } + } +} + +void TGRpcClientLow::WaitInternal() { + std::unique_lock<std::mutex> guard(JoinMutex_); + + for (auto& ti : WorkerThreads_) { + ti->Join(); + } +} + +void TGRpcClientLow::WaitIdle() { + std::unique_lock<std::mutex> guard(Mtx_); + + while (!Contexts_.empty()) { + ContextsEmpty_.wait(guard); + } +} + +std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() { + std::unique_lock<std::mutex> guard(Mtx_); + + auto allowCreateContext = [&]() { + switch (GetCqState()) { + case WORKING: + return true; + case STOP_SILENT: + case STOP_EXPLICIT: + return false; + } + + Y_UNREACHABLE(); + }; + + if (!allowCreateContext()) { + // New context creation is forbidden + return nullptr; + } + + auto context = std::make_shared<TContextImpl>(); + Contexts_.insert(context.get()); + context->Owner = this; + if (UseCompletionQueuePerThread_) { + context->CQ = CQS_[RandomNumber(CQS_.size())].get(); + } else { + context->CQ = CQS_[0].get(); + } + return context; +} + +void TGRpcClientLow::ForgetContext(TContextImpl* context) { + bool shutdown = false; + + { + std::unique_lock<std::mutex> guard(Mtx_); + + if (!Contexts_.erase(context)) { + Y_FAIL("Unexpected ForgetContext(%p)", context); + } + + if (Contexts_.empty()) { + if (IsStopping()) { + shutdown = true; + } + + ContextsEmpty_.notify_all(); + } + } + + if (shutdown) { + // This was the last context, shutdown CQ + for (auto& cq : CQS_) { + cq->Shutdown(); + } + } +} + +} // namespace NGRpc |