#include "impl.h"

#include "stack/stack_allocator.h"
#include "stack/stack_guards.h"

#include <util/generic/scope.h>
#include <util/thread/singleton.h>
#include <util/stream/format.h>
#include <util/stream/output.h>
#include <util/system/yassert.h>


// Creates a join-wait record bound to coroutine `c`.
// The reference is kept in Cont_ and is what Wake() reschedules.
TCont::TJoinWait::TJoinWait(TCont& c) noexcept
    : Cont_(c)
{}

// Wakes the coroutine this record was created for by asking it to reschedule itself.
void TCont::TJoinWait::Wake() noexcept {
    Cont_.ReSchedule();
}

// Constructs a coroutine that belongs to `executor`.
//
//   allocator, stackSize - forwarded to the trampoline
//                          (presumably used to obtain the coroutine stack - confirm in TTrampoline).
//   executor             - stored in Executor_.
//   func                 - entry callable; moved into the trampoline.
//   name                 - used to initialise Name_.
//
// The trampoline also receives `this`, so it can refer back to the coroutine it runs.
TCont::TCont(
    NCoro::NStack::IAllocator& allocator,
    uint32_t stackSize,
    TContExecutor& executor,
    NCoro::TTrampoline::TFunc func,
    const char* name
) noexcept
    : Executor_(executor)
    , Name_(name)
    , Trampoline_(allocator, stackSize, std::move(func), this)
{}


// Writes a short description of this coroutine to `out`:
//     cont(name = <Name_>, addr = <address of this object, formatted via Hex>)
void TCont::PrintMe(IOutputStream& out) const noexcept {
    // The object's address is printed as an integer, so convert the pointer explicitly.
    const auto address = reinterpret_cast<size_t>(this);

    out << "cont(";
    out << "name = " << Name_ << ", ";
    out << "addr = " << Hex(address);
    out << ")";
}

// Waits for coroutine `c`, but no longer than until `deadLine`.
//
// A TJoinWait record for this coroutine is appended to c->Waiters_; Terminate() pops and
// wakes such records. The wait is considered finished once the record reports Empty()
// (presumably "no longer linked into the waiters list" - confirm in TJoinWait).
//
// Returns true when the wait finished without a timeout or cancellation.
// Returns false when SleepD() reported ETIMEDOUT or this coroutine was cancelled. In that
// case, if the record is still pending, `c` is asked to stop - through `forceStop(record, c)`
// when a callback was supplied, otherwise through c->Cancel() - and this coroutine keeps
// switching until the record becomes Empty().
bool TCont::Join(TCont* c, TInstant deadLine, std::function<void(TJoinWait&, TCont*)> forceStop) noexcept {
    TJoinWait waiter(*this);
    c->Waiters_.PushBack(&waiter);

    for (;;) {
        const bool interrupted = SleepD(deadLine) == ETIMEDOUT || Cancelled();

        if (interrupted) {
            if (!waiter.Empty()) {
                if (forceStop) {
                    forceStop(waiter, c);
                } else {
                    c->Cancel();
                }

                // Always switch at least once after requesting the stop,
                // then keep going until the record is released.
                do {
                    Switch();
                } while (!waiter.Empty());
            }

            return false;
        }

        if (waiter.Empty()) {
            return true;
        }
    }
}

// Builds a timer event for this coroutine with the given `deadline` and returns
// whatever ExecuteEvent() reports for it (Join() compares that value with ETIMEDOUT).
int TCont::SleepD(TInstant deadline) noexcept {
    TTimerEvent timer(this, deadline);
    return ExecuteEvent(&timer);
}

// Hands control to the owning executor's scheduler loop.
void TCont::Switch() noexcept {
    Executor()->RunScheduler();
}

// Sleeps with a zero deadline; when SleepD() returns a non-zero status,
// additionally reschedules this coroutine and switches to the scheduler.
void TCont::Yield() noexcept {
    const int status = SleepD(TInstant::Zero());

    if (status != 0) {
        ReScheduleAndSwitch();
    }
}

// Puts this coroutine back into the executor's run queue and then switches to the scheduler.
// The order matters: the coroutine is queued before control is given away.
void TCont::ReScheduleAndSwitch() noexcept {
    ReSchedule();
    Switch();
}

