diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 10:54:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 12:28:07 +0300 |
commit | fc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch) | |
tree | c15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library/cpp/unified_agent_client/grpc_io.cpp | |
parent | 8a749596d40e91c896a1907afcd108d9221fbde1 (diff) | |
download | ydb-fc1cffcfa7f0497a1f97b384a24bcbf23362f3be.tar.gz |
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library/cpp/unified_agent_client/grpc_io.cpp')
-rw-r--r-- | library/cpp/unified_agent_client/grpc_io.cpp | 161 |
1 files changed, 161 insertions, 0 deletions
diff --git a/library/cpp/unified_agent_client/grpc_io.cpp b/library/cpp/unified_agent_client/grpc_io.cpp new file mode 100644 index 0000000000..6d237d75ec --- /dev/null +++ b/library/cpp/unified_agent_client/grpc_io.cpp @@ -0,0 +1,161 @@ +#include "grpc_io.h" + +#include <contrib/libs/grpc/src/core/lib/iomgr/executor.h> +#include <contrib/libs/grpc/src/core/lib/surface/completion_queue.h> +#include <contrib/libs/grpc/include/grpc/impl/codegen/log.h> + +#include <util/generic/yexception.h> +#include <util/string/cast.h> +#include <util/system/env.h> +#include <util/system/mutex.h> +#include <util/system/thread.h> + +namespace NUnifiedAgent { + namespace { + std::once_flag GrpcConfigured{}; + } + + TGrpcNotification::TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback) + : CompletionQueue(completionQueue) + , IOCallback(std::move(ioCallback)) + , Completion(MakeHolder<grpc_cq_completion>()) + , InQueue(false) + { + } + + TGrpcNotification::~TGrpcNotification() = default; + + void TGrpcNotification::Trigger() { + { + bool inQueue = false; + if (!InQueue.compare_exchange_strong(inQueue, true)) { + return; + } + } + grpc_core::ApplicationCallbackExecCtx callbackExecCtx; + grpc_core::ExecCtx execCtx; + IOCallback->Ref(); + Y_VERIFY(grpc_cq_begin_op(CompletionQueue.cq(), this)); + grpc_cq_end_op(CompletionQueue.cq(), this, nullptr, + [](void* self, grpc_cq_completion*) { + Y_VERIFY(static_cast<TGrpcNotification*>(self)->InQueue.exchange(false)); + }, + this, Completion.Get()); + } + + bool TGrpcNotification::FinalizeResult(void** tag, bool*) { + *tag = IOCallback.Get(); + return true; + } + + TGrpcTimer::TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback) + : CompletionQueue(completionQueue) + , IOCallback(std::move(ioCallback)) + , Alarm() + , AlarmIsSet(false) + , NextTriggerTime(Nothing()) + { + } + + void TGrpcTimer::Set(TInstant triggerTime) { + if (AlarmIsSet) { + NextTriggerTime = triggerTime; + Alarm.Cancel(); + } else { + AlarmIsSet = true; + Alarm.Set(&CompletionQueue, InstantToTimespec(triggerTime), Ref()); + } + } + + void TGrpcTimer::Cancel() { + NextTriggerTime.Clear(); + if (AlarmIsSet) { + Alarm.Cancel(); + } + } + + IIOCallback* TGrpcTimer::Ref() { + IOCallback->Ref(); + return this; + } + + void TGrpcTimer::OnIOCompleted(EIOStatus status) { + Y_VERIFY(AlarmIsSet); + if (NextTriggerTime) { + Alarm.Set(&CompletionQueue, InstantToTimespec(*NextTriggerTime), this); + NextTriggerTime.Clear(); + } else { + AlarmIsSet = false; + IOCallback->OnIOCompleted(status); + } + } + + TGrpcCompletionQueuePoller::TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue) + : Queue(queue) + , Thread() + { + } + + void TGrpcCompletionQueuePoller::Start() { + Thread = std::thread([this]() { + TThread::SetCurrentThreadName("ua_grpc_cq"); + void* tag; + bool ok; + while (Queue.Next(&tag, &ok)) { + try { + static_cast<IIOCallback*>(tag)->OnIOCompleted(ok ? EIOStatus::Ok : EIOStatus::Error); + } catch (...) { + Y_FAIL("unexpected exception [%s]", CurrentExceptionMessage().c_str()); + } + } + }); + } + + void TGrpcCompletionQueuePoller::Join() { + Thread.join(); + } + + TGrpcCompletionQueueHost::TGrpcCompletionQueueHost() + : CompletionQueue() + , Poller(CompletionQueue) + { + } + + void TGrpcCompletionQueueHost::Start() { + Poller.Start(); + } + + void TGrpcCompletionQueueHost::Stop() { + CompletionQueue.Shutdown(); + Poller.Join(); + } + + gpr_timespec InstantToTimespec(TInstant instant) { + gpr_timespec result; + result.clock_type = GPR_CLOCK_REALTIME; + result.tv_sec = static_cast<int64_t>(instant.Seconds()); + result.tv_nsec = instant.NanoSecondsOfSecond(); + return result; + } + + void EnsureGrpcConfigured() { + std::call_once(GrpcConfigured, []() { + const auto limitStr = GetEnv("UA_GRPC_EXECUTOR_THREADS_LIMIT"); + ui64 limit; + if (limitStr.Empty() || !TryFromString(limitStr, limit)) { + limit = 2; + } + grpc_core::Executor::SetThreadsLimit(limit); + }); + } + + void StartGrpcTracing() { + grpc_tracer_set_enabled("all", true); + gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); + } + + void FinishGrpcTracing() { + grpc_tracer_set_enabled("all", false); + gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR); + } +} |