path: root/library/cpp/coroutine/engine/coroutine_ut.cpp
diff options
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/coroutine/engine/coroutine_ut.cpp
intermediate changes
Diffstat (limited to 'library/cpp/coroutine/engine/coroutine_ut.cpp')
1 files changed, 1007 insertions, 0 deletions
diff --git a/library/cpp/coroutine/engine/coroutine_ut.cpp b/library/cpp/coroutine/engine/coroutine_ut.cpp
new file mode 100644
index 0000000000..8b372496a2
--- /dev/null
+++ b/library/cpp/coroutine/engine/coroutine_ut.cpp
@@ -0,0 +1,1007 @@
+#include "impl.h"
+#include "condvar.h"
+#include "network.h"
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/string/cast.h>
+#include <util/system/pipe.h>
+#include <util/system/env.h>
+#include <util/system/info.h>
+#include <util/system/thread.h>
+#include <util/generic/xrange.h>
+#include <util/generic/serialized_enum.h>
+// TODO (velavokr): BALANCER-1345 add more tests on pollers
+class TCoroTest: public TTestBase {
+ UNIT_TEST(TestSimpleX1);
+ UNIT_TEST(TestSimpleX1MultiThread);
+ UNIT_TEST(TestSimpleX2);
+ UNIT_TEST(TestSimpleX3);
+ UNIT_TEST(TestMemFun);
+ UNIT_TEST(TestMutex);
+ UNIT_TEST(TestCondVar);
+ UNIT_TEST(TestJoinDefault);
+ UNIT_TEST(TestJoinEpoll);
+ UNIT_TEST(TestJoinKqueue);
+ UNIT_TEST(TestJoinPoll);
+ UNIT_TEST(TestJoinSelect);
+ UNIT_TEST(TestException);
+ UNIT_TEST(TestJoinCancelExitRaceBug);
+ UNIT_TEST(TestWaitWakeLivelockBug);
+ UNIT_TEST(TestFastPathWakeDefault)
+ // TODO (velavokr): BALANCER-1338 our epoll wrapper cannot handle pipe eofs
+// UNIT_TEST(TestFastPathWakeEpoll)
+ UNIT_TEST(TestFastPathWakeKqueue)
+ UNIT_TEST(TestFastPathWakePoll)
+ UNIT_TEST(TestFastPathWakeSelect)
+ UNIT_TEST(TestLegacyCancelYieldRaceBug)
+ UNIT_TEST(TestJoinRescheduleBug);
+ UNIT_TEST(TestEventQueue)
+ UNIT_TEST(TestNestedExecutor)
+ UNIT_TEST(TestComputeCoroutineYield)
+ UNIT_TEST(TestPollEngines);
+ UNIT_TEST(TestUserEvent);
+ UNIT_TEST(TestPause);
+ UNIT_TEST(TestOverrideTime);
+ void TestException();
+ void TestSimpleX1();
+ void TestSimpleX1MultiThread();
+ void TestSimpleX2();
+ void TestSimpleX3();
+ void TestMemFun();
+ void TestMutex();
+ void TestCondVar();
+ void TestJoinDefault();
+ void TestJoinEpoll();
+ void TestJoinKqueue();
+ void TestJoinPoll();
+ void TestJoinSelect();
+ void TestJoinCancelExitRaceBug();
+ void TestWaitWakeLivelockBug();
+ void TestFastPathWakeDefault();
+ void TestFastPathWakeEpoll();
+ void TestFastPathWakeKqueue();
+ void TestFastPathWakePoll();
+ void TestFastPathWakeSelect();
+ void TestLegacyCancelYieldRaceBug();
+ void TestJoinRescheduleBug();
+ void TestEventQueue();
+ void TestNestedExecutor();
+ void TestComputeCoroutineYield();
+ void TestPollEngines();
+ void TestUserEvent();
+ void TestPause();
+ void TestOverrideTime();
+void TCoroTest::TestException() {
+ TContExecutor e(1000000);
+ bool f2run = false;
+ auto f1 = [&f2run](TCont* c) {
+ struct TCtx {
+ ~TCtx() {
+ Y_VERIFY(!*F2);
+ C->Yield();
+ }
+ TCont* C;
+ bool* F2;
+ };
+ try {
+ TCtx ctx = {c, &f2run};
+ throw 1;
+ } catch (...) {
+ }
+ };
+ bool unc = true;
+ auto f2 = [&unc, &f2run](TCont*) {
+ f2run = true;
+ unc = std::uncaught_exception();
+ // check segfault
+ try {
+ throw 2;
+ } catch (int) {
+ }
+ };
+ e.Create(f1, "f1");
+ e.Create(f2, "f2");
+ e.Execute();
+ UNIT_ASSERT(!unc);
+static int i0;
+static void CoRun(TCont* c, void* /*run*/) {
+ while (i0 < 100000) {
+ ++i0;
+ UNIT_ASSERT(RunningCont() == c);
+ c->Yield();
+ UNIT_ASSERT(RunningCont() == c);
+ }
+static void CoMain(TCont* c, void* /*arg*/) {
+ for (volatile size_t i2 = 0; i2 < 10; ++i2) {
+ UNIT_ASSERT(RunningCont() == c);
+ c->Executor()->Create(CoRun, nullptr, "run");
+ UNIT_ASSERT(RunningCont() == c);
+ }
+void TCoroTest::TestSimpleX1() {
+ i0 = 0;
+ TContExecutor e(32000);
+ UNIT_ASSERT(RunningCont() == nullptr);
+ e.Execute(CoMain);
+ UNIT_ASSERT(RunningCont() == nullptr);
+void TCoroTest::TestSimpleX1MultiThread() {
+ TVector<THolder<TThread>> threads;
+ const size_t nThreads = 0;
+ TAtomic c = 0;
+ for (size_t i = 0; i < nThreads; ++i) {
+ threads.push_back(MakeHolder<TThread>([&]() {
+ TestSimpleX1();
+ AtomicIncrement(c);
+ }));
+ }
+ for (auto& t : threads) {
+ t->Start();
+ }
+ for (auto& t: threads) {
+ t->Join();
+ }
+ UNIT_ASSERT_EQUAL(c, nThreads);
+struct TTestObject {
+ int i = 0;
+ int j = 0;
+ void RunTask1(TCont*) {
+ i = 1;
+ }
+ void RunTask2(TCont*) {
+ j = 2;
+ }
+void TCoroTest::TestMemFun() {
+ i0 = 0;
+ TContExecutor e(32000);
+ TTestObject obj;
+ e.Create<TTestObject, &TTestObject::RunTask1>(&obj, "test1");
+ e.Execute<TTestObject, &TTestObject::RunTask2>(&obj);
+ UNIT_ASSERT_EQUAL(obj.i, 1);
+ UNIT_ASSERT_EQUAL(obj.j, 2);
+void TCoroTest::TestSimpleX2() {
+ {
+ i0 = 0;
+ {
+ TContExecutor e(32000);
+ e.Execute(CoMain);
+ }
+ UNIT_ASSERT_EQUAL(i0, 100000);
+ }
+ {
+ i0 = 0;
+ {
+ TContExecutor e(32000);
+ e.Execute(CoMain);
+ }
+ UNIT_ASSERT_EQUAL(i0, 100000);
+ }
+struct TRunner {
+ inline TRunner()
+ : Runs(0)
+ {
+ }
+ inline void operator()(TCont* c) {
+ ++Runs;
+ c->Yield();
+ }
+ size_t Runs;
+void TCoroTest::TestSimpleX3() {
+ TContExecutor e(32000);
+ TRunner runner;
+ for (volatile size_t i3 = 0; i3 < 1000; ++i3) {
+ e.Create(runner, "runner");
+ }
+ e.Execute();
+ UNIT_ASSERT_EQUAL(runner.Runs, 1000);
+static TString res;
+static TContMutex mutex;
+static void CoMutex(TCont* c, void* /*run*/) {
+ {
+ mutex.LockI(c);
+ c->Yield();
+ res += c->Name();
+ mutex.UnLock();
+ }
+ c->Yield();
+ {
+ mutex.LockI(c);
+ c->Yield();
+ res += c->Name();
+ mutex.UnLock();
+ }
+static void CoMutexTest(TCont* c, void* /*run*/) {
+ c->Executor()->Create(CoMutex, nullptr, "1");
+ c->Executor()->Create(CoMutex, nullptr, "2");
+void TCoroTest::TestMutex() {
+ TContExecutor e(32000);
+ e.Execute(CoMutexTest);
+ UNIT_ASSERT_EQUAL(res, "1212");
+ res.clear();
+static TContMutex m1;
+static TContCondVar c1;
+static void CoCondVar(TCont* c, void* /*run*/) {
+ for (size_t i4 = 0; i4 < 3; ++i4) {
+ UNIT_ASSERT_EQUAL(m1.LockI(c), 0);
+ UNIT_ASSERT_EQUAL(c1.WaitI(c, &m1), 0);
+ res += c->Name();
+ m1.UnLock();
+ }
+static void CoCondVarTest(TCont* c, void* /*run*/) {
+ c->Executor()->Create(CoCondVar, nullptr, "1");
+ c->Yield();
+ c->Executor()->Create(CoCondVar, nullptr, "2");
+ c->Yield();
+ c->Executor()->Create(CoCondVar, nullptr, "3");
+ c->Yield();
+ c->Executor()->Create(CoCondVar, nullptr, "4");
+ c->Yield();
+ c->Executor()->Create(CoCondVar, nullptr, "5");
+ c->Yield();
+ c->Executor()->Create(CoCondVar, nullptr, "6");
+ c->Yield();
+ for (size_t i5 = 0; i5 < 3; ++i5) {
+ res += ToString((size_t)i5) + "^";
+ c1.BroadCast();
+ c->Yield();
+ }
+void TCoroTest::TestCondVar() {
+ TContExecutor e(32000);
+ e.Execute(CoCondVarTest);
+ UNIT_ASSERT_EQUAL(res, "0^1234561^1234562^123456");
+ res.clear();
+namespace NCoroTestJoin {
+ struct TSleepCont {
+ const TInstant Deadline;
+ int Result;
+ inline void operator()(TCont* c) {
+ Result = c->SleepD(Deadline);
+ }
+ };
+ struct TReadCont {
+ const TInstant Deadline;
+ const SOCKET Sock;
+ int Result;
+ inline void operator()(TCont* c) {
+ char buf = 0;
+ Result = NCoro::ReadD(c, Sock, &buf, sizeof(buf), Deadline).Status();
+ }
+ };
+ struct TJoinCont {
+ const TInstant Deadline;
+ TCont* const Cont;
+ bool Result;
+ inline void operator()(TCont* c) {
+ Result = c->Join(Cont, Deadline);
+ }
+ };
+ void DoTestJoin(EContPoller pollerType) {
+ auto poller = IPollerFace::Construct(pollerType);
+ if (!poller) {
+ return;
+ }
+ TContExecutor e(32000, std::move(poller));
+ TPipe in, out;
+ TPipe::Pipe(in, out);
+ SetNonBlock(in.GetHandle());
+ {
+ TSleepCont sc = {TInstant::Max(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
+ e.Execute(jc);
+ UNIT_ASSERT_EQUAL(jc.Result, false);
+ }
+ {
+ TSleepCont sc = {TDuration::MilliSeconds(100).ToDeadLine(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(sc, "sc"), false};
+ e.Execute(jc);
+ UNIT_ASSERT_EQUAL(jc.Result, true);
+ }
+ {
+ TSleepCont sc = {TDuration::MilliSeconds(200).ToDeadLine(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
+ e.Execute(jc);
+ UNIT_ASSERT_EQUAL(jc.Result, false);
+ }
+ {
+ TReadCont rc = {TInstant::Max(), in.GetHandle(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
+ e.Execute(jc);
+ UNIT_ASSERT_EQUAL(jc.Result, false);
+ }
+ {
+ TReadCont rc = {TDuration::MilliSeconds(100).ToDeadLine(), in.GetHandle(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(rc, "rc"), false};
+ e.Execute(jc);
+ UNIT_ASSERT_EQUAL(jc.Result, true);
+ }
+ {
+ TReadCont rc = {TDuration::MilliSeconds(200).ToDeadLine(), in.GetHandle(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
+ e.Execute(jc);
+ UNIT_ASSERT_EQUAL(jc.Result, false);
+ }
+ }
+void TCoroTest::TestJoinDefault() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Default);
+void TCoroTest::TestJoinEpoll() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Epoll);
+void TCoroTest::TestJoinKqueue() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Kqueue);
+void TCoroTest::TestJoinPoll() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Poll);
+void TCoroTest::TestJoinSelect() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Select);
+namespace NCoroJoinCancelExitRaceBug {
+ struct TState {
+ TCont* Sub = nullptr;
+ };
+ static void DoAux(TCont*, void* argPtr) noexcept {
+ TState& state = *(TState*)(argPtr);
+ // 06.{Ready:[Sub2]} > {Ready:[Sub2,Sub]}
+ state.Sub->Cancel();
+ }
+ static void DoSub2(TCont*, void*) noexcept {
+ // 07.{Ready:[Sub]} > Exit > {Ready:[Sub],ToDelete:[Sub2]}
+ // 08.{Ready:[Sub],ToDelete:[Sub2]} > Release(Sub2) > {Ready:[Sub],Deleted:[Sub2]}
+ }
+ static void DoSub(TCont* cont, void* argPtr) noexcept {
+ TState& state = *(TState*)(argPtr);
+ state.Sub = cont;
+ // 04.{Ready:[Aux]} > {Ready:[Aux,Sub2]}
+ auto* sub2 = cont->Executor()->Create(DoSub2, argPtr, "Sub2");
+ // 05.{Ready:[Aux,Sub2]} > SwitchTo(Aux)
+ // 09.{Ready:[],Deleted:[Sub2]} > Cancel(Sub2) > {Ready:[Sub2],Deleted:[Sub2]}
+ // 10.{Ready:[Sub2],Deleted:[Sub2]} > SwitchTo(Sub2) > FAIL: can not return from exit
+ cont->Join(sub2);
+ state.Sub = nullptr;
+ }
+ static void DoMain(TCont* cont) noexcept {
+ TState state;
+ // 01.{Ready:[]} > {Ready:[Sub]}
+ auto* sub = cont->Executor()->Create(DoSub, &state, "Sub");
+ // 02.{Ready:[Sub]} > {Ready:[Sub,Aux]}
+ cont->Executor()->Create(DoAux, &state, "Aux");
+ // 03.{Ready:[Sub,Aux]} > SwitchTo(Sub)
+ cont->Join(sub);
+ }
+void TCoroTest::TestJoinCancelExitRaceBug() {
+ TContExecutor exec(20000);
+ exec.SetFailOnError(true);
+ exec.Execute(NCoroJoinCancelExitRaceBug::DoMain);
+namespace NCoroWaitWakeLivelockBug {
+ struct TState;
+ struct TSubState {
+ TSubState(TState& parent, ui32 self)
+ : Parent(parent)
+ , Name(TStringBuilder() << "Sub" << self)
+ , Self(self)
+ {
+ UNIT_ASSERT(self < 2);
+ }
+ TSubState& OtherState();
+ TState& Parent;
+ TTimerEvent* Event = nullptr;
+ TCont* Cont = nullptr;
+ TString Name;
+ ui32 Self = -1;
+ };
+ struct TState {
+ TState()
+ : Subs{{*this, 0}, {*this, 1}}
+ {}
+ TSubState Subs[2];
+ bool Stop = false;
+ };
+ TSubState& TSubState::OtherState() {
+ return Parent.Subs[1 - Self];
+ }
+ static void DoStop(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)(argPtr);
+ TTimerEvent event(cont, TInstant::Now());
+ ExecuteEvent(&event);
+ state.Stop = true;
+ for (auto& sub: state.Subs) {
+ if (sub.Event) {
+ sub.Event->Wake(EWAKEDUP);
+ }
+ }
+ }
+ static void DoSub(TCont* cont, void* argPtr) {
+ TSubState& state = *(TSubState*)(argPtr);
+ while (!state.Parent.Stop) {
+ TTimerEvent event(cont, TInstant::Max());
+ if (state.OtherState().Event) {
+ state.OtherState().Event->Wake(EWAKEDUP);
+ }
+ state.Event = &event;
+ ExecuteEvent(&event);
+ state.Event = nullptr;
+ }
+ state.Cont = nullptr;
+ }
+ static void DoMain(TCont* cont) noexcept {
+ TState state;
+ for (auto& subState : state.Subs) {
+ subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
+ }
+ cont->Join(
+ cont->Executor()->Create(DoStop, &state, "Stop")
+ );
+ for (auto& subState : state.Subs) {
+ if (subState.Cont) {
+ cont->Join(subState.Cont);
+ }
+ }
+ }
+void TCoroTest::TestWaitWakeLivelockBug() {
+ TContExecutor exec(20000);
+ exec.SetFailOnError(true);
+ exec.Execute(NCoroWaitWakeLivelockBug::DoMain);
+namespace NCoroTestFastPathWake {
+ struct TState;
+ struct TSubState {
+ TSubState(TState& parent, ui32 self)
+ : Parent(parent)
+ , Name(TStringBuilder() << "Sub" << self)
+ {}
+ TState& Parent;
+ TInstant Finish;
+ TTimerEvent* Event = nullptr;
+ TCont* Cont = nullptr;
+ TString Name;
+ };
+ struct TState {
+ TState()
+ : Subs{{*this, 0}, {*this, 1}}
+ {
+ TPipe::Pipe(In, Out);
+ SetNonBlock(In.GetHandle());
+ }
+ TSubState Subs[2];
+ TPipe In, Out;
+ bool IoSleepRunning = false;
+ };
+ static void DoIoSleep(TCont* cont, void* argPtr) noexcept {
+ try {
+ TState& state = *(TState*) (argPtr);
+ state.IoSleepRunning = true;
+ TTempBuf tmp;
+ // Wait for the event from io
+ auto res = NCoro::ReadD(cont, state.In.GetHandle(), tmp.Data(), 1, TDuration::Seconds(10).ToDeadLine());
+ UNIT_ASSERT_VALUES_EQUAL(res.Checked(), 0);
+ state.IoSleepRunning = false;
+ } catch (const NUnitTest::TAssertException& ex) {
+ Cerr << ex.AsStrBuf() << Endl;
+ ex.BackTrace()->PrintTo(Cerr);
+ throw;
+ } catch (...) {
+ Cerr << CurrentExceptionMessage() << Endl;
+ throw;
+ }
+ }
+ static void DoSub(TCont* cont, void* argPtr) noexcept {
+ TSubState& state = *(TSubState*)(argPtr);
+ TTimerEvent event(cont, TInstant::Max());
+ state.Event = &event;
+ ExecuteEvent(&event);
+ state.Event = nullptr;
+ state.Cont = nullptr;
+ state.Finish = TInstant::Now();
+ }
+ static void DoMain(TCont* cont) noexcept {
+ try {
+ TState state;
+ TInstant start = TInstant::Now();
+ // This guy sleeps on io
+ auto sleeper = cont->Executor()->Create(DoIoSleep, &state, "io_sleeper");
+ // These guys are to be woken up right away
+ for (auto& subState : state.Subs) {
+ subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
+ }
+ // Give way
+ cont->Yield();
+ // Check everyone has started, wake those to be woken
+ UNIT_ASSERT(state.IoSleepRunning);
+ for (auto& subState : state.Subs) {
+ UNIT_ASSERT(subState.Event);
+ subState.Event->Wake(EWAKEDUP);
+ }
+ // Give way again
+ cont->Yield();
+ // Check the woken guys have finished and quite soon
+ for (auto& subState : state.Subs) {
+ UNIT_ASSERT(subState.Finish - start < TDuration::MilliSeconds(100));
+ UNIT_ASSERT(!subState.Cont);
+ }
+ // Wake the io guy and finish
+ state.Out.Close();
+ if (state.IoSleepRunning) {
+ cont->Join(sleeper);
+ }
+ // Check everything has ended sooner than the timeout
+ UNIT_ASSERT(TInstant::Now() - start < TDuration::Seconds(1));
+ } catch (const NUnitTest::TAssertException& ex) {
+ Cerr << ex.AsStrBuf() << Endl;
+ ex.BackTrace()->PrintTo(Cerr);
+ throw;
+ } catch (...) {
+ Cerr << CurrentExceptionMessage() << Endl;
+ throw;
+ }
+ }
+ static void DoTestFastPathWake(EContPoller pollerType) {
+ if (auto poller = IPollerFace::Construct(pollerType)) {
+ TContExecutor exec(20000, std::move(poller));
+ exec.SetFailOnError(true);
+ exec.Execute(NCoroTestFastPathWake::DoMain);
+ }
+ }
+void TCoroTest::TestFastPathWakeDefault() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Default);
+void TCoroTest::TestFastPathWakeEpoll() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Epoll);
+void TCoroTest::TestFastPathWakeKqueue() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Kqueue);
+void TCoroTest::TestFastPathWakePoll() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Poll);
+void TCoroTest::TestFastPathWakeSelect() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Select);
+namespace NCoroTestLegacyCancelYieldRaceBug {
+ enum class EState {
+ Idle, Running, Finished,
+ };
+ struct TState {
+ EState SubState = EState::Idle;
+ };
+ static void DoSub(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ state.SubState = EState::Running;
+ cont->Yield();
+ cont->Yield();
+ state.SubState = EState::Finished;
+ }
+ static void DoMain(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ TCont* sub = cont->Executor()->Create(DoSub, argPtr, "Sub");
+ sub->Cancel();
+ cont->Yield();
+ UNIT_ASSERT_EQUAL(state.SubState, EState::Finished);
+ }
+void TCoroTest::TestLegacyCancelYieldRaceBug() {
+ NCoroTestLegacyCancelYieldRaceBug::TState state;
+ TContExecutor exec(20000);
+ exec.SetFailOnError(true);
+ exec.Execute(NCoroTestLegacyCancelYieldRaceBug::DoMain, &state);
+namespace NCoroTestJoinRescheduleBug {
+ enum class EState {
+ Idle, Running, Finished,
+ };
+ struct TState {
+ TCont* volatile SubA = nullptr;
+ volatile EState SubAState = EState::Idle;
+ volatile EState SubBState = EState::Idle;
+ volatile EState SubCState = EState::Idle;
+ };
+ static void DoSubC(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ state.SubCState = EState::Running;
+ while (state.SubBState != EState::Running) {
+ cont->Yield();
+ }
+ while (cont->SleepD(TInstant::Max()) != ECANCELED) {
+ }
+ state.SubCState = EState::Finished;
+ }
+ static void DoSubB(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ state.SubBState = EState::Running;
+ while (state.SubAState != EState::Running && state.SubCState != EState::Running) {
+ cont->Yield();
+ }
+ for (auto i : xrange(100)) {
+ Y_UNUSED(i);
+ if (!state.SubA) {
+ break;
+ }
+ state.SubA->ReSchedule();
+ cont->Yield();
+ }
+ state.SubBState = EState::Finished;
+ }
+ static void DoSubA(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ state.SubAState = EState::Running;
+ TCont* subC = cont->Executor()->Create(DoSubC, argPtr, "SubC");
+ while (state.SubBState != EState::Running && state.SubCState != EState::Running) {
+ cont->Yield();
+ }
+ cont->Join(subC);
+ UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
+ state.SubA = nullptr;
+ state.SubAState = EState::Finished;
+ }
+ static void DoMain(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ TCont* subA = cont->Executor()->Create(DoSubA, argPtr, "SubA");
+ state.SubA = subA;
+ cont->Join(cont->Executor()->Create(DoSubB, argPtr, "SubB"));
+ if (state.SubA) {
+ subA->Cancel();
+ cont->Join(subA);
+ }
+ }
+void TCoroTest::TestJoinRescheduleBug() {
+ using namespace NCoroTestJoinRescheduleBug;
+ TState state;
+ {
+ TContExecutor exec(20000);
+ exec.Execute(DoMain, &state);
+ }
+ UNIT_ASSERT_EQUAL(state.SubAState, EState::Finished);
+ UNIT_ASSERT_EQUAL(state.SubBState, EState::Finished);
+ UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
+void TCoroTest::TestEventQueue() {
+ NCoro::TEventWaitQueue queue;
+ UNIT_ASSERT(queue.Empty());
+ UNIT_ASSERT_VALUES_EQUAL(queue.WakeTimedout(TInstant()), TInstant());
+ TContExecutor exec(32000);
+ exec.Execute([](TCont* cont, void* arg) {
+ NCoro::TEventWaitQueue* q = (NCoro::TEventWaitQueue*)arg;
+ TTimerEvent ev(cont, TInstant::Max());
+ TTimerEvent ev2(cont, TInstant::Seconds(12345));
+ q->Register(&ev);
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
+ q->Register(&ev2);
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Seconds(12345));
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Max());
+ }, &queue);
+void TCoroTest::TestNestedExecutor() {
+#ifndef _win_
+ //nested executors actually don't work correctly, but anyway shouldn't break RunningCont() ptr
+ TContExecutor exec(32000);
+ UNIT_ASSERT(!RunningCont());
+ exec.Execute([](TCont* cont, void*) {
+ UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);
+ TContExecutor exec2(32000);
+ exec2.Execute([](TCont* cont2, void*) {
+ UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
+ TContExecutor exec3(32000);
+ exec3.Execute([](TCont* cont3, void*) {
+ UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont3);
+ });
+ UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
+ });
+ UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);
+ });
+ UNIT_ASSERT(!RunningCont());
+void TCoroTest::TestComputeCoroutineYield() {
+//if we have busy (e.g., on cpu) coroutine, when it yields, io must flow
+ TContExecutor exec(32000);
+ exec.SetFailOnError(true);
+ TPipe in, out;
+ TPipe::Pipe(in, out);
+ SetNonBlock(in.GetHandle());
+ size_t lastRead = 42;
+ auto compute = [&](TCont* cont) {
+ for (size_t i = 0; i < 10; ++i) {
+ write(out.GetHandle(), &i, sizeof i);
+ Sleep(TDuration::MilliSeconds(10));
+ cont->Yield();
+ UNIT_ASSERT(lastRead == i);
+ }
+ };
+ auto io = [&](TCont* cont) {
+ for (size_t i = 0; i < 10; ++i) {
+ NCoro::ReadI(cont, in.GetHandle(), &lastRead, sizeof lastRead);
+ }
+ };
+ exec.Create(compute, "compute");
+ exec.Create(io, "io");
+ exec.Execute();
+void TCoroTest::TestPollEngines() {
+ bool defaultChecked = false;
+ for (auto engine : GetEnumAllValues<EContPoller>()) {
+ auto poller = IPollerFace::Construct(engine);
+ if (!poller) {
+ continue;
+ }
+ TContExecutor exec(32000, IPollerFace::Construct(engine));
+ if (engine == EContPoller::Default) {
+ defaultChecked = true;
+ UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Combined);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), engine);
+ }
+ }
+ UNIT_ASSERT(defaultChecked);
+void TCoroTest::TestPause() {
+ TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing()};
+ int i = 0;
+ executor.CreateOwned([&](TCont*) {
+ i++;
+ executor.Pause();
+ i++;
+ }, "coro");
+ executor.Execute();
+ executor.Execute();
+void TCoroTest::TestUserEvent() {
+ TContExecutor exec(32000);
+ struct TUserEvent : public IUserEvent {
+ bool Called = false;
+ void Execute() override {
+ Called = true;
+ }
+ } event;
+ auto f = [&](TCont* cont) {
+ UNIT_ASSERT(!event.Called);
+ exec.ScheduleUserEvent(&event);
+ UNIT_ASSERT(!event.Called);
+ cont->Yield();
+ UNIT_ASSERT(event.Called);
+ };
+ exec.Execute(f);
+ UNIT_ASSERT(event.Called);
+void TCoroTest::TestOverrideTime() {
+ class TTime: public NCoro::ITime {
+ public:
+ TInstant Now() override {
+ return Current;
+ }
+ TInstant Current = TInstant::Zero();
+ };
+ TTime time;
+ TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing(), &time};
+ executor.CreateOwned([&](TCont* cont) {
+ UNIT_ASSERT_EQUAL(cont->Executor()->Now(), TInstant::Zero());
+ time.Current = TInstant::Seconds(1);
+ cont->SleepD(TInstant::Seconds(1));
+ UNIT_ASSERT_EQUAL(cont->Executor()->Now(), TInstant::Seconds(1));
+ }, "coro");
+ executor.Execute();