// Finishes this coroutine: every registered waiter is removed from Waiters_ and woken,
// then the executor is asked to exit the coroutine.
void TCont::Terminate() {
    while (!Waiters_.Empty()) {
        auto* const waiter = Waiters_.PopFront();
        waiter->Wake();
    }

    Executor()->Exit(this);
}

// True when this coroutine is the one the owning executor reports as running.
bool TCont::IAmRunning() const noexcept {
    return Executor()->Running() == this;
}

// Marks this coroutine as cancelled. Does nothing if it is already cancelled.
// A coroutine that is not the currently running one is also rescheduled.
void TCont::Cancel() noexcept {
    if (!Cancelled()) {
        Cancelled_ = true;

        if (!IAmRunning()) {
            ReSchedule();
        }
    }
}

// Cancels this coroutine and attaches `exception` to it via SetException().
// If the coroutine is already cancelled, nothing happens and the exception is not stored.
void TCont::Cancel(THolder<std::exception> exception) noexcept {
    if (Cancelled()) {
        return;
    }

    SetException(std::move(exception));
    Cancel();
}

// Asks the executor to run this coroutine again.
// A cancelled coroutine uses ScheduleExecutionNow(); any other uses ScheduleExecution().
void TCont::ReSchedule() noexcept {
    if (!Cancelled()) {
        Executor()->ScheduleExecution(this);
        return;
    }

    // Legacy code may expect a Cancelled coroutine to be scheduled without delay.
    Executor()->ScheduleExecutionNow(this);
}


// Constructs an executor.
//
//   defaultStackSize    - stored in DefaultStackSize_; CreateOwned() uses it when no custom size is given.
//   poller              - moved into Poller_.
//   scheduleCallback    - stored in ScheduleCallback_; may be null (it is null-checked before use in RunScheduler()).
//   enterPollerCallback - stored in EnterPollerCallback_; may be null (it is null-checked before use in WaitForIO()).
//   defaultGuard,
//   poolSettings        - passed to NCoro::NStack::GetAllocator() to obtain StackAllocator_.
//   time                - stored in Time_; when null, Now() falls back to TInstant::Now().
TContExecutor::TContExecutor(
    uint32_t defaultStackSize,
    THolder<IPollerFace> poller,
    NCoro::IScheduleCallback* scheduleCallback,
    NCoro::IEnterPollerCallback* enterPollerCallback,
    NCoro::NStack::EGuard defaultGuard,
    TMaybe<NCoro::NStack::TPoolAllocatorSettings> poolSettings,
    NCoro::ITime* time
)
    : ScheduleCallback_(scheduleCallback)
    , EnterPollerCallback_(enterPollerCallback)
    , DefaultStackSize_(defaultStackSize)
    , Poller_(std::move(poller))
    , Time_(time)
{
    // The stack allocator is assigned in the body rather than in the initializer list.
    StackAllocator_ = NCoro::NStack::GetAllocator(poolSettings, defaultGuard);
}

// Aborts if coroutines created by this executor are still accounted for:
// Allocated_ is incremented in CreateOwned() and decremented in Release().
TContExecutor::~TContExecutor() {
    Y_ABORT_UNLESS(Allocated_ == 0, "leaked %u coroutines", (ui32)Allocated_);
}

// Runs the executor with a do-nothing callable by forwarding `nop` to another Execute overload.
// NOTE(review): the overload that accepts `nop` is declared outside this file; `nop` is passed
// as a named local (an lvalue), which that overload presumably requires - confirm before inlining it.
void TContExecutor::Execute() noexcept {
    auto nop = [](void*){};
    Execute(nop);
}

// Creates a coroutine named "sys_main" that calls func(cont, arg), then runs the scheduler loop.
void TContExecutor::Execute(TContFunc func, void* arg) noexcept {
    auto entry = [func, arg](TCont* cont) {
        func(cont, arg);
    };

    CreateOwned(std::move(entry), "sys_main");
    RunScheduler();
}

