aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/coroutine/engine/impl.cpp
diff options
context:
space:
mode:
authorRuslan Kovalev <ruslan.a.kovalev@gmail.com>2022-02-10 16:46:44 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:44 +0300
commit59e19371de37995fcb36beb16cd6ec030af960bc (patch)
treefa68e36093ebff8b805462e9e6d331fe9d348214 /library/cpp/coroutine/engine/impl.cpp
parent89db6fe2fe2c32d2a832ddfeb04e8d078e301084 (diff)
downloadydb-59e19371de37995fcb36beb16cd6ec030af960bc.tar.gz
Restoring authorship annotation for Ruslan Kovalev <ruslan.a.kovalev@gmail.com>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/coroutine/engine/impl.cpp')
-rw-r--r--library/cpp/coroutine/engine/impl.cpp408
1 files changed, 204 insertions, 204 deletions
diff --git a/library/cpp/coroutine/engine/impl.cpp b/library/cpp/coroutine/engine/impl.cpp
index 7ae6f74051..6bf8b03348 100644
--- a/library/cpp/coroutine/engine/impl.cpp
+++ b/library/cpp/coroutine/engine/impl.cpp
@@ -1,162 +1,162 @@
-#include "impl.h"
+#include "impl.h"
#include "stack/stack_allocator.h"
#include "stack/stack_guards.h"
#include <util/generic/scope.h>
#include <util/thread/singleton.h>
-#include <util/stream/format.h>
+#include <util/stream/format.h>
#include <util/stream/output.h>
#include <util/system/yassert.h>
-TCont::TJoinWait::TJoinWait(TCont& c) noexcept
- : Cont_(c)
-{}
-
-void TCont::TJoinWait::Wake() noexcept {
- Cont_.ReSchedule();
-}
-
+TCont::TJoinWait::TJoinWait(TCont& c) noexcept
+ : Cont_(c)
+{}
+
+void TCont::TJoinWait::Wake() noexcept {
+ Cont_.ReSchedule();
+}
+
TCont::TCont(NCoro::NStack::IAllocator& allocator,
uint32_t stackSize,
TContExecutor& executor,
NCoro::TTrampoline::TFunc func,
const char* name) noexcept
- : Executor_(executor)
+ : Executor_(executor)
, Name_(name)
- , Trampoline_(
+ , Trampoline_(
allocator,
- stackSize,
+ stackSize,
std::move(func),
this
- )
-{}
-
-
+ )
+{}
+
+
void TCont::PrintMe(IOutputStream& out) const noexcept {
out << "cont("
- << "name = " << Name_ << ", "
- << "addr = " << Hex((size_t)this)
+ << "name = " << Name_ << ", "
+ << "addr = " << Hex((size_t)this)
<< ")";
}
-bool TCont::Join(TCont* c, TInstant deadLine) noexcept {
- TJoinWait ev(*this);
- c->Waiters_.PushBack(&ev);
-
- do {
- if (SleepD(deadLine) == ETIMEDOUT || Cancelled()) {
- if (!ev.Empty()) {
- c->Cancel();
-
- do {
+bool TCont::Join(TCont* c, TInstant deadLine) noexcept {
+ TJoinWait ev(*this);
+ c->Waiters_.PushBack(&ev);
+
+ do {
+ if (SleepD(deadLine) == ETIMEDOUT || Cancelled()) {
+ if (!ev.Empty()) {
+ c->Cancel();
+
+ do {
Switch();
- } while (!ev.Empty());
- }
-
- return false;
- }
- } while (!ev.Empty());
-
- return true;
-}
-
-int TCont::SleepD(TInstant deadline) noexcept {
- TTimerEvent event(this, deadline);
-
- return ExecuteEvent(&event);
-}
-
+ } while (!ev.Empty());
+ }
+
+ return false;
+ }
+ } while (!ev.Empty());
+
+ return true;
+}
+
+int TCont::SleepD(TInstant deadline) noexcept {
+ TTimerEvent event(this, deadline);
+
+ return ExecuteEvent(&event);
+}
+
void TCont::Switch() noexcept {
Executor()->RunScheduler();
}
-void TCont::Yield() noexcept {
- if (SleepD(TInstant::Zero())) {
- ReScheduleAndSwitch();
- }
-}
-
-void TCont::ReScheduleAndSwitch() noexcept {
- ReSchedule();
+void TCont::Yield() noexcept {
+ if (SleepD(TInstant::Zero())) {
+ ReScheduleAndSwitch();
+ }
+}
+
+void TCont::ReScheduleAndSwitch() noexcept {
+ ReSchedule();
Switch();
-}
-
-void TCont::Terminate() {
- while (!Waiters_.Empty()) {
- Waiters_.PopFront()->Wake();
- }
- Executor()->Exit(this);
-}
-
-bool TCont::IAmRunning() const noexcept {
- return this == Executor()->Running();
-}
-
-void TCont::Cancel() noexcept {
- if (Cancelled()) {
- return;
- }
-
- Cancelled_ = true;
-
- if (!IAmRunning()) {
- ReSchedule();
- }
-}
-
-void TCont::ReSchedule() noexcept {
- if (Cancelled()) {
- // Legacy code may expect a Cancelled coroutine to be scheduled without delay.
- Executor()->ScheduleExecutionNow(this);
- } else {
- Executor()->ScheduleExecution(this);
- }
-}
-
-
-TContExecutor::TContExecutor(
+}
+
+void TCont::Terminate() {
+ while (!Waiters_.Empty()) {
+ Waiters_.PopFront()->Wake();
+ }
+ Executor()->Exit(this);
+}
+
+bool TCont::IAmRunning() const noexcept {
+ return this == Executor()->Running();
+}
+
+void TCont::Cancel() noexcept {
+ if (Cancelled()) {
+ return;
+ }
+
+ Cancelled_ = true;
+
+ if (!IAmRunning()) {
+ ReSchedule();
+ }
+}
+
+void TCont::ReSchedule() noexcept {
+ if (Cancelled()) {
+ // Legacy code may expect a Cancelled coroutine to be scheduled without delay.
+ Executor()->ScheduleExecutionNow(this);
+ } else {
+ Executor()->ScheduleExecution(this);
+ }
+}
+
+
+TContExecutor::TContExecutor(
uint32_t defaultStackSize,
- THolder<IPollerFace> poller,
+ THolder<IPollerFace> poller,
NCoro::IScheduleCallback* scheduleCallback,
NCoro::IEnterPollerCallback* enterPollerCallback,
NCoro::NStack::EGuard defaultGuard,
TMaybe<NCoro::NStack::TPoolAllocatorSettings> poolSettings,
NCoro::ITime* time
-)
+)
: ScheduleCallback_(scheduleCallback)
, EnterPollerCallback_(enterPollerCallback)
- , DefaultStackSize_(defaultStackSize)
- , Poller_(std::move(poller))
+ , DefaultStackSize_(defaultStackSize)
+ , Poller_(std::move(poller))
, Time_(time)
{
StackAllocator_ = NCoro::NStack::GetAllocator(poolSettings, defaultGuard);
}
-
-TContExecutor::~TContExecutor() {
- Y_VERIFY(Allocated_ == 0, "leaked %u coroutines", (ui32)Allocated_);
-}
-
-void TContExecutor::Execute() noexcept {
- auto nop = [](void*){};
- Execute(nop);
-}
-
-void TContExecutor::Execute(TContFunc func, void* arg) noexcept {
+
+TContExecutor::~TContExecutor() {
+ Y_VERIFY(Allocated_ == 0, "leaked %u coroutines", (ui32)Allocated_);
+}
+
+void TContExecutor::Execute() noexcept {
+ auto nop = [](void*){};
+ Execute(nop);
+}
+
+void TContExecutor::Execute(TContFunc func, void* arg) noexcept {
CreateOwned([=](TCont* cont) {
func(cont, arg);
}, "sys_main");
- RunScheduler();
+ RunScheduler();
}
-void TContExecutor::WaitForIO() {
- while (Ready_.Empty() && !WaitQueue_.Empty()) {
+void TContExecutor::WaitForIO() {
+ while (Ready_.Empty() && !WaitQueue_.Empty()) {
const auto now = Now();
-
- // Waking a coroutine puts it into ReadyNext_ list
- const auto next = WaitQueue_.WakeTimedout(now);
-
+
+ // Waking a coroutine puts it into ReadyNext_ list
+ const auto next = WaitQueue_.WakeTimedout(now);
+
if (!UserEvents_.Empty()) {
TIntrusiveList<IUserEvent> userEvents;
userEvents.Swap(UserEvents_);
@@ -165,11 +165,11 @@ void TContExecutor::WaitForIO() {
} while (!userEvents.Empty());
}
- // Polling will return as soon as there is an event to process or a timeout.
- // If there are woken coroutines we do not want to sleep in the poller
- // yet still we want to check for new io
- // to prevent ourselves from locking out of io by constantly waking coroutines.
-
+ // Polling will return as soon as there is an event to process or a timeout.
+ // If there are woken coroutines we do not want to sleep in the poller
+ // yet still we want to check for new io
+ // to prevent ourselves from locking out of io by constantly waking coroutines.
+
if (ReadyNext_.Empty()) {
if (EnterPollerCallback_) {
EnterPollerCallback_->OnEnterPoller();
@@ -187,9 +187,9 @@ void TContExecutor::WaitForIO() {
EnterPollerCallback_->OnExitPoller();
}
}
-
- Ready_.Append(ReadyNext_);
- }
+
+ Ready_.Append(ReadyNext_);
+ }
}
void TContExecutor::Poll(TInstant deadline) {
@@ -198,42 +198,42 @@ void TContExecutor::Poll(TInstant deadline) {
// Waking a coroutine puts it into ReadyNext_ list
for (auto event : PollerEvents_) {
- auto* lst = (NCoro::TPollEventList*)event.Data;
- const int status = event.Status;
-
- if (status) {
- for (auto it = lst->Begin(); it != lst->End();) {
- (it++)->OnPollEvent(status);
- }
- } else {
- const ui16 filter = event.Filter;
-
- for (auto it = lst->Begin(); it != lst->End();) {
- if (it->What() & filter) {
- (it++)->OnPollEvent(0);
- } else {
- ++it;
- }
- }
- }
- }
-}
-
-void TContExecutor::Abort() noexcept {
- WaitQueue_.Abort();
- auto visitor = [](TCont* c) {
- c->Cancel();
- };
- Ready_.ForEach(visitor);
- ReadyNext_.ForEach(visitor);
-}
-
-TCont* TContExecutor::Create(
- TContFunc func,
- void* arg,
- const char* name,
- TMaybe<ui32> customStackSize
-) noexcept {
+ auto* lst = (NCoro::TPollEventList*)event.Data;
+ const int status = event.Status;
+
+ if (status) {
+ for (auto it = lst->Begin(); it != lst->End();) {
+ (it++)->OnPollEvent(status);
+ }
+ } else {
+ const ui16 filter = event.Filter;
+
+ for (auto it = lst->Begin(); it != lst->End();) {
+ if (it->What() & filter) {
+ (it++)->OnPollEvent(0);
+ } else {
+ ++it;
+ }
+ }
+ }
+ }
+}
+
+void TContExecutor::Abort() noexcept {
+ WaitQueue_.Abort();
+ auto visitor = [](TCont* c) {
+ c->Cancel();
+ };
+ Ready_.ForEach(visitor);
+ ReadyNext_.ForEach(visitor);
+}
+
+TCont* TContExecutor::Create(
+ TContFunc func,
+ void* arg,
+ const char* name,
+ TMaybe<ui32> customStackSize
+) noexcept {
return CreateOwned([=](TCont* cont) {
func(cont, arg);
}, name, customStackSize);
@@ -244,38 +244,38 @@ TCont* TContExecutor::CreateOwned(
const char* name,
TMaybe<ui32> customStackSize
) noexcept {
- Allocated_ += 1;
- if (!customStackSize) {
- customStackSize = DefaultStackSize_;
- }
+ Allocated_ += 1;
+ if (!customStackSize) {
+ customStackSize = DefaultStackSize_;
+ }
auto* cont = new TCont(*StackAllocator_, *customStackSize, *this, std::move(func), name);
- ScheduleExecution(cont);
- return cont;
-}
-
+ ScheduleExecution(cont);
+ return cont;
+}
+
NCoro::NStack::TAllocatorStats TContExecutor::GetAllocatorStats() const noexcept {
return StackAllocator_->GetStackStats();
}
-void TContExecutor::Release(TCont* cont) noexcept {
- delete cont;
- Allocated_ -= 1;
-}
-
-void TContExecutor::ScheduleToDelete(TCont* cont) noexcept {
- ToDelete_.PushBack(cont);
-}
-
-void TContExecutor::ScheduleExecution(TCont* cont) noexcept {
- cont->Scheduled_ = true;
- ReadyNext_.PushBack(cont);
-}
-
-void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept {
- cont->Scheduled_ = true;
- Ready_.PushBack(cont);
-}
-
+void TContExecutor::Release(TCont* cont) noexcept {
+ delete cont;
+ Allocated_ -= 1;
+}
+
+void TContExecutor::ScheduleToDelete(TCont* cont) noexcept {
+ ToDelete_.PushBack(cont);
+}
+
+void TContExecutor::ScheduleExecution(TCont* cont) noexcept {
+ cont->Scheduled_ = true;
+ ReadyNext_.PushBack(cont);
+}
+
+void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept {
+ cont->Scheduled_ = true;
+ Ready_.PushBack(cont);
+}
+
namespace {
inline TContExecutor*& ThisThreadExecutor() {
struct TThisThreadExecutorHolder {
@@ -285,12 +285,12 @@ namespace {
}
}
-void TContExecutor::DeleteScheduled() noexcept {
- ToDelete_.ForEach([this](TCont* c) {
- Release(c);
- });
-}
-
+void TContExecutor::DeleteScheduled() noexcept {
+ ToDelete_.ForEach([this](TCont* c) {
+ Release(c);
+ });
+}
+
TCont* RunningCont() {
TContExecutor* thisThreadExecutor = ThisThreadExecutor();
return thisThreadExecutor ? thisThreadExecutor->Running() : nullptr;
@@ -314,7 +314,7 @@ void TContExecutor::RunScheduler() noexcept {
WaitForIO();
DeleteScheduled();
Ready_.Append(ReadyNext_);
-
+
if (Ready_.Empty()) {
Current_ = nullptr;
if (caller) {
@@ -322,18 +322,18 @@ void TContExecutor::RunScheduler() noexcept {
}
break;
}
-
- TCont* cont = Ready_.PopFront();
+
+ TCont* cont = Ready_.PopFront();
if (ScheduleCallback_) {
ScheduleCallback_->OnSchedule(*this, *cont);
- }
+ }
Current_ = cont;
cont->Scheduled_ = false;
if (cont == caller) {
break;
- }
+ }
context->SwitchTo(cont->Trampoline_.Context());
if (Paused_) {
Paused_ = false;
@@ -346,9 +346,9 @@ void TContExecutor::RunScheduler() noexcept {
}
} catch (...) {
TBackTrace::FromCurrentException().PrintTo(Cerr);
- Y_FAIL("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str());
+ Y_FAIL("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str());
}
-}
+}
void TContExecutor::Pause() {
if (auto cont = Running()) {
@@ -358,17 +358,17 @@ void TContExecutor::Pause() {
}
}
-void TContExecutor::Exit(TCont* cont) noexcept {
- ScheduleToDelete(cont);
- cont->SwitchTo(&SchedContext_);
- Y_FAIL("can not return from exit");
-}
-
+void TContExecutor::Exit(TCont* cont) noexcept {
+ ScheduleToDelete(cont);
+ cont->SwitchTo(&SchedContext_);
+ Y_FAIL("can not return from exit");
+}
+
TInstant TContExecutor::Now() {
return Y_LIKELY(Time_ == nullptr) ? TInstant::Now() : Time_->Now();
}
-template <>
-void Out<TCont>(IOutputStream& out, const TCont& c) {
- c.PrintMe(out);
+template <>
+void Out<TCont>(IOutputStream& out, const TCont& c) {
+ c.PrintMe(out);
}