diff options
author | Ruslan Kovalev <ruslan.a.kovalev@gmail.com> | 2022-02-10 16:46:45 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:45 +0300 |
commit | 9123176b341b6f2658cff5132482b8237c1416c8 (patch) | |
tree | 49e222ea1c5804306084bb3ae065bb702625360f /library/cpp/coroutine | |
parent | 59e19371de37995fcb36beb16cd6ec030af960bc (diff) | |
download | ydb-9123176b341b6f2658cff5132482b8237c1416c8.tar.gz |
Restoring authorship annotation for Ruslan Kovalev <ruslan.a.kovalev@gmail.com>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/coroutine')
22 files changed, 1684 insertions, 1684 deletions
diff --git a/library/cpp/coroutine/engine/callbacks.h b/library/cpp/coroutine/engine/callbacks.h index 640e6c25d7..e81b17344f 100644 --- a/library/cpp/coroutine/engine/callbacks.h +++ b/library/cpp/coroutine/engine/callbacks.h @@ -1,18 +1,18 @@ -#pragma once - -class TCont; -class TContExecutor; - -namespace NCoro { - class IScheduleCallback { - public: +#pragma once + +class TCont; +class TContExecutor; + +namespace NCoro { + class IScheduleCallback { + public: virtual void OnSchedule(TContExecutor&, TCont&) = 0; virtual void OnUnschedule(TContExecutor&) = 0; }; - + class IEnterPollerCallback { public: virtual void OnEnterPoller() = 0; virtual void OnExitPoller() = 0; - }; -} + }; +} diff --git a/library/cpp/coroutine/engine/condvar.h b/library/cpp/coroutine/engine/condvar.h index ecfeab5b73..ffceede6fa 100644 --- a/library/cpp/coroutine/engine/condvar.h +++ b/library/cpp/coroutine/engine/condvar.h @@ -1,11 +1,11 @@ -#pragma once +#pragma once + +#include "events.h" +#include "mutex.h" -#include "events.h" -#include "mutex.h" - class TContCondVar { public: - int WaitD(TCont* current, TContMutex* mutex, TInstant deadline) { + int WaitD(TCont* current, TContMutex* mutex, TInstant deadline) { mutex->UnLock(); const int ret = WaitQueue_.WaitD(current, deadline); @@ -17,19 +17,19 @@ public: return mutex->LockD(current, deadline); } - int WaitT(TCont* current, TContMutex* mutex, TDuration timeout) { + int WaitT(TCont* current, TContMutex* mutex, TDuration timeout) { return WaitD(current, mutex, timeout.ToDeadLine()); } - int WaitI(TCont* current, TContMutex* mutex) { + int WaitI(TCont* current, TContMutex* mutex) { return WaitD(current, mutex, TInstant::Max()); } - void Signal() noexcept { + void Signal() noexcept { WaitQueue_.Signal(); } - void BroadCast() noexcept { + void BroadCast() noexcept { WaitQueue_.BroadCast(); } diff --git a/library/cpp/coroutine/engine/cont_poller.cpp b/library/cpp/coroutine/engine/cont_poller.cpp index d801fe617f..76593d4e9b 100644 --- a/library/cpp/coroutine/engine/cont_poller.cpp +++ b/library/cpp/coroutine/engine/cont_poller.cpp @@ -1,70 +1,70 @@ -#include "cont_poller.h" +#include "cont_poller.h" #include "impl.h" -namespace NCoro { - namespace { - template <class T> - int DoExecuteEvent(T* event) noexcept { - auto* cont = event->Cont(); - - if (cont->Cancelled()) { - return ECANCELED; - } - - cont->Executor()->ScheduleIoWait(event); +namespace NCoro { + namespace { + template <class T> + int DoExecuteEvent(T* event) noexcept { + auto* cont = event->Cont(); + + if (cont->Cancelled()) { + return ECANCELED; + } + + cont->Executor()->ScheduleIoWait(event); cont->Switch(); - - if (cont->Cancelled()) { - return ECANCELED; - } - - return event->Status(); - } + + if (cont->Cancelled()) { + return ECANCELED; + } + + return event->Status(); + } + } + + void TContPollEvent::Wake() noexcept { + UnLink(); + Cont()->ReSchedule(); } - void TContPollEvent::Wake() noexcept { - UnLink(); - Cont()->ReSchedule(); - } - - - TInstant TEventWaitQueue::WakeTimedout(TInstant now) noexcept { - TIoWait::TIterator it = IoWait_.Begin(); - - if (it != IoWait_.End()) { - if (it->DeadLine() > now) { - return it->DeadLine(); - } - - do { - (it++)->Wake(ETIMEDOUT); - } while (it != IoWait_.End() && it->DeadLine() <= now); - } - - return now; - } - - void TEventWaitQueue::Register(NCoro::TContPollEvent* event) { - IoWait_.Insert(event); - event->Cont()->Unlink(); - } - - void TEventWaitQueue::Abort() noexcept { - auto visitor = [](TContPollEvent& e) { - e.Cont()->Cancel(); - }; - IoWait_.ForEach(visitor); - } + + TInstant TEventWaitQueue::WakeTimedout(TInstant now) noexcept { + TIoWait::TIterator it = IoWait_.Begin(); + + if (it != IoWait_.End()) { + if (it->DeadLine() > now) { + return it->DeadLine(); + } + + do { + (it++)->Wake(ETIMEDOUT); + } while (it != IoWait_.End() && it->DeadLine() <= now); + } + + return now; + } + + void TEventWaitQueue::Register(NCoro::TContPollEvent* event) { + IoWait_.Insert(event); + event->Cont()->Unlink(); + } + + void TEventWaitQueue::Abort() noexcept { + auto visitor = [](TContPollEvent& e) { + e.Cont()->Cancel(); + }; + IoWait_.ForEach(visitor); + } +} + +void TFdEvent::RemoveFromIOWait() noexcept { + this->Cont()->Executor()->Poller()->Remove(this); +} + +int ExecuteEvent(TFdEvent* event) noexcept { + return NCoro::DoExecuteEvent(event); +} + +int ExecuteEvent(TTimerEvent* event) noexcept { + return NCoro::DoExecuteEvent(event); } - -void TFdEvent::RemoveFromIOWait() noexcept { - this->Cont()->Executor()->Poller()->Remove(this); -} - -int ExecuteEvent(TFdEvent* event) noexcept { - return NCoro::DoExecuteEvent(event); -} - -int ExecuteEvent(TTimerEvent* event) noexcept { - return NCoro::DoExecuteEvent(event); -} diff --git a/library/cpp/coroutine/engine/cont_poller.h b/library/cpp/coroutine/engine/cont_poller.h index 824b05489a..b638b2df1a 100644 --- a/library/cpp/coroutine/engine/cont_poller.h +++ b/library/cpp/coroutine/engine/cont_poller.h @@ -1,245 +1,245 @@ -#pragma once +#pragma once #include "poller.h" -#include "sockmap.h" +#include "sockmap.h" #include <library/cpp/containers/intrusive_rb_tree/rb_tree.h> - -#include <util/datetime/base.h> -#include <util/memory/pool.h> + +#include <util/datetime/base.h> +#include <util/memory/pool.h> #include <util/memory/smallobj.h> -#include <util/network/init.h> +#include <util/network/init.h> + +#include <cerrno> + -#include <cerrno> - - class TCont; class TContExecutor; -class TFdEvent; +class TFdEvent; -namespace NCoro { - - class IPollEvent; +namespace NCoro { + class IPollEvent; - struct TContPollEventCompare { - template <class T> - static inline bool Compare(const T& l, const T& r) noexcept { - return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r); - } - }; - - class TContPollEvent : public TRbTreeItem<TContPollEvent, TContPollEventCompare> { - public: - TContPollEvent(TCont* cont, TInstant deadLine) noexcept - : Cont_(cont) - , DeadLine_(deadLine) - {} + struct TContPollEventCompare { + template <class T> + static inline bool Compare(const T& l, const T& r) noexcept { + return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r); + } + }; - static bool Compare(const TContPollEvent& l, const TContPollEvent& r) noexcept { - return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r); - } - int Status() const noexcept { - return Status_; - } + class TContPollEvent : public TRbTreeItem<TContPollEvent, TContPollEventCompare> { + public: + TContPollEvent(TCont* cont, TInstant deadLine) noexcept + : Cont_(cont) + , DeadLine_(deadLine) + {} - void SetStatus(int status) noexcept { - Status_ = status; - } + static bool Compare(const TContPollEvent& l, const TContPollEvent& r) noexcept { + return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r); + } - TCont* Cont() noexcept { - return Cont_; - } + int Status() const noexcept { + return Status_; + } + + void SetStatus(int status) noexcept { + Status_ = status; + } + + TCont* Cont() noexcept { + return Cont_; + } - TInstant DeadLine() const noexcept { - return DeadLine_; - } + TInstant DeadLine() const noexcept { + return DeadLine_; + } - void Wake(int status) noexcept { - SetStatus(status); - Wake(); + void Wake(int status) noexcept { + SetStatus(status); + Wake(); } - private: - void Wake() noexcept; + private: + void Wake() noexcept; - private: - TCont* Cont_; - TInstant DeadLine_; - int Status_ = EINPROGRESS; + private: + TCont* Cont_; + TInstant DeadLine_; + int Status_ = EINPROGRESS; }; - class IPollEvent: public TIntrusiveListItem<IPollEvent> { - public: - IPollEvent(SOCKET fd, ui16 what) noexcept - : Fd_(fd) - , What_(what) - {} + class IPollEvent: public TIntrusiveListItem<IPollEvent> { + public: + IPollEvent(SOCKET fd, ui16 what) noexcept + : Fd_(fd) + , What_(what) + {} + + virtual ~IPollEvent() {} - virtual ~IPollEvent() {} + SOCKET Fd() const noexcept { + return Fd_; + } - SOCKET Fd() const noexcept { - return Fd_; - } + int What() const noexcept { + return What_; + } - int What() const noexcept { - return What_; - } + virtual void OnPollEvent(int status) noexcept = 0; - virtual void OnPollEvent(int status) noexcept = 0; + private: + SOCKET Fd_; + ui16 What_; + }; - private: - SOCKET Fd_; - ui16 What_; - }; + template <class T> + class TBigArray { + struct TValue: public T, public TObjectFromPool<TValue> { + TValue() {} + }; - template <class T> - class TBigArray { - struct TValue: public T, public TObjectFromPool<TValue> { - TValue() {} - }; + public: + TBigArray() + : Pool_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance()) + {} - public: - TBigArray() - : Pool_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance()) - {} - - T* Get(size_t index) { - TRef& ret = Lst_.Get(index); - if (!ret) { + T* Get(size_t index) { + TRef& ret = Lst_.Get(index); + if (!ret) { ret = TRef(new (&Pool_) TValue()); - } - return ret.Get(); - } - - private: - using TRef = THolder<TValue>; - typename TValue::TPool Pool_; - TSocketMap<TRef> Lst_; - }; - - - using TPollEventList = TIntrusiveList<IPollEvent>; - - class TContPoller { - public: - using TEvent = IPollerFace::TEvent; - using TEvents = IPollerFace::TEvents; - - TContPoller() - : P_(IPollerFace::Default()) - { - } - - explicit TContPoller(THolder<IPollerFace> poller) - : P_(std::move(poller)) - {} - - void Schedule(IPollEvent* event) { - auto* lst = Lists_.Get(event->Fd()); - const ui16 oldFlags = Flags(*lst); - lst->PushFront(event); + } + return ret.Get(); + } + + private: + using TRef = THolder<TValue>; + typename TValue::TPool Pool_; + TSocketMap<TRef> Lst_; + }; + + + using TPollEventList = TIntrusiveList<IPollEvent>; + + class TContPoller { + public: + using TEvent = IPollerFace::TEvent; + using TEvents = IPollerFace::TEvents; + + TContPoller() + : P_(IPollerFace::Default()) + { + } + + explicit TContPoller(THolder<IPollerFace> poller) + : P_(std::move(poller)) + {} + + void Schedule(IPollEvent* event) { + auto* lst = Lists_.Get(event->Fd()); + const ui16 oldFlags = Flags(*lst); + lst->PushFront(event); ui16 newFlags = Flags(*lst); - if (newFlags != oldFlags) { + if (newFlags != oldFlags) { if (oldFlags) { newFlags |= CONT_POLL_MODIFY; } - P_->Set(lst, event->Fd(), newFlags); - } + P_->Set(lst, event->Fd(), newFlags); + } } - void Remove(IPollEvent* event) noexcept { - auto* lst = Lists_.Get(event->Fd()); - const ui16 oldFlags = Flags(*lst); - event->Unlink(); + void Remove(IPollEvent* event) noexcept { + auto* lst = Lists_.Get(event->Fd()); + const ui16 oldFlags = Flags(*lst); + event->Unlink(); ui16 newFlags = Flags(*lst); - if (newFlags != oldFlags) { + if (newFlags != oldFlags) { if (newFlags) { newFlags |= CONT_POLL_MODIFY; } - P_->Set(lst, event->Fd(), newFlags); - } + P_->Set(lst, event->Fd(), newFlags); + } } - void Wait(TEvents& events, TInstant deadLine) { - events.clear(); - P_->Wait(events, deadLine); + void Wait(TEvents& events, TInstant deadLine) { + events.clear(); + P_->Wait(events, deadLine); } EContPoller PollEngine() const { return P_->PollEngine(); } - private: - static ui16 Flags(TIntrusiveList<IPollEvent>& lst) noexcept { - ui16 ret = 0; - for (auto&& item : lst) { - ret |= item.What(); + private: + static ui16 Flags(TIntrusiveList<IPollEvent>& lst) noexcept { + ui16 ret = 0; + for (auto&& item : lst) { + ret |= item.What(); } - return ret; + return ret; } - private: - TBigArray<TPollEventList> Lists_; - THolder<IPollerFace> P_; + private: + TBigArray<TPollEventList> Lists_; + THolder<IPollerFace> P_; }; - - - class TEventWaitQueue { - using TIoWait = TRbTree<NCoro::TContPollEvent, NCoro::TContPollEventCompare>; - - public: - void Register(NCoro::TContPollEvent* event); - - bool Empty() const noexcept { - return IoWait_.Empty(); - } - - void Abort() noexcept; - - TInstant WakeTimedout(TInstant now) noexcept; - - private: - TIoWait IoWait_; - }; -} - -class TFdEvent final: - public NCoro::TContPollEvent, - public NCoro::IPollEvent -{ + + + class TEventWaitQueue { + using TIoWait = TRbTree<NCoro::TContPollEvent, NCoro::TContPollEventCompare>; + + public: + void Register(NCoro::TContPollEvent* event); + + bool Empty() const noexcept { + return IoWait_.Empty(); + } + + void Abort() noexcept; + + TInstant WakeTimedout(TInstant now) noexcept; + + private: + TIoWait IoWait_; + }; +} + +class TFdEvent final: + public NCoro::TContPollEvent, + public NCoro::IPollEvent +{ public: - TFdEvent(TCont* cont, SOCKET fd, ui16 what, TInstant deadLine) noexcept - : TContPollEvent(cont, deadLine) - , IPollEvent(fd, what) - {} - - ~TFdEvent() { - RemoveFromIOWait(); + TFdEvent(TCont* cont, SOCKET fd, ui16 what, TInstant deadLine) noexcept + : TContPollEvent(cont, deadLine) + , IPollEvent(fd, what) + {} + + ~TFdEvent() { + RemoveFromIOWait(); } - void RemoveFromIOWait() noexcept; + void RemoveFromIOWait() noexcept; - void OnPollEvent(int status) noexcept override { - Wake(status); + void OnPollEvent(int status) noexcept override { + Wake(status); } }; - -class TTimerEvent: public NCoro::TContPollEvent { + +class TTimerEvent: public NCoro::TContPollEvent { public: - TTimerEvent(TCont* cont, TInstant deadLine) noexcept - : TContPollEvent(cont, deadLine) - {} + TTimerEvent(TCont* cont, TInstant deadLine) noexcept + : TContPollEvent(cont, deadLine) + {} }; - -int ExecuteEvent(TFdEvent* event) noexcept; - -int ExecuteEvent(TTimerEvent* event) noexcept; + +int ExecuteEvent(TFdEvent* event) noexcept; + +int ExecuteEvent(TTimerEvent* event) noexcept; diff --git a/library/cpp/coroutine/engine/coroutine_ut.cpp b/library/cpp/coroutine/engine/coroutine_ut.cpp index 1ba31245b0..8b372496a2 100644 --- a/library/cpp/coroutine/engine/coroutine_ut.cpp +++ b/library/cpp/coroutine/engine/coroutine_ut.cpp @@ -1,19 +1,19 @@ #include "impl.h" -#include "condvar.h" -#include "network.h" +#include "condvar.h" +#include "network.h" #include <library/cpp/testing/unittest/registar.h> #include <util/string/cast.h> #include <util/system/pipe.h> -#include <util/system/env.h> -#include <util/system/info.h> +#include <util/system/env.h> +#include <util/system/info.h> #include <util/system/thread.h> -#include <util/generic/xrange.h> +#include <util/generic/xrange.h> #include <util/generic/serialized_enum.h> -// TODO (velavokr): BALANCER-1345 add more tests on pollers - +// TODO (velavokr): BALANCER-1345 add more tests on pollers + class TCoroTest: public TTestBase { UNIT_TEST_SUITE(TCoroTest); UNIT_TEST(TestSimpleX1); @@ -23,23 +23,23 @@ class TCoroTest: public TTestBase { UNIT_TEST(TestMemFun); UNIT_TEST(TestMutex); UNIT_TEST(TestCondVar); - UNIT_TEST(TestJoinDefault); - UNIT_TEST(TestJoinEpoll); - UNIT_TEST(TestJoinKqueue); - UNIT_TEST(TestJoinPoll); - UNIT_TEST(TestJoinSelect); + UNIT_TEST(TestJoinDefault); + UNIT_TEST(TestJoinEpoll); + UNIT_TEST(TestJoinKqueue); + UNIT_TEST(TestJoinPoll); + UNIT_TEST(TestJoinSelect); UNIT_TEST(TestException); - UNIT_TEST(TestJoinCancelExitRaceBug); - UNIT_TEST(TestWaitWakeLivelockBug); - UNIT_TEST(TestFastPathWakeDefault) - // TODO (velavokr): BALANCER-1338 our epoll wrapper cannot handle pipe eofs -// UNIT_TEST(TestFastPathWakeEpoll) - UNIT_TEST(TestFastPathWakeKqueue) - UNIT_TEST(TestFastPathWakePoll) + UNIT_TEST(TestJoinCancelExitRaceBug); + UNIT_TEST(TestWaitWakeLivelockBug); + UNIT_TEST(TestFastPathWakeDefault) + // TODO (velavokr): BALANCER-1338 our epoll wrapper cannot handle pipe eofs +// UNIT_TEST(TestFastPathWakeEpoll) + UNIT_TEST(TestFastPathWakeKqueue) + UNIT_TEST(TestFastPathWakePoll) UNIT_TEST(TestFastPathWakeSelect) - UNIT_TEST(TestLegacyCancelYieldRaceBug) - UNIT_TEST(TestJoinRescheduleBug); - UNIT_TEST(TestEventQueue) + UNIT_TEST(TestLegacyCancelYieldRaceBug) + UNIT_TEST(TestJoinRescheduleBug); + UNIT_TEST(TestEventQueue) UNIT_TEST(TestNestedExecutor) UNIT_TEST(TestComputeCoroutineYield) UNIT_TEST(TestPollEngines); @@ -57,21 +57,21 @@ public: void TestMemFun(); void TestMutex(); void TestCondVar(); - void TestJoinDefault(); - void TestJoinEpoll(); - void TestJoinKqueue(); - void TestJoinPoll(); - void TestJoinSelect(); - void TestJoinCancelExitRaceBug(); - void TestWaitWakeLivelockBug(); - void TestFastPathWakeDefault(); - void TestFastPathWakeEpoll(); - void TestFastPathWakeKqueue(); - void TestFastPathWakePoll(); - void TestFastPathWakeSelect(); - void TestLegacyCancelYieldRaceBug(); - void TestJoinRescheduleBug(); - void TestEventQueue(); + void TestJoinDefault(); + void TestJoinEpoll(); + void TestJoinKqueue(); + void TestJoinPoll(); + void TestJoinSelect(); + void TestJoinCancelExitRaceBug(); + void TestWaitWakeLivelockBug(); + void TestFastPathWakeDefault(); + void TestFastPathWakeEpoll(); + void TestFastPathWakeKqueue(); + void TestFastPathWakePoll(); + void TestFastPathWakeSelect(); + void TestLegacyCancelYieldRaceBug(); + void TestJoinRescheduleBug(); + void TestEventQueue(); void TestNestedExecutor(); void TestComputeCoroutineYield(); void TestPollEngines(); @@ -88,7 +88,7 @@ void TCoroTest::TestException() { auto f1 = [&f2run](TCont* c) { struct TCtx { ~TCtx() { - Y_VERIFY(!*F2); + Y_VERIFY(!*F2); C->Yield(); } @@ -144,18 +144,18 @@ static void CoMain(TCont* c, void* /*arg*/) { } } -void TCoroTest::TestSimpleX1() { - i0 = 0; - TContExecutor e(32000); +void TCoroTest::TestSimpleX1() { + i0 = 0; + TContExecutor e(32000); UNIT_ASSERT(RunningCont() == nullptr); - e.Execute(CoMain); - UNIT_ASSERT_VALUES_EQUAL(i0, 100000); + e.Execute(CoMain); + UNIT_ASSERT_VALUES_EQUAL(i0, 100000); UNIT_ASSERT(RunningCont() == nullptr); -} - +} + void TCoroTest::TestSimpleX1MultiThread() { TVector<THolder<TThread>> threads; const size_t nThreads = 0; @@ -179,10 +179,10 @@ void TCoroTest::TestSimpleX1MultiThread() { } struct TTestObject { - int i = 0; - int j = 0; + int i = 0; + int j = 0; -public: +public: void RunTask1(TCont*) { i = 1; } @@ -325,7 +325,7 @@ void TCoroTest::TestCondVar() { res.clear(); } -namespace NCoroTestJoin { +namespace NCoroTestJoin { struct TSleepCont { const TInstant Deadline; int Result; @@ -342,7 +342,7 @@ namespace NCoroTestJoin { inline void operator()(TCont* c) { char buf = 0; - Result = NCoro::ReadD(c, Sock, &buf, sizeof(buf), Deadline).Status(); + Result = NCoro::ReadD(c, Sock, &buf, sizeof(buf), Deadline).Status(); } }; @@ -356,513 +356,513 @@ namespace NCoroTestJoin { } }; - void DoTestJoin(EContPoller pollerType) { - auto poller = IPollerFace::Construct(pollerType); - - if (!poller) { - return; - } - - TContExecutor e(32000, std::move(poller)); - - TPipe in, out; - TPipe::Pipe(in, out); - SetNonBlock(in.GetHandle()); - - { - TSleepCont sc = {TInstant::Max(), 0}; - TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true}; - - e.Execute(jc); - - UNIT_ASSERT_EQUAL(sc.Result, ECANCELED); - UNIT_ASSERT_EQUAL(jc.Result, false); - } - - { - TSleepCont sc = {TDuration::MilliSeconds(100).ToDeadLine(), 0}; - TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(sc, "sc"), false}; - - e.Execute(jc); - - UNIT_ASSERT_EQUAL(sc.Result, ETIMEDOUT); - UNIT_ASSERT_EQUAL(jc.Result, true); - } - - { - TSleepCont sc = {TDuration::MilliSeconds(200).ToDeadLine(), 0}; - TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true}; - - e.Execute(jc); - - UNIT_ASSERT_EQUAL(sc.Result, ECANCELED); - UNIT_ASSERT_EQUAL(jc.Result, false); - } - - { - TReadCont rc = {TInstant::Max(), in.GetHandle(), 0}; - TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true}; - - e.Execute(jc); - - UNIT_ASSERT_EQUAL(rc.Result, ECANCELED); - UNIT_ASSERT_EQUAL(jc.Result, false); - } - - { - TReadCont rc = {TDuration::MilliSeconds(100).ToDeadLine(), in.GetHandle(), 0}; - TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(rc, "rc"), false}; - - e.Execute(jc); - - UNIT_ASSERT_EQUAL(rc.Result, ETIMEDOUT); - UNIT_ASSERT_EQUAL(jc.Result, true); - } - - { - TReadCont rc = {TDuration::MilliSeconds(200).ToDeadLine(), in.GetHandle(), 0}; - TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true}; - - e.Execute(jc); - - UNIT_ASSERT_EQUAL(rc.Result, ECANCELED); - UNIT_ASSERT_EQUAL(jc.Result, false); - } - } -} - -void TCoroTest::TestJoinDefault() { - NCoroTestJoin::DoTestJoin(EContPoller::Default); -} - -void TCoroTest::TestJoinEpoll() { - NCoroTestJoin::DoTestJoin(EContPoller::Epoll); -} - -void TCoroTest::TestJoinKqueue() { - NCoroTestJoin::DoTestJoin(EContPoller::Kqueue); -} - -void TCoroTest::TestJoinPoll() { - NCoroTestJoin::DoTestJoin(EContPoller::Poll); -} - -void TCoroTest::TestJoinSelect() { - NCoroTestJoin::DoTestJoin(EContPoller::Select); -} - -namespace NCoroJoinCancelExitRaceBug { - struct TState { - TCont* Sub = nullptr; - }; - - static void DoAux(TCont*, void* argPtr) noexcept { - TState& state = *(TState*)(argPtr); - - // 06.{Ready:[Sub2]} > {Ready:[Sub2,Sub]} - state.Sub->Cancel(); - } - - static void DoSub2(TCont*, void*) noexcept { - // 07.{Ready:[Sub]} > Exit > {Ready:[Sub],ToDelete:[Sub2]} - // 08.{Ready:[Sub],ToDelete:[Sub2]} > Release(Sub2) > {Ready:[Sub],Deleted:[Sub2]} - } - - static void DoSub(TCont* cont, void* argPtr) noexcept { - TState& state = *(TState*)(argPtr); - state.Sub = cont; - - // 04.{Ready:[Aux]} > {Ready:[Aux,Sub2]} - auto* sub2 = cont->Executor()->Create(DoSub2, argPtr, "Sub2"); - - // 05.{Ready:[Aux,Sub2]} > SwitchTo(Aux) - // 09.{Ready:[],Deleted:[Sub2]} > Cancel(Sub2) > {Ready:[Sub2],Deleted:[Sub2]} - // 10.{Ready:[Sub2],Deleted:[Sub2]} > SwitchTo(Sub2) > FAIL: can not return from exit - cont->Join(sub2); - - state.Sub = nullptr; - } - - static void DoMain(TCont* cont) noexcept { - TState state; - - // 01.{Ready:[]} > {Ready:[Sub]} - auto* sub = cont->Executor()->Create(DoSub, &state, "Sub"); - - // 02.{Ready:[Sub]} > {Ready:[Sub,Aux]} - cont->Executor()->Create(DoAux, &state, "Aux"); - - // 03.{Ready:[Sub,Aux]} > SwitchTo(Sub) - cont->Join(sub); - } -} - -void TCoroTest::TestJoinCancelExitRaceBug() { - TContExecutor exec(20000); - exec.SetFailOnError(true); - exec.Execute(NCoroJoinCancelExitRaceBug::DoMain); -} - -namespace NCoroWaitWakeLivelockBug { - struct TState; - - struct TSubState { - TSubState(TState& parent, ui32 self) - : Parent(parent) + void DoTestJoin(EContPoller pollerType) { + auto poller = IPollerFace::Construct(pollerType); + + if (!poller) { + return; + } + + TContExecutor e(32000, std::move(poller)); + + TPipe in, out; + TPipe::Pipe(in, out); + SetNonBlock(in.GetHandle()); + + { + TSleepCont sc = {TInstant::Max(), 0}; + TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true}; + + e.Execute(jc); + + UNIT_ASSERT_EQUAL(sc.Result, ECANCELED); + UNIT_ASSERT_EQUAL(jc.Result, false); + } + + { + TSleepCont sc = {TDuration::MilliSeconds(100).ToDeadLine(), 0}; + TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(sc, "sc"), false}; + + e.Execute(jc); + + UNIT_ASSERT_EQUAL(sc.Result, ETIMEDOUT); + UNIT_ASSERT_EQUAL(jc.Result, true); + } + + { + TSleepCont sc = {TDuration::MilliSeconds(200).ToDeadLine(), 0}; + TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true}; + + e.Execute(jc); + + UNIT_ASSERT_EQUAL(sc.Result, ECANCELED); + UNIT_ASSERT_EQUAL(jc.Result, false); + } + + { + TReadCont rc = {TInstant::Max(), in.GetHandle(), 0}; + TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true}; + + e.Execute(jc); + + UNIT_ASSERT_EQUAL(rc.Result, ECANCELED); + UNIT_ASSERT_EQUAL(jc.Result, false); + } + + { + TReadCont rc = {TDuration::MilliSeconds(100).ToDeadLine(), in.GetHandle(), 0}; + TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(rc, "rc"), false}; + + e.Execute(jc); + + UNIT_ASSERT_EQUAL(rc.Result, ETIMEDOUT); + UNIT_ASSERT_EQUAL(jc.Result, true); + } + + { + TReadCont rc = {TDuration::MilliSeconds(200).ToDeadLine(), in.GetHandle(), 0}; + TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true}; + + e.Execute(jc); + + UNIT_ASSERT_EQUAL(rc.Result, ECANCELED); + UNIT_ASSERT_EQUAL(jc.Result, false); + } + } +} + +void TCoroTest::TestJoinDefault() { + NCoroTestJoin::DoTestJoin(EContPoller::Default); +} + +void TCoroTest::TestJoinEpoll() { + NCoroTestJoin::DoTestJoin(EContPoller::Epoll); +} + +void TCoroTest::TestJoinKqueue() { + NCoroTestJoin::DoTestJoin(EContPoller::Kqueue); +} + +void TCoroTest::TestJoinPoll() { + NCoroTestJoin::DoTestJoin(EContPoller::Poll); +} + +void TCoroTest::TestJoinSelect() { + NCoroTestJoin::DoTestJoin(EContPoller::Select); +} + +namespace NCoroJoinCancelExitRaceBug { + struct TState { + TCont* Sub = nullptr; + }; + + static void DoAux(TCont*, void* argPtr) noexcept { + TState& state = *(TState*)(argPtr); + + // 06.{Ready:[Sub2]} > {Ready:[Sub2,Sub]} + state.Sub->Cancel(); + } + + static void DoSub2(TCont*, void*) noexcept { + // 07.{Ready:[Sub]} > Exit > {Ready:[Sub],ToDelete:[Sub2]} + // 08.{Ready:[Sub],ToDelete:[Sub2]} > Release(Sub2) > {Ready:[Sub],Deleted:[Sub2]} + } + + static void DoSub(TCont* cont, void* argPtr) noexcept { + TState& state = *(TState*)(argPtr); + state.Sub = cont; + + // 04.{Ready:[Aux]} > {Ready:[Aux,Sub2]} + auto* sub2 = cont->Executor()->Create(DoSub2, argPtr, "Sub2"); + + // 05.{Ready:[Aux,Sub2]} > SwitchTo(Aux) + // 09.{Ready:[],Deleted:[Sub2]} > Cancel(Sub2) > {Ready:[Sub2],Deleted:[Sub2]} + // 10.{Ready:[Sub2],Deleted:[Sub2]} > SwitchTo(Sub2) > FAIL: can not return from exit + cont->Join(sub2); + + state.Sub = nullptr; + } + + static void DoMain(TCont* cont) noexcept { + TState state; + + // 01.{Ready:[]} > {Ready:[Sub]} + auto* sub = cont->Executor()->Create(DoSub, &state, "Sub"); + + // 02.{Ready:[Sub]} > {Ready:[Sub,Aux]} + cont->Executor()->Create(DoAux, &state, "Aux"); + + // 03.{Ready:[Sub,Aux]} > SwitchTo(Sub) + cont->Join(sub); + } +} + +void TCoroTest::TestJoinCancelExitRaceBug() { + TContExecutor exec(20000); + exec.SetFailOnError(true); + exec.Execute(NCoroJoinCancelExitRaceBug::DoMain); +} + +namespace NCoroWaitWakeLivelockBug { + struct TState; + + struct TSubState { + TSubState(TState& parent, ui32 self) + : Parent(parent) , Name(TStringBuilder() << "Sub" << self) - , Self(self) - { - UNIT_ASSERT(self < 2); - } - - TSubState& OtherState(); - - TState& Parent; - TTimerEvent* Event = nullptr; - TCont* Cont = nullptr; - TString Name; - ui32 Self = -1; - }; - - struct TState { - TState() - : Subs{{*this, 0}, {*this, 1}} - {} - - TSubState Subs[2]; - bool Stop = false; - }; - - TSubState& TSubState::OtherState() { - return Parent.Subs[1 - Self]; - } - - static void DoStop(TCont* cont, void* argPtr) { - TState& state = *(TState*)(argPtr); - - TTimerEvent event(cont, TInstant::Now()); - ExecuteEvent(&event); - state.Stop = true; - for (auto& sub: state.Subs) { - if (sub.Event) { - sub.Event->Wake(EWAKEDUP); - } - } - } - - static void DoSub(TCont* cont, void* argPtr) { - TSubState& state = *(TSubState*)(argPtr); - - while (!state.Parent.Stop) { - TTimerEvent event(cont, TInstant::Max()); - if (state.OtherState().Event) { - state.OtherState().Event->Wake(EWAKEDUP); - } - state.Event = &event; - ExecuteEvent(&event); - state.Event = nullptr; - } - - state.Cont = nullptr; - } - - static void DoMain(TCont* cont) noexcept { - TState state; - - for (auto& subState : state.Subs) { - subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data()); - } - - cont->Join( - cont->Executor()->Create(DoStop, &state, "Stop") - ); - - for (auto& subState : state.Subs) { - if (subState.Cont) { - cont->Join(subState.Cont); - } - } - } -} - -void TCoroTest::TestWaitWakeLivelockBug() { - TContExecutor exec(20000); - exec.SetFailOnError(true); - exec.Execute(NCoroWaitWakeLivelockBug::DoMain); -} - -namespace NCoroTestFastPathWake { - struct TState; - - struct TSubState { - TSubState(TState& parent, ui32 self) - : Parent(parent) - , Name(TStringBuilder() << "Sub" << self) - {} - - TState& Parent; - TInstant Finish; - TTimerEvent* Event = nullptr; - TCont* Cont = nullptr; - TString Name; - }; - - struct TState { - TState() - : Subs{{*this, 0}, {*this, 1}} - { - TPipe::Pipe(In, Out); - SetNonBlock(In.GetHandle()); - } - - TSubState Subs[2]; - TPipe In, Out; - bool IoSleepRunning = false; - }; - - static void DoIoSleep(TCont* cont, void* argPtr) noexcept { - try { - TState& state = *(TState*) (argPtr); - state.IoSleepRunning = true; - - TTempBuf tmp; - // Wait for the event from io - auto res = NCoro::ReadD(cont, state.In.GetHandle(), tmp.Data(), 1, TDuration::Seconds(10).ToDeadLine()); - UNIT_ASSERT_VALUES_EQUAL(res.Checked(), 0); - state.IoSleepRunning = false; - } catch (const NUnitTest::TAssertException& ex) { - Cerr << ex.AsStrBuf() << Endl; - ex.BackTrace()->PrintTo(Cerr); - throw; - } catch (...) { - Cerr << CurrentExceptionMessage() << Endl; - throw; - } - } - - static void DoSub(TCont* cont, void* argPtr) noexcept { - TSubState& state = *(TSubState*)(argPtr); - - TTimerEvent event(cont, TInstant::Max()); - state.Event = &event; - ExecuteEvent(&event); - state.Event = nullptr; - state.Cont = nullptr; - state.Finish = TInstant::Now(); - } - - static void DoMain(TCont* cont) noexcept { - try { - TState state; - TInstant start = TInstant::Now(); - - // This guy sleeps on io - auto sleeper = cont->Executor()->Create(DoIoSleep, &state, "io_sleeper"); - - // These guys are to be woken up right away - for (auto& subState : state.Subs) { - subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data()); - } - - // Give way - cont->Yield(); - - // Check everyone has started, wake those to be woken - UNIT_ASSERT(state.IoSleepRunning); - - for (auto& subState : state.Subs) { - UNIT_ASSERT(subState.Event); - subState.Event->Wake(EWAKEDUP); - } - - // Give way again - cont->Yield(); - - // Check the woken guys have finished and quite soon - for (auto& subState : state.Subs) { - UNIT_ASSERT(subState.Finish - start < TDuration::MilliSeconds(100)); - UNIT_ASSERT(!subState.Cont); - } - - // Wake the io guy and finish - state.Out.Close(); - - if (state.IoSleepRunning) { - cont->Join(sleeper); - } - - // Check everything has ended sooner than the timeout - UNIT_ASSERT(TInstant::Now() - start < TDuration::Seconds(1)); - } catch (const NUnitTest::TAssertException& ex) { - Cerr << ex.AsStrBuf() << Endl; - ex.BackTrace()->PrintTo(Cerr); - throw; - } catch (...) { - Cerr << CurrentExceptionMessage() << Endl; - throw; - } - } - - static void DoTestFastPathWake(EContPoller pollerType) { - if (auto poller = IPollerFace::Construct(pollerType)) { - TContExecutor exec(20000, std::move(poller)); - exec.SetFailOnError(true); - exec.Execute(NCoroTestFastPathWake::DoMain); - } - } -} - -void TCoroTest::TestFastPathWakeDefault() { - NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Default); -} - -void TCoroTest::TestFastPathWakeEpoll() { - NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Epoll); -} - -void TCoroTest::TestFastPathWakeKqueue() { - NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Kqueue); -} - -void TCoroTest::TestFastPathWakePoll() { - NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Poll); -} - -void TCoroTest::TestFastPathWakeSelect() { - NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Select); -} - -namespace NCoroTestLegacyCancelYieldRaceBug { - enum class EState { - Idle, Running, Finished, - }; - - struct TState { - EState SubState = EState::Idle; - }; - - static void DoSub(TCont* cont, void* argPtr) { - TState& state = *(TState*)argPtr; - state.SubState = EState::Running; - cont->Yield(); - cont->Yield(); - state.SubState = EState::Finished; - } - - static void DoMain(TCont* cont, void* argPtr) { - TState& state = *(TState*)argPtr; - TCont* sub = cont->Executor()->Create(DoSub, argPtr, "Sub"); - sub->Cancel(); - cont->Yield(); - UNIT_ASSERT_EQUAL(state.SubState, EState::Finished); - } -} - -void TCoroTest::TestLegacyCancelYieldRaceBug() { - NCoroTestLegacyCancelYieldRaceBug::TState state; - TContExecutor exec(20000); - exec.SetFailOnError(true); - exec.Execute(NCoroTestLegacyCancelYieldRaceBug::DoMain, &state); -} - -namespace NCoroTestJoinRescheduleBug { - enum class EState { - Idle, Running, Finished, - }; - - struct TState { - TCont* volatile SubA = nullptr; - volatile EState SubAState = EState::Idle; - volatile EState SubBState = EState::Idle; - volatile EState SubCState = EState::Idle; - }; - - static void DoSubC(TCont* cont, void* argPtr) { - TState& state = *(TState*)argPtr; - state.SubCState = EState::Running; - while (state.SubBState != EState::Running) { - cont->Yield(); - } - while (cont->SleepD(TInstant::Max()) != ECANCELED) { - } - state.SubCState = EState::Finished; - } - - static void DoSubB(TCont* cont, void* argPtr) { - TState& state = *(TState*)argPtr; - state.SubBState = EState::Running; - while (state.SubAState != EState::Running && state.SubCState != EState::Running) { - cont->Yield(); - } - for (auto i : xrange(100)) { - Y_UNUSED(i); - if (!state.SubA) { - break; - } - state.SubA->ReSchedule(); - cont->Yield(); - } - state.SubBState = EState::Finished; - } - - static void DoSubA(TCont* cont, void* argPtr) { - TState& state = *(TState*)argPtr; - state.SubAState = EState::Running; - TCont* subC = cont->Executor()->Create(DoSubC, argPtr, "SubC"); - while (state.SubBState != EState::Running && state.SubCState != EState::Running) { - cont->Yield(); - } - cont->Join(subC); - UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished); - state.SubA = nullptr; - state.SubAState = EState::Finished; - } - - static void DoMain(TCont* cont, void* argPtr) { - TState& state = *(TState*)argPtr; - TCont* subA = cont->Executor()->Create(DoSubA, argPtr, "SubA"); - state.SubA = subA; - cont->Join(cont->Executor()->Create(DoSubB, argPtr, "SubB")); - - if (state.SubA) { - subA->Cancel(); - cont->Join(subA); - } - } -} - -void TCoroTest::TestJoinRescheduleBug() { - using namespace NCoroTestJoinRescheduleBug; - TState state; - { - TContExecutor exec(20000); - exec.Execute(DoMain, &state); - } - UNIT_ASSERT_EQUAL(state.SubAState, EState::Finished); - UNIT_ASSERT_EQUAL(state.SubBState, EState::Finished); - UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished); -} - -void TCoroTest::TestEventQueue() { - NCoro::TEventWaitQueue queue; - UNIT_ASSERT(queue.Empty()); - UNIT_ASSERT_VALUES_EQUAL(queue.WakeTimedout(TInstant()), TInstant()); - TContExecutor exec(32000); - exec.Execute([](TCont* cont, void* arg) { - NCoro::TEventWaitQueue* q = (NCoro::TEventWaitQueue*)arg; - TTimerEvent ev(cont, TInstant::Max()); - TTimerEvent ev2(cont, TInstant::Seconds(12345)); - q->Register(&ev); - UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max()); - UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max()); - q->Register(&ev2); - UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345)); - UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345)); - UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Seconds(12345)); - UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Max()); - }, &queue); -} - + , Self(self) + { + UNIT_ASSERT(self < 2); + } + + TSubState& OtherState(); + + TState& Parent; + TTimerEvent* Event = nullptr; + TCont* Cont = nullptr; + TString Name; + ui32 Self = -1; + }; + + struct TState { + TState() + : Subs{{*this, 0}, {*this, 1}} + {} + + TSubState Subs[2]; + bool Stop = false; + }; + + TSubState& TSubState::OtherState() { + return Parent.Subs[1 - Self]; + } + + static void DoStop(TCont* cont, void* argPtr) { + TState& state = *(TState*)(argPtr); + + TTimerEvent event(cont, TInstant::Now()); + ExecuteEvent(&event); + state.Stop = true; + for (auto& sub: state.Subs) { + if (sub.Event) { + sub.Event->Wake(EWAKEDUP); + } + } + } + + static void DoSub(TCont* cont, void* argPtr) { + TSubState& state = *(TSubState*)(argPtr); + + while (!state.Parent.Stop) { + TTimerEvent event(cont, TInstant::Max()); + if (state.OtherState().Event) { + state.OtherState().Event->Wake(EWAKEDUP); + } + state.Event = &event; + ExecuteEvent(&event); + state.Event = nullptr; + } + + state.Cont = nullptr; + } + + static void DoMain(TCont* cont) noexcept { + TState state; + + for (auto& subState : state.Subs) { + subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data()); + } + + cont->Join( + cont->Executor()->Create(DoStop, &state, "Stop") + ); + + for (auto& subState : state.Subs) { + if (subState.Cont) { + cont->Join(subState.Cont); + } + } + } +} + +void TCoroTest::TestWaitWakeLivelockBug() { + TContExecutor exec(20000); + exec.SetFailOnError(true); + exec.Execute(NCoroWaitWakeLivelockBug::DoMain); +} + +namespace NCoroTestFastPathWake { + struct TState; + + struct TSubState { + TSubState(TState& parent, ui32 self) + : Parent(parent) + , Name(TStringBuilder() << "Sub" << self) + {} + + TState& Parent; + TInstant Finish; + TTimerEvent* Event = nullptr; + TCont* Cont = nullptr; + TString Name; + }; + + struct TState { + TState() + : Subs{{*this, 0}, {*this, 1}} + { + TPipe::Pipe(In, Out); + SetNonBlock(In.GetHandle()); + } + + TSubState Subs[2]; + TPipe In, Out; + bool IoSleepRunning = false; + }; + + static void DoIoSleep(TCont* cont, void* argPtr) noexcept { + try { + TState& state = *(TState*) (argPtr); + state.IoSleepRunning = true; + + TTempBuf tmp; + // Wait for the event from io + auto res = NCoro::ReadD(cont, state.In.GetHandle(), tmp.Data(), 1, TDuration::Seconds(10).ToDeadLine()); + UNIT_ASSERT_VALUES_EQUAL(res.Checked(), 0); + state.IoSleepRunning = false; + } catch (const NUnitTest::TAssertException& ex) { + Cerr << ex.AsStrBuf() << Endl; + ex.BackTrace()->PrintTo(Cerr); + throw; + } catch (...) { + Cerr << CurrentExceptionMessage() << Endl; + throw; + } + } + + static void DoSub(TCont* cont, void* argPtr) noexcept { + TSubState& state = *(TSubState*)(argPtr); + + TTimerEvent event(cont, TInstant::Max()); + state.Event = &event; + ExecuteEvent(&event); + state.Event = nullptr; + state.Cont = nullptr; + state.Finish = TInstant::Now(); + } + + static void DoMain(TCont* cont) noexcept { + try { + TState state; + TInstant start = TInstant::Now(); + + // This guy sleeps on io + auto sleeper = cont->Executor()->Create(DoIoSleep, &state, "io_sleeper"); + + // These guys are to be woken up right away + for (auto& subState : state.Subs) { + subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data()); + } + + // Give way + cont->Yield(); + + // Check everyone has started, wake those to be woken + UNIT_ASSERT(state.IoSleepRunning); + + for (auto& subState : state.Subs) { + UNIT_ASSERT(subState.Event); + subState.Event->Wake(EWAKEDUP); + } + + // Give way again + cont->Yield(); + + // Check the woken guys have finished and quite soon + for (auto& subState : state.Subs) { + UNIT_ASSERT(subState.Finish - start < TDuration::MilliSeconds(100)); + UNIT_ASSERT(!subState.Cont); + } + + // Wake the io guy and finish + state.Out.Close(); + + if (state.IoSleepRunning) { + cont->Join(sleeper); + } + + // Check everything has ended sooner than the timeout + UNIT_ASSERT(TInstant::Now() - start < TDuration::Seconds(1)); + } catch (const NUnitTest::TAssertException& ex) { + Cerr << ex.AsStrBuf() << Endl; + ex.BackTrace()->PrintTo(Cerr); + throw; + } catch (...) { + Cerr << CurrentExceptionMessage() << Endl; + throw; + } + } + + static void DoTestFastPathWake(EContPoller pollerType) { + if (auto poller = IPollerFace::Construct(pollerType)) { + TContExecutor exec(20000, std::move(poller)); + exec.SetFailOnError(true); + exec.Execute(NCoroTestFastPathWake::DoMain); + } + } +} + +void TCoroTest::TestFastPathWakeDefault() { + NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Default); +} + +void TCoroTest::TestFastPathWakeEpoll() { + NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Epoll); +} + +void TCoroTest::TestFastPathWakeKqueue() { + NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Kqueue); +} + +void TCoroTest::TestFastPathWakePoll() { + NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Poll); +} + +void TCoroTest::TestFastPathWakeSelect() { + NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Select); +} + +namespace NCoroTestLegacyCancelYieldRaceBug { + enum class EState { + Idle, Running, Finished, + }; + + struct TState { + EState SubState = EState::Idle; + }; + + static void DoSub(TCont* cont, void* argPtr) { + TState& state = *(TState*)argPtr; + state.SubState = EState::Running; + cont->Yield(); + cont->Yield(); + state.SubState = EState::Finished; + } + + static void DoMain(TCont* cont, void* argPtr) { + TState& state = *(TState*)argPtr; + TCont* sub = cont->Executor()->Create(DoSub, argPtr, "Sub"); + sub->Cancel(); + cont->Yield(); + UNIT_ASSERT_EQUAL(state.SubState, EState::Finished); + } +} + +void TCoroTest::TestLegacyCancelYieldRaceBug() { + NCoroTestLegacyCancelYieldRaceBug::TState state; + TContExecutor exec(20000); + exec.SetFailOnError(true); + exec.Execute(NCoroTestLegacyCancelYieldRaceBug::DoMain, &state); +} + +namespace NCoroTestJoinRescheduleBug { + enum class EState { + Idle, Running, Finished, + }; + + struct TState { + TCont* volatile SubA = nullptr; + volatile EState SubAState = EState::Idle; + volatile EState SubBState = EState::Idle; + volatile EState SubCState = EState::Idle; + }; + + static void DoSubC(TCont* cont, void* argPtr) { + TState& state = *(TState*)argPtr; + state.SubCState = EState::Running; + while (state.SubBState != EState::Running) { + cont->Yield(); + } + while (cont->SleepD(TInstant::Max()) != ECANCELED) { + } + state.SubCState = EState::Finished; + } + + static void DoSubB(TCont* cont, void* argPtr) { + TState& state = *(TState*)argPtr; + state.SubBState = EState::Running; + while (state.SubAState != EState::Running && state.SubCState != EState::Running) { + cont->Yield(); + } + for (auto i : xrange(100)) { + Y_UNUSED(i); + if (!state.SubA) { + break; + } + state.SubA->ReSchedule(); + cont->Yield(); + } + state.SubBState = EState::Finished; + } + + static void DoSubA(TCont* cont, void* argPtr) { + TState& state = *(TState*)argPtr; + state.SubAState = EState::Running; + TCont* subC = cont->Executor()->Create(DoSubC, argPtr, "SubC"); + while (state.SubBState != EState::Running && state.SubCState != EState::Running) { + cont->Yield(); + } + cont->Join(subC); + UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished); + state.SubA = nullptr; + state.SubAState = EState::Finished; + } + + static void DoMain(TCont* cont, void* argPtr) { + TState& state = *(TState*)argPtr; + TCont* subA = cont->Executor()->Create(DoSubA, argPtr, "SubA"); + state.SubA = subA; + cont->Join(cont->Executor()->Create(DoSubB, argPtr, "SubB")); + + if (state.SubA) { + subA->Cancel(); + cont->Join(subA); + } + } +} + +void TCoroTest::TestJoinRescheduleBug() { + using namespace NCoroTestJoinRescheduleBug; + TState state; + { + TContExecutor exec(20000); + exec.Execute(DoMain, &state); + } + UNIT_ASSERT_EQUAL(state.SubAState, EState::Finished); + UNIT_ASSERT_EQUAL(state.SubBState, EState::Finished); + UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished); +} + +void TCoroTest::TestEventQueue() { + NCoro::TEventWaitQueue queue; + UNIT_ASSERT(queue.Empty()); + UNIT_ASSERT_VALUES_EQUAL(queue.WakeTimedout(TInstant()), TInstant()); + TContExecutor exec(32000); + exec.Execute([](TCont* cont, void* arg) { + NCoro::TEventWaitQueue* q = (NCoro::TEventWaitQueue*)arg; + TTimerEvent ev(cont, TInstant::Max()); + TTimerEvent ev2(cont, TInstant::Seconds(12345)); + q->Register(&ev); + UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max()); + UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max()); + q->Register(&ev2); + UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345)); + UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345)); + UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Seconds(12345)); + UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Max()); + }, &queue); +} + void TCoroTest::TestNestedExecutor() { #ifndef _win_ //nested executors actually don't work correctly, but anyway shouldn't break RunningCont() ptr diff --git a/library/cpp/coroutine/engine/events.h b/library/cpp/coroutine/engine/events.h index e58d9d239b..07cc4d25e8 100644 --- a/library/cpp/coroutine/engine/events.h +++ b/library/cpp/coroutine/engine/events.h @@ -1,49 +1,49 @@ -#pragma once +#pragma once + +#include "impl.h" + +#include <util/datetime/base.h> -#include "impl.h" - -#include <util/datetime/base.h> - class TContEvent { public: - TContEvent(TCont* current) noexcept + TContEvent(TCont* current) noexcept : Cont_(current) , Status_(0) { } - ~TContEvent() { + ~TContEvent() { } - int WaitD(TInstant deadline) { + int WaitD(TInstant deadline) { Status_ = 0; const int ret = Cont_->SleepD(deadline); return Status_ ? Status_ : ret; } - int WaitT(TDuration timeout) { + int WaitT(TDuration timeout) { return WaitD(timeout.ToDeadLine()); } - int WaitI() { + int WaitI() { return WaitD(TInstant::Max()); } - void Wake() noexcept { + void Wake() noexcept { SetStatus(EWAKEDUP); Cont_->ReSchedule(); } - TCont* Cont() noexcept { + TCont* Cont() noexcept { return Cont_; } - int Status() const noexcept { + int Status() const noexcept { return Status_; } - void SetStatus(int status) noexcept { + void SetStatus(int status) noexcept { Status_ = status; } @@ -55,24 +55,24 @@ private: class TContWaitQueue { class TWaiter: public TContEvent, public TIntrusiveListItem<TWaiter> { public: - TWaiter(TCont* current) noexcept + TWaiter(TCont* current) noexcept : TContEvent(current) { } - ~TWaiter() { + ~TWaiter() { } }; public: - TContWaitQueue() noexcept { + TContWaitQueue() noexcept { } - ~TContWaitQueue() { + ~TContWaitQueue() { Y_ASSERT(Waiters_.Empty()); } - int WaitD(TCont* current, TInstant deadline) { + int WaitD(TCont* current, TInstant deadline) { TWaiter waiter(current); Waiters_.PushBack(&waiter); @@ -80,27 +80,27 @@ public: return waiter.WaitD(deadline); } - int WaitT(TCont* current, TDuration timeout) { + int WaitT(TCont* current, TDuration timeout) { return WaitD(current, timeout.ToDeadLine()); } - int WaitI(TCont* current) { + int WaitI(TCont* current) { return WaitD(current, TInstant::Max()); } - void Signal() noexcept { + void Signal() noexcept { if (!Waiters_.Empty()) { Waiters_.PopFront()->Wake(); } } - void BroadCast() noexcept { + void BroadCast() noexcept { while (!Waiters_.Empty()) { Waiters_.PopFront()->Wake(); } } - void BroadCast(size_t number) noexcept { + void BroadCast(size_t number) noexcept { for (size_t i = 0; i < number && !Waiters_.Empty(); ++i) { Waiters_.PopFront()->Wake(); } @@ -110,35 +110,35 @@ private: TIntrusiveList<TWaiter> Waiters_; }; - + class TContSimpleEvent { public: - TContSimpleEvent(TContExecutor* e) + TContSimpleEvent(TContExecutor* e) : E_(e) { } - TContExecutor* Executor() const noexcept { + TContExecutor* Executor() const noexcept { return E_; } - void Signal() noexcept { + void Signal() noexcept { Q_.Signal(); } - void BroadCast() noexcept { + void BroadCast() noexcept { Q_.BroadCast(); } - int WaitD(TInstant deadLine) noexcept { - return Q_.WaitD(E_->Running(), deadLine); + int WaitD(TInstant deadLine) noexcept { + return Q_.WaitD(E_->Running(), deadLine); } - int WaitT(TDuration timeout) noexcept { + int WaitT(TDuration timeout) noexcept { return WaitD(timeout.ToDeadLine()); } - int WaitI() noexcept { + int WaitI() noexcept { return WaitD(TInstant::Max()); } diff --git a/library/cpp/coroutine/engine/impl.cpp b/library/cpp/coroutine/engine/impl.cpp index 6bf8b03348..7ae6f74051 100644 --- a/library/cpp/coroutine/engine/impl.cpp +++ b/library/cpp/coroutine/engine/impl.cpp @@ -1,162 +1,162 @@ -#include "impl.h" +#include "impl.h" #include "stack/stack_allocator.h" #include "stack/stack_guards.h" #include <util/generic/scope.h> #include <util/thread/singleton.h> -#include <util/stream/format.h> +#include <util/stream/format.h> #include <util/stream/output.h> #include <util/system/yassert.h> -TCont::TJoinWait::TJoinWait(TCont& c) noexcept - : Cont_(c) -{} - -void TCont::TJoinWait::Wake() noexcept { - Cont_.ReSchedule(); -} - +TCont::TJoinWait::TJoinWait(TCont& c) noexcept + : Cont_(c) +{} + +void TCont::TJoinWait::Wake() noexcept { + Cont_.ReSchedule(); +} + TCont::TCont(NCoro::NStack::IAllocator& allocator, uint32_t stackSize, TContExecutor& executor, NCoro::TTrampoline::TFunc func, const char* name) noexcept - : Executor_(executor) + : Executor_(executor) , Name_(name) - , Trampoline_( + , Trampoline_( allocator, - stackSize, + stackSize, std::move(func), this - ) -{} - - + ) +{} + + void TCont::PrintMe(IOutputStream& out) const noexcept { out << "cont(" - << "name = " << Name_ << ", " - << "addr = " << Hex((size_t)this) + << "name = " << Name_ << ", " + << "addr = " << Hex((size_t)this) << ")"; } -bool TCont::Join(TCont* c, TInstant deadLine) noexcept { - TJoinWait ev(*this); - c->Waiters_.PushBack(&ev); - - do { - if (SleepD(deadLine) == ETIMEDOUT || Cancelled()) { - if (!ev.Empty()) { - c->Cancel(); - - do { +bool TCont::Join(TCont* c, TInstant deadLine) noexcept { + TJoinWait ev(*this); + c->Waiters_.PushBack(&ev); + + do { + if (SleepD(deadLine) == ETIMEDOUT || Cancelled()) { + if (!ev.Empty()) { + c->Cancel(); + + do { Switch(); - } while (!ev.Empty()); - } - - return false; - } - } while (!ev.Empty()); - - return true; -} - -int TCont::SleepD(TInstant deadline) noexcept { - TTimerEvent event(this, deadline); - - return ExecuteEvent(&event); -} - + } while (!ev.Empty()); + } + + return false; + } + } while (!ev.Empty()); + + return true; +} + +int TCont::SleepD(TInstant deadline) noexcept { + TTimerEvent event(this, deadline); + + return ExecuteEvent(&event); +} + void TCont::Switch() noexcept { Executor()->RunScheduler(); } -void TCont::Yield() noexcept { - if (SleepD(TInstant::Zero())) { - ReScheduleAndSwitch(); - } -} - -void TCont::ReScheduleAndSwitch() noexcept { - ReSchedule(); +void TCont::Yield() noexcept { + if (SleepD(TInstant::Zero())) { + ReScheduleAndSwitch(); + } +} + +void TCont::ReScheduleAndSwitch() noexcept { + ReSchedule(); Switch(); -} - -void TCont::Terminate() { - while (!Waiters_.Empty()) { - Waiters_.PopFront()->Wake(); - } - Executor()->Exit(this); -} - -bool TCont::IAmRunning() const noexcept { - return this == Executor()->Running(); -} - -void TCont::Cancel() noexcept { - if (Cancelled()) { - return; - } - - Cancelled_ = true; - - if (!IAmRunning()) { - ReSchedule(); - } -} - -void TCont::ReSchedule() noexcept { - if (Cancelled()) { - // Legacy code may expect a Cancelled coroutine to be scheduled without delay. - Executor()->ScheduleExecutionNow(this); - } else { - Executor()->ScheduleExecution(this); - } -} - - -TContExecutor::TContExecutor( +} + +void TCont::Terminate() { + while (!Waiters_.Empty()) { + Waiters_.PopFront()->Wake(); + } + Executor()->Exit(this); +} + +bool TCont::IAmRunning() const noexcept { + return this == Executor()->Running(); +} + +void TCont::Cancel() noexcept { + if (Cancelled()) { + return; + } + + Cancelled_ = true; + + if (!IAmRunning()) { + ReSchedule(); + } +} + +void TCont::ReSchedule() noexcept { + if (Cancelled()) { + // Legacy code may expect a Cancelled coroutine to be scheduled without delay. + Executor()->ScheduleExecutionNow(this); + } else { + Executor()->ScheduleExecution(this); + } +} + + +TContExecutor::TContExecutor( uint32_t defaultStackSize, - THolder<IPollerFace> poller, + THolder<IPollerFace> poller, NCoro::IScheduleCallback* scheduleCallback, NCoro::IEnterPollerCallback* enterPollerCallback, NCoro::NStack::EGuard defaultGuard, TMaybe<NCoro::NStack::TPoolAllocatorSettings> poolSettings, NCoro::ITime* time -) +) : ScheduleCallback_(scheduleCallback) , EnterPollerCallback_(enterPollerCallback) - , DefaultStackSize_(defaultStackSize) - , Poller_(std::move(poller)) + , DefaultStackSize_(defaultStackSize) + , Poller_(std::move(poller)) , Time_(time) { StackAllocator_ = NCoro::NStack::GetAllocator(poolSettings, defaultGuard); } - -TContExecutor::~TContExecutor() { - Y_VERIFY(Allocated_ == 0, "leaked %u coroutines", (ui32)Allocated_); -} - -void TContExecutor::Execute() noexcept { - auto nop = [](void*){}; - Execute(nop); -} - -void TContExecutor::Execute(TContFunc func, void* arg) noexcept { + +TContExecutor::~TContExecutor() { + Y_VERIFY(Allocated_ == 0, "leaked %u coroutines", (ui32)Allocated_); +} + +void TContExecutor::Execute() noexcept { + auto nop = [](void*){}; + Execute(nop); +} + +void TContExecutor::Execute(TContFunc func, void* arg) noexcept { CreateOwned([=](TCont* cont) { func(cont, arg); }, "sys_main"); - RunScheduler(); + RunScheduler(); } -void TContExecutor::WaitForIO() { - while (Ready_.Empty() && !WaitQueue_.Empty()) { +void TContExecutor::WaitForIO() { + while (Ready_.Empty() && !WaitQueue_.Empty()) { const auto now = Now(); - - // Waking a coroutine puts it into ReadyNext_ list - const auto next = WaitQueue_.WakeTimedout(now); - + + // Waking a coroutine puts it into ReadyNext_ list + const auto next = WaitQueue_.WakeTimedout(now); + if (!UserEvents_.Empty()) { TIntrusiveList<IUserEvent> userEvents; userEvents.Swap(UserEvents_); @@ -165,11 +165,11 @@ void TContExecutor::WaitForIO() { } while (!userEvents.Empty()); } - // Polling will return as soon as there is an event to process or a timeout. - // If there are woken coroutines we do not want to sleep in the poller - // yet still we want to check for new io - // to prevent ourselves from locking out of io by constantly waking coroutines. - + // Polling will return as soon as there is an event to process or a timeout. + // If there are woken coroutines we do not want to sleep in the poller + // yet still we want to check for new io + // to prevent ourselves from locking out of io by constantly waking coroutines. + if (ReadyNext_.Empty()) { if (EnterPollerCallback_) { EnterPollerCallback_->OnEnterPoller(); @@ -187,9 +187,9 @@ void TContExecutor::WaitForIO() { EnterPollerCallback_->OnExitPoller(); } } - - Ready_.Append(ReadyNext_); - } + + Ready_.Append(ReadyNext_); + } } void TContExecutor::Poll(TInstant deadline) { @@ -198,42 +198,42 @@ void TContExecutor::Poll(TInstant deadline) { // Waking a coroutine puts it into ReadyNext_ list for (auto event : PollerEvents_) { - auto* lst = (NCoro::TPollEventList*)event.Data; - const int status = event.Status; - - if (status) { - for (auto it = lst->Begin(); it != lst->End();) { - (it++)->OnPollEvent(status); - } - } else { - const ui16 filter = event.Filter; - - for (auto it = lst->Begin(); it != lst->End();) { - if (it->What() & filter) { - (it++)->OnPollEvent(0); - } else { - ++it; - } - } - } - } -} - -void TContExecutor::Abort() noexcept { - WaitQueue_.Abort(); - auto visitor = [](TCont* c) { - c->Cancel(); - }; - Ready_.ForEach(visitor); - ReadyNext_.ForEach(visitor); -} - -TCont* TContExecutor::Create( - TContFunc func, - void* arg, - const char* name, - TMaybe<ui32> customStackSize -) noexcept { + auto* lst = (NCoro::TPollEventList*)event.Data; + const int status = event.Status; + + if (status) { + for (auto it = lst->Begin(); it != lst->End();) { + (it++)->OnPollEvent(status); + } + } else { + const ui16 filter = event.Filter; + + for (auto it = lst->Begin(); it != lst->End();) { + if (it->What() & filter) { + (it++)->OnPollEvent(0); + } else { + ++it; + } + } + } + } +} + +void TContExecutor::Abort() noexcept { + WaitQueue_.Abort(); + auto visitor = [](TCont* c) { + c->Cancel(); + }; + Ready_.ForEach(visitor); + ReadyNext_.ForEach(visitor); +} + +TCont* TContExecutor::Create( + TContFunc func, + void* arg, + const char* name, + TMaybe<ui32> customStackSize +) noexcept { return CreateOwned([=](TCont* cont) { func(cont, arg); }, name, customStackSize); @@ -244,38 +244,38 @@ TCont* TContExecutor::CreateOwned( const char* name, TMaybe<ui32> customStackSize ) noexcept { - Allocated_ += 1; - if (!customStackSize) { - customStackSize = DefaultStackSize_; - } + Allocated_ += 1; + if (!customStackSize) { + customStackSize = DefaultStackSize_; + } auto* cont = new TCont(*StackAllocator_, *customStackSize, *this, std::move(func), name); - ScheduleExecution(cont); - return cont; -} - + ScheduleExecution(cont); + return cont; +} + NCoro::NStack::TAllocatorStats TContExecutor::GetAllocatorStats() const noexcept { return StackAllocator_->GetStackStats(); } -void TContExecutor::Release(TCont* cont) noexcept { - delete cont; - Allocated_ -= 1; -} - -void TContExecutor::ScheduleToDelete(TCont* cont) noexcept { - ToDelete_.PushBack(cont); -} - -void TContExecutor::ScheduleExecution(TCont* cont) noexcept { - cont->Scheduled_ = true; - ReadyNext_.PushBack(cont); -} - -void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept { - cont->Scheduled_ = true; - Ready_.PushBack(cont); -} - +void TContExecutor::Release(TCont* cont) noexcept { + delete cont; + Allocated_ -= 1; +} + +void TContExecutor::ScheduleToDelete(TCont* cont) noexcept { + ToDelete_.PushBack(cont); +} + +void TContExecutor::ScheduleExecution(TCont* cont) noexcept { + cont->Scheduled_ = true; + ReadyNext_.PushBack(cont); +} + +void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept { + cont->Scheduled_ = true; + Ready_.PushBack(cont); +} + namespace { inline TContExecutor*& ThisThreadExecutor() { struct TThisThreadExecutorHolder { @@ -285,12 +285,12 @@ namespace { } } -void TContExecutor::DeleteScheduled() noexcept { - ToDelete_.ForEach([this](TCont* c) { - Release(c); - }); -} - +void TContExecutor::DeleteScheduled() noexcept { + ToDelete_.ForEach([this](TCont* c) { + Release(c); + }); +} + TCont* RunningCont() { TContExecutor* thisThreadExecutor = ThisThreadExecutor(); return thisThreadExecutor ? thisThreadExecutor->Running() : nullptr; @@ -314,7 +314,7 @@ void TContExecutor::RunScheduler() noexcept { WaitForIO(); DeleteScheduled(); Ready_.Append(ReadyNext_); - + if (Ready_.Empty()) { Current_ = nullptr; if (caller) { @@ -322,18 +322,18 @@ void TContExecutor::RunScheduler() noexcept { } break; } - - TCont* cont = Ready_.PopFront(); + + TCont* cont = Ready_.PopFront(); if (ScheduleCallback_) { ScheduleCallback_->OnSchedule(*this, *cont); - } + } Current_ = cont; cont->Scheduled_ = false; if (cont == caller) { break; - } + } context->SwitchTo(cont->Trampoline_.Context()); if (Paused_) { Paused_ = false; @@ -346,9 +346,9 @@ void TContExecutor::RunScheduler() noexcept { } } catch (...) { TBackTrace::FromCurrentException().PrintTo(Cerr); - Y_FAIL("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str()); + Y_FAIL("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str()); } -} +} void TContExecutor::Pause() { if (auto cont = Running()) { @@ -358,17 +358,17 @@ void TContExecutor::Pause() { } } -void TContExecutor::Exit(TCont* cont) noexcept { - ScheduleToDelete(cont); - cont->SwitchTo(&SchedContext_); - Y_FAIL("can not return from exit"); -} - +void TContExecutor::Exit(TCont* cont) noexcept { + ScheduleToDelete(cont); + cont->SwitchTo(&SchedContext_); + Y_FAIL("can not return from exit"); +} + TInstant TContExecutor::Now() { return Y_LIKELY(Time_ == nullptr) ? TInstant::Now() : Time_->Now(); } -template <> -void Out<TCont>(IOutputStream& out, const TCont& c) { - c.PrintMe(out); +template <> +void Out<TCont>(IOutputStream& out, const TCont& c) { + c.PrintMe(out); } diff --git a/library/cpp/coroutine/engine/impl.h b/library/cpp/coroutine/engine/impl.h index 4713b61efe..283a96ecf1 100644 --- a/library/cpp/coroutine/engine/impl.h +++ b/library/cpp/coroutine/engine/impl.h @@ -1,25 +1,25 @@ -#pragma once +#pragma once #include "callbacks.h" -#include "cont_poller.h" -#include "iostatus.h" +#include "cont_poller.h" +#include "iostatus.h" #include "poller.h" #include "stack/stack_common.h" -#include "trampoline.h" +#include "trampoline.h" #include "custom_time.h" #include <library/cpp/containers/intrusive_rb_tree/rb_tree.h> - + #include <util/system/error.h> #include <util/generic/ptr.h> #include <util/generic/intrlist.h> #include <util/datetime/base.h> -#include <util/generic/maybe.h> +#include <util/generic/maybe.h> #include <util/generic/function.h> - -#define EWAKEDUP 34567 - + +#define EWAKEDUP 34567 + class TCont; struct TContRep; class TContExecutor; @@ -28,97 +28,97 @@ class TContPollEvent; namespace NCoro::NStack { class IAllocator; } - -class TCont : private TIntrusiveListItem<TCont> { - struct TJoinWait: public TIntrusiveListItem<TJoinWait> { - TJoinWait(TCont& c) noexcept; - void Wake() noexcept; +class TCont : private TIntrusiveListItem<TCont> { + struct TJoinWait: public TIntrusiveListItem<TJoinWait> { + TJoinWait(TCont& c) noexcept; + + void Wake() noexcept; - public: - TCont& Cont_; - }; + public: + TCont& Cont_; + }; - friend class TContExecutor; - friend class TIntrusiveListItem<TCont>; - friend class NCoro::TEventWaitQueue; - friend class NCoro::TTrampoline; + friend class TContExecutor; + friend class TIntrusiveListItem<TCont>; + friend class NCoro::TEventWaitQueue; + friend class NCoro::TTrampoline; -private: - TCont( +private: + TCont( NCoro::NStack::IAllocator& allocator, uint32_t stackSize, - TContExecutor& executor, + TContExecutor& executor, NCoro::TTrampoline::TFunc func, - const char* name - ) noexcept; + const char* name + ) noexcept; -public: - TContExecutor* Executor() noexcept { - return &Executor_; +public: + TContExecutor* Executor() noexcept { + return &Executor_; } - const TContExecutor* Executor() const noexcept { - return &Executor_; + const TContExecutor* Executor() const noexcept { + return &Executor_; } - const char* Name() const noexcept { + const char* Name() const noexcept { return Name_; } void PrintMe(IOutputStream& out) const noexcept; - void Yield() noexcept; + void Yield() noexcept; - void ReScheduleAndSwitch() noexcept; + void ReScheduleAndSwitch() noexcept; /// @return ETIMEDOUT on success - int SleepD(TInstant deadline) noexcept; + int SleepD(TInstant deadline) noexcept; - int SleepT(TDuration timeout) noexcept { + int SleepT(TDuration timeout) noexcept { return SleepD(timeout.ToDeadLine()); } - int SleepI() noexcept { + int SleepI() noexcept { return SleepD(TInstant::Max()); } - bool IAmRunning() const noexcept; + bool IAmRunning() const noexcept; - void Cancel() noexcept; + void Cancel() noexcept; - bool Cancelled() const noexcept { + bool Cancelled() const noexcept { return Cancelled_; } - bool Scheduled() const noexcept { + bool Scheduled() const noexcept { return Scheduled_; } - bool Join(TCont* c, TInstant deadLine = TInstant::Max()) noexcept; + bool Join(TCont* c, TInstant deadLine = TInstant::Max()) noexcept; - void ReSchedule() noexcept; + void ReSchedule() noexcept; void Switch() noexcept; - void SwitchTo(TExceptionSafeContext* ctx) { - Trampoline_.SwitchTo(ctx); - } + void SwitchTo(TExceptionSafeContext* ctx) { + Trampoline_.SwitchTo(ctx); + } private: - void Terminate(); - + void Terminate(); + private: - TContExecutor& Executor_; - - // TODO(velavokr): allow name storage owning (for generated names backed by TString) - const char* Name_ = nullptr; + TContExecutor& Executor_; + + // TODO(velavokr): allow name storage owning (for generated names backed by TString) + const char* Name_ = nullptr; NCoro::TTrampoline Trampoline_; TIntrusiveList<TJoinWait> Waiters_; - bool Cancelled_ = false; - bool Scheduled_ = false; + bool Cancelled_ = false; + bool Scheduled_ = false; }; TCont* RunningCont(); @@ -147,60 +147,60 @@ public: /// Note, coroutines are single-threaded, and all methods must be called from the single thread class TContExecutor { friend class TCont; - using TContList = TIntrusiveList<TCont>; + using TContList = TIntrusiveList<TCont>; public: - TContExecutor( + TContExecutor( uint32_t defaultStackSize, - THolder<IPollerFace> poller = IPollerFace::Default(), - NCoro::IScheduleCallback* = nullptr, + THolder<IPollerFace> poller = IPollerFace::Default(), + NCoro::IScheduleCallback* = nullptr, NCoro::IEnterPollerCallback* = nullptr, NCoro::NStack::EGuard stackGuard = NCoro::NStack::EGuard::Canary, TMaybe<NCoro::NStack::TPoolAllocatorSettings> poolSettings = Nothing(), NCoro::ITime* time = nullptr - ); + ); ~TContExecutor(); - // if we already have a coroutine to run - void Execute() noexcept; + // if we already have a coroutine to run + void Execute() noexcept; - void Execute(TContFunc func, void* arg = nullptr) noexcept; + void Execute(TContFunc func, void* arg = nullptr) noexcept; template <class Functor> - void Execute(Functor& f) noexcept { + void Execute(Functor& f) noexcept { Execute((TContFunc)ContHelperFunc<Functor>, (void*)&f); } template <typename T, void (T::*M)(TCont*)> - void Execute(T* obj) noexcept { + void Execute(T* obj) noexcept { Execute(ContHelperMemberFunc<T, M>, obj); } template <class Functor> - TCont* Create( - Functor& f, - const char* name, - TMaybe<ui32> customStackSize = Nothing() - ) noexcept { - return Create((TContFunc)ContHelperFunc<Functor>, (void*)&f, name, customStackSize); + TCont* Create( + Functor& f, + const char* name, + TMaybe<ui32> customStackSize = Nothing() + ) noexcept { + return Create((TContFunc)ContHelperFunc<Functor>, (void*)&f, name, customStackSize); } template <typename T, void (T::*M)(TCont*)> - TCont* Create( - T* obj, - const char* name, - TMaybe<ui32> customStackSize = Nothing() - ) noexcept { - return Create(ContHelperMemberFunc<T, M>, obj, name, customStackSize); + TCont* Create( + T* obj, + const char* name, + TMaybe<ui32> customStackSize = Nothing() + ) noexcept { + return Create(ContHelperMemberFunc<T, M>, obj, name, customStackSize); } - TCont* Create( - TContFunc func, - void* arg, - const char* name, - TMaybe<ui32> customStackSize = Nothing() - ) noexcept; + TCont* Create( + TContFunc func, + void* arg, + const char* name, + TMaybe<ui32> customStackSize = Nothing() + ) noexcept; TCont* CreateOwned( NCoro::TTrampoline::TFunc func, @@ -208,44 +208,44 @@ public: TMaybe<ui32> customStackSize = Nothing() ) noexcept; - NCoro::TContPoller* Poller() noexcept { + NCoro::TContPoller* Poller() noexcept { return &Poller_; } - TCont* Running() noexcept { + TCont* Running() noexcept { return Current_; } - const TCont* Running() const noexcept { + const TCont* Running() const noexcept { return Current_; } - size_t TotalReadyConts() const noexcept { - return Ready_.Size() + TotalScheduledConts(); - } - - size_t TotalScheduledConts() const noexcept { - return ReadyNext_.Size(); - } - - size_t TotalConts() const noexcept { - return Allocated_; - } - - size_t TotalWaitingConts() const noexcept { - return TotalConts() - TotalReadyConts(); - } - + size_t TotalReadyConts() const noexcept { + return Ready_.Size() + TotalScheduledConts(); + } + + size_t TotalScheduledConts() const noexcept { + return ReadyNext_.Size(); + } + + size_t TotalConts() const noexcept { + return Allocated_; + } + + size_t TotalWaitingConts() const noexcept { + return TotalConts() - TotalReadyConts(); + } + NCoro::NStack::TAllocatorStats GetAllocatorStats() const noexcept; - // TODO(velavokr): rename, it is just CancelAll actually - void Abort() noexcept; + // TODO(velavokr): rename, it is just CancelAll actually + void Abort() noexcept; - void SetFailOnError(bool fail) noexcept { + void SetFailOnError(bool fail) noexcept { FailOnError_ = fail; } - bool FailOnError() const noexcept { + bool FailOnError() const noexcept { return FailOnError_; } @@ -253,12 +253,12 @@ public: WaitQueue_.Register(event); } - void ScheduleIoWait(TFdEvent* event) { + void ScheduleIoWait(TFdEvent* event) { RegisterInWaitQueue(event); Poller_.Schedule(event); } - void ScheduleIoWait(TTimerEvent* event) noexcept { + void ScheduleIoWait(TTimerEvent* event) noexcept { RegisterInWaitQueue(event); } @@ -269,45 +269,45 @@ public: void Pause(); TInstant Now(); private: - void Release(TCont* cont) noexcept; + void Release(TCont* cont) noexcept; - void Exit(TCont* cont) noexcept; + void Exit(TCont* cont) noexcept; void RunScheduler() noexcept; - void ScheduleToDelete(TCont* cont) noexcept; + void ScheduleToDelete(TCont* cont) noexcept; - void ScheduleExecution(TCont* cont) noexcept; + void ScheduleExecution(TCont* cont) noexcept; - void ScheduleExecutionNow(TCont* cont) noexcept; - - void DeleteScheduled() noexcept; + void ScheduleExecutionNow(TCont* cont) noexcept; + + void DeleteScheduled() noexcept; void WaitForIO(); void Poll(TInstant deadline); - + private: NCoro::IScheduleCallback* const ScheduleCallback_ = nullptr; NCoro::IEnterPollerCallback* const EnterPollerCallback_ = nullptr; const uint32_t DefaultStackSize_; THolder<NCoro::NStack::IAllocator> StackAllocator_; - - TExceptionSafeContext SchedContext_; - + + TExceptionSafeContext SchedContext_; + TContList ToDelete_; TContList Ready_; - TContList ReadyNext_; - NCoro::TEventWaitQueue WaitQueue_; - NCoro::TContPoller Poller_; + TContList ReadyNext_; + NCoro::TEventWaitQueue WaitQueue_; + NCoro::TContPoller Poller_; NCoro::TContPoller::TEvents PollerEvents_; TInstant LastPoll_; - + TIntrusiveList<IUserEvent> UserEvents_; - size_t Allocated_ = 0; - TCont* Current_ = nullptr; - bool FailOnError_ = false; + size_t Allocated_ = 0; + TCont* Current_ = nullptr; + bool FailOnError_ = false; bool Paused_ = false; NCoro::ITime* Time_ = nullptr; }; diff --git a/library/cpp/coroutine/engine/iostatus.h b/library/cpp/coroutine/engine/iostatus.h index 201cc77825..bf6036805d 100644 --- a/library/cpp/coroutine/engine/iostatus.h +++ b/library/cpp/coroutine/engine/iostatus.h @@ -1,41 +1,41 @@ -#pragma once +#pragma once #include <util/generic/yexception.h> class TIOStatus { public: - TIOStatus(int status) noexcept + TIOStatus(int status) noexcept : Status_(status) { } - static TIOStatus Error(int status) noexcept { + static TIOStatus Error(int status) noexcept { return TIOStatus(status); } - static TIOStatus Error() noexcept { + static TIOStatus Error() noexcept { return TIOStatus(LastSystemError()); } - static TIOStatus Success() noexcept { + static TIOStatus Success() noexcept { return TIOStatus(0); } - void Check() const { + void Check() const { if (Status_) { ythrow TSystemError(Status_) << "io error"; } } - bool Failed() const noexcept { + bool Failed() const noexcept { return (bool)Status_; } - bool Succeed() const noexcept { + bool Succeed() const noexcept { return !Failed(); } - int Status() const noexcept { + int Status() const noexcept { return Status_; } @@ -43,43 +43,43 @@ private: int Status_; }; - + class TContIOStatus { public: - TContIOStatus(size_t processed, TIOStatus status) noexcept + TContIOStatus(size_t processed, TIOStatus status) noexcept : Processed_(processed) , Status_(status) { } - static TContIOStatus Error(TIOStatus status) noexcept { + static TContIOStatus Error(TIOStatus status) noexcept { return TContIOStatus(0, status); } - static TContIOStatus Error() noexcept { + static TContIOStatus Error() noexcept { return TContIOStatus(0, TIOStatus::Error()); } - static TContIOStatus Success(size_t processed) noexcept { + static TContIOStatus Success(size_t processed) noexcept { return TContIOStatus(processed, TIOStatus::Success()); } - static TContIOStatus Eof() noexcept { + static TContIOStatus Eof() noexcept { return Success(0); } - ~TContIOStatus() { + ~TContIOStatus() { } - size_t Processed() const noexcept { + size_t Processed() const noexcept { return Processed_; } - int Status() const noexcept { + int Status() const noexcept { return Status_.Status(); } - size_t Checked() const { + size_t Checked() const { Status_.Check(); return Processed_; diff --git a/library/cpp/coroutine/engine/mutex.h b/library/cpp/coroutine/engine/mutex.h index 5a73e13c2c..93e9119503 100644 --- a/library/cpp/coroutine/engine/mutex.h +++ b/library/cpp/coroutine/engine/mutex.h @@ -1,20 +1,20 @@ -#pragma once +#pragma once + +#include "impl.h" +#include "events.h" -#include "impl.h" -#include "events.h" - class TContMutex { public: - TContMutex() noexcept + TContMutex() noexcept : Token_(true) { } - ~TContMutex() { + ~TContMutex() { Y_ASSERT(Token_); } - int LockD(TCont* current, TInstant deadline) { + int LockD(TCont* current, TInstant deadline) { while (!Token_) { const int ret = WaitQueue_.WaitD(current, deadline); @@ -28,15 +28,15 @@ public: return 0; } - int LockT(TCont* current, TDuration timeout) { + int LockT(TCont* current, TDuration timeout) { return LockD(current, timeout.ToDeadLine()); } - int LockI(TCont* current) { + int LockI(TCont* current) { return LockD(current, TInstant::Max()); } - void UnLock() noexcept { + void UnLock() noexcept { Y_ASSERT(!Token_); Token_ = true; diff --git a/library/cpp/coroutine/engine/network.cpp b/library/cpp/coroutine/engine/network.cpp index 46100a8023..85b647d210 100644 --- a/library/cpp/coroutine/engine/network.cpp +++ b/library/cpp/coroutine/engine/network.cpp @@ -1,325 +1,325 @@ -#include "impl.h" -#include "network.h" +#include "impl.h" +#include "network.h" -#include <util/generic/scope.h> -#include <util/generic/xrange.h> +#include <util/generic/scope.h> +#include <util/generic/xrange.h> #include <sys/uio.h> -#if defined(_bionic_) -# define IOV_MAX 1024 -#endif +#if defined(_bionic_) +# define IOV_MAX 1024 +#endif -namespace NCoro { - namespace { - bool IsBlocked(int lasterr) noexcept { - return lasterr == EAGAIN || lasterr == EWOULDBLOCK; - } +namespace NCoro { + namespace { + bool IsBlocked(int lasterr) noexcept { + return lasterr == EAGAIN || lasterr == EWOULDBLOCK; + } - ssize_t DoReadVector(SOCKET fd, TContIOVector* vec) noexcept { + ssize_t DoReadVector(SOCKET fd, TContIOVector* vec) noexcept { return readv(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count())); - } + } - ssize_t DoWriteVector(SOCKET fd, TContIOVector* vec) noexcept { + ssize_t DoWriteVector(SOCKET fd, TContIOVector* vec) noexcept { return writev(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count())); - } - } - - - int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept { - if (cont->Cancelled()) { - return ECANCELED; - } - - if (nfds == 0) { - return 0; - } - - TTempArray<TFdEvent> events(nfds); - - for (auto i : xrange(nfds)) { - new(events.Data() + i) TFdEvent(cont, fds[i], (ui16) what[i], deadline); - } - - Y_DEFER { - for (auto i : xrange(nfds)) { - (events.Data() + i)->~TFdEvent(); - } - }; - - for (auto i : xrange(nfds)) { - cont->Executor()->ScheduleIoWait(events.Data() + i); - } + } + } + + + int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept { + if (cont->Cancelled()) { + return ECANCELED; + } + + if (nfds == 0) { + return 0; + } + + TTempArray<TFdEvent> events(nfds); + + for (auto i : xrange(nfds)) { + new(events.Data() + i) TFdEvent(cont, fds[i], (ui16) what[i], deadline); + } + + Y_DEFER { + for (auto i : xrange(nfds)) { + (events.Data() + i)->~TFdEvent(); + } + }; + + for (auto i : xrange(nfds)) { + cont->Executor()->ScheduleIoWait(events.Data() + i); + } cont->Switch(); - - if (cont->Cancelled()) { - return ECANCELED; - } - - TFdEvent* ret = nullptr; - int status = EINPROGRESS; - - for (auto i : xrange(nfds)) { - auto& ev = *(events.Data() + i); - switch (ev.Status()) { - case EINPROGRESS: - break; - case ETIMEDOUT: - if (status != EINPROGRESS) { - break; + + if (cont->Cancelled()) { + return ECANCELED; + } + + TFdEvent* ret = nullptr; + int status = EINPROGRESS; + + for (auto i : xrange(nfds)) { + auto& ev = *(events.Data() + i); + switch (ev.Status()) { + case EINPROGRESS: + break; + case ETIMEDOUT: + if (status != EINPROGRESS) { + break; } [[fallthrough]]; - default: - status = ev.Status(); - ret = &ev; - } - } - - if (ret) { - if (outfd) { - *outfd = ret->Fd(); + default: + status = ev.Status(); + ret = &ev; } - return ret->Status(); } - return EINPROGRESS; + if (ret) { + if (outfd) { + *outfd = ret->Fd(); + } + return ret->Status(); + } + + return EINPROGRESS; + } + + int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept { + return SelectD(cont, fds, what, nfds, outfd, timeout.ToDeadLine()); } - int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept { - return SelectD(cont, fds, what, nfds, outfd, timeout.ToDeadLine()); + int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd) { + return SelectD(cont, fds, what, nfds, outfd, TInstant::Max()); } - int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd) { - return SelectD(cont, fds, what, nfds, outfd, TInstant::Max()); - } - - int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept { - TFdEvent event(cont, fd, (ui16)what, deadline); - return ExecuteEvent(&event); + int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept { + TFdEvent event(cont, fd, (ui16)what, deadline); + return ExecuteEvent(&event); } - int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept { - return PollD(cont, fd, what, timeout.ToDeadLine()); - } - - int PollI(TCont* cont, SOCKET fd, int what) noexcept { - return PollD(cont, fd, what, TInstant::Max()); + int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept { + return PollD(cont, fd, what, timeout.ToDeadLine()); + } + + int PollI(TCont* cont, SOCKET fd, int what) noexcept { + return PollD(cont, fd, what, TInstant::Max()); } - TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept { - while (true) { - ssize_t res = DoReadVector(fd, vec); + TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept { + while (true) { + ssize_t res = DoReadVector(fd, vec); - if (res >= 0) { - return TContIOStatus::Success((size_t) res); + if (res >= 0) { + return TContIOStatus::Success((size_t) res); } { const int err = LastSystemError(); if (!IsBlocked(err)) { - return TContIOStatus::Error(err); + return TContIOStatus::Error(err); } } - if ((res = PollD(cont, fd, CONT_POLL_READ, deadline)) != 0) { - return TContIOStatus::Error((int) res); + if ((res = PollD(cont, fd, CONT_POLL_READ, deadline)) != 0) { + return TContIOStatus::Error((int) res); } } } - TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept { - return ReadVectorD(cont, fd, vec, timeOut.ToDeadLine()); - } + TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept { + return ReadVectorD(cont, fd, vec, timeOut.ToDeadLine()); + } - TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept { - return ReadVectorD(cont, fd, vec, TInstant::Max()); - } + TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept { + return ReadVectorD(cont, fd, vec, TInstant::Max()); + } - TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept { - IOutputStream::TPart part(buf, len); - TContIOVector vec(&part, 1); - return ReadVectorD(cont, fd, &vec, deadline); - } + TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept { + IOutputStream::TPart part(buf, len); + TContIOVector vec(&part, 1); + return ReadVectorD(cont, fd, &vec, deadline); + } - TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept { - return ReadD(cont, fd, buf, len, timeout.ToDeadLine()); - } + TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept { + return ReadD(cont, fd, buf, len, timeout.ToDeadLine()); + } - TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept { - return ReadD(cont, fd, buf, len, TInstant::Max()); + TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept { + return ReadD(cont, fd, buf, len, TInstant::Max()); } - TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept { - size_t written = 0; + TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept { + size_t written = 0; - while (!vec->Complete()) { - ssize_t res = DoWriteVector(fd, vec); + while (!vec->Complete()) { + ssize_t res = DoWriteVector(fd, vec); - if (res >= 0) { - written += res; + if (res >= 0) { + written += res; - vec->Proceed((size_t) res); - } else { - { - const int err = LastSystemError(); + vec->Proceed((size_t) res); + } else { + { + const int err = LastSystemError(); - if (!IsBlocked(err)) { - return TContIOStatus(written, err); - } - } + if (!IsBlocked(err)) { + return TContIOStatus(written, err); + } + } - if ((res = PollD(cont, fd, CONT_POLL_WRITE, deadline)) != 0) { - return TContIOStatus(written, (int) res); - } - } + if ((res = PollD(cont, fd, CONT_POLL_WRITE, deadline)) != 0) { + return TContIOStatus(written, (int) res); + } + } } - return TContIOStatus::Success(written); + return TContIOStatus::Success(written); } - TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept { - return WriteVectorD(cont, fd, vec, timeOut.ToDeadLine()); - } + TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept { + return WriteVectorD(cont, fd, vec, timeOut.ToDeadLine()); + } - TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept { - return WriteVectorD(cont, fd, vec, TInstant::Max()); - } + TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept { + return WriteVectorD(cont, fd, vec, TInstant::Max()); + } - TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept { - IOutputStream::TPart part(buf, len); - TContIOVector vec(&part, 1); - return WriteVectorD(cont, fd, &vec, deadline); - } + TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept { + IOutputStream::TPart part(buf, len); + TContIOVector vec(&part, 1); + return WriteVectorD(cont, fd, &vec, deadline); + } - TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept { - return WriteD(cont, fd, buf, len, timeout.ToDeadLine()); - } + TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept { + return WriteD(cont, fd, buf, len, timeout.ToDeadLine()); + } - TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept { - return WriteD(cont, fd, buf, len, TInstant::Max()); + TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept { + return WriteD(cont, fd, buf, len, TInstant::Max()); } - int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept { - TSocketHolder res(Socket(ai)); + int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept { + TSocketHolder res(Socket(ai)); - if (res.Closed()) { - return LastSystemError(); - } + if (res.Closed()) { + return LastSystemError(); + } - const int ret = ConnectD(cont, res, ai.ai_addr, (socklen_t) ai.ai_addrlen, deadline); + const int ret = ConnectD(cont, res, ai.ai_addr, (socklen_t) ai.ai_addrlen, deadline); - if (!ret) { - s.Swap(res); - } - - return ret; + if (!ret) { + s.Swap(res); + } + + return ret; } - int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept { - int ret = EHOSTUNREACH; + int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept { + int ret = EHOSTUNREACH; - for (auto it = addr.Begin(); it != addr.End(); ++it) { - ret = ConnectD(cont, s, *it, deadline); + for (auto it = addr.Begin(); it != addr.End(); ++it) { + ret = ConnectD(cont, s, *it, deadline); - if (ret == 0 || ret == ETIMEDOUT) { - return ret; - } - } + if (ret == 0 || ret == ETIMEDOUT) { + return ret; + } + } - return ret; + return ret; } - int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept { - return ConnectD(cont, s, addr, timeout.ToDeadLine()); - } + int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept { + return ConnectD(cont, s, addr, timeout.ToDeadLine()); + } - int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept { - return ConnectD(cont, s, addr, TInstant::Max()); + int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept { + return ConnectD(cont, s, addr, TInstant::Max()); } - int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept { - if (connect(s, name, namelen)) { - const int err = LastSystemError(); - - if (!IsBlocked(err) && err != EINPROGRESS) { - return err; - } - - int ret = PollD(cont, s, CONT_POLL_WRITE, deadline); - - if (ret) { - return ret; - } - - // check if we really connected - // FIXME: Unportable ?? - int serr = 0; - socklen_t slen = sizeof(serr); - - ret = getsockopt(s, SOL_SOCKET, SO_ERROR, (char*) &serr, &slen); - - if (ret) { - return LastSystemError(); - } - - if (serr) { - return serr; - } - } - - return 0; - } - - int ConnectT(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TDuration timeout) noexcept { - return ConnectD(cont, s, name, namelen, timeout.ToDeadLine()); - } - - int ConnectI(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen) noexcept { - return ConnectD(cont, s, name, namelen, TInstant::Max()); - } - - - int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept { - SOCKET ret; - - while ((ret = Accept4(s, addr, addrlen)) == INVALID_SOCKET) { - int err = LastSystemError(); - - if (!IsBlocked(err)) { - return -err; + int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept { + if (connect(s, name, namelen)) { + const int err = LastSystemError(); + + if (!IsBlocked(err) && err != EINPROGRESS) { + return err; } - - err = PollD(cont, s, CONT_POLL_READ, deadline); - if (err) { - return -err; - } - } + int ret = PollD(cont, s, CONT_POLL_WRITE, deadline); + + if (ret) { + return ret; + } + + // check if we really connected + // FIXME: Unportable ?? + int serr = 0; + socklen_t slen = sizeof(serr); + + ret = getsockopt(s, SOL_SOCKET, SO_ERROR, (char*) &serr, &slen); + + if (ret) { + return LastSystemError(); + } + + if (serr) { + return serr; + } + } + + return 0; + } + + int ConnectT(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TDuration timeout) noexcept { + return ConnectD(cont, s, name, namelen, timeout.ToDeadLine()); + } - return (int) ret; + int ConnectI(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen) noexcept { + return ConnectD(cont, s, name, namelen, TInstant::Max()); } - int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept { - return AcceptD(cont, s, addr, addrlen, timeout.ToDeadLine()); - } - - int AcceptI(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen) noexcept { - return AcceptD(cont, s, addr, addrlen, TInstant::Max()); - } - - SOCKET Socket(int domain, int type, int protocol) noexcept { - return Socket4(domain, type, protocol); - } - - SOCKET Socket(const struct addrinfo& ai) noexcept { - return Socket(ai.ai_family, ai.ai_socktype, ai.ai_protocol); - } -} + + int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept { + SOCKET ret; + + while ((ret = Accept4(s, addr, addrlen)) == INVALID_SOCKET) { + int err = LastSystemError(); + + if (!IsBlocked(err)) { + return -err; + } + + err = PollD(cont, s, CONT_POLL_READ, deadline); + + if (err) { + return -err; + } + } + + return (int) ret; + } + + int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept { + return AcceptD(cont, s, addr, addrlen, timeout.ToDeadLine()); + } + + int AcceptI(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen) noexcept { + return AcceptD(cont, s, addr, addrlen, TInstant::Max()); + } + + SOCKET Socket(int domain, int type, int protocol) noexcept { + return Socket4(domain, type, protocol); + } + + SOCKET Socket(const struct addrinfo& ai) noexcept { + return Socket(ai.ai_family, ai.ai_socktype, ai.ai_protocol); + } +} diff --git a/library/cpp/coroutine/engine/network.h b/library/cpp/coroutine/engine/network.h index 79462be586..f2c9afe4f8 100644 --- a/library/cpp/coroutine/engine/network.h +++ b/library/cpp/coroutine/engine/network.h @@ -1,55 +1,55 @@ -#pragma once +#pragma once -#include "iostatus.h" +#include "iostatus.h" -#include <util/datetime/base.h> -#include <util/network/init.h> +#include <util/datetime/base.h> +#include <util/network/init.h> #include <util/network/iovec.h> -#include <util/network/nonblock.h> +#include <util/network/nonblock.h> #include <util/network/socket.h> class TCont; -namespace NCoro { - - int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept; - int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept; - int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd); +namespace NCoro { - int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept; - int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept; - int PollI(TCont* cont, SOCKET fd, int what) noexcept; + int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept; + int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept; + int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd); - TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept; - TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept; - TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept; + int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept; + int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept; + int PollI(TCont* cont, SOCKET fd, int what) noexcept; - TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept; - TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept; - TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept; + TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept; + TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept; + TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept; - TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept; - TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept; - TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept; + TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept; + TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept; + TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept; - TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept; - TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept; - TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept; + TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept; + TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept; + TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept; - int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept; + TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept; + TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept; + TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept; - int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept; - int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept; - int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept; + int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept; - int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept; - int ConnectT(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TDuration timeout) noexcept; - int ConnectI(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen) noexcept; + int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept; + int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept; + int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept; - int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept; - int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept; - int AcceptI(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen) noexcept; + int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept; + int ConnectT(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TDuration timeout) noexcept; + int ConnectI(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen) noexcept; - SOCKET Socket(int domain, int type, int protocol) noexcept; - SOCKET Socket(const struct addrinfo& ai) noexcept; + int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept; + int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept; + int AcceptI(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen) noexcept; + + SOCKET Socket(int domain, int type, int protocol) noexcept; + SOCKET Socket(const struct addrinfo& ai) noexcept; } diff --git a/library/cpp/coroutine/engine/poller.cpp b/library/cpp/coroutine/engine/poller.cpp index 4efabd0c7e..61164fa56b 100644 --- a/library/cpp/coroutine/engine/poller.cpp +++ b/library/cpp/coroutine/engine/poller.cpp @@ -5,30 +5,30 @@ #include <util/generic/intrlist.h> #include <util/generic/singleton.h> #include <util/system/env.h> -#include <util/string/cast.h> +#include <util/string/cast.h> namespace { - using TChange = IPollerFace::TChange; - using TEvent = IPollerFace::TEvent; - using TEvents = IPollerFace::TEvents; + using TChange = IPollerFace::TChange; + using TEvent = IPollerFace::TEvent; + using TEvents = IPollerFace::TEvents; template <class T> class TUnsafeBuf { public: - TUnsafeBuf() noexcept + TUnsafeBuf() noexcept : L_(0) { } - T* operator~() const noexcept { + T* operator~() const noexcept { return B_.Get(); } - size_t operator+() const noexcept { + size_t operator+() const noexcept { return L_; } - void Reserve(size_t len) { + void Reserve(size_t len) { len = FastClp2(len); if (len > L_) { @@ -42,7 +42,7 @@ namespace { size_t L_; }; - + template <class T> class TVirtualize: public IPollerFace { public: @@ -67,25 +67,25 @@ namespace { const EContPoller PollerEngine_; }; - + template <class T> class TPoller { - using TInternalEvent = typename T::TEvent; + using TInternalEvent = typename T::TEvent; public: - TPoller() { + TPoller() { E_.Reserve(1); } - void Set(const TChange& c) { + void Set(const TChange& c) { P_.Set(c.Data, c.Fd, c.Flags); } - void Reserve(size_t size) { + void Reserve(size_t size) { E_.Reserve(size); } - void Wait(TEvents& events, TInstant deadLine) { + void Wait(TEvents& events, TInstant deadLine) { const size_t ret = P_.WaitD(~E_, +E_, deadLine); events.reserve(ret); @@ -110,21 +110,21 @@ namespace { TUnsafeBuf<TInternalEvent> E_; }; - + template <class T> class TIndexedArray { - struct TVal: - public T, - public TIntrusiveListItem<TVal>, - public TObjectFromPool<TVal> - { + struct TVal: + public T, + public TIntrusiveListItem<TVal>, + public TObjectFromPool<TVal> + { // NOTE Constructor must be user-defined (and not =default) here // because TVal objects are created in the UB-capable placement // TObjectFromPool::new operator that stores data in a memory // allocated for the object. Without user defined constructor // zero-initialization takes place in TVal() expression and the // data is overwritten. - TVal() { + TVal() { } }; @@ -134,32 +134,32 @@ namespace { typedef typename TListType::TIterator TIterator; typedef typename TListType::TConstIterator TConstIterator; - TIndexedArray() + TIndexedArray() : P_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance()) { } - TIterator Begin() noexcept { + TIterator Begin() noexcept { return I_.Begin(); } - TIterator End() noexcept { + TIterator End() noexcept { return I_.End(); } - TConstIterator Begin() const noexcept { + TConstIterator Begin() const noexcept { return I_.Begin(); } - TConstIterator End() const noexcept { + TConstIterator End() const noexcept { return I_.End(); } - T& operator[](size_t i) { + T& operator[](size_t i) { return *Get(i); } - T* Get(size_t i) { + T* Get(size_t i) { TValRef& v = V_.Get(i); if (Y_UNLIKELY(!v)) { @@ -172,22 +172,22 @@ namespace { return v.Get(); } - void Erase(size_t i) noexcept { + void Erase(size_t i) noexcept { V_.Get(i).Destroy(); } - size_t Size() const noexcept { + size_t Size() const noexcept { return I_.Size(); } private: - using TValRef = THolder<TVal>; + using TValRef = THolder<TVal>; typename TVal::TPool P_; TSocketMap<TValRef> V_; TListType I_; }; - + inline short PollFlags(ui16 flags) noexcept { short ret = 0; @@ -208,15 +208,15 @@ namespace { return ret; } - + class TPollPoller { public: - size_t Size() const noexcept { + size_t Size() const noexcept { return S_.Size(); } template <class T> - void Build(T& t) const { + void Build(T& t) const { for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) { t.Set(*it); } @@ -224,7 +224,7 @@ namespace { t.Reserve(Size()); } - void Set(const TChange& c) { + void Set(const TChange& c) { if (c.Flags) { S_[c.Fd] = c; } else { @@ -232,7 +232,7 @@ namespace { } } - void Wait(TEvents& events, TInstant deadLine) { + void Wait(TEvents& events, TInstant deadLine) { T_.clear(); T_.reserve(Size()); @@ -265,8 +265,8 @@ namespace { int status = 0; ui16 filter = 0; - // We are perfectly fine with an EOF while reading a pipe or a unix socket - if ((ev & POLLIN) || (ev & POLLHUP) && (pfd.events & POLLIN)) { + // We are perfectly fine with an EOF while reading a pipe or a unix socket + if ((ev & POLLIN) || (ev & POLLHUP) && (pfd.events & POLLIN)) { filter |= CONT_POLL_READ; } @@ -310,16 +310,16 @@ namespace { TPollVec T_; }; - + class TCombinedPoller { typedef TPoller<TPollerImpl<TWithoutLocking>> TDefaultPoller; public: - TCombinedPoller() { + TCombinedPoller() { P_.Reset(new TPollPoller()); } - void Set(const TChange& c) { + void Set(const TChange& c) { if (!P_) { D_->Set(c); } else { @@ -327,7 +327,7 @@ namespace { } } - void Wait(TEvents& events, TInstant deadLine) { + void Wait(TEvents& events, TInstant deadLine) { if (!P_) { D_->Wait(events, deadLine); } else { @@ -343,48 +343,48 @@ namespace { } private: - THolder<TPollPoller> P_; - THolder<TDefaultPoller> D_; + THolder<TPollPoller> P_; + THolder<TDefaultPoller> D_; }; struct TUserPoller: public TString { - TUserPoller() + TUserPoller() : TString(GetEnv("USER_POLLER")) { } }; } -THolder<IPollerFace> IPollerFace::Default() { +THolder<IPollerFace> IPollerFace::Default() { return Construct(*SingletonWithPriority<TUserPoller, 0>()); } -THolder<IPollerFace> IPollerFace::Construct(TStringBuf name) { - return Construct(name ? FromString<EContPoller>(name) : EContPoller::Default); -} +THolder<IPollerFace> IPollerFace::Construct(TStringBuf name) { + return Construct(name ? FromString<EContPoller>(name) : EContPoller::Default); +} -THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) { - switch (poller) { - case EContPoller::Default: +THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) { + switch (poller) { + case EContPoller::Default: case EContPoller::Combined: return MakeHolder<TVirtualize<TCombinedPoller>>(EContPoller::Combined); - case EContPoller::Select: + case EContPoller::Select: return MakeHolder<TVirtualize<TPoller<TGenericPoller<TSelectPoller<TWithoutLocking>>>>>(poller); - case EContPoller::Poll: + case EContPoller::Poll: return MakeHolder<TVirtualize<TPollPoller>>(poller); - case EContPoller::Epoll: + case EContPoller::Epoll: #if defined(HAVE_EPOLL_POLLER) return MakeHolder<TVirtualize<TPoller<TGenericPoller<TEpollPoller<TWithoutLocking>>>>>(poller); -#else - return nullptr; +#else + return nullptr; #endif - case EContPoller::Kqueue: -#if defined(HAVE_KQUEUE_POLLER) + case EContPoller::Kqueue: +#if defined(HAVE_KQUEUE_POLLER) return MakeHolder<TVirtualize<TPoller<TGenericPoller<TKqueuePoller<TWithoutLocking>>>>>(poller); -#else - return nullptr; +#else + return nullptr; #endif - default: - Y_FAIL("bad poller type"); + default: + Y_FAIL("bad poller type"); } } diff --git a/library/cpp/coroutine/engine/poller.h b/library/cpp/coroutine/engine/poller.h index 73d482cfaf..8ea012c0fc 100644 --- a/library/cpp/coroutine/engine/poller.h +++ b/library/cpp/coroutine/engine/poller.h @@ -1,4 +1,4 @@ -#pragma once +#pragma once #include <util/generic/ptr.h> #include <util/generic/vector.h> @@ -6,15 +6,15 @@ #include <util/network/pollerimpl.h> #include <util/datetime/base.h> -enum class EContPoller { - Default /* "default" */, +enum class EContPoller { + Default /* "default" */, Combined /* "combined" */, - Select /* "select" */, - Poll /* "poll" */, - Epoll /* "epoll" */, - Kqueue /* "kqueue" */ -}; - + Select /* "select" */, + Poll /* "poll" */, + Epoll /* "epoll" */, + Kqueue /* "kqueue" */ +}; + class IPollerFace { public: struct TChange { @@ -29,12 +29,12 @@ public: ui16 Filter; }; - using TEvents = TVector<TEvent>; + using TEvents = TVector<TEvent>; virtual ~IPollerFace() { } - void Set(void* ptr, SOCKET fd, ui16 flags) { + void Set(void* ptr, SOCKET fd, ui16 flags) { const TChange c = {fd, ptr, flags}; Set(c); @@ -44,7 +44,7 @@ public: virtual void Wait(TEvents& events, TInstant deadLine) = 0; virtual EContPoller PollEngine() const = 0; - static THolder<IPollerFace> Default(); - static THolder<IPollerFace> Construct(TStringBuf name); - static THolder<IPollerFace> Construct(EContPoller poller); + static THolder<IPollerFace> Default(); + static THolder<IPollerFace> Construct(TStringBuf name); + static THolder<IPollerFace> Construct(EContPoller poller); }; diff --git a/library/cpp/coroutine/engine/sockmap.h b/library/cpp/coroutine/engine/sockmap.h index b4a3af73f6..fd189e1774 100644 --- a/library/cpp/coroutine/engine/sockmap.h +++ b/library/cpp/coroutine/engine/sockmap.h @@ -6,7 +6,7 @@ template <class T> class TSocketMap { public: - T& Get(size_t idx) { + T& Get(size_t idx) { if (idx < 128000) { if (V_.size() <= idx) { V_.resize(idx + 1); diff --git a/library/cpp/coroutine/engine/sockpool.cpp b/library/cpp/coroutine/engine/sockpool.cpp index 39b97ecd85..b9482e780f 100644 --- a/library/cpp/coroutine/engine/sockpool.cpp +++ b/library/cpp/coroutine/engine/sockpool.cpp @@ -31,7 +31,7 @@ TPooledSocket TSocketPool::AllocateMore(TConnectData* conn) { TCont* cont = conn->Cont; while (true) { - TSocketHolder s(NCoro::Socket(Addr_->Addr()->sa_family, SOCK_STREAM, 0)); + TSocketHolder s(NCoro::Socket(Addr_->Addr()->sa_family, SOCK_STREAM, 0)); if (s == INVALID_SOCKET) { ythrow TSystemError(errno) << TStringBuf("can not create socket"); @@ -40,7 +40,7 @@ TPooledSocket TSocketPool::AllocateMore(TConnectData* conn) { SetCommonSockOpts(s, Addr_->Addr()); SetZeroLinger(s); - const int ret = NCoro::ConnectD(cont, s, Addr_->Addr(), Addr_->Len(), conn->DeadLine); + const int ret = NCoro::ConnectD(cont, s, Addr_->Addr(), Addr_->Len(), conn->DeadLine); if (ret == EINTR) { continue; diff --git a/library/cpp/coroutine/engine/sockpool.h b/library/cpp/coroutine/engine/sockpool.h index b343b65fb6..1ebb7e7b38 100644 --- a/library/cpp/coroutine/engine/sockpool.h +++ b/library/cpp/coroutine/engine/sockpool.h @@ -1,7 +1,7 @@ -#pragma once +#pragma once #include "impl.h" -#include "network.h" +#include "network.h" #include <util/network/address.h> #include <util/network/socket.h> @@ -14,7 +14,7 @@ class TSocketPool; class TPooledSocket { class TImpl: public TIntrusiveListItem<TImpl>, public TSimpleRefCount<TImpl, TImpl> { public: - TImpl(SOCKET fd, TSocketPool* pool) noexcept + TImpl(SOCKET fd, TSocketPool* pool) noexcept : Pool_(pool) , IsKeepAlive_(false) , Fd_(fd) @@ -22,11 +22,11 @@ class TPooledSocket { Touch(); } - static void Destroy(TImpl* impl) noexcept { + static void Destroy(TImpl* impl) noexcept { impl->DoDestroy(); } - void DoDestroy() noexcept { + void DoDestroy() noexcept { if (!Closed() && IsKeepAlive() && IsInGoodState()) { ReturnToPool(); } else { @@ -34,28 +34,28 @@ class TPooledSocket { } } - bool IsKeepAlive() const noexcept { + bool IsKeepAlive() const noexcept { return IsKeepAlive_; } - void SetKeepAlive(bool ka) { + void SetKeepAlive(bool ka) { ::SetKeepAlive(Fd_, ka); IsKeepAlive_ = ka; } - SOCKET Socket() const noexcept { + SOCKET Socket() const noexcept { return Fd_; } - bool Closed() const noexcept { + bool Closed() const noexcept { return Fd_.Closed(); } - void Close() noexcept { + void Close() noexcept { Fd_.Close(); } - bool IsInGoodState() const noexcept { + bool IsInGoodState() const noexcept { int err = 0; socklen_t len = sizeof(err); @@ -64,15 +64,15 @@ class TPooledSocket { return !err; } - bool IsOpen() const noexcept { - return IsInGoodState() && IsNotSocketClosedByOtherSide(Fd_); + bool IsOpen() const noexcept { + return IsInGoodState() && IsNotSocketClosedByOtherSide(Fd_); } - void Touch() noexcept { + void Touch() noexcept { TouchTime_ = TInstant::Now(); } - const TInstant& LastTouch() const noexcept { + const TInstant& LastTouch() const noexcept { return TouchTime_; } @@ -89,31 +89,31 @@ class TPooledSocket { friend class TSocketPool; public: - TPooledSocket() + TPooledSocket() : Impl_(nullptr) { } - TPooledSocket(TImpl* impl) + TPooledSocket(TImpl* impl) : Impl_(impl) { } - ~TPooledSocket() { + ~TPooledSocket() { if (UncaughtException() && !!Impl_) { Close(); } } - operator SOCKET() const noexcept { + operator SOCKET() const noexcept { return Impl_->Socket(); } - void SetKeepAlive(bool ka) { + void SetKeepAlive(bool ka) { Impl_->SetKeepAlive(ka); } - void Close() noexcept { + void Close() noexcept { Impl_->Close(); } @@ -122,13 +122,13 @@ private: }; struct TConnectData { - TConnectData(TCont* cont, const TInstant& deadLine) + TConnectData(TCont* cont, const TInstant& deadLine) : Cont(cont) , DeadLine(deadLine) { } - TConnectData(TCont* cont, const TDuration& timeOut) + TConnectData(TCont* cont, const TDuration& timeOut) : Cont(cont) , DeadLine(TInstant::Now() + timeOut) { @@ -144,17 +144,17 @@ class TSocketPool { public: typedef TAtomicSharedPtr<NAddr::IRemoteAddr> TAddrRef; - TSocketPool(int ip, int port) + TSocketPool(int ip, int port) : Addr_(new NAddr::TIPv4Addr(TIpAddress((ui32)ip, (ui16)port))) { } - TSocketPool(const TAddrRef& addr) + TSocketPool(const TAddrRef& addr) : Addr_(addr) { } - void EraseStale(const TInstant& maxAge) noexcept { + void EraseStale(const TInstant& maxAge) noexcept { TSockets toDelete; { @@ -170,7 +170,7 @@ public: } } - TPooledSocket Get(TConnectData* conn) { + TPooledSocket Get(TConnectData* conn) { TPooledSocket ret; if (TPooledSocket::TImpl* alive = GetImpl()) { @@ -184,7 +184,7 @@ public: return ret; } - bool GetAlive(TPooledSocket& socket) { + bool GetAlive(TPooledSocket& socket) { if (TPooledSocket::TImpl* alive = GetImpl()) { alive->Touch(); socket = TPooledSocket(alive); @@ -194,7 +194,7 @@ public: } private: - TPooledSocket::TImpl* GetImpl() { + TPooledSocket::TImpl* GetImpl() { TGuard<TMutex> guard(Mutex_); while (!Pool_.Empty()) { @@ -207,7 +207,7 @@ private: return nullptr; } - void Release(TPooledSocket::TImpl* impl) noexcept { + void Release(TPooledSocket::TImpl* impl) noexcept { TGuard<TMutex> guard(Mutex_); Pool_.PushFront(impl); @@ -217,7 +217,7 @@ private: private: TAddrRef Addr_; - using TSockets = TIntrusiveListWithAutoDelete<TPooledSocket::TImpl, TDelete>; + using TSockets = TIntrusiveListWithAutoDelete<TPooledSocket::TImpl, TDelete>; TSockets Pool_; TMutex Mutex_; }; @@ -226,24 +226,24 @@ inline void TPooledSocket::TImpl::ReturnToPool() noexcept { Pool_->Release(this); } - + class TContIO: public IInputStream, public IOutputStream { public: - TContIO(SOCKET fd, TCont* cont) + TContIO(SOCKET fd, TCont* cont) : Fd_(fd) , Cont_(cont) { } void DoWrite(const void* buf, size_t len) override { - NCoro::WriteI(Cont_, Fd_, buf, len).Checked(); + NCoro::WriteI(Cont_, Fd_, buf, len).Checked(); } size_t DoRead(void* buf, size_t len) override { - return NCoro::ReadI(Cont_, Fd_, buf, len).Checked(); + return NCoro::ReadI(Cont_, Fd_, buf, len).Checked(); } - SOCKET Fd() const noexcept { + SOCKET Fd() const noexcept { return Fd_; } diff --git a/library/cpp/coroutine/engine/trampoline.cpp b/library/cpp/coroutine/engine/trampoline.cpp index 38e3951a51..10ea69ddc3 100644 --- a/library/cpp/coroutine/engine/trampoline.cpp +++ b/library/cpp/coroutine/engine/trampoline.cpp @@ -1,50 +1,50 @@ -#include "impl.h" -#include "trampoline.h" +#include "impl.h" +#include "trampoline.h" #include "stack/stack_allocator.h" -#include <util/system/info.h> -#include <util/system/protect.h> -#include <util/system/valgrind.h> +#include <util/system/info.h> +#include <util/system/protect.h> +#include <util/system/valgrind.h> #include <util/system/yassert.h> -#include <cstdlib> -#include <util/stream/format.h> +#include <cstdlib> +#include <util/stream/format.h> -namespace NCoro { - +namespace NCoro { + TTrampoline::TTrampoline(NStack::IAllocator& allocator, ui32 stackSize, TFunc f, TCont* cont) noexcept : Stack_(allocator, stackSize, cont->Name()) , Clo_{this, Stack_.Get(), cont->Name()} - , Ctx_(Clo_) + , Ctx_(Clo_) , Func_(std::move(f)) - , Cont_(cont) - {} - - void TTrampoline::DoRun() { + , Cont_(cont) + {} + + void TTrampoline::DoRun() { if (Cont_->Executor()->FailOnError()) { Func_(Cont_); } else { try { Func_(Cont_); } catch (...) {} - } - - Cont_->Terminate(); - } - - TArrayRef<char> TTrampoline::Stack() noexcept { - return Stack_.Get(); - } - + } + + Cont_->Terminate(); + } + + TArrayRef<char> TTrampoline::Stack() noexcept { + return Stack_.Get(); + } + const char* TTrampoline::ContName() const noexcept { return Cont_->Name(); } - + void TTrampoline::DoRunNaked() { DoRun(); abort(); } -} +} diff --git a/library/cpp/coroutine/engine/trampoline.h b/library/cpp/coroutine/engine/trampoline.h index 5ece7873b0..37b61cf015 100644 --- a/library/cpp/coroutine/engine/trampoline.h +++ b/library/cpp/coroutine/engine/trampoline.h @@ -1,60 +1,60 @@ -#pragma once +#pragma once #include "stack/stack_common.h" #include "stack/stack.h" -#include <util/generic/noncopyable.h> -#include <util/generic/ptr.h> +#include <util/generic/noncopyable.h> +#include <util/generic/ptr.h> #include <util/system/context.h> #include <util/system/defaults.h> -#if !defined(STACK_GROW_DOWN) -# error "unsupported" -#endif - +#if !defined(STACK_GROW_DOWN) +# error "unsupported" +#endif + class TCont; -typedef void (*TContFunc)(TCont*, void*); +typedef void (*TContFunc)(TCont*, void*); + +namespace NCoro { -namespace NCoro { - namespace NStack { class IAllocator; - } - - class TTrampoline : public ITrampoLine, TNonCopyable { - public: + } + + class TTrampoline : public ITrampoLine, TNonCopyable { + public: typedef std::function<void (TCont*)> TFunc; - TTrampoline( + TTrampoline( NCoro::NStack::IAllocator& allocator, uint32_t stackSize, TFunc f, TCont* cont - ) noexcept; + ) noexcept; + + TArrayRef<char> Stack() noexcept; - TArrayRef<char> Stack() noexcept; + TExceptionSafeContext* Context() noexcept { + return &Ctx_; + } - TExceptionSafeContext* Context() noexcept { - return &Ctx_; - } - - void SwitchTo(TExceptionSafeContext* ctx) noexcept { + void SwitchTo(TExceptionSafeContext* ctx) noexcept { Y_VERIFY(Stack_.LowerCanaryOk(), "Stack overflow (%s)", ContName()); Y_VERIFY(Stack_.UpperCanaryOk(), "Stack override (%s)", ContName()); - Ctx_.SwitchTo(ctx); - } - + Ctx_.SwitchTo(ctx); + } + void DoRun() override; - + void DoRunNaked() override; - private: + private: const char* ContName() const noexcept; private: NStack::TStackHolder Stack_; - const TContClosure Clo_; - TExceptionSafeContext Ctx_; + const TContClosure Clo_; + TExceptionSafeContext Ctx_; TFunc Func_; - TCont* const Cont_; + TCont* const Cont_; }; } diff --git a/library/cpp/coroutine/engine/ya.make b/library/cpp/coroutine/engine/ya.make index 2ea3c237c7..8c20b9afc3 100644 --- a/library/cpp/coroutine/engine/ya.make +++ b/library/cpp/coroutine/engine/ya.make @@ -1,24 +1,24 @@ LIBRARY() -OWNER( - pg - g:balancer -) +OWNER( + pg + g:balancer +) GENERATE_ENUM_SERIALIZATION(poller.h) GENERATE_ENUM_SERIALIZATION(stack/stack_common.h) - + PEERDIR( contrib/libs/libc_compat library/cpp/containers/intrusive_rb_tree ) SRCS( - cont_poller.cpp + cont_poller.cpp helper.cpp impl.cpp iostatus.cpp - network.cpp + network.cpp poller.cpp sockpool.cpp stack/stack.cpp @@ -26,7 +26,7 @@ SRCS( stack/stack_guards.cpp stack/stack_storage.cpp stack/stack_utils.cpp - trampoline.cpp + trampoline.cpp ) END() diff --git a/library/cpp/coroutine/listener/listen.cpp b/library/cpp/coroutine/listener/listen.cpp index 02441c879b..3d4e711d1d 100644 --- a/library/cpp/coroutine/listener/listen.cpp +++ b/library/cpp/coroutine/listener/listen.cpp @@ -107,7 +107,7 @@ private: SetDeferAccept(ListenSocket_); } - C_ = Parent_->E_->Create<TOneSocketListener, &TOneSocketListener::Run>(this, "listen_job"); + C_ = Parent_->E_->Create<TOneSocketListener, &TOneSocketListener::Run>(this, "listen_job"); } } @@ -120,7 +120,7 @@ private: C_->Cancel(); while (C_) { - Parent_->E_->Running()->Yield(); + Parent_->E_->Running()->Yield(); } } } @@ -130,7 +130,7 @@ private: while (!C_->Cancelled()) { try { TOpaqueAddr remote; - const int res = NCoro::AcceptI(C_, ListenSocket_, remote.MutableAddr(), remote.LenPtr()); + const int res = NCoro::AcceptI(C_, ListenSocket_, remote.MutableAddr(), remote.LenPtr()); if (res < 0) { const int err = -res; @@ -277,16 +277,16 @@ TContListener::TContListener(ICallBack* cb, TContExecutor* e, const TOptions& op TContListener::~TContListener() { } -namespace { - template <class T> - static inline T&& CheckImpl(T&& impl) { - Y_ENSURE_EX(impl, yexception() << "not running"); - return std::forward<T>(impl); +namespace { + template <class T> + static inline T&& CheckImpl(T&& impl) { + Y_ENSURE_EX(impl, yexception() << "not running"); + return std::forward<T>(impl); } -} +} void TContListener::Listen(const IRemoteAddr& addr) { - CheckImpl(Impl_)->Listen(addr); + CheckImpl(Impl_)->Listen(addr); } void TContListener::Listen(const TIpAddress& addr) { @@ -300,11 +300,11 @@ void TContListener::Listen(const TNetworkAddress& addr) { } void TContListener::Listen() { - CheckImpl(Impl_)->Listen(); + CheckImpl(Impl_)->Listen(); } void TContListener::Bind(const IRemoteAddr& addr) { - CheckImpl(Impl_)->Bind(addr); + CheckImpl(Impl_)->Bind(addr); } void TContListener::Bind(const TIpAddress& addr) { @@ -312,7 +312,7 @@ void TContListener::Bind(const TIpAddress& addr) { } void TContListener::Bind(const TNetworkAddress& addr) { - CheckImpl(Impl_)->Bind(addr); + CheckImpl(Impl_)->Bind(addr); } void TContListener::Stop() noexcept { @@ -320,7 +320,7 @@ void TContListener::Stop() noexcept { } void TContListener::StopListenAddr(const IRemoteAddr& addr) { - CheckImpl(Impl_)->StopListenAddr(addr); + CheckImpl(Impl_)->StopListenAddr(addr); } void TContListener::StopListenAddr(const TIpAddress& addr) { diff --git a/library/cpp/coroutine/listener/listen.h b/library/cpp/coroutine/listener/listen.h index 998284ec95..3a89cd3ecc 100644 --- a/library/cpp/coroutine/listener/listen.h +++ b/library/cpp/coroutine/listener/listen.h @@ -1,4 +1,4 @@ -#pragma once +#pragma once #include <util/generic/ptr.h> #include <util/generic/ylimits.h> |