#include "grpc_io.h"
#include <contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h>
#include <contrib/libs/grpc/src/core/lib/iomgr/exec_ctx.h>
#include <contrib/libs/grpc/src/core/lib/iomgr/executor.h>
#include <contrib/libs/grpc/src/core/lib/surface/completion_queue.h>
#include <contrib/libs/grpc/include/grpc/impl/codegen/log.h>
#include <util/generic/yexception.h>
#include <util/string/cast.h>
#include <util/system/env.h>
#include <util/system/mutex.h>
#include <util/system/thread.h>
namespace NUnifiedAgent {
namespace {
std::once_flag GrpcConfigured{};
}
TGrpcNotification::TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback)
: CompletionQueue(completionQueue)
, IOCallback(std::move(ioCallback))
, Completion(MakeHolder<grpc_cq_completion>())
, InQueue(false)
{
}
TGrpcNotification::~TGrpcNotification() = default;
void TGrpcNotification::Trigger() {
{
bool inQueue = false;
if (!InQueue.compare_exchange_strong(inQueue, true)) {
return;
}
}
grpc_core::ApplicationCallbackExecCtx callbackExecCtx;
grpc_core::ExecCtx execCtx;
IOCallback->Ref();
Y_ABORT_UNLESS(grpc_cq_begin_op(CompletionQueue.cq(), this));
grpc_cq_end_op(CompletionQueue.cq(), this, y_absl::OkStatus(),
[](void* self, grpc_cq_completion*) {
Y_ABORT_UNLESS(static_cast<TGrpcNotification*>(self)->InQueue.exchange(false));
},
this, Completion.Get());
}
bool TGrpcNotification::FinalizeResult(void** tag, bool*) {
*tag = IOCallback.Get();
return true;
}
TGrpcTimer::TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback)
: CompletionQueue(completionQueue)
, IOCallback(std::move(ioCallback))
, Alarm()
, AlarmIsSet(false)
, NextTriggerTime(Nothing())
{
}
void TGrpcTimer::Set(TInstant triggerTime) {
if (AlarmIsSet) {
NextTriggerTime = triggerTime;
Alarm.Cancel();
} else {
AlarmIsSet = true;
Alarm.Set(&CompletionQueue, InstantToTimespec(triggerTime), Ref());
}
}
void TGrpcTimer::Cancel() {
NextTriggerTime.Clear();
if (AlarmIsSet) {
Alarm.Cancel();
}
}
IIOCallback* TGrpcTimer::Ref() {
IOCallback->Ref();
return this;
}
void TGrpcTimer::OnIOCompleted(EIOStatus status) {
Y_ABORT_UNLESS(AlarmIsSet);
if (NextTriggerTime) {
Alarm.Set(&CompletionQueue, InstantToTimespec(*NextTriggerTime), this);
NextTriggerTime.Clear();
} else {
AlarmIsSet = false;
IOCallback->OnIOCompleted(status);
}
}
TGrpcCompletionQueuePoller::TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue)
: Queue(queue)
, Thread()
{
}
void TGrpcCompletionQueuePoller::Start() {
Thread = std::thread([this]() {
TThread::SetCurrentThreadName("ua_grpc_cq");
void* tag;
bool ok;
while (Queue.Next(&tag, &ok)) {
try {
static_cast<IIOCallback*>(tag)->OnIOCompleted(ok ? EIOStatus::Ok : EIOStatus::Error);
} catch (...) {
Y_ABORT("unexpected exception [%s]", CurrentExceptionMessage().c_str());
}
}
});
}
void TGrpcCompletionQueuePoller::Join() {
Thread.join();
}
TGrpcCompletionQueueHost::TGrpcCompletionQueueHost()
: CompletionQueue()
, Poller(CompletionQueue)
{
}
void TGrpcCompletionQueueHost::Start() {
Poller.Start();
}
void TGrpcCompletionQueueHost::Stop() {
CompletionQueue.Shutdown();
Poller.Join();
}
gpr_timespec InstantToTimespec(TInstant instant) {
gpr_timespec result;
result.clock_type = GPR_CLOCK_REALTIME;
result.tv_sec = static_cast<int64_t>(instant.Seconds());
result.tv_nsec = instant.NanoSecondsOfSecond();
return result;
}
void EnsureGrpcConfigured() {
std::call_once(GrpcConfigured, []() {
const auto limitStr = GetEnv("UA_GRPC_EXECUTOR_THREADS_LIMIT");
ui64 limit;
if (limitStr.empty() || !TryFromString(limitStr, limit)) {
limit = 2;
}
grpc_core::Executor::SetThreadsLimit(limit);
grpc_event_engine::experimental::ThreadPool::SetThreadsLimit(limit);
});
}
void StartGrpcTracing() {
grpc_tracer_set_enabled("all", true);
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
}
void FinishGrpcTracing() {
grpc_tracer_set_enabled("all", false);
gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR);
}
}