#include "grpc_io.h"

#include <contrib/libs/grpc/src/core/lib/event_engine/thread_pool.h>
#include <contrib/libs/grpc/src/core/lib/iomgr/exec_ctx.h>
#include <contrib/libs/grpc/src/core/lib/iomgr/executor.h>
#include <contrib/libs/grpc/src/core/lib/surface/completion_queue.h>
#include <contrib/libs/grpc/include/grpc/impl/codegen/log.h>

#include <util/generic/yexception.h>
#include <util/string/cast.h>
#include <util/system/env.h>
#include <util/system/mutex.h>
#include <util/system/thread.h>

namespace NUnifiedAgent {
    namespace {
        // One-shot guard: EnsureGrpcConfigured() passes this flag to
        // std::call_once so its setup lambda runs at most once.
        std::once_flag GrpcConfigured{};
    }

    TGrpcNotification::TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback)
        : CompletionQueue(completionQueue)
        , IOCallback(std::move(ioCallback))
        , Completion(MakeHolder<grpc_cq_completion>())
        , InQueue(false)
    {
    }

    // Defaulted destructor, defined out of line in this translation unit.
    // NOTE(review): presumably kept here so that grpc_cq_completion is a
    // complete type where the Completion holder is destroyed — confirm against the header.
    TGrpcNotification::~TGrpcNotification() = default;

    // Posts a completion for this notification onto CompletionQueue unless one
    // is already pending (InQueue is set), in which case the call is a no-op.
    void TGrpcNotification::Trigger() {
        {
            // Claim the "in queue" slot: only the caller that flips InQueue
            // from false to true proceeds; any other caller returns at once.
            bool inQueue = false;
            if (!InQueue.compare_exchange_strong(inQueue, true)) {
                return;
            }
        }
        // NOTE(review): these look like the gRPC core execution contexts that
        // must be alive around the grpc_cq_* calls below — confirm against
        // the gRPC core sources before reordering anything here.
        grpc_core::ApplicationCallbackExecCtx callbackExecCtx;
        grpc_core::ExecCtx execCtx;
        // Take a reference on the callback before the completion is posted;
        // FinalizeResult() later reports IOCallback as the tag.
        // NOTE(review): presumably released by whoever handles the tag — verify.
        IOCallback->Ref();
        // `this` is the tag for both begin_op and end_op; failing to begin the
        // operation aborts the process.
        Y_ABORT_UNLESS(grpc_cq_begin_op(CompletionQueue.cq(), this));
        grpc_cq_end_op(CompletionQueue.cq(), this, y_absl::OkStatus(),
                       [](void* self, grpc_cq_completion*) {
                           // Done callback: release the slot claimed above.
                           // Aborts if InQueue was not set at this point.
                           Y_ABORT_UNLESS(static_cast<TGrpcNotification*>(self)->InQueue.exchange(false));
                       },
                       this, Completion.Get());
    }

    bool TGrpcNotification::FinalizeResult(void** tag, bool*) {
        *tag = IOCallback.Get();
        return true;
    }

    TGrpcTimer::TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback)
        : CompletionQueue(completionQueue)
        , IOCallback(std::move(ioCallback))
        , Alarm()
        , AlarmIsSet(false)
        , NextTriggerTime(Nothing())
    {
    }

    // Schedules the timer to fire at `triggerTime`.
    // If no alarm is armed, arms one immediately with Ref() as its tag.
    // If an alarm is already armed, records the new time in NextTriggerTime and
    // cancels the current alarm; OnIOCompleted() re-arms it with that time.
    void TGrpcTimer::Set(TInstant triggerTime) {
        if (!AlarmIsSet) {
            AlarmIsSet = true;
            Alarm.Set(&CompletionQueue, InstantToTimespec(triggerTime), Ref());
            return;
        }

        NextTriggerTime = triggerTime;
        Alarm.Cancel();
    }

    // Drops any deferred trigger time and, when an alarm is currently armed,
    // cancels it. AlarmIsSet is left for OnIOCompleted() to reset.
    void TGrpcTimer::Cancel() {
        NextTriggerTime.Clear();
        if (!AlarmIsSet) {
            return;
        }
        Alarm.Cancel();
    }

    // Takes a reference on the wrapped IOCallback and returns this timer as
    // the IIOCallback that Set() passes to the alarm as its tag.
    IIOCallback* TGrpcTimer::Ref() {
        IIOCallback* const self = this;
        IOCallback->Ref();
        return self;
    }

    // Completion handler for the alarm; aborts if no alarm was armed.
    // With no deferred trigger time, marks the alarm as unarmed and forwards
    // `status` to the wrapped IOCallback. Otherwise the completion is consumed
    // internally: the alarm is re-armed for the deferred time (reusing `this`
    // as the tag, without taking another reference) and the deferred time is cleared.
    void TGrpcTimer::OnIOCompleted(EIOStatus status) {
        Y_ABORT_UNLESS(AlarmIsSet);

        if (!NextTriggerTime) {
            AlarmIsSet = false;
            IOCallback->OnIOCompleted(status);
            return;
        }

        Alarm.Set(&CompletionQueue, InstantToTimespec(*NextTriggerTime), this);
        NextTriggerTime.Clear();
    }

    TGrpcCompletionQueuePoller::TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue)
        : Queue(queue)
        , Thread()
    {
    }

    void TGrpcCompletionQueuePoller::Start() {
        Thread = std::thread([this]() {
            TThread::SetCurrentThreadName("ua_grpc_cq");
            void* tag;
            bool ok;
            while (Queue.Next(&tag, &ok)) {
                try {
                    static_cast<IIOCallback*>(tag)->OnIOCompleted(ok ? EIOStatus::Ok : EIOStatus::Error);
                } catch (...) {
                    Y_ABORT("unexpected exception [%s]", CurrentExceptionMessage().c_str());
                }
            }
        });
    }

    // Waits for the polling thread to finish.
    // Safe to call when no thread is running (Start() was never called, or the
    // thread has already been joined): std::thread::join() on a non-joinable
    // thread throws std::system_error, so that case is skipped explicitly.
    void TGrpcCompletionQueuePoller::Join() {
        if (Thread.joinable()) {
            Thread.join();
        }
    }

    TGrpcCompletionQueueHost::TGrpcCompletionQueueHost()
        : CompletionQueue()
        , Poller(CompletionQueue)
    {
    }

    // Starts the poller, which launches the thread that drains CompletionQueue.
    void TGrpcCompletionQueueHost::Start() {
        Poller.Start();
    }

    // Shuts the completion queue down and then waits for the poller thread.
    // The order matters: the poller loop only exits once Queue.Next() returns
    // false.
    // NOTE(review): presumably that happens after Shutdown() once pending
    // events are drained — confirm against the gRPC CompletionQueue docs.
    void TGrpcCompletionQueueHost::Stop() {
        CompletionQueue.Shutdown();
        Poller.Join();
    }

    // Converts a TInstant into a gpr_timespec on the GPR_CLOCK_REALTIME clock:
    // whole seconds go to tv_sec, the sub-second remainder (in nanoseconds)
    // goes to tv_nsec.
    gpr_timespec InstantToTimespec(TInstant instant) {
        gpr_timespec ts;
        ts.tv_sec = static_cast<int64_t>(instant.Seconds());
        ts.tv_nsec = static_cast<decltype(ts.tv_nsec)>(instant.NanoSecondsOfSecond());
        ts.clock_type = GPR_CLOCK_REALTIME;
        return ts;
    }

    void EnsureGrpcConfigured() {
        std::call_once(GrpcConfigured, []() {
            const auto limitStr = GetEnv("UA_GRPC_EXECUTOR_THREADS_LIMIT");
            ui64 limit;
            if (limitStr.empty() || !TryFromString(limitStr, limit)) {
                limit = 2;
            }
            grpc_core::Executor::SetThreadsLimit(limit);
            grpc_event_engine::experimental::ThreadPool::SetThreadsLimit(limit);
        });
    }

    // Turns on every gRPC tracer ("all") and lowers the gpr log threshold to
    // DEBUG severity.
    void StartGrpcTracing() {
        constexpr bool tracersEnabled = true;
        grpc_tracer_set_enabled("all", tracersEnabled);
        gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
    }

    // Turns off every gRPC tracer ("all") and raises the gpr log threshold to
    // ERROR severity.
    void FinishGrpcTracing() {
        constexpr bool tracersEnabled = false;
        grpc_tracer_set_enabled("all", tracersEnabled);
        gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR);
    }
}