diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/coroutine/engine/impl.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/coroutine/engine/impl.cpp')
-rw-r--r-- | library/cpp/coroutine/engine/impl.cpp | 374 |
1 files changed, 374 insertions, 0 deletions
diff --git a/library/cpp/coroutine/engine/impl.cpp b/library/cpp/coroutine/engine/impl.cpp new file mode 100644 index 0000000000..7ae6f74051 --- /dev/null +++ b/library/cpp/coroutine/engine/impl.cpp @@ -0,0 +1,374 @@ +#include "impl.h" + +#include "stack/stack_allocator.h" +#include "stack/stack_guards.h" + +#include <util/generic/scope.h> +#include <util/thread/singleton.h> +#include <util/stream/format.h> +#include <util/stream/output.h> +#include <util/system/yassert.h> + + +TCont::TJoinWait::TJoinWait(TCont& c) noexcept + : Cont_(c) +{} + +void TCont::TJoinWait::Wake() noexcept { + Cont_.ReSchedule(); +} + +TCont::TCont(NCoro::NStack::IAllocator& allocator, + uint32_t stackSize, + TContExecutor& executor, + NCoro::TTrampoline::TFunc func, + const char* name) noexcept + : Executor_(executor) + , Name_(name) + , Trampoline_( + allocator, + stackSize, + std::move(func), + this + ) +{} + + +void TCont::PrintMe(IOutputStream& out) const noexcept { + out << "cont(" + << "name = " << Name_ << ", " + << "addr = " << Hex((size_t)this) + << ")"; +} + +bool TCont::Join(TCont* c, TInstant deadLine) noexcept { + TJoinWait ev(*this); + c->Waiters_.PushBack(&ev); + + do { + if (SleepD(deadLine) == ETIMEDOUT || Cancelled()) { + if (!ev.Empty()) { + c->Cancel(); + + do { + Switch(); + } while (!ev.Empty()); + } + + return false; + } + } while (!ev.Empty()); + + return true; +} + +int TCont::SleepD(TInstant deadline) noexcept { + TTimerEvent event(this, deadline); + + return ExecuteEvent(&event); +} + +void TCont::Switch() noexcept { + Executor()->RunScheduler(); +} + +void TCont::Yield() noexcept { + if (SleepD(TInstant::Zero())) { + ReScheduleAndSwitch(); + } +} + +void TCont::ReScheduleAndSwitch() noexcept { + ReSchedule(); + Switch(); +} + +void TCont::Terminate() { + while (!Waiters_.Empty()) { + Waiters_.PopFront()->Wake(); + } + Executor()->Exit(this); +} + +bool TCont::IAmRunning() const noexcept { + return this == Executor()->Running(); +} + +void TCont::Cancel() noexcept { + if (Cancelled()) { + return; + } + + Cancelled_ = true; + + if (!IAmRunning()) { + ReSchedule(); + } +} + +void TCont::ReSchedule() noexcept { + if (Cancelled()) { + // Legacy code may expect a Cancelled coroutine to be scheduled without delay. + Executor()->ScheduleExecutionNow(this); + } else { + Executor()->ScheduleExecution(this); + } +} + + +TContExecutor::TContExecutor( + uint32_t defaultStackSize, + THolder<IPollerFace> poller, + NCoro::IScheduleCallback* scheduleCallback, + NCoro::IEnterPollerCallback* enterPollerCallback, + NCoro::NStack::EGuard defaultGuard, + TMaybe<NCoro::NStack::TPoolAllocatorSettings> poolSettings, + NCoro::ITime* time +) + : ScheduleCallback_(scheduleCallback) + , EnterPollerCallback_(enterPollerCallback) + , DefaultStackSize_(defaultStackSize) + , Poller_(std::move(poller)) + , Time_(time) +{ + StackAllocator_ = NCoro::NStack::GetAllocator(poolSettings, defaultGuard); +} + +TContExecutor::~TContExecutor() { + Y_VERIFY(Allocated_ == 0, "leaked %u coroutines", (ui32)Allocated_); +} + +void TContExecutor::Execute() noexcept { + auto nop = [](void*){}; + Execute(nop); +} + +void TContExecutor::Execute(TContFunc func, void* arg) noexcept { + CreateOwned([=](TCont* cont) { + func(cont, arg); + }, "sys_main"); + RunScheduler(); +} + +void TContExecutor::WaitForIO() { + while (Ready_.Empty() && !WaitQueue_.Empty()) { + const auto now = Now(); + + // Waking a coroutine puts it into ReadyNext_ list + const auto next = WaitQueue_.WakeTimedout(now); + + if (!UserEvents_.Empty()) { + TIntrusiveList<IUserEvent> userEvents; + userEvents.Swap(UserEvents_); + do { + userEvents.PopFront()->Execute(); + } while (!userEvents.Empty()); + } + + // Polling will return as soon as there is an event to process or a timeout. + // If there are woken coroutines we do not want to sleep in the poller + // yet still we want to check for new io + // to prevent ourselves from locking out of io by constantly waking coroutines. + + if (ReadyNext_.Empty()) { + if (EnterPollerCallback_) { + EnterPollerCallback_->OnEnterPoller(); + } + Poll(next); + if (EnterPollerCallback_) { + EnterPollerCallback_->OnExitPoller(); + } + } else if (LastPoll_ + TDuration::MilliSeconds(5) < now) { + if (EnterPollerCallback_) { + EnterPollerCallback_->OnEnterPoller(); + } + Poll(now); + if (EnterPollerCallback_) { + EnterPollerCallback_->OnExitPoller(); + } + } + + Ready_.Append(ReadyNext_); + } +} + +void TContExecutor::Poll(TInstant deadline) { + Poller_.Wait(PollerEvents_, deadline); + LastPoll_ = Now(); + + // Waking a coroutine puts it into ReadyNext_ list + for (auto event : PollerEvents_) { + auto* lst = (NCoro::TPollEventList*)event.Data; + const int status = event.Status; + + if (status) { + for (auto it = lst->Begin(); it != lst->End();) { + (it++)->OnPollEvent(status); + } + } else { + const ui16 filter = event.Filter; + + for (auto it = lst->Begin(); it != lst->End();) { + if (it->What() & filter) { + (it++)->OnPollEvent(0); + } else { + ++it; + } + } + } + } +} + +void TContExecutor::Abort() noexcept { + WaitQueue_.Abort(); + auto visitor = [](TCont* c) { + c->Cancel(); + }; + Ready_.ForEach(visitor); + ReadyNext_.ForEach(visitor); +} + +TCont* TContExecutor::Create( + TContFunc func, + void* arg, + const char* name, + TMaybe<ui32> customStackSize +) noexcept { + return CreateOwned([=](TCont* cont) { + func(cont, arg); + }, name, customStackSize); +} + +TCont* TContExecutor::CreateOwned( + NCoro::TTrampoline::TFunc func, + const char* name, + TMaybe<ui32> customStackSize +) noexcept { + Allocated_ += 1; + if (!customStackSize) { + customStackSize = DefaultStackSize_; + } + auto* cont = new TCont(*StackAllocator_, *customStackSize, *this, std::move(func), name); + ScheduleExecution(cont); + return cont; +} + +NCoro::NStack::TAllocatorStats TContExecutor::GetAllocatorStats() const noexcept { + return StackAllocator_->GetStackStats(); +} + +void TContExecutor::Release(TCont* cont) noexcept { + delete cont; + Allocated_ -= 1; +} + +void TContExecutor::ScheduleToDelete(TCont* cont) noexcept { + ToDelete_.PushBack(cont); +} + +void TContExecutor::ScheduleExecution(TCont* cont) noexcept { + cont->Scheduled_ = true; + ReadyNext_.PushBack(cont); +} + +void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept { + cont->Scheduled_ = true; + Ready_.PushBack(cont); +} + +namespace { + inline TContExecutor*& ThisThreadExecutor() { + struct TThisThreadExecutorHolder { + TContExecutor* Executor = nullptr; + }; + return FastTlsSingletonWithPriority<TThisThreadExecutorHolder, 0>()->Executor; + } +} + +void TContExecutor::DeleteScheduled() noexcept { + ToDelete_.ForEach([this](TCont* c) { + Release(c); + }); +} + +TCont* RunningCont() { + TContExecutor* thisThreadExecutor = ThisThreadExecutor(); + return thisThreadExecutor ? thisThreadExecutor->Running() : nullptr; +} + +void TContExecutor::RunScheduler() noexcept { + try { + TContExecutor* const prev = ThisThreadExecutor(); + ThisThreadExecutor() = this; + TCont* caller = Current_; + TExceptionSafeContext* context = caller ? caller->Trampoline_.Context() : &SchedContext_; + Y_DEFER { + ThisThreadExecutor() = prev; + }; + + while (true) { + if (ScheduleCallback_ && Current_) { + ScheduleCallback_->OnUnschedule(*this); + } + + WaitForIO(); + DeleteScheduled(); + Ready_.Append(ReadyNext_); + + if (Ready_.Empty()) { + Current_ = nullptr; + if (caller) { + context->SwitchTo(&SchedContext_); + } + break; + } + + TCont* cont = Ready_.PopFront(); + + if (ScheduleCallback_) { + ScheduleCallback_->OnSchedule(*this, *cont); + } + + Current_ = cont; + cont->Scheduled_ = false; + if (cont == caller) { + break; + } + context->SwitchTo(cont->Trampoline_.Context()); + if (Paused_) { + Paused_ = false; + Current_ = nullptr; + break; + } + if (caller) { + break; + } + } + } catch (...) { + TBackTrace::FromCurrentException().PrintTo(Cerr); + Y_FAIL("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str()); + } +} + +void TContExecutor::Pause() { + if (auto cont = Running()) { + Paused_ = true; + ScheduleExecutionNow(cont); + cont->SwitchTo(&SchedContext_); + } +} + +void TContExecutor::Exit(TCont* cont) noexcept { + ScheduleToDelete(cont); + cont->SwitchTo(&SchedContext_); + Y_FAIL("can not return from exit"); +} + +TInstant TContExecutor::Now() { + return Y_LIKELY(Time_ == nullptr) ? TInstant::Now() : Time_->Now(); +} + +template <> +void Out<TCont>(IOutputStream& out, const TCont& c) { + c.PrintMe(out); +} |