diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/coroutine | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/coroutine')
50 files changed, 5980 insertions, 0 deletions
diff --git a/library/cpp/coroutine/engine/callbacks.h b/library/cpp/coroutine/engine/callbacks.h new file mode 100644 index 0000000000..e81b17344f --- /dev/null +++ b/library/cpp/coroutine/engine/callbacks.h @@ -0,0 +1,18 @@ +#pragma once + +class TCont; +class TContExecutor; + +namespace NCoro { + class IScheduleCallback { + public: + virtual void OnSchedule(TContExecutor&, TCont&) = 0; + virtual void OnUnschedule(TContExecutor&) = 0; + }; + + class IEnterPollerCallback { + public: + virtual void OnEnterPoller() = 0; + virtual void OnExitPoller() = 0; + }; +} diff --git a/library/cpp/coroutine/engine/condvar.h b/library/cpp/coroutine/engine/condvar.h new file mode 100644 index 0000000000..ffceede6fa --- /dev/null +++ b/library/cpp/coroutine/engine/condvar.h @@ -0,0 +1,38 @@ +#pragma once + +#include "events.h" +#include "mutex.h" + +class TContCondVar { +public: + int WaitD(TCont* current, TContMutex* mutex, TInstant deadline) { + mutex->UnLock(); + + const int ret = WaitQueue_.WaitD(current, deadline); + + if (ret != EWAKEDUP) { + return ret; + } + + return mutex->LockD(current, deadline); + } + + int WaitT(TCont* current, TContMutex* mutex, TDuration timeout) { + return WaitD(current, mutex, timeout.ToDeadLine()); + } + + int WaitI(TCont* current, TContMutex* mutex) { + return WaitD(current, mutex, TInstant::Max()); + } + + void Signal() noexcept { + WaitQueue_.Signal(); + } + + void BroadCast() noexcept { + WaitQueue_.BroadCast(); + } + +private: + TContWaitQueue WaitQueue_; +}; diff --git a/library/cpp/coroutine/engine/cont_poller.cpp b/library/cpp/coroutine/engine/cont_poller.cpp new file mode 100644 index 0000000000..76593d4e9b --- /dev/null +++ b/library/cpp/coroutine/engine/cont_poller.cpp @@ -0,0 +1,70 @@ +#include "cont_poller.h" +#include "impl.h" + +namespace NCoro { + namespace { + template <class T> + int DoExecuteEvent(T* event) noexcept { + auto* cont = event->Cont(); + + if (cont->Cancelled()) { + return ECANCELED; + } + + cont->Executor()->ScheduleIoWait(event); + cont->Switch(); + + if (cont->Cancelled()) { + return ECANCELED; + } + + return event->Status(); + } + } + + void TContPollEvent::Wake() noexcept { + UnLink(); + Cont()->ReSchedule(); + } + + + TInstant TEventWaitQueue::WakeTimedout(TInstant now) noexcept { + TIoWait::TIterator it = IoWait_.Begin(); + + if (it != IoWait_.End()) { + if (it->DeadLine() > now) { + return it->DeadLine(); + } + + do { + (it++)->Wake(ETIMEDOUT); + } while (it != IoWait_.End() && it->DeadLine() <= now); + } + + return now; + } + + void TEventWaitQueue::Register(NCoro::TContPollEvent* event) { + IoWait_.Insert(event); + event->Cont()->Unlink(); + } + + void TEventWaitQueue::Abort() noexcept { + auto visitor = [](TContPollEvent& e) { + e.Cont()->Cancel(); + }; + IoWait_.ForEach(visitor); + } +} + +void TFdEvent::RemoveFromIOWait() noexcept { + this->Cont()->Executor()->Poller()->Remove(this); +} + +int ExecuteEvent(TFdEvent* event) noexcept { + return NCoro::DoExecuteEvent(event); +} + +int ExecuteEvent(TTimerEvent* event) noexcept { + return NCoro::DoExecuteEvent(event); +} diff --git a/library/cpp/coroutine/engine/cont_poller.h b/library/cpp/coroutine/engine/cont_poller.h new file mode 100644 index 0000000000..b638b2df1a --- /dev/null +++ b/library/cpp/coroutine/engine/cont_poller.h @@ -0,0 +1,245 @@ +#pragma once + +#include "poller.h" +#include "sockmap.h" + +#include <library/cpp/containers/intrusive_rb_tree/rb_tree.h> + +#include <util/datetime/base.h> +#include <util/memory/pool.h> +#include <util/memory/smallobj.h> +#include <util/network/init.h> + +#include <cerrno> + + +class TCont; +class TContExecutor; +class TFdEvent; + +namespace NCoro { + + class IPollEvent; + + + struct TContPollEventCompare { + template <class T> + static inline bool Compare(const T& l, const T& r) noexcept { + return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r); + } + }; + + + class TContPollEvent : public TRbTreeItem<TContPollEvent, TContPollEventCompare> { + public: + TContPollEvent(TCont* cont, TInstant deadLine) noexcept + : Cont_(cont) + , DeadLine_(deadLine) + {} + + static bool Compare(const TContPollEvent& l, const TContPollEvent& r) noexcept { + return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r); + } + + int Status() const noexcept { + return Status_; + } + + void SetStatus(int status) noexcept { + Status_ = status; + } + + TCont* Cont() noexcept { + return Cont_; + } + + TInstant DeadLine() const noexcept { + return DeadLine_; + } + + void Wake(int status) noexcept { + SetStatus(status); + Wake(); + } + + private: + void Wake() noexcept; + + private: + TCont* Cont_; + TInstant DeadLine_; + int Status_ = EINPROGRESS; + }; + + + class IPollEvent: public TIntrusiveListItem<IPollEvent> { + public: + IPollEvent(SOCKET fd, ui16 what) noexcept + : Fd_(fd) + , What_(what) + {} + + virtual ~IPollEvent() {} + + SOCKET Fd() const noexcept { + return Fd_; + } + + int What() const noexcept { + return What_; + } + + virtual void OnPollEvent(int status) noexcept = 0; + + private: + SOCKET Fd_; + ui16 What_; + }; + + + template <class T> + class TBigArray { + struct TValue: public T, public TObjectFromPool<TValue> { + TValue() {} + }; + + public: + TBigArray() + : Pool_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance()) + {} + + T* Get(size_t index) { + TRef& ret = Lst_.Get(index); + if (!ret) { + ret = TRef(new (&Pool_) TValue()); + } + return ret.Get(); + } + + private: + using TRef = THolder<TValue>; + typename TValue::TPool Pool_; + TSocketMap<TRef> Lst_; + }; + + + using TPollEventList = TIntrusiveList<IPollEvent>; + + class TContPoller { + public: + using TEvent = IPollerFace::TEvent; + using TEvents = IPollerFace::TEvents; + + TContPoller() + : P_(IPollerFace::Default()) + { + } + + explicit TContPoller(THolder<IPollerFace> poller) + : P_(std::move(poller)) + {} + + void Schedule(IPollEvent* event) { + auto* lst = Lists_.Get(event->Fd()); + const ui16 oldFlags = Flags(*lst); + lst->PushFront(event); + ui16 newFlags = Flags(*lst); + + if (newFlags != oldFlags) { + if (oldFlags) { + newFlags |= CONT_POLL_MODIFY; + } + + P_->Set(lst, event->Fd(), newFlags); + } + } + + void Remove(IPollEvent* event) noexcept { + auto* lst = Lists_.Get(event->Fd()); + const ui16 oldFlags = Flags(*lst); + event->Unlink(); + ui16 newFlags = Flags(*lst); + + if (newFlags != oldFlags) { + if (newFlags) { + newFlags |= CONT_POLL_MODIFY; + } + + P_->Set(lst, event->Fd(), newFlags); + } + } + + void Wait(TEvents& events, TInstant deadLine) { + events.clear(); + P_->Wait(events, deadLine); + } + + EContPoller PollEngine() const { + return P_->PollEngine(); + } + private: + static ui16 Flags(TIntrusiveList<IPollEvent>& lst) noexcept { + ui16 ret = 0; + for (auto&& item : lst) { + ret |= item.What(); + } + return ret; + } + + private: + TBigArray<TPollEventList> Lists_; + THolder<IPollerFace> P_; + }; + + + class TEventWaitQueue { + using TIoWait = TRbTree<NCoro::TContPollEvent, NCoro::TContPollEventCompare>; + + public: + void Register(NCoro::TContPollEvent* event); + + bool Empty() const noexcept { + return IoWait_.Empty(); + } + + void Abort() noexcept; + + TInstant WakeTimedout(TInstant now) noexcept; + + private: + TIoWait IoWait_; + }; +} + +class TFdEvent final: + public NCoro::TContPollEvent, + public NCoro::IPollEvent +{ +public: + TFdEvent(TCont* cont, SOCKET fd, ui16 what, TInstant deadLine) noexcept + : TContPollEvent(cont, deadLine) + , IPollEvent(fd, what) + {} + + ~TFdEvent() { + RemoveFromIOWait(); + } + + void RemoveFromIOWait() noexcept; + + void OnPollEvent(int status) noexcept override { + Wake(status); + } +}; + + +class TTimerEvent: public NCoro::TContPollEvent { +public: + TTimerEvent(TCont* cont, TInstant deadLine) noexcept + : TContPollEvent(cont, deadLine) + {} +}; + +int ExecuteEvent(TFdEvent* event) noexcept; + +int ExecuteEvent(TTimerEvent* event) noexcept; diff --git a/library/cpp/coroutine/engine/coroutine_ut.cpp b/library/cpp/coroutine/engine/coroutine_ut.cpp new file mode 100644 index 0000000000..8b372496a2 --- /dev/null +++ b/library/cpp/coroutine/engine/coroutine_ut.cpp @@ -0,0 +1,1007 @@ +#include "impl.h" +#include "condvar.h" +#include "network.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/string/cast.h> +#include <util/system/pipe.h> +#include <util/system/env.h> +#include <util/system/info.h> +#include <util/system/thread.h> +#include <util/generic/xrange.h> +#include <util/generic/serialized_enum.h> + +// TODO (velavokr): BALANCER-1345 add more tests on pollers + +class TCoroTest: public TTestBase { + UNIT_TEST_SUITE(TCoroTest); + UNIT_TEST(TestSimpleX1); + UNIT_TEST(TestSimpleX1MultiThread); + UNIT_TEST(TestSimpleX2); + UNIT_TEST(TestSimpleX3); + UNIT_TEST(TestMemFun); + UNIT_TEST(TestMutex); + UNIT_TEST(TestCondVar); + UNIT_TEST(TestJoinDefault); + UNIT_TEST(TestJoinEpoll); + UNIT_TEST(TestJoinKqueue); + UNIT_TEST(TestJoinPoll); + UNIT_TEST(TestJoinSelect); + UNIT_TEST(TestException); + UNIT_TEST(TestJoinCancelExitRaceBug); + UNIT_TEST(TestWaitWakeLivelockBug); + UNIT_TEST(TestFastPathWakeDefault) + // TODO (velavokr): BALANCER-1338 our epoll wrapper cannot handle pipe eofs +// UNIT_TEST(TestFastPathWakeEpoll) + UNIT_TEST(TestFastPathWakeKqueue) + UNIT_TEST(TestFastPathWakePoll) + UNIT_TEST(TestFastPathWakeSelect) + UNIT_TEST(TestLegacyCancelYieldRaceBug) + UNIT_TEST(TestJoinRescheduleBug); + UNIT_TEST(TestEventQueue) + UNIT_TEST(TestNestedExecutor) + UNIT_TEST(TestComputeCoroutineYield) + UNIT_TEST(TestPollEngines); + UNIT_TEST(TestUserEvent); + UNIT_TEST(TestPause); + UNIT_TEST(TestOverrideTime); + UNIT_TEST_SUITE_END(); + +public: + void TestException(); + void TestSimpleX1(); + void TestSimpleX1MultiThread(); + void TestSimpleX2(); + void TestSimpleX3(); + void TestMemFun(); + void TestMutex(); + void TestCondVar(); + void TestJoinDefault(); + void TestJoinEpoll(); + void TestJoinKqueue(); + void TestJoinPoll(); + void TestJoinSelect(); + void TestJoinCancelExitRaceBug(); + void TestWaitWakeLivelockBug(); + void TestFastPathWakeDefault(); + void TestFastPathWakeEpoll(); + void TestFastPathWakeKqueue(); + void TestFastPathWakePoll(); + void TestFastPathWakeSelect(); + void TestLegacyCancelYieldRaceBug(); + void TestJoinRescheduleBug(); + void TestEventQueue(); + void TestNestedExecutor(); + void TestComputeCoroutineYield(); + void TestPollEngines(); + void TestUserEvent(); + void TestPause(); + void TestOverrideTime(); +}; + +void TCoroTest::TestException() { + TContExecutor e(1000000); + + bool f2run = false; + + auto f1 = [&f2run](TCont* c) { + struct TCtx { + ~TCtx() { + Y_VERIFY(!*F2); + + C->Yield(); + } + + TCont* C; + bool* F2; + }; + + try { + TCtx ctx = {c, &f2run}; + + throw 1; + } catch (...) { + } + }; + + bool unc = true; + + auto f2 = [&unc, &f2run](TCont*) { + f2run = true; + unc = std::uncaught_exception(); + + // check segfault + try { + throw 2; + } catch (int) { + } + }; + + e.Create(f1, "f1"); + e.Create(f2, "f2"); + e.Execute(); + + UNIT_ASSERT(!unc); +} + +static int i0; + +static void CoRun(TCont* c, void* /*run*/) { + while (i0 < 100000) { + ++i0; + UNIT_ASSERT(RunningCont() == c); + c->Yield(); + UNIT_ASSERT(RunningCont() == c); + } +} + +static void CoMain(TCont* c, void* /*arg*/) { + for (volatile size_t i2 = 0; i2 < 10; ++i2) { + UNIT_ASSERT(RunningCont() == c); + c->Executor()->Create(CoRun, nullptr, "run"); + UNIT_ASSERT(RunningCont() == c); + } +} + +void TCoroTest::TestSimpleX1() { + i0 = 0; + TContExecutor e(32000); + + UNIT_ASSERT(RunningCont() == nullptr); + + e.Execute(CoMain); + UNIT_ASSERT_VALUES_EQUAL(i0, 100000); + + UNIT_ASSERT(RunningCont() == nullptr); +} + +void TCoroTest::TestSimpleX1MultiThread() { + TVector<THolder<TThread>> threads; + const size_t nThreads = 0; + TAtomic c = 0; + for (size_t i = 0; i < nThreads; ++i) { + threads.push_back(MakeHolder<TThread>([&]() { + TestSimpleX1(); + AtomicIncrement(c); + })); + } + + for (auto& t : threads) { + t->Start(); + } + + for (auto& t: threads) { + t->Join(); + } + + UNIT_ASSERT_EQUAL(c, nThreads); +} + +struct TTestObject { + int i = 0; + int j = 0; + +public: + void RunTask1(TCont*) { + i = 1; + } + void RunTask2(TCont*) { + j = 2; + } +}; + +void TCoroTest::TestMemFun() { + i0 = 0; + TContExecutor e(32000); + TTestObject obj; + e.Create<TTestObject, &TTestObject::RunTask1>(&obj, "test1"); + e.Execute<TTestObject, &TTestObject::RunTask2>(&obj); + UNIT_ASSERT_EQUAL(obj.i, 1); + UNIT_ASSERT_EQUAL(obj.j, 2); +} + +void TCoroTest::TestSimpleX2() { + { + i0 = 0; + + { + TContExecutor e(32000); + e.Execute(CoMain); + } + + UNIT_ASSERT_EQUAL(i0, 100000); + } + + { + i0 = 0; + + { + TContExecutor e(32000); + e.Execute(CoMain); + } + + UNIT_ASSERT_EQUAL(i0, 100000); + } +} + +struct TRunner { + inline TRunner() + : Runs(0) + { + } + + inline void operator()(TCont* c) { + ++Runs; + c->Yield(); + } + + size_t Runs; +}; + +void TCoroTest::TestSimpleX3() { + TContExecutor e(32000); + TRunner runner; + + for (volatile size_t i3 = 0; i3 < 1000; ++i3) { + e.Create(runner, "runner"); + } + + e.Execute(); + + UNIT_ASSERT_EQUAL(runner.Runs, 1000); +} + +static TString res; +static TContMutex mutex; + +static void CoMutex(TCont* c, void* /*run*/) { + { + mutex.LockI(c); + c->Yield(); + res += c->Name(); + mutex.UnLock(); + } + + c->Yield(); + + { + mutex.LockI(c); + c->Yield(); + res += c->Name(); + mutex.UnLock(); + } +} + +static void CoMutexTest(TCont* c, void* /*run*/) { + c->Executor()->Create(CoMutex, nullptr, "1"); + c->Executor()->Create(CoMutex, nullptr, "2"); +} + +void TCoroTest::TestMutex() { + TContExecutor e(32000); + e.Execute(CoMutexTest); + UNIT_ASSERT_EQUAL(res, "1212"); + res.clear(); +} + +static TContMutex m1; +static TContCondVar c1; + +static void CoCondVar(TCont* c, void* /*run*/) { + for (size_t i4 = 0; i4 < 3; ++i4) { + UNIT_ASSERT_EQUAL(m1.LockI(c), 0); + UNIT_ASSERT_EQUAL(c1.WaitI(c, &m1), 0); + res += c->Name(); + m1.UnLock(); + } +} + +static void CoCondVarTest(TCont* c, void* /*run*/) { + c->Executor()->Create(CoCondVar, nullptr, "1"); + c->Yield(); + c->Executor()->Create(CoCondVar, nullptr, "2"); + c->Yield(); + c->Executor()->Create(CoCondVar, nullptr, "3"); + c->Yield(); + c->Executor()->Create(CoCondVar, nullptr, "4"); + c->Yield(); + c->Executor()->Create(CoCondVar, nullptr, "5"); + c->Yield(); + c->Executor()->Create(CoCondVar, nullptr, "6"); + c->Yield(); + + for (size_t i5 = 0; i5 < 3; ++i5) { + res += ToString((size_t)i5) + "^"; + c1.BroadCast(); + c->Yield(); + } +} + +void TCoroTest::TestCondVar() { + TContExecutor e(32000); + e.Execute(CoCondVarTest); + UNIT_ASSERT_EQUAL(res, "0^1234561^1234562^123456"); + res.clear(); +} + +namespace NCoroTestJoin { + struct TSleepCont { + const TInstant Deadline; + int Result; + + inline void operator()(TCont* c) { + Result = c->SleepD(Deadline); + } + }; + + struct TReadCont { + const TInstant Deadline; + const SOCKET Sock; + int Result; + + inline void operator()(TCont* c) { + char buf = 0; + Result = NCoro::ReadD(c, Sock, &buf, sizeof(buf), Deadline).Status(); + } + }; + + struct TJoinCont { + const TInstant Deadline; + TCont* const Cont; + bool Result; + + inline void operator()(TCont* c) { + Result = c->Join(Cont, Deadline); + } + }; + + void DoTestJoin(EContPoller pollerType) { + auto poller = IPollerFace::Construct(pollerType); + + if (!poller) { + return; + } + + TContExecutor e(32000, std::move(poller)); + + TPipe in, out; + TPipe::Pipe(in, out); + SetNonBlock(in.GetHandle()); + + { + TSleepCont sc = {TInstant::Max(), 0}; + TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true}; + + e.Execute(jc); + + UNIT_ASSERT_EQUAL(sc.Result, ECANCELED); + UNIT_ASSERT_EQUAL(jc.Result, false); + } + + { + TSleepCont sc = {TDuration::MilliSeconds(100).ToDeadLine(), 0}; + TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(sc, "sc"), false}; + + e.Execute(jc); + + UNIT_ASSERT_EQUAL(sc.Result, ETIMEDOUT); + UNIT_ASSERT_EQUAL(jc.Result, true); + } + + { + TSleepCont sc = {TDuration::MilliSeconds(200).ToDeadLine(), 0}; + TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true}; + + e.Execute(jc); + + UNIT_ASSERT_EQUAL(sc.Result, ECANCELED); + UNIT_ASSERT_EQUAL(jc.Result, false); + } + + { + TReadCont rc = {TInstant::Max(), in.GetHandle(), 0}; + TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true}; + + e.Execute(jc); + + UNIT_ASSERT_EQUAL(rc.Result, ECANCELED); + UNIT_ASSERT_EQUAL(jc.Result, false); + } + + { + TReadCont rc = {TDuration::MilliSeconds(100).ToDeadLine(), in.GetHandle(), 0}; + TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(rc, "rc"), false}; + + e.Execute(jc); + + UNIT_ASSERT_EQUAL(rc.Result, ETIMEDOUT); + UNIT_ASSERT_EQUAL(jc.Result, true); + } + + { + TReadCont rc = {TDuration::MilliSeconds(200).ToDeadLine(), in.GetHandle(), 0}; + TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true}; + + e.Execute(jc); + + UNIT_ASSERT_EQUAL(rc.Result, ECANCELED); + UNIT_ASSERT_EQUAL(jc.Result, false); + } + } +} + +void TCoroTest::TestJoinDefault() { + NCoroTestJoin::DoTestJoin(EContPoller::Default); +} + +void TCoroTest::TestJoinEpoll() { + NCoroTestJoin::DoTestJoin(EContPoller::Epoll); +} + +void TCoroTest::TestJoinKqueue() { + NCoroTestJoin::DoTestJoin(EContPoller::Kqueue); +} + +void TCoroTest::TestJoinPoll() { + NCoroTestJoin::DoTestJoin(EContPoller::Poll); +} + +void TCoroTest::TestJoinSelect() { + NCoroTestJoin::DoTestJoin(EContPoller::Select); +} + +namespace NCoroJoinCancelExitRaceBug { + struct TState { + TCont* Sub = nullptr; + }; + + static void DoAux(TCont*, void* argPtr) noexcept { + TState& state = *(TState*)(argPtr); + + // 06.{Ready:[Sub2]} > {Ready:[Sub2,Sub]} + state.Sub->Cancel(); + } + + static void DoSub2(TCont*, void*) noexcept { + // 07.{Ready:[Sub]} > Exit > {Ready:[Sub],ToDelete:[Sub2]} + // 08.{Ready:[Sub],ToDelete:[Sub2]} > Release(Sub2) > {Ready:[Sub],Deleted:[Sub2]} + } + + static void DoSub(TCont* cont, void* argPtr) noexcept { + TState& state = *(TState*)(argPtr); + state.Sub = cont; + + // 04.{Ready:[Aux]} > {Ready:[Aux,Sub2]} + auto* sub2 = cont->Executor()->Create(DoSub2, argPtr, "Sub2"); + + // 05.{Ready:[Aux,Sub2]} > SwitchTo(Aux) + // 09.{Ready:[],Deleted:[Sub2]} > Cancel(Sub2) > {Ready:[Sub2],Deleted:[Sub2]} + // 10.{Ready:[Sub2],Deleted:[Sub2]} > SwitchTo(Sub2) > FAIL: can not return from exit + cont->Join(sub2); + + state.Sub = nullptr; + } + + static void DoMain(TCont* cont) noexcept { + TState state; + + // 01.{Ready:[]} > {Ready:[Sub]} + auto* sub = cont->Executor()->Create(DoSub, &state, "Sub"); + + // 02.{Ready:[Sub]} > {Ready:[Sub,Aux]} + cont->Executor()->Create(DoAux, &state, "Aux"); + + // 03.{Ready:[Sub,Aux]} > SwitchTo(Sub) + cont->Join(sub); + } +} + +void TCoroTest::TestJoinCancelExitRaceBug() { + TContExecutor exec(20000); + exec.SetFailOnError(true); + exec.Execute(NCoroJoinCancelExitRaceBug::DoMain); +} + +namespace NCoroWaitWakeLivelockBug { + struct TState; + + struct TSubState { + TSubState(TState& parent, ui32 self) + : Parent(parent) + , Name(TStringBuilder() << "Sub" << self) + , Self(self) + { + UNIT_ASSERT(self < 2); + } + + TSubState& OtherState(); + + TState& Parent; + TTimerEvent* Event = nullptr; + TCont* Cont = nullptr; + TString Name; + ui32 Self = -1; + }; + + struct TState { + TState() + : Subs{{*this, 0}, {*this, 1}} + {} + + TSubState Subs[2]; + bool Stop = false; + }; + + TSubState& TSubState::OtherState() { + return Parent.Subs[1 - Self]; + } + + static void DoStop(TCont* cont, void* argPtr) { + TState& state = *(TState*)(argPtr); + + TTimerEvent event(cont, TInstant::Now()); + ExecuteEvent(&event); + state.Stop = true; + for (auto& sub: state.Subs) { + if (sub.Event) { + sub.Event->Wake(EWAKEDUP); + } + } + } + + static void DoSub(TCont* cont, void* argPtr) { + TSubState& state = *(TSubState*)(argPtr); + + while (!state.Parent.Stop) { + TTimerEvent event(cont, TInstant::Max()); + if (state.OtherState().Event) { + state.OtherState().Event->Wake(EWAKEDUP); + } + state.Event = &event; + ExecuteEvent(&event); + state.Event = nullptr; + } + + state.Cont = nullptr; + } + + static void DoMain(TCont* cont) noexcept { + TState state; + + for (auto& subState : state.Subs) { + subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data()); + } + + cont->Join( + cont->Executor()->Create(DoStop, &state, "Stop") + ); + + for (auto& subState : state.Subs) { + if (subState.Cont) { + cont->Join(subState.Cont); + } + } + } +} + +void TCoroTest::TestWaitWakeLivelockBug() { + TContExecutor exec(20000); + exec.SetFailOnError(true); + exec.Execute(NCoroWaitWakeLivelockBug::DoMain); +} + +namespace NCoroTestFastPathWake { + struct TState; + + struct TSubState { + TSubState(TState& parent, ui32 self) + : Parent(parent) + , Name(TStringBuilder() << "Sub" << self) + {} + + TState& Parent; + TInstant Finish; + TTimerEvent* Event = nullptr; + TCont* Cont = nullptr; + TString Name; + }; + + struct TState { + TState() + : Subs{{*this, 0}, {*this, 1}} + { + TPipe::Pipe(In, Out); + SetNonBlock(In.GetHandle()); + } + + TSubState Subs[2]; + TPipe In, Out; + bool IoSleepRunning = false; + }; + + static void DoIoSleep(TCont* cont, void* argPtr) noexcept { + try { + TState& state = *(TState*) (argPtr); + state.IoSleepRunning = true; + + TTempBuf tmp; + // Wait for the event from io + auto res = NCoro::ReadD(cont, state.In.GetHandle(), tmp.Data(), 1, TDuration::Seconds(10).ToDeadLine()); + UNIT_ASSERT_VALUES_EQUAL(res.Checked(), 0); + state.IoSleepRunning = false; + } catch (const NUnitTest::TAssertException& ex) { + Cerr << ex.AsStrBuf() << Endl; + ex.BackTrace()->PrintTo(Cerr); + throw; + } catch (...) { + Cerr << CurrentExceptionMessage() << Endl; + throw; + } + } + + static void DoSub(TCont* cont, void* argPtr) noexcept { + TSubState& state = *(TSubState*)(argPtr); + + TTimerEvent event(cont, TInstant::Max()); + state.Event = &event; + ExecuteEvent(&event); + state.Event = nullptr; + state.Cont = nullptr; + state.Finish = TInstant::Now(); + } + + static void DoMain(TCont* cont) noexcept { + try { + TState state; + TInstant start = TInstant::Now(); + + // This guy sleeps on io + auto sleeper = cont->Executor()->Create(DoIoSleep, &state, "io_sleeper"); + + // These guys are to be woken up right away + for (auto& subState : state.Subs) { + subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data()); + } + + // Give way + cont->Yield(); + + // Check everyone has started, wake those to be woken + UNIT_ASSERT(state.IoSleepRunning); + + for (auto& subState : state.Subs) { + UNIT_ASSERT(subState.Event); + subState.Event->Wake(EWAKEDUP); + } + + // Give way again + cont->Yield(); + + // Check the woken guys have finished and quite soon + for (auto& subState : state.Subs) { + UNIT_ASSERT(subState.Finish - start < TDuration::MilliSeconds(100)); + UNIT_ASSERT(!subState.Cont); + } + + // Wake the io guy and finish + state.Out.Close(); + + if (state.IoSleepRunning) { + cont->Join(sleeper); + } + + // Check everything has ended sooner than the timeout + UNIT_ASSERT(TInstant::Now() - start < TDuration::Seconds(1)); + } catch (const NUnitTest::TAssertException& ex) { + Cerr << ex.AsStrBuf() << Endl; + ex.BackTrace()->PrintTo(Cerr); + throw; + } catch (...) { + Cerr << CurrentExceptionMessage() << Endl; + throw; + } + } + + static void DoTestFastPathWake(EContPoller pollerType) { + if (auto poller = IPollerFace::Construct(pollerType)) { + TContExecutor exec(20000, std::move(poller)); + exec.SetFailOnError(true); + exec.Execute(NCoroTestFastPathWake::DoMain); + } + } +} + +void TCoroTest::TestFastPathWakeDefault() { + NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Default); +} + +void TCoroTest::TestFastPathWakeEpoll() { + NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Epoll); +} + +void TCoroTest::TestFastPathWakeKqueue() { + NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Kqueue); +} + +void TCoroTest::TestFastPathWakePoll() { + NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Poll); +} + +void TCoroTest::TestFastPathWakeSelect() { + NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Select); +} + +namespace NCoroTestLegacyCancelYieldRaceBug { + enum class EState { + Idle, Running, Finished, + }; + + struct TState { + EState SubState = EState::Idle; + }; + + static void DoSub(TCont* cont, void* argPtr) { + TState& state = *(TState*)argPtr; + state.SubState = EState::Running; + cont->Yield(); + cont->Yield(); + state.SubState = EState::Finished; + } + + static void DoMain(TCont* cont, void* argPtr) { + TState& state = *(TState*)argPtr; + TCont* sub = cont->Executor()->Create(DoSub, argPtr, "Sub"); + sub->Cancel(); + cont->Yield(); + UNIT_ASSERT_EQUAL(state.SubState, EState::Finished); + } +} + +void TCoroTest::TestLegacyCancelYieldRaceBug() { + NCoroTestLegacyCancelYieldRaceBug::TState state; + TContExecutor exec(20000); + exec.SetFailOnError(true); + exec.Execute(NCoroTestLegacyCancelYieldRaceBug::DoMain, &state); +} + +namespace NCoroTestJoinRescheduleBug { + enum class EState { + Idle, Running, Finished, + }; + + struct TState { + TCont* volatile SubA = nullptr; + volatile EState SubAState = EState::Idle; + volatile EState SubBState = EState::Idle; + volatile EState SubCState = EState::Idle; + }; + + static void DoSubC(TCont* cont, void* argPtr) { + TState& state = *(TState*)argPtr; + state.SubCState = EState::Running; + while (state.SubBState != EState::Running) { + cont->Yield(); + } + while (cont->SleepD(TInstant::Max()) != ECANCELED) { + } + state.SubCState = EState::Finished; + } + + static void DoSubB(TCont* cont, void* argPtr) { + TState& state = *(TState*)argPtr; + state.SubBState = EState::Running; + while (state.SubAState != EState::Running && state.SubCState != EState::Running) { + cont->Yield(); + } + for (auto i : xrange(100)) { + Y_UNUSED(i); + if (!state.SubA) { + break; + } + state.SubA->ReSchedule(); + cont->Yield(); + } + state.SubBState = EState::Finished; + } + + static void DoSubA(TCont* cont, void* argPtr) { + TState& state = *(TState*)argPtr; + state.SubAState = EState::Running; + TCont* subC = cont->Executor()->Create(DoSubC, argPtr, "SubC"); + while (state.SubBState != EState::Running && state.SubCState != EState::Running) { + cont->Yield(); + } + cont->Join(subC); + UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished); + state.SubA = nullptr; + state.SubAState = EState::Finished; + } + + static void DoMain(TCont* cont, void* argPtr) { + TState& state = *(TState*)argPtr; + TCont* subA = cont->Executor()->Create(DoSubA, argPtr, "SubA"); + state.SubA = subA; + cont->Join(cont->Executor()->Create(DoSubB, argPtr, "SubB")); + + if (state.SubA) { + subA->Cancel(); + cont->Join(subA); + } + } +} + +void TCoroTest::TestJoinRescheduleBug() { + using namespace NCoroTestJoinRescheduleBug; + TState state; + { + TContExecutor exec(20000); + exec.Execute(DoMain, &state); + } + UNIT_ASSERT_EQUAL(state.SubAState, EState::Finished); + UNIT_ASSERT_EQUAL(state.SubBState, EState::Finished); + UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished); +} + +void TCoroTest::TestEventQueue() { + NCoro::TEventWaitQueue queue; + UNIT_ASSERT(queue.Empty()); + UNIT_ASSERT_VALUES_EQUAL(queue.WakeTimedout(TInstant()), TInstant()); + TContExecutor exec(32000); + exec.Execute([](TCont* cont, void* arg) { + NCoro::TEventWaitQueue* q = (NCoro::TEventWaitQueue*)arg; + TTimerEvent ev(cont, TInstant::Max()); + TTimerEvent ev2(cont, TInstant::Seconds(12345)); + q->Register(&ev); + UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max()); + UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max()); + q->Register(&ev2); + UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345)); + UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345)); + UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Seconds(12345)); + UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Max()); + }, &queue); +} + +void TCoroTest::TestNestedExecutor() { +#ifndef _win_ + //nested executors actually don't work correctly, but anyway shouldn't break RunningCont() ptr + TContExecutor exec(32000); + UNIT_ASSERT(!RunningCont()); + + exec.Execute([](TCont* cont, void*) { + UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont); + + TContExecutor exec2(32000); + exec2.Execute([](TCont* cont2, void*) { + UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2); + TContExecutor exec3(32000); + exec3.Execute([](TCont* cont3, void*) { + UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont3); + }); + + UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont2); + }); + + UNIT_ASSERT_VALUES_EQUAL(RunningCont(), cont); + }); + + UNIT_ASSERT(!RunningCont()); +#endif +} + +void TCoroTest::TestComputeCoroutineYield() { +//if we have busy (e.g., on cpu) coroutine, when it yields, io must flow + TContExecutor exec(32000); + exec.SetFailOnError(true); + + TPipe in, out; + TPipe::Pipe(in, out); + SetNonBlock(in.GetHandle()); + size_t lastRead = 42; + + auto compute = [&](TCont* cont) { + for (size_t i = 0; i < 10; ++i) { + write(out.GetHandle(), &i, sizeof i); + Sleep(TDuration::MilliSeconds(10)); + cont->Yield(); + UNIT_ASSERT(lastRead == i); + } + }; + + auto io = [&](TCont* cont) { + for (size_t i = 0; i < 10; ++i) { + NCoro::ReadI(cont, in.GetHandle(), &lastRead, sizeof lastRead); + } + }; + + exec.Create(compute, "compute"); + exec.Create(io, "io"); + + exec.Execute(); +} + +void TCoroTest::TestPollEngines() { + bool defaultChecked = false; + for (auto engine : GetEnumAllValues<EContPoller>()) { + auto poller = IPollerFace::Construct(engine); + if (!poller) { + continue; + } + + TContExecutor exec(32000, IPollerFace::Construct(engine)); + + if (engine == EContPoller::Default) { + defaultChecked = true; + UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), EContPoller::Combined); + } else { + UNIT_ASSERT_VALUES_EQUAL(exec.Poller()->PollEngine(), engine); + } + } + + UNIT_ASSERT(defaultChecked); +} + +void TCoroTest::TestPause() { + TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing()}; + + int i = 0; + executor.CreateOwned([&](TCont*) { + i++; + executor.Pause(); + i++; + }, "coro"); + + UNIT_ASSERT_EQUAL(i, 0); + executor.Execute(); + UNIT_ASSERT_EQUAL(i, 1); + executor.Execute(); + UNIT_ASSERT_EQUAL(i, 2); +} + +void TCoroTest::TestUserEvent() { + TContExecutor exec(32000); + + struct TUserEvent : public IUserEvent { + bool Called = false; + void Execute() override { + Called = true; + } + } event; + + auto f = [&](TCont* cont) { + UNIT_ASSERT(!event.Called); + exec.ScheduleUserEvent(&event); + UNIT_ASSERT(!event.Called); + cont->Yield(); + UNIT_ASSERT(event.Called); + }; + + exec.Execute(f); + + UNIT_ASSERT(event.Called); +} + +void TCoroTest::TestOverrideTime() { + class TTime: public NCoro::ITime { + public: + TInstant Now() override { + return Current; + } + + TInstant Current = TInstant::Zero(); + }; + + TTime time; + TContExecutor executor{1024*1024, IPollerFace::Default(), nullptr, nullptr, NCoro::NStack::EGuard::Canary, Nothing(), &time}; + + executor.CreateOwned([&](TCont* cont) { + UNIT_ASSERT_EQUAL(cont->Executor()->Now(), TInstant::Zero()); + time.Current = TInstant::Seconds(1); + cont->SleepD(TInstant::Seconds(1)); + UNIT_ASSERT_EQUAL(cont->Executor()->Now(), TInstant::Seconds(1)); + }, "coro"); + + executor.Execute(); +} +UNIT_TEST_SUITE_REGISTRATION(TCoroTest); diff --git a/library/cpp/coroutine/engine/custom_time.h b/library/cpp/coroutine/engine/custom_time.h new file mode 100644 index 0000000000..0bff0f6a60 --- /dev/null +++ b/library/cpp/coroutine/engine/custom_time.h @@ -0,0 +1,10 @@ +#pragma once + +#include <util/datetime/base.h> + +namespace NCoro { +class ITime { + public: + virtual TInstant Now() = 0; +}; +} diff --git a/library/cpp/coroutine/engine/events.h b/library/cpp/coroutine/engine/events.h new file mode 100644 index 0000000000..07cc4d25e8 --- /dev/null +++ b/library/cpp/coroutine/engine/events.h @@ -0,0 +1,148 @@ +#pragma once + +#include "impl.h" + +#include <util/datetime/base.h> + +class TContEvent { +public: + TContEvent(TCont* current) noexcept + : Cont_(current) + , Status_(0) + { + } + + ~TContEvent() { + } + + int WaitD(TInstant deadline) { + Status_ = 0; + const int ret = Cont_->SleepD(deadline); + + return Status_ ? Status_ : ret; + } + + int WaitT(TDuration timeout) { + return WaitD(timeout.ToDeadLine()); + } + + int WaitI() { + return WaitD(TInstant::Max()); + } + + void Wake() noexcept { + SetStatus(EWAKEDUP); + Cont_->ReSchedule(); + } + + TCont* Cont() noexcept { + return Cont_; + } + + int Status() const noexcept { + return Status_; + } + + void SetStatus(int status) noexcept { + Status_ = status; + } + +private: + TCont* Cont_; + int Status_; +}; + +class TContWaitQueue { + class TWaiter: public TContEvent, public TIntrusiveListItem<TWaiter> { + public: + TWaiter(TCont* current) noexcept + : TContEvent(current) + { + } + + ~TWaiter() { + } + }; + +public: + TContWaitQueue() noexcept { + } + + ~TContWaitQueue() { + Y_ASSERT(Waiters_.Empty()); + } + + int WaitD(TCont* current, TInstant deadline) { + TWaiter waiter(current); + + Waiters_.PushBack(&waiter); + + return waiter.WaitD(deadline); + } + + int WaitT(TCont* current, TDuration timeout) { + return WaitD(current, timeout.ToDeadLine()); + } + + int WaitI(TCont* current) { + return WaitD(current, TInstant::Max()); + } + + void Signal() noexcept { + if (!Waiters_.Empty()) { + Waiters_.PopFront()->Wake(); + } + } + + void BroadCast() noexcept { + while (!Waiters_.Empty()) { + Waiters_.PopFront()->Wake(); + } + } + + void BroadCast(size_t number) noexcept { + for (size_t i = 0; i < number && !Waiters_.Empty(); ++i) { + Waiters_.PopFront()->Wake(); + } + } + +private: + TIntrusiveList<TWaiter> Waiters_; +}; + + +class TContSimpleEvent { +public: + TContSimpleEvent(TContExecutor* e) + : E_(e) + { + } + + TContExecutor* Executor() const noexcept { + return E_; + } + + void Signal() noexcept { + Q_.Signal(); + } + + void BroadCast() noexcept { + Q_.BroadCast(); + } + + int WaitD(TInstant deadLine) noexcept { + return Q_.WaitD(E_->Running(), deadLine); + } + + int WaitT(TDuration timeout) noexcept { + return WaitD(timeout.ToDeadLine()); + } + + int WaitI() noexcept { + return WaitD(TInstant::Max()); + } + +private: + TContWaitQueue Q_; + TContExecutor* E_; +}; diff --git a/library/cpp/coroutine/engine/helper.cpp b/library/cpp/coroutine/engine/helper.cpp new file mode 100644 index 0000000000..bffe208dc8 --- /dev/null +++ b/library/cpp/coroutine/engine/helper.cpp @@ -0,0 +1,37 @@ +#include "helper.h" + +#include "impl.h" +#include "network.h" + +namespace NCoro { + + bool TryConnect(const TString& host, ui16 port, TDuration timeout) { + bool connected = false; + + auto f = [&connected, &host, port, timeout](TCont* c) { + TSocketHolder socket; + TNetworkAddress address(host, port); + connected = (0 == NCoro::ConnectT(c, socket, address, timeout)); + }; + + TContExecutor e(128 * 1024); + e.Create(f, "try_connect"); + e.Execute(); + return connected; + } + + bool WaitUntilConnectable(const TString& host, ui16 port, TDuration timeout) { + const TInstant deadline = timeout.ToDeadLine(); + + for (size_t i = 1; Now() < deadline; ++i) { + const TDuration waitTime = TDuration::MilliSeconds(100) * i * i; + SleepUntil(Min(Now() + waitTime, deadline)); + + if (TryConnect(host, port, waitTime)) { + return true; + } + } + + return false; + } +} diff --git a/library/cpp/coroutine/engine/helper.h b/library/cpp/coroutine/engine/helper.h new file mode 100644 index 0000000000..ec2711ba52 --- /dev/null +++ b/library/cpp/coroutine/engine/helper.h @@ -0,0 +1,15 @@ +#pragma once + +#include <util/generic/string.h> +#include <util/datetime/base.h> + +namespace NCoro { + + // @brief check that address `host`:`port` is connectable + bool TryConnect(const TString& host, ui16 port, TDuration timeout = TDuration::Seconds(1)); + + // @brief waits until address `host`:`port` became connectable, but not more than timeout + // @return true on success, false if timeout exceeded + bool WaitUntilConnectable(const TString& host, ui16 port, TDuration timeout); + +} diff --git a/library/cpp/coroutine/engine/impl.cpp b/library/cpp/coroutine/engine/impl.cpp new file mode 100644 index 0000000000..7ae6f74051 --- /dev/null +++ b/library/cpp/coroutine/engine/impl.cpp @@ -0,0 +1,374 @@ +#include "impl.h" + +#include "stack/stack_allocator.h" +#include "stack/stack_guards.h" + +#include <util/generic/scope.h> +#include <util/thread/singleton.h> +#include <util/stream/format.h> +#include <util/stream/output.h> +#include <util/system/yassert.h> + + +TCont::TJoinWait::TJoinWait(TCont& c) noexcept + : Cont_(c) +{} + +void TCont::TJoinWait::Wake() noexcept { + Cont_.ReSchedule(); +} + +TCont::TCont(NCoro::NStack::IAllocator& allocator, + uint32_t stackSize, + TContExecutor& executor, + NCoro::TTrampoline::TFunc func, + const char* name) noexcept + : Executor_(executor) + , Name_(name) + , Trampoline_( + allocator, + stackSize, + std::move(func), + this + ) +{} + + +void TCont::PrintMe(IOutputStream& out) const noexcept { + out << "cont(" + << "name = " << Name_ << ", " + << "addr = " << Hex((size_t)this) + << ")"; +} + +bool TCont::Join(TCont* c, TInstant deadLine) noexcept { + TJoinWait ev(*this); + c->Waiters_.PushBack(&ev); + + do { + if (SleepD(deadLine) == ETIMEDOUT || Cancelled()) { + if (!ev.Empty()) { + c->Cancel(); + + do { + Switch(); + } while (!ev.Empty()); + } + + return false; + } + } while (!ev.Empty()); + + return true; +} + +int TCont::SleepD(TInstant deadline) noexcept { + TTimerEvent event(this, deadline); + + return ExecuteEvent(&event); +} + +void TCont::Switch() noexcept { + Executor()->RunScheduler(); +} + +void TCont::Yield() noexcept { + if (SleepD(TInstant::Zero())) { + ReScheduleAndSwitch(); + } +} + +void TCont::ReScheduleAndSwitch() noexcept { + ReSchedule(); + Switch(); +} + +void TCont::Terminate() { + while (!Waiters_.Empty()) { + Waiters_.PopFront()->Wake(); + } + Executor()->Exit(this); +} + +bool TCont::IAmRunning() const noexcept { + return this == Executor()->Running(); +} + +void TCont::Cancel() noexcept { + if (Cancelled()) { + return; + } + + Cancelled_ = true; + + if (!IAmRunning()) { + ReSchedule(); + } +} + +void TCont::ReSchedule() noexcept { + if (Cancelled()) { + // Legacy code may expect a Cancelled coroutine to be scheduled without delay. + Executor()->ScheduleExecutionNow(this); + } else { + Executor()->ScheduleExecution(this); + } +} + + +TContExecutor::TContExecutor( + uint32_t defaultStackSize, + THolder<IPollerFace> poller, + NCoro::IScheduleCallback* scheduleCallback, + NCoro::IEnterPollerCallback* enterPollerCallback, + NCoro::NStack::EGuard defaultGuard, + TMaybe<NCoro::NStack::TPoolAllocatorSettings> poolSettings, + NCoro::ITime* time +) + : ScheduleCallback_(scheduleCallback) + , EnterPollerCallback_(enterPollerCallback) + , DefaultStackSize_(defaultStackSize) + , Poller_(std::move(poller)) + , Time_(time) +{ + StackAllocator_ = NCoro::NStack::GetAllocator(poolSettings, defaultGuard); +} + +TContExecutor::~TContExecutor() { + Y_VERIFY(Allocated_ == 0, "leaked %u coroutines", (ui32)Allocated_); +} + +void TContExecutor::Execute() noexcept { + auto nop = [](void*){}; + Execute(nop); +} + +void TContExecutor::Execute(TContFunc func, void* arg) noexcept { + CreateOwned([=](TCont* cont) { + func(cont, arg); + }, "sys_main"); + RunScheduler(); +} + +void TContExecutor::WaitForIO() { + while (Ready_.Empty() && !WaitQueue_.Empty()) { + const auto now = Now(); + + // Waking a coroutine puts it into ReadyNext_ list + const auto next = WaitQueue_.WakeTimedout(now); + + if (!UserEvents_.Empty()) { + TIntrusiveList<IUserEvent> userEvents; + userEvents.Swap(UserEvents_); + do { + userEvents.PopFront()->Execute(); + } while (!userEvents.Empty()); + } + + // Polling will return as soon as there is an event to process or a timeout. + // If there are woken coroutines we do not want to sleep in the poller + // yet still we want to check for new io + // to prevent ourselves from locking out of io by constantly waking coroutines. + + if (ReadyNext_.Empty()) { + if (EnterPollerCallback_) { + EnterPollerCallback_->OnEnterPoller(); + } + Poll(next); + if (EnterPollerCallback_) { + EnterPollerCallback_->OnExitPoller(); + } + } else if (LastPoll_ + TDuration::MilliSeconds(5) < now) { + if (EnterPollerCallback_) { + EnterPollerCallback_->OnEnterPoller(); + } + Poll(now); + if (EnterPollerCallback_) { + EnterPollerCallback_->OnExitPoller(); + } + } + + Ready_.Append(ReadyNext_); + } +} + +void TContExecutor::Poll(TInstant deadline) { + Poller_.Wait(PollerEvents_, deadline); + LastPoll_ = Now(); + + // Waking a coroutine puts it into ReadyNext_ list + for (auto event : PollerEvents_) { + auto* lst = (NCoro::TPollEventList*)event.Data; + const int status = event.Status; + + if (status) { + for (auto it = lst->Begin(); it != lst->End();) { + (it++)->OnPollEvent(status); + } + } else { + const ui16 filter = event.Filter; + + for (auto it = lst->Begin(); it != lst->End();) { + if (it->What() & filter) { + (it++)->OnPollEvent(0); + } else { + ++it; + } + } + } + } +} + +void TContExecutor::Abort() noexcept { + WaitQueue_.Abort(); + auto visitor = [](TCont* c) { + c->Cancel(); + }; + Ready_.ForEach(visitor); + ReadyNext_.ForEach(visitor); +} + +TCont* TContExecutor::Create( + TContFunc func, + void* arg, + const char* name, + TMaybe<ui32> customStackSize +) noexcept { + return CreateOwned([=](TCont* cont) { + func(cont, arg); + }, name, customStackSize); +} + +TCont* TContExecutor::CreateOwned( + NCoro::TTrampoline::TFunc func, + const char* name, + TMaybe<ui32> customStackSize +) noexcept { + Allocated_ += 1; + if (!customStackSize) { + customStackSize = DefaultStackSize_; + } + auto* cont = new TCont(*StackAllocator_, *customStackSize, *this, std::move(func), name); + ScheduleExecution(cont); + return cont; +} + +NCoro::NStack::TAllocatorStats TContExecutor::GetAllocatorStats() const noexcept { + return StackAllocator_->GetStackStats(); +} + +void TContExecutor::Release(TCont* cont) noexcept { + delete cont; + Allocated_ -= 1; +} + +void TContExecutor::ScheduleToDelete(TCont* cont) noexcept { + ToDelete_.PushBack(cont); +} + +void TContExecutor::ScheduleExecution(TCont* cont) noexcept { + cont->Scheduled_ = true; + ReadyNext_.PushBack(cont); +} + +void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept { + cont->Scheduled_ = true; + Ready_.PushBack(cont); +} + +namespace { + inline TContExecutor*& ThisThreadExecutor() { + struct TThisThreadExecutorHolder { + TContExecutor* Executor = nullptr; + }; + return FastTlsSingletonWithPriority<TThisThreadExecutorHolder, 0>()->Executor; + } +} + +void TContExecutor::DeleteScheduled() noexcept { + ToDelete_.ForEach([this](TCont* c) { + Release(c); + }); +} + +TCont* RunningCont() { + TContExecutor* thisThreadExecutor = ThisThreadExecutor(); + return thisThreadExecutor ? thisThreadExecutor->Running() : nullptr; +} + +void TContExecutor::RunScheduler() noexcept { + try { + TContExecutor* const prev = ThisThreadExecutor(); + ThisThreadExecutor() = this; + TCont* caller = Current_; + TExceptionSafeContext* context = caller ? caller->Trampoline_.Context() : &SchedContext_; + Y_DEFER { + ThisThreadExecutor() = prev; + }; + + while (true) { + if (ScheduleCallback_ && Current_) { + ScheduleCallback_->OnUnschedule(*this); + } + + WaitForIO(); + DeleteScheduled(); + Ready_.Append(ReadyNext_); + + if (Ready_.Empty()) { + Current_ = nullptr; + if (caller) { + context->SwitchTo(&SchedContext_); + } + break; + } + + TCont* cont = Ready_.PopFront(); + + if (ScheduleCallback_) { + ScheduleCallback_->OnSchedule(*this, *cont); + } + + Current_ = cont; + cont->Scheduled_ = false; + if (cont == caller) { + break; + } + context->SwitchTo(cont->Trampoline_.Context()); + if (Paused_) { + Paused_ = false; + Current_ = nullptr; + break; + } + if (caller) { + break; + } + } + } catch (...) { + TBackTrace::FromCurrentException().PrintTo(Cerr); + Y_FAIL("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str()); + } +} + +void TContExecutor::Pause() { + if (auto cont = Running()) { + Paused_ = true; + ScheduleExecutionNow(cont); + cont->SwitchTo(&SchedContext_); + } +} + +void TContExecutor::Exit(TCont* cont) noexcept { + ScheduleToDelete(cont); + cont->SwitchTo(&SchedContext_); + Y_FAIL("can not return from exit"); +} + +TInstant TContExecutor::Now() { + return Y_LIKELY(Time_ == nullptr) ? TInstant::Now() : Time_->Now(); +} + +template <> +void Out<TCont>(IOutputStream& out, const TCont& c) { + c.PrintMe(out); +} diff --git a/library/cpp/coroutine/engine/impl.h b/library/cpp/coroutine/engine/impl.h new file mode 100644 index 0000000000..283a96ecf1 --- /dev/null +++ b/library/cpp/coroutine/engine/impl.h @@ -0,0 +1,313 @@ +#pragma once + +#include "callbacks.h" +#include "cont_poller.h" +#include "iostatus.h" +#include "poller.h" +#include "stack/stack_common.h" +#include "trampoline.h" +#include "custom_time.h" + +#include <library/cpp/containers/intrusive_rb_tree/rb_tree.h> + +#include <util/system/error.h> +#include <util/generic/ptr.h> +#include <util/generic/intrlist.h> +#include <util/datetime/base.h> +#include <util/generic/maybe.h> +#include <util/generic/function.h> + + +#define EWAKEDUP 34567 + +class TCont; +struct TContRep; +class TContExecutor; +class TContPollEvent; + +namespace NCoro::NStack { + class IAllocator; +} + +class TCont : private TIntrusiveListItem<TCont> { + struct TJoinWait: public TIntrusiveListItem<TJoinWait> { + TJoinWait(TCont& c) noexcept; + + void Wake() noexcept; + + public: + TCont& Cont_; + }; + + friend class TContExecutor; + friend class TIntrusiveListItem<TCont>; + friend class NCoro::TEventWaitQueue; + friend class NCoro::TTrampoline; + +private: + TCont( + NCoro::NStack::IAllocator& allocator, + uint32_t stackSize, + TContExecutor& executor, + NCoro::TTrampoline::TFunc func, + const char* name + ) noexcept; + +public: + TContExecutor* Executor() noexcept { + return &Executor_; + } + + const TContExecutor* Executor() const noexcept { + return &Executor_; + } + + const char* Name() const noexcept { + return Name_; + } + + void PrintMe(IOutputStream& out) const noexcept; + + void Yield() noexcept; + + void ReScheduleAndSwitch() noexcept; + + /// @return ETIMEDOUT on success + int SleepD(TInstant deadline) noexcept; + + int SleepT(TDuration timeout) noexcept { + return SleepD(timeout.ToDeadLine()); + } + + int SleepI() noexcept { + return SleepD(TInstant::Max()); + } + + bool IAmRunning() const noexcept; + + void Cancel() noexcept; + + bool Cancelled() const noexcept { + return Cancelled_; + } + + bool Scheduled() const noexcept { + return Scheduled_; + } + + bool Join(TCont* c, TInstant deadLine = TInstant::Max()) noexcept; + + void ReSchedule() noexcept; + + void Switch() noexcept; + + void SwitchTo(TExceptionSafeContext* ctx) { + Trampoline_.SwitchTo(ctx); + } + +private: + void Terminate(); + +private: + TContExecutor& Executor_; + + // TODO(velavokr): allow name storage owning (for generated names backed by TString) + const char* Name_ = nullptr; + + NCoro::TTrampoline Trampoline_; + + TIntrusiveList<TJoinWait> Waiters_; + bool Cancelled_ = false; + bool Scheduled_ = false; +}; + +TCont* RunningCont(); + + +template <class Functor> +static void ContHelperFunc(TCont* cont, void* arg) { + (*((Functor*)(arg)))(cont); +} + +template <typename T, void (T::*M)(TCont*)> +static void ContHelperMemberFunc(TCont* c, void* arg) { + ((reinterpret_cast<T*>(arg))->*M)(c); +} + +class IUserEvent + : public TIntrusiveListItem<IUserEvent> +{ +public: + virtual ~IUserEvent() = default; + + virtual void Execute() = 0; +}; + +/// Central coroutine class. +/// Note, coroutines are single-threaded, and all methods must be called from the single thread +class TContExecutor { + friend class TCont; + using TContList = TIntrusiveList<TCont>; + +public: + TContExecutor( + uint32_t defaultStackSize, + THolder<IPollerFace> poller = IPollerFace::Default(), + NCoro::IScheduleCallback* = nullptr, + NCoro::IEnterPollerCallback* = nullptr, + NCoro::NStack::EGuard stackGuard = NCoro::NStack::EGuard::Canary, + TMaybe<NCoro::NStack::TPoolAllocatorSettings> poolSettings = Nothing(), + NCoro::ITime* time = nullptr + ); + + ~TContExecutor(); + + // if we already have a coroutine to run + void Execute() noexcept; + + void Execute(TContFunc func, void* arg = nullptr) noexcept; + + template <class Functor> + void Execute(Functor& f) noexcept { + Execute((TContFunc)ContHelperFunc<Functor>, (void*)&f); + } + + template <typename T, void (T::*M)(TCont*)> + void Execute(T* obj) noexcept { + Execute(ContHelperMemberFunc<T, M>, obj); + } + + template <class Functor> + TCont* Create( + Functor& f, + const char* name, + TMaybe<ui32> customStackSize = Nothing() + ) noexcept { + return Create((TContFunc)ContHelperFunc<Functor>, (void*)&f, name, customStackSize); + } + + template <typename T, void (T::*M)(TCont*)> + TCont* Create( + T* obj, + const char* name, + TMaybe<ui32> customStackSize = Nothing() + ) noexcept { + return Create(ContHelperMemberFunc<T, M>, obj, name, customStackSize); + } + + TCont* Create( + TContFunc func, + void* arg, + const char* name, + TMaybe<ui32> customStackSize = Nothing() + ) noexcept; + + TCont* CreateOwned( + NCoro::TTrampoline::TFunc func, + const char* name, + TMaybe<ui32> customStackSize = Nothing() + ) noexcept; + + NCoro::TContPoller* Poller() noexcept { + return &Poller_; + } + + TCont* Running() noexcept { + return Current_; + } + + const TCont* Running() const noexcept { + return Current_; + } + + size_t TotalReadyConts() const noexcept { + return Ready_.Size() + TotalScheduledConts(); + } + + size_t TotalScheduledConts() const noexcept { + return ReadyNext_.Size(); + } + + size_t TotalConts() const noexcept { + return Allocated_; + } + + size_t TotalWaitingConts() const noexcept { + return TotalConts() - TotalReadyConts(); + } + + NCoro::NStack::TAllocatorStats GetAllocatorStats() const noexcept; + + // TODO(velavokr): rename, it is just CancelAll actually + void Abort() noexcept; + + void SetFailOnError(bool fail) noexcept { + FailOnError_ = fail; + } + + bool FailOnError() const noexcept { + return FailOnError_; + } + + void RegisterInWaitQueue(NCoro::TContPollEvent* event) { + WaitQueue_.Register(event); + } + + void ScheduleIoWait(TFdEvent* event) { + RegisterInWaitQueue(event); + Poller_.Schedule(event); + } + + void ScheduleIoWait(TTimerEvent* event) noexcept { + RegisterInWaitQueue(event); + } + + void ScheduleUserEvent(IUserEvent* event) { + UserEvents_.PushBack(event); + } + + void Pause(); + TInstant Now(); +private: + void Release(TCont* cont) noexcept; + + void Exit(TCont* cont) noexcept; + + void RunScheduler() noexcept; + + void ScheduleToDelete(TCont* cont) noexcept; + + void ScheduleExecution(TCont* cont) noexcept; + + void ScheduleExecutionNow(TCont* cont) noexcept; + + void DeleteScheduled() noexcept; + + void WaitForIO(); + + void Poll(TInstant deadline); + +private: + NCoro::IScheduleCallback* const ScheduleCallback_ = nullptr; + NCoro::IEnterPollerCallback* const EnterPollerCallback_ = nullptr; + const uint32_t DefaultStackSize_; + THolder<NCoro::NStack::IAllocator> StackAllocator_; + + TExceptionSafeContext SchedContext_; + + TContList ToDelete_; + TContList Ready_; + TContList ReadyNext_; + NCoro::TEventWaitQueue WaitQueue_; + NCoro::TContPoller Poller_; + NCoro::TContPoller::TEvents PollerEvents_; + TInstant LastPoll_; + + TIntrusiveList<IUserEvent> UserEvents_; + + size_t Allocated_ = 0; + TCont* Current_ = nullptr; + bool FailOnError_ = false; + bool Paused_ = false; + NCoro::ITime* Time_ = nullptr; +}; diff --git a/library/cpp/coroutine/engine/iostatus.cpp b/library/cpp/coroutine/engine/iostatus.cpp new file mode 100644 index 0000000000..8298229b39 --- /dev/null +++ b/library/cpp/coroutine/engine/iostatus.cpp @@ -0,0 +1 @@ +#include "iostatus.h" diff --git a/library/cpp/coroutine/engine/iostatus.h b/library/cpp/coroutine/engine/iostatus.h new file mode 100644 index 0000000000..bf6036805d --- /dev/null +++ b/library/cpp/coroutine/engine/iostatus.h @@ -0,0 +1,91 @@ +#pragma once + +#include <util/generic/yexception.h> + +class TIOStatus { +public: + TIOStatus(int status) noexcept + : Status_(status) + { + } + + static TIOStatus Error(int status) noexcept { + return TIOStatus(status); + } + + static TIOStatus Error() noexcept { + return TIOStatus(LastSystemError()); + } + + static TIOStatus Success() noexcept { + return TIOStatus(0); + } + + void Check() const { + if (Status_) { + ythrow TSystemError(Status_) << "io error"; + } + } + + bool Failed() const noexcept { + return (bool)Status_; + } + + bool Succeed() const noexcept { + return !Failed(); + } + + int Status() const noexcept { + return Status_; + } + +private: + int Status_; +}; + + +class TContIOStatus { +public: + TContIOStatus(size_t processed, TIOStatus status) noexcept + : Processed_(processed) + , Status_(status) + { + } + + static TContIOStatus Error(TIOStatus status) noexcept { + return TContIOStatus(0, status); + } + + static TContIOStatus Error() noexcept { + return TContIOStatus(0, TIOStatus::Error()); + } + + static TContIOStatus Success(size_t processed) noexcept { + return TContIOStatus(processed, TIOStatus::Success()); + } + + static TContIOStatus Eof() noexcept { + return Success(0); + } + + ~TContIOStatus() { + } + + size_t Processed() const noexcept { + return Processed_; + } + + int Status() const noexcept { + return Status_.Status(); + } + + size_t Checked() const { + Status_.Check(); + + return Processed_; + } + +private: + size_t Processed_; + TIOStatus Status_; +}; diff --git a/library/cpp/coroutine/engine/mutex.h b/library/cpp/coroutine/engine/mutex.h new file mode 100644 index 0000000000..93e9119503 --- /dev/null +++ b/library/cpp/coroutine/engine/mutex.h @@ -0,0 +1,49 @@ +#pragma once + +#include "impl.h" +#include "events.h" + +class TContMutex { +public: + TContMutex() noexcept + : Token_(true) + { + } + + ~TContMutex() { + Y_ASSERT(Token_); + } + + int LockD(TCont* current, TInstant deadline) { + while (!Token_) { + const int ret = WaitQueue_.WaitD(current, deadline); + + if (ret != EWAKEDUP) { + return ret; + } + } + + Token_ = false; + + return 0; + } + + int LockT(TCont* current, TDuration timeout) { + return LockD(current, timeout.ToDeadLine()); + } + + int LockI(TCont* current) { + return LockD(current, TInstant::Max()); + } + + void UnLock() noexcept { + Y_ASSERT(!Token_); + + Token_ = true; + WaitQueue_.Signal(); + } + +private: + TContWaitQueue WaitQueue_; + bool Token_; +}; diff --git a/library/cpp/coroutine/engine/network.cpp b/library/cpp/coroutine/engine/network.cpp new file mode 100644 index 0000000000..85b647d210 --- /dev/null +++ b/library/cpp/coroutine/engine/network.cpp @@ -0,0 +1,325 @@ +#include "impl.h" +#include "network.h" + +#include <util/generic/scope.h> +#include <util/generic/xrange.h> + +#include <sys/uio.h> + +#if defined(_bionic_) +# define IOV_MAX 1024 +#endif + + +namespace NCoro { + namespace { + bool IsBlocked(int lasterr) noexcept { + return lasterr == EAGAIN || lasterr == EWOULDBLOCK; + } + + ssize_t DoReadVector(SOCKET fd, TContIOVector* vec) noexcept { + return readv(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count())); + } + + ssize_t DoWriteVector(SOCKET fd, TContIOVector* vec) noexcept { + return writev(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count())); + } + } + + + int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept { + if (cont->Cancelled()) { + return ECANCELED; + } + + if (nfds == 0) { + return 0; + } + + TTempArray<TFdEvent> events(nfds); + + for (auto i : xrange(nfds)) { + new(events.Data() + i) TFdEvent(cont, fds[i], (ui16) what[i], deadline); + } + + Y_DEFER { + for (auto i : xrange(nfds)) { + (events.Data() + i)->~TFdEvent(); + } + }; + + for (auto i : xrange(nfds)) { + cont->Executor()->ScheduleIoWait(events.Data() + i); + } + cont->Switch(); + + if (cont->Cancelled()) { + return ECANCELED; + } + + TFdEvent* ret = nullptr; + int status = EINPROGRESS; + + for (auto i : xrange(nfds)) { + auto& ev = *(events.Data() + i); + switch (ev.Status()) { + case EINPROGRESS: + break; + case ETIMEDOUT: + if (status != EINPROGRESS) { + break; + } + [[fallthrough]]; + default: + status = ev.Status(); + ret = &ev; + } + } + + if (ret) { + if (outfd) { + *outfd = ret->Fd(); + } + return ret->Status(); + } + + return EINPROGRESS; + } + + int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept { + return SelectD(cont, fds, what, nfds, outfd, timeout.ToDeadLine()); + } + + int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd) { + return SelectD(cont, fds, what, nfds, outfd, TInstant::Max()); + } + + + int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept { + TFdEvent event(cont, fd, (ui16)what, deadline); + return ExecuteEvent(&event); + } + + int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept { + return PollD(cont, fd, what, timeout.ToDeadLine()); + } + + int PollI(TCont* cont, SOCKET fd, int what) noexcept { + return PollD(cont, fd, what, TInstant::Max()); + } + + + TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept { + while (true) { + ssize_t res = DoReadVector(fd, vec); + + if (res >= 0) { + return TContIOStatus::Success((size_t) res); + } + + { + const int err = LastSystemError(); + + if (!IsBlocked(err)) { + return TContIOStatus::Error(err); + } + } + + if ((res = PollD(cont, fd, CONT_POLL_READ, deadline)) != 0) { + return TContIOStatus::Error((int) res); + } + } + } + + TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept { + return ReadVectorD(cont, fd, vec, timeOut.ToDeadLine()); + } + + TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept { + return ReadVectorD(cont, fd, vec, TInstant::Max()); + } + + + TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept { + IOutputStream::TPart part(buf, len); + TContIOVector vec(&part, 1); + return ReadVectorD(cont, fd, &vec, deadline); + } + + TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept { + return ReadD(cont, fd, buf, len, timeout.ToDeadLine()); + } + + TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept { + return ReadD(cont, fd, buf, len, TInstant::Max()); + } + + + TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept { + size_t written = 0; + + while (!vec->Complete()) { + ssize_t res = DoWriteVector(fd, vec); + + if (res >= 0) { + written += res; + + vec->Proceed((size_t) res); + } else { + { + const int err = LastSystemError(); + + if (!IsBlocked(err)) { + return TContIOStatus(written, err); + } + } + + if ((res = PollD(cont, fd, CONT_POLL_WRITE, deadline)) != 0) { + return TContIOStatus(written, (int) res); + } + } + } + + return TContIOStatus::Success(written); + } + + TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept { + return WriteVectorD(cont, fd, vec, timeOut.ToDeadLine()); + } + + TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept { + return WriteVectorD(cont, fd, vec, TInstant::Max()); + } + + + TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept { + IOutputStream::TPart part(buf, len); + TContIOVector vec(&part, 1); + return WriteVectorD(cont, fd, &vec, deadline); + } + + TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept { + return WriteD(cont, fd, buf, len, timeout.ToDeadLine()); + } + + TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept { + return WriteD(cont, fd, buf, len, TInstant::Max()); + } + + + int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept { + TSocketHolder res(Socket(ai)); + + if (res.Closed()) { + return LastSystemError(); + } + + const int ret = ConnectD(cont, res, ai.ai_addr, (socklen_t) ai.ai_addrlen, deadline); + + if (!ret) { + s.Swap(res); + } + + return ret; + } + + int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept { + int ret = EHOSTUNREACH; + + for (auto it = addr.Begin(); it != addr.End(); ++it) { + ret = ConnectD(cont, s, *it, deadline); + + if (ret == 0 || ret == ETIMEDOUT) { + return ret; + } + } + + return ret; + } + + int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept { + return ConnectD(cont, s, addr, timeout.ToDeadLine()); + } + + int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept { + return ConnectD(cont, s, addr, TInstant::Max()); + } + + int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept { + if (connect(s, name, namelen)) { + const int err = LastSystemError(); + + if (!IsBlocked(err) && err != EINPROGRESS) { + return err; + } + + int ret = PollD(cont, s, CONT_POLL_WRITE, deadline); + + if (ret) { + return ret; + } + + // check if we really connected + // FIXME: Unportable ?? + int serr = 0; + socklen_t slen = sizeof(serr); + + ret = getsockopt(s, SOL_SOCKET, SO_ERROR, (char*) &serr, &slen); + + if (ret) { + return LastSystemError(); + } + + if (serr) { + return serr; + } + } + + return 0; + } + + int ConnectT(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TDuration timeout) noexcept { + return ConnectD(cont, s, name, namelen, timeout.ToDeadLine()); + } + + int ConnectI(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen) noexcept { + return ConnectD(cont, s, name, namelen, TInstant::Max()); + } + + + int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept { + SOCKET ret; + + while ((ret = Accept4(s, addr, addrlen)) == INVALID_SOCKET) { + int err = LastSystemError(); + + if (!IsBlocked(err)) { + return -err; + } + + err = PollD(cont, s, CONT_POLL_READ, deadline); + + if (err) { + return -err; + } + } + + return (int) ret; + } + + int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept { + return AcceptD(cont, s, addr, addrlen, timeout.ToDeadLine()); + } + + int AcceptI(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen) noexcept { + return AcceptD(cont, s, addr, addrlen, TInstant::Max()); + } + + SOCKET Socket(int domain, int type, int protocol) noexcept { + return Socket4(domain, type, protocol); + } + + SOCKET Socket(const struct addrinfo& ai) noexcept { + return Socket(ai.ai_family, ai.ai_socktype, ai.ai_protocol); + } +} diff --git a/library/cpp/coroutine/engine/network.h b/library/cpp/coroutine/engine/network.h new file mode 100644 index 0000000000..f2c9afe4f8 --- /dev/null +++ b/library/cpp/coroutine/engine/network.h @@ -0,0 +1,55 @@ +#pragma once + +#include "iostatus.h" + +#include <util/datetime/base.h> +#include <util/network/init.h> +#include <util/network/iovec.h> +#include <util/network/nonblock.h> +#include <util/network/socket.h> + +class TCont; + +namespace NCoro { + + int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept; + int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept; + int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd); + + int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept; + int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept; + int PollI(TCont* cont, SOCKET fd, int what) noexcept; + + TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept; + TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept; + TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept; + + TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept; + TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept; + TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept; + + TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept; + TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept; + TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept; + + TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept; + TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept; + TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept; + + int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept; + + int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept; + int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept; + int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept; + + int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept; + int ConnectT(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TDuration timeout) noexcept; + int ConnectI(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen) noexcept; + + int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept; + int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept; + int AcceptI(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen) noexcept; + + SOCKET Socket(int domain, int type, int protocol) noexcept; + SOCKET Socket(const struct addrinfo& ai) noexcept; +} diff --git a/library/cpp/coroutine/engine/poller.cpp b/library/cpp/coroutine/engine/poller.cpp new file mode 100644 index 0000000000..61164fa56b --- /dev/null +++ b/library/cpp/coroutine/engine/poller.cpp @@ -0,0 +1,390 @@ +#include "poller.h" +#include "sockmap.h" + +#include <util/memory/smallobj.h> +#include <util/generic/intrlist.h> +#include <util/generic/singleton.h> +#include <util/system/env.h> +#include <util/string/cast.h> + +namespace { + using TChange = IPollerFace::TChange; + using TEvent = IPollerFace::TEvent; + using TEvents = IPollerFace::TEvents; + + template <class T> + class TUnsafeBuf { + public: + TUnsafeBuf() noexcept + : L_(0) + { + } + + T* operator~() const noexcept { + return B_.Get(); + } + + size_t operator+() const noexcept { + return L_; + } + + void Reserve(size_t len) { + len = FastClp2(len); + + if (len > L_) { + B_.Reset(new T[len]); + L_ = len; + } + } + + private: + TArrayHolder<T> B_; + size_t L_; + }; + + + template <class T> + class TVirtualize: public IPollerFace { + public: + TVirtualize(EContPoller pollerEngine) + : PollerEngine_(pollerEngine) + { + } + + void Set(const TChange& c) override { + P_.Set(c); + } + + void Wait(TEvents& events, TInstant deadLine) override { + P_.Wait(events, deadLine); + } + + EContPoller PollEngine() const override { + return PollerEngine_; + } + private: + T P_; + const EContPoller PollerEngine_; + }; + + + template <class T> + class TPoller { + using TInternalEvent = typename T::TEvent; + + public: + TPoller() { + E_.Reserve(1); + } + + void Set(const TChange& c) { + P_.Set(c.Data, c.Fd, c.Flags); + } + + void Reserve(size_t size) { + E_.Reserve(size); + } + + void Wait(TEvents& events, TInstant deadLine) { + const size_t ret = P_.WaitD(~E_, +E_, deadLine); + + events.reserve(ret); + + for (size_t i = 0; i < ret; ++i) { + const TInternalEvent* ie = ~E_ + i; + + const TEvent e = { + T::ExtractEvent(ie), + T::ExtractStatus(ie), + (ui16)T::ExtractFilter(ie), + }; + + events.push_back(e); + } + + E_.Reserve(ret + 1); + } + + private: + T P_; + TUnsafeBuf<TInternalEvent> E_; + }; + + + template <class T> + class TIndexedArray { + struct TVal: + public T, + public TIntrusiveListItem<TVal>, + public TObjectFromPool<TVal> + { + // NOTE Constructor must be user-defined (and not =default) here + // because TVal objects are created in the UB-capable placement + // TObjectFromPool::new operator that stores data in a memory + // allocated for the object. Without user defined constructor + // zero-initialization takes place in TVal() expression and the + // data is overwritten. + TVal() { + } + }; + + typedef TIntrusiveList<TVal> TListType; + + public: + typedef typename TListType::TIterator TIterator; + typedef typename TListType::TConstIterator TConstIterator; + + TIndexedArray() + : P_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance()) + { + } + + TIterator Begin() noexcept { + return I_.Begin(); + } + + TIterator End() noexcept { + return I_.End(); + } + + TConstIterator Begin() const noexcept { + return I_.Begin(); + } + + TConstIterator End() const noexcept { + return I_.End(); + } + + T& operator[](size_t i) { + return *Get(i); + } + + T* Get(size_t i) { + TValRef& v = V_.Get(i); + + if (Y_UNLIKELY(!v)) { + v.Reset(new (&P_) TVal()); + I_.PushFront(v.Get()); + } + + Y_PREFETCH_WRITE(v.Get(), 1); + + return v.Get(); + } + + void Erase(size_t i) noexcept { + V_.Get(i).Destroy(); + } + + size_t Size() const noexcept { + return I_.Size(); + } + + private: + using TValRef = THolder<TVal>; + typename TVal::TPool P_; + TSocketMap<TValRef> V_; + TListType I_; + }; + + + inline short PollFlags(ui16 flags) noexcept { + short ret = 0; + + if (flags & CONT_POLL_READ) { + ret |= POLLIN; + } + + if (flags & CONT_POLL_WRITE) { + ret |= POLLOUT; + } + +#if defined(_linux_) + if (flags & CONT_POLL_RDHUP) { + ret |= POLLRDHUP; + } +#endif + + return ret; + } + + + class TPollPoller { + public: + size_t Size() const noexcept { + return S_.Size(); + } + + template <class T> + void Build(T& t) const { + for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) { + t.Set(*it); + } + + t.Reserve(Size()); + } + + void Set(const TChange& c) { + if (c.Flags) { + S_[c.Fd] = c; + } else { + S_.Erase(c.Fd); + } + } + + void Wait(TEvents& events, TInstant deadLine) { + T_.clear(); + T_.reserve(Size()); + + for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) { + const pollfd pfd = { + it->Fd, + PollFlags(it->Flags), + 0, + }; + + T_.push_back(pfd); + } + + const ssize_t ret = PollD(T_.data(), (nfds_t) T_.size(), deadLine); + + if (ret <= 0) { + return; + } + + events.reserve(T_.size()); + + for (size_t i = 0; i < T_.size(); ++i) { + const pollfd& pfd = T_[i]; + const short ev = pfd.revents; + + if (!ev) { + continue; + } + + int status = 0; + ui16 filter = 0; + + // We are perfectly fine with an EOF while reading a pipe or a unix socket + if ((ev & POLLIN) || (ev & POLLHUP) && (pfd.events & POLLIN)) { + filter |= CONT_POLL_READ; + } + + if (ev & POLLOUT) { + filter |= CONT_POLL_WRITE; + } + +#if defined(_linux_) + if (ev & POLLRDHUP) { + filter |= CONT_POLL_RDHUP; + } +#endif + + if (ev & POLLERR) { + status = EIO; + } else if (ev & POLLHUP && pfd.events & POLLOUT) { + // Only write operations may cause EPIPE + status = EPIPE; + } else if (ev & POLLNVAL) { + status = EINVAL; + } + + if (status) { + filter = CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_RDHUP; + } + + const TEvent res = { + S_[pfd.fd].Data, + status, + filter, + }; + + events.push_back(res); + } + } + + private: + typedef TIndexedArray<TChange> TFds; + TFds S_; + typedef TVector<pollfd> TPollVec; + TPollVec T_; + }; + + + class TCombinedPoller { + typedef TPoller<TPollerImpl<TWithoutLocking>> TDefaultPoller; + + public: + TCombinedPoller() { + P_.Reset(new TPollPoller()); + } + + void Set(const TChange& c) { + if (!P_) { + D_->Set(c); + } else { + P_->Set(c); + } + } + + void Wait(TEvents& events, TInstant deadLine) { + if (!P_) { + D_->Wait(events, deadLine); + } else { + if (P_->Size() > 200) { + D_.Reset(new TDefaultPoller()); + P_->Build(*D_); + P_.Destroy(); + D_->Wait(events, deadLine); + } else { + P_->Wait(events, deadLine); + } + } + } + + private: + THolder<TPollPoller> P_; + THolder<TDefaultPoller> D_; + }; + + struct TUserPoller: public TString { + TUserPoller() + : TString(GetEnv("USER_POLLER")) + { + } + }; +} + +THolder<IPollerFace> IPollerFace::Default() { + return Construct(*SingletonWithPriority<TUserPoller, 0>()); +} + +THolder<IPollerFace> IPollerFace::Construct(TStringBuf name) { + return Construct(name ? FromString<EContPoller>(name) : EContPoller::Default); +} + +THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) { + switch (poller) { + case EContPoller::Default: + case EContPoller::Combined: + return MakeHolder<TVirtualize<TCombinedPoller>>(EContPoller::Combined); + case EContPoller::Select: + return MakeHolder<TVirtualize<TPoller<TGenericPoller<TSelectPoller<TWithoutLocking>>>>>(poller); + case EContPoller::Poll: + return MakeHolder<TVirtualize<TPollPoller>>(poller); + case EContPoller::Epoll: +#if defined(HAVE_EPOLL_POLLER) + return MakeHolder<TVirtualize<TPoller<TGenericPoller<TEpollPoller<TWithoutLocking>>>>>(poller); +#else + return nullptr; +#endif + case EContPoller::Kqueue: +#if defined(HAVE_KQUEUE_POLLER) + return MakeHolder<TVirtualize<TPoller<TGenericPoller<TKqueuePoller<TWithoutLocking>>>>>(poller); +#else + return nullptr; +#endif + default: + Y_FAIL("bad poller type"); + } +} diff --git a/library/cpp/coroutine/engine/poller.h b/library/cpp/coroutine/engine/poller.h new file mode 100644 index 0000000000..8ea012c0fc --- /dev/null +++ b/library/cpp/coroutine/engine/poller.h @@ -0,0 +1,50 @@ +#pragma once + +#include <util/generic/ptr.h> +#include <util/generic/vector.h> +#include <util/network/socket.h> +#include <util/network/pollerimpl.h> +#include <util/datetime/base.h> + +enum class EContPoller { + Default /* "default" */, + Combined /* "combined" */, + Select /* "select" */, + Poll /* "poll" */, + Epoll /* "epoll" */, + Kqueue /* "kqueue" */ +}; + +class IPollerFace { +public: + struct TChange { + SOCKET Fd; + void* Data; + ui16 Flags; + }; + + struct TEvent { + void* Data; + int Status; + ui16 Filter; + }; + + using TEvents = TVector<TEvent>; + + virtual ~IPollerFace() { + } + + void Set(void* ptr, SOCKET fd, ui16 flags) { + const TChange c = {fd, ptr, flags}; + + Set(c); + } + + virtual void Set(const TChange& change) = 0; + virtual void Wait(TEvents& events, TInstant deadLine) = 0; + virtual EContPoller PollEngine() const = 0; + + static THolder<IPollerFace> Default(); + static THolder<IPollerFace> Construct(TStringBuf name); + static THolder<IPollerFace> Construct(EContPoller poller); +}; diff --git a/library/cpp/coroutine/engine/sockmap.h b/library/cpp/coroutine/engine/sockmap.h new file mode 100644 index 0000000000..fd189e1774 --- /dev/null +++ b/library/cpp/coroutine/engine/sockmap.h @@ -0,0 +1,24 @@ +#pragma once + +#include <util/generic/hash.h> +#include <util/generic/vector.h> + +template <class T> +class TSocketMap { +public: + T& Get(size_t idx) { + if (idx < 128000) { + if (V_.size() <= idx) { + V_.resize(idx + 1); + } + + return V_[idx]; + } + + return H_[idx]; + } + +private: + TVector<T> V_; + THashMap<size_t, T> H_; +}; diff --git a/library/cpp/coroutine/engine/sockpool.cpp b/library/cpp/coroutine/engine/sockpool.cpp new file mode 100644 index 0000000000..b9482e780f --- /dev/null +++ b/library/cpp/coroutine/engine/sockpool.cpp @@ -0,0 +1,58 @@ +#include "sockpool.h" + +void SetCommonSockOpts(SOCKET sock, const struct sockaddr* sa) { + SetSockOpt(sock, SOL_SOCKET, SO_REUSEADDR, 1); + + if (!sa || sa->sa_family == AF_INET) { + sockaddr_in s_in; + s_in.sin_family = AF_INET; + s_in.sin_addr.s_addr = INADDR_ANY; + s_in.sin_port = 0; + + if (bind(sock, (struct sockaddr*)&s_in, sizeof(s_in)) == -1) { + warn("bind"); + } + } else if (sa->sa_family == AF_INET6) { + sockaddr_in6 s_in6(*(const sockaddr_in6*)sa); + Zero(s_in6.sin6_addr); + s_in6.sin6_port = 0; + + if (bind(sock, (const struct sockaddr*)&s_in6, sizeof s_in6) == -1) { + warn("bind6"); + } + } else { + Y_ASSERT(0); + } + + SetNoDelay(sock, true); +} + +TPooledSocket TSocketPool::AllocateMore(TConnectData* conn) { + TCont* cont = conn->Cont; + + while (true) { + TSocketHolder s(NCoro::Socket(Addr_->Addr()->sa_family, SOCK_STREAM, 0)); + + if (s == INVALID_SOCKET) { + ythrow TSystemError(errno) << TStringBuf("can not create socket"); + } + + SetCommonSockOpts(s, Addr_->Addr()); + SetZeroLinger(s); + + const int ret = NCoro::ConnectD(cont, s, Addr_->Addr(), Addr_->Len(), conn->DeadLine); + + if (ret == EINTR) { + continue; + } else if (ret) { + ythrow TSystemError(ret) << TStringBuf("can not connect(") << cont->Name() << ')'; + } + + THolder<TPooledSocket::TImpl> res(new TPooledSocket::TImpl(s, this)); + s.Release(); + + if (res->IsOpen()) { + return res.Release(); + } + } +} diff --git a/library/cpp/coroutine/engine/sockpool.h b/library/cpp/coroutine/engine/sockpool.h new file mode 100644 index 0000000000..1ebb7e7b38 --- /dev/null +++ b/library/cpp/coroutine/engine/sockpool.h @@ -0,0 +1,253 @@ +#pragma once + +#include "impl.h" +#include "network.h" + +#include <util/network/address.h> +#include <util/network/socket.h> +#include <util/system/mutex.h> + +extern void SetCommonSockOpts(SOCKET sock, const struct sockaddr* sa = nullptr); + +class TSocketPool; + +class TPooledSocket { + class TImpl: public TIntrusiveListItem<TImpl>, public TSimpleRefCount<TImpl, TImpl> { + public: + TImpl(SOCKET fd, TSocketPool* pool) noexcept + : Pool_(pool) + , IsKeepAlive_(false) + , Fd_(fd) + { + Touch(); + } + + static void Destroy(TImpl* impl) noexcept { + impl->DoDestroy(); + } + + void DoDestroy() noexcept { + if (!Closed() && IsKeepAlive() && IsInGoodState()) { + ReturnToPool(); + } else { + delete this; + } + } + + bool IsKeepAlive() const noexcept { + return IsKeepAlive_; + } + + void SetKeepAlive(bool ka) { + ::SetKeepAlive(Fd_, ka); + IsKeepAlive_ = ka; + } + + SOCKET Socket() const noexcept { + return Fd_; + } + + bool Closed() const noexcept { + return Fd_.Closed(); + } + + void Close() noexcept { + Fd_.Close(); + } + + bool IsInGoodState() const noexcept { + int err = 0; + socklen_t len = sizeof(err); + + getsockopt(Fd_, SOL_SOCKET, SO_ERROR, (char*)&err, &len); + + return !err; + } + + bool IsOpen() const noexcept { + return IsInGoodState() && IsNotSocketClosedByOtherSide(Fd_); + } + + void Touch() noexcept { + TouchTime_ = TInstant::Now(); + } + + const TInstant& LastTouch() const noexcept { + return TouchTime_; + } + + private: + inline void ReturnToPool() noexcept; + + private: + TSocketPool* Pool_; + bool IsKeepAlive_; + TSocketHolder Fd_; + TInstant TouchTime_; + }; + + friend class TSocketPool; + +public: + TPooledSocket() + : Impl_(nullptr) + { + } + + TPooledSocket(TImpl* impl) + : Impl_(impl) + { + } + + ~TPooledSocket() { + if (UncaughtException() && !!Impl_) { + Close(); + } + } + + operator SOCKET() const noexcept { + return Impl_->Socket(); + } + + void SetKeepAlive(bool ka) { + Impl_->SetKeepAlive(ka); + } + + void Close() noexcept { + Impl_->Close(); + } + +private: + TIntrusivePtr<TImpl> Impl_; +}; + +struct TConnectData { + TConnectData(TCont* cont, const TInstant& deadLine) + : Cont(cont) + , DeadLine(deadLine) + { + } + + TConnectData(TCont* cont, const TDuration& timeOut) + : Cont(cont) + , DeadLine(TInstant::Now() + timeOut) + { + } + + TCont* Cont; + const TInstant DeadLine; +}; + +class TSocketPool { + friend class TPooledSocket::TImpl; + +public: + typedef TAtomicSharedPtr<NAddr::IRemoteAddr> TAddrRef; + + TSocketPool(int ip, int port) + : Addr_(new NAddr::TIPv4Addr(TIpAddress((ui32)ip, (ui16)port))) + { + } + + TSocketPool(const TAddrRef& addr) + : Addr_(addr) + { + } + + void EraseStale(const TInstant& maxAge) noexcept { + TSockets toDelete; + + { + TGuard<TMutex> guard(Mutex_); + + for (TSockets::TIterator it = Pool_.Begin(); it != Pool_.End();) { + if (it->LastTouch() < maxAge) { + toDelete.PushBack(&*(it++)); + } else { + ++it; + } + } + } + } + + TPooledSocket Get(TConnectData* conn) { + TPooledSocket ret; + + if (TPooledSocket::TImpl* alive = GetImpl()) { + ret = TPooledSocket(alive); + } else { + ret = AllocateMore(conn); + } + + ret.Impl_->Touch(); + + return ret; + } + + bool GetAlive(TPooledSocket& socket) { + if (TPooledSocket::TImpl* alive = GetImpl()) { + alive->Touch(); + socket = TPooledSocket(alive); + return true; + } + return false; + } + +private: + TPooledSocket::TImpl* GetImpl() { + TGuard<TMutex> guard(Mutex_); + + while (!Pool_.Empty()) { + THolder<TPooledSocket::TImpl> ret(Pool_.PopFront()); + + if (ret->IsOpen()) { + return ret.Release(); + } + } + return nullptr; + } + + void Release(TPooledSocket::TImpl* impl) noexcept { + TGuard<TMutex> guard(Mutex_); + + Pool_.PushFront(impl); + } + + TPooledSocket AllocateMore(TConnectData* conn); + +private: + TAddrRef Addr_; + using TSockets = TIntrusiveListWithAutoDelete<TPooledSocket::TImpl, TDelete>; + TSockets Pool_; + TMutex Mutex_; +}; + +inline void TPooledSocket::TImpl::ReturnToPool() noexcept { + Pool_->Release(this); +} + + +class TContIO: public IInputStream, public IOutputStream { +public: + TContIO(SOCKET fd, TCont* cont) + : Fd_(fd) + , Cont_(cont) + { + } + + void DoWrite(const void* buf, size_t len) override { + NCoro::WriteI(Cont_, Fd_, buf, len).Checked(); + } + + size_t DoRead(void* buf, size_t len) override { + return NCoro::ReadI(Cont_, Fd_, buf, len).Checked(); + } + + SOCKET Fd() const noexcept { + return Fd_; + } + +private: + SOCKET Fd_; + TCont* Cont_; +}; diff --git a/library/cpp/coroutine/engine/stack/benchmark/alloc_bm.cpp b/library/cpp/coroutine/engine/stack/benchmark/alloc_bm.cpp new file mode 100644 index 0000000000..38d713d274 --- /dev/null +++ b/library/cpp/coroutine/engine/stack/benchmark/alloc_bm.cpp @@ -0,0 +1,316 @@ +#include <benchmark/benchmark.h> + +#include <util/generic/vector.h> +#include <util/system/yassert.h> + +#include <library/cpp/coroutine/engine/stack/stack_allocator.h> +#include <library/cpp/coroutine/engine/stack/stack_guards.h> +#include <library/cpp/coroutine/engine/stack/stack_pool.h> +#include <library/cpp/coroutine/engine/stack/stack_utils.h> + + +namespace NCoro::NStack::NBenchmark { + + const char* TestCoroName = "any_name"; + constexpr uint64_t BigCoroSize = PageSize * 25; + constexpr uint64_t SmallCoroSize = PageSize * 4; + constexpr uint64_t ManyStacks = 4096; + + void BasicOperations(TStackHolder& stack) { + Y_VERIFY(!stack.Get().empty()); + stack.LowerCanaryOk(); + stack.UpperCanaryOk(); + } + + void WriteStack(TStackHolder& stack) { + auto memory = stack.Get(); + Y_VERIFY(!memory.empty()); + stack.LowerCanaryOk(); + stack.UpperCanaryOk(); + for (uint64_t i = PageSize / 2; i < memory.size(); i += PageSize * 2) { + memory[i] = 42; + } + } + + static void BM_GetAlignedMemory(benchmark::State& state) { + char* raw = nullptr; + char* aligned = nullptr; + for (auto _ : state) { + if (NCoro::NStack::GetAlignedMemory(state.range(0), raw, aligned)) { + free(raw); + } + } + } + BENCHMARK(BM_GetAlignedMemory)->RangeMultiplier(16)->Range(1, 1024 * 1024); + + static void BM_GetAlignedMemoryReleaseRss(benchmark::State& state) { + char* raw = nullptr; + char* aligned = nullptr; + for (auto _ : state) { + if (NCoro::NStack::GetAlignedMemory(state.range(0), raw, aligned)) { + const auto toFree = state.range(0) > 2 ? state.range(0) - 2 : 1; + ReleaseRss(aligned, toFree); + free(raw); + } + } + } + BENCHMARK(BM_GetAlignedMemoryReleaseRss)->RangeMultiplier(16)->Range(1, 1024 * 1024); + + static void BM_PoolAllocator(benchmark::State& state) { + auto allocator = GetAllocator(TPoolAllocatorSettings{}, (EGuard)state.range(0)); + for (auto _ : state) { + TStackHolder stack(*allocator, state.range(1), TestCoroName); + BasicOperations(stack); + } + } + BENCHMARK(BM_PoolAllocator) + ->Args({(int64_t)EGuard::Canary, BigCoroSize}) // old version - ArgsProduct() is not supported + ->Args({(int64_t)EGuard::Canary, SmallCoroSize}) + ->Args({(int64_t)EGuard::Page, BigCoroSize}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize}); + + static void BM_DefaultAllocator(benchmark::State& state) { + auto allocator = GetAllocator(Nothing(), (EGuard)state.range(0)); + for (auto _ : state) { + TStackHolder stack(*allocator, state.range(1), TestCoroName); + BasicOperations(stack); + } + } + BENCHMARK(BM_DefaultAllocator) + ->Args({(int64_t)EGuard::Canary, BigCoroSize}) // old version - ArgsProduct() is not supported + ->Args({(int64_t)EGuard::Canary, SmallCoroSize}) + ->Args({(int64_t)EGuard::Page, BigCoroSize}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize}); + + static void BM_PoolAllocatorManyStacksOneAtTime(benchmark::State& state) { + TPoolAllocatorSettings settings; + settings.StacksPerChunk = state.range(2); + auto allocator = GetAllocator(settings, (EGuard)state.range(0)); + for (auto _ : state) { + for (uint64_t i = 0; i < ManyStacks; ++i) { + TStackHolder stack(*allocator, state.range(1), TestCoroName); + BasicOperations(stack); + } + } + } + BENCHMARK(BM_PoolAllocatorManyStacksOneAtTime) + ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1}) // old version - ArgsProduct() is not supported + ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1}) + ->Args({(int64_t)EGuard::Page, BigCoroSize, 1}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1}) + ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1024}) + ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1024}) + ->Args({(int64_t)EGuard::Page, BigCoroSize, 1024}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1024}); + + static void BM_DefaultAllocatorManyStacksOneAtTime(benchmark::State& state) { + auto allocator = GetAllocator(Nothing(), (EGuard)state.range(0)); + for (auto _ : state) { + for (uint64_t i = 0; i < ManyStacks; ++i) { + TStackHolder stack(*allocator, state.range(1), TestCoroName); + BasicOperations(stack); + } + } + } + BENCHMARK(BM_DefaultAllocatorManyStacksOneAtTime) + ->Args({(int64_t)EGuard::Canary, BigCoroSize}) // old version - ArgsProduct() is not supported + ->Args({(int64_t)EGuard::Canary, SmallCoroSize}) + ->Args({(int64_t)EGuard::Page, BigCoroSize}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize}); + + static void BM_PoolAllocatorManyStacks(benchmark::State& state) { + TPoolAllocatorSettings settings; + settings.StacksPerChunk = state.range(2); + auto allocator = GetAllocator(settings, (EGuard)state.range(0)); + TVector<TStackHolder> stacks; // store stacks during benchmark + stacks.reserve(ManyStacks); + for (auto _ : state) { + for (uint64_t i = 0; i < ManyStacks; ++i) { + stacks.emplace_back(*allocator, state.range(1), TestCoroName); + BasicOperations(stacks.back()); + } + } + } + BENCHMARK(BM_PoolAllocatorManyStacks) + ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1}) // old version - ArgsProduct() is not supported + ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1}) + ->Args({(int64_t)EGuard::Page, BigCoroSize, 1}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1}) + ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1024}) + ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1024}) + ->Args({(int64_t)EGuard::Page, BigCoroSize, 1024}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1024}); + + static void BM_DefaultAllocatorManyStacks(benchmark::State& state) { + auto allocator = GetAllocator(Nothing(), (EGuard)state.range(0)); + TVector<TStackHolder> stacks; // store stacks during benchmark + stacks.reserve(ManyStacks); + for (auto _ : state) { + for (uint64_t i = 0; i < ManyStacks; ++i) { + stacks.push_back(TStackHolder(*allocator, state.range(1), TestCoroName)); + BasicOperations(stacks.back()); + } + } + } + BENCHMARK(BM_DefaultAllocatorManyStacks) + ->Args({(int64_t)EGuard::Canary, BigCoroSize}) // old version - ArgsProduct() is not supported + ->Args({(int64_t)EGuard::Canary, SmallCoroSize}) + ->Args({(int64_t)EGuard::Page, BigCoroSize}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize}); + + // ------------------------------------------------------------------------ + static void BM_PoolAllocatorManyStacksReleased(benchmark::State& state) { + TPoolAllocatorSettings settings; + settings.StacksPerChunk = state.range(2); + auto allocator = GetAllocator(settings, (EGuard)state.range(0)); + TVector<TStackHolder> stacks; // store stacks during benchmark + stacks.reserve(ManyStacks); + for (auto _ : state) { + for (uint64_t i = 0; i < ManyStacks; ++i) { + stacks.emplace_back(*allocator, state.range(1), TestCoroName); + BasicOperations(stacks.back()); + } + stacks.clear(); + } + } + BENCHMARK(BM_PoolAllocatorManyStacksReleased) + ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1}) // old version - ArgsProduct() is not supported + ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1}) + ->Args({(int64_t)EGuard::Page, BigCoroSize, 1}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1}) + ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1024}) + ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1024}) + ->Args({(int64_t)EGuard::Page, BigCoroSize, 1024}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1024}); + + static void BM_DefaultAllocatorManyStacksReleased(benchmark::State& state) { + auto allocator = GetAllocator(Nothing(), (EGuard)state.range(0)); + TVector<TStackHolder> stacks; // store stacks during benchmark + stacks.reserve(ManyStacks); + for (auto _ : state) { + for (uint64_t i = 0; i < ManyStacks; ++i) { + stacks.push_back(TStackHolder(*allocator, state.range(1), TestCoroName)); + BasicOperations(stacks.back()); + } + stacks.clear(); + } + } + BENCHMARK(BM_DefaultAllocatorManyStacksReleased) + ->Args({(int64_t)EGuard::Canary, BigCoroSize}) // old version - ArgsProduct() is not supported + ->Args({(int64_t)EGuard::Canary, SmallCoroSize}) + ->Args({(int64_t)EGuard::Page, BigCoroSize}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize}); + + // ------------------------------------------------------------------------ + static void BM_PoolAllocatorManyStacksReleasedAndRealloc(benchmark::State& state) { + TPoolAllocatorSettings settings; + settings.StacksPerChunk = state.range(2); + auto allocator = GetAllocator(settings, (EGuard)state.range(0)); + TVector<TStackHolder> stacks; // store stacks during benchmark + stacks.reserve(ManyStacks); + for (auto _ : state) { + for (uint64_t i = 0; i < ManyStacks; ++i) { + stacks.emplace_back(*allocator, state.range(1), TestCoroName); + BasicOperations(stacks.back()); + } + stacks.clear(); + for (uint64_t i = 0; i < ManyStacks; ++i) { + stacks.emplace_back(*allocator, state.range(1), TestCoroName); + BasicOperations(stacks.back()); + } + } + } + BENCHMARK(BM_PoolAllocatorManyStacksReleasedAndRealloc) + ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1}) // old version - ArgsProduct() is not supported + ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1}) + ->Args({(int64_t)EGuard::Page, BigCoroSize, 1}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1}) + ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1024}) + ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1024}) + ->Args({(int64_t)EGuard::Page, BigCoroSize, 1024}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1024}) + ->Args({(int64_t)EGuard::Canary, BigCoroSize, 8192}) + ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 8192}) + ->Args({(int64_t)EGuard::Page, BigCoroSize, 8192}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize, 8192}); + + static void BM_DefaultAllocatorManyStacksReleasedAndRealloc(benchmark::State& state) { + auto allocator = GetAllocator(Nothing(), (EGuard)state.range(0)); + TVector<TStackHolder> stacks; // store stacks during benchmark + stacks.reserve(ManyStacks); + for (auto _ : state) { + for (uint64_t i = 0; i < ManyStacks; ++i) { + stacks.push_back(TStackHolder(*allocator, state.range(1), TestCoroName)); + BasicOperations(stacks.back()); + } + stacks.clear(); + for (uint64_t i = 0; i < ManyStacks; ++i) { + stacks.push_back(TStackHolder(*allocator, state.range(1), TestCoroName)); + BasicOperations(stacks.back()); + } + } + } + BENCHMARK(BM_DefaultAllocatorManyStacksReleasedAndRealloc) + ->Args({(int64_t)EGuard::Canary, BigCoroSize}) // old version - ArgsProduct() is not supported + ->Args({(int64_t)EGuard::Canary, SmallCoroSize}) + ->Args({(int64_t)EGuard::Page, BigCoroSize}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize}); + + // ------------------------------------------------------------------------ + static void BM_PoolAllocatorManyStacksMemoryWriteReleasedAndRealloc(benchmark::State& state) { + TPoolAllocatorSettings settings; + settings.StacksPerChunk = state.range(2); + auto allocator = GetAllocator(settings, (EGuard)state.range(0)); + TVector<TStackHolder> stacks; // store stacks during benchmark + stacks.reserve(ManyStacks); + for (auto _ : state) { + for (uint64_t i = 0; i < ManyStacks; ++i) { + stacks.emplace_back(*allocator, state.range(1), TestCoroName); + WriteStack(stacks.back()); + } + stacks.clear(); + for (uint64_t i = 0; i < ManyStacks; ++i) { + stacks.emplace_back(*allocator, state.range(1), TestCoroName); + WriteStack(stacks.back()); + } + } + } + BENCHMARK(BM_PoolAllocatorManyStacksMemoryWriteReleasedAndRealloc) + ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1}) // old version - ArgsProduct() is not supported + ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1}) + ->Args({(int64_t)EGuard::Page, BigCoroSize, 1}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1}) + ->Args({(int64_t)EGuard::Canary, BigCoroSize, 1024}) + ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 1024}) + ->Args({(int64_t)EGuard::Page, BigCoroSize, 1024}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize, 1024}) + ->Args({(int64_t)EGuard::Canary, BigCoroSize, 8192}) + ->Args({(int64_t)EGuard::Canary, SmallCoroSize, 8192}) + ->Args({(int64_t)EGuard::Page, BigCoroSize, 8192}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize, 8192}); + + static void BM_DefaultAllocatorManyStacksMemoryWriteReleasedAndRealloc(benchmark::State& state) { + auto allocator = GetAllocator(Nothing(), (EGuard)state.range(0)); + TVector<TStackHolder> stacks; // store stacks during benchmark + stacks.reserve(ManyStacks); + for (auto _ : state) { + for (uint64_t i = 0; i < ManyStacks; ++i) { + stacks.push_back(TStackHolder(*allocator, state.range(1), TestCoroName)); + WriteStack(stacks.back()); + } + stacks.clear(); + for (uint64_t i = 0; i < ManyStacks; ++i) { + stacks.push_back(TStackHolder(*allocator, state.range(1), TestCoroName)); + WriteStack(stacks.back()); + } + } + } + BENCHMARK(BM_DefaultAllocatorManyStacksMemoryWriteReleasedAndRealloc) + ->Args({(int64_t)EGuard::Canary, BigCoroSize}) // old version - ArgsProduct() is not supported + ->Args({(int64_t)EGuard::Canary, SmallCoroSize}) + ->Args({(int64_t)EGuard::Page, BigCoroSize}) + ->Args({(int64_t)EGuard::Page, SmallCoroSize}); + +} + +BENCHMARK_MAIN(); diff --git a/library/cpp/coroutine/engine/stack/benchmark/ya.make b/library/cpp/coroutine/engine/stack/benchmark/ya.make new file mode 100644 index 0000000000..b2942fe8ca --- /dev/null +++ b/library/cpp/coroutine/engine/stack/benchmark/ya.make @@ -0,0 +1,13 @@ +G_BENCHMARK() + +OWNER(g:balancer) + +SRCS( + alloc_bm.cpp +) + +PEERDIR( + library/cpp/coroutine/engine +) + +END()
\ No newline at end of file diff --git a/library/cpp/coroutine/engine/stack/stack.cpp b/library/cpp/coroutine/engine/stack/stack.cpp new file mode 100644 index 0000000000..e29450261d --- /dev/null +++ b/library/cpp/coroutine/engine/stack/stack.cpp @@ -0,0 +1,67 @@ +#include "stack.h" + +#include "stack_allocator.h" +#include "stack_guards.h" + + +namespace NCoro::NStack { + +namespace NDetails { + + TStack::TStack(void* rawMemory, void* alignedMemory, uint64_t alignedSize, const char* /*name*/) + : RawMemory_((char*)rawMemory) + , AlignedMemory_((char*)alignedMemory) + , Size_(alignedSize) + { + Y_ASSERT(AlignedMemory_ && RawMemory_ && Size_); + Y_ASSERT(!(Size_ & PageSizeMask)); + Y_ASSERT(!((uint64_t)AlignedMemory_ & PageSizeMask)); + } + + TStack::TStack(TStack&& rhs) noexcept + : RawMemory_(rhs.RawMemory_) + , AlignedMemory_(rhs.AlignedMemory_) + , Size_(rhs.Size_) + { + rhs.Reset(); + } + + TStack& TStack::operator=(TStack&& rhs) noexcept { + std::swap(*this, rhs); + rhs.Reset(); + return *this; + } + + void TStack::Reset() noexcept { + Y_ASSERT(AlignedMemory_ && RawMemory_ && Size_); + + RawMemory_ = nullptr; + AlignedMemory_ = nullptr; + Size_ = 0; + } + +} // namespace NDetails + + + TStackHolder::TStackHolder(NStack::IAllocator& allocator, uint32_t size, const char* name) noexcept + : Allocator_(allocator) + , Stack_(Allocator_.AllocStack(size, name)) + {} + + TStackHolder::~TStackHolder() { + Allocator_.FreeStack(Stack_); + } + + TArrayRef<char> TStackHolder::Get() noexcept { + return Allocator_.GetStackWorkspace(Stack_.GetAlignedMemory(), Stack_.GetSize()); + } + + bool TStackHolder::LowerCanaryOk() const noexcept { + return Allocator_.CheckStackOverflow(Stack_.GetAlignedMemory()); + } + + bool TStackHolder::UpperCanaryOk() const noexcept { + return Allocator_.CheckStackOverride(Stack_.GetAlignedMemory(), Stack_.GetSize()); + } + +} diff --git a/library/cpp/coroutine/engine/stack/stack.h b/library/cpp/coroutine/engine/stack/stack.h new file mode 100644 index 0000000000..7d98ba4c68 --- /dev/null +++ b/library/cpp/coroutine/engine/stack/stack.h @@ -0,0 +1,77 @@ +#pragma once + +#include <util/generic/array_ref.h> +#include <util/generic/fwd.h> +#include <util/generic/noncopyable.h> + +#include <cstdint> + + +namespace NCoro::NStack { + + class IAllocator; + +namespace NDetails { + + //! Do not use directly, use TStackHolder instead + class TStack final : private TMoveOnly { + public: + /*! rawMemory: can be used by unaligned allocator to free stack memory after use + * alignedMemory: pointer to aligned memory on which stack workspace and guard are actually placed + * alignedSize: size of workspace memory + memory for guard + * guard: guard to protect this stack + * name: name of coroutine for which this stack is allocated + */ + TStack(void* rawMemory, void* alignedMemory, uint64_t alignedSize, const char* name); + TStack(TStack&& rhs) noexcept; + TStack& operator=(TStack&& rhs) noexcept; + + char* GetRawMemory() const noexcept { + return RawMemory_; + } + + char* GetAlignedMemory() const noexcept { + return AlignedMemory_; + } + + //! Stack size (includes memory for guard) + uint64_t GetSize() const noexcept { + return Size_; + } + + //! Resets parameters, should be called after stack memory is freed + void Reset() noexcept; + + private: + char* RawMemory_ = nullptr; // not owned + char* AlignedMemory_ = nullptr; // not owned + uint64_t Size_ = 0; + }; + +} // namespace NDetails + + class TStackHolder final : private TMoveOnly { + public: + explicit TStackHolder(IAllocator& allocator, uint32_t size, const char* name) noexcept; + TStackHolder(TStackHolder&&) = default; + TStackHolder& operator=(TStackHolder&&) = default; + + ~TStackHolder(); + + char* GetAlignedMemory() const noexcept { + return Stack_.GetAlignedMemory(); + } + uint64_t GetSize() const noexcept { + return Stack_.GetSize(); + } + + TArrayRef<char> Get() noexcept; + bool LowerCanaryOk() const noexcept; + bool UpperCanaryOk() const noexcept; + + private: + IAllocator& Allocator_; + NDetails::TStack Stack_; + }; + +} diff --git a/library/cpp/coroutine/engine/stack/stack_allocator.cpp b/library/cpp/coroutine/engine/stack/stack_allocator.cpp new file mode 100644 index 0000000000..bf12134e6b --- /dev/null +++ b/library/cpp/coroutine/engine/stack/stack_allocator.cpp @@ -0,0 +1,26 @@ +#include "stack_allocator.h" + + +namespace NCoro::NStack { + + THolder<IAllocator> GetAllocator(TMaybe<TPoolAllocatorSettings> poolSettings, EGuard guardType) { + THolder<IAllocator> allocator; + if (poolSettings) { + if (guardType == EGuard::Canary) { + allocator = MakeHolder<TPoolAllocator<TCanaryGuard>>(*poolSettings); + } else { + Y_ASSERT(guardType == EGuard::Page); + allocator = MakeHolder<TPoolAllocator<TPageGuard>>(*poolSettings); + } + } else { + if (guardType == EGuard::Canary) { + allocator = MakeHolder<TSimpleAllocator<TCanaryGuard>>(); + } else { + Y_ASSERT(guardType == EGuard::Page); + allocator = MakeHolder<TSimpleAllocator<TPageGuard>>(); + } + } + return allocator; + } + +}
\ No newline at end of file diff --git a/library/cpp/coroutine/engine/stack/stack_allocator.h b/library/cpp/coroutine/engine/stack/stack_allocator.h new file mode 100644 index 0000000000..da3c3a93a1 --- /dev/null +++ b/library/cpp/coroutine/engine/stack/stack_allocator.h @@ -0,0 +1,52 @@ +#pragma once + +#include "stack.h" +#include "stack_common.h" + +#include <util/generic/maybe.h> +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> + +#include <cstdint> + + +namespace NCoro::NStack { + + class IAllocator : private TNonCopyable { + public: + virtual ~IAllocator() = default; + + //! Size should be page-aligned. Stack would be protected by guard, thus, actual + //! workspace for stack = size - size of guard. + NDetails::TStack AllocStack(uint64_t size, const char* name) { + uint64_t alignedSize = (size + PageSize - 1) & ~PageSizeMask; + Y_ASSERT(alignedSize < 10 * 1024 * PageSize); // more than 10K pages for stack - do you really need it? +#if defined(_san_enabled_) || !defined(NDEBUG) + alignedSize *= DebugOrSanStackMultiplier; +#endif + return DoAllocStack(alignedSize, name); + } + + void FreeStack(NDetails::TStack& stack) noexcept { + if (stack.GetAlignedMemory()) { + DoFreeStack(stack); + } + }; + + virtual TAllocatorStats GetStackStats() const noexcept = 0; + + // Stack helpers + virtual TArrayRef<char> GetStackWorkspace(void* stack, uint64_t size) noexcept = 0; + virtual bool CheckStackOverflow(void* stack) const noexcept = 0; + virtual bool CheckStackOverride(void* stack, uint64_t size) const noexcept = 0; + + private: + virtual NDetails::TStack DoAllocStack(uint64_t size, const char* name) = 0; + virtual void DoFreeStack(NDetails::TStack& stack) noexcept = 0; + }; + + THolder<IAllocator> GetAllocator(TMaybe<TPoolAllocatorSettings> poolSettings, EGuard guardType); + +} + +#include "stack_allocator.inl" diff --git a/library/cpp/coroutine/engine/stack/stack_allocator.inl b/library/cpp/coroutine/engine/stack/stack_allocator.inl new file mode 100644 index 0000000000..0f25a4167b --- /dev/null +++ b/library/cpp/coroutine/engine/stack/stack_allocator.inl @@ -0,0 +1,138 @@ +#include "stack_guards.h" +#include "stack_pool.h" +#include "stack_utils.h" + +#include <util/generic/hash.h> + +#ifdef _linux_ +#include <unistd.h> +#endif + + +namespace NCoro::NStack { + + template<typename TGuard> + class TPoolAllocator final : public IAllocator { + public: + explicit TPoolAllocator(const TPoolAllocatorSettings& settings); + + TArrayRef<char> GetStackWorkspace(void* stack, uint64_t size) noexcept override { + return Guard_.GetWorkspace(stack, size); + } + bool CheckStackOverflow(void* stack) const noexcept override { + return Guard_.CheckOverflow(stack); + } + bool CheckStackOverride(void* stack, uint64_t size) const noexcept override { + return Guard_.CheckOverride(stack, size); + } + + TAllocatorStats GetStackStats() const noexcept override { + TAllocatorStats stats; + for (const auto& i : Pools_) { + stats.ReleasedSize += i.second.GetReleasedSize(); + stats.NotReleasedSize += i.second.GetFullSize(); + stats.NumOfAllocated += i.second.GetNumOfAllocated(); + } + return stats; + } + + private: // methods + NDetails::TStack DoAllocStack(uint64_t size, const char* name) override; + void DoFreeStack(NDetails::TStack& stack) noexcept override; + + private: // data + const TPoolAllocatorSettings PoolSettings_; + const TGuard& Guard_; + THashMap<uint64_t, TPool<TGuard>> Pools_; // key - stack size + }; + + template<typename TGuard> + TPoolAllocator<TGuard>::TPoolAllocator(const TPoolAllocatorSettings& settings) + : PoolSettings_(settings) + , Guard_(GetGuard<TGuard>()) + { +#ifdef _linux_ + Y_VERIFY(sysconf(_SC_PAGESIZE) == PageSize); +#endif + } + + template<typename TGuard> + NDetails::TStack TPoolAllocator<TGuard>::DoAllocStack(uint64_t alignedSize, const char* name) { + Y_ASSERT(alignedSize > Guard_.GetSize()); + + auto pool = Pools_.find(alignedSize); + if (pool == Pools_.end()) { + Y_ASSERT(Pools_.size() < 1000); // too many different sizes for coroutine stacks + auto [newPool, success] = Pools_.emplace(alignedSize, TPool<TGuard>{alignedSize, PoolSettings_, Guard_}); + Y_VERIFY(success, "Failed to add new coroutine pool"); + pool = newPool; + } + return pool->second.AllocStack(name); + } + + template<typename TGuard> + void TPoolAllocator<TGuard>::DoFreeStack(NDetails::TStack& stack) noexcept { + auto pool = Pools_.find(stack.GetSize()); + Y_VERIFY(pool != Pools_.end(), "Attempt to free stack from another allocator"); + pool->second.FreeStack(stack); + } + + // ------------------------------------------------------------------------ + // + template<typename TGuard> + class TSimpleAllocator : public IAllocator { + public: + explicit TSimpleAllocator(); + + TArrayRef<char> GetStackWorkspace(void* stack, uint64_t size) noexcept override { + return Guard_.GetWorkspace(stack, size); + } + bool CheckStackOverflow(void* stack) const noexcept override { + return Guard_.CheckOverflow(stack); + } + bool CheckStackOverride(void* stack, uint64_t size) const noexcept override { + return Guard_.CheckOverride(stack, size); + } + + TAllocatorStats GetStackStats() const noexcept override { return {}; } // not used for simple allocator + + private: // methods + NDetails::TStack DoAllocStack(uint64_t size, const char* name) override; + void DoFreeStack(NDetails::TStack& stack) noexcept override; + + private: // data + const TGuard& Guard_; + }; + + + template<typename TGuard> + TSimpleAllocator<TGuard>::TSimpleAllocator() + : Guard_(GetGuard<TGuard>()) + {} + + template<typename TGuard> + NDetails::TStack TSimpleAllocator<TGuard>::DoAllocStack(uint64_t alignedSize, const char* name) { + Y_ASSERT(alignedSize > Guard_.GetSize()); + + char* rawPtr = nullptr; + char* alignedPtr = nullptr; // with extra space for previous guard in this type of allocator + + Y_VERIFY(GetAlignedMemory((alignedSize + Guard_.GetPageAlignedSize()) / PageSize, rawPtr, alignedPtr)); // + memory for previous guard + char* alignedStackMemory = alignedPtr + Guard_.GetPageAlignedSize(); // after previous guard + + // Default allocator sets both guards, because it doesn't have memory chunk with previous stack and guard on it + Guard_.Protect((void*)alignedPtr, Guard_.GetPageAlignedSize(), false); // first guard should be before stack memory + Guard_.Protect(alignedStackMemory, alignedSize, true); // second guard is placed on stack memory + + return NDetails::TStack{rawPtr, alignedStackMemory, alignedSize, name}; + } + + template<typename TGuard> + void TSimpleAllocator<TGuard>::DoFreeStack(NDetails::TStack& stack) noexcept { + Guard_.RemoveProtection(stack.GetAlignedMemory() - Guard_.GetPageAlignedSize(), Guard_.GetPageAlignedSize()); + Guard_.RemoveProtection(stack.GetAlignedMemory(), stack.GetSize()); + + free(stack.GetRawMemory()); + stack.Reset(); + } +} diff --git a/library/cpp/coroutine/engine/stack/stack_common.h b/library/cpp/coroutine/engine/stack/stack_common.h new file mode 100644 index 0000000000..ed2d74d296 --- /dev/null +++ b/library/cpp/coroutine/engine/stack/stack_common.h @@ -0,0 +1,35 @@ +#pragma once + +#include <cstdint> + +class TContExecutor; + +namespace NCoro::NStack { + + static constexpr uint64_t PageSize = 4096; + static constexpr uint64_t PageSizeMask = PageSize - 1; // for checks + static constexpr uint64_t DebugOrSanStackMultiplier = 4; // for debug or sanitizer builds + static constexpr uint64_t SmallStackMaxSizeInPages = 6; + + enum class EGuard { + Canary, //!< writes some data to check it for corruption + Page, //!< prohibits access to page memory + }; + + struct TPoolAllocatorSettings { + uint64_t RssPagesToKeep = 1; + uint64_t SmallStackRssPagesToKeep = 1; // for stack less than SmallStackMaxSizeInPages + uint64_t ReleaseRate = 2; +#if !defined(_san_enabled_) && defined(NDEBUG) + uint64_t StacksPerChunk = 256; +#else + uint64_t StacksPerChunk = 2; +#endif + }; + + struct TAllocatorStats { + uint64_t ReleasedSize = 0; + uint64_t NotReleasedSize = 0; + uint64_t NumOfAllocated = 0; + }; +} diff --git a/library/cpp/coroutine/engine/stack/stack_guards.cpp b/library/cpp/coroutine/engine/stack/stack_guards.cpp new file mode 100644 index 0000000000..b8bcff039e --- /dev/null +++ b/library/cpp/coroutine/engine/stack/stack_guards.cpp @@ -0,0 +1,17 @@ +#include "stack_guards.h" + + +namespace NCoro::NStack { + + template<> + const TCanaryGuard& GetGuard<TCanaryGuard>() noexcept { + static const TCanaryGuard guard; + return guard; + } + + template<> + const TPageGuard& GetGuard<TPageGuard>() noexcept { + static const TPageGuard guard; + return guard; + } +}
\ No newline at end of file diff --git a/library/cpp/coroutine/engine/stack/stack_guards.h b/library/cpp/coroutine/engine/stack/stack_guards.h new file mode 100644 index 0000000000..3a7ef26481 --- /dev/null +++ b/library/cpp/coroutine/engine/stack/stack_guards.h @@ -0,0 +1,123 @@ +#pragma once + +#include "stack_common.h" + +#include <util/generic/array_ref.h> +#include <util/generic/strbuf.h> +#include <util/system/protect.h> + + +namespace NCoro::NStack { + + /*! Guard detect stack overflow/override, by setting memory before and after stack with predefined values/properties. + * Actually, it sets memory only after the end of stack workspace memory - previous guard section should be set + * already (for previous stack in case of pool allocator) and can be checked on demand. + * Stack pointer should be page-aligned. + */ + + + //! Checks integrity by writing a predefined sequence and comparing it with original + class TCanaryGuard final { + public: + //! Size of guard section in bytes + static constexpr uint64_t GetSize() { return Canary.size(); } + //! Size of page-aligned guard section in bytes + static constexpr uint64_t GetPageAlignedSize() { return AlignedSize_; } + + //! Get stack memory between guard sections + static TArrayRef<char> GetWorkspace(void* stack, uint64_t size) noexcept { + Y_ASSERT( !((uint64_t)stack & PageSizeMask) ); + Y_ASSERT( !(size & PageSizeMask) ); + Y_ASSERT(size > Canary.size()); + + return {(char*) stack, size - Canary.size()}; + } + + /*! Set guard section before the end of stack memory (at stack + size - guard size position) + * checkPrevious: check guard before stack memory for integrity + */ + static void Protect(void* stack, uint64_t size, bool checkPrevious) noexcept { + Y_ASSERT( !((uint64_t)stack & PageSizeMask) ); // stack pointer should be page aligned + Y_ASSERT( !(size & PageSizeMask) ); // stack size should be page aligned + Y_ASSERT(size >= Canary.size()); // stack should have enough space to place guard + + if (checkPrevious) { + Y_VERIFY(CheckOverflow(stack), "Previous stack was corrupted"); + } + auto guardPos = (char*) stack + size - Canary.size(); + memcpy(guardPos, Canary.data(), Canary.size()); + } + + //! This guard doesn't change memory flags + static constexpr void RemoveProtection(void*, uint64_t) {} + //! Should remove protection before returning memory to system + static constexpr bool ShouldRemoveProtectionBeforeFree() { return false; } + + static bool CheckOverflow(void* stack) noexcept { + Y_ASSERT(stack); + + char* guardPos = (char*) ((uint64_t)stack - Canary.size()); + return TStringBuf(guardPos, Canary.size()) == Canary; + } + + static bool CheckOverride(void* stack, uint64_t size) noexcept { + Y_ASSERT(stack); + Y_ASSERT(size > Canary.size()); + + char* guardPos = (char*) ((uint64_t)stack + size - Canary.size()); + return TStringBuf(guardPos, Canary.size()) == Canary; + } + + private: + static constexpr TStringBuf Canary = "[ThisIsACanaryCoroutineStackGuardIfYouReadThisTheStackIsStillOK]"; + static_assert(Canary.size() == 64); + static constexpr uint64_t AlignedSize_ = (Canary.size() + PageSize - 1) & ~PageSizeMask; + }; + + + // ------------------------------------------------------------------------ + // + //! Ensures integrity by removing access rights for border pages + class TPageGuard final { + public: + //! Size of guard section in bytes + static constexpr uint64_t GetSize() { return PageSize; } + //! Size of page-aligned guard section in bytes + static constexpr uint64_t GetPageAlignedSize() { return PageSize; } + + static TArrayRef<char> GetWorkspace(void* stack, uint64_t size) noexcept { + Y_ASSERT( !((uint64_t)stack & PageSizeMask) ); + Y_ASSERT( !(size & PageSizeMask) ); + Y_ASSERT(size > PageSize); + + return {(char*)stack, size - PageSize}; + } + + static void Protect(void* stack, uint64_t size, bool /*checkPrevious*/) noexcept { + Y_ASSERT( !((uint64_t)stack & PageSizeMask) ); // stack pointer should be page aligned + Y_ASSERT( !(size & PageSizeMask) ); // stack size should be page aligned + Y_ASSERT(size >= PageSize); // stack should have enough space to place guard + + ProtectMemory((char*)stack + size - PageSize, PageSize, PM_NONE); + } + + //! Remove protection, to allow stack memory be freed + static void RemoveProtection(void* stack, uint64_t size) noexcept { + Y_ASSERT( !((uint64_t)stack & PageSizeMask) ); + Y_ASSERT( !(size & PageSizeMask) ); + Y_ASSERT(size >= PageSize); + + ProtectMemory((char*)stack + size - PageSize, PageSize, PM_WRITE | PM_READ); + } + //! Should remove protection before returning memory to system + static constexpr bool ShouldRemoveProtectionBeforeFree() { return true; } + + //! For page guard is not used - it crashes process at once in this case. + static constexpr bool CheckOverflow(void*) { return true; } + static constexpr bool CheckOverride(void*, uint64_t) { return true; } + }; + + + template<typename TGuard> + const TGuard& GetGuard() noexcept; +} diff --git a/library/cpp/coroutine/engine/stack/stack_pool.h b/library/cpp/coroutine/engine/stack/stack_pool.h new file mode 100644 index 0000000000..27a8e9394b --- /dev/null +++ b/library/cpp/coroutine/engine/stack/stack_pool.h @@ -0,0 +1,54 @@ +#pragma once + +#include "stack.h" +#include "stack_common.h" + +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/generic/vector.h> + + +namespace NCoro::NStack { + + class IGuard; + class TStorage; + struct TPoolAllocatorSettings; + + template<typename TGuard> + class TPool final : private TMoveOnly { + struct TMemory { + char* Raw = nullptr; + char* Aligned = nullptr; // points to aligned memory, which includes space for first page guard + }; + public: + TPool(uint64_t stackSize, const TPoolAllocatorSettings& settings, const TGuard& guard); + TPool(TPool&& other) noexcept; + ~TPool(); + + NDetails::TStack AllocStack(const char* name); + void FreeStack(NDetails::TStack& stack); + + uint64_t GetReleasedSize() const noexcept; + uint64_t GetFullSize() const noexcept; + uint64_t GetNumOfAllocated() const noexcept { return NumOfAllocated_; } + + private: + void AllocNewMemoryChunk(); + bool IsSmallStack() const noexcept; + bool IsStackFromThisPool(const NDetails::TStack& stack) const noexcept; + NDetails::TStack AllocNewStack(const char* name); + + private: + const uint64_t StackSize_ = 0; + uint64_t RssPagesToKeep_ = 0; + const TGuard& Guard_; + TVector<TMemory> Memory_; // memory chunks + THolder<TStorage> Storage_; + char* NextToAlloc_ = nullptr; // points to next available stack in the last memory chunk + const uint64_t ChunkSize_ = 0; + uint64_t NumOfAllocated_ = 0; + }; + +} + +#include "stack_pool.inl" diff --git a/library/cpp/coroutine/engine/stack/stack_pool.inl b/library/cpp/coroutine/engine/stack/stack_pool.inl new file mode 100644 index 0000000000..6e08e05a48 --- /dev/null +++ b/library/cpp/coroutine/engine/stack/stack_pool.inl @@ -0,0 +1,132 @@ +#include "stack_storage.h" +#include "stack_utils.h" + + +namespace NCoro::NStack { + + template<typename TGuard> + TPool<TGuard>::TPool(uint64_t stackSize, const TPoolAllocatorSettings& settings, const TGuard& guard) + : StackSize_(stackSize) + , RssPagesToKeep_(IsSmallStack() ? settings.SmallStackRssPagesToKeep : settings.RssPagesToKeep) + , Guard_(guard) + , ChunkSize_(Guard_.GetPageAlignedSize() + StackSize_ * settings.StacksPerChunk) + { + Y_ASSERT(RssPagesToKeep_); + if (!RssPagesToKeep_) { + RssPagesToKeep_ = 1; // at least guard should be kept + } + + const uint64_t stackSizeInPages = stackSize / PageSize; + Y_ASSERT(stackSizeInPages >= RssPagesToKeep_); + if (stackSizeInPages < RssPagesToKeep_) { + RssPagesToKeep_ = stackSizeInPages; // keep all stack pages + } + + Y_ASSERT(StackSize_ && !(StackSize_ & PageSizeMask)); // stack size is not zero and page aligned + Y_ASSERT(Guard_.GetSize() < StackSize_); // stack has enough space to place guard + Y_ASSERT(stackSizeInPages >= RssPagesToKeep_); + + Storage_ = MakeHolder<TStorage>(StackSize_, RssPagesToKeep_, settings.ReleaseRate); + + AllocNewMemoryChunk(); + } + + template<typename TGuard> + TPool<TGuard>::TPool(TPool&& other) noexcept = default; + + template<typename TGuard> + TPool<TGuard>::~TPool() { + if (!Memory_.empty()) { + Y_ASSERT(NextToAlloc_ && StackSize_); + + for (const auto& chunk : Memory_) { + Y_ASSERT(chunk.Raw && chunk.Aligned); + + if (Guard_.ShouldRemoveProtectionBeforeFree()) { + Guard_.RemoveProtection(chunk.Aligned, Guard_.GetPageAlignedSize()); // first page in chunk + + const char* endOfStacksMemory = chunk.Aligned + ChunkSize_; + for (char* i = chunk.Aligned + Guard_.GetPageAlignedSize(); i < endOfStacksMemory; i += StackSize_) { + Guard_.RemoveProtection(i, StackSize_); + } + } + + free(chunk.Raw); + } + } + } + + template<typename TGuard> + NDetails::TStack TPool<TGuard>::AllocStack(const char* name) { + Y_ASSERT(!Memory_.empty()); + + if (!Storage_->IsEmpty()) { + return Storage_->GetStack(Guard_, name); + } else { + ++NumOfAllocated_; + return AllocNewStack(name); + } + } + + template<typename TGuard> + void TPool<TGuard>::FreeStack(NDetails::TStack& stack) { + Y_ASSERT(Storage_->Size() < ((ChunkSize_ - Guard_.GetPageAlignedSize()) / StackSize_) * Memory_.size()); + Y_ASSERT(IsStackFromThisPool(stack)); + + Storage_->ReturnStack(stack); + } + + template<typename TGuard> + uint64_t TPool<TGuard>::GetReleasedSize() const noexcept { + return Storage_->GetReleasedSize(); + } + template<typename TGuard> + uint64_t TPool<TGuard>::GetFullSize() const noexcept { + return Storage_->GetFullSize(); + } + + template<typename TGuard> + void TPool<TGuard>::AllocNewMemoryChunk() { + const uint64_t totalSizeInPages = ChunkSize_ / PageSize; + + TMemory memory; + const auto res = GetAlignedMemory(totalSizeInPages, memory.Raw, memory.Aligned); + Y_VERIFY(res, "Failed to allocate memory for coro stack pool"); + + NextToAlloc_ = memory.Aligned + Guard_.GetPageAlignedSize(); // skip first guard page + Guard_.Protect(memory.Aligned, Guard_.GetPageAlignedSize(), false); // protect first guard page + + Memory_.push_back(std::move(memory)); + } + + template<typename TGuard> + bool TPool<TGuard>::IsSmallStack() const noexcept { + return StackSize_ / PageSize <= SmallStackMaxSizeInPages; + } + + template<typename TGuard> + bool TPool<TGuard>::IsStackFromThisPool(const NDetails::TStack& stack) const noexcept { + for (const auto& chunk : Memory_) { + const char* endOfStacksMemory = chunk.Aligned + ChunkSize_; + if (chunk.Raw <= stack.GetRawMemory() && stack.GetRawMemory() < endOfStacksMemory) { + return true; + } + } + return false; + } + + template<typename TGuard> + NDetails::TStack TPool<TGuard>::AllocNewStack(const char* name) { + if (NextToAlloc_ + StackSize_ > Memory_.rbegin()->Aligned + ChunkSize_) { + AllocNewMemoryChunk(); // also sets NextToAlloc_ to first stack position in new allocated chunk of memory + } + Y_ASSERT(NextToAlloc_ + StackSize_ <= Memory_.rbegin()->Aligned + ChunkSize_); + + char* newStack = NextToAlloc_; + NextToAlloc_ += StackSize_; + + Guard_.Protect(newStack, StackSize_, true); + return NDetails::TStack{newStack, newStack, StackSize_, name}; + } + +} diff --git a/library/cpp/coroutine/engine/stack/stack_storage.cpp b/library/cpp/coroutine/engine/stack/stack_storage.cpp new file mode 100644 index 0000000000..6dc2b2d44b --- /dev/null +++ b/library/cpp/coroutine/engine/stack/stack_storage.cpp @@ -0,0 +1,46 @@ +#include "stack_storage.h" + +#include "stack.h" +#include "stack_utils.h" + +#include <library/cpp/coroutine/engine/impl.h> + + +namespace NCoro::NStack { + + TStorage::TStorage(uint64_t stackSize, uint64_t rssPagesToKeep, uint64_t releaseRate) + : StackSize_(stackSize) + , RssPagesToKeep_(rssPagesToKeep) + , ReleaseRate_(releaseRate ? releaseRate : 1) + { + Y_ASSERT(StackSize_ && RssPagesToKeep_); + } + + bool TStorage::IsEmpty() const noexcept { + return Released_.empty() && Full_.empty(); + } + + uint64_t TStorage::Size() const noexcept { + return Released_.size() + Full_.size(); + } + + void TStorage::ReturnStack(NDetails::TStack& stack) { + thread_local uint64_t i = 0; + if (++i % ReleaseRate_ != 0) { + Full_.push_back(stack.GetAlignedMemory()); + } else { + ReleaseMemory(stack.GetAlignedMemory(), RssPagesToKeep_); + Released_.push_back(stack.GetAlignedMemory()); + } + stack.Reset(); + } + + void TStorage::ReleaseMemory([[maybe_unused]] char* alignedStackMemory, [[maybe_unused]] uint64_t pagesToKeep) noexcept { +#if !defined(_san_enabled_) && defined(NDEBUG) + uint64_t numOfPagesToFree = StackSize_ / PageSize; + numOfPagesToFree -= pagesToKeep; + ReleaseRss(alignedStackMemory, numOfPagesToFree); +#endif + } + +} diff --git a/library/cpp/coroutine/engine/stack/stack_storage.h b/library/cpp/coroutine/engine/stack/stack_storage.h new file mode 100644 index 0000000000..25fe2cfb17 --- /dev/null +++ b/library/cpp/coroutine/engine/stack/stack_storage.h @@ -0,0 +1,60 @@ +#pragma once + +#include "stack.h" + +#include <util/datetime/base.h> +#include <util/generic/deque.h> + + +class TCont; + +namespace NCoro::NStack { + + class IGuard; + + class TStorage final : private TMoveOnly { + public: + TStorage(uint64_t stackSize, uint64_t rssPagesToKeep, uint64_t releaseRate); + + bool IsEmpty() const noexcept; + uint64_t Size() const noexcept; + + uint64_t GetReleasedSize() const noexcept { return Released_.size(); } + uint64_t GetFullSize() const noexcept { return Full_.size(); } + + template<typename TGuard> + NDetails::TStack GetStack(const TGuard& guard, const char* name); + void ReturnStack(NDetails::TStack& stack); + + private: + void ReleaseMemory(char* alignedStackMemory, uint64_t pagesToKeep) noexcept; + + private: + TDeque<void*> Released_; //!< stacks memory with released RSS memory + TDeque<void*> Full_; //!< stacks memory with RSS memory + uint64_t StackSize_ = 0; + uint64_t RssPagesToKeep_ = 0; + const uint64_t ReleaseRate_ = 1; + }; + + + template<typename TGuard> + NDetails::TStack TStorage::GetStack(const TGuard& guard, const char* name) { + Y_VERIFY(!IsEmpty()); // check before call + + void* newStack = nullptr; + if (!Full_.empty()) { + newStack = Full_.back(); + Full_.pop_back(); + } else { + Y_ASSERT(!Released_.empty()); + newStack = Released_.back(); + Released_.pop_back(); + } + + Y_VERIFY(guard.CheckOverflow(newStack), "corrupted stack in pool"); + Y_VERIFY(guard.CheckOverride(newStack, StackSize_), "corrupted stack in pool"); + + return NDetails::TStack{newStack, newStack, StackSize_, name}; + } +} diff --git a/library/cpp/coroutine/engine/stack/stack_utils.cpp b/library/cpp/coroutine/engine/stack/stack_utils.cpp new file mode 100644 index 0000000000..1548529b66 --- /dev/null +++ b/library/cpp/coroutine/engine/stack/stack_utils.cpp @@ -0,0 +1,84 @@ +#include "stack_utils.h" + +#include <contrib/libs/linux-headers/asm-generic/errno-base.h> +#include <util/generic/scope.h> +#include <util/system/yassert.h> + +#ifdef _linux_ +#include <sys/mman.h> +#endif + +#include <cerrno> +#include <cstdlib> +#include <cstring> + + +namespace NCoro::NStack { + +#ifdef _linux_ + bool GetAlignedMemory(uint64_t sizeInPages, char*& rawPtr, char*& alignedPtr) noexcept { + Y_ASSERT(sizeInPages); + + void* ptr = nullptr; + int error = posix_memalign(&ptr, PageSize, sizeInPages * PageSize); + alignedPtr = rawPtr = (char*)ptr; + return rawPtr && alignedPtr && !error; + } +#else + bool GetAlignedMemory(uint64_t sizeInPages, char*& rawPtr, char*& alignedPtr) noexcept { + Y_ASSERT(sizeInPages); + + rawPtr = (char*) malloc((sizeInPages + 1) * PageSize); // +1 in case result would be unaligned + alignedPtr = (char*)( ((uint64_t)rawPtr + PageSize - 1) & ~PageSizeMask); + return rawPtr && alignedPtr; + } +#endif + +#ifdef _linux_ + void ReleaseRss(char* alignedPtr, uint64_t numOfPages) noexcept { + Y_VERIFY( !((uint64_t)alignedPtr & PageSizeMask), "Not aligned pointer to release RSS memory"); + if (!numOfPages) { + return; + } + if (auto res = madvise((void*) alignedPtr, numOfPages * PageSize, MADV_DONTNEED); res) { + Y_VERIFY(errno == EAGAIN || errno == ENOMEM, "Failed to release memory"); + } + } +#else + void ReleaseRss(char*, uint64_t) noexcept { + } +#endif + +#ifdef _linux_ + uint64_t CountMapped(char* alignedPtr, uint64_t numOfPages) noexcept { + Y_VERIFY( !((uint64_t)alignedPtr & PageSizeMask) ); + Y_ASSERT(numOfPages); + + uint64_t result = 0; + unsigned char* mappedPages = (unsigned char*) calloc(numOfPages, numOfPages); + Y_VERIFY(mappedPages); + Y_DEFER { + free(mappedPages); + }; + + if (!mincore((void*)alignedPtr, numOfPages * PageSize, mappedPages)) { + for (uint64_t i = 0; i < numOfPages; ++i) { + if (mappedPages[i] & 1) { + ++result; + } + } + } else { + Y_ASSERT(false); + return 0; + } + + return result; + } + +#else + uint64_t CountMapped(char*, uint64_t) noexcept { + return 0; // stub for Windows tests + } +#endif + +} diff --git a/library/cpp/coroutine/engine/stack/stack_utils.h b/library/cpp/coroutine/engine/stack/stack_utils.h new file mode 100644 index 0000000000..46c65240b5 --- /dev/null +++ b/library/cpp/coroutine/engine/stack/stack_utils.h @@ -0,0 +1,27 @@ +#pragma once + +#include "stack_common.h" + + +namespace NCoro::NStack { + /*! Actual size of allocated memory can exceed size in pages, due to unaligned allocation. + * @param sizeInPages : number of pages to allocate + * @param rawPtr : pointer to unaligned memory. Should be passed to free() when is not used any more. + * @param alignedPtr : pointer to beginning of first fully allocated page + * @return : true on success + */ + bool GetAlignedMemory(uint64_t sizeInPages, char*& rawPtr, char*& alignedPtr) noexcept; + + /*! Release mapped RSS memory. + * @param alignedPt : page-size aligned memory on which RSS memory should be freed + * @param numOfPages : number of pages to free from RSS memory + */ + void ReleaseRss(char* alignedPtr, uint64_t numOfPages) noexcept; + + /*! Count pages with RSS memory + * @param alignedPtr : pointer to page-aligned memory for which calculations would be done + * @param numOfPages : number of pages to check + * @return : number of pages with RSS memory + */ + uint64_t CountMapped(char* alignedPtr, uint64_t numOfPages) noexcept; +} diff --git a/library/cpp/coroutine/engine/stack/ut/stack_allocator_ut.cpp b/library/cpp/coroutine/engine/stack/ut/stack_allocator_ut.cpp new file mode 100644 index 0000000000..a7283d44a3 --- /dev/null +++ b/library/cpp/coroutine/engine/stack/ut/stack_allocator_ut.cpp @@ -0,0 +1,115 @@ +#include <library/cpp/coroutine/engine/stack/stack_allocator.h> +#include <library/cpp/coroutine/engine/stack/stack_common.h> +#include <library/cpp/testing/gtest/gtest.h> + + +using namespace testing; + +namespace NCoro::NStack::Tests { + + enum class EAllocator { + Pool, // allocates page-size aligned stacks from pools + Simple // uses malloc/free for each stack + }; + + class TAllocatorParamFixture : public TestWithParam< std::tuple<EGuard, EAllocator> > { + protected: // methods + void SetUp() override { + EGuard guardType; + EAllocator allocType; + std::tie(guardType, allocType) = GetParam(); + + TMaybe<TPoolAllocatorSettings> poolSettings; + if (allocType == EAllocator::Pool) { + poolSettings = TPoolAllocatorSettings{}; + } + + Allocator_ = GetAllocator(poolSettings, guardType); + } + + protected: // data + THolder<IAllocator> Allocator_; + }; + + + TEST_P(TAllocatorParamFixture, StackAllocationAndRelease) { + uint64_t stackSize = PageSize * 12; + auto stack = Allocator_->AllocStack(stackSize, "test_stack"); +#if defined(_san_enabled_) || !defined(NDEBUG) + stackSize *= DebugOrSanStackMultiplier; +#endif + + // Correct stack should have + EXPECT_EQ(stack.GetSize(), stackSize); // predefined size + EXPECT_FALSE((uint64_t)stack.GetAlignedMemory() & PageSizeMask); // aligned pointer + // Writable workspace + auto workspace = Allocator_->GetStackWorkspace(stack.GetAlignedMemory(), stack.GetSize()); + for (uint64_t i = 0; i < workspace.size(); i += 512) { + workspace[i] = 42; + } + EXPECT_TRUE(Allocator_->CheckStackOverflow(stack.GetAlignedMemory())); + EXPECT_TRUE(Allocator_->CheckStackOverride(stack.GetAlignedMemory(), stack.GetSize())); + + Allocator_->FreeStack(stack); + EXPECT_FALSE(stack.GetRawMemory()); + } + + INSTANTIATE_TEST_SUITE_P(AllocatorTestParams, TAllocatorParamFixture, + Combine(Values(EGuard::Canary, EGuard::Page), Values(EAllocator::Pool, EAllocator::Simple))); + + + // ------------------------------------------------------------------------ + // Test that allocated stack has guards + // + template<class AllocatorType> + THolder<IAllocator> GetAllocator(EGuard guardType); + + struct TPoolTag {}; + struct TSimpleTag {}; + + template<> + THolder<IAllocator> GetAllocator<TPoolTag>(EGuard guardType) { + TMaybe<TPoolAllocatorSettings> poolSettings = TPoolAllocatorSettings{}; + return GetAllocator(poolSettings, guardType); + } + + template<> + THolder<IAllocator> GetAllocator<TSimpleTag>(EGuard guardType) { + TMaybe<TPoolAllocatorSettings> poolSettings; + return GetAllocator(poolSettings, guardType); + } + + + template <class AllocatorType> + class TAllocatorFixture : public Test { + protected: + TAllocatorFixture() + : Allocator_(GetAllocator<AllocatorType>(EGuard::Page)) + {} + + const uint64_t StackSize_ = PageSize * 2; + THolder<IAllocator> Allocator_; + }; + + typedef Types<TPoolTag, TSimpleTag> Implementations; + TYPED_TEST_SUITE(TAllocatorFixture, Implementations); + + TYPED_TEST(TAllocatorFixture, StackOverflow) { + ASSERT_DEATH({ + auto stack = this->Allocator_->AllocStack(this->StackSize_, "test_stack"); + + // Overwrite previous guard, crash is here + *(stack.GetAlignedMemory() - 1) = 42; + }, ""); + } + + TYPED_TEST(TAllocatorFixture, StackOverride) { + ASSERT_DEATH({ + auto stack = this->Allocator_->AllocStack(this->StackSize_, "test_stack"); + + // Overwrite guard, crash is here + *(stack.GetAlignedMemory() + stack.GetSize() - 1) = 42; + }, ""); + } + +} diff --git a/library/cpp/coroutine/engine/stack/ut/stack_guards_ut.cpp b/library/cpp/coroutine/engine/stack/ut/stack_guards_ut.cpp new file mode 100644 index 0000000000..9da9a9b3d5 --- /dev/null +++ b/library/cpp/coroutine/engine/stack/ut/stack_guards_ut.cpp @@ -0,0 +1,158 @@ +#include <library/cpp/coroutine/engine/stack/stack_common.h> +#include <library/cpp/coroutine/engine/stack/stack_guards.h> +#include <library/cpp/coroutine/engine/stack/stack_utils.h> +#include <library/cpp/testing/gtest/gtest.h> + + +using namespace testing; + +namespace NCoro::NStack::Tests { + + template <class TGuard> + class TGuardFixture : public Test { + protected: + TGuardFixture() : Guard_(GetGuard<TGuard>()) {} + + const TGuard& Guard_; + }; + + typedef Types<TCanaryGuard, TPageGuard> Implementations; + TYPED_TEST_SUITE(TGuardFixture, Implementations); + + TYPED_TEST(TGuardFixture, GuardSize) { + const auto size = this->Guard_.GetSize(); + EXPECT_GE(size, 64ul); + EXPECT_FALSE(size & 63ul); // check 64-byte alignment + } + + TYPED_TEST(TGuardFixture, GuardAlignedSize) { + const auto size = this->Guard_.GetPageAlignedSize(); + EXPECT_GE(size, PageSize); + EXPECT_FALSE(size & PageSizeMask); // check page-alignment + } + + TYPED_TEST(TGuardFixture, StackWorkspace) { + for (uint64_t sizeInPages : {2, 5, 12}) { + char *rawPtr, *alignedPtr = nullptr; + ASSERT_TRUE(GetAlignedMemory(sizeInPages, rawPtr, alignedPtr)); + auto workspace = this->Guard_.GetWorkspace(alignedPtr, sizeInPages * PageSize); + EXPECT_EQ(workspace.size(), sizeInPages * PageSize - this->Guard_.GetSize()) << " size in pages " << sizeInPages; + + this->Guard_.Protect(alignedPtr, sizeInPages * PageSize, false); + workspace = this->Guard_.GetWorkspace(alignedPtr, sizeInPages * PageSize); + EXPECT_EQ(workspace.size(), sizeInPages * PageSize - this->Guard_.GetSize()) << " size in pages " << sizeInPages; + + this->Guard_.RemoveProtection(alignedPtr, sizeInPages * PageSize); + workspace = this->Guard_.GetWorkspace(alignedPtr, sizeInPages * PageSize); + EXPECT_EQ(workspace.size(), sizeInPages * PageSize - this->Guard_.GetSize()) << " size in pages " << sizeInPages; + + free(rawPtr); + } + } + + TYPED_TEST(TGuardFixture, SetRemoveProtectionWorks) { + char *rawPtr, *alignedPtr = nullptr; + constexpr uint64_t sizeInPages = 4; + ASSERT_TRUE(GetAlignedMemory(sizeInPages + 1, rawPtr, alignedPtr)); + + this->Guard_.Protect(alignedPtr, PageSize, false); // set previous guard + alignedPtr += PageSize; // leave first page for previous guard + this->Guard_.Protect(alignedPtr, sizeInPages * PageSize, true); + + EXPECT_TRUE(this->Guard_.CheckOverflow(alignedPtr)); + EXPECT_TRUE(this->Guard_.CheckOverride(alignedPtr, sizeInPages * PageSize)); + + this->Guard_.RemoveProtection(alignedPtr, sizeInPages * PageSize); + this->Guard_.RemoveProtection(alignedPtr - PageSize, PageSize); // remove previous guard + + free(rawPtr); + } + + TEST(StackGuardTest, CanaryGuardTestOverflow) { + const auto& guard = GetGuard<TCanaryGuard>(); + + char *rawPtr, *alignedPtr = nullptr; + constexpr uint64_t sizeInPages = 4; + ASSERT_TRUE(GetAlignedMemory(sizeInPages + 1, rawPtr, alignedPtr)); + guard.Protect(alignedPtr, PageSize, false); // set previous guard + alignedPtr += PageSize; // leave first page for previous guard + guard.Protect(alignedPtr, sizeInPages * PageSize, true); + + EXPECT_TRUE(guard.CheckOverflow(alignedPtr)); + EXPECT_TRUE(guard.CheckOverride(alignedPtr, sizeInPages * PageSize)); + + // Overwrite previous guard + *(alignedPtr - 1) = 42; + + EXPECT_FALSE(guard.CheckOverflow(alignedPtr)); + + free(rawPtr); + } + + TEST(StackGuardTest, CanaryGuardTestOverride) { + const auto& guard = GetGuard<TCanaryGuard>(); + + char *rawPtr, *alignedPtr = nullptr; + constexpr uint64_t sizeInPages = 4; + ASSERT_TRUE(GetAlignedMemory(sizeInPages + 1, rawPtr, alignedPtr)); + guard.Protect(alignedPtr, PageSize, false); // set previous guard + alignedPtr += PageSize; // leave first page for previous guard + guard.Protect(alignedPtr, sizeInPages * PageSize, true); + + EXPECT_TRUE(guard.CheckOverflow(alignedPtr)); + EXPECT_TRUE(guard.CheckOverride(alignedPtr, sizeInPages * PageSize)); + + // Overwrite guard + *(alignedPtr + sizeInPages * PageSize - 1) = 42; + + EXPECT_FALSE(guard.CheckOverride(alignedPtr, sizeInPages * PageSize)); + + free(rawPtr); + } + + TEST(StackGuardDeathTest, PageGuardTestOverflow) { + ASSERT_DEATH({ + const auto &guard = GetGuard<TPageGuard>(); + + char* rawPtr = nullptr; + char* alignedPtr = nullptr; + constexpr uint64_t sizeInPages = 4; + ASSERT_TRUE(GetAlignedMemory(sizeInPages + 1, rawPtr, alignedPtr)); + + guard.Protect(alignedPtr, PageSize, false); // set previous guard + alignedPtr += PageSize; // leave first page for previous guard + guard.Protect(alignedPtr, sizeInPages * PageSize, true); + + // Overwrite previous guard, crash is here + *(alignedPtr - 1) = 42; + + guard.RemoveProtection(alignedPtr, sizeInPages * PageSize); + guard.RemoveProtection(alignedPtr - PageSize, PageSize); // remove previous guard + + free(rawPtr); + }, ""); + } + + TEST(StackGuardDeathTest, PageGuardTestOverride) { + ASSERT_DEATH({ + const auto &guard = GetGuard<TPageGuard>(); + + char* rawPtr = nullptr; + char* alignedPtr = nullptr; + constexpr uint64_t sizeInPages = 4; + ASSERT_TRUE(GetAlignedMemory(sizeInPages + 1, rawPtr, alignedPtr)); + guard.Protect(alignedPtr, PageSize, false); // set previous guard + alignedPtr += PageSize; // leave first page for previous guard + guard.Protect(alignedPtr, sizeInPages * PageSize, true); + + // Overwrite guard, crash is here + *(alignedPtr + sizeInPages * PageSize - 1) = 42; + + guard.RemoveProtection(alignedPtr, sizeInPages * PageSize); + guard.RemoveProtection(alignedPtr - PageSize, PageSize); // remove previous guard + + free(rawPtr); + }, ""); + } + +} diff --git a/library/cpp/coroutine/engine/stack/ut/stack_pool_ut.cpp b/library/cpp/coroutine/engine/stack/ut/stack_pool_ut.cpp new file mode 100644 index 0000000000..9e3e5e7117 --- /dev/null +++ b/library/cpp/coroutine/engine/stack/ut/stack_pool_ut.cpp @@ -0,0 +1,70 @@ +#include <library/cpp/coroutine/engine/stack/stack_common.h> +#include <library/cpp/coroutine/engine/stack/stack_guards.h> +#include <library/cpp/coroutine/engine/stack/stack_pool.h> +#include <library/cpp/testing/gtest/gtest.h> + + +using namespace testing; + +namespace NCoro::NStack::Tests { + + template <class TGuard> + class TPoolFixture : public Test { + protected: + TPoolFixture() : Guard_(GetGuard<TGuard>()), Pool_(StackSize_, TPoolAllocatorSettings{1, 1, 8, 32}, Guard_) {} + + const uint64_t StackSize_ = PageSize * 4; + const TGuard& Guard_; + TPool<TGuard> Pool_; + }; + + typedef Types<TCanaryGuard, TPageGuard> Implementations; + TYPED_TEST_SUITE(TPoolFixture, Implementations); + + TYPED_TEST(TPoolFixture, AllocAndFreeStack) { + auto stack = this->Pool_.AllocStack("test_stack"); + this->Pool_.FreeStack(stack); + EXPECT_FALSE(stack.GetRawMemory()); + } + + TYPED_TEST(TPoolFixture, FreedStackReused) { + auto stack = this->Pool_.AllocStack("test_stack"); + auto rawPtr = stack.GetRawMemory(); + auto alignedPtr = stack.GetAlignedMemory(); + + this->Pool_.FreeStack(stack); + EXPECT_FALSE(stack.GetRawMemory()); + + auto stack2 = this->Pool_.AllocStack("test_stack"); + EXPECT_EQ(rawPtr, stack2.GetRawMemory()); + EXPECT_EQ(alignedPtr, stack2.GetAlignedMemory()); + + this->Pool_.FreeStack(stack2); + EXPECT_FALSE(stack2.GetRawMemory()); + } + + TYPED_TEST(TPoolFixture, MruFreedStackReused) { + auto stack = this->Pool_.AllocStack("test_stack"); + auto rawPtr = stack.GetRawMemory(); + auto alignedPtr = stack.GetAlignedMemory(); + auto stack2 = this->Pool_.AllocStack("test_stack"); + auto stack3 = this->Pool_.AllocStack("test_stack"); + + this->Pool_.FreeStack(stack2); + EXPECT_FALSE(stack2.GetRawMemory()); + + this->Pool_.FreeStack(stack); + EXPECT_FALSE(stack.GetRawMemory()); + + auto stack4 = this->Pool_.AllocStack("test_stack"); + EXPECT_EQ(rawPtr, stack4.GetRawMemory()); + EXPECT_EQ(alignedPtr, stack4.GetAlignedMemory()); + + this->Pool_.FreeStack(stack3); + EXPECT_FALSE(stack.GetRawMemory()); + + this->Pool_.FreeStack(stack4); + EXPECT_FALSE(stack4.GetRawMemory()); + } + +} diff --git a/library/cpp/coroutine/engine/stack/ut/stack_ut.cpp b/library/cpp/coroutine/engine/stack/ut/stack_ut.cpp new file mode 100644 index 0000000000..31f8ad6b61 --- /dev/null +++ b/library/cpp/coroutine/engine/stack/ut/stack_ut.cpp @@ -0,0 +1,60 @@ +#include <library/cpp/coroutine/engine/stack/stack.h> +#include <library/cpp/coroutine/engine/stack/stack_common.h> +#include <library/cpp/coroutine/engine/stack/stack_guards.h> +#include <library/cpp/coroutine/engine/stack/stack_utils.h> +#include <library/cpp/testing/gtest/gtest.h> + + +using namespace testing; + +namespace NCoro::NStack::Tests { + + constexpr uint64_t StackSizeInPages = 4; + + template <class TGuard> + class TStackFixture : public Test { + protected: // methods + TStackFixture() + : Guard_(GetGuard<TGuard>()) + , StackSize_(StackSizeInPages * PageSize) + {} + + void SetUp() override { + ASSERT_TRUE(GetAlignedMemory(StackSizeInPages, RawMemory_, AlignedMemory_)); + Stack_ = MakeHolder<NDetails::TStack>(RawMemory_, AlignedMemory_, StackSize_, "test_stack"); + Guard_.Protect(AlignedMemory_, StackSize_, false); + } + + void TearDown() override { + Guard_.RemoveProtection(AlignedMemory_, StackSize_); + free(Stack_->GetRawMemory()); + Stack_->Reset(); + EXPECT_EQ(Stack_->GetRawMemory(), nullptr); + } + + protected: // data + const TGuard& Guard_; + const uint64_t StackSize_ = 0; + char* RawMemory_ = nullptr; + char* AlignedMemory_ = nullptr; + THolder<NDetails::TStack> Stack_; + }; + + typedef Types<TCanaryGuard, TPageGuard> Implementations; + TYPED_TEST_SUITE(TStackFixture, Implementations); + + TYPED_TEST(TStackFixture, PointersAndSize) { + EXPECT_EQ(this->Stack_->GetRawMemory(), this->RawMemory_); + EXPECT_EQ(this->Stack_->GetAlignedMemory(), this->AlignedMemory_); + EXPECT_EQ(this->Stack_->GetSize(), this->StackSize_); + } + + TYPED_TEST(TStackFixture, WriteStack) { + auto workspace = this->Guard_.GetWorkspace(this->Stack_->GetAlignedMemory(), this->Stack_->GetSize()); + for (uint64_t i = 0; i < workspace.size(); i += 512) { + workspace[i] = 42; + } + EXPECT_TRUE(this->Guard_.CheckOverride(this->Stack_->GetAlignedMemory(), this->Stack_->GetSize())); + } + +} diff --git a/library/cpp/coroutine/engine/stack/ut/stack_utils_ut.cpp b/library/cpp/coroutine/engine/stack/ut/stack_utils_ut.cpp new file mode 100644 index 0000000000..dc0593dcf2 --- /dev/null +++ b/library/cpp/coroutine/engine/stack/ut/stack_utils_ut.cpp @@ -0,0 +1,73 @@ +#include <library/cpp/coroutine/engine/stack/stack_common.h> +#include <library/cpp/coroutine/engine/stack/stack_utils.h> +#include <library/cpp/testing/gtest/gtest.h> + + +using namespace testing; + +namespace NCoro::NStack::Tests { + + TEST(StackUtilsTest, Allocation) { + char *rawPtr, *alignedPtr = nullptr; + for (uint64_t i : {1, 2, 3, 4, 11}) { + EXPECT_TRUE(GetAlignedMemory(i, rawPtr, alignedPtr)); + EXPECT_TRUE(rawPtr); + EXPECT_TRUE(alignedPtr); + EXPECT_FALSE((uint64_t)alignedPtr & PageSizeMask); + free(rawPtr); + } + } + +#if !defined(_san_enabled_) && defined(_linux_) + + TEST(StackUtilsTest, RssReleaseOnePage) { + char *rawPtr, *alignedPtr = nullptr; + for (uint64_t i : {1, 2, 8}) { + EXPECT_TRUE(GetAlignedMemory(i, rawPtr, alignedPtr)); + EXPECT_TRUE(rawPtr); + EXPECT_TRUE(alignedPtr); + EXPECT_FALSE((uint64_t)alignedPtr & PageSizeMask); + + ReleaseRss(alignedPtr, i); // allocator can provide reused memory with RSS memory on it + EXPECT_EQ(CountMapped(alignedPtr, i), 0ul); // no RSS memory allocated + + *(alignedPtr + (i - 1) * PageSize) = 42; // map RSS memory + EXPECT_EQ(CountMapped(alignedPtr, i), 1ul); + + ReleaseRss(alignedPtr, i); + EXPECT_EQ(CountMapped(alignedPtr, i), 0ul) << "number of pages " << i; // no RSS memory allocated + + free(rawPtr); + } + } + + TEST(StackUtilsTest, RssReleaseSeveralPages) { + char *rawPtr, *alignedPtr = nullptr; + + for (uint64_t i : {1, 2, 5, 8}) { + EXPECT_TRUE(GetAlignedMemory(i, rawPtr, alignedPtr)); + EXPECT_TRUE(rawPtr); + EXPECT_TRUE(alignedPtr); + EXPECT_FALSE((uint64_t)alignedPtr & PageSizeMask); + + ReleaseRss(alignedPtr, i); // allocator can provide reused memory with RSS memory on it + EXPECT_EQ(CountMapped(alignedPtr, i), 0ul); // no RSS memory allocated + + for (uint64_t page = 0; page < i; ++page) { + *(alignedPtr + page * PageSize) = 42; // map RSS memory + EXPECT_EQ(CountMapped(alignedPtr, page + 1), page + 1); + } + + const uint64_t pagesToKeep = (i > 2) ? 2 : i; + + ReleaseRss(alignedPtr, i - pagesToKeep); + EXPECT_EQ(CountMapped(alignedPtr, i), pagesToKeep) << "number of pages " << i; // no RSS memory allocated + + free(rawPtr); + } + } + +#endif + +} + diff --git a/library/cpp/coroutine/engine/stack/ut/ya.make b/library/cpp/coroutine/engine/stack/ut/ya.make new file mode 100644 index 0000000000..65c5af9b7f --- /dev/null +++ b/library/cpp/coroutine/engine/stack/ut/ya.make @@ -0,0 +1,17 @@ +GTEST() + +OWNER(g:balancer) + +SRCS( + stack_allocator_ut.cpp + stack_guards_ut.cpp + stack_pool_ut.cpp + stack_ut.cpp + stack_utils_ut.cpp +) + +PEERDIR( + library/cpp/coroutine/engine +) + +END()
\ No newline at end of file diff --git a/library/cpp/coroutine/engine/trampoline.cpp b/library/cpp/coroutine/engine/trampoline.cpp new file mode 100644 index 0000000000..10ea69ddc3 --- /dev/null +++ b/library/cpp/coroutine/engine/trampoline.cpp @@ -0,0 +1,50 @@ +#include "impl.h" +#include "trampoline.h" + +#include "stack/stack_allocator.h" + +#include <util/system/info.h> +#include <util/system/protect.h> +#include <util/system/valgrind.h> +#include <util/system/yassert.h> + +#include <cstdlib> +#include <util/stream/format.h> + + +namespace NCoro { + +TTrampoline::TTrampoline(NStack::IAllocator& allocator, ui32 stackSize, TFunc f, TCont* cont) noexcept + : Stack_(allocator, stackSize, cont->Name()) + , Clo_{this, Stack_.Get(), cont->Name()} + , Ctx_(Clo_) + , Func_(std::move(f)) + , Cont_(cont) + {} + + void TTrampoline::DoRun() { + if (Cont_->Executor()->FailOnError()) { + Func_(Cont_); + } else { + try { + Func_(Cont_); + } catch (...) {} + } + + Cont_->Terminate(); + } + + TArrayRef<char> TTrampoline::Stack() noexcept { + return Stack_.Get(); + } + + const char* TTrampoline::ContName() const noexcept { + return Cont_->Name(); + } + + void TTrampoline::DoRunNaked() { + DoRun(); + + abort(); + } +} diff --git a/library/cpp/coroutine/engine/trampoline.h b/library/cpp/coroutine/engine/trampoline.h new file mode 100644 index 0000000000..37b61cf015 --- /dev/null +++ b/library/cpp/coroutine/engine/trampoline.h @@ -0,0 +1,60 @@ +#pragma once + +#include "stack/stack_common.h" +#include "stack/stack.h" + +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> +#include <util/system/context.h> +#include <util/system/defaults.h> + +#if !defined(STACK_GROW_DOWN) +# error "unsupported" +#endif + +class TCont; +typedef void (*TContFunc)(TCont*, void*); + +namespace NCoro { + + namespace NStack { + class IAllocator; + } + + class TTrampoline : public ITrampoLine, TNonCopyable { + public: + typedef std::function<void (TCont*)> TFunc; + + TTrampoline( + NCoro::NStack::IAllocator& allocator, + uint32_t stackSize, + TFunc f, + TCont* cont + ) noexcept; + + TArrayRef<char> Stack() noexcept; + + TExceptionSafeContext* Context() noexcept { + return &Ctx_; + } + + void SwitchTo(TExceptionSafeContext* ctx) noexcept { + Y_VERIFY(Stack_.LowerCanaryOk(), "Stack overflow (%s)", ContName()); + Y_VERIFY(Stack_.UpperCanaryOk(), "Stack override (%s)", ContName()); + Ctx_.SwitchTo(ctx); + } + + void DoRun() override; + + void DoRunNaked() override; + + private: + const char* ContName() const noexcept; + private: + NStack::TStackHolder Stack_; + const TContClosure Clo_; + TExceptionSafeContext Ctx_; + TFunc Func_; + TCont* const Cont_; + }; +} diff --git a/library/cpp/coroutine/engine/ya.make b/library/cpp/coroutine/engine/ya.make new file mode 100644 index 0000000000..8c20b9afc3 --- /dev/null +++ b/library/cpp/coroutine/engine/ya.make @@ -0,0 +1,36 @@ +LIBRARY() + +OWNER( + pg + g:balancer +) + +GENERATE_ENUM_SERIALIZATION(poller.h) +GENERATE_ENUM_SERIALIZATION(stack/stack_common.h) + +PEERDIR( + contrib/libs/libc_compat + library/cpp/containers/intrusive_rb_tree +) + +SRCS( + cont_poller.cpp + helper.cpp + impl.cpp + iostatus.cpp + network.cpp + poller.cpp + sockpool.cpp + stack/stack.cpp + stack/stack_allocator.cpp + stack/stack_guards.cpp + stack/stack_storage.cpp + stack/stack_utils.cpp + trampoline.cpp +) + +END() + +RECURSE( + stack/benchmark +) diff --git a/library/cpp/coroutine/listener/listen.cpp b/library/cpp/coroutine/listener/listen.cpp new file mode 100644 index 0000000000..3d4e711d1d --- /dev/null +++ b/library/cpp/coroutine/listener/listen.cpp @@ -0,0 +1,354 @@ +#include "listen.h" + +#include <library/cpp/coroutine/engine/impl.h> +#include <library/cpp/coroutine/engine/network.h> + +#include <util/network/ip.h> +#include <util/network/address.h> +#include <util/generic/ylimits.h> +#include <util/generic/intrlist.h> + +using namespace NAddr; + +namespace { + union TSa { + const sockaddr* Sa; + const sockaddr_in* In; + const sockaddr_in6* In6; + + inline TSa(const sockaddr* sa) noexcept + : Sa(sa) + { + } + + inline bool operator==(const TSa& r) const noexcept { + if (Sa->sa_family == r.Sa->sa_family) { + switch (Sa->sa_family) { + case AF_INET: + return In->sin_port == r.In->sin_port && In->sin_addr.s_addr == r.In->sin_addr.s_addr; + case AF_INET6: + return In6->sin6_port == r.In6->sin6_port && !memcmp(&In6->sin6_addr, &r.In6->sin6_addr, sizeof(in6_addr)); + } + } + + return false; + } + + inline bool operator!=(const TSa& r) const noexcept { + return !(*this == r); + } + }; +} + +class TContListener::TImpl { +private: + struct TStoredAddrInfo: public TAddrInfo, private TNetworkAddress { + inline TStoredAddrInfo(const struct addrinfo* ai, const TNetworkAddress& addr) noexcept + : TAddrInfo(ai) + , TNetworkAddress(addr) + { + } + }; + +private: + class TOneSocketListener: public TIntrusiveListItem<TOneSocketListener> { + public: + inline TOneSocketListener(TImpl* parent, IRemoteAddrPtr addr) + : Parent_(parent) + , C_(nullptr) + , ListenSocket_(socket(addr->Addr()->sa_family, SOCK_STREAM, 0)) + , Addr_(std::move(addr)) + { + if (ListenSocket_ == INVALID_SOCKET) { + ythrow TSystemError() << "can not create socket"; + } + + FixIPv6ListenSocket(ListenSocket_); + CheckedSetSockOpt(ListenSocket_, SOL_SOCKET, SO_REUSEADDR, 1, "reuse addr"); + + const TOptions& opts = Parent_->Opts_; + if (opts.SendBufSize) { + SetOutputBuffer(ListenSocket_, opts.SendBufSize); + } + if (opts.RecvBufSize) { + SetInputBuffer(ListenSocket_, opts.RecvBufSize); + } + if (opts.ReusePort) { + SetReusePort(ListenSocket_, opts.ReusePort); + } + + SetNonBlock(ListenSocket_); + + if (bind(ListenSocket_, Addr_->Addr(), Addr_->Len()) < 0) { + ythrow TSystemError() << "bind failed"; + } + } + + inline ~TOneSocketListener() { + Stop(); + } + + public: + inline void Run(TCont* cont) noexcept { + C_ = cont; + DoRun(); + C_ = nullptr; + } + + inline void StartListen() { + if (!C_) { + const TOptions& opts = Parent_->Opts_; + + if (listen(ListenSocket_, (int)Min<size_t>(Max<int>(), opts.ListenQueue)) < 0) { + ythrow TSystemError() << "listen failed"; + } + + if (opts.EnableDeferAccept) { + SetDeferAccept(ListenSocket_); + } + + C_ = Parent_->E_->Create<TOneSocketListener, &TOneSocketListener::Run>(this, "listen_job"); + } + } + + inline const IRemoteAddr* Addr() const noexcept { + return Addr_.Get(); + } + + inline void Stop() noexcept { + if (C_) { + C_->Cancel(); + + while (C_) { + Parent_->E_->Running()->Yield(); + } + } + } + + private: + inline void DoRun() noexcept { + while (!C_->Cancelled()) { + try { + TOpaqueAddr remote; + const int res = NCoro::AcceptI(C_, ListenSocket_, remote.MutableAddr(), remote.LenPtr()); + + if (res < 0) { + const int err = -res; + + if (err != ECONNABORTED) { + if (err == ECANCELED) { + break; + } + if (errno == EMFILE) { + C_->SleepT(TDuration::MilliSeconds(1)); + } + + ythrow TSystemError(err) << "can not accept"; + } + } else { + TSocketHolder c((SOCKET)res); + + const ICallBack::TAcceptFull acc = { + &c, + &remote, + Addr(), + }; + + Parent_->Cb_->OnAcceptFull(acc); + } + } catch (...) { + try { + Parent_->Cb_->OnError(); + } catch (...) { + } + } + } + + try { + Parent_->Cb_->OnStop(&ListenSocket_); + } catch (...) { + } + } + + private: + const TImpl* const Parent_; + TCont* C_; + TSocketHolder ListenSocket_; + const IRemoteAddrPtr Addr_; + }; + +private: + class TListeners: public TIntrusiveListWithAutoDelete<TOneSocketListener, TDelete> { + private: + template <class T> + using TIt = std::conditional_t<std::is_const<T>::value, typename T::TConstIterator, typename T::TIterator>; + + template <class T> + static inline TIt<T> FindImpl(T* t, const IRemoteAddr& addr) { + const TSa sa(addr.Addr()); + + TIt<T> it = t->Begin(); + TIt<T> const end = t->End(); + + while (it != end && sa != it->Addr()->Addr()) { + ++it; + } + + return it; + } + + public: + inline TIterator Find(const IRemoteAddr& addr) { + return FindImpl(this, addr); + } + + inline TConstIterator Find(const IRemoteAddr& addr) const { + return FindImpl(this, addr); + } + }; + +public: + inline TImpl(ICallBack* cb, TContExecutor* e, const TOptions& opts) noexcept + : E_(e) + , Cb_(cb) + , Opts_(opts) + { + } + + inline void Listen() { + for (TListeners::TIterator it = L_.Begin(); it != L_.End(); ++it) { + it->StartListen(); + } + } + + inline void Listen(const IRemoteAddr& addr) { + const TListeners::TIterator it = L_.Find(addr); + + if (it != L_.End()) { + it->StartListen(); + } + } + + inline void Bind(const IRemoteAddr& addr) { + const TSa sa(addr.Addr()); + + switch (sa.Sa->sa_family) { + case AF_INET: + L_.PushBack(new TOneSocketListener(this, MakeHolder<TIPv4Addr>(*sa.In))); + break; + case AF_INET6: + L_.PushBack(new TOneSocketListener(this, MakeHolder<TIPv6Addr>(*sa.In6))); + break; + default: + ythrow yexception() << TStringBuf("unknown protocol"); + } + } + + inline void Bind(const TIpAddress& addr) { + L_.PushBack(new TOneSocketListener(this, MakeHolder<TIPv4Addr>(addr))); + } + + inline void Bind(const TNetworkAddress& addr) { + for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) { + L_.PushBack(new TOneSocketListener(this, MakeHolder<TStoredAddrInfo>(&*it, addr))); + } + } + + inline void StopListenAddr(const IRemoteAddr& addr) { + const TListeners::TIterator it = L_.Find(addr); + + if (it != L_.End()) { + delete &*it; + } + } + +private: + TContExecutor* const E_; + ICallBack* const Cb_; + TListeners L_; + const TOptions Opts_; +}; + +TContListener::TContListener(ICallBack* cb, TContExecutor* e, const TOptions& opts) + : Impl_(new TImpl(cb, e, opts)) +{ +} + +TContListener::~TContListener() { +} + +namespace { + template <class T> + static inline T&& CheckImpl(T&& impl) { + Y_ENSURE_EX(impl, yexception() << "not running"); + return std::forward<T>(impl); + } +} + +void TContListener::Listen(const IRemoteAddr& addr) { + CheckImpl(Impl_)->Listen(addr); +} + +void TContListener::Listen(const TIpAddress& addr) { + return Listen(TIPv4Addr(addr)); +} + +void TContListener::Listen(const TNetworkAddress& addr) { + for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) { + Listen(TAddrInfo(&*it)); + } +} + +void TContListener::Listen() { + CheckImpl(Impl_)->Listen(); +} + +void TContListener::Bind(const IRemoteAddr& addr) { + CheckImpl(Impl_)->Bind(addr); +} + +void TContListener::Bind(const TIpAddress& addr) { + return Bind(TIPv4Addr(addr)); +} + +void TContListener::Bind(const TNetworkAddress& addr) { + CheckImpl(Impl_)->Bind(addr); +} + +void TContListener::Stop() noexcept { + Impl_.Destroy(); +} + +void TContListener::StopListenAddr(const IRemoteAddr& addr) { + CheckImpl(Impl_)->StopListenAddr(addr); +} + +void TContListener::StopListenAddr(const TIpAddress& addr) { + return StopListenAddr(TIPv4Addr(addr)); +} + +void TContListener::StopListenAddr(const TNetworkAddress& addr) { + for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) { + StopListenAddr(TAddrInfo(&*it)); + } +} + +void TContListener::ICallBack::OnAcceptFull(const TAcceptFull& params) { + const TSa remote(params.Remote->Addr()); + const TSa local(params.Local->Addr()); + + if (local.Sa->sa_family == AF_INET) { + const TIpAddress r(*remote.In); + const TIpAddress l(*local.In); + + const TAccept a = { + params.S, &r, &l}; + + OnAccept(a); + } +} + +void TContListener::ICallBack::OnStop(TSocketHolder* s) { + s->ShutDown(SHUT_RDWR); + s->Close(); +} diff --git a/library/cpp/coroutine/listener/listen.h b/library/cpp/coroutine/listener/listen.h new file mode 100644 index 0000000000..3a89cd3ecc --- /dev/null +++ b/library/cpp/coroutine/listener/listen.h @@ -0,0 +1,125 @@ +#pragma once + +#include <util/generic/ptr.h> +#include <util/generic/ylimits.h> + +struct TIpAddress; +class TContExecutor; +class TSocketHolder; +class TNetworkAddress; + +namespace NAddr { + class IRemoteAddr; +} + +class TContListener { +public: + struct TOptions { + inline TOptions() noexcept + : ListenQueue(Max<size_t>()) + , SendBufSize(0) + , RecvBufSize(0) + , EnableDeferAccept(false) + , ReusePort(false) + { + } + + inline TOptions& SetListenQueue(size_t len) noexcept { + ListenQueue = len; + + return *this; + } + + inline TOptions& SetDeferAccept(bool enable) noexcept { + EnableDeferAccept = enable; + + return *this; + } + + inline TOptions& SetSendBufSize(unsigned size) noexcept { + SendBufSize = size; + + return *this; + } + + inline TOptions& SetRecvBufSize(unsigned size) noexcept { + RecvBufSize = size; + + return *this; + } + + inline TOptions& SetReusePort(bool reusePort) noexcept { + ReusePort = reusePort; + + return *this; + } + + size_t ListenQueue; + unsigned SendBufSize; + unsigned RecvBufSize; + bool EnableDeferAccept; + bool ReusePort; + }; + + class ICallBack { + public: + struct TAccept { + TSocketHolder* S; + const TIpAddress* Remote; + const TIpAddress* Local; + }; + + struct TAcceptFull { + TSocketHolder* S; + const NAddr::IRemoteAddr* Remote; + const NAddr::IRemoteAddr* Local; + }; + + virtual void OnAccept(const TAccept&) { + } + + virtual void OnAcceptFull(const TAcceptFull&); + + /* + * will be called from catch (...) {} context + * so your can re-throw current exception and work around it + */ + virtual void OnError() = 0; + + virtual void OnStop(TSocketHolder*); + + virtual ~ICallBack() { + } + }; + + TContListener(ICallBack* cb, TContExecutor* e, const TOptions& opts = TOptions()); + ~TContListener(); + + /// start listener threads + void Listen(); + + void Listen(const NAddr::IRemoteAddr& addr); + void Listen(const TIpAddress& addr); + void Listen(const TNetworkAddress& addr); + + /// bind server on address. Can be called multiple times to bind on more then one address + void Bind(const NAddr::IRemoteAddr& addr); + void Bind(const TIpAddress& addr); + void Bind(const TNetworkAddress& addr); + + void Stop() noexcept; + + void StopListenAddr(const NAddr::IRemoteAddr& addr); + void StopListenAddr(const TIpAddress& addr); + void StopListenAddr(const TNetworkAddress& addr); + + template <class T> + inline void StartListenAddr(const T& addr) { + Bind(addr); + Listen(addr); + } + +private: + class TImpl; + THolder<TImpl> Impl_; +}; diff --git a/library/cpp/coroutine/listener/ya.make b/library/cpp/coroutine/listener/ya.make new file mode 100644 index 0000000000..700c3abe3e --- /dev/null +++ b/library/cpp/coroutine/listener/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +OWNER(pg) + +PEERDIR( + library/cpp/coroutine/engine +) + +SRCS( + listen.cpp +) + +END() diff --git a/library/cpp/coroutine/ya.make b/library/cpp/coroutine/ya.make new file mode 100644 index 0000000000..34e30f2b25 --- /dev/null +++ b/library/cpp/coroutine/ya.make @@ -0,0 +1,11 @@ +RECURSE( + benchmark + dns + dns/example + engine + engine/stack/ut + listener + test + ut + util +) |