void TContExecutor::WaitForIO() {
    while (Ready_.Empty() && !WaitQueue_.Empty()) {
        const auto now = Now();

        // Waking a coroutine puts it into ReadyNext_ list
        const auto next = WaitQueue_.WakeTimedout(now);

        if (!UserEvents_.Empty()) {
            TIntrusiveList<IUserEvent> userEvents;
            userEvents.Swap(UserEvents_);
            do {
                userEvents.PopFront()->Execute();
            } while (!userEvents.Empty());
        }

        // Polling will return as soon as there is an event to process or a timeout.
        // If there are woken coroutines we do not want to sleep in the poller
        //      yet still we want to check for new io
        //      to prevent ourselves from locking out of io by constantly waking coroutines.

        if (ReadyNext_.Empty()) {
            if (EnterPollerCallback_) {
                EnterPollerCallback_->OnEnterPoller();
            }
            Poll(next);
            if (EnterPollerCallback_) {
                EnterPollerCallback_->OnExitPoller();
            }
        } else if (LastPoll_ + TDuration::MilliSeconds(5) < now) {
            if (EnterPollerCallback_) {
                EnterPollerCallback_->OnEnterPoller();
            }
            Poll(now);
            if (EnterPollerCallback_) {
                EnterPollerCallback_->OnExitPoller();
            }
        }

        Ready_.Append(ReadyNext_);
    }
}

// Waits on the poller until `deadline`, records the poll time in LastPoll_ and
// dispatches every returned event to the listeners attached to it.
//
// With a non-zero event status, every listener in the event's list is notified with that status.
// With a zero status, only listeners whose What() mask intersects the event filter are
// notified, with status 0.
void TContExecutor::Poll(TInstant deadline) {
    Poller_.Wait(PollerEvents_, deadline);
    LastPoll_ = Now();

    // Waking a coroutine puts it into ReadyNext_ list
    for (auto event : PollerEvents_) {
        auto* const listeners = (NCoro::TPollEventList*)event.Data;
        const int status = event.Status;

        // In both loops the iterator is advanced before the handler runs.
        if (status == 0) {
            const ui16 filter = event.Filter;

            auto it = listeners->Begin();
            while (it != listeners->End()) {
                if (it->What() & filter) {
                    (it++)->OnPollEvent(0);
                } else {
                    ++it;
                }
            }
        } else {
            auto it = listeners->Begin();
            while (it != listeners->End()) {
                (it++)->OnPollEvent(status);
            }
        }
    }
}

void TContExecutor::Abort() noexcept {
    WaitQueue_.Abort();
    auto visitor = [](TCont* c) {
        c->Cancel();
    };
    Ready_.ForEach(visitor);
    ReadyNext_.ForEach(visitor);
}

// Creates and schedules a coroutine whose body is func(cont, arg).
// `name` and `customStackSize` are passed through to CreateOwned() unchanged.
// Returns the pointer produced by CreateOwned().
TCont* TContExecutor::Create(
    TContFunc func,
    void* arg,
    const char* name,
    TMaybe<ui32> customStackSize
) noexcept {
    auto entry = [func, arg](TCont* cont) {
        func(cont, arg);
    };

    return CreateOwned(std::move(entry), name, customStackSize);
}

// Allocates a new coroutine running `func`, counts it in Allocated_ and schedules it.
//
//   func            - coroutine body; moved into the new TCont.
//   name            - passed to the TCont constructor.
//   customStackSize - stack size for this coroutine; DefaultStackSize_ is used when not set.
//
// Returns the newly created coroutine. It is destroyed by Release().
TCont* TContExecutor::CreateOwned(
    NCoro::TTrampoline::TFunc func,
    const char* name,
    TMaybe<ui32> customStackSize
) noexcept {
    ++Allocated_;

    const ui32 stackSize = customStackSize ? *customStackSize : DefaultStackSize_;

    auto* cont = new TCont(*StackAllocator_, stackSize, *this, std::move(func), name);
    ScheduleExecution(cont);
    return cont;
}

// Returns the stack statistics reported by this executor's stack allocator.
NCoro::NStack::TAllocatorStats TContExecutor::GetAllocatorStats() const noexcept {
    return StackAllocator_->GetStackStats();
}

// Destroys `cont` and removes it from the Allocated_ count (the counterpart of CreateOwned()).
void TContExecutor::Release(TCont* cont) noexcept {
    delete cont;
    --Allocated_;
}

// Appends `cont` to ToDelete_; the list is processed later by DeleteScheduled().
void TContExecutor::ScheduleToDelete(TCont* cont) noexcept {
    ToDelete_.PushBack(cont);
}

// Marks `cont` as scheduled and appends it to ReadyNext_.
// ReadyNext_ is merged into Ready_ by WaitForIO() and RunScheduler().
void TContExecutor::ScheduleExecution(TCont* cont) noexcept {
    cont->Scheduled_ = true;
    ReadyNext_.PushBack(cont);
}

