#include "impl.h"
#include "condvar.h"
#include "network.h"

#include <library/cpp/testing/unittest/registar.h>

#include <util/string/cast.h>
#include <util/system/pipe.h>
#include <util/system/env.h>
#include <util/system/info.h>
#include <util/system/thread.h>
#include <util/generic/xrange.h>
#include <util/generic/serialized_enum.h>

// TODO (velavokr): BALANCER-1345 add more tests on pollers

class TCoroTest: public TTestBase {
    UNIT_TEST_SUITE(TCoroTest);
    UNIT_TEST(TestSimpleX1);
    UNIT_TEST(TestSimpleX1MultiThread);
    UNIT_TEST(TestSimpleX2);
    UNIT_TEST(TestSimpleX3);
    UNIT_TEST(TestMemFun);
    UNIT_TEST(TestMutex);
    UNIT_TEST(TestCondVar);
    UNIT_TEST(TestJoinDefault);
    UNIT_TEST(TestJoinEpoll);
    UNIT_TEST(TestJoinKqueue);
    UNIT_TEST(TestJoinPoll);
    UNIT_TEST(TestJoinSelect);
    UNIT_TEST(TestException);
    UNIT_TEST(TestJoinCancelExitRaceBug);
    UNIT_TEST(TestWaitWakeLivelockBug);
    UNIT_TEST(TestFastPathWakeDefault)
    // TODO (velavokr): BALANCER-1338 our epoll wrapper cannot handle pipe eofs
//    UNIT_TEST(TestFastPathWakeEpoll)
    UNIT_TEST(TestFastPathWakeKqueue)
    UNIT_TEST(TestFastPathWakePoll)
    UNIT_TEST(TestFastPathWakeSelect)
    UNIT_TEST(TestLegacyCancelYieldRaceBug)
    UNIT_TEST(TestJoinRescheduleBug);
    UNIT_TEST(TestEventQueue)
    UNIT_TEST(TestNestedExecutor)
    UNIT_TEST(TestComputeCoroutineYield)
    UNIT_TEST(TestPollEngines);
    UNIT_TEST(TestUserEvent);
    UNIT_TEST(TestPause);
    UNIT_TEST(TestOverrideTime);
    UNIT_TEST_SUITE_END();

public:
    void TestException();
    void TestSimpleX1();
    void TestSimpleX1MultiThread();
    void TestSimpleX2();
    void TestSimpleX3();
    void TestMemFun();
    void TestMutex();
    void TestCondVar();
    void TestJoinDefault();
    void TestJoinEpoll();
    void TestJoinKqueue();
    void TestJoinPoll();
    void TestJoinSelect();
    void TestJoinCancelExitRaceBug();
    void TestWaitWakeLivelockBug();
    void TestFastPathWakeDefault();
    void TestFastPathWakeEpoll();
    void TestFastPathWakeKqueue();
    void TestFastPathWakePoll();
    void TestFastPathWakeSelect();
    void TestLegacyCancelYieldRaceBug();
    void TestJoinRescheduleBug();
    void TestEventQueue();
    void TestNestedExecutor();
    void TestComputeCoroutineYield();
    void TestPollEngines();
    void TestUserEvent();
    void TestPause();
    void TestOverrideTime();
};

void TCoroTest::TestException() {
    TContExecutor e(1000000);

    bool f2run = false;

    auto f1 = [&f2run](TCont* c) {
        struct TCtx {
            ~TCtx() {
                Y_VERIFY(!*F2);

                C->Yield();
            }

            TCont* C;
            bool* F2;
        };

        try {
            TCtx ctx = {c, &f2run};

            throw 1;
        } catch (...) {
        }
    };

    bool unc = true;

    auto f2 = [&unc, &f2run](TCont*) {
        f2run = true;
        unc = std::uncaught_exception();

        // check segfault
        try {
            throw 2;
        } catch (int) {
        }
    };

    e.Create(f1, "f1");
    e.Create(f2, "f2");
    e.Execute();

    UNIT_ASSERT(!unc);
}

static int i0;

