diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/coroutine | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/coroutine')
24 files changed, 1509 insertions, 1509 deletions
diff --git a/library/cpp/coroutine/engine/condvar.h b/library/cpp/coroutine/engine/condvar.h index ffceede6fa..384a6c19f8 100644 --- a/library/cpp/coroutine/engine/condvar.h +++ b/library/cpp/coroutine/engine/condvar.h @@ -1,38 +1,38 @@ #pragma once - + #include "events.h" #include "mutex.h" -class TContCondVar { -public: +class TContCondVar { +public: int WaitD(TCont* current, TContMutex* mutex, TInstant deadline) { - mutex->UnLock(); - - const int ret = WaitQueue_.WaitD(current, deadline); - - if (ret != EWAKEDUP) { + mutex->UnLock(); + + const int ret = WaitQueue_.WaitD(current, deadline); + + if (ret != EWAKEDUP) { return ret; } - return mutex->LockD(current, deadline); - } - + return mutex->LockD(current, deadline); + } + int WaitT(TCont* current, TContMutex* mutex, TDuration timeout) { - return WaitD(current, mutex, timeout.ToDeadLine()); - } - + return WaitD(current, mutex, timeout.ToDeadLine()); + } + int WaitI(TCont* current, TContMutex* mutex) { - return WaitD(current, mutex, TInstant::Max()); - } - + return WaitD(current, mutex, TInstant::Max()); + } + void Signal() noexcept { - WaitQueue_.Signal(); - } - + WaitQueue_.Signal(); + } + void BroadCast() noexcept { - WaitQueue_.BroadCast(); - } - -private: - TContWaitQueue WaitQueue_; -}; + WaitQueue_.BroadCast(); + } + +private: + TContWaitQueue WaitQueue_; +}; diff --git a/library/cpp/coroutine/engine/cont_poller.cpp b/library/cpp/coroutine/engine/cont_poller.cpp index 76593d4e9b..6073b72e62 100644 --- a/library/cpp/coroutine/engine/cont_poller.cpp +++ b/library/cpp/coroutine/engine/cont_poller.cpp @@ -1,6 +1,6 @@ #include "cont_poller.h" -#include "impl.h" - +#include "impl.h" + namespace NCoro { namespace { template <class T> @@ -20,8 +20,8 @@ namespace NCoro { return event->Status(); } - } - + } + void TContPollEvent::Wake() noexcept { UnLink(); Cont()->ReSchedule(); @@ -55,7 +55,7 @@ namespace NCoro { }; IoWait_.ForEach(visitor); } -} +} void TFdEvent::RemoveFromIOWait() noexcept { this->Cont()->Executor()->Poller()->Remove(this); diff --git a/library/cpp/coroutine/engine/cont_poller.h b/library/cpp/coroutine/engine/cont_poller.h index b638b2df1a..e6b23149c3 100644 --- a/library/cpp/coroutine/engine/cont_poller.h +++ b/library/cpp/coroutine/engine/cont_poller.h @@ -1,27 +1,27 @@ #pragma once - -#include "poller.h" + +#include "poller.h" #include "sockmap.h" - + #include <library/cpp/containers/intrusive_rb_tree/rb_tree.h> #include <util/datetime/base.h> #include <util/memory/pool.h> -#include <util/memory/smallobj.h> +#include <util/memory/smallobj.h> #include <util/network/init.h> - + #include <cerrno> -class TCont; -class TContExecutor; +class TCont; +class TContExecutor; class TFdEvent; - + namespace NCoro { class IPollEvent; - - + + struct TContPollEventCompare { template <class T> static inline bool Compare(const T& l, const T& r) noexcept { @@ -29,74 +29,74 @@ namespace NCoro { } }; - + class TContPollEvent : public TRbTreeItem<TContPollEvent, TContPollEventCompare> { public: TContPollEvent(TCont* cont, TInstant deadLine) noexcept : Cont_(cont) , DeadLine_(deadLine) {} - + static bool Compare(const TContPollEvent& l, const TContPollEvent& r) noexcept { return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r); } - + int Status() const noexcept { return Status_; } - + void SetStatus(int status) noexcept { Status_ = status; } - + TCont* Cont() noexcept { return Cont_; } - + TInstant DeadLine() const noexcept { return DeadLine_; } - + void Wake(int status) noexcept { SetStatus(status); Wake(); - } - + } + private: void Wake() noexcept; - + private: TCont* Cont_; TInstant DeadLine_; int Status_ = EINPROGRESS; - }; - - + }; + + class IPollEvent: public TIntrusiveListItem<IPollEvent> { public: IPollEvent(SOCKET fd, ui16 what) noexcept : Fd_(fd) , What_(what) {} - + virtual ~IPollEvent() {} - + SOCKET Fd() const noexcept { return Fd_; } - + int What() const noexcept { return What_; } - + virtual void OnPollEvent(int status) noexcept = 0; - + private: SOCKET Fd_; ui16 What_; }; - - + + template <class T> class TBigArray { struct TValue: public T, public TObjectFromPool<TValue> { @@ -124,27 +124,27 @@ namespace NCoro { using TPollEventList = TIntrusiveList<IPollEvent>; - + class TContPoller { public: using TEvent = IPollerFace::TEvent; using TEvents = IPollerFace::TEvents; - + TContPoller() : P_(IPollerFace::Default()) { } - + explicit TContPoller(THolder<IPollerFace> poller) : P_(std::move(poller)) {} - + void Schedule(IPollEvent* event) { auto* lst = Lists_.Get(event->Fd()); const ui16 oldFlags = Flags(*lst); lst->PushFront(event); ui16 newFlags = Flags(*lst); - + if (newFlags != oldFlags) { if (oldFlags) { newFlags |= CONT_POLL_MODIFY; @@ -152,14 +152,14 @@ namespace NCoro { P_->Set(lst, event->Fd(), newFlags); } - } - + } + void Remove(IPollEvent* event) noexcept { auto* lst = Lists_.Get(event->Fd()); const ui16 oldFlags = Flags(*lst); event->Unlink(); ui16 newFlags = Flags(*lst); - + if (newFlags != oldFlags) { if (newFlags) { newFlags |= CONT_POLL_MODIFY; @@ -167,13 +167,13 @@ namespace NCoro { P_->Set(lst, event->Fd(), newFlags); } - } - + } + void Wait(TEvents& events, TInstant deadLine) { events.clear(); P_->Wait(events, deadLine); - } - + } + EContPoller PollEngine() const { return P_->PollEngine(); } @@ -182,14 +182,14 @@ namespace NCoro { ui16 ret = 0; for (auto&& item : lst) { ret |= item.What(); - } + } return ret; - } - + } + private: TBigArray<TPollEventList> Lists_; THolder<IPollerFace> P_; - }; + }; class TEventWaitQueue { @@ -210,12 +210,12 @@ namespace NCoro { TIoWait IoWait_; }; } - + class TFdEvent final: public NCoro::TContPollEvent, public NCoro::IPollEvent { -public: +public: TFdEvent(TCont* cont, SOCKET fd, ui16 what, TInstant deadLine) noexcept : TContPollEvent(cont, deadLine) , IPollEvent(fd, what) @@ -223,22 +223,22 @@ public: ~TFdEvent() { RemoveFromIOWait(); - } - + } + void RemoveFromIOWait() noexcept; - + void OnPollEvent(int status) noexcept override { Wake(status); - } -}; - + } +}; + class TTimerEvent: public NCoro::TContPollEvent { -public: +public: TTimerEvent(TCont* cont, TInstant deadLine) noexcept : TContPollEvent(cont, deadLine) {} -}; +}; int ExecuteEvent(TFdEvent* event) noexcept; diff --git a/library/cpp/coroutine/engine/coroutine_ut.cpp b/library/cpp/coroutine/engine/coroutine_ut.cpp index 8b372496a2..29e0d1dfc5 100644 --- a/library/cpp/coroutine/engine/coroutine_ut.cpp +++ b/library/cpp/coroutine/engine/coroutine_ut.cpp @@ -1,34 +1,34 @@ -#include "impl.h" +#include "impl.h" #include "condvar.h" #include "network.h" - + #include <library/cpp/testing/unittest/registar.h> - -#include <util/string/cast.h> + +#include <util/string/cast.h> #include <util/system/pipe.h> #include <util/system/env.h> #include <util/system/info.h> #include <util/system/thread.h> #include <util/generic/xrange.h> #include <util/generic/serialized_enum.h> - + // TODO (velavokr): BALANCER-1345 add more tests on pollers -class TCoroTest: public TTestBase { - UNIT_TEST_SUITE(TCoroTest); - UNIT_TEST(TestSimpleX1); +class TCoroTest: public TTestBase { + UNIT_TEST_SUITE(TCoroTest); + UNIT_TEST(TestSimpleX1); UNIT_TEST(TestSimpleX1MultiThread); - UNIT_TEST(TestSimpleX2); - UNIT_TEST(TestSimpleX3); - UNIT_TEST(TestMemFun); - UNIT_TEST(TestMutex); - UNIT_TEST(TestCondVar); + UNIT_TEST(TestSimpleX2); + UNIT_TEST(TestSimpleX3); + UNIT_TEST(TestMemFun); + UNIT_TEST(TestMutex); + UNIT_TEST(TestCondVar); UNIT_TEST(TestJoinDefault); UNIT_TEST(TestJoinEpoll); UNIT_TEST(TestJoinKqueue); UNIT_TEST(TestJoinPoll); UNIT_TEST(TestJoinSelect); - UNIT_TEST(TestException); + UNIT_TEST(TestException); UNIT_TEST(TestJoinCancelExitRaceBug); UNIT_TEST(TestWaitWakeLivelockBug); UNIT_TEST(TestFastPathWakeDefault) @@ -46,17 +46,17 @@ class TCoroTest: public TTestBase { UNIT_TEST(TestUserEvent); UNIT_TEST(TestPause); UNIT_TEST(TestOverrideTime); - UNIT_TEST_SUITE_END(); - -public: - void TestException(); - void TestSimpleX1(); + UNIT_TEST_SUITE_END(); + +public: + void TestException(); + void TestSimpleX1(); void TestSimpleX1MultiThread(); - void TestSimpleX2(); - void TestSimpleX3(); - void TestMemFun(); - void TestMutex(); - void TestCondVar(); + void TestSimpleX2(); + void TestSimpleX3(); + void TestMemFun(); + void TestMutex(); + void TestCondVar(); void TestJoinDefault(); void TestJoinEpoll(); void TestJoinKqueue(); @@ -78,72 +78,72 @@ public: void TestUserEvent(); void TestPause(); void TestOverrideTime(); -}; - -void TCoroTest::TestException() { - TContExecutor e(1000000); - - bool f2run = false; - - auto f1 = [&f2run](TCont* c) { - struct TCtx { - ~TCtx() { +}; + +void TCoroTest::TestException() { + TContExecutor e(1000000); + + bool f2run = false; + + auto f1 = [&f2run](TCont* c) { + struct TCtx { + ~TCtx() { Y_VERIFY(!*F2); - - C->Yield(); - } - - TCont* C; - bool* F2; - }; - - try { - TCtx ctx = {c, &f2run}; - - throw 1; - } catch (...) { - } - }; - - bool unc = true; - - auto f2 = [&unc, &f2run](TCont*) { - f2run = true; - unc = std::uncaught_exception(); - - // check segfault - try { - throw 2; - } catch (int) { - } - }; - - e.Create(f1, "f1"); - e.Create(f2, "f2"); - e.Execute(); - - UNIT_ASSERT(!unc); -} - + + C->Yield(); + } + + TCont* C; + bool* F2; + }; + + try { + TCtx ctx = {c, &f2run}; + + throw 1; + } catch (...) { + } + }; + + bool unc = true; + + auto f2 = [&unc, &f2run](TCont*) { + f2run = true; + unc = std::uncaught_exception(); + + // check segfault + try { + throw 2; + } catch (int) { + } + }; + + e.Create(f1, "f1"); + e.Create(f2, "f2"); + e.Execute(); + + UNIT_ASSERT(!unc); +} + static int i0; - -static void CoRun(TCont* c, void* /*run*/) { + +static void CoRun(TCont* c, void* /*run*/) { while (i0 < 100000) { ++i0; UNIT_ASSERT(RunningCont() == c); - c->Yield(); + c->Yield(); UNIT_ASSERT(RunningCont() == c); - } -} - -static void CoMain(TCont* c, void* /*arg*/) { + } +} + +static void CoMain(TCont* c, void* /*arg*/) { for (volatile size_t i2 = 0; i2 < 10; ++i2) { UNIT_ASSERT(RunningCont() == c); c->Executor()->Create(CoRun, nullptr, "run"); UNIT_ASSERT(RunningCont() == c); - } -} - + } +} + void TCoroTest::TestSimpleX1() { i0 = 0; TContExecutor e(32000); @@ -178,17 +178,17 @@ void TCoroTest::TestSimpleX1MultiThread() { UNIT_ASSERT_EQUAL(c, nThreads); } -struct TTestObject { +struct TTestObject { int i = 0; int j = 0; - + public: - void RunTask1(TCont*) { - i = 1; - } - void RunTask2(TCont*) { - j = 2; - } + void RunTask1(TCont*) { + i = 1; + } + void RunTask2(TCont*) { + j = 2; + } }; void TCoroTest::TestMemFun() { @@ -201,130 +201,130 @@ void TCoroTest::TestMemFun() { UNIT_ASSERT_EQUAL(obj.j, 2); } -void TCoroTest::TestSimpleX2() { - { +void TCoroTest::TestSimpleX2() { + { i0 = 0; - - { - TContExecutor e(32000); - e.Execute(CoMain); - } - + + { + TContExecutor e(32000); + e.Execute(CoMain); + } + UNIT_ASSERT_EQUAL(i0, 100000); - } - - { + } + + { i0 = 0; - - { - TContExecutor e(32000); - e.Execute(CoMain); - } - + + { + TContExecutor e(32000); + e.Execute(CoMain); + } + UNIT_ASSERT_EQUAL(i0, 100000); - } -} - -struct TRunner { - inline TRunner() - : Runs(0) - { - } - - inline void operator()(TCont* c) { - ++Runs; - c->Yield(); - } - - size_t Runs; -}; - -void TCoroTest::TestSimpleX3() { - TContExecutor e(32000); - TRunner runner; - + } +} + +struct TRunner { + inline TRunner() + : Runs(0) + { + } + + inline void operator()(TCont* c) { + ++Runs; + c->Yield(); + } + + size_t Runs; +}; + +void TCoroTest::TestSimpleX3() { + TContExecutor e(32000); + TRunner runner; + for (volatile size_t i3 = 0; i3 < 1000; ++i3) { - e.Create(runner, "runner"); - } - - e.Execute(); - - UNIT_ASSERT_EQUAL(runner.Runs, 1000); -} - + e.Create(runner, "runner"); + } + + e.Execute(); + + UNIT_ASSERT_EQUAL(runner.Runs, 1000); +} + static TString res; -static TContMutex mutex; - -static void CoMutex(TCont* c, void* /*run*/) { - { - mutex.LockI(c); - c->Yield(); - res += c->Name(); - mutex.UnLock(); - } - - c->Yield(); - - { - mutex.LockI(c); - c->Yield(); - res += c->Name(); - mutex.UnLock(); - } -} - -static void CoMutexTest(TCont* c, void* /*run*/) { +static TContMutex mutex; + +static void CoMutex(TCont* c, void* /*run*/) { + { + mutex.LockI(c); + c->Yield(); + res += c->Name(); + mutex.UnLock(); + } + + c->Yield(); + + { + mutex.LockI(c); + c->Yield(); + res += c->Name(); + mutex.UnLock(); + } +} + +static void CoMutexTest(TCont* c, void* /*run*/) { c->Executor()->Create(CoMutex, nullptr, "1"); c->Executor()->Create(CoMutex, nullptr, "2"); -} - -void TCoroTest::TestMutex() { - TContExecutor e(32000); - e.Execute(CoMutexTest); - UNIT_ASSERT_EQUAL(res, "1212"); +} + +void TCoroTest::TestMutex() { + TContExecutor e(32000); + e.Execute(CoMutexTest); + UNIT_ASSERT_EQUAL(res, "1212"); res.clear(); -} - -static TContMutex m1; -static TContCondVar c1; - -static void CoCondVar(TCont* c, void* /*run*/) { +} + +static TContMutex m1; +static TContCondVar c1; + +static void CoCondVar(TCont* c, void* /*run*/) { for (size_t i4 = 0; i4 < 3; ++i4) { - UNIT_ASSERT_EQUAL(m1.LockI(c), 0); - UNIT_ASSERT_EQUAL(c1.WaitI(c, &m1), 0); - res += c->Name(); - m1.UnLock(); - } -} - -static void CoCondVarTest(TCont* c, void* /*run*/) { + UNIT_ASSERT_EQUAL(m1.LockI(c), 0); + UNIT_ASSERT_EQUAL(c1.WaitI(c, &m1), 0); + res += c->Name(); + m1.UnLock(); + } +} + +static void CoCondVarTest(TCont* c, void* /*run*/) { c->Executor()->Create(CoCondVar, nullptr, "1"); - c->Yield(); + c->Yield(); c->Executor()->Create(CoCondVar, nullptr, "2"); - c->Yield(); + c->Yield(); c->Executor()->Create(CoCondVar, nullptr, "3"); - c->Yield(); + c->Yield(); c->Executor()->Create(CoCondVar, nullptr, "4"); - c->Yield(); + c->Yield(); c->Executor()->Create(CoCondVar, nullptr, "5"); - c->Yield(); + c->Yield(); c->Executor()->Create(CoCondVar, nullptr, "6"); - c->Yield(); - + c->Yield(); + for (size_t i5 = 0; i5 < 3; ++i5) { res += ToString((size_t)i5) + "^"; - c1.BroadCast(); - c->Yield(); - } -} - -void TCoroTest::TestCondVar() { - TContExecutor e(32000); - e.Execute(CoCondVarTest); - UNIT_ASSERT_EQUAL(res, "0^1234561^1234562^123456"); + c1.BroadCast(); + c->Yield(); + } +} + +void TCoroTest::TestCondVar() { + TContExecutor e(32000); + e.Execute(CoCondVarTest); + UNIT_ASSERT_EQUAL(res, "0^1234561^1234562^123456"); res.clear(); -} - +} + namespace NCoroTestJoin { struct TSleepCont { const TInstant Deadline; @@ -1004,4 +1004,4 @@ void TCoroTest::TestOverrideTime() { executor.Execute(); } -UNIT_TEST_SUITE_REGISTRATION(TCoroTest); +UNIT_TEST_SUITE_REGISTRATION(TCoroTest); diff --git a/library/cpp/coroutine/engine/events.h b/library/cpp/coroutine/engine/events.h index 07cc4d25e8..2e879999bf 100644 --- a/library/cpp/coroutine/engine/events.h +++ b/library/cpp/coroutine/engine/events.h @@ -1,148 +1,148 @@ #pragma once - + #include "impl.h" #include <util/datetime/base.h> -class TContEvent { -public: +class TContEvent { +public: TContEvent(TCont* current) noexcept - : Cont_(current) - , Status_(0) - { - } - + : Cont_(current) + , Status_(0) + { + } + ~TContEvent() { - } - + } + int WaitD(TInstant deadline) { - Status_ = 0; - const int ret = Cont_->SleepD(deadline); - - return Status_ ? Status_ : ret; - } - + Status_ = 0; + const int ret = Cont_->SleepD(deadline); + + return Status_ ? Status_ : ret; + } + int WaitT(TDuration timeout) { - return WaitD(timeout.ToDeadLine()); - } - + return WaitD(timeout.ToDeadLine()); + } + int WaitI() { - return WaitD(TInstant::Max()); - } - + return WaitD(TInstant::Max()); + } + void Wake() noexcept { - SetStatus(EWAKEDUP); - Cont_->ReSchedule(); - } - + SetStatus(EWAKEDUP); + Cont_->ReSchedule(); + } + TCont* Cont() noexcept { - return Cont_; - } - + return Cont_; + } + int Status() const noexcept { - return Status_; - } - + return Status_; + } + void SetStatus(int status) noexcept { - Status_ = status; - } - -private: - TCont* Cont_; - int Status_; -}; - -class TContWaitQueue { - class TWaiter: public TContEvent, public TIntrusiveListItem<TWaiter> { - public: + Status_ = status; + } + +private: + TCont* Cont_; + int Status_; +}; + +class TContWaitQueue { + class TWaiter: public TContEvent, public TIntrusiveListItem<TWaiter> { + public: TWaiter(TCont* current) noexcept - : TContEvent(current) - { - } - + : TContEvent(current) + { + } + ~TWaiter() { - } - }; - -public: + } + }; + +public: TContWaitQueue() noexcept { - } - + } + ~TContWaitQueue() { Y_ASSERT(Waiters_.Empty()); - } - + } + int WaitD(TCont* current, TInstant deadline) { - TWaiter waiter(current); - - Waiters_.PushBack(&waiter); - - return waiter.WaitD(deadline); - } - + TWaiter waiter(current); + + Waiters_.PushBack(&waiter); + + return waiter.WaitD(deadline); + } + int WaitT(TCont* current, TDuration timeout) { - return WaitD(current, timeout.ToDeadLine()); - } - + return WaitD(current, timeout.ToDeadLine()); + } + int WaitI(TCont* current) { - return WaitD(current, TInstant::Max()); - } - + return WaitD(current, TInstant::Max()); + } + void Signal() noexcept { - if (!Waiters_.Empty()) { - Waiters_.PopFront()->Wake(); - } - } + if (!Waiters_.Empty()) { + Waiters_.PopFront()->Wake(); + } + } void BroadCast() noexcept { - while (!Waiters_.Empty()) { - Waiters_.PopFront()->Wake(); - } - } - + while (!Waiters_.Empty()) { + Waiters_.PopFront()->Wake(); + } + } + void BroadCast(size_t number) noexcept { for (size_t i = 0; i < number && !Waiters_.Empty(); ++i) { Waiters_.PopFront()->Wake(); } } -private: - TIntrusiveList<TWaiter> Waiters_; -}; - +private: + TIntrusiveList<TWaiter> Waiters_; +}; + -class TContSimpleEvent { -public: +class TContSimpleEvent { +public: TContSimpleEvent(TContExecutor* e) - : E_(e) - { - } - + : E_(e) + { + } + TContExecutor* Executor() const noexcept { - return E_; - } - + return E_; + } + void Signal() noexcept { - Q_.Signal(); - } - + Q_.Signal(); + } + void BroadCast() noexcept { - Q_.BroadCast(); - } - + Q_.BroadCast(); + } + int WaitD(TInstant deadLine) noexcept { return Q_.WaitD(E_->Running(), deadLine); - } - + } + int WaitT(TDuration timeout) noexcept { - return WaitD(timeout.ToDeadLine()); - } - + return WaitD(timeout.ToDeadLine()); + } + int WaitI() noexcept { - return WaitD(TInstant::Max()); - } - -private: - TContWaitQueue Q_; - TContExecutor* E_; -}; + return WaitD(TInstant::Max()); + } + +private: + TContWaitQueue Q_; + TContExecutor* E_; +}; diff --git a/library/cpp/coroutine/engine/impl.cpp b/library/cpp/coroutine/engine/impl.cpp index 7ae6f74051..df08ddd3cc 100644 --- a/library/cpp/coroutine/engine/impl.cpp +++ b/library/cpp/coroutine/engine/impl.cpp @@ -1,15 +1,15 @@ #include "impl.h" - + #include "stack/stack_allocator.h" #include "stack/stack_guards.h" #include <util/generic/scope.h> #include <util/thread/singleton.h> #include <util/stream/format.h> -#include <util/stream/output.h> +#include <util/stream/output.h> #include <util/system/yassert.h> - - + + TCont::TJoinWait::TJoinWait(TCont& c) noexcept : Cont_(c) {} @@ -35,12 +35,12 @@ TCont::TCont(NCoro::NStack::IAllocator& allocator, void TCont::PrintMe(IOutputStream& out) const noexcept { - out << "cont(" + out << "cont(" << "name = " << Name_ << ", " << "addr = " << Hex((size_t)this) - << ")"; -} - + << ")"; +} + bool TCont::Join(TCont* c, TInstant deadLine) noexcept { TJoinWait ev(*this); c->Waiters_.PushBack(&ev); @@ -148,8 +148,8 @@ void TContExecutor::Execute(TContFunc func, void* arg) noexcept { func(cont, arg); }, "sys_main"); RunScheduler(); -} - +} + void TContExecutor::WaitForIO() { while (Ready_.Empty() && !WaitQueue_.Empty()) { const auto now = Now(); @@ -190,8 +190,8 @@ void TContExecutor::WaitForIO() { Ready_.Append(ReadyNext_); } -} - +} + void TContExecutor::Poll(TInstant deadline) { Poller_.Wait(PollerEvents_, deadline); LastPoll_ = Now(); @@ -217,8 +217,8 @@ void TContExecutor::Poll(TInstant deadline) { } } } -} - +} + void TContExecutor::Abort() noexcept { WaitQueue_.Abort(); auto visitor = [](TCont* c) { @@ -324,7 +324,7 @@ void TContExecutor::RunScheduler() noexcept { } TCont* cont = Ready_.PopFront(); - + if (ScheduleCallback_) { ScheduleCallback_->OnSchedule(*this, *cont); } @@ -347,9 +347,9 @@ void TContExecutor::RunScheduler() noexcept { } catch (...) { TBackTrace::FromCurrentException().PrintTo(Cerr); Y_FAIL("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str()); - } + } } - + void TContExecutor::Pause() { if (auto cont = Running()) { Paused_ = true; @@ -371,4 +371,4 @@ TInstant TContExecutor::Now() { template <> void Out<TCont>(IOutputStream& out, const TCont& c) { c.PrintMe(out); -} +} diff --git a/library/cpp/coroutine/engine/impl.h b/library/cpp/coroutine/engine/impl.h index 283a96ecf1..d904235ed3 100644 --- a/library/cpp/coroutine/engine/impl.h +++ b/library/cpp/coroutine/engine/impl.h @@ -1,30 +1,30 @@ #pragma once - + #include "callbacks.h" #include "cont_poller.h" #include "iostatus.h" -#include "poller.h" +#include "poller.h" #include "stack/stack_common.h" #include "trampoline.h" #include "custom_time.h" - + #include <library/cpp/containers/intrusive_rb_tree/rb_tree.h> -#include <util/system/error.h> -#include <util/generic/ptr.h> -#include <util/generic/intrlist.h> -#include <util/datetime/base.h> +#include <util/system/error.h> +#include <util/generic/ptr.h> +#include <util/generic/intrlist.h> +#include <util/datetime/base.h> #include <util/generic/maybe.h> #include <util/generic/function.h> - + #define EWAKEDUP 34567 -class TCont; -struct TContRep; -class TContExecutor; -class TContPollEvent; - +class TCont; +struct TContRep; +class TContExecutor; +class TContPollEvent; + namespace NCoro::NStack { class IAllocator; } @@ -32,18 +32,18 @@ namespace NCoro::NStack { class TCont : private TIntrusiveListItem<TCont> { struct TJoinWait: public TIntrusiveListItem<TJoinWait> { TJoinWait(TCont& c) noexcept; - + void Wake() noexcept; - + public: TCont& Cont_; }; - + friend class TContExecutor; friend class TIntrusiveListItem<TCont>; friend class NCoro::TEventWaitQueue; friend class NCoro::TTrampoline; - + private: TCont( NCoro::NStack::IAllocator& allocator, @@ -52,63 +52,63 @@ private: NCoro::TTrampoline::TFunc func, const char* name ) noexcept; - + public: TContExecutor* Executor() noexcept { return &Executor_; - } - + } + const TContExecutor* Executor() const noexcept { return &Executor_; - } - + } + const char* Name() const noexcept { - return Name_; - } - + return Name_; + } + void PrintMe(IOutputStream& out) const noexcept; - + void Yield() noexcept; - + void ReScheduleAndSwitch() noexcept; - - /// @return ETIMEDOUT on success + + /// @return ETIMEDOUT on success int SleepD(TInstant deadline) noexcept; - + int SleepT(TDuration timeout) noexcept { - return SleepD(timeout.ToDeadLine()); - } - + return SleepD(timeout.ToDeadLine()); + } + int SleepI() noexcept { - return SleepD(TInstant::Max()); - } - + return SleepD(TInstant::Max()); + } + bool IAmRunning() const noexcept; void Cancel() noexcept; bool Cancelled() const noexcept { - return Cancelled_; - } - + return Cancelled_; + } + bool Scheduled() const noexcept { return Scheduled_; } bool Join(TCont* c, TInstant deadLine = TInstant::Max()) noexcept; - + void ReSchedule() noexcept; - + void Switch() noexcept; void SwitchTo(TExceptionSafeContext* ctx) { Trampoline_.SwitchTo(ctx); } -private: +private: void Terminate(); -private: +private: TContExecutor& Executor_; // TODO(velavokr): allow name storage owning (for generated names backed by TString) @@ -119,17 +119,17 @@ private: TIntrusiveList<TJoinWait> Waiters_; bool Cancelled_ = false; bool Scheduled_ = false; -}; - +}; + TCont* RunningCont(); - - -template <class Functor> -static void ContHelperFunc(TCont* cont, void* arg) { - (*((Functor*)(arg)))(cont); -} - -template <typename T, void (T::*M)(TCont*)> + + +template <class Functor> +static void ContHelperFunc(TCont* cont, void* arg) { + (*((Functor*)(arg)))(cont); +} + +template <typename T, void (T::*M)(TCont*)> static void ContHelperMemberFunc(TCont* c, void* arg) { ((reinterpret_cast<T*>(arg))->*M)(c); } @@ -145,11 +145,11 @@ public: /// Central coroutine class. /// Note, coroutines are single-threaded, and all methods must be called from the single thread -class TContExecutor { - friend class TCont; +class TContExecutor { + friend class TCont; using TContList = TIntrusiveList<TCont>; - -public: + +public: TContExecutor( uint32_t defaultStackSize, THolder<IPollerFace> poller = IPollerFace::Default(), @@ -159,41 +159,41 @@ public: TMaybe<NCoro::NStack::TPoolAllocatorSettings> poolSettings = Nothing(), NCoro::ITime* time = nullptr ); - + ~TContExecutor(); - + // if we already have a coroutine to run void Execute() noexcept; - + void Execute(TContFunc func, void* arg = nullptr) noexcept; - - template <class Functor> + + template <class Functor> void Execute(Functor& f) noexcept { - Execute((TContFunc)ContHelperFunc<Functor>, (void*)&f); - } - - template <typename T, void (T::*M)(TCont*)> + Execute((TContFunc)ContHelperFunc<Functor>, (void*)&f); + } + + template <typename T, void (T::*M)(TCont*)> void Execute(T* obj) noexcept { - Execute(ContHelperMemberFunc<T, M>, obj); - } + Execute(ContHelperMemberFunc<T, M>, obj); + } - template <class Functor> + template <class Functor> TCont* Create( Functor& f, const char* name, TMaybe<ui32> customStackSize = Nothing() ) noexcept { return Create((TContFunc)ContHelperFunc<Functor>, (void*)&f, name, customStackSize); - } - - template <typename T, void (T::*M)(TCont*)> + } + + template <typename T, void (T::*M)(TCont*)> TCont* Create( T* obj, const char* name, TMaybe<ui32> customStackSize = Nothing() ) noexcept { return Create(ContHelperMemberFunc<T, M>, obj, name, customStackSize); - } + } TCont* Create( TContFunc func, @@ -201,7 +201,7 @@ public: const char* name, TMaybe<ui32> customStackSize = Nothing() ) noexcept; - + TCont* CreateOwned( NCoro::TTrampoline::TFunc func, const char* name, @@ -209,17 +209,17 @@ public: ) noexcept; NCoro::TContPoller* Poller() noexcept { - return &Poller_; - } - + return &Poller_; + } + TCont* Running() noexcept { - return Current_; - } - + return Current_; + } + const TCont* Running() const noexcept { - return Current_; - } - + return Current_; + } + size_t TotalReadyConts() const noexcept { return Ready_.Size() + TotalScheduledConts(); } @@ -240,54 +240,54 @@ public: // TODO(velavokr): rename, it is just CancelAll actually void Abort() noexcept; - + void SetFailOnError(bool fail) noexcept { - FailOnError_ = fail; - } - + FailOnError_ = fail; + } + bool FailOnError() const noexcept { - return FailOnError_; - } - + return FailOnError_; + } + void RegisterInWaitQueue(NCoro::TContPollEvent* event) { WaitQueue_.Register(event); } void ScheduleIoWait(TFdEvent* event) { RegisterInWaitQueue(event); - Poller_.Schedule(event); - } - + Poller_.Schedule(event); + } + void ScheduleIoWait(TTimerEvent* event) noexcept { RegisterInWaitQueue(event); - } - + } + void ScheduleUserEvent(IUserEvent* event) { UserEvents_.PushBack(event); } void Pause(); TInstant Now(); -private: +private: void Release(TCont* cont) noexcept; - + void Exit(TCont* cont) noexcept; - + void RunScheduler() noexcept; - + void ScheduleToDelete(TCont* cont) noexcept; - + void ScheduleExecution(TCont* cont) noexcept; - + void ScheduleExecutionNow(TCont* cont) noexcept; void DeleteScheduled() noexcept; - - void WaitForIO(); - + + void WaitForIO(); + void Poll(TInstant deadline); -private: +private: NCoro::IScheduleCallback* const ScheduleCallback_ = nullptr; NCoro::IEnterPollerCallback* const EnterPollerCallback_ = nullptr; const uint32_t DefaultStackSize_; @@ -295,8 +295,8 @@ private: TExceptionSafeContext SchedContext_; - TContList ToDelete_; - TContList Ready_; + TContList ToDelete_; + TContList Ready_; TContList ReadyNext_; NCoro::TEventWaitQueue WaitQueue_; NCoro::TContPoller Poller_; @@ -310,4 +310,4 @@ private: bool FailOnError_ = false; bool Paused_ = false; NCoro::ITime* Time_ = nullptr; -}; +}; diff --git a/library/cpp/coroutine/engine/iostatus.cpp b/library/cpp/coroutine/engine/iostatus.cpp index 8298229b39..a92142edeb 100644 --- a/library/cpp/coroutine/engine/iostatus.cpp +++ b/library/cpp/coroutine/engine/iostatus.cpp @@ -1 +1 @@ -#include "iostatus.h" +#include "iostatus.h" diff --git a/library/cpp/coroutine/engine/iostatus.h b/library/cpp/coroutine/engine/iostatus.h index bf6036805d..8478ce05e8 100644 --- a/library/cpp/coroutine/engine/iostatus.h +++ b/library/cpp/coroutine/engine/iostatus.h @@ -1,91 +1,91 @@ #pragma once - + #include <util/generic/yexception.h> -class TIOStatus { -public: +class TIOStatus { +public: TIOStatus(int status) noexcept - : Status_(status) - { - } - + : Status_(status) + { + } + static TIOStatus Error(int status) noexcept { - return TIOStatus(status); - } - + return TIOStatus(status); + } + static TIOStatus Error() noexcept { - return TIOStatus(LastSystemError()); - } - + return TIOStatus(LastSystemError()); + } + static TIOStatus Success() noexcept { - return TIOStatus(0); - } - + return TIOStatus(0); + } + void Check() const { - if (Status_) { - ythrow TSystemError(Status_) << "io error"; - } - } - + if (Status_) { + ythrow TSystemError(Status_) << "io error"; + } + } + bool Failed() const noexcept { - return (bool)Status_; - } - + return (bool)Status_; + } + bool Succeed() const noexcept { - return !Failed(); - } - + return !Failed(); + } + int Status() const noexcept { - return Status_; - } - -private: - int Status_; -}; - - -class TContIOStatus { -public: + return Status_; + } + +private: + int Status_; +}; + + +class TContIOStatus { +public: TContIOStatus(size_t processed, TIOStatus status) noexcept - : Processed_(processed) - , Status_(status) - { - } - + : Processed_(processed) + , Status_(status) + { + } + static TContIOStatus Error(TIOStatus status) noexcept { - return TContIOStatus(0, status); - } - + return TContIOStatus(0, status); + } + static TContIOStatus Error() noexcept { - return TContIOStatus(0, TIOStatus::Error()); - } - + return TContIOStatus(0, TIOStatus::Error()); + } + static TContIOStatus Success(size_t processed) noexcept { - return TContIOStatus(processed, TIOStatus::Success()); - } - + return TContIOStatus(processed, TIOStatus::Success()); + } + static TContIOStatus Eof() noexcept { - return Success(0); - } - + return Success(0); + } + ~TContIOStatus() { - } - + } + size_t Processed() const noexcept { - return Processed_; - } - + return Processed_; + } + int Status() const noexcept { - return Status_.Status(); - } - + return Status_.Status(); + } + size_t Checked() const { - Status_.Check(); - - return Processed_; - } - -private: - size_t Processed_; - TIOStatus Status_; -}; + Status_.Check(); + + return Processed_; + } + +private: + size_t Processed_; + TIOStatus Status_; +}; diff --git a/library/cpp/coroutine/engine/mutex.h b/library/cpp/coroutine/engine/mutex.h index 93e9119503..27392f7d85 100644 --- a/library/cpp/coroutine/engine/mutex.h +++ b/library/cpp/coroutine/engine/mutex.h @@ -1,49 +1,49 @@ #pragma once - + #include "impl.h" #include "events.h" -class TContMutex { -public: +class TContMutex { +public: TContMutex() noexcept - : Token_(true) - { - } - + : Token_(true) + { + } + ~TContMutex() { Y_ASSERT(Token_); - } - + } + int LockD(TCont* current, TInstant deadline) { - while (!Token_) { - const int ret = WaitQueue_.WaitD(current, deadline); - - if (ret != EWAKEDUP) { - return ret; - } - } - - Token_ = false; - - return 0; - } - + while (!Token_) { + const int ret = WaitQueue_.WaitD(current, deadline); + + if (ret != EWAKEDUP) { + return ret; + } + } + + Token_ = false; + + return 0; + } + int LockT(TCont* current, TDuration timeout) { - return LockD(current, timeout.ToDeadLine()); - } - + return LockD(current, timeout.ToDeadLine()); + } + int LockI(TCont* current) { - return LockD(current, TInstant::Max()); - } - + return LockD(current, TInstant::Max()); + } + void UnLock() noexcept { Y_ASSERT(!Token_); - - Token_ = true; - WaitQueue_.Signal(); - } - -private: - TContWaitQueue WaitQueue_; - bool Token_; -}; + + Token_ = true; + WaitQueue_.Signal(); + } + +private: + TContWaitQueue WaitQueue_; + bool Token_; +}; diff --git a/library/cpp/coroutine/engine/network.cpp b/library/cpp/coroutine/engine/network.cpp index 85b647d210..a5c0d9282c 100644 --- a/library/cpp/coroutine/engine/network.cpp +++ b/library/cpp/coroutine/engine/network.cpp @@ -1,41 +1,41 @@ #include "impl.h" #include "network.h" - + #include <util/generic/scope.h> #include <util/generic/xrange.h> - + #include <sys/uio.h> - + #if defined(_bionic_) # define IOV_MAX 1024 #endif - - + + namespace NCoro { namespace { bool IsBlocked(int lasterr) noexcept { return lasterr == EAGAIN || lasterr == EWOULDBLOCK; } - + ssize_t DoReadVector(SOCKET fd, TContIOVector* vec) noexcept { return readv(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count())); } - + ssize_t DoWriteVector(SOCKET fd, TContIOVector* vec) noexcept { return writev(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count())); } } - - + + int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept { if (cont->Cancelled()) { return ECANCELED; } - + if (nfds == 0) { return 0; } - + TTempArray<TFdEvent> events(nfds); for (auto i : xrange(nfds)) { @@ -47,7 +47,7 @@ namespace NCoro { (events.Data() + i)->~TFdEvent(); } }; - + for (auto i : xrange(nfds)) { cont->Executor()->ScheduleIoWait(events.Data() + i); } @@ -79,36 +79,36 @@ namespace NCoro { if (ret) { if (outfd) { *outfd = ret->Fd(); - } + } return ret->Status(); - } - + } + return EINPROGRESS; - } - + } + int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept { return SelectD(cont, fds, what, nfds, outfd, timeout.ToDeadLine()); - } - + } + int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd) { return SelectD(cont, fds, what, nfds, outfd, TInstant::Max()); } - + int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept { TFdEvent event(cont, fd, (ui16)what, deadline); return ExecuteEvent(&event); - } - + } + int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept { return PollD(cont, fd, what, timeout.ToDeadLine()); } int PollI(TCont* cont, SOCKET fd, int what) noexcept { return PollD(cont, fd, what, TInstant::Max()); - } - - + } + + TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept { while (true) { ssize_t res = DoReadVector(fd, vec); @@ -117,103 +117,103 @@ namespace NCoro { return TContIOStatus::Success((size_t) res); } - { - const int err = LastSystemError(); - - if (!IsBlocked(err)) { + { + const int err = LastSystemError(); + + if (!IsBlocked(err)) { return TContIOStatus::Error(err); - } - } - + } + } + if ((res = PollD(cont, fd, CONT_POLL_READ, deadline)) != 0) { return TContIOStatus::Error((int) res); - } - } - } - + } + } + } + TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept { return ReadVectorD(cont, fd, vec, timeOut.ToDeadLine()); } - + TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept { return ReadVectorD(cont, fd, vec, TInstant::Max()); } - - + + TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept { IOutputStream::TPart part(buf, len); TContIOVector vec(&part, 1); return ReadVectorD(cont, fd, &vec, deadline); } - + TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept { return ReadD(cont, fd, buf, len, timeout.ToDeadLine()); } - + TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept { return ReadD(cont, fd, buf, len, TInstant::Max()); - } - - + } + + TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept { size_t written = 0; - + while (!vec->Complete()) { ssize_t res = DoWriteVector(fd, vec); - + if (res >= 0) { written += res; - + vec->Proceed((size_t) res); } else { { const int err = LastSystemError(); - + if (!IsBlocked(err)) { return TContIOStatus(written, err); } } - + if ((res = PollD(cont, fd, CONT_POLL_WRITE, deadline)) != 0) { return TContIOStatus(written, (int) res); } } - } - + } + return TContIOStatus::Success(written); - } - + } + TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept { return WriteVectorD(cont, fd, vec, timeOut.ToDeadLine()); } - + TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept { return WriteVectorD(cont, fd, vec, TInstant::Max()); } - - + + TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept { IOutputStream::TPart part(buf, len); TContIOVector vec(&part, 1); return WriteVectorD(cont, fd, &vec, deadline); } - + TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept { return WriteD(cont, fd, buf, len, timeout.ToDeadLine()); } - + TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept { return WriteD(cont, fd, buf, len, TInstant::Max()); - } - - + } + + int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept { TSocketHolder res(Socket(ai)); - + if (res.Closed()) { return LastSystemError(); } - + const int ret = ConnectD(cont, res, ai.ai_addr, (socklen_t) ai.ai_addrlen, deadline); if (!ret) { @@ -221,49 +221,49 @@ namespace NCoro { } return ret; - } - + } + int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept { int ret = EHOSTUNREACH; - + for (auto it = addr.Begin(); it != addr.End(); ++it) { ret = ConnectD(cont, s, *it, deadline); - + if (ret == 0 || ret == ETIMEDOUT) { return ret; } } - + return ret; - } - + } + int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept { return ConnectD(cont, s, addr, timeout.ToDeadLine()); } - + int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept { return ConnectD(cont, s, addr, TInstant::Max()); - } - + } + int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept { if (connect(s, name, namelen)) { const int err = LastSystemError(); - + if (!IsBlocked(err) && err != EINPROGRESS) { return err; } - + int ret = PollD(cont, s, CONT_POLL_WRITE, deadline); - + if (ret) { return ret; } - + // check if we really connected // FIXME: Unportable ?? int serr = 0; socklen_t slen = sizeof(serr); - + ret = getsockopt(s, SOL_SOCKET, SO_ERROR, (char*) &serr, &slen); if (ret) { @@ -286,27 +286,27 @@ namespace NCoro { return ConnectD(cont, s, name, namelen, TInstant::Max()); } - + int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept { SOCKET ret; - + while ((ret = Accept4(s, addr, addrlen)) == INVALID_SOCKET) { int err = LastSystemError(); - + if (!IsBlocked(err)) { return -err; } err = PollD(cont, s, CONT_POLL_READ, deadline); - + if (err) { return -err; } } - + return (int) ret; - } - + } + int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept { return AcceptD(cont, s, addr, addrlen, timeout.ToDeadLine()); } diff --git a/library/cpp/coroutine/engine/network.h b/library/cpp/coroutine/engine/network.h index f2c9afe4f8..db979c4c59 100644 --- a/library/cpp/coroutine/engine/network.h +++ b/library/cpp/coroutine/engine/network.h @@ -1,55 +1,55 @@ #pragma once - + #include "iostatus.h" - + #include <util/datetime/base.h> #include <util/network/init.h> #include <util/network/iovec.h> #include <util/network/nonblock.h> -#include <util/network/socket.h> - -class TCont; - +#include <util/network/socket.h> + +class TCont; + namespace NCoro { int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept; int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept; int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd); - + int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept; int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept; int PollI(TCont* cont, SOCKET fd, int what) noexcept; - + TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept; TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept; TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept; - + TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept; TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept; TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept; - + TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept; TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept; TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept; - + TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept; TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept; TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept; - + int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept; - + int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept; int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept; int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept; - + int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept; int ConnectT(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TDuration timeout) noexcept; int ConnectI(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen) noexcept; - + int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept; int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept; int AcceptI(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen) noexcept; - + SOCKET Socket(int domain, int type, int protocol) noexcept; SOCKET Socket(const struct addrinfo& ai) noexcept; -} +} diff --git a/library/cpp/coroutine/engine/poller.cpp b/library/cpp/coroutine/engine/poller.cpp index 61164fa56b..6fd1f673ae 100644 --- a/library/cpp/coroutine/engine/poller.cpp +++ b/library/cpp/coroutine/engine/poller.cpp @@ -1,118 +1,118 @@ -#include "poller.h" -#include "sockmap.h" - -#include <util/memory/smallobj.h> -#include <util/generic/intrlist.h> -#include <util/generic/singleton.h> +#include "poller.h" +#include "sockmap.h" + +#include <util/memory/smallobj.h> +#include <util/generic/intrlist.h> +#include <util/generic/singleton.h> #include <util/system/env.h> #include <util/string/cast.h> - -namespace { + +namespace { using TChange = IPollerFace::TChange; using TEvent = IPollerFace::TEvent; using TEvents = IPollerFace::TEvents; - - template <class T> - class TUnsafeBuf { - public: + + template <class T> + class TUnsafeBuf { + public: TUnsafeBuf() noexcept - : L_(0) - { - } - + : L_(0) + { + } + T* operator~() const noexcept { - return B_.Get(); - } - + return B_.Get(); + } + size_t operator+() const noexcept { - return L_; - } - + return L_; + } + void Reserve(size_t len) { - len = FastClp2(len); - - if (len > L_) { - B_.Reset(new T[len]); - L_ = len; - } - } - - private: - TArrayHolder<T> B_; - size_t L_; - }; - - - template <class T> - class TVirtualize: public IPollerFace { - public: + len = FastClp2(len); + + if (len > L_) { + B_.Reset(new T[len]); + L_ = len; + } + } + + private: + TArrayHolder<T> B_; + size_t L_; + }; + + + template <class T> + class TVirtualize: public IPollerFace { + public: TVirtualize(EContPoller pollerEngine) : PollerEngine_(pollerEngine) { } void Set(const TChange& c) override { - P_.Set(c); - } - + P_.Set(c); + } + void Wait(TEvents& events, TInstant deadLine) override { - P_.Wait(events, deadLine); - } - + P_.Wait(events, deadLine); + } + EContPoller PollEngine() const override { return PollerEngine_; } - private: - T P_; + private: + T P_; const EContPoller PollerEngine_; - }; - + }; + - template <class T> - class TPoller { + template <class T> + class TPoller { using TInternalEvent = typename T::TEvent; - - public: + + public: TPoller() { - E_.Reserve(1); - } - + E_.Reserve(1); + } + void Set(const TChange& c) { - P_.Set(c.Data, c.Fd, c.Flags); - } - + P_.Set(c.Data, c.Fd, c.Flags); + } + void Reserve(size_t size) { E_.Reserve(size); } void Wait(TEvents& events, TInstant deadLine) { - const size_t ret = P_.WaitD(~E_, +E_, deadLine); - + const size_t ret = P_.WaitD(~E_, +E_, deadLine); + events.reserve(ret); - for (size_t i = 0; i < ret; ++i) { - const TInternalEvent* ie = ~E_ + i; - - const TEvent e = { - T::ExtractEvent(ie), - T::ExtractStatus(ie), - (ui16)T::ExtractFilter(ie), - }; - - events.push_back(e); - } - - E_.Reserve(ret + 1); - } - - private: - T P_; - TUnsafeBuf<TInternalEvent> E_; - }; - - - template <class T> - class TIndexedArray { + for (size_t i = 0; i < ret; ++i) { + const TInternalEvent* ie = ~E_ + i; + + const TEvent e = { + T::ExtractEvent(ie), + T::ExtractStatus(ie), + (ui16)T::ExtractFilter(ie), + }; + + events.push_back(e); + } + + E_.Reserve(ret + 1); + } + + private: + T P_; + TUnsafeBuf<TInternalEvent> E_; + }; + + + template <class T> + class TIndexedArray { struct TVal: public T, public TIntrusiveListItem<TVal>, @@ -125,244 +125,244 @@ namespace { // zero-initialization takes place in TVal() expression and the // data is overwritten. TVal() { - } - }; - + } + }; + typedef TIntrusiveList<TVal> TListType; - - public: + + public: typedef typename TListType::TIterator TIterator; typedef typename TListType::TConstIterator TConstIterator; - + TIndexedArray() - : P_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance()) - { - } - + : P_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance()) + { + } + TIterator Begin() noexcept { - return I_.Begin(); - } - + return I_.Begin(); + } + TIterator End() noexcept { - return I_.End(); - } - + return I_.End(); + } + TConstIterator Begin() const noexcept { - return I_.Begin(); - } - + return I_.Begin(); + } + TConstIterator End() const noexcept { - return I_.End(); - } - + return I_.End(); + } + T& operator[](size_t i) { - return *Get(i); - } - + return *Get(i); + } + T* Get(size_t i) { - TValRef& v = V_.Get(i); - + TValRef& v = V_.Get(i); + if (Y_UNLIKELY(!v)) { - v.Reset(new (&P_) TVal()); - I_.PushFront(v.Get()); - } - - Y_PREFETCH_WRITE(v.Get(), 1); - - return v.Get(); - } - + v.Reset(new (&P_) TVal()); + I_.PushFront(v.Get()); + } + + Y_PREFETCH_WRITE(v.Get(), 1); + + return v.Get(); + } + void Erase(size_t i) noexcept { - V_.Get(i).Destroy(); - } - + V_.Get(i).Destroy(); + } + size_t Size() const noexcept { - return I_.Size(); - } - - private: + return I_.Size(); + } + + private: using TValRef = THolder<TVal>; - typename TVal::TPool P_; - TSocketMap<TValRef> V_; + typename TVal::TPool P_; + TSocketMap<TValRef> V_; TListType I_; - }; - + }; + inline short PollFlags(ui16 flags) noexcept { - short ret = 0; - - if (flags & CONT_POLL_READ) { - ret |= POLLIN; - } - - if (flags & CONT_POLL_WRITE) { - ret |= POLLOUT; - } - + short ret = 0; + + if (flags & CONT_POLL_READ) { + ret |= POLLIN; + } + + if (flags & CONT_POLL_WRITE) { + ret |= POLLOUT; + } + #if defined(_linux_) if (flags & CONT_POLL_RDHUP) { ret |= POLLRDHUP; } #endif - return ret; - } - + return ret; + } + - class TPollPoller { - public: + class TPollPoller { + public: size_t Size() const noexcept { - return S_.Size(); - } - - template <class T> + return S_.Size(); + } + + template <class T> void Build(T& t) const { - for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) { - t.Set(*it); - } + for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) { + t.Set(*it); + } t.Reserve(Size()); - } - + } + void Set(const TChange& c) { - if (c.Flags) { - S_[c.Fd] = c; - } else { - S_.Erase(c.Fd); - } - } - + if (c.Flags) { + S_[c.Fd] = c; + } else { + S_.Erase(c.Fd); + } + } + void Wait(TEvents& events, TInstant deadLine) { - T_.clear(); - T_.reserve(Size()); - - for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) { - const pollfd pfd = { - it->Fd, - PollFlags(it->Flags), - 0, - }; - - T_.push_back(pfd); - } - + T_.clear(); + T_.reserve(Size()); + + for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) { + const pollfd pfd = { + it->Fd, + PollFlags(it->Flags), + 0, + }; + + T_.push_back(pfd); + } + const ssize_t ret = PollD(T_.data(), (nfds_t) T_.size(), deadLine); - + if (ret <= 0) { - return; - } - + return; + } + events.reserve(T_.size()); for (size_t i = 0; i < T_.size(); ++i) { - const pollfd& pfd = T_[i]; - const short ev = pfd.revents; - - if (!ev) { - continue; - } - - int status = 0; - ui16 filter = 0; - + const pollfd& pfd = T_[i]; + const short ev = pfd.revents; + + if (!ev) { + continue; + } + + int status = 0; + ui16 filter = 0; + // We are perfectly fine with an EOF while reading a pipe or a unix socket if ((ev & POLLIN) || (ev & POLLHUP) && (pfd.events & POLLIN)) { - filter |= CONT_POLL_READ; - } - - if (ev & POLLOUT) { - filter |= CONT_POLL_WRITE; - } - + filter |= CONT_POLL_READ; + } + + if (ev & POLLOUT) { + filter |= CONT_POLL_WRITE; + } + #if defined(_linux_) if (ev & POLLRDHUP) { filter |= CONT_POLL_RDHUP; } #endif - if (ev & POLLERR) { - status = EIO; + if (ev & POLLERR) { + status = EIO; } else if (ev & POLLHUP && pfd.events & POLLOUT) { // Only write operations may cause EPIPE - status = EPIPE; - } else if (ev & POLLNVAL) { - status = EINVAL; - } - - if (status) { + status = EPIPE; + } else if (ev & POLLNVAL) { + status = EINVAL; + } + + if (status) { filter = CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_RDHUP; - } - - const TEvent res = { - S_[pfd.fd].Data, - status, - filter, - }; - - events.push_back(res); - } - } - - private: - typedef TIndexedArray<TChange> TFds; - TFds S_; + } + + const TEvent res = { + S_[pfd.fd].Data, + status, + filter, + }; + + events.push_back(res); + } + } + + private: + typedef TIndexedArray<TChange> TFds; + TFds S_; typedef TVector<pollfd> TPollVec; - TPollVec T_; - }; - - - class TCombinedPoller { - typedef TPoller<TPollerImpl<TWithoutLocking>> TDefaultPoller; - - public: + TPollVec T_; + }; + + + class TCombinedPoller { + typedef TPoller<TPollerImpl<TWithoutLocking>> TDefaultPoller; + + public: TCombinedPoller() { - P_.Reset(new TPollPoller()); - } - + P_.Reset(new TPollPoller()); + } + void Set(const TChange& c) { - if (!P_) { - D_->Set(c); - } else { - P_->Set(c); - } - } - + if (!P_) { + D_->Set(c); + } else { + P_->Set(c); + } + } + void Wait(TEvents& events, TInstant deadLine) { - if (!P_) { - D_->Wait(events, deadLine); - } else { - if (P_->Size() > 200) { - D_.Reset(new TDefaultPoller()); - P_->Build(*D_); - P_.Destroy(); - D_->Wait(events, deadLine); - } else { - P_->Wait(events, deadLine); - } - } - } - - private: + if (!P_) { + D_->Wait(events, deadLine); + } else { + if (P_->Size() > 200) { + D_.Reset(new TDefaultPoller()); + P_->Build(*D_); + P_.Destroy(); + D_->Wait(events, deadLine); + } else { + P_->Wait(events, deadLine); + } + } + } + + private: THolder<TPollPoller> P_; THolder<TDefaultPoller> D_; - }; - - struct TUserPoller: public TString { + }; + + struct TUserPoller: public TString { TUserPoller() - : TString(GetEnv("USER_POLLER")) - { - } - }; -} - + : TString(GetEnv("USER_POLLER")) + { + } + }; +} + THolder<IPollerFace> IPollerFace::Default() { return Construct(*SingletonWithPriority<TUserPoller, 0>()); -} - +} + THolder<IPollerFace> IPollerFace::Construct(TStringBuf name) { return Construct(name ? FromString<EContPoller>(name) : EContPoller::Default); } - + THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) { switch (poller) { case EContPoller::Default: @@ -373,18 +373,18 @@ THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) { case EContPoller::Poll: return MakeHolder<TVirtualize<TPollPoller>>(poller); case EContPoller::Epoll: -#if defined(HAVE_EPOLL_POLLER) +#if defined(HAVE_EPOLL_POLLER) return MakeHolder<TVirtualize<TPoller<TGenericPoller<TEpollPoller<TWithoutLocking>>>>>(poller); #else return nullptr; -#endif +#endif case EContPoller::Kqueue: #if defined(HAVE_KQUEUE_POLLER) return MakeHolder<TVirtualize<TPoller<TGenericPoller<TKqueuePoller<TWithoutLocking>>>>>(poller); #else return nullptr; -#endif +#endif default: Y_FAIL("bad poller type"); - } -} + } +} diff --git a/library/cpp/coroutine/engine/poller.h b/library/cpp/coroutine/engine/poller.h index 8ea012c0fc..054de6616b 100644 --- a/library/cpp/coroutine/engine/poller.h +++ b/library/cpp/coroutine/engine/poller.h @@ -1,11 +1,11 @@ #pragma once - -#include <util/generic/ptr.h> -#include <util/generic/vector.h> -#include <util/network/socket.h> -#include <util/network/pollerimpl.h> -#include <util/datetime/base.h> - + +#include <util/generic/ptr.h> +#include <util/generic/vector.h> +#include <util/network/socket.h> +#include <util/network/pollerimpl.h> +#include <util/datetime/base.h> + enum class EContPoller { Default /* "default" */, Combined /* "combined" */, @@ -15,36 +15,36 @@ enum class EContPoller { Kqueue /* "kqueue" */ }; -class IPollerFace { -public: - struct TChange { +class IPollerFace { +public: + struct TChange { SOCKET Fd; - void* Data; - ui16 Flags; - }; - - struct TEvent { - void* Data; - int Status; - ui16 Filter; - }; - + void* Data; + ui16 Flags; + }; + + struct TEvent { + void* Data; + int Status; + ui16 Filter; + }; + using TEvents = TVector<TEvent>; - - virtual ~IPollerFace() { - } - + + virtual ~IPollerFace() { + } + void Set(void* ptr, SOCKET fd, ui16 flags) { - const TChange c = {fd, ptr, flags}; - - Set(c); - } - - virtual void Set(const TChange& change) = 0; - virtual void Wait(TEvents& events, TInstant deadLine) = 0; + const TChange c = {fd, ptr, flags}; + + Set(c); + } + + virtual void Set(const TChange& change) = 0; + virtual void Wait(TEvents& events, TInstant deadLine) = 0; virtual EContPoller PollEngine() const = 0; - + static THolder<IPollerFace> Default(); static THolder<IPollerFace> Construct(TStringBuf name); static THolder<IPollerFace> Construct(EContPoller poller); -}; +}; diff --git a/library/cpp/coroutine/engine/sockmap.h b/library/cpp/coroutine/engine/sockmap.h index fd189e1774..aa616da61e 100644 --- a/library/cpp/coroutine/engine/sockmap.h +++ b/library/cpp/coroutine/engine/sockmap.h @@ -1,24 +1,24 @@ -#pragma once - -#include <util/generic/hash.h> -#include <util/generic/vector.h> - -template <class T> -class TSocketMap { -public: +#pragma once + +#include <util/generic/hash.h> +#include <util/generic/vector.h> + +template <class T> +class TSocketMap { +public: T& Get(size_t idx) { - if (idx < 128000) { - if (V_.size() <= idx) { - V_.resize(idx + 1); - } - - return V_[idx]; - } - - return H_[idx]; - } - -private: + if (idx < 128000) { + if (V_.size() <= idx) { + V_.resize(idx + 1); + } + + return V_[idx]; + } + + return H_[idx]; + } + +private: TVector<T> V_; THashMap<size_t, T> H_; -}; +}; diff --git a/library/cpp/coroutine/engine/sockpool.cpp b/library/cpp/coroutine/engine/sockpool.cpp index b9482e780f..895dd686c7 100644 --- a/library/cpp/coroutine/engine/sockpool.cpp +++ b/library/cpp/coroutine/engine/sockpool.cpp @@ -13,7 +13,7 @@ void SetCommonSockOpts(SOCKET sock, const struct sockaddr* sa) { warn("bind"); } } else if (sa->sa_family == AF_INET6) { - sockaddr_in6 s_in6(*(const sockaddr_in6*)sa); + sockaddr_in6 s_in6(*(const sockaddr_in6*)sa); Zero(s_in6.sin6_addr); s_in6.sin6_port = 0; diff --git a/library/cpp/coroutine/engine/sockpool.h b/library/cpp/coroutine/engine/sockpool.h index 1ebb7e7b38..b34d5ace46 100644 --- a/library/cpp/coroutine/engine/sockpool.h +++ b/library/cpp/coroutine/engine/sockpool.h @@ -1,253 +1,253 @@ #pragma once - + #include "impl.h" #include "network.h" - + #include <util/network/address.h> #include <util/network/socket.h> #include <util/system/mutex.h> - + extern void SetCommonSockOpts(SOCKET sock, const struct sockaddr* sa = nullptr); - + class TSocketPool; class TPooledSocket { - class TImpl: public TIntrusiveListItem<TImpl>, public TSimpleRefCount<TImpl, TImpl> { - public: + class TImpl: public TIntrusiveListItem<TImpl>, public TSimpleRefCount<TImpl, TImpl> { + public: TImpl(SOCKET fd, TSocketPool* pool) noexcept - : Pool_(pool) - , IsKeepAlive_(false) - , Fd_(fd) - { - Touch(); - } - + : Pool_(pool) + , IsKeepAlive_(false) + , Fd_(fd) + { + Touch(); + } + static void Destroy(TImpl* impl) noexcept { - impl->DoDestroy(); - } - + impl->DoDestroy(); + } + void DoDestroy() noexcept { - if (!Closed() && IsKeepAlive() && IsInGoodState()) { - ReturnToPool(); - } else { - delete this; - } - } - + if (!Closed() && IsKeepAlive() && IsInGoodState()) { + ReturnToPool(); + } else { + delete this; + } + } + bool IsKeepAlive() const noexcept { - return IsKeepAlive_; - } - + return IsKeepAlive_; + } + void SetKeepAlive(bool ka) { - ::SetKeepAlive(Fd_, ka); - IsKeepAlive_ = ka; - } - + ::SetKeepAlive(Fd_, ka); + IsKeepAlive_ = ka; + } + SOCKET Socket() const noexcept { - return Fd_; - } - + return Fd_; + } + bool Closed() const noexcept { - return Fd_.Closed(); - } - + return Fd_.Closed(); + } + void Close() noexcept { - Fd_.Close(); - } - + Fd_.Close(); + } + bool IsInGoodState() const noexcept { - int err = 0; - socklen_t len = sizeof(err); - - getsockopt(Fd_, SOL_SOCKET, SO_ERROR, (char*)&err, &len); - - return !err; - } - + int err = 0; + socklen_t len = sizeof(err); + + getsockopt(Fd_, SOL_SOCKET, SO_ERROR, (char*)&err, &len); + + return !err; + } + bool IsOpen() const noexcept { return IsInGoodState() && IsNotSocketClosedByOtherSide(Fd_); - } - + } + void Touch() noexcept { - TouchTime_ = TInstant::Now(); - } - + TouchTime_ = TInstant::Now(); + } + const TInstant& LastTouch() const noexcept { - return TouchTime_; - } - - private: + return TouchTime_; + } + + private: inline void ReturnToPool() noexcept; - - private: - TSocketPool* Pool_; - bool IsKeepAlive_; - TSocketHolder Fd_; - TInstant TouchTime_; - }; - - friend class TSocketPool; - -public: + + private: + TSocketPool* Pool_; + bool IsKeepAlive_; + TSocketHolder Fd_; + TInstant TouchTime_; + }; + + friend class TSocketPool; + +public: TPooledSocket() : Impl_(nullptr) - { - } + { + } TPooledSocket(TImpl* impl) - : Impl_(impl) - { - } - + : Impl_(impl) + { + } + ~TPooledSocket() { - if (UncaughtException() && !!Impl_) { - Close(); + if (UncaughtException() && !!Impl_) { + Close(); } - } - + } + operator SOCKET() const noexcept { - return Impl_->Socket(); - } - + return Impl_->Socket(); + } + void SetKeepAlive(bool ka) { - Impl_->SetKeepAlive(ka); - } - + Impl_->SetKeepAlive(ka); + } + void Close() noexcept { - Impl_->Close(); - } - -private: - TIntrusivePtr<TImpl> Impl_; + Impl_->Close(); + } + +private: + TIntrusivePtr<TImpl> Impl_; }; struct TConnectData { TConnectData(TCont* cont, const TInstant& deadLine) - : Cont(cont) - , DeadLine(deadLine) - { - } - + : Cont(cont) + , DeadLine(deadLine) + { + } + TConnectData(TCont* cont, const TDuration& timeOut) - : Cont(cont) - , DeadLine(TInstant::Now() + timeOut) - { - } - - TCont* Cont; - const TInstant DeadLine; + : Cont(cont) + , DeadLine(TInstant::Now() + timeOut) + { + } + + TCont* Cont; + const TInstant DeadLine; }; class TSocketPool { - friend class TPooledSocket::TImpl; - -public: - typedef TAtomicSharedPtr<NAddr::IRemoteAddr> TAddrRef; + friend class TPooledSocket::TImpl; + +public: + typedef TAtomicSharedPtr<NAddr::IRemoteAddr> TAddrRef; TSocketPool(int ip, int port) - : Addr_(new NAddr::TIPv4Addr(TIpAddress((ui32)ip, (ui16)port))) - { - } - + : Addr_(new NAddr::TIPv4Addr(TIpAddress((ui32)ip, (ui16)port))) + { + } + TSocketPool(const TAddrRef& addr) - : Addr_(addr) - { - } + : Addr_(addr) + { + } void EraseStale(const TInstant& maxAge) noexcept { - TSockets toDelete; - - { - TGuard<TMutex> guard(Mutex_); - - for (TSockets::TIterator it = Pool_.Begin(); it != Pool_.End();) { - if (it->LastTouch() < maxAge) { - toDelete.PushBack(&*(it++)); - } else { - ++it; - } - } - } - } - + TSockets toDelete; + + { + TGuard<TMutex> guard(Mutex_); + + for (TSockets::TIterator it = Pool_.Begin(); it != Pool_.End();) { + if (it->LastTouch() < maxAge) { + toDelete.PushBack(&*(it++)); + } else { + ++it; + } + } + } + } + TPooledSocket Get(TConnectData* conn) { - TPooledSocket ret; - - if (TPooledSocket::TImpl* alive = GetImpl()) { - ret = TPooledSocket(alive); - } else { - ret = AllocateMore(conn); - } - - ret.Impl_->Touch(); - - return ret; - } - + TPooledSocket ret; + + if (TPooledSocket::TImpl* alive = GetImpl()) { + ret = TPooledSocket(alive); + } else { + ret = AllocateMore(conn); + } + + ret.Impl_->Touch(); + + return ret; + } + bool GetAlive(TPooledSocket& socket) { - if (TPooledSocket::TImpl* alive = GetImpl()) { - alive->Touch(); - socket = TPooledSocket(alive); - return true; + if (TPooledSocket::TImpl* alive = GetImpl()) { + alive->Touch(); + socket = TPooledSocket(alive); + return true; } - return false; - } + return false; + } -private: +private: TPooledSocket::TImpl* GetImpl() { - TGuard<TMutex> guard(Mutex_); - - while (!Pool_.Empty()) { - THolder<TPooledSocket::TImpl> ret(Pool_.PopFront()); - - if (ret->IsOpen()) { - return ret.Release(); + TGuard<TMutex> guard(Mutex_); + + while (!Pool_.Empty()) { + THolder<TPooledSocket::TImpl> ret(Pool_.PopFront()); + + if (ret->IsOpen()) { + return ret.Release(); } } return nullptr; - } - + } + void Release(TPooledSocket::TImpl* impl) noexcept { - TGuard<TMutex> guard(Mutex_); - - Pool_.PushFront(impl); - } - - TPooledSocket AllocateMore(TConnectData* conn); - -private: - TAddrRef Addr_; + TGuard<TMutex> guard(Mutex_); + + Pool_.PushFront(impl); + } + + TPooledSocket AllocateMore(TConnectData* conn); + +private: + TAddrRef Addr_; using TSockets = TIntrusiveListWithAutoDelete<TPooledSocket::TImpl, TDelete>; - TSockets Pool_; - TMutex Mutex_; + TSockets Pool_; + TMutex Mutex_; }; inline void TPooledSocket::TImpl::ReturnToPool() noexcept { - Pool_->Release(this); + Pool_->Release(this); } class TContIO: public IInputStream, public IOutputStream { -public: +public: TContIO(SOCKET fd, TCont* cont) - : Fd_(fd) - , Cont_(cont) - { - } - + : Fd_(fd) + , Cont_(cont) + { + } + void DoWrite(const void* buf, size_t len) override { NCoro::WriteI(Cont_, Fd_, buf, len).Checked(); - } - + } + size_t DoRead(void* buf, size_t len) override { return NCoro::ReadI(Cont_, Fd_, buf, len).Checked(); - } - + } + SOCKET Fd() const noexcept { - return Fd_; - } - -private: - SOCKET Fd_; - TCont* Cont_; + return Fd_; + } + +private: + SOCKET Fd_; + TCont* Cont_; }; diff --git a/library/cpp/coroutine/engine/trampoline.cpp b/library/cpp/coroutine/engine/trampoline.cpp index 10ea69ddc3..75254b58a8 100644 --- a/library/cpp/coroutine/engine/trampoline.cpp +++ b/library/cpp/coroutine/engine/trampoline.cpp @@ -1,16 +1,16 @@ #include "impl.h" #include "trampoline.h" - + #include "stack/stack_allocator.h" #include <util/system/info.h> #include <util/system/protect.h> #include <util/system/valgrind.h> #include <util/system/yassert.h> - + #include <cstdlib> #include <util/stream/format.h> - + namespace NCoro { @@ -30,7 +30,7 @@ TTrampoline::TTrampoline(NStack::IAllocator& allocator, ui32 stackSize, TFunc f, Func_(Cont_); } catch (...) {} } - + Cont_->Terminate(); } diff --git a/library/cpp/coroutine/engine/trampoline.h b/library/cpp/coroutine/engine/trampoline.h index 37b61cf015..23cc587c13 100644 --- a/library/cpp/coroutine/engine/trampoline.h +++ b/library/cpp/coroutine/engine/trampoline.h @@ -1,20 +1,20 @@ #pragma once - + #include "stack/stack_common.h" #include "stack/stack.h" #include <util/generic/noncopyable.h> #include <util/generic/ptr.h> -#include <util/system/context.h> -#include <util/system/defaults.h> - +#include <util/system/context.h> +#include <util/system/defaults.h> + #if !defined(STACK_GROW_DOWN) # error "unsupported" #endif -class TCont; +class TCont; typedef void (*TContFunc)(TCont*, void*); - + namespace NCoro { namespace NStack { @@ -31,9 +31,9 @@ namespace NCoro { TFunc f, TCont* cont ) noexcept; - + TArrayRef<char> Stack() noexcept; - + TExceptionSafeContext* Context() noexcept { return &Ctx_; } @@ -56,5 +56,5 @@ namespace NCoro { TExceptionSafeContext Ctx_; TFunc Func_; TCont* const Cont_; - }; -} + }; +} diff --git a/library/cpp/coroutine/engine/ya.make b/library/cpp/coroutine/engine/ya.make index 8c20b9afc3..de48968307 100644 --- a/library/cpp/coroutine/engine/ya.make +++ b/library/cpp/coroutine/engine/ya.make @@ -1,25 +1,25 @@ -LIBRARY() - +LIBRARY() + OWNER( pg g:balancer ) - + GENERATE_ENUM_SERIALIZATION(poller.h) GENERATE_ENUM_SERIALIZATION(stack/stack_common.h) -PEERDIR( +PEERDIR( contrib/libs/libc_compat library/cpp/containers/intrusive_rb_tree -) - -SRCS( +) + +SRCS( cont_poller.cpp helper.cpp - impl.cpp - iostatus.cpp + impl.cpp + iostatus.cpp network.cpp - poller.cpp + poller.cpp sockpool.cpp stack/stack.cpp stack/stack_allocator.cpp @@ -27,9 +27,9 @@ SRCS( stack/stack_storage.cpp stack/stack_utils.cpp trampoline.cpp -) - -END() +) + +END() RECURSE( stack/benchmark diff --git a/library/cpp/coroutine/listener/listen.cpp b/library/cpp/coroutine/listener/listen.cpp index 3d4e711d1d..13a3a9b3ae 100644 --- a/library/cpp/coroutine/listener/listen.cpp +++ b/library/cpp/coroutine/listener/listen.cpp @@ -1,15 +1,15 @@ -#include "listen.h" - +#include "listen.h" + #include <library/cpp/coroutine/engine/impl.h> #include <library/cpp/coroutine/engine/network.h> - -#include <util/network/ip.h> -#include <util/network/address.h> + +#include <util/network/ip.h> +#include <util/network/address.h> #include <util/generic/ylimits.h> -#include <util/generic/intrlist.h> - -using namespace NAddr; - +#include <util/generic/intrlist.h> + +using namespace NAddr; + namespace { union TSa { const sockaddr* Sa; @@ -24,10 +24,10 @@ namespace { inline bool operator==(const TSa& r) const noexcept { if (Sa->sa_family == r.Sa->sa_family) { switch (Sa->sa_family) { - case AF_INET: - return In->sin_port == r.In->sin_port && In->sin_addr.s_addr == r.In->sin_addr.s_addr; - case AF_INET6: - return In6->sin6_port == r.In6->sin6_port && !memcmp(&In6->sin6_addr, &r.In6->sin6_addr, sizeof(in6_addr)); + case AF_INET: + return In->sin_port == r.In->sin_port && In->sin_addr.s_addr == r.In->sin_addr.s_addr; + case AF_INET6: + return In6->sin6_port == r.In6->sin6_port && !memcmp(&In6->sin6_addr, &r.In6->sin6_addr, sizeof(in6_addr)); } } @@ -40,31 +40,31 @@ namespace { }; } -class TContListener::TImpl { +class TContListener::TImpl { private: struct TStoredAddrInfo: public TAddrInfo, private TNetworkAddress { inline TStoredAddrInfo(const struct addrinfo* ai, const TNetworkAddress& addr) noexcept - : TAddrInfo(ai) - , TNetworkAddress(addr) - { - } - }; - + : TAddrInfo(ai) + , TNetworkAddress(addr) + { + } + }; + private: - class TOneSocketListener: public TIntrusiveListItem<TOneSocketListener> { - public: - inline TOneSocketListener(TImpl* parent, IRemoteAddrPtr addr) - : Parent_(parent) + class TOneSocketListener: public TIntrusiveListItem<TOneSocketListener> { + public: + inline TOneSocketListener(TImpl* parent, IRemoteAddrPtr addr) + : Parent_(parent) , C_(nullptr) - , ListenSocket_(socket(addr->Addr()->sa_family, SOCK_STREAM, 0)) + , ListenSocket_(socket(addr->Addr()->sa_family, SOCK_STREAM, 0)) , Addr_(std::move(addr)) - { - if (ListenSocket_ == INVALID_SOCKET) { - ythrow TSystemError() << "can not create socket"; - } - - FixIPv6ListenSocket(ListenSocket_); - CheckedSetSockOpt(ListenSocket_, SOL_SOCKET, SO_REUSEADDR, 1, "reuse addr"); + { + if (ListenSocket_ == INVALID_SOCKET) { + ythrow TSystemError() << "can not create socket"; + } + + FixIPv6ListenSocket(ListenSocket_); + CheckedSetSockOpt(ListenSocket_, SOL_SOCKET, SO_REUSEADDR, 1, "reuse addr"); const TOptions& opts = Parent_->Opts_; if (opts.SendBufSize) { @@ -77,106 +77,106 @@ private: SetReusePort(ListenSocket_, opts.ReusePort); } - SetNonBlock(ListenSocket_); - - if (bind(ListenSocket_, Addr_->Addr(), Addr_->Len()) < 0) { - ythrow TSystemError() << "bind failed"; - } - } - + SetNonBlock(ListenSocket_); + + if (bind(ListenSocket_, Addr_->Addr(), Addr_->Len()) < 0) { + ythrow TSystemError() << "bind failed"; + } + } + inline ~TOneSocketListener() { - Stop(); - } - + Stop(); + } + public: inline void Run(TCont* cont) noexcept { C_ = cont; DoRun(); C_ = nullptr; - } - - inline void StartListen() { - if (!C_) { - const TOptions& opts = Parent_->Opts_; - - if (listen(ListenSocket_, (int)Min<size_t>(Max<int>(), opts.ListenQueue)) < 0) { - ythrow TSystemError() << "listen failed"; - } - - if (opts.EnableDeferAccept) { - SetDeferAccept(ListenSocket_); - } - + } + + inline void StartListen() { + if (!C_) { + const TOptions& opts = Parent_->Opts_; + + if (listen(ListenSocket_, (int)Min<size_t>(Max<int>(), opts.ListenQueue)) < 0) { + ythrow TSystemError() << "listen failed"; + } + + if (opts.EnableDeferAccept) { + SetDeferAccept(ListenSocket_); + } + C_ = Parent_->E_->Create<TOneSocketListener, &TOneSocketListener::Run>(this, "listen_job"); - } - } + } + } inline const IRemoteAddr* Addr() const noexcept { - return Addr_.Get(); - } - + return Addr_.Get(); + } + inline void Stop() noexcept { - if (C_) { - C_->Cancel(); - - while (C_) { + if (C_) { + C_->Cancel(); + + while (C_) { Parent_->E_->Running()->Yield(); - } - } - } - - private: + } + } + } + + private: inline void DoRun() noexcept { while (!C_->Cancelled()) { - try { - TOpaqueAddr remote; + try { + TOpaqueAddr remote; const int res = NCoro::AcceptI(C_, ListenSocket_, remote.MutableAddr(), remote.LenPtr()); - - if (res < 0) { - const int err = -res; - - if (err != ECONNABORTED) { - if (err == ECANCELED) { - break; - } + + if (res < 0) { + const int err = -res; + + if (err != ECONNABORTED) { + if (err == ECANCELED) { + break; + } if (errno == EMFILE) { C_->SleepT(TDuration::MilliSeconds(1)); } - - ythrow TSystemError(err) << "can not accept"; - } - } else { - TSocketHolder c((SOCKET)res); - + + ythrow TSystemError(err) << "can not accept"; + } + } else { + TSocketHolder c((SOCKET)res); + const ICallBack::TAcceptFull acc = { - &c, - &remote, - Addr(), - }; - + &c, + &remote, + Addr(), + }; + Parent_->Cb_->OnAcceptFull(acc); - } - } catch (...) { - try { - Parent_->Cb_->OnError(); - } catch (...) { - } - } - } - + } + } catch (...) { + try { + Parent_->Cb_->OnError(); + } catch (...) { + } + } + } + try { Parent_->Cb_->OnStop(&ListenSocket_); } catch (...) { } - } - - private: + } + + private: const TImpl* const Parent_; - TCont* C_; - TSocketHolder ListenSocket_; + TCont* C_; + TSocketHolder ListenSocket_; const IRemoteAddrPtr Addr_; - }; - + }; + private: class TListeners: public TIntrusiveListWithAutoDelete<TOneSocketListener, TDelete> { private: @@ -184,11 +184,11 @@ private: using TIt = std::conditional_t<std::is_const<T>::value, typename T::TConstIterator, typename T::TIterator>; template <class T> - static inline TIt<T> FindImpl(T* t, const IRemoteAddr& addr) { + static inline TIt<T> FindImpl(T* t, const IRemoteAddr& addr) { const TSa sa(addr.Addr()); - TIt<T> it = t->Begin(); - TIt<T> const end = t->End(); + TIt<T> it = t->Begin(); + TIt<T> const end = t->End(); while (it != end && sa != it->Addr()->Addr()) { ++it; @@ -207,20 +207,20 @@ private: } }; -public: +public: inline TImpl(ICallBack* cb, TContExecutor* e, const TOptions& opts) noexcept - : E_(e) - , Cb_(cb) - , Opts_(opts) - { - } - - inline void Listen() { - for (TListeners::TIterator it = L_.Begin(); it != L_.End(); ++it) { - it->StartListen(); - } - } - + : E_(e) + , Cb_(cb) + , Opts_(opts) + { + } + + inline void Listen() { + for (TListeners::TIterator it = L_.Begin(); it != L_.End(); ++it) { + it->StartListen(); + } + } + inline void Listen(const IRemoteAddr& addr) { const TListeners::TIterator it = L_.Find(addr); @@ -233,62 +233,62 @@ public: const TSa sa(addr.Addr()); switch (sa.Sa->sa_family) { - case AF_INET: + case AF_INET: L_.PushBack(new TOneSocketListener(this, MakeHolder<TIPv4Addr>(*sa.In))); - break; - case AF_INET6: + break; + case AF_INET6: L_.PushBack(new TOneSocketListener(this, MakeHolder<TIPv6Addr>(*sa.In6))); - break; - default: + break; + default: ythrow yexception() << TStringBuf("unknown protocol"); } } - inline void Bind(const TIpAddress& addr) { + inline void Bind(const TIpAddress& addr) { L_.PushBack(new TOneSocketListener(this, MakeHolder<TIPv4Addr>(addr))); - } - - inline void Bind(const TNetworkAddress& addr) { - for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) { + } + + inline void Bind(const TNetworkAddress& addr) { + for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) { L_.PushBack(new TOneSocketListener(this, MakeHolder<TStoredAddrInfo>(&*it, addr))); - } - } - + } + } + inline void StopListenAddr(const IRemoteAddr& addr) { const TListeners::TIterator it = L_.Find(addr); - + if (it != L_.End()) { delete &*it; } - } + } -private: +private: TContExecutor* const E_; ICallBack* const Cb_; - TListeners L_; - const TOptions Opts_; -}; - -TContListener::TContListener(ICallBack* cb, TContExecutor* e, const TOptions& opts) - : Impl_(new TImpl(cb, e, opts)) -{ -} - + TListeners L_; + const TOptions Opts_; +}; + +TContListener::TContListener(ICallBack* cb, TContExecutor* e, const TOptions& opts) + : Impl_(new TImpl(cb, e, opts)) +{ +} + TContListener::~TContListener() { -} - +} + namespace { template <class T> static inline T&& CheckImpl(T&& impl) { Y_ENSURE_EX(impl, yexception() << "not running"); return std::forward<T>(impl); - } + } } - + void TContListener::Listen(const IRemoteAddr& addr) { CheckImpl(Impl_)->Listen(addr); -} - +} + void TContListener::Listen(const TIpAddress& addr) { return Listen(TIPv4Addr(addr)); } @@ -296,28 +296,28 @@ void TContListener::Listen(const TIpAddress& addr) { void TContListener::Listen(const TNetworkAddress& addr) { for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) { Listen(TAddrInfo(&*it)); - } + } } - + void TContListener::Listen() { CheckImpl(Impl_)->Listen(); } void TContListener::Bind(const IRemoteAddr& addr) { CheckImpl(Impl_)->Bind(addr); -} - +} + void TContListener::Bind(const TIpAddress& addr) { return Bind(TIPv4Addr(addr)); } - + void TContListener::Bind(const TNetworkAddress& addr) { CheckImpl(Impl_)->Bind(addr); -} - +} + void TContListener::Stop() noexcept { - Impl_.Destroy(); -} + Impl_.Destroy(); +} void TContListener::StopListenAddr(const IRemoteAddr& addr) { CheckImpl(Impl_)->StopListenAddr(addr); @@ -326,13 +326,13 @@ void TContListener::StopListenAddr(const IRemoteAddr& addr) { void TContListener::StopListenAddr(const TIpAddress& addr) { return StopListenAddr(TIPv4Addr(addr)); } - + void TContListener::StopListenAddr(const TNetworkAddress& addr) { for (TNetworkAddress::TIterator it = addr.Begin(); it != addr.End(); ++it) { StopListenAddr(TAddrInfo(&*it)); } -} - +} + void TContListener::ICallBack::OnAcceptFull(const TAcceptFull& params) { const TSa remote(params.Remote->Addr()); const TSa local(params.Local->Addr()); diff --git a/library/cpp/coroutine/listener/listen.h b/library/cpp/coroutine/listener/listen.h index 3a89cd3ecc..a61bdeb6df 100644 --- a/library/cpp/coroutine/listener/listen.h +++ b/library/cpp/coroutine/listener/listen.h @@ -1,41 +1,41 @@ #pragma once - + #include <util/generic/ptr.h> -#include <util/generic/ylimits.h> - +#include <util/generic/ylimits.h> + struct TIpAddress; -class TContExecutor; -class TSocketHolder; -class TNetworkAddress; - -namespace NAddr { - class IRemoteAddr; -} - -class TContListener { -public: - struct TOptions { +class TContExecutor; +class TSocketHolder; +class TNetworkAddress; + +namespace NAddr { + class IRemoteAddr; +} + +class TContListener { +public: + struct TOptions { inline TOptions() noexcept - : ListenQueue(Max<size_t>()) + : ListenQueue(Max<size_t>()) , SendBufSize(0) , RecvBufSize(0) - , EnableDeferAccept(false) + , EnableDeferAccept(false) , ReusePort(false) - { - } - + { + } + inline TOptions& SetListenQueue(size_t len) noexcept { - ListenQueue = len; - - return *this; - } - + ListenQueue = len; + + return *this; + } + inline TOptions& SetDeferAccept(bool enable) noexcept { - EnableDeferAccept = enable; - - return *this; - } - + EnableDeferAccept = enable; + + return *this; + } + inline TOptions& SetSendBufSize(unsigned size) noexcept { SendBufSize = size; @@ -54,61 +54,61 @@ public: return *this; } - size_t ListenQueue; + size_t ListenQueue; unsigned SendBufSize; unsigned RecvBufSize; - bool EnableDeferAccept; + bool EnableDeferAccept; bool ReusePort; - }; - - class ICallBack { - public: - struct TAccept { - TSocketHolder* S; + }; + + class ICallBack { + public: + struct TAccept { + TSocketHolder* S; const TIpAddress* Remote; const TIpAddress* Local; }; struct TAcceptFull { TSocketHolder* S; - const NAddr::IRemoteAddr* Remote; - const NAddr::IRemoteAddr* Local; - }; + const NAddr::IRemoteAddr* Remote; + const NAddr::IRemoteAddr* Local; + }; virtual void OnAccept(const TAccept&) { } - + virtual void OnAcceptFull(const TAcceptFull&); - /* - * will be called from catch (...) {} context - * so your can re-throw current exception and work around it - */ - virtual void OnError() = 0; - + /* + * will be called from catch (...) {} context + * so your can re-throw current exception and work around it + */ + virtual void OnError() = 0; + virtual void OnStop(TSocketHolder*); - virtual ~ICallBack() { - } - }; - - TContListener(ICallBack* cb, TContExecutor* e, const TOptions& opts = TOptions()); + virtual ~ICallBack() { + } + }; + + TContListener(ICallBack* cb, TContExecutor* e, const TOptions& opts = TOptions()); ~TContListener(); - - /// start listener threads - void Listen(); + + /// start listener threads + void Listen(); void Listen(const NAddr::IRemoteAddr& addr); void Listen(const TIpAddress& addr); void Listen(const TNetworkAddress& addr); - /// bind server on address. Can be called multiple times to bind on more then one address + /// bind server on address. Can be called multiple times to bind on more then one address void Bind(const NAddr::IRemoteAddr& addr); - void Bind(const TIpAddress& addr); - void Bind(const TNetworkAddress& addr); + void Bind(const TIpAddress& addr); + void Bind(const TNetworkAddress& addr); void Stop() noexcept; - + void StopListenAddr(const NAddr::IRemoteAddr& addr); void StopListenAddr(const TIpAddress& addr); void StopListenAddr(const TNetworkAddress& addr); @@ -119,7 +119,7 @@ public: Listen(addr); } -private: - class TImpl; - THolder<TImpl> Impl_; -}; +private: + class TImpl; + THolder<TImpl> Impl_; +}; diff --git a/library/cpp/coroutine/listener/ya.make b/library/cpp/coroutine/listener/ya.make index 700c3abe3e..9544e24c41 100644 --- a/library/cpp/coroutine/listener/ya.make +++ b/library/cpp/coroutine/listener/ya.make @@ -1,13 +1,13 @@ -LIBRARY() - -OWNER(pg) - -PEERDIR( +LIBRARY() + +OWNER(pg) + +PEERDIR( library/cpp/coroutine/engine -) - -SRCS( - listen.cpp -) - -END() +) + +SRCS( + listen.cpp +) + +END() diff --git a/library/cpp/coroutine/ya.make b/library/cpp/coroutine/ya.make index 34e30f2b25..2f2d57289b 100644 --- a/library/cpp/coroutine/ya.make +++ b/library/cpp/coroutine/ya.make @@ -1,11 +1,11 @@ -RECURSE( - benchmark - dns - dns/example - engine +RECURSE( + benchmark + dns + dns/example + engine engine/stack/ut - listener - test + listener + test ut util -) +) |