+#include "grpc_client_low.h"
+#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
+#include <contrib/libs/grpc/include/grpc/support/log.h>
+#include <library/cpp/containers/stack_vector/stack_vec.h>
+#include <util/string/printf.h>
+#include <util/system/thread.h>
+#include <util/random/random.h>
+#if !defined(_WIN32) && !defined(_WIN64)
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+namespace NGrpc {
+void EnableGRpcTracing() {
+ grpc_tracer_set_enabled("tcp", true);
+ grpc_tracer_set_enabled("client_channel", true);
+ grpc_tracer_set_enabled("channel", true);
+ grpc_tracer_set_enabled("api", true);
+ grpc_tracer_set_enabled("connectivity_state", true);
+ grpc_tracer_set_enabled("handshaker", true);
+ grpc_tracer_set_enabled("http", true);
+ grpc_tracer_set_enabled("http2_stream_state", true);
+ grpc_tracer_set_enabled("op_failure", true);
+ grpc_tracer_set_enabled("timer", true);
+ gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
+class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator {
+ TGRpcKeepAliveSocketMutator(int idle, int count, int interval)
+ : Idle_(idle)
+ , Count_(count)
+ , Interval_(interval)
+ {
+ grpc_socket_mutator_init(this, &VTable);
+ }
+ static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) {
+ return static_cast<TGRpcKeepAliveSocketMutator*>(mutator);
+ }
+ template<typename TVal>
+ bool SetOption(int fd, int level, int optname, const TVal& value) {
+ return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0;
+ }
+ bool SetOption(int fd) {
+ if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) {
+ Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl;
+ return false;
+ }
+#ifdef _linux_
+ if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) {
+ Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl;
+ return false;
+ }
+ if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) {
+ Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl;
+ return false;
+ }
+ if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) {
+ Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl;
+ return false;
+ }
+ return true;
+ }
+ static bool Mutate(int fd, grpc_socket_mutator* mutator) {
+ auto self = Cast(mutator);
+ return self->SetOption(fd);
+ }
+ static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) {
+ const auto* selfA = Cast(a);
+ const auto* selfB = Cast(b);
+ auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_);
+ auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_);
+ return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0;
+ }
+ static void Destroy(grpc_socket_mutator* mutator) {
+ delete Cast(mutator);
+ }
+ static grpc_socket_mutator_vtable VTable;
+ const int Idle_;
+ const int Count_;
+ const int Interval_;
+grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =
+ {
+ &TGRpcKeepAliveSocketMutator::Mutate,
+ &TGRpcKeepAliveSocketMutator::Compare,
+ &TGRpcKeepAliveSocketMutator::Destroy
+ };
+TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime)
+ : TcpKeepAliveSettings_(tcpKeepAliveSettings)
+ , ExpireTime_(expireTime)
+ , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20))
+void TChannelPool::GetStubsHolderLocked(
+ const TString& channelId,
+ const TGRpcClientConfig& config,
+ std::function<void(TStubsHolder&)> cb)
+ {
+ std::shared_lock readGuard(RWMutex_);
+ const auto it = Pool_.find(channelId);
+ if (it != Pool_.end()) {
+ if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) {
+ return cb(it->second);
+ }
+ }
+ }
+ {
+ std::unique_lock writeGuard(RWMutex_);
+ {
+ auto it = Pool_.find(channelId);
+ if (it != Pool_.end()) {
+ if (!it->second.IsChannelBroken()) {
+ EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
+ auto now = Now();
+ LastUsedQueue_.emplace(now, channelId);
+ it->second.SetLastUseTime(now);
+ return cb(it->second);
+ } else {
+ // This channel can't be used. Remove from pool to create new one
+ EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
+ Pool_.erase(it);
+ }
+ }
+ }
+ TGRpcKeepAliveSocketMutator* mutator = nullptr;
+ // will be destroyed inside grpc
+ if (TcpKeepAliveSettings_.Enabled) {
+ mutator = new TGRpcKeepAliveSocketMutator(
+ TcpKeepAliveSettings_.Idle,
+ TcpKeepAliveSettings_.Count,
+ TcpKeepAliveSettings_.Interval
+ );
+ }
+ cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second);
+ LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId);
+ }
+void TChannelPool::DeleteChannel(const TString& channelId) {
+ std::unique_lock writeLock(RWMutex_);
+ auto poolIt = Pool_.find(channelId);
+ if (poolIt != Pool_.end()) {
+ EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId);
+ Pool_.erase(poolIt);
+ }
+void TChannelPool::DeleteExpiredStubsHolders() {
+ std::unique_lock writeLock(RWMutex_);
+ auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_);
+ for (auto i = LastUsedQueue_.begin(); i != lastExpired; ++i){
+ Pool_.erase(i->second);
+ }
+ LastUsedQueue_.erase(LastUsedQueue_.begin(), lastExpired);
+void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId) {
+ auto [begin, end] = LastUsedQueue_.equal_range(lastUseTime);
+ auto pos = std::find_if(begin, end, [&](auto a){return a.second == channelId;});
+ Y_VERIFY(pos != LastUsedQueue_.end(), "data corruption at TChannelPool");
+ LastUsedQueue_.erase(pos);
+static void PullEvents(grpc::CompletionQueue* cq) {
+ TThread::SetCurrentThreadName("grpc_client");
+ while (true) {
+ void* tag;
+ bool ok;
+ if (!cq->Next(&tag, &ok)) {
+ break;
+ }
+ if (auto* ev = static_cast<IQueueClientEvent*>(tag)) {
+ if (!ev->Execute(ok)) {
+ ev->Destroy();
+ }
+ }
+ }
+class TGRpcClientLow::TContextImpl final
+ : public std::enable_shared_from_this<TContextImpl>
+ , public IQueueClientContext
+ friend class TGRpcClientLow;
+ using TCallback = std::function<void()>;
+ using TContextPtr = std::shared_ptr<TContextImpl>;
+ ~TContextImpl() override {
+ Y_VERIFY(CountChildren() == 0,
+ "Destructor called with non-empty children");
+ if (Parent) {
+ Parent->ForgetContext(this);
+ } else if (Y_LIKELY(Owner)) {
+ Owner->ForgetContext(this);
+ }
+ }
+ /**
+ * Helper for locking child pointer from a parent container
+ */
+ static TContextPtr LockChildPtr(TContextImpl* ptr) {
+ if (ptr) {
+ // N.B. it is safe to do as long as it's done under a mutex and
+ // pointer is among valid children. When that's the case we
+ // know that TContextImpl destructor has not finished yet, so
+ // the object is valid. The lock() method may return nullptr
+ // though, if the object is being destructed right now.
+ return ptr->weak_from_this().lock();
+ } else {
+ return nullptr;
+ }
+ }
+ void ForgetContext(TContextImpl* child) {
+ std::unique_lock<std::mutex> guard(Mutex);
+ auto removed = RemoveChild(child);
+ Y_VERIFY(removed, "Unexpected ForgetContext(%p)", child);
+ }
+ IQueueClientContextPtr CreateContext() override {
+ auto self = shared_from_this();
+ auto child = std::make_shared<TContextImpl>();
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
+ AddChild(child.get());
+ // It's now safe to initialize parent and owner
+ child->Parent = std::move(self);
+ child->Owner = Owner;
+ child->CQ = CQ;
+ // Propagate cancellation to a child context
+ if (Cancelled.load(std::memory_order_relaxed)) {
+ child->Cancelled.store(true, std::memory_order_relaxed);
+ }
+ }
+ return child;
+ }
+ grpc::CompletionQueue* CompletionQueue() override {
+ Y_VERIFY(Owner, "Uninitialized context");
+ return CQ;
+ }
+ bool IsCancelled() const override {
+ return Cancelled.load(std::memory_order_acquire);
+ }
+ bool Cancel() override {
+ TStackVec<TCallback, 1> callbacks;
+ TStackVec<TContextPtr, 2> children;
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
+ if (Cancelled.load(std::memory_order_relaxed)) {
+ // Already cancelled in another thread
+ return false;
+ }
+ callbacks.reserve(Callbacks.size());
+ children.reserve(CountChildren());
+ for (auto& callback : Callbacks) {
+ callbacks.emplace_back().swap(callback);
+ }
+ Callbacks.clear();
+ // Collect all children we need to cancel
+ // N.B. we don't clear children links (cleared by destructors)
+ // N.B. some children may be stuck in destructors at the moment
+ for (TContextImpl* ptr : InlineChildren) {
+ if (auto child = LockChildPtr(ptr)) {
+ children.emplace_back(std::move(child));
+ }
+ }
+ for (auto* ptr : Children) {
+ if (auto child = LockChildPtr(ptr)) {
+ children.emplace_back(std::move(child));
+ }
+ }
+ Cancelled.store(true, std::memory_order_release);
+ }
+ // Call directly subscribed callbacks
+ if (callbacks) {
+ RunCallbacksNoExcept(callbacks);
+ }
+ // Cancel all children
+ for (auto& child : children) {
+ child->Cancel();
+ child.reset();
+ }
+ return true;
+ }
+ void SubscribeCancel(TCallback callback) override {
+ Y_VERIFY(callback, "SubscribeCancel called with an empty callback");
+ {
+ std::unique_lock<std::mutex> guard(Mutex);
+ if (!Cancelled.load(std::memory_order_relaxed)) {
+ Callbacks.emplace_back().swap(callback);
+ return;
+ }
+ }
+ // Already cancelled, run immediately
+ callback();
+ }
+ void AddChild(TContextImpl* child) {
+ for (TContextImpl*& slot : InlineChildren) {
+ if (!slot) {
+ slot = child;
+ return;
+ }
+ }
+ Children.insert(child);
+ }
+ bool RemoveChild(TContextImpl* child) {
+ for (TContextImpl*& slot : InlineChildren) {
+ if (slot == child) {
+ slot = nullptr;
+ return true;
+ }
+ }
+ return Children.erase(child);
+ }
+ size_t CountChildren() {
+ size_t count = 0;
+ for (TContextImpl* ptr : InlineChildren) {
+ if (ptr) {
+ ++count;
+ }
+ }
+ return count + Children.size();
+ }
+ template<class TCallbacks>
+ static void RunCallbacksNoExcept(TCallbacks& callbacks) noexcept {
+ for (auto& callback : callbacks) {
+ if (callback) {
+ callback();
+ callback = nullptr;
+ }
+ }
+ }
+ // We want a simple lock here, without extra memory allocations
+ std::mutex Mutex;
+ // These fields are initialized on successful registration
+ TContextPtr Parent;
+ TGRpcClientLow* Owner = nullptr;
+ grpc::CompletionQueue* CQ = nullptr;
+ // Some children are stored inline, others are in a set
+ std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } };
+ std::unordered_set<TContextImpl*> Children;
+ // Single callback is stored without extra allocations
+ TStackVec<TCallback, 1> Callbacks;
+ // Atomic flag for a faster IsCancelled() implementation
+ std::atomic<bool> Cancelled;
+TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread)
+ : UseCompletionQueuePerThread_(useCompletionQueuePerThread)
+ Init(numWorkerThread);
+void TGRpcClientLow::Init(size_t numWorkerThread) {
+ SetCqState(WORKING);
+ if (UseCompletionQueuePerThread_) {
+ for (size_t i = 0; i < numWorkerThread; i++) {
+ CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
+ auto* cq = CQS_.back().get();
+ WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
+ PullEvents(cq);
+ }).Release());
+ }
+ } else {
+ CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
+ auto* cq = CQS_.back().get();
+ for (size_t i = 0; i < numWorkerThread; i++) {
+ WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
+ PullEvents(cq);
+ }).Release());
+ }
+ }
+void TGRpcClientLow::AddWorkerThreadForTest() {
+ if (UseCompletionQueuePerThread_) {
+ CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
+ auto* cq = CQS_.back().get();
+ WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
+ PullEvents(cq);
+ }).Release());
+ } else {
+ auto* cq = CQS_.back().get();
+ WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
+ PullEvents(cq);
+ }).Release());
+ }
+TGRpcClientLow::~TGRpcClientLow() {
+ StopInternal(true);
+ WaitInternal();
+void TGRpcClientLow::Stop(bool wait) {
+ StopInternal(false);
+ if (wait) {
+ WaitInternal();
+ }
+void TGRpcClientLow::StopInternal(bool silent) {
+ bool shutdown;
+ TVector<TContextImpl::TContextPtr> cancelQueue;
+ {
+ std::unique_lock<std::mutex> guard(Mtx_);
+ auto allowStateChange = [&]() {
+ switch (GetCqState()) {
+ case WORKING:
+ return true;
+ return !silent;
+ return false;
+ }
+ };
+ if (!allowStateChange()) {
+ // Completion queue is already stopping
+ return;
+ }
+ SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT);
+ if (!silent && !Contexts_.empty()) {
+ cancelQueue.reserve(Contexts_.size());
+ for (auto* ptr : Contexts_) {
+ // N.B. some contexts may be stuck in destructors
+ if (auto context = TContextImpl::LockChildPtr(ptr)) {
+ cancelQueue.emplace_back(std::move(context));
+ }
+ }
+ }
+ shutdown = Contexts_.empty();
+ }
+ for (auto& context : cancelQueue) {
+ context->Cancel();
+ context.reset();
+ }
+ if (shutdown) {
+ for (auto& cq : CQS_) {
+ cq->Shutdown();
+ }
+ }
+void TGRpcClientLow::WaitInternal() {
+ std::unique_lock<std::mutex> guard(JoinMutex_);
+ for (auto& ti : WorkerThreads_) {
+ ti->Join();
+ }
+void TGRpcClientLow::WaitIdle() {
+ std::unique_lock<std::mutex> guard(Mtx_);
+ while (!Contexts_.empty()) {
+ ContextsEmpty_.wait(guard);
+ }
+std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() {
+ std::unique_lock<std::mutex> guard(Mtx_);
+ auto allowCreateContext = [&]() {
+ switch (GetCqState()) {
+ case WORKING:
+ return true;
+ return false;
+ }
+ };
+ if (!allowCreateContext()) {
+ // New context creation is forbidden
+ return nullptr;
+ }
+ auto context = std::make_shared<TContextImpl>();
+ Contexts_.insert(context.get());
+ context->Owner = this;
+ if (UseCompletionQueuePerThread_) {
+ context->CQ = CQS_[RandomNumber(CQS_.size())].get();
+ } else {
+ context->CQ = CQS_[0].get();
+ }
+ return context;
+void TGRpcClientLow::ForgetContext(TContextImpl* context) {
+ bool shutdown = false;
+ {
+ std::unique_lock<std::mutex> guard(Mtx_);
+ if (!Contexts_.erase(context)) {
+ Y_FAIL("Unexpected ForgetContext(%p)", context);
+ }
+ if (Contexts_.empty()) {
+ if (IsStopping()) {
+ shutdown = true;
+ }
+ ContextsEmpty_.notify_all();
+ }
+ }
+ if (shutdown) {
+ // This was the last context, shutdown CQ
+ for (auto& cq : CQS_) {
+ cq->Shutdown();
+ }
+ }
+} // namespace NGRpc