static void CoRun(TCont* c, void* /*run*/) {
    while (i0 < 100000) {
        ++i0;
        UNIT_ASSERT(RunningCont() == c);
        c->Yield();
        UNIT_ASSERT(RunningCont() == c);
    }
}

static void CoMain(TCont* c, void* /*arg*/) {
    for (volatile size_t i2 = 0; i2 < 10; ++i2) {
        UNIT_ASSERT(RunningCont() == c);
        c->Executor()->Create(CoRun, nullptr, "run");
        UNIT_ASSERT(RunningCont() == c);
    }
}

void TCoroTest::TestSimpleX1() {
    i0 = 0;
    TContExecutor e(32000);

    UNIT_ASSERT(RunningCont() == nullptr);

    e.Execute(CoMain);
    UNIT_ASSERT_VALUES_EQUAL(i0, 100000);

    UNIT_ASSERT(RunningCont() == nullptr);
}

void TCoroTest::TestSimpleX1MultiThread() {
    TVector<THolder<TThread>> threads;
    const size_t nThreads = 0;
    TAtomic c = 0;
    for (size_t i = 0; i < nThreads; ++i) {
        threads.push_back(MakeHolder<TThread>([&]() {
            TestSimpleX1();
            AtomicIncrement(c);
        }));
    }

    for (auto& t : threads) {
        t->Start();
    }

    for (auto& t: threads) {
        t->Join();
    }

    UNIT_ASSERT_EQUAL(c, nThreads);
}

struct TTestObject {
    int i = 0;
    int j = 0;

public:
    void RunTask1(TCont*) {
        i = 1;
    }
    void RunTask2(TCont*) {
        j = 2;
    }
};

void TCoroTest::TestMemFun() {
    i0 = 0;
    TContExecutor e(32000);
    TTestObject obj;
    e.Create<TTestObject, &TTestObject::RunTask1>(&obj, "test1");
    e.Execute<TTestObject, &TTestObject::RunTask2>(&obj);
    UNIT_ASSERT_EQUAL(obj.i, 1);
    UNIT_ASSERT_EQUAL(obj.j, 2);
}

void TCoroTest::TestSimpleX2() {
    {
        i0 = 0;

        {
            TContExecutor e(32000);
            e.Execute(CoMain);
        }

        UNIT_ASSERT_EQUAL(i0, 100000);
    }

    {
        i0 = 0;

        {
            TContExecutor e(32000);
            e.Execute(CoMain);
        }

        UNIT_ASSERT_EQUAL(i0, 100000);
    }
}

struct TRunner {
    inline TRunner()
        : Runs(0)
    {
    }

    inline void operator()(TCont* c) {
        ++Runs;
        c->Yield();
    }

    size_t Runs;
};

void TCoroTest::TestSimpleX3() {
    TContExecutor e(32000);
    TRunner runner;

    for (volatile size_t i3 = 0; i3 < 1000; ++i3) {
        e.Create(runner, "runner");
    }

    e.Execute();

    UNIT_ASSERT_EQUAL(runner.Runs, 1000);
}

static TString res;
static TContMutex mutex;

static void CoMutex(TCont* c, void* /*run*/) {
    {
        mutex.LockI(c);
        c->Yield();
        res += c->Name();
        mutex.UnLock();
    }

    c->Yield();

    {
        mutex.LockI(c);
        c->Yield();
        res += c->Name();
        mutex.UnLock();
    }
}

static void CoMutexTest(TCont* c, void* /*run*/) {
    c->Executor()->Create(CoMutex, nullptr, "1");
    c->Executor()->Create(CoMutex, nullptr, "2");
}

void TCoroTest::TestMutex() {
    TContExecutor e(32000);
    e.Execute(CoMutexTest);
    UNIT_ASSERT_EQUAL(res, "1212");
    res.clear();
}

static TContMutex m1;
static TContCondVar c1;

static void CoCondVar(TCont* c, void* /*run*/) {
    for (size_t i4 = 0; i4 < 3; ++i4) {
        UNIT_ASSERT_EQUAL(m1.LockI(c), 0);
        UNIT_ASSERT_EQUAL(c1.WaitI(c, &m1), 0);
        res += c->Name();
        m1.UnLock();
    }
}

