aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/coroutine
diff options
context:
space:
mode:
authorRuslan Kovalev <ruslan.a.kovalev@gmail.com>2022-02-10 16:46:45 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:45 +0300
commit9123176b341b6f2658cff5132482b8237c1416c8 (patch)
tree49e222ea1c5804306084bb3ae065bb702625360f /library/cpp/coroutine
parent59e19371de37995fcb36beb16cd6ec030af960bc (diff)
downloadydb-9123176b341b6f2658cff5132482b8237c1416c8.tar.gz
Restoring authorship annotation for Ruslan Kovalev <ruslan.a.kovalev@gmail.com>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/coroutine')
-rw-r--r--library/cpp/coroutine/engine/callbacks.h22
-rw-r--r--library/cpp/coroutine/engine/condvar.h18
-rw-r--r--library/cpp/coroutine/engine/cont_poller.cpp128
-rw-r--r--library/cpp/coroutine/engine/cont_poller.h356
-rw-r--r--library/cpp/coroutine/engine/coroutine_ut.cpp1112
-rw-r--r--library/cpp/coroutine/engine/events.h66
-rw-r--r--library/cpp/coroutine/engine/impl.cpp408
-rw-r--r--library/cpp/coroutine/engine/impl.h250
-rw-r--r--library/cpp/coroutine/engine/iostatus.h38
-rw-r--r--library/cpp/coroutine/engine/mutex.h20
-rw-r--r--library/cpp/coroutine/engine/network.cpp484
-rw-r--r--library/cpp/coroutine/engine/network.h74
-rw-r--r--library/cpp/coroutine/engine/poller.cpp130
-rw-r--r--library/cpp/coroutine/engine/poller.h28
-rw-r--r--library/cpp/coroutine/engine/sockmap.h2
-rw-r--r--library/cpp/coroutine/engine/sockpool.cpp4
-rw-r--r--library/cpp/coroutine/engine/sockpool.h72
-rw-r--r--library/cpp/coroutine/engine/trampoline.cpp50
-rw-r--r--library/cpp/coroutine/engine/trampoline.h60
-rw-r--r--library/cpp/coroutine/engine/ya.make16
-rw-r--r--library/cpp/coroutine/listener/listen.cpp28
-rw-r--r--library/cpp/coroutine/listener/listen.h2
22 files changed, 1684 insertions, 1684 deletions
diff --git a/library/cpp/coroutine/engine/callbacks.h b/library/cpp/coroutine/engine/callbacks.h
index 640e6c25d7..e81b17344f 100644
--- a/library/cpp/coroutine/engine/callbacks.h
+++ b/library/cpp/coroutine/engine/callbacks.h
@@ -1,18 +1,18 @@
-#pragma once
-
-class TCont;
-class TContExecutor;
-
-namespace NCoro {
- class IScheduleCallback {
- public:
+#pragma once
+
+class TCont;
+class TContExecutor;
+
+namespace NCoro {
+ class IScheduleCallback {
+ public:
virtual void OnSchedule(TContExecutor&, TCont&) = 0;
virtual void OnUnschedule(TContExecutor&) = 0;
};
-
+
class IEnterPollerCallback {
public:
virtual void OnEnterPoller() = 0;
virtual void OnExitPoller() = 0;
- };
-}
+ };
+}
diff --git a/library/cpp/coroutine/engine/condvar.h b/library/cpp/coroutine/engine/condvar.h
index ecfeab5b73..ffceede6fa 100644
--- a/library/cpp/coroutine/engine/condvar.h
+++ b/library/cpp/coroutine/engine/condvar.h
@@ -1,11 +1,11 @@
-#pragma once
+#pragma once
+
+#include "events.h"
+#include "mutex.h"
-#include "events.h"
-#include "mutex.h"
-
class TContCondVar {
public:
- int WaitD(TCont* current, TContMutex* mutex, TInstant deadline) {
+ int WaitD(TCont* current, TContMutex* mutex, TInstant deadline) {
mutex->UnLock();
const int ret = WaitQueue_.WaitD(current, deadline);
@@ -17,19 +17,19 @@ public:
return mutex->LockD(current, deadline);
}
- int WaitT(TCont* current, TContMutex* mutex, TDuration timeout) {
+ int WaitT(TCont* current, TContMutex* mutex, TDuration timeout) {
return WaitD(current, mutex, timeout.ToDeadLine());
}
- int WaitI(TCont* current, TContMutex* mutex) {
+ int WaitI(TCont* current, TContMutex* mutex) {
return WaitD(current, mutex, TInstant::Max());
}
- void Signal() noexcept {
+ void Signal() noexcept {
WaitQueue_.Signal();
}
- void BroadCast() noexcept {
+ void BroadCast() noexcept {
WaitQueue_.BroadCast();
}
diff --git a/library/cpp/coroutine/engine/cont_poller.cpp b/library/cpp/coroutine/engine/cont_poller.cpp
index d801fe617f..76593d4e9b 100644
--- a/library/cpp/coroutine/engine/cont_poller.cpp
+++ b/library/cpp/coroutine/engine/cont_poller.cpp
@@ -1,70 +1,70 @@
-#include "cont_poller.h"
+#include "cont_poller.h"
#include "impl.h"
-namespace NCoro {
- namespace {
- template <class T>
- int DoExecuteEvent(T* event) noexcept {
- auto* cont = event->Cont();
-
- if (cont->Cancelled()) {
- return ECANCELED;
- }
-
- cont->Executor()->ScheduleIoWait(event);
+namespace NCoro {
+ namespace {
+ template <class T>
+ int DoExecuteEvent(T* event) noexcept {
+ auto* cont = event->Cont();
+
+ if (cont->Cancelled()) {
+ return ECANCELED;
+ }
+
+ cont->Executor()->ScheduleIoWait(event);
cont->Switch();
-
- if (cont->Cancelled()) {
- return ECANCELED;
- }
-
- return event->Status();
- }
+
+ if (cont->Cancelled()) {
+ return ECANCELED;
+ }
+
+ return event->Status();
+ }
+ }
+
+ void TContPollEvent::Wake() noexcept {
+ UnLink();
+ Cont()->ReSchedule();
}
- void TContPollEvent::Wake() noexcept {
- UnLink();
- Cont()->ReSchedule();
- }
-
-
- TInstant TEventWaitQueue::WakeTimedout(TInstant now) noexcept {
- TIoWait::TIterator it = IoWait_.Begin();
-
- if (it != IoWait_.End()) {
- if (it->DeadLine() > now) {
- return it->DeadLine();
- }
-
- do {
- (it++)->Wake(ETIMEDOUT);
- } while (it != IoWait_.End() && it->DeadLine() <= now);
- }
-
- return now;
- }
-
- void TEventWaitQueue::Register(NCoro::TContPollEvent* event) {
- IoWait_.Insert(event);
- event->Cont()->Unlink();
- }
-
- void TEventWaitQueue::Abort() noexcept {
- auto visitor = [](TContPollEvent& e) {
- e.Cont()->Cancel();
- };
- IoWait_.ForEach(visitor);
- }
+
+ TInstant TEventWaitQueue::WakeTimedout(TInstant now) noexcept {
+ TIoWait::TIterator it = IoWait_.Begin();
+
+ if (it != IoWait_.End()) {
+ if (it->DeadLine() > now) {
+ return it->DeadLine();
+ }
+
+ do {
+ (it++)->Wake(ETIMEDOUT);
+ } while (it != IoWait_.End() && it->DeadLine() <= now);
+ }
+
+ return now;
+ }
+
+ void TEventWaitQueue::Register(NCoro::TContPollEvent* event) {
+ IoWait_.Insert(event);
+ event->Cont()->Unlink();
+ }
+
+ void TEventWaitQueue::Abort() noexcept {
+ auto visitor = [](TContPollEvent& e) {
+ e.Cont()->Cancel();
+ };
+ IoWait_.ForEach(visitor);
+ }
+}
+
+void TFdEvent::RemoveFromIOWait() noexcept {
+ this->Cont()->Executor()->Poller()->Remove(this);
+}
+
+int ExecuteEvent(TFdEvent* event) noexcept {
+ return NCoro::DoExecuteEvent(event);
+}
+
+int ExecuteEvent(TTimerEvent* event) noexcept {
+ return NCoro::DoExecuteEvent(event);
}
-
-void TFdEvent::RemoveFromIOWait() noexcept {
- this->Cont()->Executor()->Poller()->Remove(this);
-}
-
-int ExecuteEvent(TFdEvent* event) noexcept {
- return NCoro::DoExecuteEvent(event);
-}
-
-int ExecuteEvent(TTimerEvent* event) noexcept {
- return NCoro::DoExecuteEvent(event);
-}
diff --git a/library/cpp/coroutine/engine/cont_poller.h b/library/cpp/coroutine/engine/cont_poller.h
index 824b05489a..b638b2df1a 100644
--- a/library/cpp/coroutine/engine/cont_poller.h
+++ b/library/cpp/coroutine/engine/cont_poller.h
@@ -1,245 +1,245 @@
-#pragma once
+#pragma once
#include "poller.h"
-#include "sockmap.h"
+#include "sockmap.h"
#include <library/cpp/containers/intrusive_rb_tree/rb_tree.h>
-
-#include <util/datetime/base.h>
-#include <util/memory/pool.h>
+
+#include <util/datetime/base.h>
+#include <util/memory/pool.h>
#include <util/memory/smallobj.h>
-#include <util/network/init.h>
+#include <util/network/init.h>
+
+#include <cerrno>
+
-#include <cerrno>
-
-
class TCont;
class TContExecutor;
-class TFdEvent;
+class TFdEvent;
-namespace NCoro {
-
- class IPollEvent;
+namespace NCoro {
+ class IPollEvent;
- struct TContPollEventCompare {
- template <class T>
- static inline bool Compare(const T& l, const T& r) noexcept {
- return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r);
- }
- };
-
- class TContPollEvent : public TRbTreeItem<TContPollEvent, TContPollEventCompare> {
- public:
- TContPollEvent(TCont* cont, TInstant deadLine) noexcept
- : Cont_(cont)
- , DeadLine_(deadLine)
- {}
+ struct TContPollEventCompare {
+ template <class T>
+ static inline bool Compare(const T& l, const T& r) noexcept {
+ return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r);
+ }
+ };
- static bool Compare(const TContPollEvent& l, const TContPollEvent& r) noexcept {
- return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r);
- }
- int Status() const noexcept {
- return Status_;
- }
+ class TContPollEvent : public TRbTreeItem<TContPollEvent, TContPollEventCompare> {
+ public:
+ TContPollEvent(TCont* cont, TInstant deadLine) noexcept
+ : Cont_(cont)
+ , DeadLine_(deadLine)
+ {}
- void SetStatus(int status) noexcept {
- Status_ = status;
- }
+ static bool Compare(const TContPollEvent& l, const TContPollEvent& r) noexcept {
+ return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r);
+ }
- TCont* Cont() noexcept {
- return Cont_;
- }
+ int Status() const noexcept {
+ return Status_;
+ }
+
+ void SetStatus(int status) noexcept {
+ Status_ = status;
+ }
+
+ TCont* Cont() noexcept {
+ return Cont_;
+ }
- TInstant DeadLine() const noexcept {
- return DeadLine_;
- }
+ TInstant DeadLine() const noexcept {
+ return DeadLine_;
+ }
- void Wake(int status) noexcept {
- SetStatus(status);
- Wake();
+ void Wake(int status) noexcept {
+ SetStatus(status);
+ Wake();
}
- private:
- void Wake() noexcept;
+ private:
+ void Wake() noexcept;
- private:
- TCont* Cont_;
- TInstant DeadLine_;
- int Status_ = EINPROGRESS;
+ private:
+ TCont* Cont_;
+ TInstant DeadLine_;
+ int Status_ = EINPROGRESS;
};
- class IPollEvent: public TIntrusiveListItem<IPollEvent> {
- public:
- IPollEvent(SOCKET fd, ui16 what) noexcept
- : Fd_(fd)
- , What_(what)
- {}
+ class IPollEvent: public TIntrusiveListItem<IPollEvent> {
+ public:
+ IPollEvent(SOCKET fd, ui16 what) noexcept
+ : Fd_(fd)
+ , What_(what)
+ {}
+
+ virtual ~IPollEvent() {}
- virtual ~IPollEvent() {}
+ SOCKET Fd() const noexcept {
+ return Fd_;
+ }
- SOCKET Fd() const noexcept {
- return Fd_;
- }
+ int What() const noexcept {
+ return What_;
+ }
- int What() const noexcept {
- return What_;
- }
+ virtual void OnPollEvent(int status) noexcept = 0;
- virtual void OnPollEvent(int status) noexcept = 0;
+ private:
+ SOCKET Fd_;
+ ui16 What_;
+ };
- private:
- SOCKET Fd_;
- ui16 What_;
- };
+ template <class T>
+ class TBigArray {
+ struct TValue: public T, public TObjectFromPool<TValue> {
+ TValue() {}
+ };
- template <class T>
- class TBigArray {
- struct TValue: public T, public TObjectFromPool<TValue> {
- TValue() {}
- };
+ public:
+ TBigArray()
+ : Pool_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance())
+ {}
- public:
- TBigArray()
- : Pool_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance())
- {}
-
- T* Get(size_t index) {
- TRef& ret = Lst_.Get(index);
- if (!ret) {
+ T* Get(size_t index) {
+ TRef& ret = Lst_.Get(index);
+ if (!ret) {
ret = TRef(new (&Pool_) TValue());
- }
- return ret.Get();
- }
-
- private:
- using TRef = THolder<TValue>;
- typename TValue::TPool Pool_;
- TSocketMap<TRef> Lst_;
- };
-
-
- using TPollEventList = TIntrusiveList<IPollEvent>;
-
- class TContPoller {
- public:
- using TEvent = IPollerFace::TEvent;
- using TEvents = IPollerFace::TEvents;
-
- TContPoller()
- : P_(IPollerFace::Default())
- {
- }
-
- explicit TContPoller(THolder<IPollerFace> poller)
- : P_(std::move(poller))
- {}
-
- void Schedule(IPollEvent* event) {
- auto* lst = Lists_.Get(event->Fd());
- const ui16 oldFlags = Flags(*lst);
- lst->PushFront(event);
+ }
+ return ret.Get();
+ }
+
+ private:
+ using TRef = THolder<TValue>;
+ typename TValue::TPool Pool_;
+ TSocketMap<TRef> Lst_;
+ };
+
+
+ using TPollEventList = TIntrusiveList<IPollEvent>;
+
+ class TContPoller {
+ public:
+ using TEvent = IPollerFace::TEvent;
+ using TEvents = IPollerFace::TEvents;
+
+ TContPoller()
+ : P_(IPollerFace::Default())
+ {
+ }
+
+ explicit TContPoller(THolder<IPollerFace> poller)
+ : P_(std::move(poller))
+ {}
+
+ void Schedule(IPollEvent* event) {
+ auto* lst = Lists_.Get(event->Fd());
+ const ui16 oldFlags = Flags(*lst);
+ lst->PushFront(event);
ui16 newFlags = Flags(*lst);
- if (newFlags != oldFlags) {
+ if (newFlags != oldFlags) {
if (oldFlags) {
newFlags |= CONT_POLL_MODIFY;
}
- P_->Set(lst, event->Fd(), newFlags);
- }
+ P_->Set(lst, event->Fd(), newFlags);
+ }
}
- void Remove(IPollEvent* event) noexcept {
- auto* lst = Lists_.Get(event->Fd());
- const ui16 oldFlags = Flags(*lst);
- event->Unlink();
+ void Remove(IPollEvent* event) noexcept {
+ auto* lst = Lists_.Get(event->Fd());
+ const ui16 oldFlags = Flags(*lst);
+ event->Unlink();
ui16 newFlags = Flags(*lst);
- if (newFlags != oldFlags) {
+ if (newFlags != oldFlags) {
if (newFlags) {
newFlags |= CONT_POLL_MODIFY;
}
- P_->Set(lst, event->Fd(), newFlags);
- }
+ P_->Set(lst, event->Fd(), newFlags);
+ }
}
- void Wait(TEvents& events, TInstant deadLine) {
- events.clear();
- P_->Wait(events, deadLine);
+ void Wait(TEvents& events, TInstant deadLine) {
+ events.clear();
+ P_->Wait(events, deadLine);
}
EContPoller PollEngine() const {
return P_->PollEngine();
}
- private:
- static ui16 Flags(TIntrusiveList<IPollEvent>& lst) noexcept {
- ui16 ret = 0;
- for (auto&& item : lst) {
- ret |= item.What();
+ private:
+ static ui16 Flags(TIntrusiveList<IPollEvent>& lst) noexcept {
+ ui16 ret = 0;
+ for (auto&& item : lst) {
+ ret |= item.What();
}
- return ret;
+ return ret;
}
- private:
- TBigArray<TPollEventList> Lists_;
- THolder<IPollerFace> P_;
+ private:
+ TBigArray<TPollEventList> Lists_;
+ THolder<IPollerFace> P_;
};
-
-
- class TEventWaitQueue {
- using TIoWait = TRbTree<NCoro::TContPollEvent, NCoro::TContPollEventCompare>;
-
- public:
- void Register(NCoro::TContPollEvent* event);
-
- bool Empty() const noexcept {
- return IoWait_.Empty();
- }
-
- void Abort() noexcept;
-
- TInstant WakeTimedout(TInstant now) noexcept;
-
- private:
- TIoWait IoWait_;
- };
-}
-
-class TFdEvent final:
- public NCoro::TContPollEvent,
- public NCoro::IPollEvent
-{
+
+
+ class TEventWaitQueue {
+ using TIoWait = TRbTree<NCoro::TContPollEvent, NCoro::TContPollEventCompare>;
+
+ public:
+ void Register(NCoro::TContPollEvent* event);
+
+ bool Empty() const noexcept {
+ return IoWait_.Empty();
+ }
+
+ void Abort() noexcept;
+
+ TInstant WakeTimedout(TInstant now) noexcept;
+
+ private:
+ TIoWait IoWait_;
+ };
+}
+
+class TFdEvent final:
+ public NCoro::TContPollEvent,
+ public NCoro::IPollEvent
+{
public:
- TFdEvent(TCont* cont, SOCKET fd, ui16 what, TInstant deadLine) noexcept
- : TContPollEvent(cont, deadLine)
- , IPollEvent(fd, what)
- {}
-
- ~TFdEvent() {
- RemoveFromIOWait();
+ TFdEvent(TCont* cont, SOCKET fd, ui16 what, TInstant deadLine) noexcept
+ : TContPollEvent(cont, deadLine)
+ , IPollEvent(fd, what)
+ {}
+
+ ~TFdEvent() {
+ RemoveFromIOWait();
}
- void RemoveFromIOWait() noexcept;
+ void RemoveFromIOWait() noexcept;
- void OnPollEvent(int status) noexcept override {
- Wake(status);
+ void OnPollEvent(int status) noexcept override {
+ Wake(status);
}
};
-
-class TTimerEvent: public NCoro::TContPollEvent {
+
+class TTimerEvent: public NCoro::TContPollEvent {
public:
- TTimerEvent(TCont* cont, TInstant deadLine) noexcept
- : TContPollEvent(cont, deadLine)
- {}
+ TTimerEvent(TCont* cont, TInstant deadLine) noexcept
+ : TContPollEvent(cont, deadLine)
+ {}
};
-
-int ExecuteEvent(TFdEvent* event) noexcept;
-
-int ExecuteEvent(TTimerEvent* event) noexcept;
+
+int ExecuteEvent(TFdEvent* event) noexcept;
+
+int ExecuteEvent(TTimerEvent* event) noexcept;
diff --git a/library/cpp/coroutine/engine/coroutine_ut.cpp b/library/cpp/coroutine/engine/coroutine_ut.cpp
index 1ba31245b0..8b372496a2 100644
--- a/library/cpp/coroutine/engine/coroutine_ut.cpp
+++ b/library/cpp/coroutine/engine/coroutine_ut.cpp
@@ -1,19 +1,19 @@
#include "impl.h"
-#include "condvar.h"
-#include "network.h"
+#include "condvar.h"
+#include "network.h"
#include <library/cpp/testing/unittest/registar.h>
#include <util/string/cast.h>
#include <util/system/pipe.h>
-#include <util/system/env.h>
-#include <util/system/info.h>
+#include <util/system/env.h>
+#include <util/system/info.h>
#include <util/system/thread.h>
-#include <util/generic/xrange.h>
+#include <util/generic/xrange.h>
#include <util/generic/serialized_enum.h>
-// TODO (velavokr): BALANCER-1345 add more tests on pollers
-
+// TODO (velavokr): BALANCER-1345 add more tests on pollers
+
class TCoroTest: public TTestBase {
UNIT_TEST_SUITE(TCoroTest);
UNIT_TEST(TestSimpleX1);
@@ -23,23 +23,23 @@ class TCoroTest: public TTestBase {
UNIT_TEST(TestMemFun);
UNIT_TEST(TestMutex);
UNIT_TEST(TestCondVar);
- UNIT_TEST(TestJoinDefault);
- UNIT_TEST(TestJoinEpoll);
- UNIT_TEST(TestJoinKqueue);
- UNIT_TEST(TestJoinPoll);
- UNIT_TEST(TestJoinSelect);
+ UNIT_TEST(TestJoinDefault);
+ UNIT_TEST(TestJoinEpoll);
+ UNIT_TEST(TestJoinKqueue);
+ UNIT_TEST(TestJoinPoll);
+ UNIT_TEST(TestJoinSelect);
UNIT_TEST(TestException);
- UNIT_TEST(TestJoinCancelExitRaceBug);
- UNIT_TEST(TestWaitWakeLivelockBug);
- UNIT_TEST(TestFastPathWakeDefault)
- // TODO (velavokr): BALANCER-1338 our epoll wrapper cannot handle pipe eofs
-// UNIT_TEST(TestFastPathWakeEpoll)
- UNIT_TEST(TestFastPathWakeKqueue)
- UNIT_TEST(TestFastPathWakePoll)
+ UNIT_TEST(TestJoinCancelExitRaceBug);
+ UNIT_TEST(TestWaitWakeLivelockBug);
+ UNIT_TEST(TestFastPathWakeDefault)
+ // TODO (velavokr): BALANCER-1338 our epoll wrapper cannot handle pipe eofs
+// UNIT_TEST(TestFastPathWakeEpoll)
+ UNIT_TEST(TestFastPathWakeKqueue)
+ UNIT_TEST(TestFastPathWakePoll)
UNIT_TEST(TestFastPathWakeSelect)
- UNIT_TEST(TestLegacyCancelYieldRaceBug)
- UNIT_TEST(TestJoinRescheduleBug);
- UNIT_TEST(TestEventQueue)
+ UNIT_TEST(TestLegacyCancelYieldRaceBug)
+ UNIT_TEST(TestJoinRescheduleBug);
+ UNIT_TEST(TestEventQueue)
UNIT_TEST(TestNestedExecutor)
UNIT_TEST(TestComputeCoroutineYield)
UNIT_TEST(TestPollEngines);
@@ -57,21 +57,21 @@ public:
void TestMemFun();
void TestMutex();
void TestCondVar();
- void TestJoinDefault();
- void TestJoinEpoll();
- void TestJoinKqueue();
- void TestJoinPoll();
- void TestJoinSelect();
- void TestJoinCancelExitRaceBug();
- void TestWaitWakeLivelockBug();
- void TestFastPathWakeDefault();
- void TestFastPathWakeEpoll();
- void TestFastPathWakeKqueue();
- void TestFastPathWakePoll();
- void TestFastPathWakeSelect();
- void TestLegacyCancelYieldRaceBug();
- void TestJoinRescheduleBug();
- void TestEventQueue();
+ void TestJoinDefault();
+ void TestJoinEpoll();
+ void TestJoinKqueue();
+ void TestJoinPoll();
+ void TestJoinSelect();
+ void TestJoinCancelExitRaceBug();
+ void TestWaitWakeLivelockBug();
+ void TestFastPathWakeDefault();
+ void TestFastPathWakeEpoll();
+ void TestFastPathWakeKqueue();
+ void TestFastPathWakePoll();
+ void TestFastPathWakeSelect();
+ void TestLegacyCancelYieldRaceBug();
+ void TestJoinRescheduleBug();
+ void TestEventQueue();
void TestNestedExecutor();
void TestComputeCoroutineYield();
void TestPollEngines();
@@ -88,7 +88,7 @@ void TCoroTest::TestException() {
auto f1 = [&f2run](TCont* c) {
struct TCtx {
~TCtx() {
- Y_VERIFY(!*F2);
+ Y_VERIFY(!*F2);
C->Yield();
}
@@ -144,18 +144,18 @@ static void CoMain(TCont* c, void* /*arg*/) {
}
}
-void TCoroTest::TestSimpleX1() {
- i0 = 0;
- TContExecutor e(32000);
+void TCoroTest::TestSimpleX1() {
+ i0 = 0;
+ TContExecutor e(32000);
UNIT_ASSERT(RunningCont() == nullptr);
- e.Execute(CoMain);
- UNIT_ASSERT_VALUES_EQUAL(i0, 100000);
+ e.Execute(CoMain);
+ UNIT_ASSERT_VALUES_EQUAL(i0, 100000);
UNIT_ASSERT(RunningCont() == nullptr);
-}
-
+}
+
void TCoroTest::TestSimpleX1MultiThread() {
TVector<THolder<TThread>> threads;
const size_t nThreads = 0;
@@ -179,10 +179,10 @@ void TCoroTest::TestSimpleX1MultiThread() {
}
struct TTestObject {
- int i = 0;
- int j = 0;
+ int i = 0;
+ int j = 0;
-public:
+public:
void RunTask1(TCont*) {
i = 1;
}
@@ -325,7 +325,7 @@ void TCoroTest::TestCondVar() {
res.clear();
}
-namespace NCoroTestJoin {
+namespace NCoroTestJoin {
struct TSleepCont {
const TInstant Deadline;
int Result;
@@ -342,7 +342,7 @@ namespace NCoroTestJoin {
inline void operator()(TCont* c) {
char buf = 0;
- Result = NCoro::ReadD(c, Sock, &buf, sizeof(buf), Deadline).Status();
+ Result = NCoro::ReadD(c, Sock, &buf, sizeof(buf), Deadline).Status();
}
};
@@ -356,513 +356,513 @@ namespace NCoroTestJoin {
}
};
- void DoTestJoin(EContPoller pollerType) {
- auto poller = IPollerFace::Construct(pollerType);
-
- if (!poller) {
- return;
- }
-
- TContExecutor e(32000, std::move(poller));
-
- TPipe in, out;
- TPipe::Pipe(in, out);
- SetNonBlock(in.GetHandle());
-
- {
- TSleepCont sc = {TInstant::Max(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
-
- e.Execute(jc);
-
- UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
- UNIT_ASSERT_EQUAL(jc.Result, false);
- }
-
- {
- TSleepCont sc = {TDuration::MilliSeconds(100).ToDeadLine(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(sc, "sc"), false};
-
- e.Execute(jc);
-
- UNIT_ASSERT_EQUAL(sc.Result, ETIMEDOUT);
- UNIT_ASSERT_EQUAL(jc.Result, true);
- }
-
- {
- TSleepCont sc = {TDuration::MilliSeconds(200).ToDeadLine(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
-
- e.Execute(jc);
-
- UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
- UNIT_ASSERT_EQUAL(jc.Result, false);
- }
-
- {
- TReadCont rc = {TInstant::Max(), in.GetHandle(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
-
- e.Execute(jc);
-
- UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
- UNIT_ASSERT_EQUAL(jc.Result, false);
- }
-
- {
- TReadCont rc = {TDuration::MilliSeconds(100).ToDeadLine(), in.GetHandle(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(rc, "rc"), false};
-
- e.Execute(jc);
-
- UNIT_ASSERT_EQUAL(rc.Result, ETIMEDOUT);
- UNIT_ASSERT_EQUAL(jc.Result, true);
- }
-
- {
- TReadCont rc = {TDuration::MilliSeconds(200).ToDeadLine(), in.GetHandle(), 0};
- TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
-
- e.Execute(jc);
-
- UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
- UNIT_ASSERT_EQUAL(jc.Result, false);
- }
- }
-}
-
-void TCoroTest::TestJoinDefault() {
- NCoroTestJoin::DoTestJoin(EContPoller::Default);
-}
-
-void TCoroTest::TestJoinEpoll() {
- NCoroTestJoin::DoTestJoin(EContPoller::Epoll);
-}
-
-void TCoroTest::TestJoinKqueue() {
- NCoroTestJoin::DoTestJoin(EContPoller::Kqueue);
-}
-
-void TCoroTest::TestJoinPoll() {
- NCoroTestJoin::DoTestJoin(EContPoller::Poll);
-}
-
-void TCoroTest::TestJoinSelect() {
- NCoroTestJoin::DoTestJoin(EContPoller::Select);
-}
-
-namespace NCoroJoinCancelExitRaceBug {
- struct TState {
- TCont* Sub = nullptr;
- };
-
- static void DoAux(TCont*, void* argPtr) noexcept {
- TState& state = *(TState*)(argPtr);
-
- // 06.{Ready:[Sub2]} > {Ready:[Sub2,Sub]}
- state.Sub->Cancel();
- }
-
- static void DoSub2(TCont*, void*) noexcept {
- // 07.{Ready:[Sub]} > Exit > {Ready:[Sub],ToDelete:[Sub2]}
- // 08.{Ready:[Sub],ToDelete:[Sub2]} > Release(Sub2) > {Ready:[Sub],Deleted:[Sub2]}
- }
-
- static void DoSub(TCont* cont, void* argPtr) noexcept {
- TState& state = *(TState*)(argPtr);
- state.Sub = cont;
-
- // 04.{Ready:[Aux]} > {Ready:[Aux,Sub2]}
- auto* sub2 = cont->Executor()->Create(DoSub2, argPtr, "Sub2");
-
- // 05.{Ready:[Aux,Sub2]} > SwitchTo(Aux)
- // 09.{Ready:[],Deleted:[Sub2]} > Cancel(Sub2) > {Ready:[Sub2],Deleted:[Sub2]}
- // 10.{Ready:[Sub2],Deleted:[Sub2]} > SwitchTo(Sub2) > FAIL: can not return from exit
- cont->Join(sub2);
-
- state.Sub = nullptr;
- }
-
- static void DoMain(TCont* cont) noexcept {
- TState state;
-
- // 01.{Ready:[]} > {Ready:[Sub]}
- auto* sub = cont->Executor()->Create(DoSub, &state, "Sub");
-
- // 02.{Ready:[Sub]} > {Ready:[Sub,Aux]}
- cont->Executor()->Create(DoAux, &state, "Aux");
-
- // 03.{Ready:[Sub,Aux]} > SwitchTo(Sub)
- cont->Join(sub);
- }
-}
-
-void TCoroTest::TestJoinCancelExitRaceBug() {
- TContExecutor exec(20000);
- exec.SetFailOnError(true);
- exec.Execute(NCoroJoinCancelExitRaceBug::DoMain);
-}
-
-namespace NCoroWaitWakeLivelockBug {
- struct TState;
-
- struct TSubState {
- TSubState(TState& parent, ui32 self)
- : Parent(parent)
+ void DoTestJoin(EContPoller pollerType) {
+ auto poller = IPollerFace::Construct(pollerType);
+
+ if (!poller) {
+ return;
+ }
+
+ TContExecutor e(32000, std::move(poller));
+
+ TPipe in, out;
+ TPipe::Pipe(in, out);
+ SetNonBlock(in.GetHandle());
+
+ {
+ TSleepCont sc = {TInstant::Max(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
+ UNIT_ASSERT_EQUAL(jc.Result, false);
+ }
+
+ {
+ TSleepCont sc = {TDuration::MilliSeconds(100).ToDeadLine(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(sc, "sc"), false};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(sc.Result, ETIMEDOUT);
+ UNIT_ASSERT_EQUAL(jc.Result, true);
+ }
+
+ {
+ TSleepCont sc = {TDuration::MilliSeconds(200).ToDeadLine(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(sc, "sc"), true};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(sc.Result, ECANCELED);
+ UNIT_ASSERT_EQUAL(jc.Result, false);
+ }
+
+ {
+ TReadCont rc = {TInstant::Max(), in.GetHandle(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
+ UNIT_ASSERT_EQUAL(jc.Result, false);
+ }
+
+ {
+ TReadCont rc = {TDuration::MilliSeconds(100).ToDeadLine(), in.GetHandle(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(200).ToDeadLine(), e.Create(rc, "rc"), false};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(rc.Result, ETIMEDOUT);
+ UNIT_ASSERT_EQUAL(jc.Result, true);
+ }
+
+ {
+ TReadCont rc = {TDuration::MilliSeconds(200).ToDeadLine(), in.GetHandle(), 0};
+ TJoinCont jc = {TDuration::MilliSeconds(100).ToDeadLine(), e.Create(rc, "rc"), true};
+
+ e.Execute(jc);
+
+ UNIT_ASSERT_EQUAL(rc.Result, ECANCELED);
+ UNIT_ASSERT_EQUAL(jc.Result, false);
+ }
+ }
+}
+
+void TCoroTest::TestJoinDefault() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Default);
+}
+
+void TCoroTest::TestJoinEpoll() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Epoll);
+}
+
+void TCoroTest::TestJoinKqueue() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Kqueue);
+}
+
+void TCoroTest::TestJoinPoll() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Poll);
+}
+
+void TCoroTest::TestJoinSelect() {
+ NCoroTestJoin::DoTestJoin(EContPoller::Select);
+}
+
+namespace NCoroJoinCancelExitRaceBug {
+ struct TState {
+ TCont* Sub = nullptr;
+ };
+
+ static void DoAux(TCont*, void* argPtr) noexcept {
+ TState& state = *(TState*)(argPtr);
+
+ // 06.{Ready:[Sub2]} > {Ready:[Sub2,Sub]}
+ state.Sub->Cancel();
+ }
+
+ static void DoSub2(TCont*, void*) noexcept {
+ // 07.{Ready:[Sub]} > Exit > {Ready:[Sub],ToDelete:[Sub2]}
+ // 08.{Ready:[Sub],ToDelete:[Sub2]} > Release(Sub2) > {Ready:[Sub],Deleted:[Sub2]}
+ }
+
+ static void DoSub(TCont* cont, void* argPtr) noexcept {
+ TState& state = *(TState*)(argPtr);
+ state.Sub = cont;
+
+ // 04.{Ready:[Aux]} > {Ready:[Aux,Sub2]}
+ auto* sub2 = cont->Executor()->Create(DoSub2, argPtr, "Sub2");
+
+ // 05.{Ready:[Aux,Sub2]} > SwitchTo(Aux)
+ // 09.{Ready:[],Deleted:[Sub2]} > Cancel(Sub2) > {Ready:[Sub2],Deleted:[Sub2]}
+ // 10.{Ready:[Sub2],Deleted:[Sub2]} > SwitchTo(Sub2) > FAIL: can not return from exit
+ cont->Join(sub2);
+
+ state.Sub = nullptr;
+ }
+
+ static void DoMain(TCont* cont) noexcept {
+ TState state;
+
+ // 01.{Ready:[]} > {Ready:[Sub]}
+ auto* sub = cont->Executor()->Create(DoSub, &state, "Sub");
+
+ // 02.{Ready:[Sub]} > {Ready:[Sub,Aux]}
+ cont->Executor()->Create(DoAux, &state, "Aux");
+
+ // 03.{Ready:[Sub,Aux]} > SwitchTo(Sub)
+ cont->Join(sub);
+ }
+}
+
+void TCoroTest::TestJoinCancelExitRaceBug() {
+ TContExecutor exec(20000);
+ exec.SetFailOnError(true);
+ exec.Execute(NCoroJoinCancelExitRaceBug::DoMain);
+}
+
+namespace NCoroWaitWakeLivelockBug {
+ struct TState;
+
+ struct TSubState {
+ TSubState(TState& parent, ui32 self)
+ : Parent(parent)
, Name(TStringBuilder() << "Sub" << self)
- , Self(self)
- {
- UNIT_ASSERT(self < 2);
- }
-
- TSubState& OtherState();
-
- TState& Parent;
- TTimerEvent* Event = nullptr;
- TCont* Cont = nullptr;
- TString Name;
- ui32 Self = -1;
- };
-
- struct TState {
- TState()
- : Subs{{*this, 0}, {*this, 1}}
- {}
-
- TSubState Subs[2];
- bool Stop = false;
- };
-
- TSubState& TSubState::OtherState() {
- return Parent.Subs[1 - Self];
- }
-
- static void DoStop(TCont* cont, void* argPtr) {
- TState& state = *(TState*)(argPtr);
-
- TTimerEvent event(cont, TInstant::Now());
- ExecuteEvent(&event);
- state.Stop = true;
- for (auto& sub: state.Subs) {
- if (sub.Event) {
- sub.Event->Wake(EWAKEDUP);
- }
- }
- }
-
- static void DoSub(TCont* cont, void* argPtr) {
- TSubState& state = *(TSubState*)(argPtr);
-
- while (!state.Parent.Stop) {
- TTimerEvent event(cont, TInstant::Max());
- if (state.OtherState().Event) {
- state.OtherState().Event->Wake(EWAKEDUP);
- }
- state.Event = &event;
- ExecuteEvent(&event);
- state.Event = nullptr;
- }
-
- state.Cont = nullptr;
- }
-
- static void DoMain(TCont* cont) noexcept {
- TState state;
-
- for (auto& subState : state.Subs) {
- subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
- }
-
- cont->Join(
- cont->Executor()->Create(DoStop, &state, "Stop")
- );
-
- for (auto& subState : state.Subs) {
- if (subState.Cont) {
- cont->Join(subState.Cont);
- }
- }
- }
-}
-
-void TCoroTest::TestWaitWakeLivelockBug() {
- TContExecutor exec(20000);
- exec.SetFailOnError(true);
- exec.Execute(NCoroWaitWakeLivelockBug::DoMain);
-}
-
-namespace NCoroTestFastPathWake {
- struct TState;
-
- struct TSubState {
- TSubState(TState& parent, ui32 self)
- : Parent(parent)
- , Name(TStringBuilder() << "Sub" << self)
- {}
-
- TState& Parent;
- TInstant Finish;
- TTimerEvent* Event = nullptr;
- TCont* Cont = nullptr;
- TString Name;
- };
-
- struct TState {
- TState()
- : Subs{{*this, 0}, {*this, 1}}
- {
- TPipe::Pipe(In, Out);
- SetNonBlock(In.GetHandle());
- }
-
- TSubState Subs[2];
- TPipe In, Out;
- bool IoSleepRunning = false;
- };
-
- static void DoIoSleep(TCont* cont, void* argPtr) noexcept {
- try {
- TState& state = *(TState*) (argPtr);
- state.IoSleepRunning = true;
-
- TTempBuf tmp;
- // Wait for the event from io
- auto res = NCoro::ReadD(cont, state.In.GetHandle(), tmp.Data(), 1, TDuration::Seconds(10).ToDeadLine());
- UNIT_ASSERT_VALUES_EQUAL(res.Checked(), 0);
- state.IoSleepRunning = false;
- } catch (const NUnitTest::TAssertException& ex) {
- Cerr << ex.AsStrBuf() << Endl;
- ex.BackTrace()->PrintTo(Cerr);
- throw;
- } catch (...) {
- Cerr << CurrentExceptionMessage() << Endl;
- throw;
- }
- }
-
- static void DoSub(TCont* cont, void* argPtr) noexcept {
- TSubState& state = *(TSubState*)(argPtr);
-
- TTimerEvent event(cont, TInstant::Max());
- state.Event = &event;
- ExecuteEvent(&event);
- state.Event = nullptr;
- state.Cont = nullptr;
- state.Finish = TInstant::Now();
- }
-
- static void DoMain(TCont* cont) noexcept {
- try {
- TState state;
- TInstant start = TInstant::Now();
-
- // This guy sleeps on io
- auto sleeper = cont->Executor()->Create(DoIoSleep, &state, "io_sleeper");
-
- // These guys are to be woken up right away
- for (auto& subState : state.Subs) {
- subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
- }
-
- // Give way
- cont->Yield();
-
- // Check everyone has started, wake those to be woken
- UNIT_ASSERT(state.IoSleepRunning);
-
- for (auto& subState : state.Subs) {
- UNIT_ASSERT(subState.Event);
- subState.Event->Wake(EWAKEDUP);
- }
-
- // Give way again
- cont->Yield();
-
- // Check the woken guys have finished and quite soon
- for (auto& subState : state.Subs) {
- UNIT_ASSERT(subState.Finish - start < TDuration::MilliSeconds(100));
- UNIT_ASSERT(!subState.Cont);
- }
-
- // Wake the io guy and finish
- state.Out.Close();
-
- if (state.IoSleepRunning) {
- cont->Join(sleeper);
- }
-
- // Check everything has ended sooner than the timeout
- UNIT_ASSERT(TInstant::Now() - start < TDuration::Seconds(1));
- } catch (const NUnitTest::TAssertException& ex) {
- Cerr << ex.AsStrBuf() << Endl;
- ex.BackTrace()->PrintTo(Cerr);
- throw;
- } catch (...) {
- Cerr << CurrentExceptionMessage() << Endl;
- throw;
- }
- }
-
- static void DoTestFastPathWake(EContPoller pollerType) {
- if (auto poller = IPollerFace::Construct(pollerType)) {
- TContExecutor exec(20000, std::move(poller));
- exec.SetFailOnError(true);
- exec.Execute(NCoroTestFastPathWake::DoMain);
- }
- }
-}
-
-void TCoroTest::TestFastPathWakeDefault() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Default);
-}
-
-void TCoroTest::TestFastPathWakeEpoll() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Epoll);
-}
-
-void TCoroTest::TestFastPathWakeKqueue() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Kqueue);
-}
-
-void TCoroTest::TestFastPathWakePoll() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Poll);
-}
-
-void TCoroTest::TestFastPathWakeSelect() {
- NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Select);
-}
-
-namespace NCoroTestLegacyCancelYieldRaceBug {
- enum class EState {
- Idle, Running, Finished,
- };
-
- struct TState {
- EState SubState = EState::Idle;
- };
-
- static void DoSub(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- state.SubState = EState::Running;
- cont->Yield();
- cont->Yield();
- state.SubState = EState::Finished;
- }
-
- static void DoMain(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- TCont* sub = cont->Executor()->Create(DoSub, argPtr, "Sub");
- sub->Cancel();
- cont->Yield();
- UNIT_ASSERT_EQUAL(state.SubState, EState::Finished);
- }
-}
-
-void TCoroTest::TestLegacyCancelYieldRaceBug() {
- NCoroTestLegacyCancelYieldRaceBug::TState state;
- TContExecutor exec(20000);
- exec.SetFailOnError(true);
- exec.Execute(NCoroTestLegacyCancelYieldRaceBug::DoMain, &state);
-}
-
-namespace NCoroTestJoinRescheduleBug {
- enum class EState {
- Idle, Running, Finished,
- };
-
- struct TState {
- TCont* volatile SubA = nullptr;
- volatile EState SubAState = EState::Idle;
- volatile EState SubBState = EState::Idle;
- volatile EState SubCState = EState::Idle;
- };
-
- static void DoSubC(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- state.SubCState = EState::Running;
- while (state.SubBState != EState::Running) {
- cont->Yield();
- }
- while (cont->SleepD(TInstant::Max()) != ECANCELED) {
- }
- state.SubCState = EState::Finished;
- }
-
- static void DoSubB(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- state.SubBState = EState::Running;
- while (state.SubAState != EState::Running && state.SubCState != EState::Running) {
- cont->Yield();
- }
- for (auto i : xrange(100)) {
- Y_UNUSED(i);
- if (!state.SubA) {
- break;
- }
- state.SubA->ReSchedule();
- cont->Yield();
- }
- state.SubBState = EState::Finished;
- }
-
- static void DoSubA(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- state.SubAState = EState::Running;
- TCont* subC = cont->Executor()->Create(DoSubC, argPtr, "SubC");
- while (state.SubBState != EState::Running && state.SubCState != EState::Running) {
- cont->Yield();
- }
- cont->Join(subC);
- UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
- state.SubA = nullptr;
- state.SubAState = EState::Finished;
- }
-
- static void DoMain(TCont* cont, void* argPtr) {
- TState& state = *(TState*)argPtr;
- TCont* subA = cont->Executor()->Create(DoSubA, argPtr, "SubA");
- state.SubA = subA;
- cont->Join(cont->Executor()->Create(DoSubB, argPtr, "SubB"));
-
- if (state.SubA) {
- subA->Cancel();
- cont->Join(subA);
- }
- }
-}
-
-void TCoroTest::TestJoinRescheduleBug() {
- using namespace NCoroTestJoinRescheduleBug;
- TState state;
- {
- TContExecutor exec(20000);
- exec.Execute(DoMain, &state);
- }
- UNIT_ASSERT_EQUAL(state.SubAState, EState::Finished);
- UNIT_ASSERT_EQUAL(state.SubBState, EState::Finished);
- UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
-}
-
-void TCoroTest::TestEventQueue() {
- NCoro::TEventWaitQueue queue;
- UNIT_ASSERT(queue.Empty());
- UNIT_ASSERT_VALUES_EQUAL(queue.WakeTimedout(TInstant()), TInstant());
- TContExecutor exec(32000);
- exec.Execute([](TCont* cont, void* arg) {
- NCoro::TEventWaitQueue* q = (NCoro::TEventWaitQueue*)arg;
- TTimerEvent ev(cont, TInstant::Max());
- TTimerEvent ev2(cont, TInstant::Seconds(12345));
- q->Register(&ev);
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
- q->Register(&ev2);
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Seconds(12345));
- UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Max());
- }, &queue);
-}
-
+ , Self(self)
+ {
+ UNIT_ASSERT(self < 2);
+ }
+
+ TSubState& OtherState();
+
+ TState& Parent;
+ TTimerEvent* Event = nullptr;
+ TCont* Cont = nullptr;
+ TString Name;
+ ui32 Self = -1;
+ };
+
+ struct TState {
+ TState()
+ : Subs{{*this, 0}, {*this, 1}}
+ {}
+
+ TSubState Subs[2];
+ bool Stop = false;
+ };
+
+ TSubState& TSubState::OtherState() {
+ return Parent.Subs[1 - Self];
+ }
+
+ static void DoStop(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)(argPtr);
+
+ TTimerEvent event(cont, TInstant::Now());
+ ExecuteEvent(&event);
+ state.Stop = true;
+ for (auto& sub: state.Subs) {
+ if (sub.Event) {
+ sub.Event->Wake(EWAKEDUP);
+ }
+ }
+ }
+
+ static void DoSub(TCont* cont, void* argPtr) {
+ TSubState& state = *(TSubState*)(argPtr);
+
+ while (!state.Parent.Stop) {
+ TTimerEvent event(cont, TInstant::Max());
+ if (state.OtherState().Event) {
+ state.OtherState().Event->Wake(EWAKEDUP);
+ }
+ state.Event = &event;
+ ExecuteEvent(&event);
+ state.Event = nullptr;
+ }
+
+ state.Cont = nullptr;
+ }
+
+ static void DoMain(TCont* cont) noexcept {
+ TState state;
+
+ for (auto& subState : state.Subs) {
+ subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
+ }
+
+ cont->Join(
+ cont->Executor()->Create(DoStop, &state, "Stop")
+ );
+
+ for (auto& subState : state.Subs) {
+ if (subState.Cont) {
+ cont->Join(subState.Cont);
+ }
+ }
+ }
+}
+
+void TCoroTest::TestWaitWakeLivelockBug() {
+ TContExecutor exec(20000);
+ exec.SetFailOnError(true);
+ exec.Execute(NCoroWaitWakeLivelockBug::DoMain);
+}
+
+namespace NCoroTestFastPathWake {
+ struct TState;
+
+ struct TSubState {
+ TSubState(TState& parent, ui32 self)
+ : Parent(parent)
+ , Name(TStringBuilder() << "Sub" << self)
+ {}
+
+ TState& Parent;
+ TInstant Finish;
+ TTimerEvent* Event = nullptr;
+ TCont* Cont = nullptr;
+ TString Name;
+ };
+
+ struct TState {
+ TState()
+ : Subs{{*this, 0}, {*this, 1}}
+ {
+ TPipe::Pipe(In, Out);
+ SetNonBlock(In.GetHandle());
+ }
+
+ TSubState Subs[2];
+ TPipe In, Out;
+ bool IoSleepRunning = false;
+ };
+
+ static void DoIoSleep(TCont* cont, void* argPtr) noexcept {
+ try {
+ TState& state = *(TState*) (argPtr);
+ state.IoSleepRunning = true;
+
+ TTempBuf tmp;
+ // Wait for the event from io
+ auto res = NCoro::ReadD(cont, state.In.GetHandle(), tmp.Data(), 1, TDuration::Seconds(10).ToDeadLine());
+ UNIT_ASSERT_VALUES_EQUAL(res.Checked(), 0);
+ state.IoSleepRunning = false;
+ } catch (const NUnitTest::TAssertException& ex) {
+ Cerr << ex.AsStrBuf() << Endl;
+ ex.BackTrace()->PrintTo(Cerr);
+ throw;
+ } catch (...) {
+ Cerr << CurrentExceptionMessage() << Endl;
+ throw;
+ }
+ }
+
+ static void DoSub(TCont* cont, void* argPtr) noexcept {
+ TSubState& state = *(TSubState*)(argPtr);
+
+ TTimerEvent event(cont, TInstant::Max());
+ state.Event = &event;
+ ExecuteEvent(&event);
+ state.Event = nullptr;
+ state.Cont = nullptr;
+ state.Finish = TInstant::Now();
+ }
+
+ static void DoMain(TCont* cont) noexcept {
+ try {
+ TState state;
+ TInstant start = TInstant::Now();
+
+ // This guy sleeps on io
+ auto sleeper = cont->Executor()->Create(DoIoSleep, &state, "io_sleeper");
+
+ // These guys are to be woken up right away
+ for (auto& subState : state.Subs) {
+ subState.Cont = cont->Executor()->Create(DoSub, &subState, subState.Name.data());
+ }
+
+ // Give way
+ cont->Yield();
+
+ // Check everyone has started, wake those to be woken
+ UNIT_ASSERT(state.IoSleepRunning);
+
+ for (auto& subState : state.Subs) {
+ UNIT_ASSERT(subState.Event);
+ subState.Event->Wake(EWAKEDUP);
+ }
+
+ // Give way again
+ cont->Yield();
+
+ // Check the woken guys have finished and quite soon
+ for (auto& subState : state.Subs) {
+ UNIT_ASSERT(subState.Finish - start < TDuration::MilliSeconds(100));
+ UNIT_ASSERT(!subState.Cont);
+ }
+
+ // Wake the io guy and finish
+ state.Out.Close();
+
+ if (state.IoSleepRunning) {
+ cont->Join(sleeper);
+ }
+
+ // Check everything has ended sooner than the timeout
+ UNIT_ASSERT(TInstant::Now() - start < TDuration::Seconds(1));
+ } catch (const NUnitTest::TAssertException& ex) {
+ Cerr << ex.AsStrBuf() << Endl;
+ ex.BackTrace()->PrintTo(Cerr);
+ throw;
+ } catch (...) {
+ Cerr << CurrentExceptionMessage() << Endl;
+ throw;
+ }
+ }
+
+ static void DoTestFastPathWake(EContPoller pollerType) {
+ if (auto poller = IPollerFace::Construct(pollerType)) {
+ TContExecutor exec(20000, std::move(poller));
+ exec.SetFailOnError(true);
+ exec.Execute(NCoroTestFastPathWake::DoMain);
+ }
+ }
+}
+
+void TCoroTest::TestFastPathWakeDefault() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Default);
+}
+
+void TCoroTest::TestFastPathWakeEpoll() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Epoll);
+}
+
+void TCoroTest::TestFastPathWakeKqueue() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Kqueue);
+}
+
+void TCoroTest::TestFastPathWakePoll() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Poll);
+}
+
+void TCoroTest::TestFastPathWakeSelect() {
+ NCoroTestFastPathWake::DoTestFastPathWake(EContPoller::Select);
+}
+
+namespace NCoroTestLegacyCancelYieldRaceBug {
+ enum class EState {
+ Idle, Running, Finished,
+ };
+
+ struct TState {
+ EState SubState = EState::Idle;
+ };
+
+ static void DoSub(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ state.SubState = EState::Running;
+ cont->Yield();
+ cont->Yield();
+ state.SubState = EState::Finished;
+ }
+
+ static void DoMain(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ TCont* sub = cont->Executor()->Create(DoSub, argPtr, "Sub");
+ sub->Cancel();
+ cont->Yield();
+ UNIT_ASSERT_EQUAL(state.SubState, EState::Finished);
+ }
+}
+
+void TCoroTest::TestLegacyCancelYieldRaceBug() {
+ NCoroTestLegacyCancelYieldRaceBug::TState state;
+ TContExecutor exec(20000);
+ exec.SetFailOnError(true);
+ exec.Execute(NCoroTestLegacyCancelYieldRaceBug::DoMain, &state);
+}
+
+namespace NCoroTestJoinRescheduleBug {
+ enum class EState {
+ Idle, Running, Finished,
+ };
+
+ struct TState {
+ TCont* volatile SubA = nullptr;
+ volatile EState SubAState = EState::Idle;
+ volatile EState SubBState = EState::Idle;
+ volatile EState SubCState = EState::Idle;
+ };
+
+ static void DoSubC(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ state.SubCState = EState::Running;
+ while (state.SubBState != EState::Running) {
+ cont->Yield();
+ }
+ while (cont->SleepD(TInstant::Max()) != ECANCELED) {
+ }
+ state.SubCState = EState::Finished;
+ }
+
+ static void DoSubB(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ state.SubBState = EState::Running;
+ while (state.SubAState != EState::Running && state.SubCState != EState::Running) {
+ cont->Yield();
+ }
+ for (auto i : xrange(100)) {
+ Y_UNUSED(i);
+ if (!state.SubA) {
+ break;
+ }
+ state.SubA->ReSchedule();
+ cont->Yield();
+ }
+ state.SubBState = EState::Finished;
+ }
+
+ static void DoSubA(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ state.SubAState = EState::Running;
+ TCont* subC = cont->Executor()->Create(DoSubC, argPtr, "SubC");
+ while (state.SubBState != EState::Running && state.SubCState != EState::Running) {
+ cont->Yield();
+ }
+ cont->Join(subC);
+ UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
+ state.SubA = nullptr;
+ state.SubAState = EState::Finished;
+ }
+
+ static void DoMain(TCont* cont, void* argPtr) {
+ TState& state = *(TState*)argPtr;
+ TCont* subA = cont->Executor()->Create(DoSubA, argPtr, "SubA");
+ state.SubA = subA;
+ cont->Join(cont->Executor()->Create(DoSubB, argPtr, "SubB"));
+
+ if (state.SubA) {
+ subA->Cancel();
+ cont->Join(subA);
+ }
+ }
+}
+
+void TCoroTest::TestJoinRescheduleBug() {
+ using namespace NCoroTestJoinRescheduleBug;
+ TState state;
+ {
+ TContExecutor exec(20000);
+ exec.Execute(DoMain, &state);
+ }
+ UNIT_ASSERT_EQUAL(state.SubAState, EState::Finished);
+ UNIT_ASSERT_EQUAL(state.SubBState, EState::Finished);
+ UNIT_ASSERT_EQUAL(state.SubCState, EState::Finished);
+}
+
+void TCoroTest::TestEventQueue() {
+ NCoro::TEventWaitQueue queue;
+ UNIT_ASSERT(queue.Empty());
+ UNIT_ASSERT_VALUES_EQUAL(queue.WakeTimedout(TInstant()), TInstant());
+ TContExecutor exec(32000);
+ exec.Execute([](TCont* cont, void* arg) {
+ NCoro::TEventWaitQueue* q = (NCoro::TEventWaitQueue*)arg;
+ TTimerEvent ev(cont, TInstant::Max());
+ TTimerEvent ev2(cont, TInstant::Seconds(12345));
+ q->Register(&ev);
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Max());
+ q->Register(&ev2);
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12344)), TInstant::Seconds(12345));
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Seconds(12345));
+ UNIT_ASSERT_VALUES_EQUAL(q->WakeTimedout(TInstant::Seconds(12345)), TInstant::Max());
+ }, &queue);
+}
+
void TCoroTest::TestNestedExecutor() {
#ifndef _win_
//nested executors actually don't work correctly, but anyway shouldn't break RunningCont() ptr
diff --git a/library/cpp/coroutine/engine/events.h b/library/cpp/coroutine/engine/events.h
index e58d9d239b..07cc4d25e8 100644
--- a/library/cpp/coroutine/engine/events.h
+++ b/library/cpp/coroutine/engine/events.h
@@ -1,49 +1,49 @@
-#pragma once
+#pragma once
+
+#include "impl.h"
+
+#include <util/datetime/base.h>
-#include "impl.h"
-
-#include <util/datetime/base.h>
-
class TContEvent {
public:
- TContEvent(TCont* current) noexcept
+ TContEvent(TCont* current) noexcept
: Cont_(current)
, Status_(0)
{
}
- ~TContEvent() {
+ ~TContEvent() {
}
- int WaitD(TInstant deadline) {
+ int WaitD(TInstant deadline) {
Status_ = 0;
const int ret = Cont_->SleepD(deadline);
return Status_ ? Status_ : ret;
}
- int WaitT(TDuration timeout) {
+ int WaitT(TDuration timeout) {
return WaitD(timeout.ToDeadLine());
}
- int WaitI() {
+ int WaitI() {
return WaitD(TInstant::Max());
}
- void Wake() noexcept {
+ void Wake() noexcept {
SetStatus(EWAKEDUP);
Cont_->ReSchedule();
}
- TCont* Cont() noexcept {
+ TCont* Cont() noexcept {
return Cont_;
}
- int Status() const noexcept {
+ int Status() const noexcept {
return Status_;
}
- void SetStatus(int status) noexcept {
+ void SetStatus(int status) noexcept {
Status_ = status;
}
@@ -55,24 +55,24 @@ private:
class TContWaitQueue {
class TWaiter: public TContEvent, public TIntrusiveListItem<TWaiter> {
public:
- TWaiter(TCont* current) noexcept
+ TWaiter(TCont* current) noexcept
: TContEvent(current)
{
}
- ~TWaiter() {
+ ~TWaiter() {
}
};
public:
- TContWaitQueue() noexcept {
+ TContWaitQueue() noexcept {
}
- ~TContWaitQueue() {
+ ~TContWaitQueue() {
Y_ASSERT(Waiters_.Empty());
}
- int WaitD(TCont* current, TInstant deadline) {
+ int WaitD(TCont* current, TInstant deadline) {
TWaiter waiter(current);
Waiters_.PushBack(&waiter);
@@ -80,27 +80,27 @@ public:
return waiter.WaitD(deadline);
}
- int WaitT(TCont* current, TDuration timeout) {
+ int WaitT(TCont* current, TDuration timeout) {
return WaitD(current, timeout.ToDeadLine());
}
- int WaitI(TCont* current) {
+ int WaitI(TCont* current) {
return WaitD(current, TInstant::Max());
}
- void Signal() noexcept {
+ void Signal() noexcept {
if (!Waiters_.Empty()) {
Waiters_.PopFront()->Wake();
}
}
- void BroadCast() noexcept {
+ void BroadCast() noexcept {
while (!Waiters_.Empty()) {
Waiters_.PopFront()->Wake();
}
}
- void BroadCast(size_t number) noexcept {
+ void BroadCast(size_t number) noexcept {
for (size_t i = 0; i < number && !Waiters_.Empty(); ++i) {
Waiters_.PopFront()->Wake();
}
@@ -110,35 +110,35 @@ private:
TIntrusiveList<TWaiter> Waiters_;
};
-
+
class TContSimpleEvent {
public:
- TContSimpleEvent(TContExecutor* e)
+ TContSimpleEvent(TContExecutor* e)
: E_(e)
{
}
- TContExecutor* Executor() const noexcept {
+ TContExecutor* Executor() const noexcept {
return E_;
}
- void Signal() noexcept {
+ void Signal() noexcept {
Q_.Signal();
}
- void BroadCast() noexcept {
+ void BroadCast() noexcept {
Q_.BroadCast();
}
- int WaitD(TInstant deadLine) noexcept {
- return Q_.WaitD(E_->Running(), deadLine);
+ int WaitD(TInstant deadLine) noexcept {
+ return Q_.WaitD(E_->Running(), deadLine);
}
- int WaitT(TDuration timeout) noexcept {
+ int WaitT(TDuration timeout) noexcept {
return WaitD(timeout.ToDeadLine());
}
- int WaitI() noexcept {
+ int WaitI() noexcept {
return WaitD(TInstant::Max());
}
diff --git a/library/cpp/coroutine/engine/impl.cpp b/library/cpp/coroutine/engine/impl.cpp
index 6bf8b03348..7ae6f74051 100644
--- a/library/cpp/coroutine/engine/impl.cpp
+++ b/library/cpp/coroutine/engine/impl.cpp
@@ -1,162 +1,162 @@
-#include "impl.h"
+#include "impl.h"
#include "stack/stack_allocator.h"
#include "stack/stack_guards.h"
#include <util/generic/scope.h>
#include <util/thread/singleton.h>
-#include <util/stream/format.h>
+#include <util/stream/format.h>
#include <util/stream/output.h>
#include <util/system/yassert.h>
-TCont::TJoinWait::TJoinWait(TCont& c) noexcept
- : Cont_(c)
-{}
-
-void TCont::TJoinWait::Wake() noexcept {
- Cont_.ReSchedule();
-}
-
+TCont::TJoinWait::TJoinWait(TCont& c) noexcept
+ : Cont_(c)
+{}
+
+void TCont::TJoinWait::Wake() noexcept {
+ Cont_.ReSchedule();
+}
+
TCont::TCont(NCoro::NStack::IAllocator& allocator,
uint32_t stackSize,
TContExecutor& executor,
NCoro::TTrampoline::TFunc func,
const char* name) noexcept
- : Executor_(executor)
+ : Executor_(executor)
, Name_(name)
- , Trampoline_(
+ , Trampoline_(
allocator,
- stackSize,
+ stackSize,
std::move(func),
this
- )
-{}
-
-
+ )
+{}
+
+
void TCont::PrintMe(IOutputStream& out) const noexcept {
out << "cont("
- << "name = " << Name_ << ", "
- << "addr = " << Hex((size_t)this)
+ << "name = " << Name_ << ", "
+ << "addr = " << Hex((size_t)this)
<< ")";
}
-bool TCont::Join(TCont* c, TInstant deadLine) noexcept {
- TJoinWait ev(*this);
- c->Waiters_.PushBack(&ev);
-
- do {
- if (SleepD(deadLine) == ETIMEDOUT || Cancelled()) {
- if (!ev.Empty()) {
- c->Cancel();
-
- do {
+bool TCont::Join(TCont* c, TInstant deadLine) noexcept {
+ TJoinWait ev(*this);
+ c->Waiters_.PushBack(&ev);
+
+ do {
+ if (SleepD(deadLine) == ETIMEDOUT || Cancelled()) {
+ if (!ev.Empty()) {
+ c->Cancel();
+
+ do {
Switch();
- } while (!ev.Empty());
- }
-
- return false;
- }
- } while (!ev.Empty());
-
- return true;
-}
-
-int TCont::SleepD(TInstant deadline) noexcept {
- TTimerEvent event(this, deadline);
-
- return ExecuteEvent(&event);
-}
-
+ } while (!ev.Empty());
+ }
+
+ return false;
+ }
+ } while (!ev.Empty());
+
+ return true;
+}
+
+int TCont::SleepD(TInstant deadline) noexcept {
+ TTimerEvent event(this, deadline);
+
+ return ExecuteEvent(&event);
+}
+
void TCont::Switch() noexcept {
Executor()->RunScheduler();
}
-void TCont::Yield() noexcept {
- if (SleepD(TInstant::Zero())) {
- ReScheduleAndSwitch();
- }
-}
-
-void TCont::ReScheduleAndSwitch() noexcept {
- ReSchedule();
+void TCont::Yield() noexcept {
+ if (SleepD(TInstant::Zero())) {
+ ReScheduleAndSwitch();
+ }
+}
+
+void TCont::ReScheduleAndSwitch() noexcept {
+ ReSchedule();
Switch();
-}
-
-void TCont::Terminate() {
- while (!Waiters_.Empty()) {
- Waiters_.PopFront()->Wake();
- }
- Executor()->Exit(this);
-}
-
-bool TCont::IAmRunning() const noexcept {
- return this == Executor()->Running();
-}
-
-void TCont::Cancel() noexcept {
- if (Cancelled()) {
- return;
- }
-
- Cancelled_ = true;
-
- if (!IAmRunning()) {
- ReSchedule();
- }
-}
-
-void TCont::ReSchedule() noexcept {
- if (Cancelled()) {
- // Legacy code may expect a Cancelled coroutine to be scheduled without delay.
- Executor()->ScheduleExecutionNow(this);
- } else {
- Executor()->ScheduleExecution(this);
- }
-}
-
-
-TContExecutor::TContExecutor(
+}
+
+void TCont::Terminate() {
+ while (!Waiters_.Empty()) {
+ Waiters_.PopFront()->Wake();
+ }
+ Executor()->Exit(this);
+}
+
+bool TCont::IAmRunning() const noexcept {
+ return this == Executor()->Running();
+}
+
+void TCont::Cancel() noexcept {
+ if (Cancelled()) {
+ return;
+ }
+
+ Cancelled_ = true;
+
+ if (!IAmRunning()) {
+ ReSchedule();
+ }
+}
+
+void TCont::ReSchedule() noexcept {
+ if (Cancelled()) {
+ // Legacy code may expect a Cancelled coroutine to be scheduled without delay.
+ Executor()->ScheduleExecutionNow(this);
+ } else {
+ Executor()->ScheduleExecution(this);
+ }
+}
+
+
+TContExecutor::TContExecutor(
uint32_t defaultStackSize,
- THolder<IPollerFace> poller,
+ THolder<IPollerFace> poller,
NCoro::IScheduleCallback* scheduleCallback,
NCoro::IEnterPollerCallback* enterPollerCallback,
NCoro::NStack::EGuard defaultGuard,
TMaybe<NCoro::NStack::TPoolAllocatorSettings> poolSettings,
NCoro::ITime* time
-)
+)
: ScheduleCallback_(scheduleCallback)
, EnterPollerCallback_(enterPollerCallback)
- , DefaultStackSize_(defaultStackSize)
- , Poller_(std::move(poller))
+ , DefaultStackSize_(defaultStackSize)
+ , Poller_(std::move(poller))
, Time_(time)
{
StackAllocator_ = NCoro::NStack::GetAllocator(poolSettings, defaultGuard);
}
-
-TContExecutor::~TContExecutor() {
- Y_VERIFY(Allocated_ == 0, "leaked %u coroutines", (ui32)Allocated_);
-}
-
-void TContExecutor::Execute() noexcept {
- auto nop = [](void*){};
- Execute(nop);
-}
-
-void TContExecutor::Execute(TContFunc func, void* arg) noexcept {
+
+TContExecutor::~TContExecutor() {
+ Y_VERIFY(Allocated_ == 0, "leaked %u coroutines", (ui32)Allocated_);
+}
+
+void TContExecutor::Execute() noexcept {
+ auto nop = [](void*){};
+ Execute(nop);
+}
+
+void TContExecutor::Execute(TContFunc func, void* arg) noexcept {
CreateOwned([=](TCont* cont) {
func(cont, arg);
}, "sys_main");
- RunScheduler();
+ RunScheduler();
}
-void TContExecutor::WaitForIO() {
- while (Ready_.Empty() && !WaitQueue_.Empty()) {
+void TContExecutor::WaitForIO() {
+ while (Ready_.Empty() && !WaitQueue_.Empty()) {
const auto now = Now();
-
- // Waking a coroutine puts it into ReadyNext_ list
- const auto next = WaitQueue_.WakeTimedout(now);
-
+
+ // Waking a coroutine puts it into ReadyNext_ list
+ const auto next = WaitQueue_.WakeTimedout(now);
+
if (!UserEvents_.Empty()) {
TIntrusiveList<IUserEvent> userEvents;
userEvents.Swap(UserEvents_);
@@ -165,11 +165,11 @@ void TContExecutor::WaitForIO() {
} while (!userEvents.Empty());
}
- // Polling will return as soon as there is an event to process or a timeout.
- // If there are woken coroutines we do not want to sleep in the poller
- // yet still we want to check for new io
- // to prevent ourselves from locking out of io by constantly waking coroutines.
-
+ // Polling will return as soon as there is an event to process or a timeout.
+ // If there are woken coroutines we do not want to sleep in the poller
+ // yet still we want to check for new io
+ // to prevent ourselves from locking out of io by constantly waking coroutines.
+
if (ReadyNext_.Empty()) {
if (EnterPollerCallback_) {
EnterPollerCallback_->OnEnterPoller();
@@ -187,9 +187,9 @@ void TContExecutor::WaitForIO() {
EnterPollerCallback_->OnExitPoller();
}
}
-
- Ready_.Append(ReadyNext_);
- }
+
+ Ready_.Append(ReadyNext_);
+ }
}
void TContExecutor::Poll(TInstant deadline) {
@@ -198,42 +198,42 @@ void TContExecutor::Poll(TInstant deadline) {
// Waking a coroutine puts it into ReadyNext_ list
for (auto event : PollerEvents_) {
- auto* lst = (NCoro::TPollEventList*)event.Data;
- const int status = event.Status;
-
- if (status) {
- for (auto it = lst->Begin(); it != lst->End();) {
- (it++)->OnPollEvent(status);
- }
- } else {
- const ui16 filter = event.Filter;
-
- for (auto it = lst->Begin(); it != lst->End();) {
- if (it->What() & filter) {
- (it++)->OnPollEvent(0);
- } else {
- ++it;
- }
- }
- }
- }
-}
-
-void TContExecutor::Abort() noexcept {
- WaitQueue_.Abort();
- auto visitor = [](TCont* c) {
- c->Cancel();
- };
- Ready_.ForEach(visitor);
- ReadyNext_.ForEach(visitor);
-}
-
-TCont* TContExecutor::Create(
- TContFunc func,
- void* arg,
- const char* name,
- TMaybe<ui32> customStackSize
-) noexcept {
+ auto* lst = (NCoro::TPollEventList*)event.Data;
+ const int status = event.Status;
+
+ if (status) {
+ for (auto it = lst->Begin(); it != lst->End();) {
+ (it++)->OnPollEvent(status);
+ }
+ } else {
+ const ui16 filter = event.Filter;
+
+ for (auto it = lst->Begin(); it != lst->End();) {
+ if (it->What() & filter) {
+ (it++)->OnPollEvent(0);
+ } else {
+ ++it;
+ }
+ }
+ }
+ }
+}
+
+void TContExecutor::Abort() noexcept {
+ WaitQueue_.Abort();
+ auto visitor = [](TCont* c) {
+ c->Cancel();
+ };
+ Ready_.ForEach(visitor);
+ ReadyNext_.ForEach(visitor);
+}
+
+TCont* TContExecutor::Create(
+ TContFunc func,
+ void* arg,
+ const char* name,
+ TMaybe<ui32> customStackSize
+) noexcept {
return CreateOwned([=](TCont* cont) {
func(cont, arg);
}, name, customStackSize);
@@ -244,38 +244,38 @@ TCont* TContExecutor::CreateOwned(
const char* name,
TMaybe<ui32> customStackSize
) noexcept {
- Allocated_ += 1;
- if (!customStackSize) {
- customStackSize = DefaultStackSize_;
- }
+ Allocated_ += 1;
+ if (!customStackSize) {
+ customStackSize = DefaultStackSize_;
+ }
auto* cont = new TCont(*StackAllocator_, *customStackSize, *this, std::move(func), name);
- ScheduleExecution(cont);
- return cont;
-}
-
+ ScheduleExecution(cont);
+ return cont;
+}
+
NCoro::NStack::TAllocatorStats TContExecutor::GetAllocatorStats() const noexcept {
return StackAllocator_->GetStackStats();
}
-void TContExecutor::Release(TCont* cont) noexcept {
- delete cont;
- Allocated_ -= 1;
-}
-
-void TContExecutor::ScheduleToDelete(TCont* cont) noexcept {
- ToDelete_.PushBack(cont);
-}
-
-void TContExecutor::ScheduleExecution(TCont* cont) noexcept {
- cont->Scheduled_ = true;
- ReadyNext_.PushBack(cont);
-}
-
-void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept {
- cont->Scheduled_ = true;
- Ready_.PushBack(cont);
-}
-
+void TContExecutor::Release(TCont* cont) noexcept {
+ delete cont;
+ Allocated_ -= 1;
+}
+
+void TContExecutor::ScheduleToDelete(TCont* cont) noexcept {
+ ToDelete_.PushBack(cont);
+}
+
+void TContExecutor::ScheduleExecution(TCont* cont) noexcept {
+ cont->Scheduled_ = true;
+ ReadyNext_.PushBack(cont);
+}
+
+void TContExecutor::ScheduleExecutionNow(TCont* cont) noexcept {
+ cont->Scheduled_ = true;
+ Ready_.PushBack(cont);
+}
+
namespace {
inline TContExecutor*& ThisThreadExecutor() {
struct TThisThreadExecutorHolder {
@@ -285,12 +285,12 @@ namespace {
}
}
-void TContExecutor::DeleteScheduled() noexcept {
- ToDelete_.ForEach([this](TCont* c) {
- Release(c);
- });
-}
-
+void TContExecutor::DeleteScheduled() noexcept {
+ ToDelete_.ForEach([this](TCont* c) {
+ Release(c);
+ });
+}
+
TCont* RunningCont() {
TContExecutor* thisThreadExecutor = ThisThreadExecutor();
return thisThreadExecutor ? thisThreadExecutor->Running() : nullptr;
@@ -314,7 +314,7 @@ void TContExecutor::RunScheduler() noexcept {
WaitForIO();
DeleteScheduled();
Ready_.Append(ReadyNext_);
-
+
if (Ready_.Empty()) {
Current_ = nullptr;
if (caller) {
@@ -322,18 +322,18 @@ void TContExecutor::RunScheduler() noexcept {
}
break;
}
-
- TCont* cont = Ready_.PopFront();
+
+ TCont* cont = Ready_.PopFront();
if (ScheduleCallback_) {
ScheduleCallback_->OnSchedule(*this, *cont);
- }
+ }
Current_ = cont;
cont->Scheduled_ = false;
if (cont == caller) {
break;
- }
+ }
context->SwitchTo(cont->Trampoline_.Context());
if (Paused_) {
Paused_ = false;
@@ -346,9 +346,9 @@ void TContExecutor::RunScheduler() noexcept {
}
} catch (...) {
TBackTrace::FromCurrentException().PrintTo(Cerr);
- Y_FAIL("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str());
+ Y_FAIL("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str());
}
-}
+}
void TContExecutor::Pause() {
if (auto cont = Running()) {
@@ -358,17 +358,17 @@ void TContExecutor::Pause() {
}
}
-void TContExecutor::Exit(TCont* cont) noexcept {
- ScheduleToDelete(cont);
- cont->SwitchTo(&SchedContext_);
- Y_FAIL("can not return from exit");
-}
-
+void TContExecutor::Exit(TCont* cont) noexcept {
+ ScheduleToDelete(cont);
+ cont->SwitchTo(&SchedContext_);
+ Y_FAIL("can not return from exit");
+}
+
TInstant TContExecutor::Now() {
return Y_LIKELY(Time_ == nullptr) ? TInstant::Now() : Time_->Now();
}
-template <>
-void Out<TCont>(IOutputStream& out, const TCont& c) {
- c.PrintMe(out);
+template <>
+void Out<TCont>(IOutputStream& out, const TCont& c) {
+ c.PrintMe(out);
}
diff --git a/library/cpp/coroutine/engine/impl.h b/library/cpp/coroutine/engine/impl.h
index 4713b61efe..283a96ecf1 100644
--- a/library/cpp/coroutine/engine/impl.h
+++ b/library/cpp/coroutine/engine/impl.h
@@ -1,25 +1,25 @@
-#pragma once
+#pragma once
#include "callbacks.h"
-#include "cont_poller.h"
-#include "iostatus.h"
+#include "cont_poller.h"
+#include "iostatus.h"
#include "poller.h"
#include "stack/stack_common.h"
-#include "trampoline.h"
+#include "trampoline.h"
#include "custom_time.h"
#include <library/cpp/containers/intrusive_rb_tree/rb_tree.h>
-
+
#include <util/system/error.h>
#include <util/generic/ptr.h>
#include <util/generic/intrlist.h>
#include <util/datetime/base.h>
-#include <util/generic/maybe.h>
+#include <util/generic/maybe.h>
#include <util/generic/function.h>
-
-#define EWAKEDUP 34567
-
+
+#define EWAKEDUP 34567
+
class TCont;
struct TContRep;
class TContExecutor;
@@ -28,97 +28,97 @@ class TContPollEvent;
namespace NCoro::NStack {
class IAllocator;
}
-
-class TCont : private TIntrusiveListItem<TCont> {
- struct TJoinWait: public TIntrusiveListItem<TJoinWait> {
- TJoinWait(TCont& c) noexcept;
- void Wake() noexcept;
+class TCont : private TIntrusiveListItem<TCont> {
+ struct TJoinWait: public TIntrusiveListItem<TJoinWait> {
+ TJoinWait(TCont& c) noexcept;
+
+ void Wake() noexcept;
- public:
- TCont& Cont_;
- };
+ public:
+ TCont& Cont_;
+ };
- friend class TContExecutor;
- friend class TIntrusiveListItem<TCont>;
- friend class NCoro::TEventWaitQueue;
- friend class NCoro::TTrampoline;
+ friend class TContExecutor;
+ friend class TIntrusiveListItem<TCont>;
+ friend class NCoro::TEventWaitQueue;
+ friend class NCoro::TTrampoline;
-private:
- TCont(
+private:
+ TCont(
NCoro::NStack::IAllocator& allocator,
uint32_t stackSize,
- TContExecutor& executor,
+ TContExecutor& executor,
NCoro::TTrampoline::TFunc func,
- const char* name
- ) noexcept;
+ const char* name
+ ) noexcept;
-public:
- TContExecutor* Executor() noexcept {
- return &Executor_;
+public:
+ TContExecutor* Executor() noexcept {
+ return &Executor_;
}
- const TContExecutor* Executor() const noexcept {
- return &Executor_;
+ const TContExecutor* Executor() const noexcept {
+ return &Executor_;
}
- const char* Name() const noexcept {
+ const char* Name() const noexcept {
return Name_;
}
void PrintMe(IOutputStream& out) const noexcept;
- void Yield() noexcept;
+ void Yield() noexcept;
- void ReScheduleAndSwitch() noexcept;
+ void ReScheduleAndSwitch() noexcept;
/// @return ETIMEDOUT on success
- int SleepD(TInstant deadline) noexcept;
+ int SleepD(TInstant deadline) noexcept;
- int SleepT(TDuration timeout) noexcept {
+ int SleepT(TDuration timeout) noexcept {
return SleepD(timeout.ToDeadLine());
}
- int SleepI() noexcept {
+ int SleepI() noexcept {
return SleepD(TInstant::Max());
}
- bool IAmRunning() const noexcept;
+ bool IAmRunning() const noexcept;
- void Cancel() noexcept;
+ void Cancel() noexcept;
- bool Cancelled() const noexcept {
+ bool Cancelled() const noexcept {
return Cancelled_;
}
- bool Scheduled() const noexcept {
+ bool Scheduled() const noexcept {
return Scheduled_;
}
- bool Join(TCont* c, TInstant deadLine = TInstant::Max()) noexcept;
+ bool Join(TCont* c, TInstant deadLine = TInstant::Max()) noexcept;
- void ReSchedule() noexcept;
+ void ReSchedule() noexcept;
void Switch() noexcept;
- void SwitchTo(TExceptionSafeContext* ctx) {
- Trampoline_.SwitchTo(ctx);
- }
+ void SwitchTo(TExceptionSafeContext* ctx) {
+ Trampoline_.SwitchTo(ctx);
+ }
private:
- void Terminate();
-
+ void Terminate();
+
private:
- TContExecutor& Executor_;
-
- // TODO(velavokr): allow name storage owning (for generated names backed by TString)
- const char* Name_ = nullptr;
+ TContExecutor& Executor_;
+
+ // TODO(velavokr): allow name storage owning (for generated names backed by TString)
+ const char* Name_ = nullptr;
NCoro::TTrampoline Trampoline_;
TIntrusiveList<TJoinWait> Waiters_;
- bool Cancelled_ = false;
- bool Scheduled_ = false;
+ bool Cancelled_ = false;
+ bool Scheduled_ = false;
};
TCont* RunningCont();
@@ -147,60 +147,60 @@ public:
/// Note, coroutines are single-threaded, and all methods must be called from the single thread
class TContExecutor {
friend class TCont;
- using TContList = TIntrusiveList<TCont>;
+ using TContList = TIntrusiveList<TCont>;
public:
- TContExecutor(
+ TContExecutor(
uint32_t defaultStackSize,
- THolder<IPollerFace> poller = IPollerFace::Default(),
- NCoro::IScheduleCallback* = nullptr,
+ THolder<IPollerFace> poller = IPollerFace::Default(),
+ NCoro::IScheduleCallback* = nullptr,
NCoro::IEnterPollerCallback* = nullptr,
NCoro::NStack::EGuard stackGuard = NCoro::NStack::EGuard::Canary,
TMaybe<NCoro::NStack::TPoolAllocatorSettings> poolSettings = Nothing(),
NCoro::ITime* time = nullptr
- );
+ );
~TContExecutor();
- // if we already have a coroutine to run
- void Execute() noexcept;
+ // if we already have a coroutine to run
+ void Execute() noexcept;
- void Execute(TContFunc func, void* arg = nullptr) noexcept;
+ void Execute(TContFunc func, void* arg = nullptr) noexcept;
template <class Functor>
- void Execute(Functor& f) noexcept {
+ void Execute(Functor& f) noexcept {
Execute((TContFunc)ContHelperFunc<Functor>, (void*)&f);
}
template <typename T, void (T::*M)(TCont*)>
- void Execute(T* obj) noexcept {
+ void Execute(T* obj) noexcept {
Execute(ContHelperMemberFunc<T, M>, obj);
}
template <class Functor>
- TCont* Create(
- Functor& f,
- const char* name,
- TMaybe<ui32> customStackSize = Nothing()
- ) noexcept {
- return Create((TContFunc)ContHelperFunc<Functor>, (void*)&f, name, customStackSize);
+ TCont* Create(
+ Functor& f,
+ const char* name,
+ TMaybe<ui32> customStackSize = Nothing()
+ ) noexcept {
+ return Create((TContFunc)ContHelperFunc<Functor>, (void*)&f, name, customStackSize);
}
template <typename T, void (T::*M)(TCont*)>
- TCont* Create(
- T* obj,
- const char* name,
- TMaybe<ui32> customStackSize = Nothing()
- ) noexcept {
- return Create(ContHelperMemberFunc<T, M>, obj, name, customStackSize);
+ TCont* Create(
+ T* obj,
+ const char* name,
+ TMaybe<ui32> customStackSize = Nothing()
+ ) noexcept {
+ return Create(ContHelperMemberFunc<T, M>, obj, name, customStackSize);
}
- TCont* Create(
- TContFunc func,
- void* arg,
- const char* name,
- TMaybe<ui32> customStackSize = Nothing()
- ) noexcept;
+ TCont* Create(
+ TContFunc func,
+ void* arg,
+ const char* name,
+ TMaybe<ui32> customStackSize = Nothing()
+ ) noexcept;
TCont* CreateOwned(
NCoro::TTrampoline::TFunc func,
@@ -208,44 +208,44 @@ public:
TMaybe<ui32> customStackSize = Nothing()
) noexcept;
- NCoro::TContPoller* Poller() noexcept {
+ NCoro::TContPoller* Poller() noexcept {
return &Poller_;
}
- TCont* Running() noexcept {
+ TCont* Running() noexcept {
return Current_;
}
- const TCont* Running() const noexcept {
+ const TCont* Running() const noexcept {
return Current_;
}
- size_t TotalReadyConts() const noexcept {
- return Ready_.Size() + TotalScheduledConts();
- }
-
- size_t TotalScheduledConts() const noexcept {
- return ReadyNext_.Size();
- }
-
- size_t TotalConts() const noexcept {
- return Allocated_;
- }
-
- size_t TotalWaitingConts() const noexcept {
- return TotalConts() - TotalReadyConts();
- }
-
+ size_t TotalReadyConts() const noexcept {
+ return Ready_.Size() + TotalScheduledConts();
+ }
+
+ size_t TotalScheduledConts() const noexcept {
+ return ReadyNext_.Size();
+ }
+
+ size_t TotalConts() const noexcept {
+ return Allocated_;
+ }
+
+ size_t TotalWaitingConts() const noexcept {
+ return TotalConts() - TotalReadyConts();
+ }
+
NCoro::NStack::TAllocatorStats GetAllocatorStats() const noexcept;
- // TODO(velavokr): rename, it is just CancelAll actually
- void Abort() noexcept;
+ // TODO(velavokr): rename, it is just CancelAll actually
+ void Abort() noexcept;
- void SetFailOnError(bool fail) noexcept {
+ void SetFailOnError(bool fail) noexcept {
FailOnError_ = fail;
}
- bool FailOnError() const noexcept {
+ bool FailOnError() const noexcept {
return FailOnError_;
}
@@ -253,12 +253,12 @@ public:
WaitQueue_.Register(event);
}
- void ScheduleIoWait(TFdEvent* event) {
+ void ScheduleIoWait(TFdEvent* event) {
RegisterInWaitQueue(event);
Poller_.Schedule(event);
}
- void ScheduleIoWait(TTimerEvent* event) noexcept {
+ void ScheduleIoWait(TTimerEvent* event) noexcept {
RegisterInWaitQueue(event);
}
@@ -269,45 +269,45 @@ public:
void Pause();
TInstant Now();
private:
- void Release(TCont* cont) noexcept;
+ void Release(TCont* cont) noexcept;
- void Exit(TCont* cont) noexcept;
+ void Exit(TCont* cont) noexcept;
void RunScheduler() noexcept;
- void ScheduleToDelete(TCont* cont) noexcept;
+ void ScheduleToDelete(TCont* cont) noexcept;
- void ScheduleExecution(TCont* cont) noexcept;
+ void ScheduleExecution(TCont* cont) noexcept;
- void ScheduleExecutionNow(TCont* cont) noexcept;
-
- void DeleteScheduled() noexcept;
+ void ScheduleExecutionNow(TCont* cont) noexcept;
+
+ void DeleteScheduled() noexcept;
void WaitForIO();
void Poll(TInstant deadline);
-
+
private:
NCoro::IScheduleCallback* const ScheduleCallback_ = nullptr;
NCoro::IEnterPollerCallback* const EnterPollerCallback_ = nullptr;
const uint32_t DefaultStackSize_;
THolder<NCoro::NStack::IAllocator> StackAllocator_;
-
- TExceptionSafeContext SchedContext_;
-
+
+ TExceptionSafeContext SchedContext_;
+
TContList ToDelete_;
TContList Ready_;
- TContList ReadyNext_;
- NCoro::TEventWaitQueue WaitQueue_;
- NCoro::TContPoller Poller_;
+ TContList ReadyNext_;
+ NCoro::TEventWaitQueue WaitQueue_;
+ NCoro::TContPoller Poller_;
NCoro::TContPoller::TEvents PollerEvents_;
TInstant LastPoll_;
-
+
TIntrusiveList<IUserEvent> UserEvents_;
- size_t Allocated_ = 0;
- TCont* Current_ = nullptr;
- bool FailOnError_ = false;
+ size_t Allocated_ = 0;
+ TCont* Current_ = nullptr;
+ bool FailOnError_ = false;
bool Paused_ = false;
NCoro::ITime* Time_ = nullptr;
};
diff --git a/library/cpp/coroutine/engine/iostatus.h b/library/cpp/coroutine/engine/iostatus.h
index 201cc77825..bf6036805d 100644
--- a/library/cpp/coroutine/engine/iostatus.h
+++ b/library/cpp/coroutine/engine/iostatus.h
@@ -1,41 +1,41 @@
-#pragma once
+#pragma once
#include <util/generic/yexception.h>
class TIOStatus {
public:
- TIOStatus(int status) noexcept
+ TIOStatus(int status) noexcept
: Status_(status)
{
}
- static TIOStatus Error(int status) noexcept {
+ static TIOStatus Error(int status) noexcept {
return TIOStatus(status);
}
- static TIOStatus Error() noexcept {
+ static TIOStatus Error() noexcept {
return TIOStatus(LastSystemError());
}
- static TIOStatus Success() noexcept {
+ static TIOStatus Success() noexcept {
return TIOStatus(0);
}
- void Check() const {
+ void Check() const {
if (Status_) {
ythrow TSystemError(Status_) << "io error";
}
}
- bool Failed() const noexcept {
+ bool Failed() const noexcept {
return (bool)Status_;
}
- bool Succeed() const noexcept {
+ bool Succeed() const noexcept {
return !Failed();
}
- int Status() const noexcept {
+ int Status() const noexcept {
return Status_;
}
@@ -43,43 +43,43 @@ private:
int Status_;
};
-
+
class TContIOStatus {
public:
- TContIOStatus(size_t processed, TIOStatus status) noexcept
+ TContIOStatus(size_t processed, TIOStatus status) noexcept
: Processed_(processed)
, Status_(status)
{
}
- static TContIOStatus Error(TIOStatus status) noexcept {
+ static TContIOStatus Error(TIOStatus status) noexcept {
return TContIOStatus(0, status);
}
- static TContIOStatus Error() noexcept {
+ static TContIOStatus Error() noexcept {
return TContIOStatus(0, TIOStatus::Error());
}
- static TContIOStatus Success(size_t processed) noexcept {
+ static TContIOStatus Success(size_t processed) noexcept {
return TContIOStatus(processed, TIOStatus::Success());
}
- static TContIOStatus Eof() noexcept {
+ static TContIOStatus Eof() noexcept {
return Success(0);
}
- ~TContIOStatus() {
+ ~TContIOStatus() {
}
- size_t Processed() const noexcept {
+ size_t Processed() const noexcept {
return Processed_;
}
- int Status() const noexcept {
+ int Status() const noexcept {
return Status_.Status();
}
- size_t Checked() const {
+ size_t Checked() const {
Status_.Check();
return Processed_;
diff --git a/library/cpp/coroutine/engine/mutex.h b/library/cpp/coroutine/engine/mutex.h
index 5a73e13c2c..93e9119503 100644
--- a/library/cpp/coroutine/engine/mutex.h
+++ b/library/cpp/coroutine/engine/mutex.h
@@ -1,20 +1,20 @@
-#pragma once
+#pragma once
+
+#include "impl.h"
+#include "events.h"
-#include "impl.h"
-#include "events.h"
-
class TContMutex {
public:
- TContMutex() noexcept
+ TContMutex() noexcept
: Token_(true)
{
}
- ~TContMutex() {
+ ~TContMutex() {
Y_ASSERT(Token_);
}
- int LockD(TCont* current, TInstant deadline) {
+ int LockD(TCont* current, TInstant deadline) {
while (!Token_) {
const int ret = WaitQueue_.WaitD(current, deadline);
@@ -28,15 +28,15 @@ public:
return 0;
}
- int LockT(TCont* current, TDuration timeout) {
+ int LockT(TCont* current, TDuration timeout) {
return LockD(current, timeout.ToDeadLine());
}
- int LockI(TCont* current) {
+ int LockI(TCont* current) {
return LockD(current, TInstant::Max());
}
- void UnLock() noexcept {
+ void UnLock() noexcept {
Y_ASSERT(!Token_);
Token_ = true;
diff --git a/library/cpp/coroutine/engine/network.cpp b/library/cpp/coroutine/engine/network.cpp
index 46100a8023..85b647d210 100644
--- a/library/cpp/coroutine/engine/network.cpp
+++ b/library/cpp/coroutine/engine/network.cpp
@@ -1,325 +1,325 @@
-#include "impl.h"
-#include "network.h"
+#include "impl.h"
+#include "network.h"
-#include <util/generic/scope.h>
-#include <util/generic/xrange.h>
+#include <util/generic/scope.h>
+#include <util/generic/xrange.h>
#include <sys/uio.h>
-#if defined(_bionic_)
-# define IOV_MAX 1024
-#endif
+#if defined(_bionic_)
+# define IOV_MAX 1024
+#endif
-namespace NCoro {
- namespace {
- bool IsBlocked(int lasterr) noexcept {
- return lasterr == EAGAIN || lasterr == EWOULDBLOCK;
- }
+namespace NCoro {
+ namespace {
+ bool IsBlocked(int lasterr) noexcept {
+ return lasterr == EAGAIN || lasterr == EWOULDBLOCK;
+ }
- ssize_t DoReadVector(SOCKET fd, TContIOVector* vec) noexcept {
+ ssize_t DoReadVector(SOCKET fd, TContIOVector* vec) noexcept {
return readv(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count()));
- }
+ }
- ssize_t DoWriteVector(SOCKET fd, TContIOVector* vec) noexcept {
+ ssize_t DoWriteVector(SOCKET fd, TContIOVector* vec) noexcept {
return writev(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count()));
- }
- }
-
-
- int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept {
- if (cont->Cancelled()) {
- return ECANCELED;
- }
-
- if (nfds == 0) {
- return 0;
- }
-
- TTempArray<TFdEvent> events(nfds);
-
- for (auto i : xrange(nfds)) {
- new(events.Data() + i) TFdEvent(cont, fds[i], (ui16) what[i], deadline);
- }
-
- Y_DEFER {
- for (auto i : xrange(nfds)) {
- (events.Data() + i)->~TFdEvent();
- }
- };
-
- for (auto i : xrange(nfds)) {
- cont->Executor()->ScheduleIoWait(events.Data() + i);
- }
+ }
+ }
+
+
+ int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept {
+ if (cont->Cancelled()) {
+ return ECANCELED;
+ }
+
+ if (nfds == 0) {
+ return 0;
+ }
+
+ TTempArray<TFdEvent> events(nfds);
+
+ for (auto i : xrange(nfds)) {
+ new(events.Data() + i) TFdEvent(cont, fds[i], (ui16) what[i], deadline);
+ }
+
+ Y_DEFER {
+ for (auto i : xrange(nfds)) {
+ (events.Data() + i)->~TFdEvent();
+ }
+ };
+
+ for (auto i : xrange(nfds)) {
+ cont->Executor()->ScheduleIoWait(events.Data() + i);
+ }
cont->Switch();
-
- if (cont->Cancelled()) {
- return ECANCELED;
- }
-
- TFdEvent* ret = nullptr;
- int status = EINPROGRESS;
-
- for (auto i : xrange(nfds)) {
- auto& ev = *(events.Data() + i);
- switch (ev.Status()) {
- case EINPROGRESS:
- break;
- case ETIMEDOUT:
- if (status != EINPROGRESS) {
- break;
+
+ if (cont->Cancelled()) {
+ return ECANCELED;
+ }
+
+ TFdEvent* ret = nullptr;
+ int status = EINPROGRESS;
+
+ for (auto i : xrange(nfds)) {
+ auto& ev = *(events.Data() + i);
+ switch (ev.Status()) {
+ case EINPROGRESS:
+ break;
+ case ETIMEDOUT:
+ if (status != EINPROGRESS) {
+ break;
}
[[fallthrough]];
- default:
- status = ev.Status();
- ret = &ev;
- }
- }
-
- if (ret) {
- if (outfd) {
- *outfd = ret->Fd();
+ default:
+ status = ev.Status();
+ ret = &ev;
}
- return ret->Status();
}
- return EINPROGRESS;
+ if (ret) {
+ if (outfd) {
+ *outfd = ret->Fd();
+ }
+ return ret->Status();
+ }
+
+ return EINPROGRESS;
+ }
+
+ int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept {
+ return SelectD(cont, fds, what, nfds, outfd, timeout.ToDeadLine());
}
- int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept {
- return SelectD(cont, fds, what, nfds, outfd, timeout.ToDeadLine());
+ int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd) {
+ return SelectD(cont, fds, what, nfds, outfd, TInstant::Max());
}
- int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd) {
- return SelectD(cont, fds, what, nfds, outfd, TInstant::Max());
- }
-
- int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept {
- TFdEvent event(cont, fd, (ui16)what, deadline);
- return ExecuteEvent(&event);
+ int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept {
+ TFdEvent event(cont, fd, (ui16)what, deadline);
+ return ExecuteEvent(&event);
}
- int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept {
- return PollD(cont, fd, what, timeout.ToDeadLine());
- }
-
- int PollI(TCont* cont, SOCKET fd, int what) noexcept {
- return PollD(cont, fd, what, TInstant::Max());
+ int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept {
+ return PollD(cont, fd, what, timeout.ToDeadLine());
+ }
+
+ int PollI(TCont* cont, SOCKET fd, int what) noexcept {
+ return PollD(cont, fd, what, TInstant::Max());
}
- TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept {
- while (true) {
- ssize_t res = DoReadVector(fd, vec);
+ TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept {
+ while (true) {
+ ssize_t res = DoReadVector(fd, vec);
- if (res >= 0) {
- return TContIOStatus::Success((size_t) res);
+ if (res >= 0) {
+ return TContIOStatus::Success((size_t) res);
}
{
const int err = LastSystemError();
if (!IsBlocked(err)) {
- return TContIOStatus::Error(err);
+ return TContIOStatus::Error(err);
}
}
- if ((res = PollD(cont, fd, CONT_POLL_READ, deadline)) != 0) {
- return TContIOStatus::Error((int) res);
+ if ((res = PollD(cont, fd, CONT_POLL_READ, deadline)) != 0) {
+ return TContIOStatus::Error((int) res);
}
}
}
- TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept {
- return ReadVectorD(cont, fd, vec, timeOut.ToDeadLine());
- }
+ TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept {
+ return ReadVectorD(cont, fd, vec, timeOut.ToDeadLine());
+ }
- TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept {
- return ReadVectorD(cont, fd, vec, TInstant::Max());
- }
+ TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept {
+ return ReadVectorD(cont, fd, vec, TInstant::Max());
+ }
- TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept {
- IOutputStream::TPart part(buf, len);
- TContIOVector vec(&part, 1);
- return ReadVectorD(cont, fd, &vec, deadline);
- }
+ TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept {
+ IOutputStream::TPart part(buf, len);
+ TContIOVector vec(&part, 1);
+ return ReadVectorD(cont, fd, &vec, deadline);
+ }
- TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept {
- return ReadD(cont, fd, buf, len, timeout.ToDeadLine());
- }
+ TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept {
+ return ReadD(cont, fd, buf, len, timeout.ToDeadLine());
+ }
- TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept {
- return ReadD(cont, fd, buf, len, TInstant::Max());
+ TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept {
+ return ReadD(cont, fd, buf, len, TInstant::Max());
}
- TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept {
- size_t written = 0;
+ TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept {
+ size_t written = 0;
- while (!vec->Complete()) {
- ssize_t res = DoWriteVector(fd, vec);
+ while (!vec->Complete()) {
+ ssize_t res = DoWriteVector(fd, vec);
- if (res >= 0) {
- written += res;
+ if (res >= 0) {
+ written += res;
- vec->Proceed((size_t) res);
- } else {
- {
- const int err = LastSystemError();
+ vec->Proceed((size_t) res);
+ } else {
+ {
+ const int err = LastSystemError();
- if (!IsBlocked(err)) {
- return TContIOStatus(written, err);
- }
- }
+ if (!IsBlocked(err)) {
+ return TContIOStatus(written, err);
+ }
+ }
- if ((res = PollD(cont, fd, CONT_POLL_WRITE, deadline)) != 0) {
- return TContIOStatus(written, (int) res);
- }
- }
+ if ((res = PollD(cont, fd, CONT_POLL_WRITE, deadline)) != 0) {
+ return TContIOStatus(written, (int) res);
+ }
+ }
}
- return TContIOStatus::Success(written);
+ return TContIOStatus::Success(written);
}
- TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept {
- return WriteVectorD(cont, fd, vec, timeOut.ToDeadLine());
- }
+ TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept {
+ return WriteVectorD(cont, fd, vec, timeOut.ToDeadLine());
+ }
- TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept {
- return WriteVectorD(cont, fd, vec, TInstant::Max());
- }
+ TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept {
+ return WriteVectorD(cont, fd, vec, TInstant::Max());
+ }
- TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept {
- IOutputStream::TPart part(buf, len);
- TContIOVector vec(&part, 1);
- return WriteVectorD(cont, fd, &vec, deadline);
- }
+ TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept {
+ IOutputStream::TPart part(buf, len);
+ TContIOVector vec(&part, 1);
+ return WriteVectorD(cont, fd, &vec, deadline);
+ }
- TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept {
- return WriteD(cont, fd, buf, len, timeout.ToDeadLine());
- }
+ TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept {
+ return WriteD(cont, fd, buf, len, timeout.ToDeadLine());
+ }
- TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept {
- return WriteD(cont, fd, buf, len, TInstant::Max());
+ TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept {
+ return WriteD(cont, fd, buf, len, TInstant::Max());
}
- int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept {
- TSocketHolder res(Socket(ai));
+ int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept {
+ TSocketHolder res(Socket(ai));
- if (res.Closed()) {
- return LastSystemError();
- }
+ if (res.Closed()) {
+ return LastSystemError();
+ }
- const int ret = ConnectD(cont, res, ai.ai_addr, (socklen_t) ai.ai_addrlen, deadline);
+ const int ret = ConnectD(cont, res, ai.ai_addr, (socklen_t) ai.ai_addrlen, deadline);
- if (!ret) {
- s.Swap(res);
- }
-
- return ret;
+ if (!ret) {
+ s.Swap(res);
+ }
+
+ return ret;
}
- int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept {
- int ret = EHOSTUNREACH;
+ int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept {
+ int ret = EHOSTUNREACH;
- for (auto it = addr.Begin(); it != addr.End(); ++it) {
- ret = ConnectD(cont, s, *it, deadline);
+ for (auto it = addr.Begin(); it != addr.End(); ++it) {
+ ret = ConnectD(cont, s, *it, deadline);
- if (ret == 0 || ret == ETIMEDOUT) {
- return ret;
- }
- }
+ if (ret == 0 || ret == ETIMEDOUT) {
+ return ret;
+ }
+ }
- return ret;
+ return ret;
}
- int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept {
- return ConnectD(cont, s, addr, timeout.ToDeadLine());
- }
+ int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept {
+ return ConnectD(cont, s, addr, timeout.ToDeadLine());
+ }
- int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept {
- return ConnectD(cont, s, addr, TInstant::Max());
+ int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept {
+ return ConnectD(cont, s, addr, TInstant::Max());
}
- int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept {
- if (connect(s, name, namelen)) {
- const int err = LastSystemError();
-
- if (!IsBlocked(err) && err != EINPROGRESS) {
- return err;
- }
-
- int ret = PollD(cont, s, CONT_POLL_WRITE, deadline);
-
- if (ret) {
- return ret;
- }
-
- // check if we really connected
- // FIXME: Unportable ??
- int serr = 0;
- socklen_t slen = sizeof(serr);
-
- ret = getsockopt(s, SOL_SOCKET, SO_ERROR, (char*) &serr, &slen);
-
- if (ret) {
- return LastSystemError();
- }
-
- if (serr) {
- return serr;
- }
- }
-
- return 0;
- }
-
- int ConnectT(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TDuration timeout) noexcept {
- return ConnectD(cont, s, name, namelen, timeout.ToDeadLine());
- }
-
- int ConnectI(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen) noexcept {
- return ConnectD(cont, s, name, namelen, TInstant::Max());
- }
-
-
- int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept {
- SOCKET ret;
-
- while ((ret = Accept4(s, addr, addrlen)) == INVALID_SOCKET) {
- int err = LastSystemError();
-
- if (!IsBlocked(err)) {
- return -err;
+ int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept {
+ if (connect(s, name, namelen)) {
+ const int err = LastSystemError();
+
+ if (!IsBlocked(err) && err != EINPROGRESS) {
+ return err;
}
-
- err = PollD(cont, s, CONT_POLL_READ, deadline);
- if (err) {
- return -err;
- }
- }
+ int ret = PollD(cont, s, CONT_POLL_WRITE, deadline);
+
+ if (ret) {
+ return ret;
+ }
+
+ // check if we really connected
+ // FIXME: Unportable ??
+ int serr = 0;
+ socklen_t slen = sizeof(serr);
+
+ ret = getsockopt(s, SOL_SOCKET, SO_ERROR, (char*) &serr, &slen);
+
+ if (ret) {
+ return LastSystemError();
+ }
+
+ if (serr) {
+ return serr;
+ }
+ }
+
+ return 0;
+ }
+
+ int ConnectT(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TDuration timeout) noexcept {
+ return ConnectD(cont, s, name, namelen, timeout.ToDeadLine());
+ }
- return (int) ret;
+ int ConnectI(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen) noexcept {
+ return ConnectD(cont, s, name, namelen, TInstant::Max());
}
- int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept {
- return AcceptD(cont, s, addr, addrlen, timeout.ToDeadLine());
- }
-
- int AcceptI(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen) noexcept {
- return AcceptD(cont, s, addr, addrlen, TInstant::Max());
- }
-
- SOCKET Socket(int domain, int type, int protocol) noexcept {
- return Socket4(domain, type, protocol);
- }
-
- SOCKET Socket(const struct addrinfo& ai) noexcept {
- return Socket(ai.ai_family, ai.ai_socktype, ai.ai_protocol);
- }
-}
+
+ int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept {
+ SOCKET ret;
+
+ while ((ret = Accept4(s, addr, addrlen)) == INVALID_SOCKET) {
+ int err = LastSystemError();
+
+ if (!IsBlocked(err)) {
+ return -err;
+ }
+
+ err = PollD(cont, s, CONT_POLL_READ, deadline);
+
+ if (err) {
+ return -err;
+ }
+ }
+
+ return (int) ret;
+ }
+
+ int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept {
+ return AcceptD(cont, s, addr, addrlen, timeout.ToDeadLine());
+ }
+
+ int AcceptI(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen) noexcept {
+ return AcceptD(cont, s, addr, addrlen, TInstant::Max());
+ }
+
+ SOCKET Socket(int domain, int type, int protocol) noexcept {
+ return Socket4(domain, type, protocol);
+ }
+
+ SOCKET Socket(const struct addrinfo& ai) noexcept {
+ return Socket(ai.ai_family, ai.ai_socktype, ai.ai_protocol);
+ }
+}
diff --git a/library/cpp/coroutine/engine/network.h b/library/cpp/coroutine/engine/network.h
index 79462be586..f2c9afe4f8 100644
--- a/library/cpp/coroutine/engine/network.h
+++ b/library/cpp/coroutine/engine/network.h
@@ -1,55 +1,55 @@
-#pragma once
+#pragma once
-#include "iostatus.h"
+#include "iostatus.h"
-#include <util/datetime/base.h>
-#include <util/network/init.h>
+#include <util/datetime/base.h>
+#include <util/network/init.h>
#include <util/network/iovec.h>
-#include <util/network/nonblock.h>
+#include <util/network/nonblock.h>
#include <util/network/socket.h>
class TCont;
-namespace NCoro {
-
- int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept;
- int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept;
- int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd);
+namespace NCoro {
- int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept;
- int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept;
- int PollI(TCont* cont, SOCKET fd, int what) noexcept;
+ int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept;
+ int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept;
+ int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd);
- TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept;
- TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept;
- TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept;
+ int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept;
+ int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept;
+ int PollI(TCont* cont, SOCKET fd, int what) noexcept;
- TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept;
- TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept;
- TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept;
+ TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept;
+ TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept;
+ TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept;
- TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept;
- TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept;
- TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept;
+ TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept;
+ TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept;
+ TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept;
- TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept;
- TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept;
- TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept;
+ TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept;
+ TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept;
+ TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept;
- int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept;
+ TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept;
+ TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept;
+ TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept;
- int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept;
- int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept;
- int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept;
+ int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept;
- int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept;
- int ConnectT(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TDuration timeout) noexcept;
- int ConnectI(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen) noexcept;
+ int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept;
+ int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept;
+ int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept;
- int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept;
- int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept;
- int AcceptI(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen) noexcept;
+ int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept;
+ int ConnectT(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TDuration timeout) noexcept;
+ int ConnectI(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen) noexcept;
- SOCKET Socket(int domain, int type, int protocol) noexcept;
- SOCKET Socket(const struct addrinfo& ai) noexcept;
+ int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept;
+ int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept;
+ int AcceptI(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen) noexcept;
+
+ SOCKET Socket(int domain, int type, int protocol) noexcept;
+ SOCKET Socket(const struct addrinfo& ai) noexcept;
}
diff --git a/library/cpp/coroutine/engine/poller.cpp b/library/cpp/coroutine/engine/poller.cpp
index 4efabd0c7e..61164fa56b 100644
--- a/library/cpp/coroutine/engine/poller.cpp
+++ b/library/cpp/coroutine/engine/poller.cpp
@@ -5,30 +5,30 @@
#include <util/generic/intrlist.h>
#include <util/generic/singleton.h>
#include <util/system/env.h>
-#include <util/string/cast.h>
+#include <util/string/cast.h>
namespace {
- using TChange = IPollerFace::TChange;
- using TEvent = IPollerFace::TEvent;
- using TEvents = IPollerFace::TEvents;
+ using TChange = IPollerFace::TChange;
+ using TEvent = IPollerFace::TEvent;
+ using TEvents = IPollerFace::TEvents;
template <class T>
class TUnsafeBuf {
public:
- TUnsafeBuf() noexcept
+ TUnsafeBuf() noexcept
: L_(0)
{
}
- T* operator~() const noexcept {
+ T* operator~() const noexcept {
return B_.Get();
}
- size_t operator+() const noexcept {
+ size_t operator+() const noexcept {
return L_;
}
- void Reserve(size_t len) {
+ void Reserve(size_t len) {
len = FastClp2(len);
if (len > L_) {
@@ -42,7 +42,7 @@ namespace {
size_t L_;
};
-
+
template <class T>
class TVirtualize: public IPollerFace {
public:
@@ -67,25 +67,25 @@ namespace {
const EContPoller PollerEngine_;
};
-
+
template <class T>
class TPoller {
- using TInternalEvent = typename T::TEvent;
+ using TInternalEvent = typename T::TEvent;
public:
- TPoller() {
+ TPoller() {
E_.Reserve(1);
}
- void Set(const TChange& c) {
+ void Set(const TChange& c) {
P_.Set(c.Data, c.Fd, c.Flags);
}
- void Reserve(size_t size) {
+ void Reserve(size_t size) {
E_.Reserve(size);
}
- void Wait(TEvents& events, TInstant deadLine) {
+ void Wait(TEvents& events, TInstant deadLine) {
const size_t ret = P_.WaitD(~E_, +E_, deadLine);
events.reserve(ret);
@@ -110,21 +110,21 @@ namespace {
TUnsafeBuf<TInternalEvent> E_;
};
-
+
template <class T>
class TIndexedArray {
- struct TVal:
- public T,
- public TIntrusiveListItem<TVal>,
- public TObjectFromPool<TVal>
- {
+ struct TVal:
+ public T,
+ public TIntrusiveListItem<TVal>,
+ public TObjectFromPool<TVal>
+ {
// NOTE Constructor must be user-defined (and not =default) here
// because TVal objects are created in the UB-capable placement
// TObjectFromPool::new operator that stores data in a memory
// allocated for the object. Without user defined constructor
// zero-initialization takes place in TVal() expression and the
// data is overwritten.
- TVal() {
+ TVal() {
}
};
@@ -134,32 +134,32 @@ namespace {
typedef typename TListType::TIterator TIterator;
typedef typename TListType::TConstIterator TConstIterator;
- TIndexedArray()
+ TIndexedArray()
: P_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance())
{
}
- TIterator Begin() noexcept {
+ TIterator Begin() noexcept {
return I_.Begin();
}
- TIterator End() noexcept {
+ TIterator End() noexcept {
return I_.End();
}
- TConstIterator Begin() const noexcept {
+ TConstIterator Begin() const noexcept {
return I_.Begin();
}
- TConstIterator End() const noexcept {
+ TConstIterator End() const noexcept {
return I_.End();
}
- T& operator[](size_t i) {
+ T& operator[](size_t i) {
return *Get(i);
}
- T* Get(size_t i) {
+ T* Get(size_t i) {
TValRef& v = V_.Get(i);
if (Y_UNLIKELY(!v)) {
@@ -172,22 +172,22 @@ namespace {
return v.Get();
}
- void Erase(size_t i) noexcept {
+ void Erase(size_t i) noexcept {
V_.Get(i).Destroy();
}
- size_t Size() const noexcept {
+ size_t Size() const noexcept {
return I_.Size();
}
private:
- using TValRef = THolder<TVal>;
+ using TValRef = THolder<TVal>;
typename TVal::TPool P_;
TSocketMap<TValRef> V_;
TListType I_;
};
-
+
inline short PollFlags(ui16 flags) noexcept {
short ret = 0;
@@ -208,15 +208,15 @@ namespace {
return ret;
}
-
+
class TPollPoller {
public:
- size_t Size() const noexcept {
+ size_t Size() const noexcept {
return S_.Size();
}
template <class T>
- void Build(T& t) const {
+ void Build(T& t) const {
for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) {
t.Set(*it);
}
@@ -224,7 +224,7 @@ namespace {
t.Reserve(Size());
}
- void Set(const TChange& c) {
+ void Set(const TChange& c) {
if (c.Flags) {
S_[c.Fd] = c;
} else {
@@ -232,7 +232,7 @@ namespace {
}
}
- void Wait(TEvents& events, TInstant deadLine) {
+ void Wait(TEvents& events, TInstant deadLine) {
T_.clear();
T_.reserve(Size());
@@ -265,8 +265,8 @@ namespace {
int status = 0;
ui16 filter = 0;
- // We are perfectly fine with an EOF while reading a pipe or a unix socket
- if ((ev & POLLIN) || (ev & POLLHUP) && (pfd.events & POLLIN)) {
+ // We are perfectly fine with an EOF while reading a pipe or a unix socket
+ if ((ev & POLLIN) || (ev & POLLHUP) && (pfd.events & POLLIN)) {
filter |= CONT_POLL_READ;
}
@@ -310,16 +310,16 @@ namespace {
TPollVec T_;
};
-
+
class TCombinedPoller {
typedef TPoller<TPollerImpl<TWithoutLocking>> TDefaultPoller;
public:
- TCombinedPoller() {
+ TCombinedPoller() {
P_.Reset(new TPollPoller());
}
- void Set(const TChange& c) {
+ void Set(const TChange& c) {
if (!P_) {
D_->Set(c);
} else {
@@ -327,7 +327,7 @@ namespace {
}
}
- void Wait(TEvents& events, TInstant deadLine) {
+ void Wait(TEvents& events, TInstant deadLine) {
if (!P_) {
D_->Wait(events, deadLine);
} else {
@@ -343,48 +343,48 @@ namespace {
}
private:
- THolder<TPollPoller> P_;
- THolder<TDefaultPoller> D_;
+ THolder<TPollPoller> P_;
+ THolder<TDefaultPoller> D_;
};
struct TUserPoller: public TString {
- TUserPoller()
+ TUserPoller()
: TString(GetEnv("USER_POLLER"))
{
}
};
}
-THolder<IPollerFace> IPollerFace::Default() {
+THolder<IPollerFace> IPollerFace::Default() {
return Construct(*SingletonWithPriority<TUserPoller, 0>());
}
-THolder<IPollerFace> IPollerFace::Construct(TStringBuf name) {
- return Construct(name ? FromString<EContPoller>(name) : EContPoller::Default);
-}
+THolder<IPollerFace> IPollerFace::Construct(TStringBuf name) {
+ return Construct(name ? FromString<EContPoller>(name) : EContPoller::Default);
+}
-THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) {
- switch (poller) {
- case EContPoller::Default:
+THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) {
+ switch (poller) {
+ case EContPoller::Default:
case EContPoller::Combined:
return MakeHolder<TVirtualize<TCombinedPoller>>(EContPoller::Combined);
- case EContPoller::Select:
+ case EContPoller::Select:
return MakeHolder<TVirtualize<TPoller<TGenericPoller<TSelectPoller<TWithoutLocking>>>>>(poller);
- case EContPoller::Poll:
+ case EContPoller::Poll:
return MakeHolder<TVirtualize<TPollPoller>>(poller);
- case EContPoller::Epoll:
+ case EContPoller::Epoll:
#if defined(HAVE_EPOLL_POLLER)
return MakeHolder<TVirtualize<TPoller<TGenericPoller<TEpollPoller<TWithoutLocking>>>>>(poller);
-#else
- return nullptr;
+#else
+ return nullptr;
#endif
- case EContPoller::Kqueue:
-#if defined(HAVE_KQUEUE_POLLER)
+ case EContPoller::Kqueue:
+#if defined(HAVE_KQUEUE_POLLER)
return MakeHolder<TVirtualize<TPoller<TGenericPoller<TKqueuePoller<TWithoutLocking>>>>>(poller);
-#else
- return nullptr;
+#else
+ return nullptr;
#endif
- default:
- Y_FAIL("bad poller type");
+ default:
+ Y_FAIL("bad poller type");
}
}
diff --git a/library/cpp/coroutine/engine/poller.h b/library/cpp/coroutine/engine/poller.h
index 73d482cfaf..8ea012c0fc 100644
--- a/library/cpp/coroutine/engine/poller.h
+++ b/library/cpp/coroutine/engine/poller.h
@@ -1,4 +1,4 @@
-#pragma once
+#pragma once
#include <util/generic/ptr.h>
#include <util/generic/vector.h>
@@ -6,15 +6,15 @@
#include <util/network/pollerimpl.h>
#include <util/datetime/base.h>
-enum class EContPoller {
- Default /* "default" */,
+enum class EContPoller {
+ Default /* "default" */,
Combined /* "combined" */,
- Select /* "select" */,
- Poll /* "poll" */,
- Epoll /* "epoll" */,
- Kqueue /* "kqueue" */
-};
-
+ Select /* "select" */,
+ Poll /* "poll" */,
+ Epoll /* "epoll" */,
+ Kqueue /* "kqueue" */
+};
+
class IPollerFace {
public:
struct TChange {
@@ -29,12 +29,12 @@ public:
ui16 Filter;
};
- using TEvents = TVector<TEvent>;
+ using TEvents = TVector<TEvent>;
virtual ~IPollerFace() {
}
- void Set(void* ptr, SOCKET fd, ui16 flags) {
+ void Set(void* ptr, SOCKET fd, ui16 flags) {
const TChange c = {fd, ptr, flags};
Set(c);
@@ -44,7 +44,7 @@ public:
virtual void Wait(TEvents& events, TInstant deadLine) = 0;
virtual EContPoller PollEngine() const = 0;
- static THolder<IPollerFace> Default();
- static THolder<IPollerFace> Construct(TStringBuf name);
- static THolder<IPollerFace> Construct(EContPoller poller);
+ static THolder<IPollerFace> Default();
+ static THolder<IPollerFace> Construct(TStringBuf name);
+ static THolder<IPollerFace> Construct(EContPoller poller);
};
diff --git a/library/cpp/coroutine/engine/sockmap.h b/library/cpp/coroutine/engine/sockmap.h
index b4a3af73f6..fd189e1774 100644
--- a/library/cpp/coroutine/engine/sockmap.h
+++ b/library/cpp/coroutine/engine/sockmap.h
@@ -6,7 +6,7 @@
template <class T>
class TSocketMap {
public:
- T& Get(size_t idx) {
+ T& Get(size_t idx) {
if (idx < 128000) {
if (V_.size() <= idx) {
V_.resize(idx + 1);
diff --git a/library/cpp/coroutine/engine/sockpool.cpp b/library/cpp/coroutine/engine/sockpool.cpp
index 39b97ecd85..b9482e780f 100644
--- a/library/cpp/coroutine/engine/sockpool.cpp
+++ b/library/cpp/coroutine/engine/sockpool.cpp
@@ -31,7 +31,7 @@ TPooledSocket TSocketPool::AllocateMore(TConnectData* conn) {
TCont* cont = conn->Cont;
while (true) {
- TSocketHolder s(NCoro::Socket(Addr_->Addr()->sa_family, SOCK_STREAM, 0));
+ TSocketHolder s(NCoro::Socket(Addr_->Addr()->sa_family, SOCK_STREAM, 0));
if (s == INVALID_SOCKET) {
ythrow TSystemError(errno) << TStringBuf("can not create socket");
@@ -40,7 +40,7 @@ TPooledSocket TSocketPool::AllocateMore(TConnectData* conn) {
SetCommonSockOpts(s, Addr_->Addr());
SetZeroLinger(s);
- const int ret = NCoro::ConnectD(cont, s, Addr_->Addr(), Addr_->Len(), conn->DeadLine);
+ const int ret = NCoro::ConnectD(cont, s, Addr_->Addr(), Addr_->Len(), conn->DeadLine);
if (ret == EINTR) {
continue;
diff --git a/library/cpp/coroutine/engine/sockpool.h b/library/cpp/coroutine/engine/sockpool.h
index b343b65fb6..1ebb7e7b38 100644
--- a/library/cpp/coroutine/engine/sockpool.h
+++ b/library/cpp/coroutine/engine/sockpool.h
@@ -1,7 +1,7 @@
-#pragma once
+#pragma once
#include "impl.h"
-#include "network.h"
+#include "network.h"
#include <util/network/address.h>
#include <util/network/socket.h>
@@ -14,7 +14,7 @@ class TSocketPool;
class TPooledSocket {
class TImpl: public TIntrusiveListItem<TImpl>, public TSimpleRefCount<TImpl, TImpl> {
public:
- TImpl(SOCKET fd, TSocketPool* pool) noexcept
+ TImpl(SOCKET fd, TSocketPool* pool) noexcept
: Pool_(pool)
, IsKeepAlive_(false)
, Fd_(fd)
@@ -22,11 +22,11 @@ class TPooledSocket {
Touch();
}
- static void Destroy(TImpl* impl) noexcept {
+ static void Destroy(TImpl* impl) noexcept {
impl->DoDestroy();
}
- void DoDestroy() noexcept {
+ void DoDestroy() noexcept {
if (!Closed() && IsKeepAlive() && IsInGoodState()) {
ReturnToPool();
} else {
@@ -34,28 +34,28 @@ class TPooledSocket {
}
}
- bool IsKeepAlive() const noexcept {
+ bool IsKeepAlive() const noexcept {
return IsKeepAlive_;
}
- void SetKeepAlive(bool ka) {
+ void SetKeepAlive(bool ka) {
::SetKeepAlive(Fd_, ka);
IsKeepAlive_ = ka;
}
- SOCKET Socket() const noexcept {
+ SOCKET Socket() const noexcept {
return Fd_;
}
- bool Closed() const noexcept {
+ bool Closed() const noexcept {
return Fd_.Closed();
}
- void Close() noexcept {
+ void Close() noexcept {
Fd_.Close();
}
- bool IsInGoodState() const noexcept {
+ bool IsInGoodState() const noexcept {
int err = 0;
socklen_t len = sizeof(err);
@@ -64,15 +64,15 @@ class TPooledSocket {
return !err;
}
- bool IsOpen() const noexcept {
- return IsInGoodState() && IsNotSocketClosedByOtherSide(Fd_);
+ bool IsOpen() const noexcept {
+ return IsInGoodState() && IsNotSocketClosedByOtherSide(Fd_);
}
- void Touch() noexcept {
+ void Touch() noexcept {
TouchTime_ = TInstant::Now();
}
- const TInstant& LastTouch() const noexcept {
+ const TInstant& LastTouch() const noexcept {
return TouchTime_;
}
@@ -89,31 +89,31 @@ class TPooledSocket {
friend class TSocketPool;
public:
- TPooledSocket()
+ TPooledSocket()
: Impl_(nullptr)
{
}
- TPooledSocket(TImpl* impl)
+ TPooledSocket(TImpl* impl)
: Impl_(impl)
{
}
- ~TPooledSocket() {
+ ~TPooledSocket() {
if (UncaughtException() && !!Impl_) {
Close();
}
}
- operator SOCKET() const noexcept {
+ operator SOCKET() const noexcept {
return Impl_->Socket();
}
- void SetKeepAlive(bool ka) {
+ void SetKeepAlive(bool ka) {
Impl_->SetKeepAlive(ka);
}
- void Close() noexcept {
+ void Close() noexcept {
Impl_->Close();
}
@@ -122,13 +122,13 @@ private:
};
struct TConnectData {
- TConnectData(TCont* cont, const TInstant& deadLine)
+ TConnectData(TCont* cont, const TInstant& deadLine)
: Cont(cont)
, DeadLine(deadLine)
{
}
- TConnectData(TCont* cont, const TDuration& timeOut)
+ TConnectData(TCont* cont, const TDuration& timeOut)
: Cont(cont)
, DeadLine(TInstant::Now() + timeOut)
{
@@ -144,17 +144,17 @@ class TSocketPool {
public:
typedef TAtomicSharedPtr<NAddr::IRemoteAddr> TAddrRef;
- TSocketPool(int ip, int port)
+ TSocketPool(int ip, int port)
: Addr_(new NAddr::TIPv4Addr(TIpAddress((ui32)ip, (ui16)port)))
{
}
- TSocketPool(const TAddrRef& addr)
+ TSocketPool(const TAddrRef& addr)
: Addr_(addr)
{
}
- void EraseStale(const TInstant& maxAge) noexcept {
+ void EraseStale(const TInstant& maxAge) noexcept {
TSockets toDelete;
{
@@ -170,7 +170,7 @@ public:
}
}
- TPooledSocket Get(TConnectData* conn) {
+ TPooledSocket Get(TConnectData* conn) {
TPooledSocket ret;
if (TPooledSocket::TImpl* alive = GetImpl()) {
@@ -184,7 +184,7 @@ public:
return ret;
}
- bool GetAlive(TPooledSocket& socket) {
+ bool GetAlive(TPooledSocket& socket) {
if (TPooledSocket::TImpl* alive = GetImpl()) {
alive->Touch();
socket = TPooledSocket(alive);
@@ -194,7 +194,7 @@ public:
}
private:
- TPooledSocket::TImpl* GetImpl() {
+ TPooledSocket::TImpl* GetImpl() {
TGuard<TMutex> guard(Mutex_);
while (!Pool_.Empty()) {
@@ -207,7 +207,7 @@ private:
return nullptr;
}
- void Release(TPooledSocket::TImpl* impl) noexcept {
+ void Release(TPooledSocket::TImpl* impl) noexcept {
TGuard<TMutex> guard(Mutex_);
Pool_.PushFront(impl);
@@ -217,7 +217,7 @@ private:
private:
TAddrRef Addr_;
- using TSockets = TIntrusiveListWithAutoDelete<TPooledSocket::TImpl, TDelete>;
+ using TSockets = TIntrusiveListWithAutoDelete<TPooledSocket::TImpl, TDelete>;
TSockets Pool_;
TMutex Mutex_;
};
@@ -226,24 +226,24 @@ inline void TPooledSocket::TImpl::ReturnToPool() noexcept {
Pool_->Release(this);
}
-
+
class TContIO: public IInputStream, public IOutputStream {
public:
- TContIO(SOCKET fd, TCont* cont)
+ TContIO(SOCKET fd, TCont* cont)
: Fd_(fd)
, Cont_(cont)
{
}
void DoWrite(const void* buf, size_t len) override {
- NCoro::WriteI(Cont_, Fd_, buf, len).Checked();
+ NCoro::WriteI(Cont_, Fd_, buf, len).Checked();
}
size_t DoRead(void* buf, size_t len) override {
- return NCoro::ReadI(Cont_, Fd_, buf, len).Checked();
+ return NCoro::ReadI(Cont_, Fd_, buf, len).Checked();
}
- SOCKET Fd() const noexcept {
+ SOCKET Fd() const noexcept {
return Fd_;
}
diff --git a/library/cpp/coroutine/engine/trampoline.cpp b/library/cpp/coroutine/engine/trampoline.cpp
index 38e3951a51..10ea69ddc3 100644
--- a/library/cpp/coroutine/engine/trampoline.cpp
+++ b/library/cpp/coroutine/engine/trampoline.cpp
@@ -1,50 +1,50 @@
-#include "impl.h"
-#include "trampoline.h"
+#include "impl.h"
+#include "trampoline.h"
#include "stack/stack_allocator.h"
-#include <util/system/info.h>
-#include <util/system/protect.h>
-#include <util/system/valgrind.h>
+#include <util/system/info.h>
+#include <util/system/protect.h>
+#include <util/system/valgrind.h>
#include <util/system/yassert.h>
-#include <cstdlib>
-#include <util/stream/format.h>
+#include <cstdlib>
+#include <util/stream/format.h>
-namespace NCoro {
-
+namespace NCoro {
+
TTrampoline::TTrampoline(NStack::IAllocator& allocator, ui32 stackSize, TFunc f, TCont* cont) noexcept
: Stack_(allocator, stackSize, cont->Name())
, Clo_{this, Stack_.Get(), cont->Name()}
- , Ctx_(Clo_)
+ , Ctx_(Clo_)
, Func_(std::move(f))
- , Cont_(cont)
- {}
-
- void TTrampoline::DoRun() {
+ , Cont_(cont)
+ {}
+
+ void TTrampoline::DoRun() {
if (Cont_->Executor()->FailOnError()) {
Func_(Cont_);
} else {
try {
Func_(Cont_);
} catch (...) {}
- }
-
- Cont_->Terminate();
- }
-
- TArrayRef<char> TTrampoline::Stack() noexcept {
- return Stack_.Get();
- }
-
+ }
+
+ Cont_->Terminate();
+ }
+
+ TArrayRef<char> TTrampoline::Stack() noexcept {
+ return Stack_.Get();
+ }
+
const char* TTrampoline::ContName() const noexcept {
return Cont_->Name();
}
-
+
void TTrampoline::DoRunNaked() {
DoRun();
abort();
}
-}
+}
diff --git a/library/cpp/coroutine/engine/trampoline.h b/library/cpp/coroutine/engine/trampoline.h
index 5ece7873b0..37b61cf015 100644
--- a/library/cpp/coroutine/engine/trampoline.h
+++ b/library/cpp/coroutine/engine/trampoline.h
@@ -1,60 +1,60 @@
-#pragma once
+#pragma once
#include "stack/stack_common.h"
#include "stack/stack.h"
-#include <util/generic/noncopyable.h>
-#include <util/generic/ptr.h>
+#include <util/generic/noncopyable.h>
+#include <util/generic/ptr.h>
#include <util/system/context.h>
#include <util/system/defaults.h>
-#if !defined(STACK_GROW_DOWN)
-# error "unsupported"
-#endif
-
+#if !defined(STACK_GROW_DOWN)
+# error "unsupported"
+#endif
+
class TCont;
-typedef void (*TContFunc)(TCont*, void*);
+typedef void (*TContFunc)(TCont*, void*);
+
+namespace NCoro {
-namespace NCoro {
-
namespace NStack {
class IAllocator;
- }
-
- class TTrampoline : public ITrampoLine, TNonCopyable {
- public:
+ }
+
+ class TTrampoline : public ITrampoLine, TNonCopyable {
+ public:
typedef std::function<void (TCont*)> TFunc;
- TTrampoline(
+ TTrampoline(
NCoro::NStack::IAllocator& allocator,
uint32_t stackSize,
TFunc f,
TCont* cont
- ) noexcept;
+ ) noexcept;
+
+ TArrayRef<char> Stack() noexcept;
- TArrayRef<char> Stack() noexcept;
+ TExceptionSafeContext* Context() noexcept {
+ return &Ctx_;
+ }
- TExceptionSafeContext* Context() noexcept {
- return &Ctx_;
- }
-
- void SwitchTo(TExceptionSafeContext* ctx) noexcept {
+ void SwitchTo(TExceptionSafeContext* ctx) noexcept {
Y_VERIFY(Stack_.LowerCanaryOk(), "Stack overflow (%s)", ContName());
Y_VERIFY(Stack_.UpperCanaryOk(), "Stack override (%s)", ContName());
- Ctx_.SwitchTo(ctx);
- }
-
+ Ctx_.SwitchTo(ctx);
+ }
+
void DoRun() override;
-
+
void DoRunNaked() override;
- private:
+ private:
const char* ContName() const noexcept;
private:
NStack::TStackHolder Stack_;
- const TContClosure Clo_;
- TExceptionSafeContext Ctx_;
+ const TContClosure Clo_;
+ TExceptionSafeContext Ctx_;
TFunc Func_;
- TCont* const Cont_;
+ TCont* const Cont_;
};
}
diff --git a/library/cpp/coroutine/engine/ya.make b/library/cpp/coroutine/engine/ya.make
index 2ea3c237c7..8c20b9afc3 100644
--- a/library/cpp/coroutine/engine/ya.make
+++ b/library/cpp/coroutine/engine/ya.make
@@ -1,24 +1,24 @@
LIBRARY()
-OWNER(
- pg
- g:balancer
-)
+OWNER(
+ pg
+ g:balancer
+)
GENERATE_ENUM_SERIALIZATION(poller.h)
GENERATE_ENUM_SERIALIZATION(stack/stack_common.h)
-
+
PEERDIR(
contrib/libs/libc_compat
library/cpp/containers/intrusive_rb_tree
)
SRCS(
- cont_poller.cpp
+ cont_poller.cpp
helper.cpp
impl.cpp
iostatus.cpp
- network.cpp
+ network.cpp
poller.cpp
sockpool.cpp
stack/stack.cpp
@@ -26,7 +26,7 @@ SRCS(
stack/stack_guards.cpp
stack/stack_storage.cpp
stack/stack_utils.cpp
- trampoline.cpp
+ trampoline.cpp
)
END()
diff --git a/library/cpp/coroutine/listener/listen.cpp b/library/cpp/coroutine/listener/listen.cpp
index 02441c879b..3d4e711d1d 100644
--- a/library/cpp/coroutine/listener/listen.cpp
+++ b/library/cpp/coroutine/listener/listen.cpp
@@ -107,7 +107,7 @@ private:
SetDeferAccept(ListenSocket_);
}
- C_ = Parent_->E_->Create<TOneSocketListener, &TOneSocketListener::Run>(this, "listen_job");
+ C_ = Parent_->E_->Create<TOneSocketListener, &TOneSocketListener::Run>(this, "listen_job");
}
}
@@ -120,7 +120,7 @@ private:
C_->Cancel();
while (C_) {
- Parent_->E_->Running()->Yield();
+ Parent_->E_->Running()->Yield();
}
}
}
@@ -130,7 +130,7 @@ private:
while (!C_->Cancelled()) {
try {
TOpaqueAddr remote;
- const int res = NCoro::AcceptI(C_, ListenSocket_, remote.MutableAddr(), remote.LenPtr());
+ const int res = NCoro::AcceptI(C_, ListenSocket_, remote.MutableAddr(), remote.LenPtr());
if (res < 0) {
const int err = -res;
@@ -277,16 +277,16 @@ TContListener::TContListener(ICallBack* cb, TContExecutor* e, const TOptions& op
TContListener::~TContListener() {
}
-namespace {
- template <class T>
- static inline T&& CheckImpl(T&& impl) {
- Y_ENSURE_EX(impl, yexception() << "not running");
- return std::forward<T>(impl);
+namespace {
+ template <class T>
+ static inline T&& CheckImpl(T&& impl) {
+ Y_ENSURE_EX(impl, yexception() << "not running");
+ return std::forward<T>(impl);
}
-}
+}
void TContListener::Listen(const IRemoteAddr& addr) {
- CheckImpl(Impl_)->Listen(addr);
+ CheckImpl(Impl_)->Listen(addr);
}
void TContListener::Listen(const TIpAddress& addr) {
@@ -300,11 +300,11 @@ void TContListener::Listen(const TNetworkAddress& addr) {
}
void TContListener::Listen() {
- CheckImpl(Impl_)->Listen();
+ CheckImpl(Impl_)->Listen();
}
void TContListener::Bind(const IRemoteAddr& addr) {
- CheckImpl(Impl_)->Bind(addr);
+ CheckImpl(Impl_)->Bind(addr);
}
void TContListener::Bind(const TIpAddress& addr) {
@@ -312,7 +312,7 @@ void TContListener::Bind(const TIpAddress& addr) {
}
void TContListener::Bind(const TNetworkAddress& addr) {
- CheckImpl(Impl_)->Bind(addr);
+ CheckImpl(Impl_)->Bind(addr);
}
void TContListener::Stop() noexcept {
@@ -320,7 +320,7 @@ void TContListener::Stop() noexcept {
}
void TContListener::StopListenAddr(const IRemoteAddr& addr) {
- CheckImpl(Impl_)->StopListenAddr(addr);
+ CheckImpl(Impl_)->StopListenAddr(addr);
}
void TContListener::StopListenAddr(const TIpAddress& addr) {
diff --git a/library/cpp/coroutine/listener/listen.h b/library/cpp/coroutine/listener/listen.h
index 998284ec95..3a89cd3ecc 100644
--- a/library/cpp/coroutine/listener/listen.h
+++ b/library/cpp/coroutine/listener/listen.h
@@ -1,4 +1,4 @@
-#pragma once
+#pragma once
#include <util/generic/ptr.h>
#include <util/generic/ylimits.h>