diff options
author | Ruslan Kovalev <ruslan.a.kovalev@gmail.com> | 2022-02-10 16:46:44 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:46:44 +0300 |
commit | 59e19371de37995fcb36beb16cd6ec030af960bc (patch) | |
tree | fa68e36093ebff8b805462e9e6d331fe9d348214 /library/cpp/coroutine/engine/impl.cpp | |
parent | 89db6fe2fe2c32d2a832ddfeb04e8d078e301084 (diff) | |
download | ydb-59e19371de37995fcb36beb16cd6ec030af960bc.tar.gz |
Restoring authorship annotation for Ruslan Kovalev <ruslan.a.kovalev@gmail.com>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/coroutine/engine/impl.cpp')
-rw-r--r-- | library/cpp/coroutine/engine/impl.cpp | 408 |
1 files changed, 204 insertions, 204 deletions
diff --git a/library/cpp/coroutine/engine/impl.cpp b/library/cpp/coroutine/engine/impl.cpp index 7ae6f74051..6bf8b03348 100644 --- a/library/cpp/coroutine/engine/impl.cpp +++ b/library/cpp/coroutine/engine/impl.cpp @@ -1,162 +1,162 @@ -#include "impl.h" +#include "impl.h" #include "stack/stack_allocator.h" #include "stack/stack_guards.h" #include <util/generic/scope.h> #include <util/thread/singleton.h> -#include <util/stream/format.h> +#include <util/stream/format.h> #include <util/stream/output.h> #include <util/system/yassert.h> -TCont::TJoinWait::TJoinWait(TCont& c) noexcept - : Cont_(c) -{} - -void TCont::TJoinWait::Wake() noexcept { - Cont_.ReSchedule(); -} - +TCont::TJoinWait::TJoinWait(TCont& c) noexcept + : Cont_(c) +{} + +void TCont::TJoinWait::Wake() noexcept { + Cont_.ReSchedule(); +} + TCont::TCont(NCoro::NStack::IAllocator& allocator, uint32_t stackSize, TContExecutor& executor, NCoro::TTrampoline::TFunc func, const char* name) noexcept - : Executor_(executor) + : Executor_(executor) , Name_(name) - , Trampoline_( + , Trampoline_( allocator, - stackSize, + stackSize, std::move(func), this - ) -{} - - + ) +{} + + void TCont::PrintMe(IOutputStream& out) const noexcept { out << "cont(" - << "name = " << Name_ << ", " - << "addr = " << Hex((size_t)this) + << "name = " << Name_ << ", " + << "addr = " << Hex((size_t)this) << ")"; } -bool TCont::Join(TCont* c, TInstant deadLine) noexcept { - TJoinWait ev(*this); - c->Waiters_.PushBack(&ev); - - do { - if (SleepD(deadLine) == ETIMEDOUT || Cancelled()) { - if (!ev.Empty()) { - c->Cancel(); - - do { +bool TCont::Join(TCont* c, TInstant deadLine) noexcept { + TJoinWait ev(*this); + c->Waiters_.PushBack(&ev); + + do { + if (SleepD(deadLine) == ETIMEDOUT || Cancelled()) { + if (!ev.Empty()) { + c->Cancel(); + + do { Switch(); - } while (!ev.Empty()); - } - - return false; - } - } while (!ev.Empty()); - - return true; -} - -int TCont::SleepD(TInstant deadline) noexcept { - TTimerEvent event(this, deadline); - - return ExecuteEvent(&event); -} - + } while (!ev.Empty()); + } + + return false; + } + } while (!ev.Empty()); + + return true; +} + +int TCont::SleepD(TInstant deadline) noexcept { + TTimerEvent event(this, deadline); + + return ExecuteEvent(&event); +} + void TCont::Switch() noexcept { Executor()->RunScheduler(); } -void TCont::Yield() noexcept { - if (SleepD(TInstant::Zero())) { - ReScheduleAndSwitch(); - } -} - -void TCont::ReScheduleAndSwitch() noexcept { - ReSchedule(); +void TCont::Yield() noexcept { + if (SleepD(TInstant::Zero())) { + ReScheduleAndSwitch(); + } +} + +void TCont::ReScheduleAndSwitch() noexcept { + ReSchedule(); Switch(); -} - -void TCont::Terminate() { - while (!Waiters_.Empty()) { - Waiters_.PopFront()->Wake(); - } - Executor()->Exit(this); -} - -bool TCont::IAmRunning() const noexcept { - return this == Executor()->Running(); -} - -void TCont::Cancel() noexcept { - if (Cancelled()) { - return; - } - - Cancelled_ = true; - - if (!IAmRunning()) { - ReSchedule(); - } -} - -void TCont::ReSchedule() noexcept { - if (Cancelled()) { - // Legacy code may expect a Cancelled coroutine to be scheduled without delay. - Executor()->ScheduleExecutionNow(this); - } else { - Executor()->ScheduleExecution(this); - } -} - - -TContExecutor::TContExecutor( +} + +void TCont::Terminate() { + while (!Waiters_.Empty()) { + Waiters_.PopFront()->Wake(); + } + Executor()->Exit(this); +} + +bool TCont::IAmRunning() const noexcept { + return this == Executor()->Running(); +} + +void TCont::Cancel() noexcept { + if (Cancelled()) { + return; + } + + Cancelled_ = true; + + if (!IAmRunning()) { + ReSchedule(); + } +} + +void TCont::ReSchedule() noexcept { + if (Cancelled()) { + // Legacy code may expect a Cancelled coroutine to be scheduled without delay. + Executor()->ScheduleExecutionNow(this); + } else { + Executor()->ScheduleExecution(this); + } +} + + +TContExecutor::TContExecutor( uint32_t defaultStackSize, - THolder<IPollerFace> poller, + THolder<IPollerFace> poller, NCoro::IScheduleCallback* scheduleCallback, NCoro::IEnterPollerCallback* enterPollerCallback, NCoro::NStack::EGuard defaultGuard, TMaybe<NCoro::NStack::TPoolAllocatorSettings> poolSettings, NCoro::ITime* time -) +) : ScheduleCallback_(scheduleCallback) , EnterPollerCallback_(enterPollerCallback) - , DefaultStackSize_(defaultStackSize) - , Poller_(std::move(poller)) + , DefaultStackSize_(defaultStackSize) + , Poller_(std::move(poller)) , Time_(time) { StackAllocator_ = NCoro::NStack::GetAllocator(poolSettings, defaultGuard); } - -TContExecutor::~TContExecutor() { - Y_VERIFY(Allocated_ == 0, "leaked %u coroutines", (ui32)Allocated_); -} - -void TContExecutor::Execute() noexcept { - auto nop = [](void*){}; - Execute(nop); -} - -void TContExecutor::Execute(TContFunc func, void* arg) noexcept { + +TContExecutor::~TContExecutor() { + Y_VERIFY(Allocated_ == 0, "leaked %u coroutines", (ui32)Allocated_); +} + +void TContExecutor::Execute() noexcept { + auto nop = [](void*){}; + Execute(nop); +} + +void TContExecutor::Execute(TContFunc func, void* arg) noexcept { CreateOwned([=](TCont* cont) { func(cont, arg); }, "sys_main"); - RunScheduler(); + RunScheduler(); } -void TContExecutor::WaitForIO() { - while (Ready_.Empty() && !WaitQueue_.Empty()) { +void TContExecutor::WaitForIO() { + while (Ready_.Empty() && !WaitQueue_.Empty()) { const auto now = Now(); - - // Waking a coroutine puts it into ReadyNext_ list - const auto next = WaitQueue_.WakeTimedout(now); - + + // Waking a coroutine puts it into ReadyNext_ list + const auto next = WaitQueue_.WakeTimedout(now); + if (!UserEvents_.Empty()) { TIntrusiveList<IUserEvent> userEvents; userEvents.Swap(UserEvents_); @@ -165,11 +165,11 @@ void TContExecutor::WaitForIO() { } while (!userEvents.Empty()); } - // Polling will return as soon as there is an event to process or a timeout. - // If there are woken coroutines we do not want to sleep in the poller - // yet still we want to check for new io - // to prevent ourselves from locking out of io by constantly waking coroutines. - + // Polling will return as soon as there is an event to process or a timeout. + // If there are woken coroutines we do not want to sleep in the poller + // yet still we want to check for new io + // to prevent ourselves from locking out of io by constantly waking coroutines. + if (ReadyNext_.Empty()) { if (EnterPollerCallback_) { EnterPollerCallback_->OnEnterPoller(); @@ -187,9 +187,9 @@ void TContExecutor::WaitForIO() { EnterPollerCallback_->OnExitPoller(); } } - - Ready_.Append(ReadyNext_); - } + + Ready_.Append(ReadyNext_); + } } void TContExecutor::Poll(TInstant deadline) { @@ -198,42 +198,42 @@ void TContExecutor::Poll(TInstant deadline) { // Waking a coroutine puts it into ReadyNext_ list for (auto event : PollerEvents_) { - auto* lst = (NCoro::TPollEventList*)event.Data; - const int status = event.Status; - - if (status) { - for (auto it = lst->Begin(); it != lst->End();) { - (it++)->OnPollEvent(status); - } - } else { - const ui16 filter = event.Filter; - - for (auto it = lst->Begin(); it != lst->End();) { - if (it->What() & filter) { - (it++)->OnPollEvent(0); - } else { - ++it; - } - } - } - } -} - -void TContExecutor::Abort() noexcept { - WaitQueue_.Abort(); - auto visitor = [](TCont* c) { - c->Cancel(); - }; - Ready_.ForEach(visitor); - ReadyNext_.ForEach(visitor); -} - -TCont* TContExecutor::Create( - TContFunc func, - void* arg, - const char* name, - TMaybe<ui32> customStackSize -) noexcept { + auto* lst = (NCoro::TPollEventList*)event.Data; + const int status = event.Status; + + if (status) { + for (auto it = lst->Begin(); it != lst->End();) { + (it++)->OnPollEvent(status); + } + } else { + const ui16 filter = event.Filter; + + for (auto it = lst->Begin(); it != lst->End();) { + if (it->What() & filter) { + (it++)->OnPollEvent(0); + } else { + ++it; + } + } + } + } +} + +void TContExecutor::Abort() noexcept { + WaitQueue_.Abort(); + auto visitor = [](TCont* c) { + c->Cancel(); + }; + Ready_.ForEach(visitor); + ReadyNext_.ForEach(visitor); +} + +TCont* TContExecutor::Create( + TContFunc func, + void* arg, + const char* name, + TMaybe<ui32> customStackSize +) noexcept { return CreateOwned([=](TCont* cont) { func(cont, arg); }, name, customStackSize); @@ -244,38 +244,38 @@ TCont* TContExecutor::CreateOwned( const char* name, TMaybe<ui32> customStackSize ) noexcept { - Allocated_ += 1; - if (!customStackSize) { - customStackSize = DefaultStackSize_; - } + Allocated_ += 1; + if (!customStackSize) { + customStackSize = DefaultStackSize_; + } auto* cont = new TCont(*StackAllocator_, *customStackSize, *this, std::move(func), name); - ScheduleExecution(cont); - return cont; -} - + ScheduleExecution(cont); + return cont; +} + NCoro::NStack::TAllocatorStats TContExecutor::GetAllocatorStats() const noexcept { return StackAllocator_->GetStackStats(); } -void TContExecutor::Release(TCont* cont) noexcept { - delete cont; - Allocated_ -= 1; -} - -void TContExecutor::ScheduleToDelete(TCont* cont) noexcept { - ToDelete_.PushBack(cont); -} - -void TContExecutor::ScheduleExecution(TCont* cont) noexcept { - cont->Scheduled_ = true; - ReadyNext_.PushBack(cont); -} - -void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept { - cont->Scheduled_ = true; - Ready_.PushBack(cont); -} - +void TContExecutor::Release(TCont* cont) noexcept { + delete cont; + Allocated_ -= 1; +} + +void TContExecutor::ScheduleToDelete(TCont* cont) noexcept { + ToDelete_.PushBack(cont); +} + +void TContExecutor::ScheduleExecution(TCont* cont) noexcept { + cont->Scheduled_ = true; + ReadyNext_.PushBack(cont); +} + +void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept { + cont->Scheduled_ = true; + Ready_.PushBack(cont); +} + namespace { inline TContExecutor*& ThisThreadExecutor() { struct TThisThreadExecutorHolder { @@ -285,12 +285,12 @@ namespace { } } -void TContExecutor::DeleteScheduled() noexcept { - ToDelete_.ForEach([this](TCont* c) { - Release(c); - }); -} - +void TContExecutor::DeleteScheduled() noexcept { + ToDelete_.ForEach([this](TCont* c) { + Release(c); + }); +} + TCont* RunningCont() { TContExecutor* thisThreadExecutor = ThisThreadExecutor(); return thisThreadExecutor ? thisThreadExecutor->Running() : nullptr; @@ -314,7 +314,7 @@ void TContExecutor::RunScheduler() noexcept { WaitForIO(); DeleteScheduled(); Ready_.Append(ReadyNext_); - + if (Ready_.Empty()) { Current_ = nullptr; if (caller) { @@ -322,18 +322,18 @@ void TContExecutor::RunScheduler() noexcept { } break; } - - TCont* cont = Ready_.PopFront(); + + TCont* cont = Ready_.PopFront(); if (ScheduleCallback_) { ScheduleCallback_->OnSchedule(*this, *cont); - } + } Current_ = cont; cont->Scheduled_ = false; if (cont == caller) { break; - } + } context->SwitchTo(cont->Trampoline_.Context()); if (Paused_) { Paused_ = false; @@ -346,9 +346,9 @@ void TContExecutor::RunScheduler() noexcept { } } catch (...) { TBackTrace::FromCurrentException().PrintTo(Cerr); - Y_FAIL("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str()); + Y_FAIL("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str()); } -} +} void TContExecutor::Pause() { if (auto cont = Running()) { @@ -358,17 +358,17 @@ void TContExecutor::Pause() { } } -void TContExecutor::Exit(TCont* cont) noexcept { - ScheduleToDelete(cont); - cont->SwitchTo(&SchedContext_); - Y_FAIL("can not return from exit"); -} - +void TContExecutor::Exit(TCont* cont) noexcept { + ScheduleToDelete(cont); + cont->SwitchTo(&SchedContext_); + Y_FAIL("can not return from exit"); +} + TInstant TContExecutor::Now() { return Y_LIKELY(Time_ == nullptr) ? TInstant::Now() : Time_->Now(); } -template <> -void Out<TCont>(IOutputStream& out, const TCont& c) { - c.PrintMe(out); +template <> +void Out<TCont>(IOutputStream& out, const TCont& c) { + c.PrintMe(out); } |