static void CoCondVarTest(TCont* c, void* /*run*/) {
    c->Executor()->Create(CoCondVar, nullptr, "1");
    c->Yield();
    c->Executor()->Create(CoCondVar, nullptr, "2");
    c->Yield();
    c->Executor()->Create(CoCondVar, nullptr, "3");
    c->Yield();
    c->Executor()->Create(CoCondVar, nullptr, "4");
    c->Yield();
    c->Executor()->Create(CoCondVar, nullptr, "5");
    c->Yield();
    c->Executor()->Create(CoCondVar, nullptr, "6");
    c->Yield();

    for (size_t i5 = 0; i5 < 3; ++i5) {
        res += ToString((size_t)i5) + "^";
        c1.BroadCast();
        c->Yield();
    }
}

void TCoroTest::TestCondVar() {
    TContExecutor e(32000);
    e.Execute(CoCondVarTest);
    UNIT_ASSERT_EQUAL(res, "0^1234561^1234562^123456");
    res.clear();
}

namespace NCoroTestJoin {
    struct TSleepCont {
        const TInstant Deadline;
        int Result;

        inline void operator()(TCont* c) {
            Result = c->SleepD(Deadline);
        }
    };

    struct TReadCont {
        const TInstant Deadline;
        const SOCKET Sock;
        int Result;

        inline void operator()(TCont* c) {
            char buf = 0;
            Result = NCoro::ReadD(c, Sock, &buf, sizeof(buf), Deadline).Status();
        }
    };

    struct TJoinCont {
        const TInstant Deadline;
        TCont* const Cont;
        bool Result;

        inline void operator()(TCont* c) {
            Result = c->Join(Cont, Deadline);
        }
    };

    void DoTestJoin(EContPoller pollerType) {
        auto poller = IPollerFace::Construct(pollerType);

        if (!poller) {
            return;
        }

        TContExecutor e(32000, std::move(poller));

        TPipe in, out;
        TPipe::Pipe(in, out);
        SetNonBlock(in.GetHandle());

        {
            TSleepCont sc = {TInstant::Max(), 0};
            TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};

            e.Execute(jc);

            UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
            UNIT_ASSERT_EQUAL(jc.Result, false);
        }

        {
            TSleepCont sc = {TDuration::MilliSeconds(100).ToDeadLine(), 0};
            TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(sc, "sc"), false};

            e.Execute(jc);

            UNIT_ASSERT_EQUAL(sc.Result, ETIMEDOUT);
            UNIT_ASSERT_EQUAL(jc.Result, true);
        }

        {
            TSleepCont sc = {TDuration::MilliSeconds(200).ToDeadLine(), 0};
            TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};

            e.Execute(jc);

            UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
            UNIT_ASSERT_EQUAL(jc.Result, false);
        }

        {
            TReadCont rc = {TInstant::Max(), in.GetHandle(), 0};
            TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};

            e.Execute(jc);

            UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
            UNIT_ASSERT_EQUAL(jc.Result, false);
        }

        {
            TReadCont rc = {TDuration::MilliSeconds(100).ToDeadLine(), in.GetHandle(), 0};
            TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(rc, "rc"), false};

            e.Execute(jc);

            UNIT_ASSERT_EQUAL(rc.Result, ETIMEDOUT);
            UNIT_ASSERT_EQUAL(jc.Result, true);
        }

        {
            TReadCont rc = {TDuration::MilliSeconds(200).ToDeadLine(), in.GetHandle(), 0};
            TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};

            e.Execute(jc);

            UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
            UNIT_ASSERT_EQUAL(jc.Result, false);
        }
    }
}

void TCoroTest::TestJoinDefault() {
    NCoroTestJoin::DoTestJoin(EContPoller::Default);
}

void TCoroTest::TestJoinEpoll() {
    NCoroTestJoin::DoTestJoin(EContPoller::Epoll);
}

void TCoroTest::TestJoinKqueue() {
    NCoroTestJoin::DoTestJoin(EContPoller::Kqueue);
}

void TCoroTest::TestJoinPoll() {
    NCoroTestJoin::DoTestJoin(EContPoller::Poll);
}

