path: root/library/cpp/coroutine
author     Devtools Arcadia <arcadia-devtools@yandex-team.ru>  2022-02-07 18:08:42 +0300
committer  Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>  2022-02-07 18:08:42 +0300
commit     1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
tree       e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/coroutine
download   ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/coroutine')
-rw-r--r--  library/cpp/coroutine/engine/callbacks.h  18
-rw-r--r--  library/cpp/coroutine/engine/condvar.h  38
-rw-r--r--  library/cpp/coroutine/engine/cont_poller.cpp  70
-rw-r--r--  library/cpp/coroutine/engine/cont_poller.h  245
-rw-r--r--  library/cpp/coroutine/engine/coroutine_ut.cpp  1007
-rw-r--r--  library/cpp/coroutine/engine/custom_time.h  10
-rw-r--r--  library/cpp/coroutine/engine/events.h  148
-rw-r--r--  library/cpp/coroutine/engine/helper.cpp  37
-rw-r--r--  library/cpp/coroutine/engine/helper.h  15
-rw-r--r--  library/cpp/coroutine/engine/impl.cpp  374
-rw-r--r--  library/cpp/coroutine/engine/impl.h  313
-rw-r--r--  library/cpp/coroutine/engine/iostatus.cpp  1
-rw-r--r--  library/cpp/coroutine/engine/iostatus.h  91
-rw-r--r--  library/cpp/coroutine/engine/mutex.h  49
-rw-r--r--  library/cpp/coroutine/engine/network.cpp  325
-rw-r--r--  library/cpp/coroutine/engine/network.h  55
-rw-r--r--  library/cpp/coroutine/engine/poller.cpp  390
-rw-r--r--  library/cpp/coroutine/engine/poller.h  50
-rw-r--r--  library/cpp/coroutine/engine/sockmap.h  24
-rw-r--r--  library/cpp/coroutine/engine/sockpool.cpp  58
-rw-r--r--  library/cpp/coroutine/engine/sockpool.h  253
-rw-r--r--  library/cpp/coroutine/engine/stack/benchmark/alloc_bm.cpp  316
-rw-r--r--  library/cpp/coroutine/engine/stack/benchmark/ya.make  13
-rw-r--r--  library/cpp/coroutine/engine/stack/stack.cpp  67
-rw-r--r--  library/cpp/coroutine/engine/stack/stack.h  77
-rw-r--r--  library/cpp/coroutine/engine/stack/stack_allocator.cpp  26
-rw-r--r--  library/cpp/coroutine/engine/stack/stack_allocator.h  52
-rw-r--r--  library/cpp/coroutine/engine/stack/stack_allocator.inl  138
-rw-r--r--  library/cpp/coroutine/engine/stack/stack_common.h  35
-rw-r--r--  library/cpp/coroutine/engine/stack/stack_guards.cpp  17
-rw-r--r--  library/cpp/coroutine/engine/stack/stack_guards.h  123
-rw-r--r--  library/cpp/coroutine/engine/stack/stack_pool.h  54
-rw-r--r--  library/cpp/coroutine/engine/stack/stack_pool.inl  132
-rw-r--r--  library/cpp/coroutine/engine/stack/stack_storage.cpp  46
-rw-r--r--  library/cpp/coroutine/engine/stack/stack_storage.h  60
-rw-r--r--  library/cpp/coroutine/engine/stack/stack_utils.cpp  84
-rw-r--r--  library/cpp/coroutine/engine/stack/stack_utils.h  27
-rw-r--r--  library/cpp/coroutine/engine/stack/ut/stack_allocator_ut.cpp  115
-rw-r--r--  library/cpp/coroutine/engine/stack/ut/stack_guards_ut.cpp  158
-rw-r--r--  library/cpp/coroutine/engine/stack/ut/stack_pool_ut.cpp  70
-rw-r--r--  library/cpp/coroutine/engine/stack/ut/stack_ut.cpp  60
-rw-r--r--  library/cpp/coroutine/engine/stack/ut/stack_utils_ut.cpp  73
-rw-r--r--  library/cpp/coroutine/engine/stack/ut/ya.make  17
-rw-r--r--  library/cpp/coroutine/engine/trampoline.cpp  50
-rw-r--r--  library/cpp/coroutine/engine/trampoline.h  60
-rw-r--r--  library/cpp/coroutine/engine/ya.make  36
-rw-r--r--  library/cpp/coroutine/listener/listen.cpp  354
-rw-r--r--  library/cpp/coroutine/listener/listen.h  125
-rw-r--r--  library/cpp/coroutine/listener/ya.make  13
-rw-r--r--  library/cpp/coroutine/ya.make  11
50 files changed, 5980 insertions, 0 deletions
diff --git a/library/cpp/coroutine/engine/callbacks.h b/library/cpp/coroutine/engine/callbacks.h
new file mode 100644
index 0000000000..e81b17344f
--- /dev/null
+++ b/library/cpp/coroutine/engine/callbacks.h
@@ -0,0 +1,18 @@
+#pragma once
+
+class TCont;
+class TContExecutor;
+
+namespace NCoro {
+ class IScheduleCallback {
+ public:
+ virtual void OnSchedule(TContExecutor&, TCont&) = 0;
+ virtual void OnUnschedule(TContExecutor&) = 0;
+ };
+
+ class IEnterPollerCallback {
+ public:
+ virtual void OnEnterPoller() = 0;
+ virtual void OnExitPoller() = 0;
+ };
+}
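A minimal sketch of how the scheduling hooks above can be used (illustrative, not part of this commit). The class name and counters are assumptions; the executor constructor that accepts an NCoro::IScheduleCallback* is declared in impl.h later in this diff.

    #include <library/cpp/coroutine/engine/callbacks.h>
    #include <library/cpp/coroutine/engine/impl.h>

    // Counts how often coroutines are switched in and out by the scheduler.
    class TCountingCallback final: public NCoro::IScheduleCallback {
    public:
        void OnSchedule(TContExecutor&, TCont&) override {
            ++Scheduled;
        }

        void OnUnschedule(TContExecutor&) override {
            ++Unscheduled;
        }

        size_t Scheduled = 0;
        size_t Unscheduled = 0;
    };

    void RunWithCallback() {
        TCountingCallback callback;
        // The third constructor argument is the NCoro::IScheduleCallback* from impl.h.
        TContExecutor exec(32000, IPollerFace::Default(), &callback);
        exec.CreateOwned([](TCont* c) { c->Yield(); }, "worker");
        exec.Execute();
    }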
diff --git a/library/cpp/coroutine/engine/condvar.h b/library/cpp/coroutine/engine/condvar.h
new file mode 100644
index 0000000000..ffceede6fa
--- /dev/null
+++ b/library/cpp/coroutine/engine/condvar.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include "events.h"
+#include "mutex.h"
+
+class TContCondVar {
+public:
+ int WaitD(TCont* current, TContMutex* mutex, TInstant deadline) {
+ mutex->UnLock();
+
+ const int ret = WaitQueue_.WaitD(current, deadline);
+
+ if (ret != EWAKEDUP) {
+ return ret;
+ }
+
+ return mutex->LockD(current, deadline);
+ }
+
+ int WaitT(TCont* current, TContMutex* mutex, TDuration timeout) {
+ return WaitD(current, mutex, timeout.ToDeadLine());
+ }
+
+ int WaitI(TCont* current, TContMutex* mutex) {
+ return WaitD(current, mutex, TInstant::Max());
+ }
+
+ void Signal() noexcept {
+ WaitQueue_.Signal();
+ }
+
+ void BroadCast() noexcept {
+ WaitQueue_.BroadCast();
+ }
+
+private:
+ TContWaitQueue WaitQueue_;
+};
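A minimal usage sketch for TContCondVar together with TContMutex (illustrative, not part of this commit), assuming the TContExecutor API declared in impl.h later in this diff: one coroutine waits under the mutex until another sets a flag and signals.

    #include <library/cpp/coroutine/engine/condvar.h>
    #include <library/cpp/coroutine/engine/impl.h>

    void CondVarExample() {
        TContExecutor exec(32000);
        TContMutex mutex;
        TContCondVar condVar;
        bool ready = false;

        exec.CreateOwned([&](TCont* c) {
            mutex.LockI(c);
            while (!ready) {
                condVar.WaitI(c, &mutex);  // releases the mutex while waiting
            }
            mutex.UnLock();
        }, "consumer");

        exec.CreateOwned([&](TCont* c) {
            mutex.LockI(c);
            ready = true;
            mutex.UnLock();
            condVar.Signal();  // wakes one waiter, if any
        }, "producer");

        exec.Execute();
    }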
diff --git a/library/cpp/coroutine/engine/cont_poller.cpp b/library/cpp/coroutine/engine/cont_poller.cpp
new file mode 100644
index 0000000000..76593d4e9b
--- /dev/null
+++ b/library/cpp/coroutine/engine/cont_poller.cpp
@@ -0,0 +1,70 @@
+#include "cont_poller.h"
+#include "impl.h"
+
+namespace NCoro {
+ namespace {
+ template <class T>
+ int DoExecuteEvent(T* event) noexcept {
+ auto* cont = event->Cont();
+
+ if (cont->Cancelled()) {
+ return ECANCELED;
+ }
+
+ cont->Executor()->ScheduleIoWait(event);
+ cont->Switch();
+
+ if (cont->Cancelled()) {
+ return ECANCELED;
+ }
+
+ return event->Status();
+ }
+ }
+
+ void TContPollEvent::Wake() noexcept {
+ UnLink();
+ Cont()->ReSchedule();
+ }
+
+
+ TInstant TEventWaitQueue::WakeTimedout(TInstant now) noexcept {
+ TIoWait::TIterator it = IoWait_.Begin();
+
+ if (it != IoWait_.End()) {
+ if (it->DeadLine() > now) {
+ return it->DeadLine();
+ }
+
+ do {
+ (it++)->Wake(ETIMEDOUT);
+ } while (it != IoWait_.End() && it->DeadLine() <= now);
+ }
+
+ return now;
+ }
+
+ void TEventWaitQueue::Register(NCoro::TContPollEvent* event) {
+ IoWait_.Insert(event);
+ event->Cont()->Unlink();
+ }
+
+ void TEventWaitQueue::Abort() noexcept {
+ auto visitor = [](TContPollEvent& e) {
+ e.Cont()->Cancel();
+ };
+ IoWait_.ForEach(visitor);
+ }
+}
+
+void TFdEvent::RemoveFromIOWait() noexcept {
+ this->Cont()->Executor()->Poller()->Remove(this);
+}
+
+int ExecuteEvent(TFdEvent* event) noexcept {
+ return NCoro::DoExecuteEvent(event);
+}
+
+int ExecuteEvent(TTimerEvent* event) noexcept {
+ return NCoro::DoExecuteEvent(event);
+}
diff --git a/library/cpp/coroutine/engine/cont_poller.h b/library/cpp/coroutine/engine/cont_poller.h
new file mode 100644
index 0000000000..b638b2df1a
--- /dev/null
+++ b/library/cpp/coroutine/engine/cont_poller.h
@@ -0,0 +1,245 @@
+#pragma once
+
+#include "poller.h"
+#include "sockmap.h"
+
+#include <library/cpp/containers/intrusive_rb_tree/rb_tree.h>
+
+#include <util/datetime/base.h>
+#include <util/memory/pool.h>
+#include <util/memory/smallobj.h>
+#include <util/network/init.h>
+
+#include <cerrno>
+
+
+class TCont;
+class TContExecutor;
+class TFdEvent;
+
+namespace NCoro {
+
+ class IPollEvent;
+
+
+ struct TContPollEventCompare {
+ template <class T>
+ static inline bool Compare(const T& l, const T& r) noexcept {
+ return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r);
+ }
+ };
+
+
+ class TContPollEvent : public TRbTreeItem<TContPollEvent, TContPollEventCompare> {
+ public:
+ TContPollEvent(TCont* cont, TInstant deadLine) noexcept
+ : Cont_(cont)
+ , DeadLine_(deadLine)
+ {}
+
+ static bool Compare(const TContPollEvent& l, const TContPollEvent& r) noexcept {
+ return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r);
+ }
+
+ int Status() const noexcept {
+ return Status_;
+ }
+
+ void SetStatus(int status) noexcept {
+ Status_ = status;
+ }
+
+ TCont* Cont() noexcept {
+ return Cont_;
+ }
+
+ TInstant DeadLine() const noexcept {
+ return DeadLine_;
+ }
+
+ void Wake(int status) noexcept {
+ SetStatus(status);
+ Wake();
+ }
+
+ private:
+ void Wake() noexcept;
+
+ private:
+ TCont* Cont_;
+ TInstant DeadLine_;
+ int Status_ = EINPROGRESS;
+ };
+
+
+ class IPollEvent: public TIntrusiveListItem<IPollEvent> {
+ public:
+ IPollEvent(SOCKET fd, ui16 what) noexcept
+ : Fd_(fd)
+ , What_(what)
+ {}
+
+ virtual ~IPollEvent() {}
+
+ SOCKET Fd() const noexcept {
+ return Fd_;
+ }
+
+ int What() const noexcept {
+ return What_;
+ }
+
+ virtual void OnPollEvent(int status) noexcept = 0;
+
+ private:
+ SOCKET Fd_;
+ ui16 What_;
+ };
+
+
+ template <class T>
+ class TBigArray {
+ struct TValue: public T, public TObjectFromPool<TValue> {
+ TValue() {}
+ };
+
+ public:
+ TBigArray()
+ : Pool_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance())
+ {}
+
+ T* Get(size_t index) {
+ TRef& ret = Lst_.Get(index);
+ if (!ret) {
+ ret = TRef(new (&Pool_) TValue());
+ }
+ return ret.Get();
+ }
+
+ private:
+ using TRef = THolder<TValue>;
+ typename TValue::TPool Pool_;
+ TSocketMap<TRef> Lst_;
+ };
+
+
+ using TPollEventList = TIntrusiveList<IPollEvent>;
+
+ class TContPoller {
+ public:
+ using TEvent = IPollerFace::TEvent;
+ using TEvents = IPollerFace::TEvents;
+
+ TContPoller()
+ : P_(IPollerFace::Default())
+ {
+ }
+
+ explicit TContPoller(THolder<IPollerFace> poller)
+ : P_(std::move(poller))
+ {}
+
+ void Schedule(IPollEvent* event) {
+ auto* lst = Lists_.Get(event->Fd());
+ const ui16 oldFlags = Flags(*lst);
+ lst->PushFront(event);
+ ui16 newFlags = Flags(*lst);
+
+ if (newFlags != oldFlags) {
+ if (oldFlags) {
+ newFlags |= CONT_POLL_MODIFY;
+ }
+
+ P_->Set(lst, event->Fd(), newFlags);
+ }
+ }
+
+ void Remove(IPollEvent* event) noexcept {
+ auto* lst = Lists_.Get(event->Fd());
+ const ui16 oldFlags = Flags(*lst);
+ event->Unlink();
+ ui16 newFlags = Flags(*lst);
+
+ if (newFlags != oldFlags) {
+ if (newFlags) {
+ newFlags |= CONT_POLL_MODIFY;
+ }
+
+ P_->Set(lst, event->Fd(), newFlags);
+ }
+ }
+
+ void Wait(TEvents& events, TInstant deadLine) {
+ events.clear();
+ P_->Wait(events, deadLine);
+ }
+
+ EContPoller PollEngine() const {
+ return P_->PollEngine();
+ }
+ private:
+ static ui16 Flags(TIntrusiveList<IPollEvent>& lst) noexcept {
+ ui16 ret = 0;
+ for (auto&& item : lst) {
+ ret |= item.What();
+ }
+ return ret;
+ }
+
+ private:
+ TBigArray<TPollEventList> Lists_;
+ THolder<IPollerFace> P_;
+ };
+
+
+ class TEventWaitQueue {
+ using TIoWait = TRbTree<NCoro::TContPollEvent, NCoro::TContPollEventCompare>;
+
+ public:
+ void Register(NCoro::TContPollEvent* event);
+
+ bool Empty() const noexcept {
+ return IoWait_.Empty();
+ }
+
+ void Abort() noexcept;
+
+ TInstant WakeTimedout(TInstant now) noexcept;
+
+ private:
+ TIoWait IoWait_;
+ };
+}
+
+class TFdEvent final:
+ public NCoro::TContPollEvent,
+ public NCoro::IPollEvent
+{
+public:
+ TFdEvent(TCont* cont, SOCKET fd, ui16 what, TInstant deadLine) noexcept
+ : TContPollEvent(cont, deadLine)
+ , IPollEvent(fd, what)
+ {}
+
+ ~TFdEvent() {
+ RemoveFromIOWait();
+ }
+
+ void RemoveFromIOWait() noexcept;
+
+ void OnPollEvent(int status) noexcept override {
+ Wake(status);
+ }
+};
+
+
+class TTimerEvent: public NCoro::TContPollEvent {
+public:
+ TTimerEvent(TCont* cont, TInstant deadLine) noexcept
+ : TContPollEvent(cont, deadLine)
+ {}
+};
+
+int ExecuteEvent(TFdEvent* event) noexcept;
+
+int ExecuteEvent(TTimerEvent* event) noexcept;
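A sketch of the typical TFdEvent/ExecuteEvent pattern (illustrative, not part of this commit); the higher-level helpers in network.cpp below build on it. The CONT_POLL_READ flag is assumed to come from poller.h, which this diff also adds, and the function must run inside a coroutine.

    #include <library/cpp/coroutine/engine/cont_poller.h>
    #include <library/cpp/coroutine/engine/impl.h>

    // Waits until `fd` becomes readable or the deadline expires.
    int WaitReadable(TCont* cont, SOCKET fd, TInstant deadline) {
        TFdEvent event(cont, fd, CONT_POLL_READ, deadline);

        // ExecuteEvent registers the event with the executor, switches away until
        // the poller reports the fd (or the deadline/cancellation fires), and
        // returns 0 on readiness, ETIMEDOUT or ECANCELED otherwise.
        return ExecuteEvent(&event);
    }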
diff --git a/library/cpp/coroutine/engine/coroutine_ut.cpp b/library/cpp/coroutine/engine/coroutine_ut.cpp
new file mode 100644
index 0000000000..8b372496a2
--- /dev/null
+++ b/library/cpp/coroutine/engine/coroutine_ut.cpp
@@ -0,0 +1,1007 @@
+#include "impl.h"
+#include "condvar.h"
+#include "network.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/string/cast.h>
+#include <util/system/pipe.h>
+#include <util/system/env.h>
+#include <util/system/info.h>
+#include <util/system/thread.h>
+#include <util/generic/xrange.h>
+#include <util/generic/serialized_enum.h>
+
+// TODO (velavokr): BALANCER-1345 add more tests on pollers
+
+class TCoroTest: public TTestBase {
+ UNIT_TEST_SUITE(TCoroTest);
+ UNIT_TEST(TestSimpleX1);
+ UNIT_TEST(TestSimpleX1MultiThread);
+ UNIT_TEST(TestSimpleX2);
+ UNIT_TEST(TestSimpleX3);
+ UNIT_TEST(TestMemFun);
+ UNIT_TEST(TestMutex);
+ UNIT_TEST(TestCondVar);
+ UNIT_TEST(TestJoinDefault);
+ UNIT_TEST(TestJoinEpoll);
+ UNIT_TEST(TestJoinKqueue);
+ UNIT_TEST(TestJoinPoll);
+ UNIT_TEST(TestJoinSelect);
+ UNIT_TEST(TestException);
+ UNIT_TEST(TestJoinCancelExitRaceBug);
+ UNIT_TEST(TestWaitWakeLivelockBug);
+ UNIT_TEST(TestFastPathWakeDefault)
+ // TODO (velavokr): BALANCER-1338 our epoll wrapper cannot handle pipe eofs
+// UNIT_TEST(TestFastPathWakeEpoll)
+ UNIT_TEST(TestFastPathWakeKqueue)
+ UNIT_TEST(TestFastPathWakePoll)
+ UNIT_TEST(TestFastPathWakeSelect)
+ UNIT_TEST(TestLegacyCancelYieldRaceBug)
+ UNIT_TEST(TestJoinRescheduleBug);
+ UNIT_TEST(TestEventQueue)
+ UNIT_TEST(TestNestedExecutor)
+ UNIT_TEST(TestComputeCoroutineYield)
+ UNIT_TEST(TestPollEngines);
+ UNIT_TEST(TestUserEvent);
+ UNIT_TEST(TestPause);
+ UNIT_TEST(TestOverrideTime);
+ UNIT_TEST_SUITE_END();
+
+public:
+ void TestException();
+ void TestSimpleX1();
+ void TestSimpleX1MultiThread();
+ void TestSimpleX2();
+ void TestSimpleX3();
+ void TestMemFun();
+ void TestMutex();
+ void TestCondVar();
+ void TestJoinDefault();
+ void TestJoinEpoll();
+ void TestJoinKqueue();
+ void TestJoinPoll();
+ void TestJoinSelect();
+ void TestJoinCancelExitRaceBug();
+ void TestWaitWakeLivelockBug();
+ void TestFastPathWakeDefault();
+ void TestFastPathWakeEpoll();
+ void TestFastPathWakeKqueue();
+ void TestFastPathWakePoll();
+ void TestFastPathWakeSelect();
+ void TestLegacyCancelYieldRaceBug();
+ void TestJoinRescheduleBug();
+ void TestEventQueue();
+ void TestNestedExecutor();
+ void TestComputeCoroutineYield();
+ void TestPollEngines();
+ void TestUserEvent();
+ void TestPause();
+ void TestOverrideTime();
+};
+
+void TCoroTest::TestException() {
+ TContExecutor e(1000000);
+
+ bool f2run = false;
+
+ auto f1 = [&f2run](TCont* c) {
+ struct TCtx {
+ ~TCtx() {
+ Y_VERIFY(!*F2);
+
+ C->Yield();
+ }
+
+ TCont* C;
+ bool* F2;
+ };
+
+ try {
+ TCtx ctx = {c, &f2run};
+
+ throw 1;
+ } catch (...) {
+ }
+ };
+
+ bool unc = true;
+
+ auto f2 = [&unc, &f2run](TCont*) {
+ f2run = true;
+ unc = std::uncaught_exception();
+
+ // check segfault
+ try {
+ throw 2;
+ } catch (int) {
+ }
+ };
+
+ e.Create(f1, "f1");
+ e.Create(f2, "f2");
+ e.Execute();
+
+ UNIT_ASSERT(!unc);
+}
+
+static int i0;
+
+static void CoRun(TCont* c, void* /*run*/) {
+ while (i0 < 100000) {
+ ++i0;
+ UNIT_ASSERT(RunningCont() == c);
+ c->Yield();
+ UNIT_ASSERT(RunningCont() == c);
+ }
+}
+
+static void CoMain(TCont* c, void* /*arg*/) {
+ for (volatile size_t i2 = 0; i2 < 10; ++i2) {
+ UNIT_ASSERT(RunningCont() == c);
+ c->Executor()->Create(CoRun, nullptr, "run");
+ UNIT_ASSERT(RunningCont() == c);
+ }
+}
+
+void TCoroTest::TestSimpleX1() {
+ i0 = 0;
+ TContExecutor e(32000);
+
+ UNIT_ASSERT(RunningCont() == nullptr);
+
+ e.Execute(CoMain);
+ UNIT_ASSERT_VALUES_EQUAL(i0, 100000);
+
+ UNIT_ASSERT(RunningCont() == nullptr);
+}
+
+void TCoroTest::TestSimpleX1MultiThread() {
+ TVector<THolder<TThread>> threads;
+ const size_t nThreads = 0;
+ TAtomic c = 0;
+ for (size_t i = 0; i < nThreads; ++i) {
+ threads.push_back(MakeHolder<TThread>([&]() {
+ TestSimpleX1();
+ AtomicIncrement(c);
+ }));
+ }
+
+ for (auto& t : threads) {
+ t->Start();
+ }
+
+ for (auto& t: threads) {
+ t->Join();
+ }
+
+ UNIT_ASSERT_EQUAL(c, nThreads);
+}
+
+struct TTestObject {
+ int i = 0;
+ int j = 0;
+
+public:
+ void RunTask1(TCont*) {
+ i = 1;
+ }
+ void RunTask2(TCont*) {
+ j = 2;
+ }
+};
+
+void TCoroTest::TestMemFun() {
+ i0 = 0;
+ TContExecutor e(32000);
+ TTestObject obj;
+ e.Create<TTestObject, &TTestObject::RunTask1>(&obj, "test1");
+ e.Execute<TTestObject, &TTestObject::RunTask2>(&obj);
+ UNIT_ASSERT_EQUAL(obj.i, 1);
+ UNIT_ASSERT_EQUAL(obj.j, 2);
+}
+
+void TCoroTest::TestSimpleX2() {
+ {
+ i0 = 0;
+
+ {
+ TContExecutor e(32000);
+ e.Execute(CoMain);
+ }
+
+ UNIT_ASSERT_EQUAL(i0, 100000);
+ }
+
+ {
+ i0 = 0;
+
+ {
+ TContExecutor e(32000);
+ e.Execute(CoMain);
+ }
+
+ UNIT_ASSERT_EQUAL(i0, 100000);
+ }
+}
+
+struct TRunner {
+ inline TRunner()
+ : Runs(0)
+ {
+ }
+
+ inline void operator()(TCont* c) {
+ ++Runs;
+ c->Yield();
+ }
+
+ size_t Runs;
+};
+
+void TCoroTest::TestSimpleX3() {
+ TContExecutor e(32000);
+ TRunner runner;
+
+ for (volatile size_t i3 = 0; i3 < 1000; ++i3) {
+ e.Create(runner, "runner");
+ }
+
+ e.Execute();
+
+ UNIT_ASSERT_EQUAL(runner.Runs, 1000);
+}
+
+static TString res;
+static TContMutex mutex;
+
+static void CoMutex(TCont* c, void* /*run*/) {
+ {
+ mutex.LockI(c);
+ c->Yield();
+ res += c->Name();
+ mutex.UnLock();
+ }
+
+ c->Yield();
+
+ {
+ mutex.LockI(c);
+ c->Yield();
+ res += c->Name();
+ mutex.UnLock();
+ }
+}
+
+static void CoMutexTest(TCont* c, void* /*run*/) {
+ c->Executor()->Create(CoMutex, nullptr, "1");
+ c->Executor()->Create(CoMutex, nullptr, "2");
+}
+
+void TCoroTest::TestMutex() {
+ TContExecutor e(32000);
+ e.Execute(CoMutexTest);
+ UNIT_ASSERT_EQUAL(res, "1212");
+ res.clear();
+}
+
+static TContMutex m1;
+static TContCondVar c1;
+
+static void CoCondVar(TCont* c, void* /*run*/) {
+ for (size_t i4 = 0; i4 < 3; ++i4) {
+ UNIT_ASSERT_EQUAL(m1.LockI(c), 0);
+ UNIT_ASSERT_EQUAL(c1.WaitI(c, &m1), 0);
+ res += c->Name();
+ m1.UnLock();
+ }
+}
+
+static void CoCondVarTest(TCont* c, void* /*run*/) {
+ c->Executor()->Create(CoCondVar, nullptr, "1");
+ c->Yield();
+ c->Executor()->Create(CoCondVar, nullptr, "2");
+ c->Yield();
+ c->Executor()->Create(CoCondVar, nullptr, "3");
+ c->Yield();
+ c->Executor()->Create(CoCondVar, nullptr, "4");
+ c->Yield();
+ c->Executor()->Create(CoCondVar, nullptr, "5");
+ c->Yield();
+ c->Executor()->Create(CoCondVar, nullptr, "6");
+ c->Yield();
+
+ for (size_t i5 = 0; i5 < 3; ++i5) {
+ res += ToString((size_t)i5) + "^";
+ c1.BroadCast();
+ c->Yield();
+ }
+}
+
+void TCoroTest::TestCondVar() {
+ TContExecutor e(32000);
+ e.Execute(CoCondVarTest);
+ UNIT_ASSERT_EQUAL(res, "0^1234561^1234562^123456");
+ res.clear();
+}
+
+namespace NCoroTestJoin {
+ struct TSleepCont {
+ const TInstant Deadline;
+ int Result;
+
+ inline void operator()(TCont* c) {
+ Result = c->SleepD(Deadline);
+ }
+ };
+
+ struct TReadCont {
+ const TInstant Deadline;
+ const SOCKET Sock;
+ int Result;
+
+ inline void operator()(TCont* c) {
+ char buf = 0;
+ Result = NCoro::ReadD(c, Sock, &buf, sizeof(buf), Deadline).Status();
+ }
+ };
+
+ struct TJoinCont {
+ const TInstant Deadline;
+ TCont* const Cont;
+ bool Result;
+
+ inline void operator()(TCont* c) {
+ Result = c->Join(Cont, Deadline);
+ }
+ };
+
+ void DoTestJoin(EContPoller pollerType) {
+ auto poller = IPollerFace::Construct(pollerType);
+
+ if (!poller) {
+ return;
+ }
+
+ TContExecutor e(32000, std::move(poller));
+
+ TPipe in, out;
+ TPipe::Pipe(in, out);
+ SetNonBlock(in.GetHandle());
+
+ {
+ TSleepCont sc = {TInstant::Max(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
+ UNIT_ASSERT_EQUAL(jc.Result, false);
+ }
+
+ {
+ TSleepCont sc = {TDuration::MilliSeconds(100).ToDeadLine(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(sc, "sc"), false};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(sc.Result, ETIMEDOUT);
+ UNIT_ASSERT_EQUAL(jc.Result, true);
+ }
+
+ {
+ TSleepCont sc = {TDuration::MilliSeconds(200).ToDeadLine(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
+ UNIT_ASSERT_EQUAL(jc.Result, false);
+ }
+
+ {
+ TReadCont rc = {TInstant::Max(), in.GetHandle(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
+ UNIT_ASSERT_EQUAL(jc.Result, false);
+ }
+
+ {
+ TReadCont rc = {TDuration::MilliSeconds(100).ToDeadLine(), in.GetHandle(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(rc, "rc"), false};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(rc.Result, ETIMEDOUT);
+ UNIT_ASSERT_EQUAL(jc.Result, true);
+ }
+
+ {
+ TReadCont rc = {TDuration::MilliSeconds(200).ToDeadLine(), in.GetHandle(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
+ UNIT_ASSERT_EQUAL(jc.Result, false);
+ }
+ }
+}
+
+void TCoroTest::TestJoinDefault() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Default);
+}
+
+void TCoroTest::TestJoinEpoll() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Epoll);
+}
+
+void TCoroTest::TestJoinKqueue() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Kqueue);
+}
+
+void TCoroTest::TestJoinPoll() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Poll);
+}
+
+void TCoroTest::TestJoinSelect() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Select);
+}
+
+namespace NCoroJoinCancelExitRaceBug {
+ struct TState {
+ TCont* Sub = nullptr;
+ };
+
+ static void DoAux(TCont*, void* argPtr) noexcept {
+ TState& state = *(TState*)(argPtr);
+
+ // 06.{Ready:[Sub2]} > {Ready:[Sub2,Sub]}
+ state.Sub->Cancel();
+ }
+
+ static void DoSub2(TCont*, void*) noexcept {
+ // 07.{Ready:[Sub]} > Exit > {Ready:[Sub],ToDelete:[Sub2]}
+ // 08.{Ready:[Sub],ToDelete:[Sub2]} > Release(Sub2) > {Ready:[Sub],Deleted:[Sub2]}
+ }
+
+ static void DoSub(TCont* cont, void* argPtr) noexcept {
+ TState& state = *(TState*)(argPtr);
+ state.Sub = cont;
+
+ // 04.{Ready:[Aux]} > {Ready:[Aux,Sub2]}
+ auto* sub2 = cont->Executor()->Create(DoSub2, argPtr, "Sub2");
+
+ // 05.{Ready:[Aux,Sub2]} > SwitchTo(Aux)
+ // 09.{Ready:[],Deleted:[Sub2]} > Cancel(Sub2) > {Ready:[Sub2],Deleted:[Sub2]}
+ // 10.{Ready:[Sub2],Deleted:[Sub2]} > SwitchTo(Sub2) > FAIL: can not return from exit
+ cont->Join(sub2);
+
+ state.Sub = nullptr;
+ }
+
+ static void DoMain(TCont* cont) noexcept {
+ TState state;
+
+ // 01.{Ready:[]} > {Ready:[Sub]}
+ auto* sub = cont->Executor()->Create(DoSub, &state, "Sub");
+
+ // 02.{Ready:[Sub]} > {Ready:[Sub,Aux]}
+ cont->Executor()->Create(DoAux, &state, "Aux");
+
+ // 03.{Ready:[Sub,Aux]} > SwitchTo(Sub)
+ cont->Join(sub);
+ }
+}
+
+void TCoroTest::TestJoinCancelExitRaceBug() {
+ TContExecutor exec(20000);
+ exec.SetFailOnError(true);
+ exec.Execute(NCoroJoinCancelExitRaceBug::DoMain);
+}
+
+namespace NCoroWaitWakeLivelockBug {
+ struct TState;
+
+ struct TSubState {
+ TSubState(TState& parent, ui32 self)
+ : Parent(parent)
+ , Name(TStringBuilder() << "Sub" << self)
+ , Self(self)
+ {
+ UNIT_ASSERT(self < 2);
+ }
+
+ TSubState& OtherState();
+
+ TState& Parent;
+ TTimerEvent* Event = nullptr;
+ TCont* Cont = nullptr;
+ TString Name;
+ ui32 Self = -1;
+ };
+
+ struct TState {
+ TState()
+ : Subs{{*this, 0}, {*this, 1}}
+ {}
+
+ TSubState Subs[2];
+ bool Stop = false;
+ };
+
+ TSubState& TSubState::OtherState() {
+ return Parent.Subs[1 - Self];
+ }
+
+ static void DoStop(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)(argPtr);
+
+ TTimerEvent event(cont, TInstant::Now());
+ ExecuteEvent(&event);
+ state.Stop = true;
+ for (auto& sub: state.Subs) {
+ if (sub.Event) {
+ sub.Event->Wake(EWAKEDUP);
+ }
+ }
+ }
+
+ static void DoSub(TCont* cont, void* argPtr) {
+ TSubState& state = *(TSubState*)(argPtr);
+
+ while (!state.Parent.Stop) {
+ TTimerEvent event(cont, TInstant::Max());
+ if (state.OtherState().Event) {
+ state.OtherState().Event->Wake(EWAKEDUP);
+ }
+ state.Event = &event;
+ ExecuteEvent(&event);
+ state.Event = nullptr;
+ }
+
+ state.Cont = nullptr;
+ }
+
+ static void DoMain(TCont* cont) noexcept {
+ TState state;
+
+ for (auto& subState : state.Subs) {
+ subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
+ }
+
+ cont->Join(
+ cont->Executor()->Create(DoStop, &state, "Stop")
+ );
+
+ for (auto& subState : state.Subs) {
+ if (subState.Cont) {
+ cont->Join(subState.Cont);
+ }
+ }
+ }
+}
+
+void TCoroTest::TestWaitWakeLivelockBug() {
+ TContExecutor exec(20000);
+ exec.SetFailOnError(true);
+ exec.Execute(NCoroWaitWakeLivelockBug::DoMain);
+}
+
+namespace NCoroTestFastPathWake {
+ struct TState;
+
+ struct TSubState {
+ TSubState(TState& parent, ui32 self)
+ : Parent(parent)
+ , Name(TStringBuilder() << "Sub" << self)
+ {}
+
+ TState& Parent;
+ TInstant Finish;
+ TTimerEvent* Event = nullptr;
+ TCont* Cont = nullptr;
+ TString Name;
+ };
+
+ struct TState {
+ TState()
+ : Subs{{*this, 0}, {*this, 1}}
+ {
+ TPipe::Pipe(In, Out);
+ SetNonBlock(In.GetHandle());
+ }
+
+ TSubState Subs[2];
+ TPipe In, Out;
+ bool IoSleepRunning = false;
+ };
+
+ static void DoIoSleep(TCont* cont, void* argPtr) noexcept {
+ try {
+ TState& state = *(TState*) (argPtr);
+ state.IoSleepRunning = true;
+
+ TTempBuf tmp;
+ // Wait for the event from io
+ auto res = NCoro::ReadD(cont, state.In.GetHandle(), tmp.Data(), 1, TDuration::Seconds(10).ToDeadLine());
+ UNIT_ASSERT_VALUES_EQUAL(res.Checked(), 0);
+ state.IoSleepRunning = false;
+ } catch (const NUnitTest::TAssertException& ex) {
+ Cerr << ex.AsStrBuf() << Endl;
+ ex.BackTrace()->PrintTo(Cerr);
+ throw;
+ } catch (...) {
+ Cerr << CurrentExceptionMessage() << Endl;
+ throw;
+ }
+ }
+
+ static void DoSub(TCont* cont, void* argPtr) noexcept {
+ TSubState& state = *(TSubState*)(argPtr);
+
+ TTimerEvent event(cont, TInstant::Max());
+ state.Event = &event;
+ ExecuteEvent(&event);
+ state.Event = nullptr;
+ state.Cont = nullptr;
+ state.Finish = TInstant::Now();
+ }
+
+ static void DoMain(TCont* cont) noexcept {
+ try {
+ TState state;
+ TInstant start = TInstant::Now();
+
+ // This guy sleeps on io
+ auto sleeper = cont->Executor()->Create(DoIoSleep, &state, "io_sleeper");
+
+ // These guys are to be woken up right away
+ for (auto& subState : state.Subs) {
+ subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
+ }
+
+ // Give way
+ cont->Yield();
+
+ // Check everyone has started, wake those to be woken
+ UNIT_ASSERT(state.IoSleepRunning);
+
+ for (auto& subState : state.Subs) {
+ UNIT_ASSERT(subState.Event);
+ subState.Event->Wake(EWAKEDUP);
+ }
+
+ // Give way again
+ cont->Yield();
+
+ // Check the woken guys have finished and quite soon
+ for (auto& subState : state.Subs) {
+ UNIT_ASSERT(subState.Finish - start < TDuration::MilliSeconds(100));
+ UNIT_ASSERT(!subState.Cont);
+ }
+
+ // Wake the io guy and finish
+ state.Out.Close();
+
+ if (state.IoSleepRunning) {
+ cont->Join(sleeper);
+ }
+
+ // Check everything has ended sooner than the timeout
+ UNIT_ASSERT(TInstant::Now() - start < TDuration::Seconds(1));
+ } catch (const NUnitTest::TAssertException& ex) {
+ Cerr << ex.AsStrBuf() << Endl;
+ ex.BackTrace()->PrintTo(Cerr);
+ throw;
+ } catch (...) {
+ Cerr << CurrentExceptionMessage() << Endl;
+ throw;
+ }
+ }
+
+ static void DoTestFastPathWake(EContPoller pollerType) {
+ if (auto poller = IPollerFace::Construct(pollerType)) {
+ TContExecutor exec(20000, std::move(poller));
+ exec.SetFailOnError(true);
+ exec.Execute(NCoroTestFastPathWake::DoMain);
+ }
+ }
+}
+
+void TCoroTest::TestFastPathWakeDefault() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Default);
+}
+
+void TCoroTest::TestFastPathWakeEpoll() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Epoll);
+}
+
+void TCoroTest::TestFastPathWakeKqueue() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Kqueue);
+}
+
+void TCoroTest::TestFastPathWakePoll() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Poll);
+}
+
+void TCoroTest::TestFastPathWakeSelect() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Select);
+}
+
+namespace NCoroTestLegacyCancelYieldRaceBug {
+ enum class EState {
+ Idle, Running, Finished,
+ };
+
+ struct TState {
+ EState SubState = EState::Idle;
+ };
+
+ static void DoSub(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ state.SubState = EState::Running;
+ cont->Yield();
+ cont->Yield();
+ state.SubState = EState::Finished;
+ }
+
+ static void DoMain(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ TCont* sub = cont->Executor()->Create(DoSub, argPtr, "Sub");
+ sub->Cancel();
+ cont->Yield();
+ UNIT_ASSERT_EQUAL(state.SubState, EState::Finished);
+ }
+}
+
+void TCoroTest::TestLegacyCancelYieldRaceBug() {
+ NCoroTestLegacyCancelYieldRaceBug::TState state;
+ TContExecutor exec(20000);
+ exec.SetFailOnError(true);
+ exec.Execute(NCoroTestLegacyCancelYieldRaceBug::DoMain, &state);
+}
+
+namespace NCoroTestJoinRescheduleBug {
+ enum class EState {
+ Idle, Running, Finished,
+ };
+
+ struct TState {
+ TCont* volatile SubA = nullptr;
+ volatile EState SubAState = EState::Idle;
+ volatile EState SubBState = EState::Idle;
+ volatile EState SubCState = EState::Idle;
+ };
+
+ static void DoSubC(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ state.SubCState = EState::Running;
+ while (state.SubBState != EState::Running) {
+ cont->Yield();
+ }
+ while (cont->SleepD(TInstant::Max()) != ECANCELED) {
+ }
+ state.SubCState = EState::Finished;
+ }
+
+ static void DoSubB(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ state.SubBState = EState::Running;
+ while (state.SubAState != EState::Running && state.SubCState != EState::Running) {
+ cont->Yield();
+ }
+ for (auto i : xrange(100)) {
+ Y_UNUSED(i);
+ if (!state.SubA) {
+ break;
+ }
+ state.SubA->ReSchedule();
+ cont->Yield();
+ }
+ state.SubBState = EState::Finished;
+ }
+
+ static void DoSubA(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ state.SubAState = EState::Running;
+ TCont* subC = cont->Executor()->Create(DoSubC, argPtr, "SubC");
+ while (state.SubBState != EState::Running && state.SubCState != EState::Running) {
+ cont->Yield();
+ }
+ cont->Join(subC);
+ UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
+ state.SubA = nullptr;
+ state.SubAState = EState::Finished;
+ }
+
+ static void DoMain(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ TCont* subA = cont->Executor()->Create(DoSubA, argPtr, "SubA");
+ state.SubA = subA;
+ cont->Join(cont->Executor()->Create(DoSubB, argPtr, "SubB"));
+
+ if (state.SubA) {
+ subA->Cancel();
+ cont->Join(subA);
+ }
+ }
+}
+
+void TCoroTest::TestJoinRescheduleBug() {
+ using namespace NCoroTestJoinRescheduleBug;
+ TState state;
+ {
+ TContExecutor exec(20000);
+ exec.Execute(DoMain, &state);
+ }
+ UNIT_ASSERT_EQUAL(state.SubAState, EState::Finished);
+ UNIT_ASSERT_EQUAL(state.SubBState, EState::Finished);
+ UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
+}
+
+void TCoroTest::TestEventQueue() {
+ NCoro::TEventWaitQueue queue;
+ UNIT_ASSERT(queue.Empty());
+ UNIT_ASSERT_VALUES_EQUAL(queue.WakeTimedout(TInstant()), TInstant());
+ TContExecutor exec(32000);
+ exec.Execute([](TCont* cont, void* arg) {
+ NCoro::TEventWaitQueue* q = (NCoro::TEventWaitQueue*)arg;
+ TTimerEvent ev(cont, TInstant::Max());
+ TTimerEvent ev2(cont, TInstant::Seconds(12345));
+ q->Register(&ev);
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
+ q->Register(&ev2);
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Seconds(12345));
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Max());
+ }, &queue);
+}
+
+void TCoroTest::TestNestedExecutor() {
+#ifndef _win_
+ // Nested executors don't actually work correctly, but they still must not break the RunningCont() pointer
+ TContExecutor exec(32000);
+ UNIT_ASSERT(!RunningCont());
+
+ exec.Execute([](TCont* cont, void*) {
+ UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);
+
+ TContExecutor exec2(32000);
+ exec2.Execute([](TCont* cont2, void*) {
+ UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
+ TContExecutor exec3(32000);
+ exec3.Execute([](TCont* cont3, void*) {
+ UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont3);
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2);
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont);
+ });
+
+ UNIT_ASSERT(!RunningCont());
+#endif
+}
+
+void TCoroTest::TestComputeCoroutineYield() {
+// If we have a busy (e.g. CPU-bound) coroutine, io must still make progress whenever it yields
+ TContExecutor exec(32000);
+ exec.SetFailOnError(true);
+
+ TPipe in, out;
+ TPipe::Pipe(in, out);
+ SetNonBlock(in.GetHandle());
+ size_t lastRead = 42;
+
+ auto compute = [&](TCont* cont) {
+ for (size_t i = 0; i < 10; ++i) {
+ write(out.GetHandle(), &i, sizeof i);
+ Sleep(TDuration::MilliSeconds(10));
+ cont->Yield();
+ UNIT_ASSERT(lastRead == i);
+ }
+ };
+
+ auto io = [&](TCont* cont) {
+ for (size_t i = 0; i < 10; ++i) {
+ NCoro::ReadI(cont, in.GetHandle(), &lastRead, sizeof lastRead);
+ }
+ };
+
+ exec.Create(compute, "compute");
+ exec.Create(io, "io");
+
+ exec.Execute();
+}
+
+void TCoroTest::TestPollEngines() {
+ bool defaultChecked = false;
+ for (auto engine : GetEnumAllValues<EContPoller>()) {
+ auto poller = IPollerFace::Construct(engine);
+ if (!poller) {
+ continue;
+ }
+
+ TContExecutor exec(32000, IPollerFace::Construct(engine));
+
+ if (engine == EContPoller::Default) {
+ defaultChecked = true;
+ UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Combined);
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), engine);
+ }
+ }
+
+ UNIT_ASSERT(defaultChecked);
+}
+
+void TCoroTest::TestPause() {
+ TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing()};
+
+ int i = 0;
+ executor.CreateOwned([&](TCont*) {
+ i++;
+ executor.Pause();
+ i++;
+ }, "coro");
+
+ UNIT_ASSERT_EQUAL(i, 0);
+ executor.Execute();
+ UNIT_ASSERT_EQUAL(i, 1);
+ executor.Execute();
+ UNIT_ASSERT_EQUAL(i, 2);
+}
+
+void TCoroTest::TestUserEvent() {
+ TContExecutor exec(32000);
+
+ struct TUserEvent : public IUserEvent {
+ bool Called = false;
+ void Execute() override {
+ Called = true;
+ }
+ } event;
+
+ auto f = [&](TCont* cont) {
+ UNIT_ASSERT(!event.Called);
+ exec.ScheduleUserEvent(&event);
+ UNIT_ASSERT(!event.Called);
+ cont->Yield();
+ UNIT_ASSERT(event.Called);
+ };
+
+ exec.Execute(f);
+
+ UNIT_ASSERT(event.Called);
+}
+
+void TCoroTest::TestOverrideTime() {
+ class TTime: public NCoro::ITime {
+ public:
+ TInstant Now() override {
+ return Current;
+ }
+
+ TInstant Current = TInstant::Zero();
+ };
+
+ TTime time;
+ TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing(), &time};
+
+ executor.CreateOwned([&](TCont* cont) {
+ UNIT_ASSERT_EQUAL(cont->Executor()->Now(), TInstant::Zero());
+ time.Current = TInstant::Seconds(1);
+ cont->SleepD(TInstant::Seconds(1));
+ UNIT_ASSERT_EQUAL(cont->Executor()->Now(), TInstant::Seconds(1));
+ }, "coro");
+
+ executor.Execute();
+}
+UNIT_TEST_SUITE_REGISTRATION(TCoroTest);
diff --git a/library/cpp/coroutine/engine/custom_time.h b/library/cpp/coroutine/engine/custom_time.h
new file mode 100644
index 0000000000..0bff0f6a60
--- /dev/null
+++ b/library/cpp/coroutine/engine/custom_time.h
@@ -0,0 +1,10 @@
+#pragma once
+
+#include <util/datetime/base.h>
+
+namespace NCoro {
+class ITime {
+ public:
+ virtual TInstant Now() = 0;
+};
+}
diff --git a/library/cpp/coroutine/engine/events.h b/library/cpp/coroutine/engine/events.h
new file mode 100644
index 0000000000..07cc4d25e8
--- /dev/null
+++ b/library/cpp/coroutine/engine/events.h
@@ -0,0 +1,148 @@
+#pragma once
+
+#include "impl.h"
+
+#include <util/datetime/base.h>
+
+class TContEvent {
+public:
+ TContEvent(TCont* current) noexcept
+ : Cont_(current)
+ , Status_(0)
+ {
+ }
+
+ ~TContEvent() {
+ }
+
+ int WaitD(TInstant deadline) {
+ Status_ = 0;
+ const int ret = Cont_->SleepD(deadline);
+
+ return Status_ ? Status_ : ret;
+ }
+
+ int WaitT(TDuration timeout) {
+ return WaitD(timeout.ToDeadLine());
+ }
+
+ int WaitI() {
+ return WaitD(TInstant::Max());
+ }
+
+ void Wake() noexcept {
+ SetStatus(EWAKEDUP);
+ Cont_->ReSchedule();
+ }
+
+ TCont* Cont() noexcept {
+ return Cont_;
+ }
+
+ int Status() const noexcept {
+ return Status_;
+ }
+
+ void SetStatus(int status) noexcept {
+ Status_ = status;
+ }
+
+private:
+ TCont* Cont_;
+ int Status_;
+};
+
+class TContWaitQueue {
+ class TWaiter: public TContEvent, public TIntrusiveListItem<TWaiter> {
+ public:
+ TWaiter(TCont* current) noexcept
+ : TContEvent(current)
+ {
+ }
+
+ ~TWaiter() {
+ }
+ };
+
+public:
+ TContWaitQueue() noexcept {
+ }
+
+ ~TContWaitQueue() {
+ Y_ASSERT(Waiters_.Empty());
+ }
+
+ int WaitD(TCont* current, TInstant deadline) {
+ TWaiter waiter(current);
+
+ Waiters_.PushBack(&waiter);
+
+ return waiter.WaitD(deadline);
+ }
+
+ int WaitT(TCont* current, TDuration timeout) {
+ return WaitD(current, timeout.ToDeadLine());
+ }
+
+ int WaitI(TCont* current) {
+ return WaitD(current, TInstant::Max());
+ }
+
+ void Signal() noexcept {
+ if (!Waiters_.Empty()) {
+ Waiters_.PopFront()->Wake();
+ }
+ }
+
+ void BroadCast() noexcept {
+ while (!Waiters_.Empty()) {
+ Waiters_.PopFront()->Wake();
+ }
+ }
+
+ void BroadCast(size_t number) noexcept {
+ for (size_t i = 0; i < number && !Waiters_.Empty(); ++i) {
+ Waiters_.PopFront()->Wake();
+ }
+ }
+
+private:
+ TIntrusiveList<TWaiter> Waiters_;
+};
+
+
+class TContSimpleEvent {
+public:
+ TContSimpleEvent(TContExecutor* e)
+ : E_(e)
+ {
+ }
+
+ TContExecutor* Executor() const noexcept {
+ return E_;
+ }
+
+ void Signal() noexcept {
+ Q_.Signal();
+ }
+
+ void BroadCast() noexcept {
+ Q_.BroadCast();
+ }
+
+ int WaitD(TInstant deadLine) noexcept {
+ return Q_.WaitD(E_->Running(), deadLine);
+ }
+
+ int WaitT(TDuration timeout) noexcept {
+ return WaitD(timeout.ToDeadLine());
+ }
+
+ int WaitI() noexcept {
+ return WaitD(TInstant::Max());
+ }
+
+private:
+ TContWaitQueue Q_;
+ TContExecutor* E_;
+};
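A minimal sketch of TContSimpleEvent (illustrative, not part of this commit), assuming the TContExecutor API from impl.h: one coroutine blocks on the event and another wakes it.

    #include <library/cpp/coroutine/engine/events.h>
    #include <library/cpp/coroutine/engine/impl.h>

    void SimpleEventExample() {
        TContExecutor exec(32000);
        TContSimpleEvent event(&exec);

        exec.CreateOwned([&](TCont*) {
            // Blocks this coroutine until Signal()/BroadCast() or the deadline;
            // returns EWAKEDUP when woken explicitly, ETIMEDOUT otherwise.
            event.WaitT(TDuration::Seconds(5));
        }, "waiter");

        exec.CreateOwned([&](TCont*) {
            event.Signal();  // wakes a single waiter, if there is one
        }, "signaller");

        exec.Execute();
    }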
diff --git a/library/cpp/coroutine/engine/helper.cpp b/library/cpp/coroutine/engine/helper.cpp
new file mode 100644
index 0000000000..bffe208dc8
--- /dev/null
+++ b/library/cpp/coroutine/engine/helper.cpp
@@ -0,0 +1,37 @@
+#include "helper.h"
+
+#include "impl.h"
+#include "network.h"
+
+namespace NCoro {
+
+ bool TryConnect(const TString& host, ui16 port, TDuration timeout) {
+ bool connected = false;
+
+ auto f = [&connected, &host, port, timeout](TCont* c) {
+ TSocketHolder socket;
+ TNetworkAddress address(host, port);
+ connected = (0 == NCoro::ConnectT(c, socket, address, timeout));
+ };
+
+ TContExecutor e(128 * 1024);
+ e.Create(f, "try_connect");
+ e.Execute();
+ return connected;
+ }
+
+ bool WaitUntilConnectable(const TString& host, ui16 port, TDuration timeout) {
+ const TInstant deadline = timeout.ToDeadLine();
+
+ for (size_t i = 1; Now() < deadline; ++i) {
+ const TDuration waitTime = TDuration::MilliSeconds(100) * i * i;
+ SleepUntil(Min(Now() + waitTime, deadline));
+
+ if (TryConnect(host, port, waitTime)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
diff --git a/library/cpp/coroutine/engine/helper.h b/library/cpp/coroutine/engine/helper.h
new file mode 100644
index 0000000000..ec2711ba52
--- /dev/null
+++ b/library/cpp/coroutine/engine/helper.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include <util/generic/string.h>
+#include <util/datetime/base.h>
+
+namespace NCoro {
+
+ // @brief checks that the address `host`:`port` is connectable
+ bool TryConnect(const TString& host, ui16 port, TDuration timeout = TDuration::Seconds(1));
+
+ // @brief waits until the address `host`:`port` becomes connectable, but no longer than `timeout`
+ // @return true on success, false if timeout exceeded
+ bool WaitUntilConnectable(const TString& host, ui16 port, TDuration timeout);
+
+}
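A short usage sketch (illustrative, not part of this commit); the host and port are placeholder values.

    #include <library/cpp/coroutine/engine/helper.h>

    bool WaitForLocalServer() {
        // Retries with a growing backoff until the port accepts connections
        // or the 30-second budget runs out.
        return NCoro::WaitUntilConnectable("localhost", 8080, TDuration::Seconds(30));
    }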
diff --git a/library/cpp/coroutine/engine/impl.cpp b/library/cpp/coroutine/engine/impl.cpp
new file mode 100644
index 0000000000..7ae6f74051
--- /dev/null
+++ b/library/cpp/coroutine/engine/impl.cpp
@@ -0,0 +1,374 @@
+#include "impl.h"
+
+#include "stack/stack_allocator.h"
+#include "stack/stack_guards.h"
+
+#include <util/generic/scope.h>
+#include <util/thread/singleton.h>
+#include <util/stream/format.h>
+#include <util/stream/output.h>
+#include <util/system/yassert.h>
+
+
+TCont::TJoinWait::TJoinWait(TCont& c) noexcept
+ : Cont_(c)
+{}
+
+void TCont::TJoinWait::Wake() noexcept {
+ Cont_.ReSchedule();
+}
+
+TCont::TCont(NCoro::NStack::IAllocator& allocator,
+ uint32_t stackSize,
+ TContExecutor& executor,
+ NCoro::TTrampoline::TFunc func,
+ const char* name) noexcept
+ : Executor_(executor)
+ , Name_(name)
+ , Trampoline_(
+ allocator,
+ stackSize,
+ std::move(func),
+ this
+ )
+{}
+
+
+void TCont::PrintMe(IOutputStream& out) const noexcept {
+ out << "cont("
+ << "name = " << Name_ << ", "
+ << "addr = " << Hex((size_t)this)
+ << ")";
+}
+
+bool TCont::Join(TCont* c, TInstant deadLine) noexcept {
+ TJoinWait ev(*this);
+ c->Waiters_.PushBack(&ev);
+
+ do {
+ if (SleepD(deadLine) == ETIMEDOUT || Cancelled()) {
+ if (!ev.Empty()) {
+ c->Cancel();
+
+ do {
+ Switch();
+ } while (!ev.Empty());
+ }
+
+ return false;
+ }
+ } while (!ev.Empty());
+
+ return true;
+}
+
+int TCont::SleepD(TInstant deadline) noexcept {
+ TTimerEvent event(this, deadline);
+
+ return ExecuteEvent(&event);
+}
+
+void TCont::Switch() noexcept {
+ Executor()->RunScheduler();
+}
+
+void TCont::Yield() noexcept {
+ if (SleepD(TInstant::Zero())) {
+ ReScheduleAndSwitch();
+ }
+}
+
+void TCont::ReScheduleAndSwitch() noexcept {
+ ReSchedule();
+ Switch();
+}
+
+void TCont::Terminate() {
+ while (!Waiters_.Empty()) {
+ Waiters_.PopFront()->Wake();
+ }
+ Executor()->Exit(this);
+}
+
+bool TCont::IAmRunning() const noexcept {
+ return this == Executor()->Running();
+}
+
+void TCont::Cancel() noexcept {
+ if (Cancelled()) {
+ return;
+ }
+
+ Cancelled_ = true;
+
+ if (!IAmRunning()) {
+ ReSchedule();
+ }
+}
+
+void TCont::ReSchedule() noexcept {
+ if (Cancelled()) {
+ // Legacy code may expect a Cancelled coroutine to be scheduled without delay.
+ Executor()->ScheduleExecutionNow(this);
+ } else {
+ Executor()->ScheduleExecution(this);
+ }
+}
+
+
+TContExecutor::TContExecutor(
+ uint32_t defaultStackSize,
+ THolder<IPollerFace> poller,
+ NCoro::IScheduleCallback* scheduleCallback,
+ NCoro::IEnterPollerCallback* enterPollerCallback,
+ NCoro::NStack::EGuard defaultGuard,
+ TMaybe<NCoro::NStack::TPoolAllocatorSettings> poolSettings,
+ NCoro::ITime* time
+)
+ : ScheduleCallback_(scheduleCallback)
+ , EnterPollerCallback_(enterPollerCallback)
+ , DefaultStackSize_(defaultStackSize)
+ , Poller_(std::move(poller))
+ , Time_(time)
+{
+ StackAllocator_ = NCoro::NStack::GetAllocator(poolSettings, defaultGuard);
+}
+
+TContExecutor::~TContExecutor() {
+ Y_VERIFY(Allocated_ == 0, "leaked %u coroutines", (ui32)Allocated_);
+}
+
+void TContExecutor::Execute() noexcept {
+ auto nop = [](void*){};
+ Execute(nop);
+}
+
+void TContExecutor::Execute(TContFunc func, void* arg) noexcept {
+ CreateOwned([=](TCont* cont) {
+ func(cont, arg);
+ }, "sys_main");
+ RunScheduler();
+}
+
+void TContExecutor::WaitForIO() {
+ while (Ready_.Empty() && !WaitQueue_.Empty()) {
+ const auto now = Now();
+
+ // Waking a coroutine puts it into ReadyNext_ list
+ const auto next = WaitQueue_.WakeTimedout(now);
+
+ if (!UserEvents_.Empty()) {
+ TIntrusiveList<IUserEvent> userEvents;
+ userEvents.Swap(UserEvents_);
+ do {
+ userEvents.PopFront()->Execute();
+ } while (!userEvents.Empty());
+ }
+
+ // Polling will return as soon as there is an event to process or a timeout.
+ // If there are woken coroutines we do not want to sleep in the poller,
+ // yet we still want to check for new io so that constantly waking
+ // coroutines cannot starve io handling.
+
+ if (ReadyNext_.Empty()) {
+ if (EnterPollerCallback_) {
+ EnterPollerCallback_->OnEnterPoller();
+ }
+ Poll(next);
+ if (EnterPollerCallback_) {
+ EnterPollerCallback_->OnExitPoller();
+ }
+ } else if (LastPoll_ + TDuration::MilliSeconds(5) < now) {
+ if (EnterPollerCallback_) {
+ EnterPollerCallback_->OnEnterPoller();
+ }
+ Poll(now);
+ if (EnterPollerCallback_) {
+ EnterPollerCallback_->OnExitPoller();
+ }
+ }
+
+ Ready_.Append(ReadyNext_);
+ }
+}
+
+void TContExecutor::Poll(TInstant deadline) {
+ Poller_.Wait(PollerEvents_, deadline);
+ LastPoll_ = Now();
+
+ // Waking a coroutine puts it into ReadyNext_ list
+ for (auto event : PollerEvents_) {
+ auto* lst = (NCoro::TPollEventList*)event.Data;
+ const int status = event.Status;
+
+ if (status) {
+ for (auto it = lst->Begin(); it != lst->End();) {
+ (it++)->OnPollEvent(status);
+ }
+ } else {
+ const ui16 filter = event.Filter;
+
+ for (auto it = lst->Begin(); it != lst->End();) {
+ if (it->What() & filter) {
+ (it++)->OnPollEvent(0);
+ } else {
+ ++it;
+ }
+ }
+ }
+ }
+}
+
+void TContExecutor::Abort() noexcept {
+ WaitQueue_.Abort();
+ auto visitor = [](TCont* c) {
+ c->Cancel();
+ };
+ Ready_.ForEach(visitor);
+ ReadyNext_.ForEach(visitor);
+}
+
+TCont* TContExecutor::Create(
+ TContFunc func,
+ void* arg,
+ const char* name,
+ TMaybe<ui32> customStackSize
+) noexcept {
+ return CreateOwned([=](TCont* cont) {
+ func(cont, arg);
+ }, name, customStackSize);
+}
+
+TCont* TContExecutor::CreateOwned(
+ NCoro::TTrampoline::TFunc func,
+ const char* name,
+ TMaybe<ui32> customStackSize
+) noexcept {
+ Allocated_ += 1;
+ if (!customStackSize) {
+ customStackSize = DefaultStackSize_;
+ }
+ auto* cont = new TCont(*StackAllocator_, *customStackSize, *this, std::move(func), name);
+ ScheduleExecution(cont);
+ return cont;
+}
+
+NCoro::NStack::TAllocatorStats TContExecutor::GetAllocatorStats() const noexcept {
+ return StackAllocator_->GetStackStats();
+}
+
+void TContExecutor::Release(TCont* cont) noexcept {
+ delete cont;
+ Allocated_ -= 1;
+}
+
+void TContExecutor::ScheduleToDelete(TCont* cont) noexcept {
+ ToDelete_.PushBack(cont);
+}
+
+void TContExecutor::ScheduleExecution(TCont* cont) noexcept {
+ cont->Scheduled_ = true;
+ ReadyNext_.PushBack(cont);
+}
+
+void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept {
+ cont->Scheduled_ = true;
+ Ready_.PushBack(cont);
+}
+
+namespace {
+ inline TContExecutor*& ThisThreadExecutor() {
+ struct TThisThreadExecutorHolder {
+ TContExecutor* Executor = nullptr;
+ };
+ return FastTlsSingletonWithPriority<TThisThreadExecutorHolder, 0>()->Executor;
+ }
+}
+
+void TContExecutor::DeleteScheduled() noexcept {
+ ToDelete_.ForEach([this](TCont* c) {
+ Release(c);
+ });
+}
+
+TCont* RunningCont() {
+ TContExecutor* thisThreadExecutor = ThisThreadExecutor();
+ return thisThreadExecutor ? thisThreadExecutor->Running() : nullptr;
+}
+
+void TContExecutor::RunScheduler() noexcept {
+ try {
+ TContExecutor* const prev = ThisThreadExecutor();
+ ThisThreadExecutor() = this;
+ TCont* caller = Current_;
+ TExceptionSafeContext* context = caller ? caller->Trampoline_.Context() : &SchedContext_;
+ Y_DEFER {
+ ThisThreadExecutor() = prev;
+ };
+
+ while (true) {
+ if (ScheduleCallback_ && Current_) {
+ ScheduleCallback_->OnUnschedule(*this);
+ }
+
+ WaitForIO();
+ DeleteScheduled();
+ Ready_.Append(ReadyNext_);
+
+ if (Ready_.Empty()) {
+ Current_ = nullptr;
+ if (caller) {
+ context->SwitchTo(&SchedContext_);
+ }
+ break;
+ }
+
+ TCont* cont = Ready_.PopFront();
+
+ if (ScheduleCallback_) {
+ ScheduleCallback_->OnSchedule(*this, *cont);
+ }
+
+ Current_ = cont;
+ cont->Scheduled_ = false;
+ if (cont == caller) {
+ break;
+ }
+ context->SwitchTo(cont->Trampoline_.Context());
+ if (Paused_) {
+ Paused_ = false;
+ Current_ = nullptr;
+ break;
+ }
+ if (caller) {
+ break;
+ }
+ }
+ } catch (...) {
+ TBackTrace::FromCurrentException().PrintTo(Cerr);
+ Y_FAIL("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str());
+ }
+}
+
+void TContExecutor::Pause() {
+ if (auto cont = Running()) {
+ Paused_ = true;
+ ScheduleExecutionNow(cont);
+ cont->SwitchTo(&SchedContext_);
+ }
+}
+
+void TContExecutor::Exit(TCont* cont) noexcept {
+ ScheduleToDelete(cont);
+ cont->SwitchTo(&SchedContext_);
+ Y_FAIL("can not return from exit");
+}
+
+TInstant TContExecutor::Now() {
+ return Y_LIKELY(Time_ == nullptr) ? TInstant::Now() : Time_->Now();
+}
+
+template <>
+void Out<TCont>(IOutputStream& out, const TCont& c) {
+ c.PrintMe(out);
+}
diff --git a/library/cpp/coroutine/engine/impl.h b/library/cpp/coroutine/engine/impl.h
new file mode 100644
index 0000000000..283a96ecf1
--- /dev/null
+++ b/library/cpp/coroutine/engine/impl.h
@@ -0,0 +1,313 @@
+#pragma once
+
+#include "callbacks.h"
+#include "cont_poller.h"
+#include "iostatus.h"
+#include "poller.h"
+#include "stack/stack_common.h"
+#include "trampoline.h"
+#include "custom_time.h"
+
+#include <library/cpp/containers/intrusive_rb_tree/rb_tree.h>
+
+#include <util/system/error.h>
+#include <util/generic/ptr.h>
+#include <util/generic/intrlist.h>
+#include <util/datetime/base.h>
+#include <util/generic/maybe.h>
+#include <util/generic/function.h>
+
+
+#define EWAKEDUP 34567
+
+class TCont;
+struct TContRep;
+class TContExecutor;
+class TContPollEvent;
+
+namespace NCoro::NStack {
+ class IAllocator;
+}
+
+class TCont : private TIntrusiveListItem<TCont> {
+ struct TJoinWait: public TIntrusiveListItem<TJoinWait> {
+ TJoinWait(TCont& c) noexcept;
+
+ void Wake() noexcept;
+
+ public:
+ TCont& Cont_;
+ };
+
+ friend class TContExecutor;
+ friend class TIntrusiveListItem<TCont>;
+ friend class NCoro::TEventWaitQueue;
+ friend class NCoro::TTrampoline;
+
+private:
+ TCont(
+ NCoro::NStack::IAllocator& allocator,
+ uint32_t stackSize,
+ TContExecutor& executor,
+ NCoro::TTrampoline::TFunc func,
+ const char* name
+ ) noexcept;
+
+public:
+ TContExecutor* Executor() noexcept {
+ return &Executor_;
+ }
+
+ const TContExecutor* Executor() const noexcept {
+ return &Executor_;
+ }
+
+ const char* Name() const noexcept {
+ return Name_;
+ }
+
+ void PrintMe(IOutputStream& out) const noexcept;
+
+ void Yield() noexcept;
+
+ void ReScheduleAndSwitch() noexcept;
+
+ /// @return ETIMEDOUT on success
+ int SleepD(TInstant deadline) noexcept;
+
+ int SleepT(TDuration timeout) noexcept {
+ return SleepD(timeout.ToDeadLine());
+ }
+
+ int SleepI() noexcept {
+ return SleepD(TInstant::Max());
+ }
+
+ bool IAmRunning() const noexcept;
+
+ void Cancel() noexcept;
+
+ bool Cancelled() const noexcept {
+ return Cancelled_;
+ }
+
+ bool Scheduled() const noexcept {
+ return Scheduled_;
+ }
+
+ bool Join(TCont* c, TInstant deadLine = TInstant::Max()) noexcept;
+
+ void ReSchedule() noexcept;
+
+ void Switch() noexcept;
+
+ void SwitchTo(TExceptionSafeContext* ctx) {
+ Trampoline_.SwitchTo(ctx);
+ }
+
+private:
+ void Terminate();
+
+private:
+ TContExecutor& Executor_;
+
+ // TODO(velavokr): allow name storage owning (for generated names backed by TString)
+ const char* Name_ = nullptr;
+
+ NCoro::TTrampoline Trampoline_;
+
+ TIntrusiveList<TJoinWait> Waiters_;
+ bool Cancelled_ = false;
+ bool Scheduled_ = false;
+};
+
+TCont* RunningCont();
+
+
+template <class Functor>
+static void ContHelperFunc(TCont* cont, void* arg) {
+ (*((Functor*)(arg)))(cont);
+}
+
+template <typename T, void (T::*M)(TCont*)>
+static void ContHelperMemberFunc(TCont* c, void* arg) {
+ ((reinterpret_cast<T*>(arg))->*M)(c);
+}
+
+class IUserEvent
+ : public TIntrusiveListItem<IUserEvent>
+{
+public:
+ virtual ~IUserEvent() = default;
+
+ virtual void Execute() = 0;
+};
+
+/// Central coroutine class.
+/// Note: coroutines are single-threaded, and all methods must be called from the same thread
+class TContExecutor {
+ friend class TCont;
+ using TContList = TIntrusiveList<TCont>;
+
+public:
+ TContExecutor(
+ uint32_t defaultStackSize,
+ THolder<IPollerFace> poller = IPollerFace::Default(),
+ NCoro::IScheduleCallback* = nullptr,
+ NCoro::IEnterPollerCallback* = nullptr,
+ NCoro::NStack::EGuard stackGuard = NCoro::NStack::EGuard::Canary,
+ TMaybe<NCoro::NStack::TPoolAllocatorSettings> poolSettings = Nothing(),
+ NCoro::ITime* time = nullptr
+ );
+
+ ~TContExecutor();
+
+ // if we already have a coroutine to run
+ void Execute() noexcept;
+
+ void Execute(TContFunc func, void* arg = nullptr) noexcept;
+
+ template <class Functor>
+ void Execute(Functor& f) noexcept {
+ Execute((TContFunc)ContHelperFunc<Functor>, (void*)&f);
+ }
+
+ template <typename T, void (T::*M)(TCont*)>
+ void Execute(T* obj) noexcept {
+ Execute(ContHelperMemberFunc<T, M>, obj);
+ }
+
+ template <class Functor>
+ TCont* Create(
+ Functor& f,
+ const char* name,
+ TMaybe<ui32> customStackSize = Nothing()
+ ) noexcept {
+ return Create((TContFunc)ContHelperFunc<Functor>, (void*)&f, name, customStackSize);
+ }
+
+ template <typename T, void (T::*M)(TCont*)>
+ TCont* Create(
+ T* obj,
+ const char* name,
+ TMaybe<ui32> customStackSize = Nothing()
+ ) noexcept {
+ return Create(ContHelperMemberFunc<T, M>, obj, name, customStackSize);
+ }
+
+ TCont* Create(
+ TContFunc func,
+ void* arg,
+ const char* name,
+ TMaybe<ui32> customStackSize = Nothing()
+ ) noexcept;
+
+ TCont* CreateOwned(
+ NCoro::TTrampoline::TFunc func,
+ const char* name,
+ TMaybe<ui32> customStackSize = Nothing()
+ ) noexcept;
+
+ NCoro::TContPoller* Poller() noexcept {
+ return &Poller_;
+ }
+
+ TCont* Running() noexcept {
+ return Current_;
+ }
+
+ const TCont* Running() const noexcept {
+ return Current_;
+ }
+
+ size_t TotalReadyConts() const noexcept {
+ return Ready_.Size() + TotalScheduledConts();
+ }
+
+ size_t TotalScheduledConts() const noexcept {
+ return ReadyNext_.Size();
+ }
+
+ size_t TotalConts() const noexcept {
+ return Allocated_;
+ }
+
+ size_t TotalWaitingConts() const noexcept {
+ return TotalConts() - TotalReadyConts();
+ }
+
+ NCoro::NStack::TAllocatorStats GetAllocatorStats() const noexcept;
+
+ // TODO(velavokr): rename, it is just CancelAll actually
+ void Abort() noexcept;
+
+ void SetFailOnError(bool fail) noexcept {
+ FailOnError_ = fail;
+ }
+
+ bool FailOnError() const noexcept {
+ return FailOnError_;
+ }
+
+ void RegisterInWaitQueue(NCoro::TContPollEvent* event) {
+ WaitQueue_.Register(event);
+ }
+
+ void ScheduleIoWait(TFdEvent* event) {
+ RegisterInWaitQueue(event);
+ Poller_.Schedule(event);
+ }
+
+ void ScheduleIoWait(TTimerEvent* event) noexcept {
+ RegisterInWaitQueue(event);
+ }
+
+ void ScheduleUserEvent(IUserEvent* event) {
+ UserEvents_.PushBack(event);
+ }
+
+ void Pause();
+ TInstant Now();
+private:
+ void Release(TCont* cont) noexcept;
+
+ void Exit(TCont* cont) noexcept;
+
+ void RunScheduler() noexcept;
+
+ void ScheduleToDelete(TCont* cont) noexcept;
+
+ void ScheduleExecution(TCont* cont) noexcept;
+
+ void ScheduleExecutionNow(TCont* cont) noexcept;
+
+ void DeleteScheduled() noexcept;
+
+ void WaitForIO();
+
+ void Poll(TInstant deadline);
+
+private:
+ NCoro::IScheduleCallback* const ScheduleCallback_ = nullptr;
+ NCoro::IEnterPollerCallback* const EnterPollerCallback_ = nullptr;
+ const uint32_t DefaultStackSize_;
+ THolder<NCoro::NStack::IAllocator> StackAllocator_;
+
+ TExceptionSafeContext SchedContext_;
+
+ TContList ToDelete_;
+ TContList Ready_;
+ TContList ReadyNext_;
+ NCoro::TEventWaitQueue WaitQueue_;
+ NCoro::TContPoller Poller_;
+ NCoro::TContPoller::TEvents PollerEvents_;
+ TInstant LastPoll_;
+
+ TIntrusiveList<IUserEvent> UserEvents_;
+
+ size_t Allocated_ = 0;
+ TCont* Current_ = nullptr;
+ bool FailOnError_ = false;
+ bool Paused_ = false;
+ NCoro::ITime* Time_ = nullptr;
+};
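The basic executor life cycle declared above, as a sketch (illustrative, not part of this commit): create coroutines, then run the scheduler until all of them have finished. The unit tests in coroutine_ut.cpp above follow the same pattern.

    #include <library/cpp/coroutine/engine/impl.h>

    int main() {
        TContExecutor exec(32000);  // default stack size per coroutine, in bytes

        exec.CreateOwned([](TCont* c) {
            for (int i = 0; i < 3; ++i) {
                c->Yield();  // give way to other ready coroutines
            }
        }, "worker");

        // Runs the scheduler; returns when no coroutine is ready or waiting.
        exec.Execute();
        return 0;
    }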
diff --git a/library/cpp/coroutine/engine/iostatus.cpp b/library/cpp/coroutine/engine/iostatus.cpp
new file mode 100644
index 0000000000..8298229b39
--- /dev/null
+++ b/library/cpp/coroutine/engine/iostatus.cpp
@@ -0,0 +1 @@
+#include "iostatus.h"
diff --git a/library/cpp/coroutine/engine/iostatus.h b/library/cpp/coroutine/engine/iostatus.h
new file mode 100644
index 0000000000..bf6036805d
--- /dev/null
+++ b/library/cpp/coroutine/engine/iostatus.h
@@ -0,0 +1,91 @@
+#pragma once
+
+#include <util/generic/yexception.h>
+
+class TIOStatus {
+public:
+ TIOStatus(int status) noexcept
+ : Status_(status)
+ {
+ }
+
+ static TIOStatus Error(int status) noexcept {
+ return TIOStatus(status);
+ }
+
+ static TIOStatus Error() noexcept {
+ return TIOStatus(LastSystemError());
+ }
+
+ static TIOStatus Success() noexcept {
+ return TIOStatus(0);
+ }
+
+ void Check() const {
+ if (Status_) {
+ ythrow TSystemError(Status_) << "io error";
+ }
+ }
+
+ bool Failed() const noexcept {
+ return (bool)Status_;
+ }
+
+ bool Succeed() const noexcept {
+ return !Failed();
+ }
+
+ int Status() const noexcept {
+ return Status_;
+ }
+
+private:
+ int Status_;
+};
+
+
+class TContIOStatus {
+public:
+ TContIOStatus(size_t processed, TIOStatus status) noexcept
+ : Processed_(processed)
+ , Status_(status)
+ {
+ }
+
+ static TContIOStatus Error(TIOStatus status) noexcept {
+ return TContIOStatus(0, status);
+ }
+
+ static TContIOStatus Error() noexcept {
+ return TContIOStatus(0, TIOStatus::Error());
+ }
+
+ static TContIOStatus Success(size_t processed) noexcept {
+ return TContIOStatus(processed, TIOStatus::Success());
+ }
+
+ static TContIOStatus Eof() noexcept {
+ return Success(0);
+ }
+
+ ~TContIOStatus() {
+ }
+
+ size_t Processed() const noexcept {
+ return Processed_;
+ }
+
+ int Status() const noexcept {
+ return Status_.Status();
+ }
+
+ size_t Checked() const {
+ Status_.Check();
+
+ return Processed_;
+ }
+
+private:
+ size_t Processed_;
+ TIOStatus Status_;
+};
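A small usage sketch of the status types above (assumed caller-side code, using NCoro::ReadI declared later in this change): Checked() throws a TSystemError when the stored status is non-zero and otherwise returns the number of processed bytes, while EOF is reported as Success(0).

// Hypothetical caller; fd, buf and len come from elsewhere.
size_t ReadSomeOrThrow(TCont* cont, SOCKET fd, void* buf, size_t len) {
    TContIOStatus st = NCoro::ReadI(cont, fd, buf, len);
    if (st.Status() == 0 && st.Processed() == 0) {
        // EOF: reported as Success(0), i.e. TContIOStatus::Eof()
    }
    return st.Checked(); // throws on error, returns bytes read otherwise
}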
diff --git a/library/cpp/coroutine/engine/mutex.h b/library/cpp/coroutine/engine/mutex.h
new file mode 100644
index 0000000000..93e9119503
--- /dev/null
+++ b/library/cpp/coroutine/engine/mutex.h
@@ -0,0 +1,49 @@
+#pragma once
+
+#include "impl.h"
+#include "events.h"
+
+class TContMutex {
+public:
+ TContMutex() noexcept
+ : Token_(true)
+ {
+ }
+
+ ~TContMutex() {
+ Y_ASSERT(Token_);
+ }
+
+ int LockD(TCont* current, TInstant deadline) {
+ while (!Token_) {
+ const int ret = WaitQueue_.WaitD(current, deadline);
+
+ if (ret != EWAKEDUP) {
+ return ret;
+ }
+ }
+
+ Token_ = false;
+
+ return 0;
+ }
+
+ int LockT(TCont* current, TDuration timeout) {
+ return LockD(current, timeout.ToDeadLine());
+ }
+
+ int LockI(TCont* current) {
+ return LockD(current, TInstant::Max());
+ }
+
+ void UnLock() noexcept {
+ Y_ASSERT(!Token_);
+
+ Token_ = true;
+ WaitQueue_.Signal();
+ }
+
+private:
+ TContWaitQueue WaitQueue_;
+ bool Token_;
+};
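An illustrative use of TContMutex from inside a coroutine body (a sketch under the assumption that the code runs within a TContExecutor; the function name is hypothetical): LockI() suspends the current coroutine until the token is available, and UnLock() wakes one waiter.

// Hypothetical coroutine body sharing `mutex` and `counter` with other coroutines.
void IncrementShared(TCont* cont, TContMutex& mutex, size_t& counter) {
    if (mutex.LockI(cont) != 0) {
        return; // non-zero status, e.g. ECANCELED while waiting
    }
    ++counter;      // critical section
    mutex.UnLock();
}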
diff --git a/library/cpp/coroutine/engine/network.cpp b/library/cpp/coroutine/engine/network.cpp
new file mode 100644
index 0000000000..85b647d210
--- /dev/null
+++ b/library/cpp/coroutine/engine/network.cpp
@@ -0,0 +1,325 @@
+#include "impl.h"
+#include "network.h"
+
+#include <util/generic/scope.h>
+#include <util/generic/xrange.h>
+
+#include <sys/uio.h>
+
+#if defined(_bionic_)
+# define IOV_MAX 1024
+#endif
+
+
+namespace NCoro {
+ namespace {
+ bool IsBlocked(int lasterr) noexcept {
+ return lasterr == EAGAIN || lasterr == EWOULDBLOCK;
+ }
+
+ ssize_t DoReadVector(SOCKET fd, TContIOVector* vec) noexcept {
+ return readv(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count()));
+ }
+
+ ssize_t DoWriteVector(SOCKET fd, TContIOVector* vec) noexcept {
+ return writev(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count()));
+ }
+ }
+
+
+ int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept {
+ if (cont->Cancelled()) {
+ return ECANCELED;
+ }
+
+ if (nfds == 0) {
+ return 0;
+ }
+
+ TTempArray<TFdEvent> events(nfds);
+
+ for (auto i : xrange(nfds)) {
+ new(events.Data() + i) TFdEvent(cont, fds[i], (ui16) what[i], deadline);
+ }
+
+ Y_DEFER {
+ for (auto i : xrange(nfds)) {
+ (events.Data() + i)->~TFdEvent();
+ }
+ };
+
+ for (auto i : xrange(nfds)) {
+ cont->Executor()->ScheduleIoWait(events.Data() + i);
+ }
+ cont->Switch();
+
+ if (cont->Cancelled()) {
+ return ECANCELED;
+ }
+
+ TFdEvent* ret = nullptr;
+ int status = EINPROGRESS;
+
+ for (auto i : xrange(nfds)) {
+ auto& ev = *(events.Data() + i);
+ switch (ev.Status()) {
+ case EINPROGRESS:
+ break;
+ case ETIMEDOUT:
+ if (status != EINPROGRESS) {
+ break;
+ }
+ [[fallthrough]];
+ default:
+ status = ev.Status();
+ ret = &ev;
+ }
+ }
+
+ if (ret) {
+ if (outfd) {
+ *outfd = ret->Fd();
+ }
+ return ret->Status();
+ }
+
+ return EINPROGRESS;
+ }
+
+ int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept {
+ return SelectD(cont, fds, what, nfds, outfd, timeout.ToDeadLine());
+ }
+
+ int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd) {
+ return SelectD(cont, fds, what, nfds, outfd, TInstant::Max());
+ }
+
+
+ int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept {
+ TFdEvent event(cont, fd, (ui16)what, deadline);
+ return ExecuteEvent(&event);
+ }
+
+ int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept {
+ return PollD(cont, fd, what, timeout.ToDeadLine());
+ }
+
+ int PollI(TCont* cont, SOCKET fd, int what) noexcept {
+ return PollD(cont, fd, what, TInstant::Max());
+ }
+
+
+ TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept {
+ while (true) {
+ ssize_t res = DoReadVector(fd, vec);
+
+ if (res >= 0) {
+ return TContIOStatus::Success((size_t) res);
+ }
+
+ {
+ const int err = LastSystemError();
+
+ if (!IsBlocked(err)) {
+ return TContIOStatus::Error(err);
+ }
+ }
+
+ if ((res = PollD(cont, fd, CONT_POLL_READ, deadline)) != 0) {
+ return TContIOStatus::Error((int) res);
+ }
+ }
+ }
+
+ TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept {
+ return ReadVectorD(cont, fd, vec, timeOut.ToDeadLine());
+ }
+
+ TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept {
+ return ReadVectorD(cont, fd, vec, TInstant::Max());
+ }
+
+
+ TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept {
+ IOutputStream::TPart part(buf, len);
+ TContIOVector vec(&part, 1);
+ return ReadVectorD(cont, fd, &vec, deadline);
+ }
+
+ TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept {
+ return ReadD(cont, fd, buf, len, timeout.ToDeadLine());
+ }
+
+ TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept {
+ return ReadD(cont, fd, buf, len, TInstant::Max());
+ }
+
+
+ TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept {
+ size_t written = 0;
+
+ while (!vec->Complete()) {
+ ssize_t res = DoWriteVector(fd, vec);
+
+ if (res >= 0) {
+ written += res;
+
+ vec->Proceed((size_t) res);
+ } else {
+ {
+ const int err = LastSystemError();
+
+ if (!IsBlocked(err)) {
+ return TContIOStatus(written, err);
+ }
+ }
+
+ if ((res = PollD(cont, fd, CONT_POLL_WRITE, deadline)) != 0) {
+ return TContIOStatus(written, (int) res);
+ }
+ }
+ }
+
+ return TContIOStatus::Success(written);
+ }
+
+ TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept {
+ return WriteVectorD(cont, fd, vec, timeOut.ToDeadLine());
+ }
+
+ TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept {
+ return WriteVectorD(cont, fd, vec, TInstant::Max());
+ }
+
+
+ TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept {
+ IOutputStream::TPart part(buf, len);
+ TContIOVector vec(&part, 1);
+ return WriteVectorD(cont, fd, &vec, deadline);
+ }
+
+ TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept {
+ return WriteD(cont, fd, buf, len, timeout.ToDeadLine());
+ }
+
+ TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept {
+ return WriteD(cont, fd, buf, len, TInstant::Max());
+ }
+
+
+ int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept {
+ TSocketHolder res(Socket(ai));
+
+ if (res.Closed()) {
+ return LastSystemError();
+ }
+
+ const int ret = ConnectD(cont, res, ai.ai_addr, (socklen_t) ai.ai_addrlen, deadline);
+
+ if (!ret) {
+ s.Swap(res);
+ }
+
+ return ret;
+ }
+
+ int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept {
+ int ret = EHOSTUNREACH;
+
+ for (auto it = addr.Begin(); it != addr.End(); ++it) {
+ ret = ConnectD(cont, s, *it, deadline);
+
+ if (ret == 0 || ret == ETIMEDOUT) {
+ return ret;
+ }
+ }
+
+ return ret;
+ }
+
+ int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept {
+ return ConnectD(cont, s, addr, timeout.ToDeadLine());
+ }
+
+ int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept {
+ return ConnectD(cont, s, addr, TInstant::Max());
+ }
+
+ int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept {
+ if (connect(s, name, namelen)) {
+ const int err = LastSystemError();
+
+ if (!IsBlocked(err) && err != EINPROGRESS) {
+ return err;
+ }
+
+ int ret = PollD(cont, s, CONT_POLL_WRITE, deadline);
+
+ if (ret) {
+ return ret;
+ }
+
+ // check if we really connected
+ // FIXME: Unportable ??
+ int serr = 0;
+ socklen_t slen = sizeof(serr);
+
+ ret = getsockopt(s, SOL_SOCKET, SO_ERROR, (char*) &serr, &slen);
+
+ if (ret) {
+ return LastSystemError();
+ }
+
+ if (serr) {
+ return serr;
+ }
+ }
+
+ return 0;
+ }
+
+ int ConnectT(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TDuration timeout) noexcept {
+ return ConnectD(cont, s, name, namelen, timeout.ToDeadLine());
+ }
+
+ int ConnectI(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen) noexcept {
+ return ConnectD(cont, s, name, namelen, TInstant::Max());
+ }
+
+
+ int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept {
+ SOCKET ret;
+
+ while ((ret = Accept4(s, addr, addrlen)) == INVALID_SOCKET) {
+ int err = LastSystemError();
+
+ if (!IsBlocked(err)) {
+ return -err;
+ }
+
+ err = PollD(cont, s, CONT_POLL_READ, deadline);
+
+ if (err) {
+ return -err;
+ }
+ }
+
+ return (int) ret;
+ }
+
+ int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept {
+ return AcceptD(cont, s, addr, addrlen, timeout.ToDeadLine());
+ }
+
+ int AcceptI(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen) noexcept {
+ return AcceptD(cont, s, addr, addrlen, TInstant::Max());
+ }
+
+ SOCKET Socket(int domain, int type, int protocol) noexcept {
+ return Socket4(domain, type, protocol);
+ }
+
+ SOCKET Socket(const struct addrinfo& ai) noexcept {
+ return Socket(ai.ai_family, ai.ai_socktype, ai.ai_protocol);
+ }
+}
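Note the convention in AcceptD above: a non-negative return value is the accepted descriptor, while errors come back negated. A sketch of an accept loop under that assumption (the function name is illustrative):

// Hypothetical accept loop; `listener` is a non-blocking listening socket.
void AcceptLoop(TCont* cont, SOCKET listener) {
    for (;;) {
        const int res = NCoro::AcceptI(cont, listener, nullptr, nullptr);
        if (res < 0) {
            break; // -errno, e.g. -ECANCELED when the coroutine is cancelled
        }
        TSocketHolder client((SOCKET)res);
        // hand `client` off to a worker coroutine here
    }
}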
diff --git a/library/cpp/coroutine/engine/network.h b/library/cpp/coroutine/engine/network.h
new file mode 100644
index 0000000000..f2c9afe4f8
--- /dev/null
+++ b/library/cpp/coroutine/engine/network.h
@@ -0,0 +1,55 @@
+#pragma once
+
+#include "iostatus.h"
+
+#include <util/datetime/base.h>
+#include <util/network/init.h>
+#include <util/network/iovec.h>
+#include <util/network/nonblock.h>
+#include <util/network/socket.h>
+
+class TCont;
+
+namespace NCoro {
+
+ int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept;
+ int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept;
+ int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd);
+
+ int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept;
+ int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept;
+ int PollI(TCont* cont, SOCKET fd, int what) noexcept;
+
+ TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept;
+ TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept;
+ TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept;
+
+ TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept;
+ TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept;
+ TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept;
+
+ TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept;
+ TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept;
+ TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept;
+
+ TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept;
+ TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept;
+ TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept;
+
+ int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept;
+
+ int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept;
+ int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept;
+ int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept;
+
+ int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept;
+ int ConnectT(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TDuration timeout) noexcept;
+ int ConnectI(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen) noexcept;
+
+ int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept;
+ int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept;
+ int AcceptI(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen) noexcept;
+
+ SOCKET Socket(int domain, int type, int protocol) noexcept;
+ SOCKET Socket(const struct addrinfo& ai) noexcept;
+}
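As a usage sketch of the helpers above (assumed caller code): the D/T/I suffixes select a deadline, a timeout, or an infinite wait, and the read/write wrappers retry after EAGAIN/EWOULDBLOCK by parking the coroutine in the poller until the descriptor becomes ready.

// Hypothetical echo step with a per-operation timeout.
int EchoOnce(TCont* cont, SOCKET fd) {
    char buf[4096];
    TContIOStatus r = NCoro::ReadT(cont, fd, buf, sizeof(buf), TDuration::Seconds(5));
    if (r.Status() != 0 || r.Processed() == 0) {
        return r.Status(); // error or EOF
    }
    TContIOStatus w = NCoro::WriteT(cont, fd, buf, r.Processed(), TDuration::Seconds(5));
    return w.Status();
}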
diff --git a/library/cpp/coroutine/engine/poller.cpp b/library/cpp/coroutine/engine/poller.cpp
new file mode 100644
index 0000000000..61164fa56b
--- /dev/null
+++ b/library/cpp/coroutine/engine/poller.cpp
@@ -0,0 +1,390 @@
+#include "poller.h"
+#include "sockmap.h"
+
+#include <util/memory/smallobj.h>
+#include <util/generic/intrlist.h>
+#include <util/generic/singleton.h>
+#include <util/system/env.h>
+#include <util/string/cast.h>
+
+namespace {
+ using TChange = IPollerFace::TChange;
+ using TEvent = IPollerFace::TEvent;
+ using TEvents = IPollerFace::TEvents;
+
+ template <class T>
+ class TUnsafeBuf {
+ public:
+ TUnsafeBuf() noexcept
+ : L_(0)
+ {
+ }
+
+ T* operator~() const noexcept {
+ return B_.Get();
+ }
+
+ size_t operator+() const noexcept {
+ return L_;
+ }
+
+ void Reserve(size_t len) {
+ len = FastClp2(len);
+
+ if (len > L_) {
+ B_.Reset(new T[len]);
+ L_ = len;
+ }
+ }
+
+ private:
+ TArrayHolder<T> B_;
+ size_t L_;
+ };
+
+
+ template <class T>
+ class TVirtualize: public IPollerFace {
+ public:
+ TVirtualize(EContPoller pollerEngine)
+ : PollerEngine_(pollerEngine)
+ {
+ }
+
+ void Set(const TChange& c) override {
+ P_.Set(c);
+ }
+
+ void Wait(TEvents& events, TInstant deadLine) override {
+ P_.Wait(events, deadLine);
+ }
+
+ EContPoller PollEngine() const override {
+ return PollerEngine_;
+ }
+ private:
+ T P_;
+ const EContPoller PollerEngine_;
+ };
+
+
+ template <class T>
+ class TPoller {
+ using TInternalEvent = typename T::TEvent;
+
+ public:
+ TPoller() {
+ E_.Reserve(1);
+ }
+
+ void Set(const TChange& c) {
+ P_.Set(c.Data, c.Fd, c.Flags);
+ }
+
+ void Reserve(size_t size) {
+ E_.Reserve(size);
+ }
+
+ void Wait(TEvents& events, TInstant deadLine) {
+ const size_t ret = P_.WaitD(~E_, +E_, deadLine);
+
+ events.reserve(ret);
+
+ for (size_t i = 0; i < ret; ++i) {
+ const TInternalEvent* ie = ~E_ + i;
+
+ const TEvent e = {
+ T::ExtractEvent(ie),
+ T::ExtractStatus(ie),
+ (ui16)T::ExtractFilter(ie),
+ };
+
+ events.push_back(e);
+ }
+
+ E_.Reserve(ret + 1);
+ }
+
+ private:
+ T P_;
+ TUnsafeBuf<TInternalEvent> E_;
+ };
+
+
+ template <class T>
+ class TIndexedArray {
+ struct TVal:
+ public T,
+ public TIntrusiveListItem<TVal>,
+ public TObjectFromPool<TVal>
+ {
+ // NOTE: The constructor must be user-defined (and not =default) here
+ // because TVal objects are created by the UB-capable placement
+ // TObjectFromPool::operator new, which stores its bookkeeping data in the
+ // memory allocated for the object. Without a user-defined constructor,
+ // zero-initialization would take place in the TVal() expression and that
+ // data would be overwritten.
+ TVal() {
+ }
+ };
+
+ typedef TIntrusiveList<TVal> TListType;
+
+ public:
+ typedef typename TListType::TIterator TIterator;
+ typedef typename TListType::TConstIterator TConstIterator;
+
+ TIndexedArray()
+ : P_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance())
+ {
+ }
+
+ TIterator Begin() noexcept {
+ return I_.Begin();
+ }
+
+ TIterator End() noexcept {
+ return I_.End();
+ }
+
+ TConstIterator Begin() const noexcept {
+ return I_.Begin();
+ }
+
+ TConstIterator End() const noexcept {
+ return I_.End();
+ }
+
+ T& operator[](size_t i) {
+ return *Get(i);
+ }
+
+ T* Get(size_t i) {
+ TValRef& v = V_.Get(i);
+
+ if (Y_UNLIKELY(!v)) {
+ v.Reset(new (&P_) TVal());
+ I_.PushFront(v.Get());
+ }
+
+ Y_PREFETCH_WRITE(v.Get(), 1);
+
+ return v.Get();
+ }
+
+ void Erase(size_t i) noexcept {
+ V_.Get(i).Destroy();
+ }
+
+ size_t Size() const noexcept {
+ return I_.Size();
+ }
+
+ private:
+ using TValRef = THolder<TVal>;
+ typename TVal::TPool P_;
+ TSocketMap<TValRef> V_;
+ TListType I_;
+ };
+
+
+ inline short PollFlags(ui16 flags) noexcept {
+ short ret = 0;
+
+ if (flags & CONT_POLL_READ) {
+ ret |= POLLIN;
+ }
+
+ if (flags & CONT_POLL_WRITE) {
+ ret |= POLLOUT;
+ }
+
+#if defined(_linux_)
+ if (flags & CONT_POLL_RDHUP) {
+ ret |= POLLRDHUP;
+ }
+#endif
+
+ return ret;
+ }
+
+
+ class TPollPoller {
+ public:
+ size_t Size() const noexcept {
+ return S_.Size();
+ }
+
+ template <class T>
+ void Build(T& t) const {
+ for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) {
+ t.Set(*it);
+ }
+
+ t.Reserve(Size());
+ }
+
+ void Set(const TChange& c) {
+ if (c.Flags) {
+ S_[c.Fd] = c;
+ } else {
+ S_.Erase(c.Fd);
+ }
+ }
+
+ void Wait(TEvents& events, TInstant deadLine) {
+ T_.clear();
+ T_.reserve(Size());
+
+ for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) {
+ const pollfd pfd = {
+ it->Fd,
+ PollFlags(it->Flags),
+ 0,
+ };
+
+ T_.push_back(pfd);
+ }
+
+ const ssize_t ret = PollD(T_.data(), (nfds_t) T_.size(), deadLine);
+
+ if (ret <= 0) {
+ return;
+ }
+
+ events.reserve(T_.size());
+
+ for (size_t i = 0; i < T_.size(); ++i) {
+ const pollfd& pfd = T_[i];
+ const short ev = pfd.revents;
+
+ if (!ev) {
+ continue;
+ }
+
+ int status = 0;
+ ui16 filter = 0;
+
+ // We are perfectly fine with an EOF while reading a pipe or a unix socket
+ if ((ev & POLLIN) || (ev & POLLHUP) && (pfd.events & POLLIN)) {
+ filter |= CONT_POLL_READ;
+ }
+
+ if (ev & POLLOUT) {
+ filter |= CONT_POLL_WRITE;
+ }
+
+#if defined(_linux_)
+ if (ev & POLLRDHUP) {
+ filter |= CONT_POLL_RDHUP;
+ }
+#endif
+
+ if (ev & POLLERR) {
+ status = EIO;
+ } else if (ev & POLLHUP && pfd.events & POLLOUT) {
+ // Only write operations may cause EPIPE
+ status = EPIPE;
+ } else if (ev & POLLNVAL) {
+ status = EINVAL;
+ }
+
+ if (status) {
+ filter = CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_RDHUP;
+ }
+
+ const TEvent res = {
+ S_[pfd.fd].Data,
+ status,
+ filter,
+ };
+
+ events.push_back(res);
+ }
+ }
+
+ private:
+ typedef TIndexedArray<TChange> TFds;
+ TFds S_;
+ typedef TVector<pollfd> TPollVec;
+ TPollVec T_;
+ };
+
+
+ class TCombinedPoller {
+ typedef TPoller<TPollerImpl<TWithoutLocking>> TDefaultPoller;
+
+ public:
+ TCombinedPoller() {
+ P_.Reset(new TPollPoller());
+ }
+
+ void Set(const TChange& c) {
+ if (!P_) {
+ D_->Set(c);
+ } else {
+ P_->Set(c);
+ }
+ }
+
+ void Wait(TEvents& events, TInstant deadLine) {
+ if (!P_) {
+ D_->Wait(events, deadLine);
+ } else {
+ if (P_->Size() > 200) {
+ D_.Reset(new TDefaultPoller());
+ P_->Build(*D_);
+ P_.Destroy();
+ D_->Wait(events, deadLine);
+ } else {
+ P_->Wait(events, deadLine);
+ }
+ }
+ }
+
+ private:
+ THolder<TPollPoller> P_;
+ THolder<TDefaultPoller> D_;
+ };
+
+ struct TUserPoller: public TString {
+ TUserPoller()
+ : TString(GetEnv("USER_POLLER"))
+ {
+ }
+ };
+}
+
+THolder<IPollerFace> IPollerFace::Default() {
+ return Construct(*SingletonWithPriority<TUserPoller, 0>());
+}
+
+THolder<IPollerFace> IPollerFace::Construct(TStringBuf name) {
+ return Construct(name ? FromString<EContPoller>(name) : EContPoller::Default);
+}
+
+THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) {
+ switch (poller) {
+ case EContPoller::Default:
+ case EContPoller::Combined:
+ return MakeHolder<TVirtualize<TCombinedPoller>>(EContPoller::Combined);
+ case EContPoller::Select:
+ return MakeHolder<TVirtualize<TPoller<TGenericPoller<TSelectPoller<TWithoutLocking>>>>>(poller);
+ case EContPoller::Poll:
+ return MakeHolder<TVirtualize<TPollPoller>>(poller);
+ case EContPoller::Epoll:
+#if defined(HAVE_EPOLL_POLLER)
+ return MakeHolder<TVirtualize<TPoller<TGenericPoller<TEpollPoller<TWithoutLocking>>>>>(poller);
+#else
+ return nullptr;
+#endif
+ case EContPoller::Kqueue:
+#if defined(HAVE_KQUEUE_POLLER)
+ return MakeHolder<TVirtualize<TPoller<TGenericPoller<TKqueuePoller<TWithoutLocking>>>>>(poller);
+#else
+ return nullptr;
+#endif
+ default:
+ Y_FAIL("bad poller type");
+ }
+}
diff --git a/library/cpp/coroutine/engine/poller.h b/library/cpp/coroutine/engine/poller.h
new file mode 100644
index 0000000000..8ea012c0fc
--- /dev/null
+++ b/library/cpp/coroutine/engine/poller.h
@@ -0,0 +1,50 @@
+#pragma once
+
+#include <util/generic/ptr.h>
+#include <util/generic/vector.h>
+#include <util/network/socket.h>
+#include <util/network/pollerimpl.h>
+#include <util/datetime/base.h>
+
+enum class EContPoller {
+ Default /* "default" */,
+ Combined /* "combined" */,
+ Select /* "select" */,
+ Poll /* "poll" */,
+ Epoll /* "epoll" */,
+ Kqueue /* "kqueue" */
+};
+
+class IPollerFace {
+public:
+ struct TChange {
+ SOCKET Fd;
+ void* Data;
+ ui16 Flags;
+ };
+
+ struct TEvent {
+ void* Data;
+ int Status;
+ ui16 Filter;
+ };
+
+ using TEvents = TVector<TEvent>;
+
+ virtual ~IPollerFace() {
+ }
+
+ void Set(void* ptr, SOCKET fd, ui16 flags) {
+ const TChange c = {fd, ptr, flags};
+
+ Set(c);
+ }
+
+ virtual void Set(const TChange& change) = 0;
+ virtual void Wait(TEvents& events, TInstant deadLine) = 0;
+ virtual EContPoller PollEngine() const = 0;
+
+ static THolder<IPollerFace> Default();
+ static THolder<IPollerFace> Construct(TStringBuf name);
+ static THolder<IPollerFace> Construct(EContPoller poller);
+};
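A brief sketch of selecting a poller backend (assumed caller code): Construct() may return nullptr when the requested engine is not compiled in, and Default() honours the USER_POLLER environment variable, falling back to the combined poller when it is empty.

// Hypothetical selection with a fallback to the default engine.
THolder<IPollerFace> MakePollerOrDefault(EContPoller preferred) {
    if (THolder<IPollerFace> p = IPollerFace::Construct(preferred)) {
        return p;
    }
    return IPollerFace::Default();
}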
diff --git a/library/cpp/coroutine/engine/sockmap.h b/library/cpp/coroutine/engine/sockmap.h
new file mode 100644
index 0000000000..fd189e1774
--- /dev/null
+++ b/library/cpp/coroutine/engine/sockmap.h
@@ -0,0 +1,24 @@
+#pragma once
+
+#include <util/generic/hash.h>
+#include <util/generic/vector.h>
+
+template <class T>
+class TSocketMap {
+public:
+ T& Get(size_t idx) {
+ if (idx < 128000) {
+ if (V_.size() <= idx) {
+ V_.resize(idx + 1);
+ }
+
+ return V_[idx];
+ }
+
+ return H_[idx];
+ }
+
+private:
+ TVector<T> V_;
+ THashMap<size_t, T> H_;
+};
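TSocketMap keeps small indices in a dense vector and spills large ones into a hash map. A sketch of the access pattern (assumed caller code):

// Hypothetical usage; indices below 128000 use the vector, larger ones the hash map.
void TouchCounters(TSocketMap<int>& refCounts) {
    refCounts.Get(5) += 1;       // dense vector slot
    refCounts.Get(500000) += 1;  // hash map slot
}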
diff --git a/library/cpp/coroutine/engine/sockpool.cpp b/library/cpp/coroutine/engine/sockpool.cpp
new file mode 100644
index 0000000000..b9482e780f
--- /dev/null
+++ b/library/cpp/coroutine/engine/sockpool.cpp
@@ -0,0 +1,58 @@
+#include "sockpool.h"
+
+void SetCommonSockOpts(SOCKET sock, const struct sockaddr* sa) {
+ SetSockOpt(sock, SOL_SOCKET, SO_REUSEADDR, 1);
+
+ if (!sa || sa->sa_family == AF_INET) {
+ sockaddr_in s_in;
+ s_in.sin_family = AF_INET;
+ s_in.sin_addr.s_addr = INADDR_ANY;
+ s_in.sin_port = 0;
+
+ if (bind(sock, (struct sockaddr*)&s_in, sizeof(s_in)) == -1) {
+ warn("bind");
+ }
+ } else if (sa->sa_family == AF_INET6) {
+ sockaddr_in6 s_in6(*(const sockaddr_in6*)sa);
+ Zero(s_in6.sin6_addr);
+ s_in6.sin6_port = 0;
+
+ if (bind(sock, (const struct sockaddr*)&s_in6, sizeof s_in6) == -1) {
+ warn("bind6");
+ }
+ } else {
+ Y_ASSERT(0);
+ }
+
+ SetNoDelay(sock, true);
+}
+
+TPooledSocket TSocketPool::AllocateMore(TConnectData* conn) {
+ TCont* cont = conn->Cont;
+
+ while (true) {
+ TSocketHolder s(NCoro::Socket(Addr_->Addr()->sa_family, SOCK_STREAM, 0));
+
+ if (s == INVALID_SOCKET) {
+ ythrow TSystemError(errno) << TStringBuf("can not create socket");
+ }
+
+ SetCommonSockOpts(s, Addr_->Addr());
+ SetZeroLinger(s);
+
+ const int ret = NCoro::ConnectD(cont, s, Addr_->Addr(), Addr_->Len(), conn->DeadLine);
+
+ if (ret == EINTR) {
+ continue;
+ } else if (ret) {
+ ythrow TSystemError(ret) << TStringBuf("can not connect(") << cont->Name() << ')';
+ }
+
+ THolder<TPooledSocket::TImpl> res(new TPooledSocket::TImpl(s, this));
+ s.Release();
+
+ if (res->IsOpen()) {
+ return res.Release();
+ }
+ }
+}
diff --git a/library/cpp/coroutine/engine/sockpool.h b/library/cpp/coroutine/engine/sockpool.h
new file mode 100644
index 0000000000..1ebb7e7b38
--- /dev/null
+++ b/library/cpp/coroutine/engine/sockpool.h
@@ -0,0 +1,253 @@
+#pragma once
+
+#include "impl.h"
+#include "network.h"
+
+#include <util/network/address.h>
+#include <util/network/socket.h>
+#include <util/system/mutex.h>
+
+extern void SetCommonSockOpts(SOCKET sock, const struct sockaddr* sa = nullptr);
+
+class TSocketPool;
+
+class TPooledSocket {
+ class TImpl: public TIntrusiveListItem<TImpl>, public TSimpleRefCount<TImpl, TImpl> {
+ public:
+ TImpl(SOCKET fd, TSocketPool* pool) noexcept
+ : Pool_(pool)
+ , IsKeepAlive_(false)
+ , Fd_(fd)
+ {
+ Touch();
+ }
+
+ static void Destroy(TImpl* impl) noexcept {
+ impl->DoDestroy();
+ }
+
+ void DoDestroy() noexcept {
+ if (!Closed() && IsKeepAlive() && IsInGoodState()) {
+ ReturnToPool();
+ } else {
+ delete this;
+ }
+ }
+
+ bool IsKeepAlive() const noexcept {
+ return IsKeepAlive_;
+ }
+
+ void SetKeepAlive(bool ka) {
+ ::SetKeepAlive(Fd_, ka);
+ IsKeepAlive_ = ka;
+ }
+
+ SOCKET Socket() const noexcept {
+ return Fd_;
+ }
+
+ bool Closed() const noexcept {
+ return Fd_.Closed();
+ }
+
+ void Close() noexcept {
+ Fd_.Close();
+ }
+
+ bool IsInGoodState() const noexcept {
+ int err = 0;
+ socklen_t len = sizeof(err);
+
+ getsockopt(Fd_, SOL_SOCKET, SO_ERROR, (char*)&err, &len);
+
+ return !err;
+ }
+
+ bool IsOpen() const noexcept {
+ return IsInGoodState() && IsNotSocketClosedByOtherSide(Fd_);
+ }
+
+ void Touch() noexcept {
+ TouchTime_ = TInstant::Now();
+ }
+
+ const TInstant& LastTouch() const noexcept {
+ return TouchTime_;
+ }
+
+ private:
+ inline void ReturnToPool() noexcept;
+
+ private:
+ TSocketPool* Pool_;
+ bool IsKeepAlive_;
+ TSocketHolder Fd_;
+ TInstant TouchTime_;
+ };
+
+ friend class TSocketPool;
+
+public:
+ TPooledSocket()
+ : Impl_(nullptr)
+ {
+ }
+
+ TPooledSocket(TImpl* impl)
+ : Impl_(impl)
+ {
+ }
+
+ ~TPooledSocket() {
+ if (UncaughtException() && !!Impl_) {
+ Close();
+ }
+ }
+
+ operator SOCKET() const noexcept {
+ return Impl_->Socket();
+ }
+
+ void SetKeepAlive(bool ka) {
+ Impl_->SetKeepAlive(ka);
+ }
+
+ void Close() noexcept {
+ Impl_->Close();
+ }
+
+private:
+ TIntrusivePtr<TImpl> Impl_;
+};
+
+struct TConnectData {
+ TConnectData(TCont* cont, const TInstant& deadLine)
+ : Cont(cont)
+ , DeadLine(deadLine)
+ {
+ }
+
+ TConnectData(TCont* cont, const TDuration& timeOut)
+ : Cont(cont)
+ , DeadLine(TInstant::Now() + timeOut)
+ {
+ }
+
+ TCont* Cont;
+ const TInstant DeadLine;
+};
+
+class TSocketPool {
+ friend class TPooledSocket::TImpl;
+
+public:
+ typedef TAtomicSharedPtr<NAddr::IRemoteAddr> TAddrRef;
+
+ TSocketPool(int ip, int port)
+ : Addr_(new NAddr::TIPv4Addr(TIpAddress((ui32)ip, (ui16)port)))
+ {
+ }
+
+ TSocketPool(const TAddrRef& addr)
+ : Addr_(addr)
+ {
+ }
+
+ void EraseStale(const TInstant& maxAge) noexcept {
+ TSockets toDelete;
+
+ {
+ TGuard<TMutex> guard(Mutex_);
+
+ for (TSockets::TIterator it = Pool_.Begin(); it != Pool_.End();) {
+ if (it->LastTouch() < maxAge) {
+ toDelete.PushBack(&*(it++));
+ } else {
+ ++it;
+ }
+ }
+ }
+ }
+
+ TPooledSocket Get(TConnectData* conn) {
+ TPooledSocket ret;
+
+ if (TPooledSocket::TImpl* alive = GetImpl()) {
+ ret = TPooledSocket(alive);
+ } else {
+ ret = AllocateMore(conn);
+ }
+
+ ret.Impl_->Touch();
+
+ return ret;
+ }
+
+ bool GetAlive(TPooledSocket& socket) {
+ if (TPooledSocket::TImpl* alive = GetImpl()) {
+ alive->Touch();
+ socket = TPooledSocket(alive);
+ return true;
+ }
+ return false;
+ }
+
+private:
+ TPooledSocket::TImpl* GetImpl() {
+ TGuard<TMutex> guard(Mutex_);
+
+ while (!Pool_.Empty()) {
+ THolder<TPooledSocket::TImpl> ret(Pool_.PopFront());
+
+ if (ret->IsOpen()) {
+ return ret.Release();
+ }
+ }
+ return nullptr;
+ }
+
+ void Release(TPooledSocket::TImpl* impl) noexcept {
+ TGuard<TMutex> guard(Mutex_);
+
+ Pool_.PushFront(impl);
+ }
+
+ TPooledSocket AllocateMore(TConnectData* conn);
+
+private:
+ TAddrRef Addr_;
+ using TSockets = TIntrusiveListWithAutoDelete<TPooledSocket::TImpl, TDelete>;
+ TSockets Pool_;
+ TMutex Mutex_;
+};
+
+inline void TPooledSocket::TImpl::ReturnToPool() noexcept {
+ Pool_->Release(this);
+}
+
+
+class TContIO: public IInputStream, public IOutputStream {
+public:
+ TContIO(SOCKET fd, TCont* cont)
+ : Fd_(fd)
+ , Cont_(cont)
+ {
+ }
+
+ void DoWrite(const void* buf, size_t len) override {
+ NCoro::WriteI(Cont_, Fd_, buf, len).Checked();
+ }
+
+ size_t DoRead(void* buf, size_t len) override {
+ return NCoro::ReadI(Cont_, Fd_, buf, len).Checked();
+ }
+
+ SOCKET Fd() const noexcept {
+ return Fd_;
+ }
+
+private:
+ SOCKET Fd_;
+ TCont* Cont_;
+};
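A sketch of how the pool and TContIO compose inside a coroutine (assumed caller code; the request payload and function name are placeholders): Get() reuses a pooled keep-alive connection when one is still open, otherwise AllocateMore() connects a fresh socket.

// Hypothetical request over a pooled connection.
void SendPing(TCont* cont, TSocketPool& pool) {
    TConnectData conn(cont, TDuration::Seconds(1));
    TPooledSocket sock = pool.Get(&conn);
    sock.SetKeepAlive(true); // lets the socket return to the pool on destruction

    TContIO io(sock, cont);
    io.Write("ping\n", 5);
}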
diff --git a/library/cpp/coroutine/engine/stack/benchmark/alloc_bm.cpp b/library/cpp/coroutine/engine/stack/benchmark/alloc_bm.cpp
new file mode 100644
index 0000000000..38d713d274
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/benchmark/alloc_bm.cpp
@@ -0,0 +1,316 @@
+#include <benchmark/benchmark.h>
+
+#include <util/generic/vector.h>
+#include <util/system/yassert.h>
+
+#include <library/cpp/coroutine/engine/stack/stack_allocator.h>
+#include <library/cpp/coroutine/engine/stack/stack_guards.h>
+#include <library/cpp/coroutine/engine/stack/stack_pool.h>
+#include <library/cpp/coroutine/engine/stack/stack_utils.h>
+
+
+namespace NCoro::NStack::NBenchmark {
+
+ const char* TestCoroName = "any_name";
+ constexpr uint64_t BigCoroSize = PageSize * 25;
+ constexpr uint64_t SmallCoroSize = PageSize * 4;
+ constexpr uint64_t ManyStacks = 4096;
+
+ void BasicOperations(TStackHolder& stack) {
+ Y_VERIFY(!stack.Get().empty());
+ stack.LowerCanaryOk();
+ stack.UpperCanaryOk();
+ }
+
+ void WriteStack(TStackHolder& stack) {
+ auto memory = stack.Get();
+ Y_VERIFY(!memory.empty());
+ stack.LowerCanaryOk();
+ stack.UpperCanaryOk();
+ for (uint64_t i = PageSize / 2; i < memory.size(); i += PageSize * 2) {
+ memory[i] = 42;
+ }
+ }
+
+ static void BM_GetAlignedMemory(benchmark::State& state) {
+ char* raw = nullptr;
+ char* aligned = nullptr;
+ for (auto _ : state) {
+ if (NCoro::NStack::GetAlignedMemory(state.range(0), raw, aligned)) {
+ free(raw);
+ }
+ }
+ }
+ BENCHMARK(BM_GetAlignedMemory)->RangeMultiplier(16)->Range(1, 1024 * 1024);
+
+ static void BM_GetAlignedMemoryReleaseRss(benchmark::State& state) {
+ char* raw = nullptr;
+ char* aligned = nullptr;
+ for (auto _ : state) {
+ if (NCoro::NStack::GetAlignedMemory(state.range(0), raw, aligned)) {
+ const auto toFree = state.range(0) > 2 ? state.range(0) - 2 : 1;
+ ReleaseRss(aligned, toFree);
+ free(raw);
+ }
+ }
+ }
+ BENCHMARK(BM_GetAlignedMemoryReleaseRss)->RangeMultiplier(16)->Range(1, 1024 * 1024);
+
+ static void BM_PoolAllocator(benchmark::State& state) {
+ auto allocator = GetAllocator(TPoolAllocatorSettings{}, (EGuard)state.range(0));
+ for (auto _ : state) {
+ TStackHolder stack(*allocator, state.range(1), TestCoroName);
+ BasicOperations(stack);
+ }
+ }
+ BENCHMARK(BM_PoolAllocator)
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize}) // old version - ArgsProduct() is not supported
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize});
+
+ static void BM_DefaultAllocator(benchmark::State& state) {
+ auto allocator = GetAllocator(Nothing(), (EGuard)state.range(0));
+ for (auto _ : state) {
+ TStackHolder stack(*allocator, state.range(1), TestCoroName);
+ BasicOperations(stack);
+ }
+ }
+ BENCHMARK(BM_DefaultAllocator)
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize}) // old version - ArgsProduct() is not supported
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize});
+
+ static void BM_PoolAllocatorManyStacksOneAtTime(benchmark::State& state) {
+ TPoolAllocatorSettings settings;
+ settings.StacksPerChunk = state.range(2);
+ auto allocator = GetAllocator(settings, (EGuard)state.range(0));
+ for (auto _ : state) {
+ for (uint64_t i = 0; i < ManyStacks; ++i) {
+ TStackHolder stack(*allocator, state.range(1), TestCoroName);
+ BasicOperations(stack);
+ }
+ }
+ }
+ BENCHMARK(BM_PoolAllocatorManyStacksOneAtTime)
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1}) // old version - ArgsProduct() is not supported
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize, 1})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1})
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1024})
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1024})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize, 1024})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1024});
+
+ static void BM_DefaultAllocatorManyStacksOneAtTime(benchmark::State& state) {
+ auto allocator = GetAllocator(Nothing(), (EGuard)state.range(0));
+ for (auto _ : state) {
+ for (uint64_t i = 0; i < ManyStacks; ++i) {
+ TStackHolder stack(*allocator, state.range(1), TestCoroName);
+ BasicOperations(stack);
+ }
+ }
+ }
+ BENCHMARK(BM_DefaultAllocatorManyStacksOneAtTime)
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize}) // old version - ArgsProduct() is not supported
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize});
+
+ static void BM_PoolAllocatorManyStacks(benchmark::State& state) {
+ TPoolAllocatorSettings settings;
+ settings.StacksPerChunk = state.range(2);
+ auto allocator = GetAllocator(settings, (EGuard)state.range(0));
+ TVector<TStackHolder> stacks; // store stacks during benchmark
+ stacks.reserve(ManyStacks);
+ for (auto _ : state) {
+ for (uint64_t i = 0; i < ManyStacks; ++i) {
+ stacks.emplace_back(*allocator, state.range(1), TestCoroName);
+ BasicOperations(stacks.back());
+ }
+ }
+ }
+ BENCHMARK(BM_PoolAllocatorManyStacks)
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1}) // old version - ArgsProduct() is not supported
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize, 1})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1})
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1024})
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1024})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize, 1024})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1024});
+
+ static void BM_DefaultAllocatorManyStacks(benchmark::State& state) {
+ auto allocator = GetAllocator(Nothing(), (EGuard)state.range(0));
+ TVector<TStackHolder> stacks; // store stacks during benchmark
+ stacks.reserve(ManyStacks);
+ for (auto _ : state) {
+ for (uint64_t i = 0; i < ManyStacks; ++i) {
+ stacks.push_back(TStackHolder(*allocator, state.range(1), TestCoroName));
+ BasicOperations(stacks.back());
+ }
+ }
+ }
+ BENCHMARK(BM_DefaultAllocatorManyStacks)
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize}) // old version - ArgsProduct() is not supported
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize});
+
+ // ------------------------------------------------------------------------
+ static void BM_PoolAllocatorManyStacksReleased(benchmark::State& state) {
+ TPoolAllocatorSettings settings;
+ settings.StacksPerChunk = state.range(2);
+ auto allocator = GetAllocator(settings, (EGuard)state.range(0));
+ TVector<TStackHolder> stacks; // store stacks during benchmark
+ stacks.reserve(ManyStacks);
+ for (auto _ : state) {
+ for (uint64_t i = 0; i < ManyStacks; ++i) {
+ stacks.emplace_back(*allocator, state.range(1), TestCoroName);
+ BasicOperations(stacks.back());
+ }
+ stacks.clear();
+ }
+ }
+ BENCHMARK(BM_PoolAllocatorManyStacksReleased)
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1}) // old version - ArgsProduct() is not supported
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize, 1})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1})
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1024})
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1024})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize, 1024})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1024});
+
+ static void BM_DefaultAllocatorManyStacksReleased(benchmark::State& state) {
+ auto allocator = GetAllocator(Nothing(), (EGuard)state.range(0));
+ TVector<TStackHolder> stacks; // store stacks during benchmark
+ stacks.reserve(ManyStacks);
+ for (auto _ : state) {
+ for (uint64_t i = 0; i < ManyStacks; ++i) {
+ stacks.push_back(TStackHolder(*allocator, state.range(1), TestCoroName));
+ BasicOperations(stacks.back());
+ }
+ stacks.clear();
+ }
+ }
+ BENCHMARK(BM_DefaultAllocatorManyStacksReleased)
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize}) // old version - ArgsProduct() is not supported
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize});
+
+ // ------------------------------------------------------------------------
+ static void BM_PoolAllocatorManyStacksReleasedAndRealloc(benchmark::State& state) {
+ TPoolAllocatorSettings settings;
+ settings.StacksPerChunk = state.range(2);
+ auto allocator = GetAllocator(settings, (EGuard)state.range(0));
+ TVector<TStackHolder> stacks; // store stacks during benchmark
+ stacks.reserve(ManyStacks);
+ for (auto _ : state) {
+ for (uint64_t i = 0; i < ManyStacks; ++i) {
+ stacks.emplace_back(*allocator, state.range(1), TestCoroName);
+ BasicOperations(stacks.back());
+ }
+ stacks.clear();
+ for (uint64_t i = 0; i < ManyStacks; ++i) {
+ stacks.emplace_back(*allocator, state.range(1), TestCoroName);
+ BasicOperations(stacks.back());
+ }
+ }
+ }
+ BENCHMARK(BM_PoolAllocatorManyStacksReleasedAndRealloc)
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1}) // old version - ArgsProduct() is not supported
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize, 1})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1})
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1024})
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1024})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize, 1024})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1024})
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize, 8192})
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 8192})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize, 8192})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize, 8192});
+
+ static void BM_DefaultAllocatorManyStacksReleasedAndRealloc(benchmark::State& state) {
+ auto allocator = GetAllocator(Nothing(), (EGuard)state.range(0));
+ TVector<TStackHolder> stacks; // store stacks during benchmark
+ stacks.reserve(ManyStacks);
+ for (auto _ : state) {
+ for (uint64_t i = 0; i < ManyStacks; ++i) {
+ stacks.push_back(TStackHolder(*allocator, state.range(1), TestCoroName));
+ BasicOperations(stacks.back());
+ }
+ stacks.clear();
+ for (uint64_t i = 0; i < ManyStacks; ++i) {
+ stacks.push_back(TStackHolder(*allocator, state.range(1), TestCoroName));
+ BasicOperations(stacks.back());
+ }
+ }
+ }
+ BENCHMARK(BM_DefaultAllocatorManyStacksReleasedAndRealloc)
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize}) // old version - ArgsProduct() is not supported
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize});
+
+ // ------------------------------------------------------------------------
+ static void BM_PoolAllocatorManyStacksMemoryWriteReleasedAndRealloc(benchmark::State& state) {
+ TPoolAllocatorSettings settings;
+ settings.StacksPerChunk = state.range(2);
+ auto allocator = GetAllocator(settings, (EGuard)state.range(0));
+ TVector<TStackHolder> stacks; // store stacks during benchmark
+ stacks.reserve(ManyStacks);
+ for (auto _ : state) {
+ for (uint64_t i = 0; i < ManyStacks; ++i) {
+ stacks.emplace_back(*allocator, state.range(1), TestCoroName);
+ WriteStack(stacks.back());
+ }
+ stacks.clear();
+ for (uint64_t i = 0; i < ManyStacks; ++i) {
+ stacks.emplace_back(*allocator, state.range(1), TestCoroName);
+ WriteStack(stacks.back());
+ }
+ }
+ }
+ BENCHMARK(BM_PoolAllocatorManyStacksMemoryWriteReleasedAndRealloc)
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1}) // old version - ArgsProduct() is not supported
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize, 1})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1})
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1024})
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1024})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize, 1024})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1024})
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize, 8192})
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 8192})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize, 8192})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize, 8192});
+
+ static void BM_DefaultAllocatorManyStacksMemoryWriteReleasedAndRealloc(benchmark::State& state) {
+ auto allocator = GetAllocator(Nothing(), (EGuard)state.range(0));
+ TVector<TStackHolder> stacks; // store stacks during benchmark
+ stacks.reserve(ManyStacks);
+ for (auto _ : state) {
+ for (uint64_t i = 0; i < ManyStacks; ++i) {
+ stacks.push_back(TStackHolder(*allocator, state.range(1), TestCoroName));
+ WriteStack(stacks.back());
+ }
+ stacks.clear();
+ for (uint64_t i = 0; i < ManyStacks; ++i) {
+ stacks.push_back(TStackHolder(*allocator, state.range(1), TestCoroName));
+ WriteStack(stacks.back());
+ }
+ }
+ }
+ BENCHMARK(BM_DefaultAllocatorManyStacksMemoryWriteReleasedAndRealloc)
+ ->Args({(int64_t)EGuard::Canary, BigCoroSize}) // old version - ArgsProduct() is not supported
+ ->Args({(int64_t)EGuard::Canary, SmallCoroSize})
+ ->Args({(int64_t)EGuard::Page, BigCoroSize})
+ ->Args({(int64_t)EGuard::Page, SmallCoroSize});
+
+}
+
+BENCHMARK_MAIN();
diff --git a/library/cpp/coroutine/engine/stack/benchmark/ya.make b/library/cpp/coroutine/engine/stack/benchmark/ya.make
new file mode 100644
index 0000000000..b2942fe8ca
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/benchmark/ya.make
@@ -0,0 +1,13 @@
+G_BENCHMARK()
+
+OWNER(g:balancer)
+
+SRCS(
+ alloc_bm.cpp
+)
+
+PEERDIR(
+ library/cpp/coroutine/engine
+)
+
+END()
\ No newline at end of file
diff --git a/library/cpp/coroutine/engine/stack/stack.cpp b/library/cpp/coroutine/engine/stack/stack.cpp
new file mode 100644
index 0000000000..e29450261d
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/stack.cpp
@@ -0,0 +1,67 @@
+#include "stack.h"
+
+#include "stack_allocator.h"
+#include "stack_guards.h"
+
+
+namespace NCoro::NStack {
+
+namespace NDetails {
+
+ TStack::TStack(void* rawMemory, void* alignedMemory, uint64_t alignedSize, const char* /*name*/)
+ : RawMemory_((char*)rawMemory)
+ , AlignedMemory_((char*)alignedMemory)
+ , Size_(alignedSize)
+ {
+ Y_ASSERT(AlignedMemory_ && RawMemory_ && Size_);
+ Y_ASSERT(!(Size_ & PageSizeMask));
+ Y_ASSERT(!((uint64_t)AlignedMemory_ & PageSizeMask));
+ }
+
+ TStack::TStack(TStack&& rhs) noexcept
+ : RawMemory_(rhs.RawMemory_)
+ , AlignedMemory_(rhs.AlignedMemory_)
+ , Size_(rhs.Size_)
+ {
+ rhs.Reset();
+ }
+
+ TStack& TStack::operator=(TStack&& rhs) noexcept {
+ std::swap(*this, rhs);
+ rhs.Reset();
+ return *this;
+ }
+
+ void TStack::Reset() noexcept {
+ Y_ASSERT(AlignedMemory_ && RawMemory_ && Size_);
+
+ RawMemory_ = nullptr;
+ AlignedMemory_ = nullptr;
+ Size_ = 0;
+ }
+
+} // namespace NDetails
+
+
+ TStackHolder::TStackHolder(NStack::IAllocator& allocator, uint32_t size, const char* name) noexcept
+ : Allocator_(allocator)
+ , Stack_(Allocator_.AllocStack(size, name))
+ {}
+
+ TStackHolder::~TStackHolder() {
+ Allocator_.FreeStack(Stack_);
+ }
+
+ TArrayRef<char> TStackHolder::Get() noexcept {
+ return Allocator_.GetStackWorkspace(Stack_.GetAlignedMemory(), Stack_.GetSize());
+ }
+
+ bool TStackHolder::LowerCanaryOk() const noexcept {
+ return Allocator_.CheckStackOverflow(Stack_.GetAlignedMemory());
+ }
+
+ bool TStackHolder::UpperCanaryOk() const noexcept {
+ return Allocator_.CheckStackOverride(Stack_.GetAlignedMemory(), Stack_.GetSize());
+ }
+
+}
diff --git a/library/cpp/coroutine/engine/stack/stack.h b/library/cpp/coroutine/engine/stack/stack.h
new file mode 100644
index 0000000000..7d98ba4c68
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/stack.h
@@ -0,0 +1,77 @@
+#pragma once
+
+#include <util/generic/array_ref.h>
+#include <util/generic/fwd.h>
+#include <util/generic/noncopyable.h>
+
+#include <cstdint>
+
+
+namespace NCoro::NStack {
+
+ class IAllocator;
+
+namespace NDetails {
+
+ //! Do not use directly, use TStackHolder instead
+ class TStack final : private TMoveOnly {
+ public:
+ /*! rawMemory: can be used by an unaligned allocator to free the stack memory after use
+ * alignedMemory: pointer to the aligned memory on which the stack workspace and guard are actually placed
+ * alignedSize: size of the workspace memory plus the memory for the guard
+ * name: name of the coroutine for which this stack is allocated
+ */
+ TStack(void* rawMemory, void* alignedMemory, uint64_t alignedSize, const char* name);
+ TStack(TStack&& rhs) noexcept;
+ TStack& operator=(TStack&& rhs) noexcept;
+
+ char* GetRawMemory() const noexcept {
+ return RawMemory_;
+ }
+
+ char* GetAlignedMemory() const noexcept {
+ return AlignedMemory_;
+ }
+
+ //! Stack size (includes memory for guard)
+ uint64_t GetSize() const noexcept {
+ return Size_;
+ }
+
+ //! Resets parameters, should be called after stack memory is freed
+ void Reset() noexcept;
+
+ private:
+ char* RawMemory_ = nullptr; // not owned
+ char* AlignedMemory_ = nullptr; // not owned
+ uint64_t Size_ = 0;
+ };
+
+} // namespace NDetails
+
+ class TStackHolder final : private TMoveOnly {
+ public:
+ explicit TStackHolder(IAllocator& allocator, uint32_t size, const char* name) noexcept;
+ TStackHolder(TStackHolder&&) = default;
+ TStackHolder& operator=(TStackHolder&&) = default;
+
+ ~TStackHolder();
+
+ char* GetAlignedMemory() const noexcept {
+ return Stack_.GetAlignedMemory();
+ }
+ uint64_t GetSize() const noexcept {
+ return Stack_.GetSize();
+ }
+
+ TArrayRef<char> Get() noexcept;
+ bool LowerCanaryOk() const noexcept;
+ bool UpperCanaryOk() const noexcept;
+
+ private:
+ IAllocator& Allocator_;
+ NDetails::TStack Stack_;
+ };
+
+}
diff --git a/library/cpp/coroutine/engine/stack/stack_allocator.cpp b/library/cpp/coroutine/engine/stack/stack_allocator.cpp
new file mode 100644
index 0000000000..bf12134e6b
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/stack_allocator.cpp
@@ -0,0 +1,26 @@
+#include "stack_allocator.h"
+
+
+namespace NCoro::NStack {
+
+ THolder<IAllocator> GetAllocator(TMaybe<TPoolAllocatorSettings> poolSettings, EGuard guardType) {
+ THolder<IAllocator> allocator;
+ if (poolSettings) {
+ if (guardType == EGuard::Canary) {
+ allocator = MakeHolder<TPoolAllocator<TCanaryGuard>>(*poolSettings);
+ } else {
+ Y_ASSERT(guardType == EGuard::Page);
+ allocator = MakeHolder<TPoolAllocator<TPageGuard>>(*poolSettings);
+ }
+ } else {
+ if (guardType == EGuard::Canary) {
+ allocator = MakeHolder<TSimpleAllocator<TCanaryGuard>>();
+ } else {
+ Y_ASSERT(guardType == EGuard::Page);
+ allocator = MakeHolder<TSimpleAllocator<TPageGuard>>();
+ }
+ }
+ return allocator;
+ }
+
+}
\ No newline at end of file
diff --git a/library/cpp/coroutine/engine/stack/stack_allocator.h b/library/cpp/coroutine/engine/stack/stack_allocator.h
new file mode 100644
index 0000000000..da3c3a93a1
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/stack_allocator.h
@@ -0,0 +1,52 @@
+#pragma once
+
+#include "stack.h"
+#include "stack_common.h"
+
+#include <util/generic/maybe.h>
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+
+#include <cstdint>
+
+
+namespace NCoro::NStack {
+
+ class IAllocator : private TNonCopyable {
+ public:
+ virtual ~IAllocator() = default;
+
+ //! Size should be page-aligned. The stack is protected by a guard, so the actual
+ //! stack workspace is size minus the guard size.
+ NDetails::TStack AllocStack(uint64_t size, const char* name) {
+ uint64_t alignedSize = (size + PageSize - 1) & ~PageSizeMask;
+ Y_ASSERT(alignedSize < 10 * 1024 * PageSize); // more than 10K pages for stack - do you really need it?
+#if defined(_san_enabled_) || !defined(NDEBUG)
+ alignedSize *= DebugOrSanStackMultiplier;
+#endif
+ return DoAllocStack(alignedSize, name);
+ }
+
+ void FreeStack(NDetails::TStack& stack) noexcept {
+ if (stack.GetAlignedMemory()) {
+ DoFreeStack(stack);
+ }
+ };
+
+ virtual TAllocatorStats GetStackStats() const noexcept = 0;
+
+ // Stack helpers
+ virtual TArrayRef<char> GetStackWorkspace(void* stack, uint64_t size) noexcept = 0;
+ virtual bool CheckStackOverflow(void* stack) const noexcept = 0;
+ virtual bool CheckStackOverride(void* stack, uint64_t size) const noexcept = 0;
+
+ private:
+ virtual NDetails::TStack DoAllocStack(uint64_t size, const char* name) = 0;
+ virtual void DoFreeStack(NDetails::TStack& stack) noexcept = 0;
+ };
+
+ THolder<IAllocator> GetAllocator(TMaybe<TPoolAllocatorSettings> poolSettings, EGuard guardType);
+
+}
+
+#include "stack_allocator.inl"
diff --git a/library/cpp/coroutine/engine/stack/stack_allocator.inl b/library/cpp/coroutine/engine/stack/stack_allocator.inl
new file mode 100644
index 0000000000..0f25a4167b
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/stack_allocator.inl
@@ -0,0 +1,138 @@
+#include "stack_guards.h"
+#include "stack_pool.h"
+#include "stack_utils.h"
+
+#include <util/generic/hash.h>
+
+#ifdef _linux_
+#include <unistd.h>
+#endif
+
+
+namespace NCoro::NStack {
+
+ template<typename TGuard>
+ class TPoolAllocator final : public IAllocator {
+ public:
+ explicit TPoolAllocator(const TPoolAllocatorSettings& settings);
+
+ TArrayRef<char> GetStackWorkspace(void* stack, uint64_t size) noexcept override {
+ return Guard_.GetWorkspace(stack, size);
+ }
+ bool CheckStackOverflow(void* stack) const noexcept override {
+ return Guard_.CheckOverflow(stack);
+ }
+ bool CheckStackOverride(void* stack, uint64_t size) const noexcept override {
+ return Guard_.CheckOverride(stack, size);
+ }
+
+ TAllocatorStats GetStackStats() const noexcept override {
+ TAllocatorStats stats;
+ for (const auto& i : Pools_) {
+ stats.ReleasedSize += i.second.GetReleasedSize();
+ stats.NotReleasedSize += i.second.GetFullSize();
+ stats.NumOfAllocated += i.second.GetNumOfAllocated();
+ }
+ return stats;
+ }
+
+ private: // methods
+ NDetails::TStack DoAllocStack(uint64_t size, const char* name) override;
+ void DoFreeStack(NDetails::TStack& stack) noexcept override;
+
+ private: // data
+ const TPoolAllocatorSettings PoolSettings_;
+ const TGuard& Guard_;
+ THashMap<uint64_t, TPool<TGuard>> Pools_; // key - stack size
+ };
+
+ template<typename TGuard>
+ TPoolAllocator<TGuard>::TPoolAllocator(const TPoolAllocatorSettings& settings)
+ : PoolSettings_(settings)
+ , Guard_(GetGuard<TGuard>())
+ {
+#ifdef _linux_
+ Y_VERIFY(sysconf(_SC_PAGESIZE) == PageSize);
+#endif
+ }
+
+ template<typename TGuard>
+ NDetails::TStack TPoolAllocator<TGuard>::DoAllocStack(uint64_t alignedSize, const char* name) {
+ Y_ASSERT(alignedSize > Guard_.GetSize());
+
+ auto pool = Pools_.find(alignedSize);
+ if (pool == Pools_.end()) {
+ Y_ASSERT(Pools_.size() < 1000); // too many different sizes for coroutine stacks
+ auto [newPool, success] = Pools_.emplace(alignedSize, TPool<TGuard>{alignedSize, PoolSettings_, Guard_});
+ Y_VERIFY(success, "Failed to add new coroutine pool");
+ pool = newPool;
+ }
+ return pool->second.AllocStack(name);
+ }
+
+ template<typename TGuard>
+ void TPoolAllocator<TGuard>::DoFreeStack(NDetails::TStack& stack) noexcept {
+ auto pool = Pools_.find(stack.GetSize());
+ Y_VERIFY(pool != Pools_.end(), "Attempt to free stack from another allocator");
+ pool->second.FreeStack(stack);
+ }
+
+ // ------------------------------------------------------------------------
+ //
+ template<typename TGuard>
+ class TSimpleAllocator : public IAllocator {
+ public:
+ explicit TSimpleAllocator();
+
+ TArrayRef<char> GetStackWorkspace(void* stack, uint64_t size) noexcept override {
+ return Guard_.GetWorkspace(stack, size);
+ }
+ bool CheckStackOverflow(void* stack) const noexcept override {
+ return Guard_.CheckOverflow(stack);
+ }
+ bool CheckStackOverride(void* stack, uint64_t size) const noexcept override {
+ return Guard_.CheckOverride(stack, size);
+ }
+
+ TAllocatorStats GetStackStats() const noexcept override { return {}; } // not used for simple allocator
+
+ private: // methods
+ NDetails::TStack DoAllocStack(uint64_t size, const char* name) override;
+ void DoFreeStack(NDetails::TStack& stack) noexcept override;
+
+ private: // data
+ const TGuard& Guard_;
+ };
+
+
+ template<typename TGuard>
+ TSimpleAllocator<TGuard>::TSimpleAllocator()
+ : Guard_(GetGuard<TGuard>())
+ {}
+
+ template<typename TGuard>
+ NDetails::TStack TSimpleAllocator<TGuard>::DoAllocStack(uint64_t alignedSize, const char* name) {
+ Y_ASSERT(alignedSize > Guard_.GetSize());
+
+ char* rawPtr = nullptr;
+ char* alignedPtr = nullptr; // with extra space for previous guard in this type of allocator
+
+ Y_VERIFY(GetAlignedMemory((alignedSize + Guard_.GetPageAlignedSize()) / PageSize, rawPtr, alignedPtr)); // + memory for previous guard
+ char* alignedStackMemory = alignedPtr + Guard_.GetPageAlignedSize(); // after previous guard
+
+ // Default allocator sets both guards, because it doesn't have memory chunk with previous stack and guard on it
+ Guard_.Protect((void*)alignedPtr, Guard_.GetPageAlignedSize(), false); // first guard should be before stack memory
+ Guard_.Protect(alignedStackMemory, alignedSize, true); // second guard is placed on stack memory
+
+ return NDetails::TStack{rawPtr, alignedStackMemory, alignedSize, name};
+ }
+
+ template<typename TGuard>
+ void TSimpleAllocator<TGuard>::DoFreeStack(NDetails::TStack& stack) noexcept {
+ Guard_.RemoveProtection(stack.GetAlignedMemory() - Guard_.GetPageAlignedSize(), Guard_.GetPageAlignedSize());
+ Guard_.RemoveProtection(stack.GetAlignedMemory(), stack.GetSize());
+
+ free(stack.GetRawMemory());
+ stack.Reset();
+ }
+}
diff --git a/library/cpp/coroutine/engine/stack/stack_common.h b/library/cpp/coroutine/engine/stack/stack_common.h
new file mode 100644
index 0000000000..ed2d74d296
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/stack_common.h
@@ -0,0 +1,35 @@
+#pragma once
+
+#include <cstdint>
+
+class TContExecutor;
+
+namespace NCoro::NStack {
+
+ static constexpr uint64_t PageSize = 4096;
+ static constexpr uint64_t PageSizeMask = PageSize - 1; // for checks
+ static constexpr uint64_t DebugOrSanStackMultiplier = 4; // for debug or sanitizer builds
+ static constexpr uint64_t SmallStackMaxSizeInPages = 6;
+
+ enum class EGuard {
+ Canary, //!< writes some data to check it for corruption
+ Page, //!< prohibits access to page memory
+ };
+
+ struct TPoolAllocatorSettings {
+ uint64_t RssPagesToKeep = 1;
+ uint64_t SmallStackRssPagesToKeep = 1; // for stack less than SmallStackMaxSizeInPages
+ uint64_t ReleaseRate = 2;
+#if !defined(_san_enabled_) && defined(NDEBUG)
+ uint64_t StacksPerChunk = 256;
+#else
+ uint64_t StacksPerChunk = 2;
+#endif
+ };
+
+ struct TAllocatorStats {
+ uint64_t ReleasedSize = 0;
+ uint64_t NotReleasedSize = 0;
+ uint64_t NumOfAllocated = 0;
+ };
+}
diff --git a/library/cpp/coroutine/engine/stack/stack_guards.cpp b/library/cpp/coroutine/engine/stack/stack_guards.cpp
new file mode 100644
index 0000000000..b8bcff039e
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/stack_guards.cpp
@@ -0,0 +1,17 @@
+#include "stack_guards.h"
+
+
+namespace NCoro::NStack {
+
+ template<>
+ const TCanaryGuard& GetGuard<TCanaryGuard>() noexcept {
+ static const TCanaryGuard guard;
+ return guard;
+ }
+
+ template<>
+ const TPageGuard& GetGuard<TPageGuard>() noexcept {
+ static const TPageGuard guard;
+ return guard;
+ }
+}
\ No newline at end of file
diff --git a/library/cpp/coroutine/engine/stack/stack_guards.h b/library/cpp/coroutine/engine/stack/stack_guards.h
new file mode 100644
index 0000000000..3a7ef26481
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/stack_guards.h
@@ -0,0 +1,123 @@
+#pragma once
+
+#include "stack_common.h"
+
+#include <util/generic/array_ref.h>
+#include <util/generic/strbuf.h>
+#include <util/system/protect.h>
+
+
+namespace NCoro::NStack {
+
+ /*! A guard detects stack overflow/override by marking the memory before and after the stack with predefined values/properties.
+ * In fact it only marks the memory after the end of the stack workspace - the preceding guard section should already be
+ * set (by the previous stack in the case of the pool allocator) and can be checked on demand.
+ * The stack pointer should be page-aligned.
+ */
+
+
+ //! Checks integrity by writing a predefined sequence and comparing it with the original
+ class TCanaryGuard final {
+ public:
+ //! Size of guard section in bytes
+ static constexpr uint64_t GetSize() { return Canary.size(); }
+ //! Size of page-aligned guard section in bytes
+ static constexpr uint64_t GetPageAlignedSize() { return AlignedSize_; }
+
+ //! Get stack memory between guard sections
+ static TArrayRef<char> GetWorkspace(void* stack, uint64_t size) noexcept {
+ Y_ASSERT( !((uint64_t)stack & PageSizeMask) );
+ Y_ASSERT( !(size & PageSizeMask) );
+ Y_ASSERT(size > Canary.size());
+
+ return {(char*) stack, size - Canary.size()};
+ }
+
+ /*! Set guard section before the end of stack memory (at stack + size - guard size position)
+ * checkPrevious: check guard before stack memory for integrity
+ */
+ static void Protect(void* stack, uint64_t size, bool checkPrevious) noexcept {
+ Y_ASSERT( !((uint64_t)stack & PageSizeMask) ); // stack pointer should be page aligned
+ Y_ASSERT( !(size & PageSizeMask) ); // stack size should be page aligned
+ Y_ASSERT(size >= Canary.size()); // stack should have enough space to place guard
+
+ if (checkPrevious) {
+ Y_VERIFY(CheckOverflow(stack), "Previous stack was corrupted");
+ }
+ auto guardPos = (char*) stack + size - Canary.size();
+ memcpy(guardPos, Canary.data(), Canary.size());
+ }
+
+ //! This guard doesn't change memory flags
+ static constexpr void RemoveProtection(void*, uint64_t) {}
+        //! Whether protection must be removed before returning memory to the system
+ static constexpr bool ShouldRemoveProtectionBeforeFree() { return false; }
+
+ static bool CheckOverflow(void* stack) noexcept {
+ Y_ASSERT(stack);
+
+ char* guardPos = (char*) ((uint64_t)stack - Canary.size());
+ return TStringBuf(guardPos, Canary.size()) == Canary;
+ }
+
+ static bool CheckOverride(void* stack, uint64_t size) noexcept {
+ Y_ASSERT(stack);
+ Y_ASSERT(size > Canary.size());
+
+ char* guardPos = (char*) ((uint64_t)stack + size - Canary.size());
+ return TStringBuf(guardPos, Canary.size()) == Canary;
+ }
+
+ private:
+ static constexpr TStringBuf Canary = "[ThisIsACanaryCoroutineStackGuardIfYouReadThisTheStackIsStillOK]";
+ static_assert(Canary.size() == 64);
+ static constexpr uint64_t AlignedSize_ = (Canary.size() + PageSize - 1) & ~PageSizeMask;
+ };
+
+
+ // ------------------------------------------------------------------------
+ //
+ //! Ensures integrity by removing access rights for border pages
+ class TPageGuard final {
+ public:
+ //! Size of guard section in bytes
+ static constexpr uint64_t GetSize() { return PageSize; }
+ //! Size of page-aligned guard section in bytes
+ static constexpr uint64_t GetPageAlignedSize() { return PageSize; }
+
+ static TArrayRef<char> GetWorkspace(void* stack, uint64_t size) noexcept {
+ Y_ASSERT( !((uint64_t)stack & PageSizeMask) );
+ Y_ASSERT( !(size & PageSizeMask) );
+ Y_ASSERT(size > PageSize);
+
+ return {(char*)stack, size - PageSize};
+ }
+
+ static void Protect(void* stack, uint64_t size, bool /*checkPrevious*/) noexcept {
+ Y_ASSERT( !((uint64_t)stack & PageSizeMask) ); // stack pointer should be page aligned
+ Y_ASSERT( !(size & PageSizeMask) ); // stack size should be page aligned
+ Y_ASSERT(size >= PageSize); // stack should have enough space to place guard
+
+ ProtectMemory((char*)stack + size - PageSize, PageSize, PM_NONE);
+ }
+
+        //! Remove protection so that the stack memory can be freed
+ static void RemoveProtection(void* stack, uint64_t size) noexcept {
+ Y_ASSERT( !((uint64_t)stack & PageSizeMask) );
+ Y_ASSERT( !(size & PageSizeMask) );
+ Y_ASSERT(size >= PageSize);
+
+ ProtectMemory((char*)stack + size - PageSize, PageSize, PM_WRITE | PM_READ);
+ }
+        //! Whether protection must be removed before returning memory to the system
+ static constexpr bool ShouldRemoveProtectionBeforeFree() { return true; }
+
+        //! Not used for the page guard - a violation crashes the process immediately instead.
+ static constexpr bool CheckOverflow(void*) { return true; }
+ static constexpr bool CheckOverride(void*, uint64_t) { return true; }
+ };
+
+
+ template<typename TGuard>
+ const TGuard& GetGuard() noexcept;
+}
diff --git a/library/cpp/coroutine/engine/stack/stack_pool.h b/library/cpp/coroutine/engine/stack/stack_pool.h
new file mode 100644
index 0000000000..27a8e9394b
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/stack_pool.h
@@ -0,0 +1,54 @@
+#pragma once
+
+#include "stack.h"
+#include "stack_common.h"
+
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <util/generic/vector.h>
+
+
+namespace NCoro::NStack {
+
+ class IGuard;
+ class TStorage;
+ struct TPoolAllocatorSettings;
+
+ template<typename TGuard>
+ class TPool final : private TMoveOnly {
+ struct TMemory {
+ char* Raw = nullptr;
+ char* Aligned = nullptr; // points to aligned memory, which includes space for first page guard
+ };
+ public:
+ TPool(uint64_t stackSize, const TPoolAllocatorSettings& settings, const TGuard& guard);
+ TPool(TPool&& other) noexcept;
+ ~TPool();
+
+ NDetails::TStack AllocStack(const char* name);
+ void FreeStack(NDetails::TStack& stack);
+
+ uint64_t GetReleasedSize() const noexcept;
+ uint64_t GetFullSize() const noexcept;
+ uint64_t GetNumOfAllocated() const noexcept { return NumOfAllocated_; }
+
+ private:
+ void AllocNewMemoryChunk();
+ bool IsSmallStack() const noexcept;
+ bool IsStackFromThisPool(const NDetails::TStack& stack) const noexcept;
+ NDetails::TStack AllocNewStack(const char* name);
+
+ private:
+ const uint64_t StackSize_ = 0;
+ uint64_t RssPagesToKeep_ = 0;
+ const TGuard& Guard_;
+ TVector<TMemory> Memory_; // memory chunks
+ THolder<TStorage> Storage_;
+ char* NextToAlloc_ = nullptr; // points to next available stack in the last memory chunk
+ const uint64_t ChunkSize_ = 0;
+ uint64_t NumOfAllocated_ = 0;
+ };
+
+}
+
+#include "stack_pool.inl"
diff --git a/library/cpp/coroutine/engine/stack/stack_pool.inl b/library/cpp/coroutine/engine/stack/stack_pool.inl
new file mode 100644
index 0000000000..6e08e05a48
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/stack_pool.inl
@@ -0,0 +1,132 @@
+#include "stack_storage.h"
+#include "stack_utils.h"
+
+
+namespace NCoro::NStack {
+
+ template<typename TGuard>
+ TPool<TGuard>::TPool(uint64_t stackSize, const TPoolAllocatorSettings& settings, const TGuard& guard)
+ : StackSize_(stackSize)
+ , RssPagesToKeep_(IsSmallStack() ? settings.SmallStackRssPagesToKeep : settings.RssPagesToKeep)
+ , Guard_(guard)
+ , ChunkSize_(Guard_.GetPageAlignedSize() + StackSize_ * settings.StacksPerChunk)
+ {
+ Y_ASSERT(RssPagesToKeep_);
+ if (!RssPagesToKeep_) {
+            RssPagesToKeep_ = 1; // keep at least the page that holds the guard
+ }
+
+ const uint64_t stackSizeInPages = stackSize / PageSize;
+ Y_ASSERT(stackSizeInPages >= RssPagesToKeep_);
+ if (stackSizeInPages < RssPagesToKeep_) {
+ RssPagesToKeep_ = stackSizeInPages; // keep all stack pages
+ }
+
+ Y_ASSERT(StackSize_ && !(StackSize_ & PageSizeMask)); // stack size is not zero and page aligned
+ Y_ASSERT(Guard_.GetSize() < StackSize_); // stack has enough space to place guard
+ Y_ASSERT(stackSizeInPages >= RssPagesToKeep_);
+
+ Storage_ = MakeHolder<TStorage>(StackSize_, RssPagesToKeep_, settings.ReleaseRate);
+
+ AllocNewMemoryChunk();
+ }
+
+ template<typename TGuard>
+ TPool<TGuard>::TPool(TPool&& other) noexcept = default;
+
+ template<typename TGuard>
+ TPool<TGuard>::~TPool() {
+ if (!Memory_.empty()) {
+ Y_ASSERT(NextToAlloc_ && StackSize_);
+
+ for (const auto& chunk : Memory_) {
+ Y_ASSERT(chunk.Raw && chunk.Aligned);
+
+ if (Guard_.ShouldRemoveProtectionBeforeFree()) {
+ Guard_.RemoveProtection(chunk.Aligned, Guard_.GetPageAlignedSize()); // first page in chunk
+
+ const char* endOfStacksMemory = chunk.Aligned + ChunkSize_;
+ for (char* i = chunk.Aligned + Guard_.GetPageAlignedSize(); i < endOfStacksMemory; i += StackSize_) {
+ Guard_.RemoveProtection(i, StackSize_);
+ }
+ }
+
+ free(chunk.Raw);
+ }
+ }
+ }
+
+ template<typename TGuard>
+ NDetails::TStack TPool<TGuard>::AllocStack(const char* name) {
+ Y_ASSERT(!Memory_.empty());
+
+ if (!Storage_->IsEmpty()) {
+ return Storage_->GetStack(Guard_, name);
+ } else {
+ ++NumOfAllocated_;
+ return AllocNewStack(name);
+ }
+ }
+
+ template<typename TGuard>
+ void TPool<TGuard>::FreeStack(NDetails::TStack& stack) {
+ Y_ASSERT(Storage_->Size() < ((ChunkSize_ - Guard_.GetPageAlignedSize()) / StackSize_) * Memory_.size());
+ Y_ASSERT(IsStackFromThisPool(stack));
+
+ Storage_->ReturnStack(stack);
+ }
+
+ template<typename TGuard>
+ uint64_t TPool<TGuard>::GetReleasedSize() const noexcept {
+ return Storage_->GetReleasedSize();
+ }
+ template<typename TGuard>
+ uint64_t TPool<TGuard>::GetFullSize() const noexcept {
+ return Storage_->GetFullSize();
+ }
+
+ template<typename TGuard>
+ void TPool<TGuard>::AllocNewMemoryChunk() {
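+        // Chunk layout: [guard page][stack 1 ... guard][stack 2 ... guard]...; the trailing guard
+        // of each stack also serves as the leading guard of the next stack in the chunk.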
+ const uint64_t totalSizeInPages = ChunkSize_ / PageSize;
+
+ TMemory memory;
+ const auto res = GetAlignedMemory(totalSizeInPages, memory.Raw, memory.Aligned);
+ Y_VERIFY(res, "Failed to allocate memory for coro stack pool");
+
+ NextToAlloc_ = memory.Aligned + Guard_.GetPageAlignedSize(); // skip first guard page
+ Guard_.Protect(memory.Aligned, Guard_.GetPageAlignedSize(), false); // protect first guard page
+
+ Memory_.push_back(std::move(memory));
+ }
+
+ template<typename TGuard>
+ bool TPool<TGuard>::IsSmallStack() const noexcept {
+ return StackSize_ / PageSize <= SmallStackMaxSizeInPages;
+ }
+
+ template<typename TGuard>
+ bool TPool<TGuard>::IsStackFromThisPool(const NDetails::TStack& stack) const noexcept {
+ for (const auto& chunk : Memory_) {
+ const char* endOfStacksMemory = chunk.Aligned + ChunkSize_;
+ if (chunk.Raw <= stack.GetRawMemory() && stack.GetRawMemory() < endOfStacksMemory) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ template<typename TGuard>
+ NDetails::TStack TPool<TGuard>::AllocNewStack(const char* name) {
+ if (NextToAlloc_ + StackSize_ > Memory_.rbegin()->Aligned + ChunkSize_) {
+            AllocNewMemoryChunk(); // also sets NextToAlloc_ to the first stack position in the newly allocated memory chunk
+ }
+ Y_ASSERT(NextToAlloc_ + StackSize_ <= Memory_.rbegin()->Aligned + ChunkSize_);
+
+ char* newStack = NextToAlloc_;
+ NextToAlloc_ += StackSize_;
+
+ Guard_.Protect(newStack, StackSize_, true);
+ return NDetails::TStack{newStack, newStack, StackSize_, name};
+ }
+
+}
diff --git a/library/cpp/coroutine/engine/stack/stack_storage.cpp b/library/cpp/coroutine/engine/stack/stack_storage.cpp
new file mode 100644
index 0000000000..6dc2b2d44b
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/stack_storage.cpp
@@ -0,0 +1,46 @@
+#include "stack_storage.h"
+
+#include "stack.h"
+#include "stack_utils.h"
+
+#include <library/cpp/coroutine/engine/impl.h>
+
+
+namespace NCoro::NStack {
+
+ TStorage::TStorage(uint64_t stackSize, uint64_t rssPagesToKeep, uint64_t releaseRate)
+ : StackSize_(stackSize)
+ , RssPagesToKeep_(rssPagesToKeep)
+ , ReleaseRate_(releaseRate ? releaseRate : 1)
+ {
+ Y_ASSERT(StackSize_ && RssPagesToKeep_);
+ }
+
+ bool TStorage::IsEmpty() const noexcept {
+ return Released_.empty() && Full_.empty();
+ }
+
+ uint64_t TStorage::Size() const noexcept {
+ return Released_.size() + Full_.size();
+ }
+
+ void TStorage::ReturnStack(NDetails::TStack& stack) {
+ thread_local uint64_t i = 0;
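+        // Release RSS pages only for every ReleaseRate_-th returned stack to amortize the cost of madvise().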
+ if (++i % ReleaseRate_ != 0) {
+ Full_.push_back(stack.GetAlignedMemory());
+ } else {
+ ReleaseMemory(stack.GetAlignedMemory(), RssPagesToKeep_);
+ Released_.push_back(stack.GetAlignedMemory());
+ }
+ stack.Reset();
+ }
+
+ void TStorage::ReleaseMemory([[maybe_unused]] char* alignedStackMemory, [[maybe_unused]] uint64_t pagesToKeep) noexcept {
+#if !defined(_san_enabled_) && defined(NDEBUG)
+ uint64_t numOfPagesToFree = StackSize_ / PageSize;
+ numOfPagesToFree -= pagesToKeep;
+ ReleaseRss(alignedStackMemory, numOfPagesToFree);
+#endif
+ }
+
+}
diff --git a/library/cpp/coroutine/engine/stack/stack_storage.h b/library/cpp/coroutine/engine/stack/stack_storage.h
new file mode 100644
index 0000000000..25fe2cfb17
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/stack_storage.h
@@ -0,0 +1,60 @@
+#pragma once
+
+#include "stack.h"
+
+#include <util/datetime/base.h>
+#include <util/generic/deque.h>
+
+
+class TCont;
+
+namespace NCoro::NStack {
+
+ class IGuard;
+
+ class TStorage final : private TMoveOnly {
+ public:
+ TStorage(uint64_t stackSize, uint64_t rssPagesToKeep, uint64_t releaseRate);
+
+ bool IsEmpty() const noexcept;
+ uint64_t Size() const noexcept;
+
+ uint64_t GetReleasedSize() const noexcept { return Released_.size(); }
+ uint64_t GetFullSize() const noexcept { return Full_.size(); }
+
+ template<typename TGuard>
+ NDetails::TStack GetStack(const TGuard& guard, const char* name);
+ void ReturnStack(NDetails::TStack& stack);
+
+ private:
+ void ReleaseMemory(char* alignedStackMemory, uint64_t pagesToKeep) noexcept;
+
+ private:
+ TDeque<void*> Released_; //!< stacks memory with released RSS memory
+ TDeque<void*> Full_; //!< stacks memory with RSS memory
+ uint64_t StackSize_ = 0;
+ uint64_t RssPagesToKeep_ = 0;
+ const uint64_t ReleaseRate_ = 1;
+ };
+
+
+ template<typename TGuard>
+ NDetails::TStack TStorage::GetStack(const TGuard& guard, const char* name) {
+ Y_VERIFY(!IsEmpty()); // check before call
+
+ void* newStack = nullptr;
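+        // Prefer stacks from Full_ (their RSS pages are still mapped) so reuse does not page-fault;
+        // fall back to Released_ stacks otherwise.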
+ if (!Full_.empty()) {
+ newStack = Full_.back();
+ Full_.pop_back();
+ } else {
+ Y_ASSERT(!Released_.empty());
+ newStack = Released_.back();
+ Released_.pop_back();
+ }
+
+ Y_VERIFY(guard.CheckOverflow(newStack), "corrupted stack in pool");
+ Y_VERIFY(guard.CheckOverride(newStack, StackSize_), "corrupted stack in pool");
+
+ return NDetails::TStack{newStack, newStack, StackSize_, name};
+ }
+}
diff --git a/library/cpp/coroutine/engine/stack/stack_utils.cpp b/library/cpp/coroutine/engine/stack/stack_utils.cpp
new file mode 100644
index 0000000000..1548529b66
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/stack_utils.cpp
@@ -0,0 +1,84 @@
+#include "stack_utils.h"
+
+#include <contrib/libs/linux-headers/asm-generic/errno-base.h>
+#include <util/generic/scope.h>
+#include <util/system/yassert.h>
+
+#ifdef _linux_
+#include <sys/mman.h>
+#endif
+
+#include <cerrno>
+#include <cstdlib>
+#include <cstring>
+
+
+namespace NCoro::NStack {
+
+#ifdef _linux_
+ bool GetAlignedMemory(uint64_t sizeInPages, char*& rawPtr, char*& alignedPtr) noexcept {
+ Y_ASSERT(sizeInPages);
+
+ void* ptr = nullptr;
+ int error = posix_memalign(&ptr, PageSize, sizeInPages * PageSize);
+ alignedPtr = rawPtr = (char*)ptr;
+ return rawPtr && alignedPtr && !error;
+ }
+#else
+ bool GetAlignedMemory(uint64_t sizeInPages, char*& rawPtr, char*& alignedPtr) noexcept {
+ Y_ASSERT(sizeInPages);
+
+        rawPtr = (char*) malloc((sizeInPages + 1) * PageSize); // +1 page in case the result is not page-aligned
+ alignedPtr = (char*)( ((uint64_t)rawPtr + PageSize - 1) & ~PageSizeMask);
+ return rawPtr && alignedPtr;
+ }
+#endif
+
+#ifdef _linux_
+ void ReleaseRss(char* alignedPtr, uint64_t numOfPages) noexcept {
+ Y_VERIFY( !((uint64_t)alignedPtr & PageSizeMask), "Not aligned pointer to release RSS memory");
+ if (!numOfPages) {
+ return;
+ }
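+        // MADV_DONTNEED drops the pages from RSS; they are mapped back zero-filled on the next access.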
+ if (auto res = madvise((void*) alignedPtr, numOfPages * PageSize, MADV_DONTNEED); res) {
+ Y_VERIFY(errno == EAGAIN || errno == ENOMEM, "Failed to release memory");
+ }
+ }
+#else
+ void ReleaseRss(char*, uint64_t) noexcept {
+ }
+#endif
+
+#ifdef _linux_
+ uint64_t CountMapped(char* alignedPtr, uint64_t numOfPages) noexcept {
+ Y_VERIFY( !((uint64_t)alignedPtr & PageSizeMask) );
+ Y_ASSERT(numOfPages);
+
+ uint64_t result = 0;
+        unsigned char* mappedPages = (unsigned char*) calloc(numOfPages, sizeof(unsigned char)); // one byte per page
+ Y_VERIFY(mappedPages);
+ Y_DEFER {
+ free(mappedPages);
+ };
+
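+        // mincore() fills one byte per page; the least significant bit is set if the page is resident in memory.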
+ if (!mincore((void*)alignedPtr, numOfPages * PageSize, mappedPages)) {
+ for (uint64_t i = 0; i < numOfPages; ++i) {
+ if (mappedPages[i] & 1) {
+ ++result;
+ }
+ }
+ } else {
+ Y_ASSERT(false);
+ return 0;
+ }
+
+ return result;
+ }
+
+#else
+ uint64_t CountMapped(char*, uint64_t) noexcept {
+ return 0; // stub for Windows tests
+ }
+#endif
+
+}
diff --git a/library/cpp/coroutine/engine/stack/stack_utils.h b/library/cpp/coroutine/engine/stack/stack_utils.h
new file mode 100644
index 0000000000..46c65240b5
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/stack_utils.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include "stack_common.h"
+
+
+namespace NCoro::NStack {
+    /*! The actual amount of allocated memory can exceed the requested size in pages due to alignment.
+     * @param sizeInPages : number of pages to allocate
+     * @param rawPtr : pointer to the unaligned memory. Should be passed to free() when it is no longer needed.
+     * @param alignedPtr : pointer to the beginning of the first fully allocated page
+     * @return : true on success
+     */
+ bool GetAlignedMemory(uint64_t sizeInPages, char*& rawPtr, char*& alignedPtr) noexcept;
+
+ /*! Release mapped RSS memory.
+     * @param alignedPtr : page-aligned memory whose RSS pages should be released
+     * @param numOfPages : number of pages to release from RSS memory
+ */
+ void ReleaseRss(char* alignedPtr, uint64_t numOfPages) noexcept;
+
+ /*! Count pages with RSS memory
+     * @param alignedPtr : pointer to the page-aligned memory to examine
+ * @param numOfPages : number of pages to check
+ * @return : number of pages with RSS memory
+ */
+ uint64_t CountMapped(char* alignedPtr, uint64_t numOfPages) noexcept;
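+
+    /* Illustrative sketch of how these helpers combine (local names, not part of the API):
+     *
+     *     char* raw = nullptr;
+     *     char* aligned = nullptr;
+     *     if (GetAlignedMemory(4, raw, aligned)) {
+     *         // ... use the pages [aligned, aligned + 4 * PageSize) ...
+     *         ReleaseRss(aligned, 4);                 // drop RSS for all 4 pages
+     *         Y_ASSERT(CountMapped(aligned, 4) == 0); // holds on Linux, non-sanitized builds
+     *         free(raw);                              // always free the raw pointer
+     *     }
+     */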
+}
diff --git a/library/cpp/coroutine/engine/stack/ut/stack_allocator_ut.cpp b/library/cpp/coroutine/engine/stack/ut/stack_allocator_ut.cpp
new file mode 100644
index 0000000000..a7283d44a3
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/ut/stack_allocator_ut.cpp
@@ -0,0 +1,115 @@
+#include <library/cpp/coroutine/engine/stack/stack_allocator.h>
+#include <library/cpp/coroutine/engine/stack/stack_common.h>
+#include <library/cpp/testing/gtest/gtest.h>
+
+
+using namespace testing;
+
+namespace NCoro::NStack::Tests {
+
+ enum class EAllocator {
+ Pool, // allocates page-size aligned stacks from pools
+ Simple // uses malloc/free for each stack
+ };
+
+ class TAllocatorParamFixture : public TestWithParam< std::tuple<EGuard, EAllocator> > {
+ protected: // methods
+ void SetUp() override {
+ EGuard guardType;
+ EAllocator allocType;
+ std::tie(guardType, allocType) = GetParam();
+
+ TMaybe<TPoolAllocatorSettings> poolSettings;
+ if (allocType == EAllocator::Pool) {
+ poolSettings = TPoolAllocatorSettings{};
+ }
+
+ Allocator_ = GetAllocator(poolSettings, guardType);
+ }
+
+ protected: // data
+ THolder<IAllocator> Allocator_;
+ };
+
+
+ TEST_P(TAllocatorParamFixture, StackAllocationAndRelease) {
+ uint64_t stackSize = PageSize * 12;
+ auto stack = Allocator_->AllocStack(stackSize, "test_stack");
+#if defined(_san_enabled_) || !defined(NDEBUG)
+ stackSize *= DebugOrSanStackMultiplier;
+#endif
+
+ // Correct stack should have
+ EXPECT_EQ(stack.GetSize(), stackSize); // predefined size
+ EXPECT_FALSE((uint64_t)stack.GetAlignedMemory() & PageSizeMask); // aligned pointer
+ // Writable workspace
+ auto workspace = Allocator_->GetStackWorkspace(stack.GetAlignedMemory(), stack.GetSize());
+ for (uint64_t i = 0; i < workspace.size(); i += 512) {
+ workspace[i] = 42;
+ }
+ EXPECT_TRUE(Allocator_->CheckStackOverflow(stack.GetAlignedMemory()));
+ EXPECT_TRUE(Allocator_->CheckStackOverride(stack.GetAlignedMemory(), stack.GetSize()));
+
+ Allocator_->FreeStack(stack);
+ EXPECT_FALSE(stack.GetRawMemory());
+ }
+
+ INSTANTIATE_TEST_SUITE_P(AllocatorTestParams, TAllocatorParamFixture,
+ Combine(Values(EGuard::Canary, EGuard::Page), Values(EAllocator::Pool, EAllocator::Simple)));
+
+
+ // ------------------------------------------------------------------------
+ // Test that allocated stack has guards
+ //
+ template<class AllocatorType>
+ THolder<IAllocator> GetAllocator(EGuard guardType);
+
+ struct TPoolTag {};
+ struct TSimpleTag {};
+
+ template<>
+ THolder<IAllocator> GetAllocator<TPoolTag>(EGuard guardType) {
+ TMaybe<TPoolAllocatorSettings> poolSettings = TPoolAllocatorSettings{};
+ return GetAllocator(poolSettings, guardType);
+ }
+
+ template<>
+ THolder<IAllocator> GetAllocator<TSimpleTag>(EGuard guardType) {
+ TMaybe<TPoolAllocatorSettings> poolSettings;
+ return GetAllocator(poolSettings, guardType);
+ }
+
+
+ template <class AllocatorType>
+ class TAllocatorFixture : public Test {
+ protected:
+ TAllocatorFixture()
+ : Allocator_(GetAllocator<AllocatorType>(EGuard::Page))
+ {}
+
+ const uint64_t StackSize_ = PageSize * 2;
+ THolder<IAllocator> Allocator_;
+ };
+
+ typedef Types<TPoolTag, TSimpleTag> Implementations;
+ TYPED_TEST_SUITE(TAllocatorFixture, Implementations);
+
+ TYPED_TEST(TAllocatorFixture, StackOverflow) {
+ ASSERT_DEATH({
+ auto stack = this->Allocator_->AllocStack(this->StackSize_, "test_stack");
+
+ // Overwrite previous guard, crash is here
+ *(stack.GetAlignedMemory() - 1) = 42;
+ }, "");
+ }
+
+ TYPED_TEST(TAllocatorFixture, StackOverride) {
+ ASSERT_DEATH({
+ auto stack = this->Allocator_->AllocStack(this->StackSize_, "test_stack");
+
+ // Overwrite guard, crash is here
+ *(stack.GetAlignedMemory() + stack.GetSize() - 1) = 42;
+ }, "");
+ }
+
+}
diff --git a/library/cpp/coroutine/engine/stack/ut/stack_guards_ut.cpp b/library/cpp/coroutine/engine/stack/ut/stack_guards_ut.cpp
new file mode 100644
index 0000000000..9da9a9b3d5
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/ut/stack_guards_ut.cpp
@@ -0,0 +1,158 @@
+#include <library/cpp/coroutine/engine/stack/stack_common.h>
+#include <library/cpp/coroutine/engine/stack/stack_guards.h>
+#include <library/cpp/coroutine/engine/stack/stack_utils.h>
+#include <library/cpp/testing/gtest/gtest.h>
+
+
+using namespace testing;
+
+namespace NCoro::NStack::Tests {
+
+ template <class TGuard>
+ class TGuardFixture : public Test {
+ protected:
+ TGuardFixture() : Guard_(GetGuard<TGuard>()) {}
+
+ const TGuard& Guard_;
+ };
+
+ typedef Types<TCanaryGuard, TPageGuard> Implementations;
+ TYPED_TEST_SUITE(TGuardFixture, Implementations);
+
+ TYPED_TEST(TGuardFixture, GuardSize) {
+ const auto size = this->Guard_.GetSize();
+ EXPECT_GE(size, 64ul);
+ EXPECT_FALSE(size & 63ul); // check 64-byte alignment
+ }
+
+ TYPED_TEST(TGuardFixture, GuardAlignedSize) {
+ const auto size = this->Guard_.GetPageAlignedSize();
+ EXPECT_GE(size, PageSize);
+ EXPECT_FALSE(size & PageSizeMask); // check page-alignment
+ }
+
+ TYPED_TEST(TGuardFixture, StackWorkspace) {
+ for (uint64_t sizeInPages : {2, 5, 12}) {
+ char *rawPtr, *alignedPtr = nullptr;
+ ASSERT_TRUE(GetAlignedMemory(sizeInPages, rawPtr, alignedPtr));
+ auto workspace = this->Guard_.GetWorkspace(alignedPtr, sizeInPages * PageSize);
+ EXPECT_EQ(workspace.size(), sizeInPages * PageSize - this->Guard_.GetSize()) << " size in pages " << sizeInPages;
+
+ this->Guard_.Protect(alignedPtr, sizeInPages * PageSize, false);
+ workspace = this->Guard_.GetWorkspace(alignedPtr, sizeInPages * PageSize);
+ EXPECT_EQ(workspace.size(), sizeInPages * PageSize - this->Guard_.GetSize()) << " size in pages " << sizeInPages;
+
+ this->Guard_.RemoveProtection(alignedPtr, sizeInPages * PageSize);
+ workspace = this->Guard_.GetWorkspace(alignedPtr, sizeInPages * PageSize);
+ EXPECT_EQ(workspace.size(), sizeInPages * PageSize - this->Guard_.GetSize()) << " size in pages " << sizeInPages;
+
+ free(rawPtr);
+ }
+ }
+
+ TYPED_TEST(TGuardFixture, SetRemoveProtectionWorks) {
+ char *rawPtr, *alignedPtr = nullptr;
+ constexpr uint64_t sizeInPages = 4;
+ ASSERT_TRUE(GetAlignedMemory(sizeInPages + 1, rawPtr, alignedPtr));
+
+ this->Guard_.Protect(alignedPtr, PageSize, false); // set previous guard
+ alignedPtr += PageSize; // leave first page for previous guard
+ this->Guard_.Protect(alignedPtr, sizeInPages * PageSize, true);
+
+ EXPECT_TRUE(this->Guard_.CheckOverflow(alignedPtr));
+ EXPECT_TRUE(this->Guard_.CheckOverride(alignedPtr, sizeInPages * PageSize));
+
+ this->Guard_.RemoveProtection(alignedPtr, sizeInPages * PageSize);
+ this->Guard_.RemoveProtection(alignedPtr - PageSize, PageSize); // remove previous guard
+
+ free(rawPtr);
+ }
+
+ TEST(StackGuardTest, CanaryGuardTestOverflow) {
+ const auto& guard = GetGuard<TCanaryGuard>();
+
+ char *rawPtr, *alignedPtr = nullptr;
+ constexpr uint64_t sizeInPages = 4;
+ ASSERT_TRUE(GetAlignedMemory(sizeInPages + 1, rawPtr, alignedPtr));
+ guard.Protect(alignedPtr, PageSize, false); // set previous guard
+ alignedPtr += PageSize; // leave first page for previous guard
+ guard.Protect(alignedPtr, sizeInPages * PageSize, true);
+
+ EXPECT_TRUE(guard.CheckOverflow(alignedPtr));
+ EXPECT_TRUE(guard.CheckOverride(alignedPtr, sizeInPages * PageSize));
+
+ // Overwrite previous guard
+ *(alignedPtr - 1) = 42;
+
+ EXPECT_FALSE(guard.CheckOverflow(alignedPtr));
+
+ free(rawPtr);
+ }
+
+ TEST(StackGuardTest, CanaryGuardTestOverride) {
+ const auto& guard = GetGuard<TCanaryGuard>();
+
+ char *rawPtr, *alignedPtr = nullptr;
+ constexpr uint64_t sizeInPages = 4;
+ ASSERT_TRUE(GetAlignedMemory(sizeInPages + 1, rawPtr, alignedPtr));
+ guard.Protect(alignedPtr, PageSize, false); // set previous guard
+ alignedPtr += PageSize; // leave first page for previous guard
+ guard.Protect(alignedPtr, sizeInPages * PageSize, true);
+
+ EXPECT_TRUE(guard.CheckOverflow(alignedPtr));
+ EXPECT_TRUE(guard.CheckOverride(alignedPtr, sizeInPages * PageSize));
+
+ // Overwrite guard
+ *(alignedPtr + sizeInPages * PageSize - 1) = 42;
+
+ EXPECT_FALSE(guard.CheckOverride(alignedPtr, sizeInPages * PageSize));
+
+ free(rawPtr);
+ }
+
+ TEST(StackGuardDeathTest, PageGuardTestOverflow) {
+ ASSERT_DEATH({
+ const auto &guard = GetGuard<TPageGuard>();
+
+ char* rawPtr = nullptr;
+ char* alignedPtr = nullptr;
+ constexpr uint64_t sizeInPages = 4;
+ ASSERT_TRUE(GetAlignedMemory(sizeInPages + 1, rawPtr, alignedPtr));
+
+ guard.Protect(alignedPtr, PageSize, false); // set previous guard
+ alignedPtr += PageSize; // leave first page for previous guard
+ guard.Protect(alignedPtr, sizeInPages * PageSize, true);
+
+ // Overwrite previous guard, crash is here
+ *(alignedPtr - 1) = 42;
+
+ guard.RemoveProtection(alignedPtr, sizeInPages * PageSize);
+ guard.RemoveProtection(alignedPtr - PageSize, PageSize); // remove previous guard
+
+ free(rawPtr);
+ }, "");
+ }
+
+ TEST(StackGuardDeathTest, PageGuardTestOverride) {
+ ASSERT_DEATH({
+ const auto &guard = GetGuard<TPageGuard>();
+
+ char* rawPtr = nullptr;
+ char* alignedPtr = nullptr;
+ constexpr uint64_t sizeInPages = 4;
+ ASSERT_TRUE(GetAlignedMemory(sizeInPages + 1, rawPtr, alignedPtr));
+ guard.Protect(alignedPtr, PageSize, false); // set previous guard
+ alignedPtr += PageSize; // leave first page for previous guard
+ guard.Protect(alignedPtr, sizeInPages * PageSize, true);
+
+ // Overwrite guard, crash is here
+ *(alignedPtr + sizeInPages * PageSize - 1) = 42;
+
+ guard.RemoveProtection(alignedPtr, sizeInPages * PageSize);
+ guard.RemoveProtection(alignedPtr - PageSize, PageSize); // remove previous guard
+
+ free(rawPtr);
+ }, "");
+ }
+
+}
diff --git a/library/cpp/coroutine/engine/stack/ut/stack_pool_ut.cpp b/library/cpp/coroutine/engine/stack/ut/stack_pool_ut.cpp
new file mode 100644
index 0000000000..9e3e5e7117
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/ut/stack_pool_ut.cpp
@@ -0,0 +1,70 @@
+#include <library/cpp/coroutine/engine/stack/stack_common.h>
+#include <library/cpp/coroutine/engine/stack/stack_guards.h>
+#include <library/cpp/coroutine/engine/stack/stack_pool.h>
+#include <library/cpp/testing/gtest/gtest.h>
+
+
+using namespace testing;
+
+namespace NCoro::NStack::Tests {
+
+ template <class TGuard>
+ class TPoolFixture : public Test {
+ protected:
+ TPoolFixture() : Guard_(GetGuard<TGuard>()), Pool_(StackSize_, TPoolAllocatorSettings{1, 1, 8, 32}, Guard_) {}
+
+ const uint64_t StackSize_ = PageSize * 4;
+ const TGuard& Guard_;
+ TPool<TGuard> Pool_;
+ };
+
+ typedef Types<TCanaryGuard, TPageGuard> Implementations;
+ TYPED_TEST_SUITE(TPoolFixture, Implementations);
+
+ TYPED_TEST(TPoolFixture, AllocAndFreeStack) {
+ auto stack = this->Pool_.AllocStack("test_stack");
+ this->Pool_.FreeStack(stack);
+ EXPECT_FALSE(stack.GetRawMemory());
+ }
+
+ TYPED_TEST(TPoolFixture, FreedStackReused) {
+ auto stack = this->Pool_.AllocStack("test_stack");
+ auto rawPtr = stack.GetRawMemory();
+ auto alignedPtr = stack.GetAlignedMemory();
+
+ this->Pool_.FreeStack(stack);
+ EXPECT_FALSE(stack.GetRawMemory());
+
+ auto stack2 = this->Pool_.AllocStack("test_stack");
+ EXPECT_EQ(rawPtr, stack2.GetRawMemory());
+ EXPECT_EQ(alignedPtr, stack2.GetAlignedMemory());
+
+ this->Pool_.FreeStack(stack2);
+ EXPECT_FALSE(stack2.GetRawMemory());
+ }
+
+ TYPED_TEST(TPoolFixture, MruFreedStackReused) {
+ auto stack = this->Pool_.AllocStack("test_stack");
+ auto rawPtr = stack.GetRawMemory();
+ auto alignedPtr = stack.GetAlignedMemory();
+ auto stack2 = this->Pool_.AllocStack("test_stack");
+ auto stack3 = this->Pool_.AllocStack("test_stack");
+
+ this->Pool_.FreeStack(stack2);
+ EXPECT_FALSE(stack2.GetRawMemory());
+
+ this->Pool_.FreeStack(stack);
+ EXPECT_FALSE(stack.GetRawMemory());
+
+ auto stack4 = this->Pool_.AllocStack("test_stack");
+ EXPECT_EQ(rawPtr, stack4.GetRawMemory());
+ EXPECT_EQ(alignedPtr, stack4.GetAlignedMemory());
+
+ this->Pool_.FreeStack(stack3);
+        EXPECT_FALSE(stack3.GetRawMemory());
+
+ this->Pool_.FreeStack(stack4);
+ EXPECT_FALSE(stack4.GetRawMemory());
+ }
+
+}
diff --git a/library/cpp/coroutine/engine/stack/ut/stack_ut.cpp b/library/cpp/coroutine/engine/stack/ut/stack_ut.cpp
new file mode 100644
index 0000000000..31f8ad6b61
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/ut/stack_ut.cpp
@@ -0,0 +1,60 @@
+#include <library/cpp/coroutine/engine/stack/stack.h>
+#include <library/cpp/coroutine/engine/stack/stack_common.h>
+#include <library/cpp/coroutine/engine/stack/stack_guards.h>
+#include <library/cpp/coroutine/engine/stack/stack_utils.h>
+#include <library/cpp/testing/gtest/gtest.h>
+
+
+using namespace testing;
+
+namespace NCoro::NStack::Tests {
+
+ constexpr uint64_t StackSizeInPages = 4;
+
+ template <class TGuard>
+ class TStackFixture : public Test {
+ protected: // methods
+ TStackFixture()
+ : Guard_(GetGuard<TGuard>())
+ , StackSize_(StackSizeInPages * PageSize)
+ {}
+
+ void SetUp() override {
+ ASSERT_TRUE(GetAlignedMemory(StackSizeInPages, RawMemory_, AlignedMemory_));
+ Stack_ = MakeHolder<NDetails::TStack>(RawMemory_, AlignedMemory_, StackSize_, "test_stack");
+ Guard_.Protect(AlignedMemory_, StackSize_, false);
+ }
+
+ void TearDown() override {
+ Guard_.RemoveProtection(AlignedMemory_, StackSize_);
+ free(Stack_->GetRawMemory());
+ Stack_->Reset();
+ EXPECT_EQ(Stack_->GetRawMemory(), nullptr);
+ }
+
+ protected: // data
+ const TGuard& Guard_;
+ const uint64_t StackSize_ = 0;
+ char* RawMemory_ = nullptr;
+ char* AlignedMemory_ = nullptr;
+ THolder<NDetails::TStack> Stack_;
+ };
+
+ typedef Types<TCanaryGuard, TPageGuard> Implementations;
+ TYPED_TEST_SUITE(TStackFixture, Implementations);
+
+ TYPED_TEST(TStackFixture, PointersAndSize) {
+ EXPECT_EQ(this->Stack_->GetRawMemory(), this->RawMemory_);
+ EXPECT_EQ(this->Stack_->GetAlignedMemory(), this->AlignedMemory_);
+ EXPECT_EQ(this->Stack_->GetSize(), this->StackSize_);
+ }
+
+ TYPED_TEST(TStackFixture, WriteStack) {
+ auto workspace = this->Guard_.GetWorkspace(this->Stack_->GetAlignedMemory(), this->Stack_->GetSize());
+ for (uint64_t i = 0; i < workspace.size(); i += 512) {
+ workspace[i] = 42;
+ }
+ EXPECT_TRUE(this->Guard_.CheckOverride(this->Stack_->GetAlignedMemory(), this->Stack_->GetSize()));
+ }
+
+}
diff --git a/library/cpp/coroutine/engine/stack/ut/stack_utils_ut.cpp b/library/cpp/coroutine/engine/stack/ut/stack_utils_ut.cpp
new file mode 100644
index 0000000000..dc0593dcf2
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/ut/stack_utils_ut.cpp
@@ -0,0 +1,73 @@
+#include <library/cpp/coroutine/engine/stack/stack_common.h>
+#include <library/cpp/coroutine/engine/stack/stack_utils.h>
+#include <library/cpp/testing/gtest/gtest.h>
+
+
+using namespace testing;
+
+namespace NCoro::NStack::Tests {
+
+ TEST(StackUtilsTest, Allocation) {
+ char *rawPtr, *alignedPtr = nullptr;
+ for (uint64_t i : {1, 2, 3, 4, 11}) {
+ EXPECT_TRUE(GetAlignedMemory(i, rawPtr, alignedPtr));
+ EXPECT_TRUE(rawPtr);
+ EXPECT_TRUE(alignedPtr);
+ EXPECT_FALSE((uint64_t)alignedPtr & PageSizeMask);
+ free(rawPtr);
+ }
+ }
+
+#if !defined(_san_enabled_) && defined(_linux_)
+
+ TEST(StackUtilsTest, RssReleaseOnePage) {
+ char *rawPtr, *alignedPtr = nullptr;
+ for (uint64_t i : {1, 2, 8}) {
+ EXPECT_TRUE(GetAlignedMemory(i, rawPtr, alignedPtr));
+ EXPECT_TRUE(rawPtr);
+ EXPECT_TRUE(alignedPtr);
+ EXPECT_FALSE((uint64_t)alignedPtr & PageSizeMask);
+
+ ReleaseRss(alignedPtr, i); // allocator can provide reused memory with RSS memory on it
+ EXPECT_EQ(CountMapped(alignedPtr, i), 0ul); // no RSS memory allocated
+
+ *(alignedPtr + (i - 1) * PageSize) = 42; // map RSS memory
+ EXPECT_EQ(CountMapped(alignedPtr, i), 1ul);
+
+ ReleaseRss(alignedPtr, i);
+ EXPECT_EQ(CountMapped(alignedPtr, i), 0ul) << "number of pages " << i; // no RSS memory allocated
+
+ free(rawPtr);
+ }
+ }
+
+ TEST(StackUtilsTest, RssReleaseSeveralPages) {
+ char *rawPtr, *alignedPtr = nullptr;
+
+ for (uint64_t i : {1, 2, 5, 8}) {
+ EXPECT_TRUE(GetAlignedMemory(i, rawPtr, alignedPtr));
+ EXPECT_TRUE(rawPtr);
+ EXPECT_TRUE(alignedPtr);
+ EXPECT_FALSE((uint64_t)alignedPtr & PageSizeMask);
+
+ ReleaseRss(alignedPtr, i); // allocator can provide reused memory with RSS memory on it
+ EXPECT_EQ(CountMapped(alignedPtr, i), 0ul); // no RSS memory allocated
+
+ for (uint64_t page = 0; page < i; ++page) {
+ *(alignedPtr + page * PageSize) = 42; // map RSS memory
+ EXPECT_EQ(CountMapped(alignedPtr, page + 1), page + 1);
+ }
+
+ const uint64_t pagesToKeep = (i > 2) ? 2 : i;
+
+ ReleaseRss(alignedPtr, i - pagesToKeep);
+            EXPECT_EQ(CountMapped(alignedPtr, i), pagesToKeep) << "number of pages " << i; // only the kept pages remain mapped
+
+ free(rawPtr);
+ }
+ }
+
+#endif
+
+}
+
diff --git a/library/cpp/coroutine/engine/stack/ut/ya.make b/library/cpp/coroutine/engine/stack/ut/ya.make
new file mode 100644
index 0000000000..65c5af9b7f
--- /dev/null
+++ b/library/cpp/coroutine/engine/stack/ut/ya.make
@@ -0,0 +1,17 @@
+GTEST()
+
+OWNER(g:balancer)
+
+SRCS(
+ stack_allocator_ut.cpp
+ stack_guards_ut.cpp
+ stack_pool_ut.cpp
+ stack_ut.cpp
+ stack_utils_ut.cpp
+)
+
+PEERDIR(
+ library/cpp/coroutine/engine
+)
+
+END()
\ No newline at end of file
diff --git a/library/cpp/coroutine/engine/trampoline.cpp b/library/cpp/coroutine/engine/trampoline.cpp
new file mode 100644
index 0000000000..10ea69ddc3
--- /dev/null
+++ b/library/cpp/coroutine/engine/trampoline.cpp
@@ -0,0 +1,50 @@
+#include "impl.h"
+#include "trampoline.h"
+
+#include "stack/stack_allocator.h"
+
+#include <util/system/info.h>
+#include <util/system/protect.h>
+#include <util/system/valgrind.h>
+#include <util/system/yassert.h>
+
+#include <cstdlib>
+#include <util/stream/format.h>
+
+
+namespace NCoro {
+
+    TTrampoline::TTrampoline(NStack::IAllocator& allocator, ui32 stackSize, TFunc f, TCont* cont) noexcept
+        : Stack_(allocator, stackSize, cont->Name())
+        , Clo_{this, Stack_.Get(), cont->Name()}
+        , Ctx_(Clo_)
+        , Func_(std::move(f))
+        , Cont_(cont)
+    {}
+
+ void TTrampoline::DoRun() {
+ if (Cont_->Executor()->FailOnError()) {
+ Func_(Cont_);
+ } else {
+ try {
+ Func_(Cont_);
+ } catch (...) {}
+ }
+
+ Cont_->Terminate();
+ }
+
+ TArrayRef<char> TTrampoline::Stack() noexcept {
+ return Stack_.Get();
+ }
+
+ const char* TTrampoline::ContName() const noexcept {
+ return Cont_->Name();
+ }
+
+ void TTrampoline::DoRunNaked() {
+ DoRun();
+
+ abort();
+ }
+}
diff --git a/library/cpp/coroutine/engine/trampoline.h b/library/cpp/coroutine/engine/trampoline.h
new file mode 100644
index 0000000000..37b61cf015
--- /dev/null
+++ b/library/cpp/coroutine/engine/trampoline.h
@@ -0,0 +1,60 @@
+#pragma once
+
+#include "stack/stack_common.h"
+#include "stack/stack.h"
+
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
+#include <util/system/context.h>
+#include <util/system/defaults.h>
+
+#if !defined(STACK_GROW_DOWN)
+# error "unsupported"
+#endif
+
+class TCont;
+typedef void (*TContFunc)(TCont*, void*);
+
+namespace NCoro {
+
+ namespace NStack {
+ class IAllocator;
+ }
+
+ class TTrampoline : public ITrampoLine, TNonCopyable {
+ public:
+ typedef std::function<void (TCont*)> TFunc;
+
+ TTrampoline(
+ NCoro::NStack::IAllocator& allocator,
+ uint32_t stackSize,
+ TFunc f,
+ TCont* cont
+ ) noexcept;
+
+ TArrayRef<char> Stack() noexcept;
+
+ TExceptionSafeContext* Context() noexcept {
+ return &Ctx_;
+ }
+
+ void SwitchTo(TExceptionSafeContext* ctx) noexcept {
+ Y_VERIFY(Stack_.LowerCanaryOk(), "Stack overflow (%s)", ContName());
+ Y_VERIFY(Stack_.UpperCanaryOk(), "Stack override (%s)", ContName());
+ Ctx_.SwitchTo(ctx);
+ }
+
+ void DoRun() override;
+
+ void DoRunNaked() override;
+
+ private:
+ const char* ContName() const noexcept;
+ private:
+ NStack::TStackHolder Stack_;
+ const TContClosure Clo_;
+ TExceptionSafeContext Ctx_;
+ TFunc Func_;
+ TCont* const Cont_;
+ };
+}
diff --git a/library/cpp/coroutine/engine/ya.make b/library/cpp/coroutine/engine/ya.make
new file mode 100644
index 0000000000..8c20b9afc3
--- /dev/null
+++ b/library/cpp/coroutine/engine/ya.make
@@ -0,0 +1,36 @@
+LIBRARY()
+
+OWNER(
+ pg
+ g:balancer
+)
+
+GENERATE_ENUM_SERIALIZATION(poller.h)
+GENERATE_ENUM_SERIALIZATION(stack/stack_common.h)
+
+PEERDIR(
+ contrib/libs/libc_compat
+ library/cpp/containers/intrusive_rb_tree
+)
+
+SRCS(
+ cont_poller.cpp
+ helper.cpp
+ impl.cpp
+ iostatus.cpp
+ network.cpp
+ poller.cpp
+ sockpool.cpp
+ stack/stack.cpp
+ stack/stack_allocator.cpp
+ stack/stack_guards.cpp
+ stack/stack_storage.cpp
+ stack/stack_utils.cpp
+ trampoline.cpp
+)
+
+END()
+
+RECURSE(
+ stack/benchmark
+)
diff --git a/library/cpp/coroutine/listener/listen.cpp b/library/cpp/coroutine/listener/listen.cpp
new file mode 100644
index 0000000000..3d4e711d1d
--- /dev/null
+++ b/library/cpp/coroutine/listener/listen.cpp
@@ -0,0 +1,354 @@
+#include "listen.h"
+
+#include <library/cpp/coroutine/engine/impl.h>
+#include <library/cpp/coroutine/engine/network.h>
+
+#include <util/network/ip.h>
+#include <util/network/address.h>
+#include <util/generic/ylimits.h>
+#include <util/generic/intrlist.h>
+
+using namespace NAddr;
+
+namespace {
+ union TSa {
+ const sockaddr* Sa;
+ const sockaddr_in* In;
+ const sockaddr_in6* In6;
+
+ inline TSa(const sockaddr* sa) noexcept
+ : Sa(sa)
+ {
+ }
+
+ inline bool operator==(const TSa& r) const noexcept {
+ if (Sa->sa_family == r.Sa->sa_family) {
+ switch (Sa->sa_family) {
+ case AF_INET:
+ return In->sin_port == r.In->sin_port && In->sin_addr.s_addr == r.In->sin_addr.s_addr;
+ case AF_INET6:
+ return In6->sin6_port == r.In6->sin6_port && !memcmp(&In6->sin6_addr, &r.In6->sin6_addr, sizeof(in6_addr));
+ }
+ }
+
+ return false;
+ }
+
+ inline bool operator!=(const TSa& r) const noexcept {
+ return !(*this == r);
+ }
+ };
+}
+
+class TContListener::TImpl {
+private:
+ struct TStoredAddrInfo: public TAddrInfo, private TNetworkAddress {
+ inline TStoredAddrInfo(const struct addrinfo* ai, const TNetworkAddress& addr) noexcept
+ : TAddrInfo(ai)
+ , TNetworkAddress(addr)
+ {
+ }
+ };
+
+private:
+ class TOneSocketListener: public TIntrusiveListItem<TOneSocketListener> {
+ public:
+ inline TOneSocketListener(TImpl* parent, IRemoteAddrPtr addr)
+ : Parent_(parent)
+ , C_(nullptr)
+ , ListenSocket_(socket(addr->Addr()->sa_family, SOCK_STREAM, 0))
+ , Addr_(std::move(addr))
+ {
+ if (ListenSocket_ == INVALID_SOCKET) {
+ ythrow TSystemError() << "can not create socket";
+ }
+
+ FixIPv6ListenSocket(ListenSocket_);
+ CheckedSetSockOpt(ListenSocket_, SOL_SOCKET, SO_REUSEADDR, 1, "reuse addr");
+
+ const TOptions& opts = Parent_->Opts_;
+ if (opts.SendBufSize) {
+ SetOutputBuffer(ListenSocket_, opts.SendBufSize);
+ }
+ if (opts.RecvBufSize) {
+ SetInputBuffer(ListenSocket_, opts.RecvBufSize);
+ }
+ if (opts.ReusePort) {
+ SetReusePort(ListenSocket_, opts.ReusePort);
+ }
+
+ SetNonBlock(ListenSocket_);
+
+ if (bind(ListenSocket_, Addr_->Addr(), Addr_->Len()) < 0) {
+ ythrow TSystemError() << "bind failed";
+ }
+ }
+
+ inline ~TOneSocketListener() {
+ Stop();
+ }
+
+ public:
+ inline void Run(TCont* cont) noexcept {
+ C_ = cont;
+ DoRun();
+ C_ = nullptr;
+ }
+
+ inline void StartListen() {
+ if (!C_) {
+ const TOptions& opts = Parent_->Opts_;
+
+ if (listen(ListenSocket_, (int)Min<size_t>(Max<int>(), opts.ListenQueue)) < 0) {
+ ythrow TSystemError() << "listen failed";
+ }
+
+ if (opts.EnableDeferAccept) {
+ SetDeferAccept(ListenSocket_);
+ }
+
+ C_ = Parent_->E_->Create<TOneSocketListener, &TOneSocketListener::Run>(this, "listen_job");
+ }
+ }
+
+ inline const IRemoteAddr* Addr() const noexcept {
+ return Addr_.Get();
+ }
+
+ inline void Stop() noexcept {
+ if (C_) {
+ C_->Cancel();
+
+ while (C_) {
+ Parent_->E_->Running()->Yield();
+ }
+ }
+ }
+
+ private:
+ inline void DoRun() noexcept {
+ while (!C_->Cancelled()) {
+ try {
+ TOpaqueAddr remote;
+ const int res = NCoro::AcceptI(C_, ListenSocket_, remote.MutableAddr(), remote.LenPtr());
+
+ if (res < 0) {
+ const int err = -res;
+
+ if (err != ECONNABORTED) {
+ if (err == ECANCELED) {
+ break;
+ }
+ if (errno == EMFILE) {
+ C_->SleepT(TDuration::MilliSeconds(1));
+ }
+
+ ythrow TSystemError(err) << "can not accept";
+ }
+ } else {
+ TSocketHolder c((SOCKET)res);
+
+ const ICallBack::TAcceptFull acc = {
+ &c,
+ &remote,
+ Addr(),
+ };
+
+ Parent_->Cb_->OnAcceptFull(acc);
+ }
+ } catch (...) {
+ try {
+ Parent_->Cb_->OnError();
+ } catch (...) {
+ }
+ }
+ }
+
+ try {
+ Parent_->Cb_->OnStop(&ListenSocket_);
+ } catch (...) {
+ }
+ }
+
+ private:
+ const TImpl* const Parent_;
+ TCont* C_;
+ TSocketHolder ListenSocket_;
+ const IRemoteAddrPtr Addr_;
+ };
+
+private:
+ class TListeners: public TIntrusiveListWithAutoDelete<TOneSocketListener, TDelete> {
+ private:
+ template <class T>
+ using TIt = std::conditional_t<std::is_const<T>::value, typename T::TConstIterator, typename T::TIterator>;
+
+ template <class T>
+ static inline TIt<T> FindImpl(T* t, const IRemoteAddr& addr) {
+ const TSa sa(addr.Addr());
+
+ TIt<T> it = t->Begin();
+ TIt<T> const end = t->End();
+
+ while (it != end && sa != it->Addr()->Addr()) {
+ ++it;
+ }
+
+ return it;
+ }
+
+ public:
+ inline TIterator Find(const IRemoteAddr& addr) {
+ return FindImpl(this, addr);
+ }
+
+ inline TConstIterator Find(const IRemoteAddr& addr) const {
+ return FindImpl(this, addr);
+ }
+ };
+
+public:
+ inline TImpl(ICallBack* cb, TContExecutor* e, const TOptions& opts) noexcept
+ : E_(e)
+ , Cb_(cb)
+ , Opts_(opts)
+ {
+ }
+
+ inline void Listen() {
+ for (TListeners::TIterator it = L_.Begin(); it != L_.End(); ++it) {
+ it->StartListen();
+ }
+ }
+
+ inline void Listen(const IRemoteAddr& addr) {
+ const TListeners::TIterator it = L_.Find(addr);
+
+ if (it != L_.End()) {
+ it->StartListen();
+ }
+ }
+
+ inline void Bind(const IRemoteAddr& addr) {
+ const TSa sa(addr.Addr());
+
+ switch (sa.Sa->sa_family) {
+ case AF_INET:
+ L_.PushBack(new TOneSocketListener(this, MakeHolder<TIPv4Addr>(*sa.In)));
+ break;
+ case AF_INET6:
+ L_.PushBack(new TOneSocketListener(this, MakeHolder<TIPv6Addr>(*sa.In6)));
+ break;
+ default:
+ ythrow yexception() << TStringBuf("unknown protocol");
+ }
+ }
+
+ inline void Bind(const TIpAddress& addr) {
+ L_.PushBack(new TOneSocketListener(this, MakeHolder<TIPv4Addr>(addr)));
+ }
+
+ inline void Bind(const TNetworkAddress& addr) {
+ for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) {
+ L_.PushBack(new TOneSocketListener(this, MakeHolder<TStoredAddrInfo>(&*it, addr)));
+ }
+ }
+
+ inline void StopListenAddr(const IRemoteAddr& addr) {
+ const TListeners::TIterator it = L_.Find(addr);
+
+ if (it != L_.End()) {
+ delete &*it;
+ }
+ }
+
+private:
+ TContExecutor* const E_;
+ ICallBack* const Cb_;
+ TListeners L_;
+ const TOptions Opts_;
+};
+
+TContListener::TContListener(ICallBack* cb, TContExecutor* e, const TOptions& opts)
+ : Impl_(new TImpl(cb, e, opts))
+{
+}
+
+TContListener::~TContListener() {
+}
+
+namespace {
+ template <class T>
+ static inline T&& CheckImpl(T&& impl) {
+ Y_ENSURE_EX(impl, yexception() << "not running");
+ return std::forward<T>(impl);
+ }
+}
+
+void TContListener::Listen(const IRemoteAddr& addr) {
+ CheckImpl(Impl_)->Listen(addr);
+}
+
+void TContListener::Listen(const TIpAddress& addr) {
+ return Listen(TIPv4Addr(addr));
+}
+
+void TContListener::Listen(const TNetworkAddress& addr) {
+ for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) {
+ Listen(TAddrInfo(&*it));
+ }
+}
+
+void TContListener::Listen() {
+ CheckImpl(Impl_)->Listen();
+}
+
+void TContListener::Bind(const IRemoteAddr& addr) {
+ CheckImpl(Impl_)->Bind(addr);
+}
+
+void TContListener::Bind(const TIpAddress& addr) {
+ return Bind(TIPv4Addr(addr));
+}
+
+void TContListener::Bind(const TNetworkAddress& addr) {
+ CheckImpl(Impl_)->Bind(addr);
+}
+
+void TContListener::Stop() noexcept {
+ Impl_.Destroy();
+}
+
+void TContListener::StopListenAddr(const IRemoteAddr& addr) {
+ CheckImpl(Impl_)->StopListenAddr(addr);
+}
+
+void TContListener::StopListenAddr(const TIpAddress& addr) {
+ return StopListenAddr(TIPv4Addr(addr));
+}
+
+void TContListener::StopListenAddr(const TNetworkAddress& addr) {
+ for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) {
+ StopListenAddr(TAddrInfo(&*it));
+ }
+}
+
+void TContListener::ICallBack::OnAcceptFull(const TAcceptFull& params) {
+ const TSa remote(params.Remote->Addr());
+ const TSa local(params.Local->Addr());
+
+ if (local.Sa->sa_family == AF_INET) {
+ const TIpAddress r(*remote.In);
+ const TIpAddress l(*local.In);
+
+ const TAccept a = {
+ params.S, &r, &l};
+
+ OnAccept(a);
+ }
+}
+
+void TContListener::ICallBack::OnStop(TSocketHolder* s) {
+ s->ShutDown(SHUT_RDWR);
+ s->Close();
+}
diff --git a/library/cpp/coroutine/listener/listen.h b/library/cpp/coroutine/listener/listen.h
new file mode 100644
index 0000000000..3a89cd3ecc
--- /dev/null
+++ b/library/cpp/coroutine/listener/listen.h
@@ -0,0 +1,125 @@
+#pragma once
+
+#include <util/generic/ptr.h>
+#include <util/generic/ylimits.h>
+
+struct TIpAddress;
+class TContExecutor;
+class TSocketHolder;
+class TNetworkAddress;
+
+namespace NAddr {
+ class IRemoteAddr;
+}
+
+class TContListener {
+public:
+ struct TOptions {
+ inline TOptions() noexcept
+ : ListenQueue(Max<size_t>())
+ , SendBufSize(0)
+ , RecvBufSize(0)
+ , EnableDeferAccept(false)
+ , ReusePort(false)
+ {
+ }
+
+ inline TOptions& SetListenQueue(size_t len) noexcept {
+ ListenQueue = len;
+
+ return *this;
+ }
+
+ inline TOptions& SetDeferAccept(bool enable) noexcept {
+ EnableDeferAccept = enable;
+
+ return *this;
+ }
+
+ inline TOptions& SetSendBufSize(unsigned size) noexcept {
+ SendBufSize = size;
+
+ return *this;
+ }
+
+ inline TOptions& SetRecvBufSize(unsigned size) noexcept {
+ RecvBufSize = size;
+
+ return *this;
+ }
+
+ inline TOptions& SetReusePort(bool reusePort) noexcept {
+ ReusePort = reusePort;
+
+ return *this;
+ }
+
+ size_t ListenQueue;
+ unsigned SendBufSize;
+ unsigned RecvBufSize;
+ bool EnableDeferAccept;
+ bool ReusePort;
+ };
+
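+    // Options are usually assembled with the chained setters, e.g. (illustrative only):
+    //   TContListener listener(&callback, &executor, TContListener::TOptions().SetListenQueue(128).SetDeferAccept(true));
+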
+ class ICallBack {
+ public:
+ struct TAccept {
+ TSocketHolder* S;
+ const TIpAddress* Remote;
+ const TIpAddress* Local;
+ };
+
+ struct TAcceptFull {
+ TSocketHolder* S;
+ const NAddr::IRemoteAddr* Remote;
+ const NAddr::IRemoteAddr* Local;
+ };
+
+ virtual void OnAccept(const TAccept&) {
+ }
+
+ virtual void OnAcceptFull(const TAcceptFull&);
+
+ /*
+         * will be called from a catch (...) {} context,
+         * so you can re-throw the current exception and handle it
+ */
+ virtual void OnError() = 0;
+
+ virtual void OnStop(TSocketHolder*);
+
+ virtual ~ICallBack() {
+ }
+ };
+
+ TContListener(ICallBack* cb, TContExecutor* e, const TOptions& opts = TOptions());
+ ~TContListener();
+
+    /// start listening on all bound addresses
+ void Listen();
+
+ void Listen(const NAddr::IRemoteAddr& addr);
+ void Listen(const TIpAddress& addr);
+ void Listen(const TNetworkAddress& addr);
+
+    /// bind the server to an address. Can be called multiple times to bind to more than one address
+ void Bind(const NAddr::IRemoteAddr& addr);
+ void Bind(const TIpAddress& addr);
+ void Bind(const TNetworkAddress& addr);
+
+ void Stop() noexcept;
+
+ void StopListenAddr(const NAddr::IRemoteAddr& addr);
+ void StopListenAddr(const TIpAddress& addr);
+ void StopListenAddr(const TNetworkAddress& addr);
+
+ template <class T>
+ inline void StartListenAddr(const T& addr) {
+ Bind(addr);
+ Listen(addr);
+ }
+
+private:
+ class TImpl;
+ THolder<TImpl> Impl_;
+};
diff --git a/library/cpp/coroutine/listener/ya.make b/library/cpp/coroutine/listener/ya.make
new file mode 100644
index 0000000000..700c3abe3e
--- /dev/null
+++ b/library/cpp/coroutine/listener/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+OWNER(pg)
+
+PEERDIR(
+ library/cpp/coroutine/engine
+)
+
+SRCS(
+ listen.cpp
+)
+
+END()
diff --git a/library/cpp/coroutine/ya.make b/library/cpp/coroutine/ya.make
new file mode 100644
index 0000000000..34e30f2b25
--- /dev/null
+++ b/library/cpp/coroutine/ya.make
@@ -0,0 +1,11 @@
+RECURSE(
+ benchmark
+ dns
+ dns/example
+ engine
+ engine/stack/ut
+ listener
+ test
+ ut
+ util
+)