aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/coroutine/engine/coroutine_ut.cpp
diff options
context:
space:
mode:
authorRuslan Kovalev <ruslan.a.kovalev@gmail.com>2022-02-10 16:46:45 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:45 +0300
commit9123176b341b6f2658cff5132482b8237c1416c8 (patch)
tree49e222ea1c5804306084bb3ae065bb702625360f /library/cpp/coroutine/engine/coroutine_ut.cpp
parent59e19371de37995fcb36beb16cd6ec030af960bc (diff)
downloadydb-9123176b341b6f2658cff5132482b8237c1416c8.tar.gz
Restoring authorship annotation for Ruslan Kovalev <ruslan.a.kovalev@gmail.com>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/coroutine/engine/coroutine_ut.cpp')
-rw-r--r--library/cpp/coroutine/engine/coroutine_ut.cpp1112
1 files changed, 556 insertions, 556 deletions
diff --git a/library/cpp/coroutine/engine/coroutine_ut.cpp b/library/cpp/coroutine/engine/coroutine_ut.cpp
index 1ba31245b0..8b372496a2 100644
--- a/library/cpp/coroutine/engine/coroutine_ut.cpp
+++ b/library/cpp/coroutine/engine/coroutine_ut.cpp
@@ -1,19 +1,19 @@
#include "impl.h"
-#include "condvar.h"
-#include "network.h"
+#include "condvar.h"
+#include "network.h"
#include <library/cpp/testing/unittest/registar.h>
#include <util/string/cast.h>
#include <util/system/pipe.h>
-#include <util/system/env.h>
-#include <util/system/info.h>
+#include <util/system/env.h>
+#include <util/system/info.h>
#include <util/system/thread.h>
-#include <util/generic/xrange.h>
+#include <util/generic/xrange.h>
#include <util/generic/serialized_enum.h>
-// TODO (velavokr): BALANCER-1345 add more tests on pollers
-
+// TODO (velavokr): BALANCER-1345 add more tests on pollers
+
class TCoroTest: public TTestBase {
UNIT_TEST_SUITE(TCoroTest);
UNIT_TEST(TestSimpleX1);
@@ -23,23 +23,23 @@ class TCoroTest: public TTestBase {
UNIT_TEST(TestMemFun);
UNIT_TEST(TestMutex);
UNIT_TEST(TestCondVar);
- UNIT_TEST(TestJoinDefault);
- UNIT_TEST(TestJoinEpoll);
- UNIT_TEST(TestJoinKqueue);
- UNIT_TEST(TestJoinPoll);
- UNIT_TEST(TestJoinSelect);
+ UNIT_TEST(TestJoinDefault);
+ UNIT_TEST(TestJoinEpoll);
+ UNIT_TEST(TestJoinKqueue);
+ UNIT_TEST(TestJoinPoll);
+ UNIT_TEST(TestJoinSelect);
UNIT_TEST(TestException);
- UNIT_TEST(TestJoinCancelExitRaceBug);
- UNIT_TEST(TestWaitWakeLivelockBug);
- UNIT_TEST(TestFastPathWakeDefault)
- // TODO (velavokr): BALANCER-1338 our epoll wrapper cannot handle pipe eofs
-// UNIT_TEST(TestFastPathWakeEpoll)
- UNIT_TEST(TestFastPathWakeKqueue)
- UNIT_TEST(TestFastPathWakePoll)
+ UNIT_TEST(TestJoinCancelExitRaceBug);
+ UNIT_TEST(TestWaitWakeLivelockBug);
+ UNIT_TEST(TestFastPathWakeDefault)
+ // TODO (velavokr): BALANCER-1338 our epoll wrapper cannot handle pipe eofs
+// UNIT_TEST(TestFastPathWakeEpoll)
+ UNIT_TEST(TestFastPathWakeKqueue)
+ UNIT_TEST(TestFastPathWakePoll)
UNIT_TEST(TestFastPathWakeSelect)
- UNIT_TEST(TestLegacyCancelYieldRaceBug)
- UNIT_TEST(TestJoinRescheduleBug);
- UNIT_TEST(TestEventQueue)
+ UNIT_TEST(TestLegacyCancelYieldRaceBug)
+ UNIT_TEST(TestJoinRescheduleBug);
+ UNIT_TEST(TestEventQueue)
UNIT_TEST(TestNestedExecutor)
UNIT_TEST(TestComputeCoroutineYield)
UNIT_TEST(TestPollEngines);
@@ -57,21 +57,21 @@ public:
void TestMemFun();
void TestMutex();
void TestCondVar();
- void TestJoinDefault();
- void TestJoinEpoll();
- void TestJoinKqueue();
- void TestJoinPoll();
- void TestJoinSelect();
- void TestJoinCancelExitRaceBug();
- void TestWaitWakeLivelockBug();
- void TestFastPathWakeDefault();
- void TestFastPathWakeEpoll();
- void TestFastPathWakeKqueue();
- void TestFastPathWakePoll();
- void TestFastPathWakeSelect();
- void TestLegacyCancelYieldRaceBug();
- void TestJoinRescheduleBug();
- void TestEventQueue();
+ void TestJoinDefault();
+ void TestJoinEpoll();
+ void TestJoinKqueue();
+ void TestJoinPoll();
+ void TestJoinSelect();
+ void TestJoinCancelExitRaceBug();
+ void TestWaitWakeLivelockBug();
+ void TestFastPathWakeDefault();
+ void TestFastPathWakeEpoll();
+ void TestFastPathWakeKqueue();
+ void TestFastPathWakePoll();
+ void TestFastPathWakeSelect();
+ void TestLegacyCancelYieldRaceBug();
+ void TestJoinRescheduleBug();
+ void TestEventQueue();
void TestNestedExecutor();
void TestComputeCoroutineYield();
void TestPollEngines();
@@ -88,7 +88,7 @@ void TCoroTest::TestException() {
auto f1 = [&f2run](TCont* c) {
struct TCtx {
~TCtx() {
- Y_VERIFY(!*F2);
+ Y_VERIFY(!*F2);
C->Yield();
}
@@ -144,18 +144,18 @@ static void CoMain(TCont* c, void* /*arg*/) {
}
}
-void TCoroTest::TestSimpleX1() {
- i0 = 0;
- TContExecutor e(32000);
+void TCoroTest::TestSimpleX1() {
+ i0 = 0;
+ TContExecutor e(32000);
UNIT_ASSERT(RunningCont() == nullptr);
- e.Execute(CoMain);
- UNIT_ASSERT_VALUES_EQUAL(i0, 100000);
+ e.Execute(CoMain);
+ UNIT_ASSERT_VALUES_EQUAL(i0, 100000);
UNIT_ASSERT(RunningCont() == nullptr);
-}
-
+}
+
void TCoroTest::TestSimpleX1MultiThread() {
TVector<THolder<TThread>> threads;
const size_t nThreads = 0;
@@ -179,10 +179,10 @@ void TCoroTest::TestSimpleX1MultiThread() {
}
struct TTestObject {
- int i = 0;
- int j = 0;
+ int i = 0;
+ int j = 0;
-public:
+public:
void RunTask1(TCont*) {
i = 1;
}
@@ -325,7 +325,7 @@ void TCoroTest::TestCondVar() {
res.clear();
}
-namespace NCoroTestJoin {
+namespace NCoroTestJoin {
struct TSleepCont {
const TInstant Deadline;
int Result;
@@ -342,7 +342,7 @@ namespace NCoroTestJoin {
inline void operator()(TCont* c) {
char buf = 0;
- Result = NCoro::ReadD(c, Sock, &buf, sizeof(buf), Deadline).Status();
+ Result = NCoro::ReadD(c, Sock, &buf, sizeof(buf), Deadline).Status();
}
};
@@ -356,513 +356,513 @@ namespace NCoroTestJoin {
}
};
- void DoTestJoin(EContPoller pollerType) {
- auto poller = IPollerFace::Construct(pollerType);
-
- if (!poller) {
- return;
- }
-
- TContExecutor e(32000, std::move(poller));
-
- TPipe in, out;
- TPipe::Pipe(in, out);
- SetNonBlock(in.GetHandle());
-
- {
- TSleepCont sc = {TInstant::Max(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
-
- e.Execute(jc);
-
- UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
- UNIT_ASSERT_EQUAL(jc.Result, false);
- }
-
- {
- TSleepCont sc = {TDuration::MilliSeconds(100).ToDeadLine(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(sc, "sc"), false};
-
- e.Execute(jc);
-
- UNIT_ASSERT_EQUAL(sc.Result, ETIMEDOUT);
- UNIT_ASSERT_EQUAL(jc.Result, true);
- }
-
- {
- TSleepCont sc = {TDuration::MilliSeconds(200).ToDeadLine(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
-
- e.Execute(jc);
-
- UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
- UNIT_ASSERT_EQUAL(jc.Result, false);
- }
-
- {
- TReadCont rc = {TInstant::Max(), in.GetHandle(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
-
- e.Execute(jc);
-
- UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
- UNIT_ASSERT_EQUAL(jc.Result, false);
- }
-
- {
- TReadCont rc = {TDuration::MilliSeconds(100).ToDeadLine(), in.GetHandle(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(rc, "rc"), false};
-
- e.Execute(jc);
-
- UNIT_ASSERT_EQUAL(rc.Result, ETIMEDOUT);
- UNIT_ASSERT_EQUAL(jc.Result, true);
- }
-
- {
- TReadCont rc = {TDuration::MilliSeconds(200).ToDeadLine(), in.GetHandle(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
-
- e.Execute(jc);
-
- UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
- UNIT_ASSERT_EQUAL(jc.Result, false);
- }
- }
-}
-
-void TCoroTest::TestJoinDefault() {
- NCoroTestJoin::DoTestJoin(EContPoller::Default);
-}
-
-void TCoroTest::TestJoinEpoll() {
- NCoroTestJoin::DoTestJoin(EContPoller::Epoll);
-}
-
-void TCoroTest::TestJoinKqueue() {
- NCoroTestJoin::DoTestJoin(EContPoller::Kqueue);
-}
-
-void TCoroTest::TestJoinPoll() {
- NCoroTestJoin::DoTestJoin(EContPoller::Poll);
-}
-
-void TCoroTest::TestJoinSelect() {
- NCoroTestJoin::DoTestJoin(EContPoller::Select);
-}
-
-namespace NCoroJoinCancelExitRaceBug {
- struct TState {
- TCont* Sub = nullptr;
- };
-
- static void DoAux(TCont*, void* argPtr) noexcept {
- TState& state = *(TState*)(argPtr);
-
- // 06.{Ready:[Sub2]} > {Ready:[Sub2,Sub]}
- state.Sub->Cancel();
- }
-
- static void DoSub2(TCont*, void*) noexcept {
- // 07.{Ready:[Sub]} > Exit > {Ready:[Sub],ToDelete:[Sub2]}
- // 08.{Ready:[Sub],ToDelete:[Sub2]} > Release(Sub2) > {Ready:[Sub],Deleted:[Sub2]}
- }
-
- static void DoSub(TCont* cont, void* argPtr) noexcept {
- TState& state = *(TState*)(argPtr);
- state.Sub = cont;
-
- // 04.{Ready:[Aux]} > {Ready:[Aux,Sub2]}
- auto* sub2 = cont->Executor()->Create(DoSub2, argPtr, "Sub2");
-
- // 05.{Ready:[Aux,Sub2]} > SwitchTo(Aux)
- // 09.{Ready:[],Deleted:[Sub2]} > Cancel(Sub2) > {Ready:[Sub2],Deleted:[Sub2]}
- // 10.{Ready:[Sub2],Deleted:[Sub2]} > SwitchTo(Sub2) > FAIL: can not return from exit
- cont->Join(sub2);
-
- state.Sub = nullptr;
- }
-
- static void DoMain(TCont* cont) noexcept {
- TState state;
-
- // 01.{Ready:[]} > {Ready:[Sub]}
- auto* sub = cont->Executor()->Create(DoSub, &state, "Sub");
-
- // 02.{Ready:[Sub]} > {Ready:[Sub,Aux]}
- cont->Executor()->Create(DoAux, &state, "Aux");
-
- // 03.{Ready:[Sub,Aux]} > SwitchTo(Sub)
- cont->Join(sub);
- }
-}
-
-void TCoroTest::TestJoinCancelExitRaceBug() {
- TContExecutor exec(20000);
- exec.SetFailOnError(true);
- exec.Execute(NCoroJoinCancelExitRaceBug::DoMain);
-}
-
-namespace NCoroWaitWakeLivelockBug {
- struct TState;
-
- struct TSubState {
- TSubState(TState& parent, ui32 self)
- : Parent(parent)
+ void DoTestJoin(EContPoller pollerType) {
+ auto poller = IPollerFace::Construct(pollerType);
+
+ if (!poller) {
+ return;
+ }
+
+ TContExecutor e(32000, std::move(poller));
+
+ TPipe in, out;
+ TPipe::Pipe(in, out);
+ SetNonBlock(in.GetHandle());
+
+ {
+ TSleepCont sc = {TInstant::Max(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
+ UNIT_ASSERT_EQUAL(jc.Result, false);
+ }
+
+ {
+ TSleepCont sc = {TDuration::MilliSeconds(100).ToDeadLine(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(sc, "sc"), false};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(sc.Result, ETIMEDOUT);
+ UNIT_ASSERT_EQUAL(jc.Result, true);
+ }
+
+ {
+ TSleepCont sc = {TDuration::MilliSeconds(200).ToDeadLine(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
+ UNIT_ASSERT_EQUAL(jc.Result, false);
+ }
+
+ {
+ TReadCont rc = {TInstant::Max(), in.GetHandle(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
+ UNIT_ASSERT_EQUAL(jc.Result, false);
+ }
+
+ {
+ TReadCont rc = {TDuration::MilliSeconds(100).ToDeadLine(), in.GetHandle(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(rc, "rc"), false};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(rc.Result, ETIMEDOUT);
+ UNIT_ASSERT_EQUAL(jc.Result, true);
+ }
+
+ {
+ TReadCont rc = {TDuration::MilliSeconds(200).ToDeadLine(), in.GetHandle(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
+ UNIT_ASSERT_EQUAL(jc.Result, false);
+ }
+ }
+}
+
+void TCoroTest::TestJoinDefault() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Default);
+}
+
+void TCoroTest::TestJoinEpoll() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Epoll);
+}
+
+void TCoroTest::TestJoinKqueue() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Kqueue);
+}
+
+void TCoroTest::TestJoinPoll() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Poll);
+}
+
+void TCoroTest::TestJoinSelect() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Select);
+}
+
+namespace NCoroJoinCancelExitRaceBug {
+ struct TState {
+ TCont* Sub = nullptr;
+ };
+
+ static void DoAux(TCont*, void* argPtr) noexcept {
+ TState& state = *(TState*)(argPtr);
+
+ // 06.{Ready:[Sub2]} > {Ready:[Sub2,Sub]}
+ state.Sub->Cancel();
+ }
+
+ static void DoSub2(TCont*, void*) noexcept {
+ // 07.{Ready:[Sub]} > Exit > {Ready:[Sub],ToDelete:[Sub2]}
+ // 08.{Ready:[Sub],ToDelete:[Sub2]} > Release(Sub2) > {Ready:[Sub],Deleted:[Sub2]}
+ }
+
+ static void DoSub(TCont* cont, void* argPtr) noexcept {
+ TState& state = *(TState*)(argPtr);
+ state.Sub = cont;
+
+ // 04.{Ready:[Aux]} > {Ready:[Aux,Sub2]}
+ auto* sub2 = cont->Executor()->Create(DoSub2, argPtr, "Sub2");
+
+ // 05.{Ready:[Aux,Sub2]} > SwitchTo(Aux)
+ // 09.{Ready:[],Deleted:[Sub2]} > Cancel(Sub2) > {Ready:[Sub2],Deleted:[Sub2]}
+ // 10.{Ready:[Sub2],Deleted:[Sub2]} > SwitchTo(Sub2) > FAIL: can not return from exit
+ cont->Join(sub2);
+
+ state.Sub = nullptr;
+ }
+
+ static void DoMain(TCont* cont) noexcept {
+ TState state;
+
+ // 01.{Ready:[]} > {Ready:[Sub]}
+ auto* sub = cont->Executor()->Create(DoSub, &state, "Sub");
+
+ // 02.{Ready:[Sub]} > {Ready:[Sub,Aux]}
+ cont->Executor()->Create(DoAux, &state, "Aux");
+
+ // 03.{Ready:[Sub,Aux]} > SwitchTo(Sub)
+ cont->Join(sub);
+ }
+}
+
+void TCoroTest::TestJoinCancelExitRaceBug() {
+ TContExecutor exec(20000);
+ exec.SetFailOnError(true);
+ exec.Execute(NCoroJoinCancelExitRaceBug::DoMain);
+}
+
+namespace NCoroWaitWakeLivelockBug {
+ struct TState;
+
+ struct TSubState {
+ TSubState(TState& parent, ui32 self)
+ : Parent(parent)
, Name(TStringBuilder() << "Sub" << self)
- , Self(self)
- {
- UNIT_ASSERT(self < 2);
- }
-
- TSubState& OtherState();
-
- TState& Parent;
- TTimerEvent* Event = nullptr;
- TCont* Cont = nullptr;
- TString Name;
- ui32 Self = -1;
- };
-
- struct TState {
- TState()
- : Subs{{*this, 0}, {*this, 1}}
- {}
-
- TSubState Subs[2];
- bool Stop = false;
- };
-
- TSubState& TSubState::OtherState() {
- return Parent.Subs[1 - Self];
- }
-
- static void DoStop(TCont* cont, void* argPtr) {
- TState& state = *(TState*)(argPtr);
-
- TTimerEvent event(cont, TInstant::Now());
- ExecuteEvent(&event);
- state.Stop = true;
- for (auto& sub: state.Subs) {
- if (sub.Event) {
- sub.Event->Wake(EWAKEDUP);
- }
- }
- }
-
- static void DoSub(TCont* cont, void* argPtr) {
- TSubState& state = *(TSubState*)(argPtr);
-
- while (!state.Parent.Stop) {
- TTimerEvent event(cont, TInstant::Max());
- if (state.OtherState().Event) {
- state.OtherState().Event->Wake(EWAKEDUP);
- }
- state.Event = &event;
- ExecuteEvent(&event);
- state.Event = nullptr;
- }
-
- state.Cont = nullptr;
- }
-
- static void DoMain(TCont* cont) noexcept {
- TState state;
-
- for (auto& subState : state.Subs) {
- subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
- }
-
- cont->Join(
- cont->Executor()->Create(DoStop, &state, "Stop")
- );
-
- for (auto& subState : state.Subs) {
- if (subState.Cont) {
- cont->Join(subState.Cont);
- }
- }
- }
-}
-
-void TCoroTest::TestWaitWakeLivelockBug() {
- TContExecutor exec(20000);
- exec.SetFailOnError(true);
- exec.Execute(NCoroWaitWakeLivelockBug::DoMain);
-}
-
-namespace NCoroTestFastPathWake {
- struct TState;
-
- struct TSubState {
- TSubState(TState& parent, ui32 self)
- : Parent(parent)
- , Name(TStringBuilder() << "Sub" << self)
- {}
-
- TState& Parent;
- TInstant Finish;
- TTimerEvent* Event = nullptr;
- TCont* Cont = nullptr;
- TString Name;
- };
-
- struct TState {
- TState()
- : Subs{{*this, 0}, {*this, 1}}
- {
- TPipe::Pipe(In, Out);
- SetNonBlock(In.GetHandle());
- }
-
- TSubState Subs[2];
- TPipe In, Out;
- bool IoSleepRunning = false;
- };
-
- static void DoIoSleep(TCont* cont, void* argPtr) noexcept {
- try {
- TState& state = *(TState*) (argPtr);
- state.IoSleepRunning = true;
-
- TTempBuf tmp;
- // Wait for the event from io
- auto res = NCoro::ReadD(cont, state.In.GetHandle(), tmp.Data(), 1, TDuration::Seconds(10).ToDeadLine());
- UNIT_ASSERT_VALUES_EQUAL(res.Checked(), 0);
- state.IoSleepRunning = false;
- } catch (const NUnitTest::TAssertException& ex) {
- Cerr << ex.AsStrBuf() << Endl;
- ex.BackTrace()->PrintTo(Cerr);
- throw;
- } catch (...) {
- Cerr << CurrentExceptionMessage() << Endl;
- throw;
- }
- }
-
- static void DoSub(TCont* cont, void* argPtr) noexcept {
- TSubState& state = *(TSubState*)(argPtr);
-
- TTimerEvent event(cont, TInstant::Max());
- state.Event = &event;
- ExecuteEvent(&event);
- state.Event = nullptr;
- state.Cont = nullptr;
- state.Finish = TInstant::Now();
- }
-
- static void DoMain(TCont* cont) noexcept {
- try {
- TState state;
- TInstant start = TInstant::Now();
-
- // This guy sleeps on io
- auto sleeper = cont->Executor()->Create(DoIoSleep, &state, "io_sleeper");
-
- // These guys are to be woken up right away
- for (auto& subState : state.Subs) {
- subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
- }
-
- // Give way
- cont->Yield();
-
- // Check everyone has started, wake those to be woken
- UNIT_ASSERT(state.IoSleepRunning);
-
- for (auto& subState : state.Subs) {
- UNIT_ASSERT(subState.Event);
- subState.Event->Wake(EWAKEDUP);
- }
-
- // Give way again
- cont->Yield();
-
- // Check the woken guys have finished and quite soon
- for (auto& subState : state.Subs) {
- UNIT_ASSERT(subState.Finish - start < TDuration::MilliSeconds(100));
- UNIT_ASSERT(!subState.Cont);
- }
-
- // Wake the io guy and finish
- state.Out.Close();
-
- if (state.IoSleepRunning) {
- cont->Join(sleeper);
- }
-
- // Check everything has ended sooner than the timeout
- UNIT_ASSERT(TInstant::Now() - start < TDuration::Seconds(1));
- } catch (const NUnitTest::TAssertException& ex) {
- Cerr << ex.AsStrBuf() << Endl;
- ex.BackTrace()->PrintTo(Cerr);
- throw;
- } catch (...) {
- Cerr << CurrentExceptionMessage() << Endl;
- throw;
- }
- }
-
- static void DoTestFastPathWake(EContPoller pollerType) {
- if (auto poller = IPollerFace::Construct(pollerType)) {
- TContExecutor exec(20000, std::move(poller));
- exec.SetFailOnError(true);
- exec.Execute(NCoroTestFastPathWake::DoMain);
- }
- }
-}
-
-void TCoroTest::TestFastPathWakeDefault() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Default);
-}
-
-void TCoroTest::TestFastPathWakeEpoll() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Epoll);
-}
-
-void TCoroTest::TestFastPathWakeKqueue() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Kqueue);
-}
-
-void TCoroTest::TestFastPathWakePoll() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Poll);
-}
-
-void TCoroTest::TestFastPathWakeSelect() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Select);
-}
-
-namespace NCoroTestLegacyCancelYieldRaceBug {
- enum class EState {
- Idle, Running, Finished,
- };
-
- struct TState {
- EState SubState = EState::Idle;
- };
-
- static void DoSub(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- state.SubState = EState::Running;
- cont->Yield();
- cont->Yield();
- state.SubState = EState::Finished;
- }
-
- static void DoMain(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- TCont* sub = cont->Executor()->Create(DoSub, argPtr, "Sub");
- sub->Cancel();
- cont->Yield();
- UNIT_ASSERT_EQUAL(state.SubState, EState::Finished);
- }
-}
-
-void TCoroTest::TestLegacyCancelYieldRaceBug() {
- NCoroTestLegacyCancelYieldRaceBug::TState state;
- TContExecutor exec(20000);
- exec.SetFailOnError(true);
- exec.Execute(NCoroTestLegacyCancelYieldRaceBug::DoMain, &state);
-}
-
-namespace NCoroTestJoinRescheduleBug {
- enum class EState {
- Idle, Running, Finished,
- };
-
- struct TState {
- TCont* volatile SubA = nullptr;
- volatile EState SubAState = EState::Idle;
- volatile EState SubBState = EState::Idle;
- volatile EState SubCState = EState::Idle;
- };
-
- static void DoSubC(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- state.SubCState = EState::Running;
- while (state.SubBState != EState::Running) {
- cont->Yield();
- }
- while (cont->SleepD(TInstant::Max()) != ECANCELED) {
- }
- state.SubCState = EState::Finished;
- }
-
- static void DoSubB(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- state.SubBState = EState::Running;
- while (state.SubAState != EState::Running && state.SubCState != EState::Running) {
- cont->Yield();
- }
- for (auto i : xrange(100)) {
- Y_UNUSED(i);
- if (!state.SubA) {
- break;
- }
- state.SubA->ReSchedule();
- cont->Yield();
- }
- state.SubBState = EState::Finished;
- }
-
- static void DoSubA(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- state.SubAState = EState::Running;
- TCont* subC = cont->Executor()->Create(DoSubC, argPtr, "SubC");
- while (state.SubBState != EState::Running && state.SubCState != EState::Running) {
- cont->Yield();
- }
- cont->Join(subC);
- UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
- state.SubA = nullptr;
- state.SubAState = EState::Finished;
- }
-
- static void DoMain(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- TCont* subA = cont->Executor()->Create(DoSubA, argPtr, "SubA");
- state.SubA = subA;
- cont->Join(cont->Executor()->Create(DoSubB, argPtr, "SubB"));
-
- if (state.SubA) {
- subA->Cancel();
- cont->Join(subA);
- }
- }
-}
-
-void TCoroTest::TestJoinRescheduleBug() {
- using namespace NCoroTestJoinRescheduleBug;
- TState state;
- {
- TContExecutor exec(20000);
- exec.Execute(DoMain, &state);
- }
- UNIT_ASSERT_EQUAL(state.SubAState, EState::Finished);
- UNIT_ASSERT_EQUAL(state.SubBState, EState::Finished);
- UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
-}
-
-void TCoroTest::TestEventQueue() {
- NCoro::TEventWaitQueue queue;
- UNIT_ASSERT(queue.Empty());
- UNIT_ASSERT_VALUES_EQUAL(queue.WakeTimedout(TInstant()), TInstant());
- TContExecutor exec(32000);
- exec.Execute([](TCont* cont, void* arg) {
- NCoro::TEventWaitQueue* q = (NCoro::TEventWaitQueue*)arg;
- TTimerEvent ev(cont, TInstant::Max());
- TTimerEvent ev2(cont, TInstant::Seconds(12345));
- q->Register(&ev);
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
- q->Register(&ev2);
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Seconds(12345));
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Max());
- }, &queue);
-}
-
+ , Self(self)
+ {
+ UNIT_ASSERT(self < 2);
+ }
+
+ TSubState& OtherState();
+
+ TState& Parent;
+ TTimerEvent* Event = nullptr;
+ TCont* Cont = nullptr;
+ TString Name;
+ ui32 Self = -1;
+ };
+
+ struct TState {
+ TState()
+ : Subs{{*this, 0}, {*this, 1}}
+ {}
+
+ TSubState Subs[2];
+ bool Stop = false;
+ };
+
+ TSubState& TSubState::OtherState() {
+ return Parent.Subs[1 - Self];
+ }
+
+ static void DoStop(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)(argPtr);
+
+ TTimerEvent event(cont, TInstant::Now());
+ ExecuteEvent(&event);
+ state.Stop = true;
+ for (auto& sub: state.Subs) {
+ if (sub.Event) {
+ sub.Event->Wake(EWAKEDUP);
+ }
+ }
+ }
+
+ static void DoSub(TCont* cont, void* argPtr) {
+ TSubState& state = *(TSubState*)(argPtr);
+
+ while (!state.Parent.Stop) {
+ TTimerEvent event(cont, TInstant::Max());
+ if (state.OtherState().Event) {
+ state.OtherState().Event->Wake(EWAKEDUP);
+ }
+ state.Event = &event;
+ ExecuteEvent(&event);
+ state.Event = nullptr;
+ }
+
+ state.Cont = nullptr;
+ }
+
+ static void DoMain(TCont* cont) noexcept {
+ TState state;
+
+ for (auto& subState : state.Subs) {
+ subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
+ }
+
+ cont->Join(
+ cont->Executor()->Create(DoStop, &state, "Stop")
+ );
+
+ for (auto& subState : state.Subs) {
+ if (subState.Cont) {
+ cont->Join(subState.Cont);
+ }
+ }
+ }
+}
+
+void TCoroTest::TestWaitWakeLivelockBug() {
+ TContExecutor exec(20000);
+ exec.SetFailOnError(true);
+ exec.Execute(NCoroWaitWakeLivelockBug::DoMain);
+}
+
+namespace NCoroTestFastPathWake {
+ struct TState;
+
+ struct TSubState {
+ TSubState(TState& parent, ui32 self)
+ : Parent(parent)
+ , Name(TStringBuilder() << "Sub" << self)
+ {}
+
+ TState& Parent;
+ TInstant Finish;
+ TTimerEvent* Event = nullptr;
+ TCont* Cont = nullptr;
+ TString Name;
+ };
+
+ struct TState {
+ TState()
+ : Subs{{*this, 0}, {*this, 1}}
+ {
+ TPipe::Pipe(In, Out);
+ SetNonBlock(In.GetHandle());
+ }
+
+ TSubState Subs[2];
+ TPipe In, Out;
+ bool IoSleepRunning = false;
+ };
+
+ static void DoIoSleep(TCont* cont, void* argPtr) noexcept {
+ try {
+ TState& state = *(TState*) (argPtr);
+ state.IoSleepRunning = true;
+
+ TTempBuf tmp;
+ // Wait for the event from io
+ auto res = NCoro::ReadD(cont, state.In.GetHandle(), tmp.Data(), 1, TDuration::Seconds(10).ToDeadLine());
+ UNIT_ASSERT_VALUES_EQUAL(res.Checked(), 0);
+ state.IoSleepRunning = false;
+ } catch (const NUnitTest::TAssertException& ex) {
+ Cerr << ex.AsStrBuf() << Endl;
+ ex.BackTrace()->PrintTo(Cerr);
+ throw;
+ } catch (...) {
+ Cerr << CurrentExceptionMessage() << Endl;
+ throw;
+ }
+ }
+
+ static void DoSub(TCont* cont, void* argPtr) noexcept {
+ TSubState& state = *(TSubState*)(argPtr);
+
+ TTimerEvent event(cont, TInstant::Max());
+ state.Event = &event;
+ ExecuteEvent(&event);
+ state.Event = nullptr;
+ state.Cont = nullptr;
+ state.Finish = TInstant::Now();
+ }
+
+ static void DoMain(TCont* cont) noexcept {
+ try {
+ TState state;
+ TInstant start = TInstant::Now();
+
+ // This guy sleeps on io
+ auto sleeper = cont->Executor()->Create(DoIoSleep, &state, "io_sleeper");
+
+ // These guys are to be woken up right away
+ for (auto& subState : state.Subs) {
+ subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
+ }
+
+ // Give way
+ cont->Yield();
+
+ // Check everyone has started, wake those to be woken
+ UNIT_ASSERT(state.IoSleepRunning);
+
+ for (auto& subState : state.Subs) {
+ UNIT_ASSERT(subState.Event);
+ subState.Event->Wake(EWAKEDUP);
+ }
+
+ // Give way again
+ cont->Yield();
+
+ // Check the woken guys have finished and quite soon
+ for (auto& subState : state.Subs) {
+ UNIT_ASSERT(subState.Finish - start < TDuration::MilliSeconds(100));
+ UNIT_ASSERT(!subState.Cont);
+ }
+
+ // Wake the io guy and finish
+ state.Out.Close();
+
+ if (state.IoSleepRunning) {
+ cont->Join(sleeper);
+ }
+
+ // Check everything has ended sooner than the timeout
+ UNIT_ASSERT(TInstant::Now() - start < TDuration::Seconds(1));
+ } catch (const NUnitTest::TAssertException& ex) {
+ Cerr << ex.AsStrBuf() << Endl;
+ ex.BackTrace()->PrintTo(Cerr);
+ throw;
+ } catch (...) {
+ Cerr << CurrentExceptionMessage() << Endl;
+ throw;
+ }
+ }
+
+ static void DoTestFastPathWake(EContPoller pollerType) {
+ if (auto poller = IPollerFace::Construct(pollerType)) {
+ TContExecutor exec(20000, std::move(poller));
+ exec.SetFailOnError(true);
+ exec.Execute(NCoroTestFastPathWake::DoMain);
+ }
+ }
+}
+
+void TCoroTest::TestFastPathWakeDefault() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Default);
+}
+
+void TCoroTest::TestFastPathWakeEpoll() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Epoll);
+}
+
+void TCoroTest::TestFastPathWakeKqueue() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Kqueue);
+}
+
+void TCoroTest::TestFastPathWakePoll() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Poll);
+}
+
+void TCoroTest::TestFastPathWakeSelect() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Select);
+}
+
+namespace NCoroTestLegacyCancelYieldRaceBug {
+ enum class EState {
+ Idle, Running, Finished,
+ };
+
+ struct TState {
+ EState SubState = EState::Idle;
+ };
+
+ static void DoSub(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ state.SubState = EState::Running;
+ cont->Yield();
+ cont->Yield();
+ state.SubState = EState::Finished;
+ }
+
+ static void DoMain(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ TCont* sub = cont->Executor()->Create(DoSub, argPtr, "Sub");
+ sub->Cancel();
+ cont->Yield();
+ UNIT_ASSERT_EQUAL(state.SubState, EState::Finished);
+ }
+}
+
+void TCoroTest::TestLegacyCancelYieldRaceBug() {
+ NCoroTestLegacyCancelYieldRaceBug::TState state;
+ TContExecutor exec(20000);
+ exec.SetFailOnError(true);
+ exec.Execute(NCoroTestLegacyCancelYieldRaceBug::DoMain, &state);
+}
+
+namespace NCoroTestJoinRescheduleBug {
+ enum class EState {
+ Idle, Running, Finished,
+ };
+
+ struct TState {
+ TCont* volatile SubA = nullptr;
+ volatile EState SubAState = EState::Idle;
+ volatile EState SubBState = EState::Idle;
+ volatile EState SubCState = EState::Idle;
+ };
+
+ static void DoSubC(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ state.SubCState = EState::Running;
+ while (state.SubBState != EState::Running) {
+ cont->Yield();
+ }
+ while (cont->SleepD(TInstant::Max()) != ECANCELED) {
+ }
+ state.SubCState = EState::Finished;
+ }
+
+ static void DoSubB(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ state.SubBState = EState::Running;
+ while (state.SubAState != EState::Running && state.SubCState != EState::Running) {
+ cont->Yield();
+ }
+ for (auto i : xrange(100)) {
+ Y_UNUSED(i);
+ if (!state.SubA) {
+ break;
+ }
+ state.SubA->ReSchedule();
+ cont->Yield();
+ }
+ state.SubBState = EState::Finished;
+ }
+
+ static void DoSubA(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ state.SubAState = EState::Running;
+ TCont* subC = cont->Executor()->Create(DoSubC, argPtr, "SubC");
+ while (state.SubBState != EState::Running && state.SubCState != EState::Running) {
+ cont->Yield();
+ }
+ cont->Join(subC);
+ UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
+ state.SubA = nullptr;
+ state.SubAState = EState::Finished;
+ }
+
+ static void DoMain(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ TCont* subA = cont->Executor()->Create(DoSubA, argPtr, "SubA");
+ state.SubA = subA;
+ cont->Join(cont->Executor()->Create(DoSubB, argPtr, "SubB"));
+
+ if (state.SubA) {
+ subA->Cancel();
+ cont->Join(subA);
+ }
+ }
+}
+
+void TCoroTest::TestJoinRescheduleBug() {
+ using namespace NCoroTestJoinRescheduleBug;
+ TState state;
+ {
+ TContExecutor exec(20000);
+ exec.Execute(DoMain, &state);
+ }
+ UNIT_ASSERT_EQUAL(state.SubAState, EState::Finished);
+ UNIT_ASSERT_EQUAL(state.SubBState, EState::Finished);
+ UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
+}
+
+void TCoroTest::TestEventQueue() {
+ NCoro::TEventWaitQueue queue;
+ UNIT_ASSERT(queue.Empty());
+ UNIT_ASSERT_VALUES_EQUAL(queue.WakeTimedout(TInstant()), TInstant());
+ TContExecutor exec(32000);
+ exec.Execute([](TCont* cont, void* arg) {
+ NCoro::TEventWaitQueue* q = (NCoro::TEventWaitQueue*)arg;
+ TTimerEvent ev(cont, TInstant::Max());
+ TTimerEvent ev2(cont, TInstant::Seconds(12345));
+ q->Register(&ev);
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
+ q->Register(&ev2);
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Seconds(12345));
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Max());
+ }, &queue);
+}
+
void TCoroTest::TestNestedExecutor() {
#ifndef _win_
//nested executors actually don't work correctly, but anyway shouldn't break RunningCont() ptr