void TCoroTest::TestJoinSelect() {
    NCoroTestJoin::DoTestJoin(EContPoller::Select);
}

namespace NCoroJoinCancelExitRaceBug {
    struct TState {
        TCont* Sub = nullptr;
    };

    static void DoAux(TCont*, void* argPtr) noexcept {
        TState& state = *(TState*)(argPtr);

        // 06.{Ready:[Sub2]} > {Ready:[Sub2,Sub]}
        state.Sub->Cancel();
    }

    static void DoSub2(TCont*, void*) noexcept {
        // 07.{Ready:[Sub]} > Exit > {Ready:[Sub],ToDelete:[Sub2]}
        // 08.{Ready:[Sub],ToDelete:[Sub2]} > Release(Sub2) > {Ready:[Sub],Deleted:[Sub2]}
    }

    static void DoSub(TCont* cont, void* argPtr) noexcept {
        TState& state = *(TState*)(argPtr);
        state.Sub = cont;

        // 04.{Ready:[Aux]} > {Ready:[Aux,Sub2]}
        auto* sub2 = cont->Executor()->Create(DoSub2, argPtr, "Sub2");

        // 05.{Ready:[Aux,Sub2]} > SwitchTo(Aux)
        // 09.{Ready:[],Deleted:[Sub2]} > Cancel(Sub2) > {Ready:[Sub2],Deleted:[Sub2]}
        // 10.{Ready:[Sub2],Deleted:[Sub2]} > SwitchTo(Sub2) > FAIL: can not return from exit
        cont->Join(sub2);

        state.Sub = nullptr;
    }

    static void DoMain(TCont* cont) noexcept {
        TState state;

        // 01.{Ready:[]} > {Ready:[Sub]}
        auto* sub = cont->Executor()->Create(DoSub, &state, "Sub");

        // 02.{Ready:[Sub]} > {Ready:[Sub,Aux]}
        cont->Executor()->Create(DoAux, &state, "Aux");

        // 03.{Ready:[Sub,Aux]} > SwitchTo(Sub)
        cont->Join(sub);
    }
}

void TCoroTest::TestJoinCancelExitRaceBug() {
    TContExecutor exec(20000);
    exec.SetFailOnError(true);
    exec.Execute(NCoroJoinCancelExitRaceBug::DoMain);
}

namespace NCoroWaitWakeLivelockBug {
    struct TState;

    struct TSubState {
        TSubState(TState& parent, ui32 self)
            : Parent(parent)
            , Name(TStringBuilder() << "Sub" << self)
            , Self(self)
        {
            UNIT_ASSERT(self < 2);
        }

        TSubState& OtherState();

        TState& Parent;
        TTimerEvent* Event = nullptr;
        TCont* Cont = nullptr;
        TString Name;
        ui32 Self = -1;
    };

    struct TState {
        TState()
            : Subs{{*this, 0}, {*this, 1}}
        {}

        TSubState Subs[2];
        bool Stop = false;
    };

    TSubState& TSubState::OtherState() {
        return Parent.Subs[1 - Self];
    }

    static void DoStop(TCont* cont, void* argPtr) {
        TState& state = *(TState*)(argPtr);

        TTimerEvent event(cont, TInstant::Now());
        ExecuteEvent(&event);
        state.Stop = true;
        for (auto& sub: state.Subs) {
            if (sub.Event) {
                sub.Event->Wake(EWAKEDUP);
            }
        }
    }

    static void DoSub(TCont* cont, void* argPtr) {
        TSubState& state = *(TSubState*)(argPtr);

        while (!state.Parent.Stop) {
            TTimerEvent event(cont, TInstant::Max());
            if (state.OtherState().Event) {
                state.OtherState().Event->Wake(EWAKEDUP);
            }
            state.Event = &event;
            ExecuteEvent(&event);
            state.Event = nullptr;
        }

        state.Cont = nullptr;
    }

    static void DoMain(TCont* cont) noexcept {
        TState state;

        for (auto& subState : state.Subs) {
            subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
        }

        cont->Join(
            cont->Executor()->Create(DoStop, &state, "Stop")
        );

        for (auto& subState : state.Subs) {
            if (subState.Cont) {
                cont->Join(subState.Cont);
            }
        }
    }
}