// Marks `cont` as scheduled and appends it directly to Ready_, bypassing ReadyNext_.
void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept {
    cont->Scheduled_ = true;
    Ready_.PushBack(cont);
}

namespace {
    // Returns a mutable reference to the slot holding the "current executor" pointer.
    // The slot lives in an object obtained from FastTlsSingletonWithPriority (presumably one
    // instance per thread, as the helper's name suggests) and starts out as nullptr.
    inline TContExecutor*& ThisThreadExecutor() {
        struct TExecutorSlot {
            TContExecutor* Executor = nullptr;
        };

        auto* const slot = FastTlsSingletonWithPriority<TExecutorSlot, 0>();
        return slot->Executor;
    }
}

void TContExecutor::DeleteScheduled() noexcept {
    ToDelete_.ForEach([this](TCont* c) {
        Release(c);
    });
}

// Returns the coroutine reported as running by the executor registered in
// ThisThreadExecutor(), or nullptr when no executor is registered there.
TCont* RunningCont() {
    TContExecutor* const executor = ThisThreadExecutor();

    if (!executor) {
        return nullptr;
    }

    return executor->Running();
}

// The scheduling loop.
//
// It can be entered in two ways:
//   * from outside any coroutine (Current_ is null): switches are made from SchedContext_;
//   * from a running coroutine (Current_ is the caller): switches are made from that
//     coroutine's own trampoline context.
//
// Any exception escaping the loop is fatal: its backtrace is printed to Cerr and the process aborts.
void TContExecutor::RunScheduler() noexcept {
    try {
        // Register this executor in ThisThreadExecutor() for the duration of the call;
        // the previous value is put back by Y_DEFER when the scope is left.
        TContExecutor* const prev = ThisThreadExecutor();
        ThisThreadExecutor() = this;
        TCont* caller = Current_;
        TExceptionSafeContext* context = caller ? caller->Trampoline_.Context() : &SchedContext_;
        Y_DEFER {
            ThisThreadExecutor() = prev;
        };

        while (true) {
            // Report that the current coroutine (if any) stops running.
            if (ScheduleCallback_ && Current_) {
                ScheduleCallback_->OnUnschedule(*this);
            }

            WaitForIO();
            DeleteScheduled();
            Ready_.Append(ReadyNext_);

            if (Ready_.Empty()) {
                // Nothing left to run: when called from a coroutine, switch to the scheduler context.
                Current_ = nullptr;
                if (caller) {
                    context->SwitchTo(&SchedContext_);
                }
                break;
            }

            TCont* cont = Ready_.PopFront();

            if (ScheduleCallback_) {
                ScheduleCallback_->OnSchedule(*this, *cont);
            }

            Current_ = cont;
            cont->Scheduled_ = false;
            if (cont == caller) {
                // The caller itself is next: return to it without a context switch.
                break;
            }
            context->SwitchTo(cont->Trampoline_.Context());
            // Execution continues here once something switches back to `context`.
            if (Paused_) {
                // Pause() sets Paused_ before switching to SchedContext_: stop the loop.
                Paused_ = false;
                Current_ = nullptr;
                break;
            }
            if (caller) {
                // Called from a coroutine: being switched back to means the caller should resume.
                break;
            }
        }
    } catch (...) {
        TBackTrace::FromCurrentException().PrintTo(Cerr);
        Y_ABORT("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str());
    }
}

void TContExecutor::Pause() {
    if (auto cont = Running()) {
        Paused_ = true;
        ScheduleExecutionNow(cont);
        cont->SwitchTo(&SchedContext_);
    }
}

// Finishes `cont`: queues it for deletion and switches from it to the scheduler context.
// Control is never expected to come back here; if it does, the process aborts.
void TContExecutor::Exit(TCont* cont) noexcept {
    ScheduleToDelete(cont);
    cont->SwitchTo(&SchedContext_);
    Y_ABORT("can not return from exit");
}

// Current time as seen by the executor: taken from the Time_ source when one was supplied,
// otherwise (the case marked as likely) from TInstant::Now().
TInstant TContExecutor::Now() {
    if (Y_LIKELY(Time_ == nullptr)) {
        return TInstant::Now();
    }

    return Time_->Now();
}

// Stream-output specialisation for TCont: delegates to TCont::PrintMe().
template <>
void Out<TCont>(IOutputStream& out, const TCont& c) {
    c.PrintMe(out);
}