void TCoroTest::TestWaitWakeLivelockBug() {
    TContExecutor exec(20000);
    exec.SetFailOnError(true);
    exec.Execute(NCoroWaitWakeLivelockBug::DoMain);
}

namespace NCoroTestFastPathWake {
    struct TState;

    struct TSubState {
        TSubState(TState& parent, ui32 self)
            : Parent(parent)
            , Name(TStringBuilder() << "Sub" << self)
        {}

        TState& Parent;
        TInstant Finish;
        TTimerEvent* Event = nullptr;
        TCont* Cont = nullptr;
        TString Name;
    };

    struct TState {
        TState()
            : Subs{{*this, 0}, {*this, 1}}
        {
            TPipe::Pipe(In, Out);
            SetNonBlock(In.GetHandle());
        }

        TSubState Subs[2];
        TPipe In, Out;
        bool IoSleepRunning = false;
    };

    static void DoIoSleep(TCont* cont, void* argPtr) noexcept {
        try {
            TState& state = *(TState*) (argPtr);
            state.IoSleepRunning = true;

            TTempBuf tmp;
            // Wait for the event from io
            auto res = NCoro::ReadD(cont, state.In.GetHandle(), tmp.Data(), 1, TDuration::Seconds(10).ToDeadLine());
            UNIT_ASSERT_VALUES_EQUAL(res.Checked(), 0);
            state.IoSleepRunning = false;
        } catch (const NUnitTest::TAssertException& ex) {
            Cerr << ex.AsStrBuf() << Endl;
            ex.BackTrace()->PrintTo(Cerr);
            throw;
        } catch (...) {
            Cerr << CurrentExceptionMessage() << Endl;
            throw;
        }
    }

    static void DoSub(TCont* cont, void* argPtr) noexcept {
        TSubState& state = *(TSubState*)(argPtr);

        TTimerEvent event(cont, TInstant::Max());
        state.Event = &event;
        ExecuteEvent(&event);
        state.Event = nullptr;
        state.Cont = nullptr;
        state.Finish = TInstant::Now();
    }

    static void DoMain(TCont* cont) noexcept {
        try {
            TState state;
            TInstant start = TInstant::Now();

            // This guy sleeps on io
            auto sleeper = cont->Executor()->Create(DoIoSleep, &state, "io_sleeper");

            // These guys are to be woken up right away
            for (auto& subState : state.Subs) {
                subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
            }

            // Give way
            cont->Yield();

            // Check everyone has started, wake those to be woken
            UNIT_ASSERT(state.IoSleepRunning);

            for (auto& subState : state.Subs) {
                UNIT_ASSERT(subState.Event);
                subState.Event->Wake(EWAKEDUP);
            }

            // Give way again
            cont->Yield();

            // Check the woken guys have finished and quite soon
            for (auto& subState : state.Subs) {
                UNIT_ASSERT(subState.Finish - start < TDuration::MilliSeconds(100));
                UNIT_ASSERT(!subState.Cont);
            }

            // Wake the io guy and finish
            state.Out.Close();

            if (state.IoSleepRunning) {
                cont->Join(sleeper);
            }

            // Check everything has ended sooner than the timeout
            UNIT_ASSERT(TInstant::Now() - start < TDuration::Seconds(1));
        } catch (const NUnitTest::TAssertException& ex) {
            Cerr << ex.AsStrBuf() << Endl;
            ex.BackTrace()->PrintTo(Cerr);
            throw;
        } catch (...) {
            Cerr << CurrentExceptionMessage() << Endl;
            throw;
        }
    }

    static void DoTestFastPathWake(EContPoller pollerType) {
        if (auto poller = IPollerFace::Construct(pollerType)) {
            TContExecutor exec(20000, std::move(poller));
            exec.SetFailOnError(true);
            exec.Execute(NCoroTestFastPathWake::DoMain);
        }
    }
}

void TCoroTest::TestFastPathWakeDefault() {
    NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Default);
}

void TCoroTest::TestFastPathWakeEpoll() {
    NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Epoll);
}

void TCoroTest::TestFastPathWakeKqueue() {
    NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Kqueue);
}

void TCoroTest::TestFastPathWakePoll() {
    NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Poll);
}

void TCoroTest::TestFastPathWakeSelect() {
    NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Select);
}

namespace NCoroTestLegacyCancelYieldRaceBug {
    enum class EState {
        Idle, Running, Finished,
    };

    struct TState {
        EState SubState = EState::Idle;
    };

    static void DoSub(TCont* cont, void* argPtr) {
        TState& state = *(TState*)argPtr;
        state.SubState = EState::Running;
        cont->Yield();
        cont->Yield();
        state.SubState = EState::Finished;
    }

    static void DoMain(TCont* cont, void* argPtr) {
        TState& state = *(TState*)argPtr;
        TCont* sub =  cont->Executor()->Create(DoSub, argPtr, "Sub");
        sub->Cancel();
        cont->Yield();
        UNIT_ASSERT_EQUAL(state.SubState, EState::Finished);
    }
}

void TCoroTest::TestLegacyCancelYieldRaceBug() {
    NCoroTestLegacyCancelYieldRaceBug::TState state;
    TContExecutor exec(20000);
    exec.SetFailOnError(true);
    exec.Execute(NCoroTestLegacyCancelYieldRaceBug::DoMain, &state);
}

namespace NCoroTestJoinRescheduleBug {
    enum class EState {
        Idle, Running, Finished,
    };

    struct TState {
        TCont* volatile SubA = nullptr;
        volatile EState SubAState = EState::Idle;
        volatile EState SubBState = EState::Idle;
        volatile EState SubCState = EState::Idle;
    };

    static void DoSubC(TCont* cont, void* argPtr) {
        TState& state = *(TState*)argPtr;
        state.SubCState = EState::Running;
        while (state.SubBState != EState::Running) {
            cont->Yield();
        }
        while (cont->SleepD(TInstant::Max()) != ECANCELED) {
        }
        state.SubCState = EState::Finished;
    }

    static void DoSubB(TCont* cont, void* argPtr) {
        TState& state = *(TState*)argPtr;
        state.SubBState = EState::Running;
        while (state.SubAState != EState::Running && state.SubCState != EState::Running) {
            cont->Yield();
        }
        for (auto i : xrange(100)) {
            Y_UNUSED(i);
            if (!state.SubA) {
                break;
            }
            state.SubA->ReSchedule();
            cont->Yield();
        }
        state.SubBState = EState::Finished;
    }

    static void DoSubA(TCont* cont, void* argPtr) {
        TState& state = *(TState*)argPtr;
        state.SubAState = EState::Running;
        TCont* subC = cont->Executor()->Create(DoSubC, argPtr, "SubC");
        while (state.SubBState != EState::Running && state.SubCState != EState::Running) {
            cont->Yield();
        }
        cont->Join(subC);
        UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
        state.SubA = nullptr;
        state.SubAState = EState::Finished;
    }

    static void DoMain(TCont* cont, void* argPtr) {
        TState& state = *(TState*)argPtr;
        TCont* subA = cont->Executor()->Create(DoSubA, argPtr, "SubA");
        state.SubA = subA;
        cont->Join(cont->Executor()->Create(DoSubB, argPtr, "SubB"));

        if (state.SubA) {
            subA->Cancel();
            cont->Join(subA);
        }
    }
}

void TCoroTest::TestJoinRescheduleBug() {
    using namespace NCoroTestJoinRescheduleBug;
    TState state;
    {
        TContExecutor exec(20000);
        exec.Execute(DoMain, &state);
    }
    UNIT_ASSERT_EQUAL(state.SubAState, EState::Finished);
    UNIT_ASSERT_EQUAL(state.SubBState, EState::Finished);
    UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
}

void TCoroTest::TestEventQueue() {
    NCoro::TEventWaitQueue queue;
    UNIT_ASSERT(queue.Empty());
    UNIT_ASSERT_VALUES_EQUAL(queue.WakeTimedout(TInstant()), TInstant());
    TContExecutor exec(32000);
    exec.Execute([](TCont* cont, void* arg) {
        NCoro::TEventWaitQueue* q = (NCoro::TEventWaitQueue*)arg;
        TTimerEvent ev(cont, TInstant::Max());
        TTimerEvent ev2(cont, TInstant::Seconds(12345));
        q->Register(&ev);
        UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
        UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
        q->Register(&ev2);
        UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
        UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
        UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Seconds(12345));
        UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Max());
    }, &queue);
}

void TCoroTest::TestNestedExecutor() {
#ifndef _win_
    //nested executors actually don't work correctly, but anyway shouldn't break RunningCont() ptr
    TContExecutor exec(32000);
    UNIT_ASSERT(!RunningCont());

    exec.Execute([](TCont* cont, void*) {
        UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);

        TContExecutor exec2(32000);
        exec2.Execute([](TCont* cont2, void*) {
            UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
            TContExecutor exec3(32000);
            exec3.Execute([](TCont* cont3, void*) {
                UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont3);
            });

            UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
        });

        UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);
    });

    UNIT_ASSERT(!RunningCont());
#endif
}

void TCoroTest::TestComputeCoroutineYield() {
//if we have busy (e.g., on cpu) coroutine, when it yields, io must flow
    TContExecutor exec(32000);
    exec.SetFailOnError(true);

    TPipe in, out;
    TPipe::Pipe(in, out);
    SetNonBlock(in.GetHandle());
    size_t lastRead = 42;

    auto compute = [&](TCont* cont) {
        for (size_t i = 0; i < 10; ++i) {
            write(out.GetHandle(), &i, sizeof i);
            Sleep(TDuration::MilliSeconds(10));
            cont->Yield();
            UNIT_ASSERT(lastRead == i);
        }
    };

    auto io = [&](TCont* cont) {
        for (size_t i = 0; i < 10; ++i) {
            NCoro::ReadI(cont, in.GetHandle(), &lastRead, sizeof lastRead);
        }
    };

    exec.Create(compute, "compute");
    exec.Create(io, "io");

    exec.Execute();
}

void TCoroTest::TestPollEngines() {
    bool defaultChecked = false;
    for (auto engine : GetEnumAllValues<EContPoller>()) {
        auto poller = IPollerFace::Construct(engine);
        if (!poller) {
            continue;
        }

        TContExecutor exec(32000, IPollerFace::Construct(engine));

        if (engine == EContPoller::Default) {
            defaultChecked = true;
            UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Combined);
        } else {
            UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), engine);
        }
    }

    UNIT_ASSERT(defaultChecked);
}

void TCoroTest::TestPause() {
    TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing()};

    int i = 0;
    executor.CreateOwned([&](TCont*) {
        i++;
        executor.Pause();
        i++;
    }, "coro");

    UNIT_ASSERT_EQUAL(i, 0);
    executor.Execute();
    UNIT_ASSERT_EQUAL(i, 1);
    executor.Execute();
    UNIT_ASSERT_EQUAL(i, 2);
}

void TCoroTest::TestUserEvent() {
    TContExecutor exec(32000);

    struct TUserEvent : public IUserEvent {
        bool Called = false;
        void Execute() override {
            Called = true;
        }
    } event;

    auto f = [&](TCont* cont) {
        UNIT_ASSERT(!event.Called);
        exec.ScheduleUserEvent(&event);
        UNIT_ASSERT(!event.Called);
        cont->Yield();
        UNIT_ASSERT(event.Called);
    };

    exec.Execute(f);

    UNIT_ASSERT(event.Called);
}

void TCoroTest::TestOverrideTime() {
    class TTime: public NCoro::ITime {
      public:
        TInstant Now() override {
            return Current;
        }

        TInstant Current = TInstant::Zero();
    };

    TTime time;
    TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing(), &time};

    executor.CreateOwned([&](TCont* cont) {
        UNIT_ASSERT_EQUAL(cont->Executor()->Now(), TInstant::Zero());
        time.Current = TInstant::Seconds(1);
        cont->SleepD(TInstant::Seconds(1));
        UNIT_ASSERT_EQUAL(cont->Executor()->Now(), TInstant::Seconds(1));
    }, "coro");

    executor.Execute();
}
UNIT_TEST_SUITE_REGISTRATION(TCoroTest);