aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/coroutine/engine
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:17 +0300
commitd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
treedd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/coroutine/engine
parent72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
downloadydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/coroutine/engine')
-rw-r--r--library/cpp/coroutine/engine/condvar.h52
-rw-r--r--library/cpp/coroutine/engine/cont_poller.cpp10
-rw-r--r--library/cpp/coroutine/engine/cont_poller.h112
-rw-r--r--library/cpp/coroutine/engine/coroutine_ut.cpp388
-rw-r--r--library/cpp/coroutine/engine/events.h216
-rw-r--r--library/cpp/coroutine/engine/impl.cpp36
-rw-r--r--library/cpp/coroutine/engine/impl.h216
-rw-r--r--library/cpp/coroutine/engine/iostatus.cpp2
-rw-r--r--library/cpp/coroutine/engine/iostatus.h142
-rw-r--r--library/cpp/coroutine/engine/mutex.h74
-rw-r--r--library/cpp/coroutine/engine/network.cpp170
-rw-r--r--library/cpp/coroutine/engine/network.h34
-rw-r--r--library/cpp/coroutine/engine/poller.cpp538
-rw-r--r--library/cpp/coroutine/engine/poller.h66
-rw-r--r--library/cpp/coroutine/engine/sockmap.h42
-rw-r--r--library/cpp/coroutine/engine/sockpool.cpp2
-rw-r--r--library/cpp/coroutine/engine/sockpool.h366
-rw-r--r--library/cpp/coroutine/engine/trampoline.cpp8
-rw-r--r--library/cpp/coroutine/engine/trampoline.h20
-rw-r--r--library/cpp/coroutine/engine/ya.make26
20 files changed, 1260 insertions, 1260 deletions
diff --git a/library/cpp/coroutine/engine/condvar.h b/library/cpp/coroutine/engine/condvar.h
index 384a6c19f8..ffceede6fa 100644
--- a/library/cpp/coroutine/engine/condvar.h
+++ b/library/cpp/coroutine/engine/condvar.h
@@ -1,38 +1,38 @@
#pragma once
-
+
#include "events.h"
#include "mutex.h"
-class TContCondVar {
-public:
+class TContCondVar {
+public:
int WaitD(TCont* current, TContMutex* mutex, TInstant deadline) {
- mutex->UnLock();
-
- const int ret = WaitQueue_.WaitD(current, deadline);
-
- if (ret != EWAKEDUP) {
+ mutex->UnLock();
+
+ const int ret = WaitQueue_.WaitD(current, deadline);
+
+ if (ret != EWAKEDUP) {
return ret;
}
- return mutex->LockD(current, deadline);
- }
-
+ return mutex->LockD(current, deadline);
+ }
+
int WaitT(TCont* current, TContMutex* mutex, TDuration timeout) {
- return WaitD(current, mutex, timeout.ToDeadLine());
- }
-
+ return WaitD(current, mutex, timeout.ToDeadLine());
+ }
+
int WaitI(TCont* current, TContMutex* mutex) {
- return WaitD(current, mutex, TInstant::Max());
- }
-
+ return WaitD(current, mutex, TInstant::Max());
+ }
+
void Signal() noexcept {
- WaitQueue_.Signal();
- }
-
+ WaitQueue_.Signal();
+ }
+
void BroadCast() noexcept {
- WaitQueue_.BroadCast();
- }
-
-private:
- TContWaitQueue WaitQueue_;
-};
+ WaitQueue_.BroadCast();
+ }
+
+private:
+ TContWaitQueue WaitQueue_;
+};
diff --git a/library/cpp/coroutine/engine/cont_poller.cpp b/library/cpp/coroutine/engine/cont_poller.cpp
index 6073b72e62..76593d4e9b 100644
--- a/library/cpp/coroutine/engine/cont_poller.cpp
+++ b/library/cpp/coroutine/engine/cont_poller.cpp
@@ -1,6 +1,6 @@
#include "cont_poller.h"
-#include "impl.h"
-
+#include "impl.h"
+
namespace NCoro {
namespace {
template <class T>
@@ -20,8 +20,8 @@ namespace NCoro {
return event->Status();
}
- }
-
+ }
+
void TContPollEvent::Wake() noexcept {
UnLink();
Cont()->ReSchedule();
@@ -55,7 +55,7 @@ namespace NCoro {
};
IoWait_.ForEach(visitor);
}
-}
+}
void TFdEvent::RemoveFromIOWait() noexcept {
this->Cont()->Executor()->Poller()->Remove(this);
diff --git a/library/cpp/coroutine/engine/cont_poller.h b/library/cpp/coroutine/engine/cont_poller.h
index e6b23149c3..b638b2df1a 100644
--- a/library/cpp/coroutine/engine/cont_poller.h
+++ b/library/cpp/coroutine/engine/cont_poller.h
@@ -1,27 +1,27 @@
#pragma once
-
-#include "poller.h"
+
+#include "poller.h"
#include "sockmap.h"
-
+
#include <library/cpp/containers/intrusive_rb_tree/rb_tree.h>
#include <util/datetime/base.h>
#include <util/memory/pool.h>
-#include <util/memory/smallobj.h>
+#include <util/memory/smallobj.h>
#include <util/network/init.h>
-
+
#include <cerrno>
-class TCont;
-class TContExecutor;
+class TCont;
+class TContExecutor;
class TFdEvent;
-
+
namespace NCoro {
class IPollEvent;
-
-
+
+
struct TContPollEventCompare {
template <class T>
static inline bool Compare(const T& l, const T& r) noexcept {
@@ -29,74 +29,74 @@ namespace NCoro {
}
};
-
+
class TContPollEvent : public TRbTreeItem<TContPollEvent, TContPollEventCompare> {
public:
TContPollEvent(TCont* cont, TInstant deadLine) noexcept
: Cont_(cont)
, DeadLine_(deadLine)
{}
-
+
static bool Compare(const TContPollEvent& l, const TContPollEvent& r) noexcept {
return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r);
}
-
+
int Status() const noexcept {
return Status_;
}
-
+
void SetStatus(int status) noexcept {
Status_ = status;
}
-
+
TCont* Cont() noexcept {
return Cont_;
}
-
+
TInstant DeadLine() const noexcept {
return DeadLine_;
}
-
+
void Wake(int status) noexcept {
SetStatus(status);
Wake();
- }
-
+ }
+
private:
void Wake() noexcept;
-
+
private:
TCont* Cont_;
TInstant DeadLine_;
int Status_ = EINPROGRESS;
- };
-
-
+ };
+
+
class IPollEvent: public TIntrusiveListItem<IPollEvent> {
public:
IPollEvent(SOCKET fd, ui16 what) noexcept
: Fd_(fd)
, What_(what)
{}
-
+
virtual ~IPollEvent() {}
-
+
SOCKET Fd() const noexcept {
return Fd_;
}
-
+
int What() const noexcept {
return What_;
}
-
+
virtual void OnPollEvent(int status) noexcept = 0;
-
+
private:
SOCKET Fd_;
ui16 What_;
};
-
-
+
+
template <class T>
class TBigArray {
struct TValue: public T, public TObjectFromPool<TValue> {
@@ -124,27 +124,27 @@ namespace NCoro {
using TPollEventList = TIntrusiveList<IPollEvent>;
-
+
class TContPoller {
public:
using TEvent = IPollerFace::TEvent;
using TEvents = IPollerFace::TEvents;
-
+
TContPoller()
: P_(IPollerFace::Default())
{
}
-
+
explicit TContPoller(THolder<IPollerFace> poller)
: P_(std::move(poller))
{}
-
+
void Schedule(IPollEvent* event) {
auto* lst = Lists_.Get(event->Fd());
const ui16 oldFlags = Flags(*lst);
lst->PushFront(event);
ui16 newFlags = Flags(*lst);
-
+
if (newFlags != oldFlags) {
if (oldFlags) {
newFlags |= CONT_POLL_MODIFY;
@@ -152,14 +152,14 @@ namespace NCoro {
P_->Set(lst, event->Fd(), newFlags);
}
- }
-
+ }
+
void Remove(IPollEvent* event) noexcept {
auto* lst = Lists_.Get(event->Fd());
const ui16 oldFlags = Flags(*lst);
event->Unlink();
ui16 newFlags = Flags(*lst);
-
+
if (newFlags != oldFlags) {
if (newFlags) {
newFlags |= CONT_POLL_MODIFY;
@@ -167,13 +167,13 @@ namespace NCoro {
P_->Set(lst, event->Fd(), newFlags);
}
- }
-
+ }
+
void Wait(TEvents& events, TInstant deadLine) {
events.clear();
P_->Wait(events, deadLine);
- }
-
+ }
+
EContPoller PollEngine() const {
return P_->PollEngine();
}
@@ -182,14 +182,14 @@ namespace NCoro {
ui16 ret = 0;
for (auto&& item : lst) {
ret |= item.What();
- }
+ }
return ret;
- }
-
+ }
+
private:
TBigArray<TPollEventList> Lists_;
THolder<IPollerFace> P_;
- };
+ };
class TEventWaitQueue {
@@ -210,12 +210,12 @@ namespace NCoro {
TIoWait IoWait_;
};
}
-
+
class TFdEvent final:
public NCoro::TContPollEvent,
public NCoro::IPollEvent
{
-public:
+public:
TFdEvent(TCont* cont, SOCKET fd, ui16 what, TInstant deadLine) noexcept
: TContPollEvent(cont, deadLine)
, IPollEvent(fd, what)
@@ -223,22 +223,22 @@ public:
~TFdEvent() {
RemoveFromIOWait();
- }
-
+ }
+
void RemoveFromIOWait() noexcept;
-
+
void OnPollEvent(int status) noexcept override {
Wake(status);
- }
-};
-
+ }
+};
+
class TTimerEvent: public NCoro::TContPollEvent {
-public:
+public:
TTimerEvent(TCont* cont, TInstant deadLine) noexcept
: TContPollEvent(cont, deadLine)
{}
-};
+};
int ExecuteEvent(TFdEvent* event) noexcept;
diff --git a/library/cpp/coroutine/engine/coroutine_ut.cpp b/library/cpp/coroutine/engine/coroutine_ut.cpp
index 29e0d1dfc5..8b372496a2 100644
--- a/library/cpp/coroutine/engine/coroutine_ut.cpp
+++ b/library/cpp/coroutine/engine/coroutine_ut.cpp
@@ -1,34 +1,34 @@
-#include "impl.h"
+#include "impl.h"
#include "condvar.h"
#include "network.h"
-
+
#include <library/cpp/testing/unittest/registar.h>
-
-#include <util/string/cast.h>
+
+#include <util/string/cast.h>
#include <util/system/pipe.h>
#include <util/system/env.h>
#include <util/system/info.h>
#include <util/system/thread.h>
#include <util/generic/xrange.h>
#include <util/generic/serialized_enum.h>
-
+
// TODO (velavokr): BALANCER-1345 add more tests on pollers
-class TCoroTest: public TTestBase {
- UNIT_TEST_SUITE(TCoroTest);
- UNIT_TEST(TestSimpleX1);
+class TCoroTest: public TTestBase {
+ UNIT_TEST_SUITE(TCoroTest);
+ UNIT_TEST(TestSimpleX1);
UNIT_TEST(TestSimpleX1MultiThread);
- UNIT_TEST(TestSimpleX2);
- UNIT_TEST(TestSimpleX3);
- UNIT_TEST(TestMemFun);
- UNIT_TEST(TestMutex);
- UNIT_TEST(TestCondVar);
+ UNIT_TEST(TestSimpleX2);
+ UNIT_TEST(TestSimpleX3);
+ UNIT_TEST(TestMemFun);
+ UNIT_TEST(TestMutex);
+ UNIT_TEST(TestCondVar);
UNIT_TEST(TestJoinDefault);
UNIT_TEST(TestJoinEpoll);
UNIT_TEST(TestJoinKqueue);
UNIT_TEST(TestJoinPoll);
UNIT_TEST(TestJoinSelect);
- UNIT_TEST(TestException);
+ UNIT_TEST(TestException);
UNIT_TEST(TestJoinCancelExitRaceBug);
UNIT_TEST(TestWaitWakeLivelockBug);
UNIT_TEST(TestFastPathWakeDefault)
@@ -46,17 +46,17 @@ class TCoroTest: public TTestBase {
UNIT_TEST(TestUserEvent);
UNIT_TEST(TestPause);
UNIT_TEST(TestOverrideTime);
- UNIT_TEST_SUITE_END();
-
-public:
- void TestException();
- void TestSimpleX1();
+ UNIT_TEST_SUITE_END();
+
+public:
+ void TestException();
+ void TestSimpleX1();
void TestSimpleX1MultiThread();
- void TestSimpleX2();
- void TestSimpleX3();
- void TestMemFun();
- void TestMutex();
- void TestCondVar();
+ void TestSimpleX2();
+ void TestSimpleX3();
+ void TestMemFun();
+ void TestMutex();
+ void TestCondVar();
void TestJoinDefault();
void TestJoinEpoll();
void TestJoinKqueue();
@@ -78,72 +78,72 @@ public:
void TestUserEvent();
void TestPause();
void TestOverrideTime();
-};
-
-void TCoroTest::TestException() {
- TContExecutor e(1000000);
-
- bool f2run = false;
-
- auto f1 = [&f2run](TCont* c) {
- struct TCtx {
- ~TCtx() {
+};
+
+void TCoroTest::TestException() {
+ TContExecutor e(1000000);
+
+ bool f2run = false;
+
+ auto f1 = [&f2run](TCont* c) {
+ struct TCtx {
+ ~TCtx() {
Y_VERIFY(!*F2);
-
- C->Yield();
- }
-
- TCont* C;
- bool* F2;
- };
-
- try {
- TCtx ctx = {c, &f2run};
-
- throw 1;
- } catch (...) {
- }
- };
-
- bool unc = true;
-
- auto f2 = [&unc, &f2run](TCont*) {
- f2run = true;
- unc = std::uncaught_exception();
-
- // check segfault
- try {
- throw 2;
- } catch (int) {
- }
- };
-
- e.Create(f1, "f1");
- e.Create(f2, "f2");
- e.Execute();
-
- UNIT_ASSERT(!unc);
-}
-
+
+ C->Yield();
+ }
+
+ TCont* C;
+ bool* F2;
+ };
+
+ try {
+ TCtx ctx = {c, &f2run};
+
+ throw 1;
+ } catch (...) {
+ }
+ };
+
+ bool unc = true;
+
+ auto f2 = [&unc, &f2run](TCont*) {
+ f2run = true;
+ unc = std::uncaught_exception();
+
+ // check segfault
+ try {
+ throw 2;
+ } catch (int) {
+ }
+ };
+
+ e.Create(f1, "f1");
+ e.Create(f2, "f2");
+ e.Execute();
+
+ UNIT_ASSERT(!unc);
+}
+
static int i0;
-
-static void CoRun(TCont* c, void* /*run*/) {
+
+static void CoRun(TCont* c, void* /*run*/) {
while (i0 < 100000) {
++i0;
UNIT_ASSERT(RunningCont() == c);
- c->Yield();
+ c->Yield();
UNIT_ASSERT(RunningCont() == c);
- }
-}
-
-static void CoMain(TCont* c, void* /*arg*/) {
+ }
+}
+
+static void CoMain(TCont* c, void* /*arg*/) {
for (volatile size_t i2 = 0; i2 < 10; ++i2) {
UNIT_ASSERT(RunningCont() == c);
c->Executor()->Create(CoRun, nullptr, "run");
UNIT_ASSERT(RunningCont() == c);
- }
-}
-
+ }
+}
+
void TCoroTest::TestSimpleX1() {
i0 = 0;
TContExecutor e(32000);
@@ -178,17 +178,17 @@ void TCoroTest::TestSimpleX1MultiThread() {
UNIT_ASSERT_EQUAL(c, nThreads);
}
-struct TTestObject {
+struct TTestObject {
int i = 0;
int j = 0;
-
+
public:
- void RunTask1(TCont*) {
- i = 1;
- }
- void RunTask2(TCont*) {
- j = 2;
- }
+ void RunTask1(TCont*) {
+ i = 1;
+ }
+ void RunTask2(TCont*) {
+ j = 2;
+ }
};
void TCoroTest::TestMemFun() {
@@ -201,130 +201,130 @@ void TCoroTest::TestMemFun() {
UNIT_ASSERT_EQUAL(obj.j, 2);
}
-void TCoroTest::TestSimpleX2() {
- {
+void TCoroTest::TestSimpleX2() {
+ {
i0 = 0;
-
- {
- TContExecutor e(32000);
- e.Execute(CoMain);
- }
-
+
+ {
+ TContExecutor e(32000);
+ e.Execute(CoMain);
+ }
+
UNIT_ASSERT_EQUAL(i0, 100000);
- }
-
- {
+ }
+
+ {
i0 = 0;
-
- {
- TContExecutor e(32000);
- e.Execute(CoMain);
- }
-
+
+ {
+ TContExecutor e(32000);
+ e.Execute(CoMain);
+ }
+
UNIT_ASSERT_EQUAL(i0, 100000);
- }
-}
-
-struct TRunner {
- inline TRunner()
- : Runs(0)
- {
- }
-
- inline void operator()(TCont* c) {
- ++Runs;
- c->Yield();
- }
-
- size_t Runs;
-};
-
-void TCoroTest::TestSimpleX3() {
- TContExecutor e(32000);
- TRunner runner;
-
+ }
+}
+
+struct TRunner {
+ inline TRunner()
+ : Runs(0)
+ {
+ }
+
+ inline void operator()(TCont* c) {
+ ++Runs;
+ c->Yield();
+ }
+
+ size_t Runs;
+};
+
+void TCoroTest::TestSimpleX3() {
+ TContExecutor e(32000);
+ TRunner runner;
+
for (volatile size_t i3 = 0; i3 < 1000; ++i3) {
- e.Create(runner, "runner");
- }
-
- e.Execute();
-
- UNIT_ASSERT_EQUAL(runner.Runs, 1000);
-}
-
+ e.Create(runner, "runner");
+ }
+
+ e.Execute();
+
+ UNIT_ASSERT_EQUAL(runner.Runs, 1000);
+}
+
static TString res;
-static TContMutex mutex;
-
-static void CoMutex(TCont* c, void* /*run*/) {
- {
- mutex.LockI(c);
- c->Yield();
- res += c->Name();
- mutex.UnLock();
- }
-
- c->Yield();
-
- {
- mutex.LockI(c);
- c->Yield();
- res += c->Name();
- mutex.UnLock();
- }
-}
-
-static void CoMutexTest(TCont* c, void* /*run*/) {
+static TContMutex mutex;
+
+static void CoMutex(TCont* c, void* /*run*/) {
+ {
+ mutex.LockI(c);
+ c->Yield();
+ res += c->Name();
+ mutex.UnLock();
+ }
+
+ c->Yield();
+
+ {
+ mutex.LockI(c);
+ c->Yield();
+ res += c->Name();
+ mutex.UnLock();
+ }
+}
+
+static void CoMutexTest(TCont* c, void* /*run*/) {
c->Executor()->Create(CoMutex, nullptr, "1");
c->Executor()->Create(CoMutex, nullptr, "2");
-}
-
-void TCoroTest::TestMutex() {
- TContExecutor e(32000);
- e.Execute(CoMutexTest);
- UNIT_ASSERT_EQUAL(res, "1212");
+}
+
+void TCoroTest::TestMutex() {
+ TContExecutor e(32000);
+ e.Execute(CoMutexTest);
+ UNIT_ASSERT_EQUAL(res, "1212");
res.clear();
-}
-
-static TContMutex m1;
-static TContCondVar c1;
-
-static void CoCondVar(TCont* c, void* /*run*/) {
+}
+
+static TContMutex m1;
+static TContCondVar c1;
+
+static void CoCondVar(TCont* c, void* /*run*/) {
for (size_t i4 = 0; i4 < 3; ++i4) {
- UNIT_ASSERT_EQUAL(m1.LockI(c), 0);
- UNIT_ASSERT_EQUAL(c1.WaitI(c, &m1), 0);
- res += c->Name();
- m1.UnLock();
- }
-}
-
-static void CoCondVarTest(TCont* c, void* /*run*/) {
+ UNIT_ASSERT_EQUAL(m1.LockI(c), 0);
+ UNIT_ASSERT_EQUAL(c1.WaitI(c, &m1), 0);
+ res += c->Name();
+ m1.UnLock();
+ }
+}
+
+static void CoCondVarTest(TCont* c, void* /*run*/) {
c->Executor()->Create(CoCondVar, nullptr, "1");
- c->Yield();
+ c->Yield();
c->Executor()->Create(CoCondVar, nullptr, "2");
- c->Yield();
+ c->Yield();
c->Executor()->Create(CoCondVar, nullptr, "3");
- c->Yield();
+ c->Yield();
c->Executor()->Create(CoCondVar, nullptr, "4");
- c->Yield();
+ c->Yield();
c->Executor()->Create(CoCondVar, nullptr, "5");
- c->Yield();
+ c->Yield();
c->Executor()->Create(CoCondVar, nullptr, "6");
- c->Yield();
-
+ c->Yield();
+
for (size_t i5 = 0; i5 < 3; ++i5) {
res += ToString((size_t)i5) + "^";
- c1.BroadCast();
- c->Yield();
- }
-}
-
-void TCoroTest::TestCondVar() {
- TContExecutor e(32000);
- e.Execute(CoCondVarTest);
- UNIT_ASSERT_EQUAL(res, "0^1234561^1234562^123456");
+ c1.BroadCast();
+ c->Yield();
+ }
+}
+
+void TCoroTest::TestCondVar() {
+ TContExecutor e(32000);
+ e.Execute(CoCondVarTest);
+ UNIT_ASSERT_EQUAL(res, "0^1234561^1234562^123456");
res.clear();
-}
-
+}
+
namespace NCoroTestJoin {
struct TSleepCont {
const TInstant Deadline;
@@ -1004,4 +1004,4 @@ void TCoroTest::TestOverrideTime() {
executor.Execute();
}
-UNIT_TEST_SUITE_REGISTRATION(TCoroTest);
+UNIT_TEST_SUITE_REGISTRATION(TCoroTest);
diff --git a/library/cpp/coroutine/engine/events.h b/library/cpp/coroutine/engine/events.h
index 2e879999bf..07cc4d25e8 100644
--- a/library/cpp/coroutine/engine/events.h
+++ b/library/cpp/coroutine/engine/events.h
@@ -1,148 +1,148 @@
#pragma once
-
+
#include "impl.h"
#include <util/datetime/base.h>
-class TContEvent {
-public:
+class TContEvent {
+public:
TContEvent(TCont* current) noexcept
- : Cont_(current)
- , Status_(0)
- {
- }
-
+ : Cont_(current)
+ , Status_(0)
+ {
+ }
+
~TContEvent() {
- }
-
+ }
+
int WaitD(TInstant deadline) {
- Status_ = 0;
- const int ret = Cont_->SleepD(deadline);
-
- return Status_ ? Status_ : ret;
- }
-
+ Status_ = 0;
+ const int ret = Cont_->SleepD(deadline);
+
+ return Status_ ? Status_ : ret;
+ }
+
int WaitT(TDuration timeout) {
- return WaitD(timeout.ToDeadLine());
- }
-
+ return WaitD(timeout.ToDeadLine());
+ }
+
int WaitI() {
- return WaitD(TInstant::Max());
- }
-
+ return WaitD(TInstant::Max());
+ }
+
void Wake() noexcept {
- SetStatus(EWAKEDUP);
- Cont_->ReSchedule();
- }
-
+ SetStatus(EWAKEDUP);
+ Cont_->ReSchedule();
+ }
+
TCont* Cont() noexcept {
- return Cont_;
- }
-
+ return Cont_;
+ }
+
int Status() const noexcept {
- return Status_;
- }
-
+ return Status_;
+ }
+
void SetStatus(int status) noexcept {
- Status_ = status;
- }
-
-private:
- TCont* Cont_;
- int Status_;
-};
-
-class TContWaitQueue {
- class TWaiter: public TContEvent, public TIntrusiveListItem<TWaiter> {
- public:
+ Status_ = status;
+ }
+
+private:
+ TCont* Cont_;
+ int Status_;
+};
+
+class TContWaitQueue {
+ class TWaiter: public TContEvent, public TIntrusiveListItem<TWaiter> {
+ public:
TWaiter(TCont* current) noexcept
- : TContEvent(current)
- {
- }
-
+ : TContEvent(current)
+ {
+ }
+
~TWaiter() {
- }
- };
-
-public:
+ }
+ };
+
+public:
TContWaitQueue() noexcept {
- }
-
+ }
+
~TContWaitQueue() {
Y_ASSERT(Waiters_.Empty());
- }
-
+ }
+
int WaitD(TCont* current, TInstant deadline) {
- TWaiter waiter(current);
-
- Waiters_.PushBack(&waiter);
-
- return waiter.WaitD(deadline);
- }
-
+ TWaiter waiter(current);
+
+ Waiters_.PushBack(&waiter);
+
+ return waiter.WaitD(deadline);
+ }
+
int WaitT(TCont* current, TDuration timeout) {
- return WaitD(current, timeout.ToDeadLine());
- }
-
+ return WaitD(current, timeout.ToDeadLine());
+ }
+
int WaitI(TCont* current) {
- return WaitD(current, TInstant::Max());
- }
-
+ return WaitD(current, TInstant::Max());
+ }
+
void Signal() noexcept {
- if (!Waiters_.Empty()) {
- Waiters_.PopFront()->Wake();
- }
- }
+ if (!Waiters_.Empty()) {
+ Waiters_.PopFront()->Wake();
+ }
+ }
void BroadCast() noexcept {
- while (!Waiters_.Empty()) {
- Waiters_.PopFront()->Wake();
- }
- }
-
+ while (!Waiters_.Empty()) {
+ Waiters_.PopFront()->Wake();
+ }
+ }
+
void BroadCast(size_t number) noexcept {
for (size_t i = 0; i < number && !Waiters_.Empty(); ++i) {
Waiters_.PopFront()->Wake();
}
}
-private:
- TIntrusiveList<TWaiter> Waiters_;
-};
-
+private:
+ TIntrusiveList<TWaiter> Waiters_;
+};
+
-class TContSimpleEvent {
-public:
+class TContSimpleEvent {
+public:
TContSimpleEvent(TContExecutor* e)
- : E_(e)
- {
- }
-
+ : E_(e)
+ {
+ }
+
TContExecutor* Executor() const noexcept {
- return E_;
- }
-
+ return E_;
+ }
+
void Signal() noexcept {
- Q_.Signal();
- }
-
+ Q_.Signal();
+ }
+
void BroadCast() noexcept {
- Q_.BroadCast();
- }
-
+ Q_.BroadCast();
+ }
+
int WaitD(TInstant deadLine) noexcept {
return Q_.WaitD(E_->Running(), deadLine);
- }
-
+ }
+
int WaitT(TDuration timeout) noexcept {
- return WaitD(timeout.ToDeadLine());
- }
-
+ return WaitD(timeout.ToDeadLine());
+ }
+
int WaitI() noexcept {
- return WaitD(TInstant::Max());
- }
-
-private:
- TContWaitQueue Q_;
- TContExecutor* E_;
-};
+ return WaitD(TInstant::Max());
+ }
+
+private:
+ TContWaitQueue Q_;
+ TContExecutor* E_;
+};
diff --git a/library/cpp/coroutine/engine/impl.cpp b/library/cpp/coroutine/engine/impl.cpp
index df08ddd3cc..7ae6f74051 100644
--- a/library/cpp/coroutine/engine/impl.cpp
+++ b/library/cpp/coroutine/engine/impl.cpp
@@ -1,15 +1,15 @@
#include "impl.h"
-
+
#include "stack/stack_allocator.h"
#include "stack/stack_guards.h"
#include <util/generic/scope.h>
#include <util/thread/singleton.h>
#include <util/stream/format.h>
-#include <util/stream/output.h>
+#include <util/stream/output.h>
#include <util/system/yassert.h>
-
-
+
+
TCont::TJoinWait::TJoinWait(TCont& c) noexcept
: Cont_(c)
{}
@@ -35,12 +35,12 @@ TCont::TCont(NCoro::NStack::IAllocator& allocator,
void TCont::PrintMe(IOutputStream& out) const noexcept {
- out << "cont("
+ out << "cont("
<< "name = " << Name_ << ", "
<< "addr = " << Hex((size_t)this)
- << ")";
-}
-
+ << ")";
+}
+
bool TCont::Join(TCont* c, TInstant deadLine) noexcept {
TJoinWait ev(*this);
c->Waiters_.PushBack(&ev);
@@ -148,8 +148,8 @@ void TContExecutor::Execute(TContFunc func, void* arg) noexcept {
func(cont, arg);
}, "sys_main");
RunScheduler();
-}
-
+}
+
void TContExecutor::WaitForIO() {
while (Ready_.Empty() && !WaitQueue_.Empty()) {
const auto now = Now();
@@ -190,8 +190,8 @@ void TContExecutor::WaitForIO() {
Ready_.Append(ReadyNext_);
}
-}
-
+}
+
void TContExecutor::Poll(TInstant deadline) {
Poller_.Wait(PollerEvents_, deadline);
LastPoll_ = Now();
@@ -217,8 +217,8 @@ void TContExecutor::Poll(TInstant deadline) {
}
}
}
-}
-
+}
+
void TContExecutor::Abort() noexcept {
WaitQueue_.Abort();
auto visitor = [](TCont* c) {
@@ -324,7 +324,7 @@ void TContExecutor::RunScheduler() noexcept {
}
TCont* cont = Ready_.PopFront();
-
+
if (ScheduleCallback_) {
ScheduleCallback_->OnSchedule(*this, *cont);
}
@@ -347,9 +347,9 @@ void TContExecutor::RunScheduler() noexcept {
} catch (...) {
TBackTrace::FromCurrentException().PrintTo(Cerr);
Y_FAIL("Uncaught exception in the scheduler: %s", CurrentExceptionMessage().c_str());
- }
+ }
}
-
+
void TContExecutor::Pause() {
if (auto cont = Running()) {
Paused_ = true;
@@ -371,4 +371,4 @@ TInstant TContExecutor::Now() {
template <>
void Out<TCont>(IOutputStream& out, const TCont& c) {
c.PrintMe(out);
-}
+}
diff --git a/library/cpp/coroutine/engine/impl.h b/library/cpp/coroutine/engine/impl.h
index d904235ed3..283a96ecf1 100644
--- a/library/cpp/coroutine/engine/impl.h
+++ b/library/cpp/coroutine/engine/impl.h
@@ -1,30 +1,30 @@
#pragma once
-
+
#include "callbacks.h"
#include "cont_poller.h"
#include "iostatus.h"
-#include "poller.h"
+#include "poller.h"
#include "stack/stack_common.h"
#include "trampoline.h"
#include "custom_time.h"
-
+
#include <library/cpp/containers/intrusive_rb_tree/rb_tree.h>
-#include <util/system/error.h>
-#include <util/generic/ptr.h>
-#include <util/generic/intrlist.h>
-#include <util/datetime/base.h>
+#include <util/system/error.h>
+#include <util/generic/ptr.h>
+#include <util/generic/intrlist.h>
+#include <util/datetime/base.h>
#include <util/generic/maybe.h>
#include <util/generic/function.h>
-
+
#define EWAKEDUP 34567
-class TCont;
-struct TContRep;
-class TContExecutor;
-class TContPollEvent;
-
+class TCont;
+struct TContRep;
+class TContExecutor;
+class TContPollEvent;
+
namespace NCoro::NStack {
class IAllocator;
}
@@ -32,18 +32,18 @@ namespace NCoro::NStack {
class TCont : private TIntrusiveListItem<TCont> {
struct TJoinWait: public TIntrusiveListItem<TJoinWait> {
TJoinWait(TCont& c) noexcept;
-
+
void Wake() noexcept;
-
+
public:
TCont& Cont_;
};
-
+
friend class TContExecutor;
friend class TIntrusiveListItem<TCont>;
friend class NCoro::TEventWaitQueue;
friend class NCoro::TTrampoline;
-
+
private:
TCont(
NCoro::NStack::IAllocator& allocator,
@@ -52,63 +52,63 @@ private:
NCoro::TTrampoline::TFunc func,
const char* name
) noexcept;
-
+
public:
TContExecutor* Executor() noexcept {
return &Executor_;
- }
-
+ }
+
const TContExecutor* Executor() const noexcept {
return &Executor_;
- }
-
+ }
+
const char* Name() const noexcept {
- return Name_;
- }
-
+ return Name_;
+ }
+
void PrintMe(IOutputStream& out) const noexcept;
-
+
void Yield() noexcept;
-
+
void ReScheduleAndSwitch() noexcept;
-
- /// @return ETIMEDOUT on success
+
+ /// @return ETIMEDOUT on success
int SleepD(TInstant deadline) noexcept;
-
+
int SleepT(TDuration timeout) noexcept {
- return SleepD(timeout.ToDeadLine());
- }
-
+ return SleepD(timeout.ToDeadLine());
+ }
+
int SleepI() noexcept {
- return SleepD(TInstant::Max());
- }
-
+ return SleepD(TInstant::Max());
+ }
+
bool IAmRunning() const noexcept;
void Cancel() noexcept;
bool Cancelled() const noexcept {
- return Cancelled_;
- }
-
+ return Cancelled_;
+ }
+
bool Scheduled() const noexcept {
return Scheduled_;
}
bool Join(TCont* c, TInstant deadLine = TInstant::Max()) noexcept;
-
+
void ReSchedule() noexcept;
-
+
void Switch() noexcept;
void SwitchTo(TExceptionSafeContext* ctx) {
Trampoline_.SwitchTo(ctx);
}
-private:
+private:
void Terminate();
-private:
+private:
TContExecutor& Executor_;
// TODO(velavokr): allow name storage owning (for generated names backed by TString)
@@ -119,17 +119,17 @@ private:
TIntrusiveList<TJoinWait> Waiters_;
bool Cancelled_ = false;
bool Scheduled_ = false;
-};
-
+};
+
TCont* RunningCont();
-
-
-template <class Functor>
-static void ContHelperFunc(TCont* cont, void* arg) {
- (*((Functor*)(arg)))(cont);
-}
-
-template <typename T, void (T::*M)(TCont*)>
+
+
+template <class Functor>
+static void ContHelperFunc(TCont* cont, void* arg) {
+ (*((Functor*)(arg)))(cont);
+}
+
+template <typename T, void (T::*M)(TCont*)>
static void ContHelperMemberFunc(TCont* c, void* arg) {
((reinterpret_cast<T*>(arg))->*M)(c);
}
@@ -145,11 +145,11 @@ public:
/// Central coroutine class.
/// Note, coroutines are single-threaded, and all methods must be called from the single thread
-class TContExecutor {
- friend class TCont;
+class TContExecutor {
+ friend class TCont;
using TContList = TIntrusiveList<TCont>;
-
-public:
+
+public:
TContExecutor(
uint32_t defaultStackSize,
THolder<IPollerFace> poller = IPollerFace::Default(),
@@ -159,41 +159,41 @@ public:
TMaybe<NCoro::NStack::TPoolAllocatorSettings> poolSettings = Nothing(),
NCoro::ITime* time = nullptr
);
-
+
~TContExecutor();
-
+
// if we already have a coroutine to run
void Execute() noexcept;
-
+
void Execute(TContFunc func, void* arg = nullptr) noexcept;
-
- template <class Functor>
+
+ template <class Functor>
void Execute(Functor& f) noexcept {
- Execute((TContFunc)ContHelperFunc<Functor>, (void*)&f);
- }
-
- template <typename T, void (T::*M)(TCont*)>
+ Execute((TContFunc)ContHelperFunc<Functor>, (void*)&f);
+ }
+
+ template <typename T, void (T::*M)(TCont*)>
void Execute(T* obj) noexcept {
- Execute(ContHelperMemberFunc<T, M>, obj);
- }
+ Execute(ContHelperMemberFunc<T, M>, obj);
+ }
- template <class Functor>
+ template <class Functor>
TCont* Create(
Functor& f,
const char* name,
TMaybe<ui32> customStackSize = Nothing()
) noexcept {
return Create((TContFunc)ContHelperFunc<Functor>, (void*)&f, name, customStackSize);
- }
-
- template <typename T, void (T::*M)(TCont*)>
+ }
+
+ template <typename T, void (T::*M)(TCont*)>
TCont* Create(
T* obj,
const char* name,
TMaybe<ui32> customStackSize = Nothing()
) noexcept {
return Create(ContHelperMemberFunc<T, M>, obj, name, customStackSize);
- }
+ }
TCont* Create(
TContFunc func,
@@ -201,7 +201,7 @@ public:
const char* name,
TMaybe<ui32> customStackSize = Nothing()
) noexcept;
-
+
TCont* CreateOwned(
NCoro::TTrampoline::TFunc func,
const char* name,
@@ -209,17 +209,17 @@ public:
) noexcept;
NCoro::TContPoller* Poller() noexcept {
- return &Poller_;
- }
-
+ return &Poller_;
+ }
+
TCont* Running() noexcept {
- return Current_;
- }
-
+ return Current_;
+ }
+
const TCont* Running() const noexcept {
- return Current_;
- }
-
+ return Current_;
+ }
+
size_t TotalReadyConts() const noexcept {
return Ready_.Size() + TotalScheduledConts();
}
@@ -240,54 +240,54 @@ public:
// TODO(velavokr): rename, it is just CancelAll actually
void Abort() noexcept;
-
+
void SetFailOnError(bool fail) noexcept {
- FailOnError_ = fail;
- }
-
+ FailOnError_ = fail;
+ }
+
bool FailOnError() const noexcept {
- return FailOnError_;
- }
-
+ return FailOnError_;
+ }
+
void RegisterInWaitQueue(NCoro::TContPollEvent* event) {
WaitQueue_.Register(event);
}
void ScheduleIoWait(TFdEvent* event) {
RegisterInWaitQueue(event);
- Poller_.Schedule(event);
- }
-
+ Poller_.Schedule(event);
+ }
+
void ScheduleIoWait(TTimerEvent* event) noexcept {
RegisterInWaitQueue(event);
- }
-
+ }
+
void ScheduleUserEvent(IUserEvent* event) {
UserEvents_.PushBack(event);
}
void Pause();
TInstant Now();
-private:
+private:
void Release(TCont* cont) noexcept;
-
+
void Exit(TCont* cont) noexcept;
-
+
void RunScheduler() noexcept;
-
+
void ScheduleToDelete(TCont* cont) noexcept;
-
+
void ScheduleExecution(TCont* cont) noexcept;
-
+
void ScheduleExecutionNow(TCont* cont) noexcept;
void DeleteScheduled() noexcept;
-
- void WaitForIO();
-
+
+ void WaitForIO();
+
void Poll(TInstant deadline);
-private:
+private:
NCoro::IScheduleCallback* const ScheduleCallback_ = nullptr;
NCoro::IEnterPollerCallback* const EnterPollerCallback_ = nullptr;
const uint32_t DefaultStackSize_;
@@ -295,8 +295,8 @@ private:
TExceptionSafeContext SchedContext_;
- TContList ToDelete_;
- TContList Ready_;
+ TContList ToDelete_;
+ TContList Ready_;
TContList ReadyNext_;
NCoro::TEventWaitQueue WaitQueue_;
NCoro::TContPoller Poller_;
@@ -310,4 +310,4 @@ private:
bool FailOnError_ = false;
bool Paused_ = false;
NCoro::ITime* Time_ = nullptr;
-};
+};
diff --git a/library/cpp/coroutine/engine/iostatus.cpp b/library/cpp/coroutine/engine/iostatus.cpp
index a92142edeb..8298229b39 100644
--- a/library/cpp/coroutine/engine/iostatus.cpp
+++ b/library/cpp/coroutine/engine/iostatus.cpp
@@ -1 +1 @@
-#include "iostatus.h"
+#include "iostatus.h"
diff --git a/library/cpp/coroutine/engine/iostatus.h b/library/cpp/coroutine/engine/iostatus.h
index 8478ce05e8..bf6036805d 100644
--- a/library/cpp/coroutine/engine/iostatus.h
+++ b/library/cpp/coroutine/engine/iostatus.h
@@ -1,91 +1,91 @@
#pragma once
-
+
#include <util/generic/yexception.h>
-class TIOStatus {
-public:
+class TIOStatus {
+public:
TIOStatus(int status) noexcept
- : Status_(status)
- {
- }
-
+ : Status_(status)
+ {
+ }
+
static TIOStatus Error(int status) noexcept {
- return TIOStatus(status);
- }
-
+ return TIOStatus(status);
+ }
+
static TIOStatus Error() noexcept {
- return TIOStatus(LastSystemError());
- }
-
+ return TIOStatus(LastSystemError());
+ }
+
static TIOStatus Success() noexcept {
- return TIOStatus(0);
- }
-
+ return TIOStatus(0);
+ }
+
void Check() const {
- if (Status_) {
- ythrow TSystemError(Status_) << "io error";
- }
- }
-
+ if (Status_) {
+ ythrow TSystemError(Status_) << "io error";
+ }
+ }
+
bool Failed() const noexcept {
- return (bool)Status_;
- }
-
+ return (bool)Status_;
+ }
+
bool Succeed() const noexcept {
- return !Failed();
- }
-
+ return !Failed();
+ }
+
int Status() const noexcept {
- return Status_;
- }
-
-private:
- int Status_;
-};
-
-
-class TContIOStatus {
-public:
+ return Status_;
+ }
+
+private:
+ int Status_;
+};
+
+
+class TContIOStatus {
+public:
TContIOStatus(size_t processed, TIOStatus status) noexcept
- : Processed_(processed)
- , Status_(status)
- {
- }
-
+ : Processed_(processed)
+ , Status_(status)
+ {
+ }
+
static TContIOStatus Error(TIOStatus status) noexcept {
- return TContIOStatus(0, status);
- }
-
+ return TContIOStatus(0, status);
+ }
+
static TContIOStatus Error() noexcept {
- return TContIOStatus(0, TIOStatus::Error());
- }
-
+ return TContIOStatus(0, TIOStatus::Error());
+ }
+
static TContIOStatus Success(size_t processed) noexcept {
- return TContIOStatus(processed, TIOStatus::Success());
- }
-
+ return TContIOStatus(processed, TIOStatus::Success());
+ }
+
static TContIOStatus Eof() noexcept {
- return Success(0);
- }
-
+ return Success(0);
+ }
+
~TContIOStatus() {
- }
-
+ }
+
size_t Processed() const noexcept {
- return Processed_;
- }
-
+ return Processed_;
+ }
+
int Status() const noexcept {
- return Status_.Status();
- }
-
+ return Status_.Status();
+ }
+
size_t Checked() const {
- Status_.Check();
-
- return Processed_;
- }
-
-private:
- size_t Processed_;
- TIOStatus Status_;
-};
+ Status_.Check();
+
+ return Processed_;
+ }
+
+private:
+ size_t Processed_;
+ TIOStatus Status_;
+};
diff --git a/library/cpp/coroutine/engine/mutex.h b/library/cpp/coroutine/engine/mutex.h
index 27392f7d85..93e9119503 100644
--- a/library/cpp/coroutine/engine/mutex.h
+++ b/library/cpp/coroutine/engine/mutex.h
@@ -1,49 +1,49 @@
#pragma once
-
+
#include "impl.h"
#include "events.h"
-class TContMutex {
-public:
+class TContMutex {
+public:
TContMutex() noexcept
- : Token_(true)
- {
- }
-
+ : Token_(true)
+ {
+ }
+
~TContMutex() {
Y_ASSERT(Token_);
- }
-
+ }
+
int LockD(TCont* current, TInstant deadline) {
- while (!Token_) {
- const int ret = WaitQueue_.WaitD(current, deadline);
-
- if (ret != EWAKEDUP) {
- return ret;
- }
- }
-
- Token_ = false;
-
- return 0;
- }
-
+ while (!Token_) {
+ const int ret = WaitQueue_.WaitD(current, deadline);
+
+ if (ret != EWAKEDUP) {
+ return ret;
+ }
+ }
+
+ Token_ = false;
+
+ return 0;
+ }
+
int LockT(TCont* current, TDuration timeout) {
- return LockD(current, timeout.ToDeadLine());
- }
-
+ return LockD(current, timeout.ToDeadLine());
+ }
+
int LockI(TCont* current) {
- return LockD(current, TInstant::Max());
- }
-
+ return LockD(current, TInstant::Max());
+ }
+
void UnLock() noexcept {
Y_ASSERT(!Token_);
-
- Token_ = true;
- WaitQueue_.Signal();
- }
-
-private:
- TContWaitQueue WaitQueue_;
- bool Token_;
-};
+
+ Token_ = true;
+ WaitQueue_.Signal();
+ }
+
+private:
+ TContWaitQueue WaitQueue_;
+ bool Token_;
+};
diff --git a/library/cpp/coroutine/engine/network.cpp b/library/cpp/coroutine/engine/network.cpp
index a5c0d9282c..85b647d210 100644
--- a/library/cpp/coroutine/engine/network.cpp
+++ b/library/cpp/coroutine/engine/network.cpp
@@ -1,41 +1,41 @@
#include "impl.h"
#include "network.h"
-
+
#include <util/generic/scope.h>
#include <util/generic/xrange.h>
-
+
#include <sys/uio.h>
-
+
#if defined(_bionic_)
# define IOV_MAX 1024
#endif
-
-
+
+
namespace NCoro {
namespace {
bool IsBlocked(int lasterr) noexcept {
return lasterr == EAGAIN || lasterr == EWOULDBLOCK;
}
-
+
ssize_t DoReadVector(SOCKET fd, TContIOVector* vec) noexcept {
return readv(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count()));
}
-
+
ssize_t DoWriteVector(SOCKET fd, TContIOVector* vec) noexcept {
return writev(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count()));
}
}
-
-
+
+
int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept {
if (cont->Cancelled()) {
return ECANCELED;
}
-
+
if (nfds == 0) {
return 0;
}
-
+
TTempArray<TFdEvent> events(nfds);
for (auto i : xrange(nfds)) {
@@ -47,7 +47,7 @@ namespace NCoro {
(events.Data() + i)->~TFdEvent();
}
};
-
+
for (auto i : xrange(nfds)) {
cont->Executor()->ScheduleIoWait(events.Data() + i);
}
@@ -79,36 +79,36 @@ namespace NCoro {
if (ret) {
if (outfd) {
*outfd = ret->Fd();
- }
+ }
return ret->Status();
- }
-
+ }
+
return EINPROGRESS;
- }
-
+ }
+
int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept {
return SelectD(cont, fds, what, nfds, outfd, timeout.ToDeadLine());
- }
-
+ }
+
int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd) {
return SelectD(cont, fds, what, nfds, outfd, TInstant::Max());
}
-
+
int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept {
TFdEvent event(cont, fd, (ui16)what, deadline);
return ExecuteEvent(&event);
- }
-
+ }
+
int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept {
return PollD(cont, fd, what, timeout.ToDeadLine());
}
int PollI(TCont* cont, SOCKET fd, int what) noexcept {
return PollD(cont, fd, what, TInstant::Max());
- }
-
-
+ }
+
+
TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept {
while (true) {
ssize_t res = DoReadVector(fd, vec);
@@ -117,103 +117,103 @@ namespace NCoro {
return TContIOStatus::Success((size_t) res);
}
- {
- const int err = LastSystemError();
-
- if (!IsBlocked(err)) {
+ {
+ const int err = LastSystemError();
+
+ if (!IsBlocked(err)) {
return TContIOStatus::Error(err);
- }
- }
-
+ }
+ }
+
if ((res = PollD(cont, fd, CONT_POLL_READ, deadline)) != 0) {
return TContIOStatus::Error((int) res);
- }
- }
- }
-
+ }
+ }
+ }
+
TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept {
return ReadVectorD(cont, fd, vec, timeOut.ToDeadLine());
}
-
+
TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept {
return ReadVectorD(cont, fd, vec, TInstant::Max());
}
-
-
+
+
TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept {
IOutputStream::TPart part(buf, len);
TContIOVector vec(&part, 1);
return ReadVectorD(cont, fd, &vec, deadline);
}
-
+
TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept {
return ReadD(cont, fd, buf, len, timeout.ToDeadLine());
}
-
+
TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept {
return ReadD(cont, fd, buf, len, TInstant::Max());
- }
-
-
+ }
+
+
TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept {
size_t written = 0;
-
+
while (!vec->Complete()) {
ssize_t res = DoWriteVector(fd, vec);
-
+
if (res >= 0) {
written += res;
-
+
vec->Proceed((size_t) res);
} else {
{
const int err = LastSystemError();
-
+
if (!IsBlocked(err)) {
return TContIOStatus(written, err);
}
}
-
+
if ((res = PollD(cont, fd, CONT_POLL_WRITE, deadline)) != 0) {
return TContIOStatus(written, (int) res);
}
}
- }
-
+ }
+
return TContIOStatus::Success(written);
- }
-
+ }
+
TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept {
return WriteVectorD(cont, fd, vec, timeOut.ToDeadLine());
}
-
+
TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept {
return WriteVectorD(cont, fd, vec, TInstant::Max());
}
-
-
+
+
TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept {
IOutputStream::TPart part(buf, len);
TContIOVector vec(&part, 1);
return WriteVectorD(cont, fd, &vec, deadline);
}
-
+
TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept {
return WriteD(cont, fd, buf, len, timeout.ToDeadLine());
}
-
+
TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept {
return WriteD(cont, fd, buf, len, TInstant::Max());
- }
-
-
+ }
+
+
int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept {
TSocketHolder res(Socket(ai));
-
+
if (res.Closed()) {
return LastSystemError();
}
-
+
const int ret = ConnectD(cont, res, ai.ai_addr, (socklen_t) ai.ai_addrlen, deadline);
if (!ret) {
@@ -221,49 +221,49 @@ namespace NCoro {
}
return ret;
- }
-
+ }
+
int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept {
int ret = EHOSTUNREACH;
-
+
for (auto it = addr.Begin(); it != addr.End(); ++it) {
ret = ConnectD(cont, s, *it, deadline);
-
+
if (ret == 0 || ret == ETIMEDOUT) {
return ret;
}
}
-
+
return ret;
- }
-
+ }
+
int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept {
return ConnectD(cont, s, addr, timeout.ToDeadLine());
}
-
+
int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept {
return ConnectD(cont, s, addr, TInstant::Max());
- }
-
+ }
+
int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept {
if (connect(s, name, namelen)) {
const int err = LastSystemError();
-
+
if (!IsBlocked(err) && err != EINPROGRESS) {
return err;
}
-
+
int ret = PollD(cont, s, CONT_POLL_WRITE, deadline);
-
+
if (ret) {
return ret;
}
-
+
// check if we really connected
// FIXME: Unportable ??
int serr = 0;
socklen_t slen = sizeof(serr);
-
+
ret = getsockopt(s, SOL_SOCKET, SO_ERROR, (char*) &serr, &slen);
if (ret) {
@@ -286,27 +286,27 @@ namespace NCoro {
return ConnectD(cont, s, name, namelen, TInstant::Max());
}
-
+
int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept {
SOCKET ret;
-
+
while ((ret = Accept4(s, addr, addrlen)) == INVALID_SOCKET) {
int err = LastSystemError();
-
+
if (!IsBlocked(err)) {
return -err;
}
err = PollD(cont, s, CONT_POLL_READ, deadline);
-
+
if (err) {
return -err;
}
}
-
+
return (int) ret;
- }
-
+ }
+
int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept {
return AcceptD(cont, s, addr, addrlen, timeout.ToDeadLine());
}
diff --git a/library/cpp/coroutine/engine/network.h b/library/cpp/coroutine/engine/network.h
index db979c4c59..f2c9afe4f8 100644
--- a/library/cpp/coroutine/engine/network.h
+++ b/library/cpp/coroutine/engine/network.h
@@ -1,55 +1,55 @@
#pragma once
-
+
#include "iostatus.h"
-
+
#include <util/datetime/base.h>
#include <util/network/init.h>
#include <util/network/iovec.h>
#include <util/network/nonblock.h>
-#include <util/network/socket.h>
-
-class TCont;
-
+#include <util/network/socket.h>
+
+class TCont;
+
namespace NCoro {
int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept;
int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept;
int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd);
-
+
int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept;
int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept;
int PollI(TCont* cont, SOCKET fd, int what) noexcept;
-
+
TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept;
TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept;
TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept;
-
+
TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept;
TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept;
TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept;
-
+
TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept;
TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept;
TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept;
-
+
TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept;
TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept;
TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept;
-
+
int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept;
-
+
int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept;
int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept;
int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept;
-
+
int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept;
int ConnectT(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TDuration timeout) noexcept;
int ConnectI(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen) noexcept;
-
+
int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept;
int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept;
int AcceptI(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen) noexcept;
-
+
SOCKET Socket(int domain, int type, int protocol) noexcept;
SOCKET Socket(const struct addrinfo& ai) noexcept;
-}
+}
diff --git a/library/cpp/coroutine/engine/poller.cpp b/library/cpp/coroutine/engine/poller.cpp
index 6fd1f673ae..61164fa56b 100644
--- a/library/cpp/coroutine/engine/poller.cpp
+++ b/library/cpp/coroutine/engine/poller.cpp
@@ -1,118 +1,118 @@
-#include "poller.h"
-#include "sockmap.h"
-
-#include <util/memory/smallobj.h>
-#include <util/generic/intrlist.h>
-#include <util/generic/singleton.h>
+#include "poller.h"
+#include "sockmap.h"
+
+#include <util/memory/smallobj.h>
+#include <util/generic/intrlist.h>
+#include <util/generic/singleton.h>
#include <util/system/env.h>
#include <util/string/cast.h>
-
-namespace {
+
+namespace {
using TChange = IPollerFace::TChange;
using TEvent = IPollerFace::TEvent;
using TEvents = IPollerFace::TEvents;
-
- template <class T>
- class TUnsafeBuf {
- public:
+
+ template <class T>
+ class TUnsafeBuf {
+ public:
TUnsafeBuf() noexcept
- : L_(0)
- {
- }
-
+ : L_(0)
+ {
+ }
+
T* operator~() const noexcept {
- return B_.Get();
- }
-
+ return B_.Get();
+ }
+
size_t operator+() const noexcept {
- return L_;
- }
-
+ return L_;
+ }
+
void Reserve(size_t len) {
- len = FastClp2(len);
-
- if (len > L_) {
- B_.Reset(new T[len]);
- L_ = len;
- }
- }
-
- private:
- TArrayHolder<T> B_;
- size_t L_;
- };
-
-
- template <class T>
- class TVirtualize: public IPollerFace {
- public:
+ len = FastClp2(len);
+
+ if (len > L_) {
+ B_.Reset(new T[len]);
+ L_ = len;
+ }
+ }
+
+ private:
+ TArrayHolder<T> B_;
+ size_t L_;
+ };
+
+
+ template <class T>
+ class TVirtualize: public IPollerFace {
+ public:
TVirtualize(EContPoller pollerEngine)
: PollerEngine_(pollerEngine)
{
}
void Set(const TChange& c) override {
- P_.Set(c);
- }
-
+ P_.Set(c);
+ }
+
void Wait(TEvents& events, TInstant deadLine) override {
- P_.Wait(events, deadLine);
- }
-
+ P_.Wait(events, deadLine);
+ }
+
EContPoller PollEngine() const override {
return PollerEngine_;
}
- private:
- T P_;
+ private:
+ T P_;
const EContPoller PollerEngine_;
- };
-
+ };
+
- template <class T>
- class TPoller {
+ template <class T>
+ class TPoller {
using TInternalEvent = typename T::TEvent;
-
- public:
+
+ public:
TPoller() {
- E_.Reserve(1);
- }
-
+ E_.Reserve(1);
+ }
+
void Set(const TChange& c) {
- P_.Set(c.Data, c.Fd, c.Flags);
- }
-
+ P_.Set(c.Data, c.Fd, c.Flags);
+ }
+
void Reserve(size_t size) {
E_.Reserve(size);
}
void Wait(TEvents& events, TInstant deadLine) {
- const size_t ret = P_.WaitD(~E_, +E_, deadLine);
-
+ const size_t ret = P_.WaitD(~E_, +E_, deadLine);
+
events.reserve(ret);
- for (size_t i = 0; i < ret; ++i) {
- const TInternalEvent* ie = ~E_ + i;
-
- const TEvent e = {
- T::ExtractEvent(ie),
- T::ExtractStatus(ie),
- (ui16)T::ExtractFilter(ie),
- };
-
- events.push_back(e);
- }
-
- E_.Reserve(ret + 1);
- }
-
- private:
- T P_;
- TUnsafeBuf<TInternalEvent> E_;
- };
-
-
- template <class T>
- class TIndexedArray {
+ for (size_t i = 0; i < ret; ++i) {
+ const TInternalEvent* ie = ~E_ + i;
+
+ const TEvent e = {
+ T::ExtractEvent(ie),
+ T::ExtractStatus(ie),
+ (ui16)T::ExtractFilter(ie),
+ };
+
+ events.push_back(e);
+ }
+
+ E_.Reserve(ret + 1);
+ }
+
+ private:
+ T P_;
+ TUnsafeBuf<TInternalEvent> E_;
+ };
+
+
+ template <class T>
+ class TIndexedArray {
struct TVal:
public T,
public TIntrusiveListItem<TVal>,
@@ -125,244 +125,244 @@ namespace {
// zero-initialization takes place in TVal() expression and the
// data is overwritten.
TVal() {
- }
- };
-
+ }
+ };
+
typedef TIntrusiveList<TVal> TListType;
-
- public:
+
+ public:
typedef typename TListType::TIterator TIterator;
typedef typename TListType::TConstIterator TConstIterator;
-
+
TIndexedArray()
- : P_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance())
- {
- }
-
+ : P_(TMemoryPool::TExpGrow::Instance(), TDefaultAllocator::Instance())
+ {
+ }
+
TIterator Begin() noexcept {
- return I_.Begin();
- }
-
+ return I_.Begin();
+ }
+
TIterator End() noexcept {
- return I_.End();
- }
-
+ return I_.End();
+ }
+
TConstIterator Begin() const noexcept {
- return I_.Begin();
- }
-
+ return I_.Begin();
+ }
+
TConstIterator End() const noexcept {
- return I_.End();
- }
-
+ return I_.End();
+ }
+
T& operator[](size_t i) {
- return *Get(i);
- }
-
+ return *Get(i);
+ }
+
T* Get(size_t i) {
- TValRef& v = V_.Get(i);
-
+ TValRef& v = V_.Get(i);
+
if (Y_UNLIKELY(!v)) {
- v.Reset(new (&P_) TVal());
- I_.PushFront(v.Get());
- }
-
- Y_PREFETCH_WRITE(v.Get(), 1);
-
- return v.Get();
- }
-
+ v.Reset(new (&P_) TVal());
+ I_.PushFront(v.Get());
+ }
+
+ Y_PREFETCH_WRITE(v.Get(), 1);
+
+ return v.Get();
+ }
+
void Erase(size_t i) noexcept {
- V_.Get(i).Destroy();
- }
-
+ V_.Get(i).Destroy();
+ }
+
size_t Size() const noexcept {
- return I_.Size();
- }
-
- private:
+ return I_.Size();
+ }
+
+ private:
using TValRef = THolder<TVal>;
- typename TVal::TPool P_;
- TSocketMap<TValRef> V_;
+ typename TVal::TPool P_;
+ TSocketMap<TValRef> V_;
TListType I_;
- };
-
+ };
+
inline short PollFlags(ui16 flags) noexcept {
- short ret = 0;
-
- if (flags & CONT_POLL_READ) {
- ret |= POLLIN;
- }
-
- if (flags & CONT_POLL_WRITE) {
- ret |= POLLOUT;
- }
-
+ short ret = 0;
+
+ if (flags & CONT_POLL_READ) {
+ ret |= POLLIN;
+ }
+
+ if (flags & CONT_POLL_WRITE) {
+ ret |= POLLOUT;
+ }
+
#if defined(_linux_)
if (flags & CONT_POLL_RDHUP) {
ret |= POLLRDHUP;
}
#endif
- return ret;
- }
-
+ return ret;
+ }
+
- class TPollPoller {
- public:
+ class TPollPoller {
+ public:
size_t Size() const noexcept {
- return S_.Size();
- }
-
- template <class T>
+ return S_.Size();
+ }
+
+ template <class T>
void Build(T& t) const {
- for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) {
- t.Set(*it);
- }
+ for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) {
+ t.Set(*it);
+ }
t.Reserve(Size());
- }
-
+ }
+
void Set(const TChange& c) {
- if (c.Flags) {
- S_[c.Fd] = c;
- } else {
- S_.Erase(c.Fd);
- }
- }
-
+ if (c.Flags) {
+ S_[c.Fd] = c;
+ } else {
+ S_.Erase(c.Fd);
+ }
+ }
+
void Wait(TEvents& events, TInstant deadLine) {
- T_.clear();
- T_.reserve(Size());
-
- for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) {
- const pollfd pfd = {
- it->Fd,
- PollFlags(it->Flags),
- 0,
- };
-
- T_.push_back(pfd);
- }
-
+ T_.clear();
+ T_.reserve(Size());
+
+ for (TFds::TConstIterator it = S_.Begin(); it != S_.End(); ++it) {
+ const pollfd pfd = {
+ it->Fd,
+ PollFlags(it->Flags),
+ 0,
+ };
+
+ T_.push_back(pfd);
+ }
+
const ssize_t ret = PollD(T_.data(), (nfds_t) T_.size(), deadLine);
-
+
if (ret <= 0) {
- return;
- }
-
+ return;
+ }
+
events.reserve(T_.size());
for (size_t i = 0; i < T_.size(); ++i) {
- const pollfd& pfd = T_[i];
- const short ev = pfd.revents;
-
- if (!ev) {
- continue;
- }
-
- int status = 0;
- ui16 filter = 0;
-
+ const pollfd& pfd = T_[i];
+ const short ev = pfd.revents;
+
+ if (!ev) {
+ continue;
+ }
+
+ int status = 0;
+ ui16 filter = 0;
+
// We are perfectly fine with an EOF while reading a pipe or a unix socket
if ((ev & POLLIN) || (ev & POLLHUP) && (pfd.events & POLLIN)) {
- filter |= CONT_POLL_READ;
- }
-
- if (ev & POLLOUT) {
- filter |= CONT_POLL_WRITE;
- }
-
+ filter |= CONT_POLL_READ;
+ }
+
+ if (ev & POLLOUT) {
+ filter |= CONT_POLL_WRITE;
+ }
+
#if defined(_linux_)
if (ev & POLLRDHUP) {
filter |= CONT_POLL_RDHUP;
}
#endif
- if (ev & POLLERR) {
- status = EIO;
+ if (ev & POLLERR) {
+ status = EIO;
} else if (ev & POLLHUP && pfd.events & POLLOUT) {
// Only write operations may cause EPIPE
- status = EPIPE;
- } else if (ev & POLLNVAL) {
- status = EINVAL;
- }
-
- if (status) {
+ status = EPIPE;
+ } else if (ev & POLLNVAL) {
+ status = EINVAL;
+ }
+
+ if (status) {
filter = CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_RDHUP;
- }
-
- const TEvent res = {
- S_[pfd.fd].Data,
- status,
- filter,
- };
-
- events.push_back(res);
- }
- }
-
- private:
- typedef TIndexedArray<TChange> TFds;
- TFds S_;
+ }
+
+ const TEvent res = {
+ S_[pfd.fd].Data,
+ status,
+ filter,
+ };
+
+ events.push_back(res);
+ }
+ }
+
+ private:
+ typedef TIndexedArray<TChange> TFds;
+ TFds S_;
typedef TVector<pollfd> TPollVec;
- TPollVec T_;
- };
-
-
- class TCombinedPoller {
- typedef TPoller<TPollerImpl<TWithoutLocking>> TDefaultPoller;
-
- public:
+ TPollVec T_;
+ };
+
+
+ class TCombinedPoller {
+ typedef TPoller<TPollerImpl<TWithoutLocking>> TDefaultPoller;
+
+ public:
TCombinedPoller() {
- P_.Reset(new TPollPoller());
- }
-
+ P_.Reset(new TPollPoller());
+ }
+
void Set(const TChange& c) {
- if (!P_) {
- D_->Set(c);
- } else {
- P_->Set(c);
- }
- }
-
+ if (!P_) {
+ D_->Set(c);
+ } else {
+ P_->Set(c);
+ }
+ }
+
void Wait(TEvents& events, TInstant deadLine) {
- if (!P_) {
- D_->Wait(events, deadLine);
- } else {
- if (P_->Size() > 200) {
- D_.Reset(new TDefaultPoller());
- P_->Build(*D_);
- P_.Destroy();
- D_->Wait(events, deadLine);
- } else {
- P_->Wait(events, deadLine);
- }
- }
- }
-
- private:
+ if (!P_) {
+ D_->Wait(events, deadLine);
+ } else {
+ if (P_->Size() > 200) {
+ D_.Reset(new TDefaultPoller());
+ P_->Build(*D_);
+ P_.Destroy();
+ D_->Wait(events, deadLine);
+ } else {
+ P_->Wait(events, deadLine);
+ }
+ }
+ }
+
+ private:
THolder<TPollPoller> P_;
THolder<TDefaultPoller> D_;
- };
-
- struct TUserPoller: public TString {
+ };
+
+ struct TUserPoller: public TString {
TUserPoller()
- : TString(GetEnv("USER_POLLER"))
- {
- }
- };
-}
-
+ : TString(GetEnv("USER_POLLER"))
+ {
+ }
+ };
+}
+
THolder<IPollerFace> IPollerFace::Default() {
return Construct(*SingletonWithPriority<TUserPoller, 0>());
-}
-
+}
+
THolder<IPollerFace> IPollerFace::Construct(TStringBuf name) {
return Construct(name ? FromString<EContPoller>(name) : EContPoller::Default);
}
-
+
THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) {
switch (poller) {
case EContPoller::Default:
@@ -373,18 +373,18 @@ THolder<IPollerFace> IPollerFace::Construct(EContPoller poller) {
case EContPoller::Poll:
return MakeHolder<TVirtualize<TPollPoller>>(poller);
case EContPoller::Epoll:
-#if defined(HAVE_EPOLL_POLLER)
+#if defined(HAVE_EPOLL_POLLER)
return MakeHolder<TVirtualize<TPoller<TGenericPoller<TEpollPoller<TWithoutLocking>>>>>(poller);
#else
return nullptr;
-#endif
+#endif
case EContPoller::Kqueue:
#if defined(HAVE_KQUEUE_POLLER)
return MakeHolder<TVirtualize<TPoller<TGenericPoller<TKqueuePoller<TWithoutLocking>>>>>(poller);
#else
return nullptr;
-#endif
+#endif
default:
Y_FAIL("bad poller type");
- }
-}
+ }
+}
diff --git a/library/cpp/coroutine/engine/poller.h b/library/cpp/coroutine/engine/poller.h
index 054de6616b..8ea012c0fc 100644
--- a/library/cpp/coroutine/engine/poller.h
+++ b/library/cpp/coroutine/engine/poller.h
@@ -1,11 +1,11 @@
#pragma once
-
-#include <util/generic/ptr.h>
-#include <util/generic/vector.h>
-#include <util/network/socket.h>
-#include <util/network/pollerimpl.h>
-#include <util/datetime/base.h>
-
+
+#include <util/generic/ptr.h>
+#include <util/generic/vector.h>
+#include <util/network/socket.h>
+#include <util/network/pollerimpl.h>
+#include <util/datetime/base.h>
+
enum class EContPoller {
Default /* "default" */,
Combined /* "combined" */,
@@ -15,36 +15,36 @@ enum class EContPoller {
Kqueue /* "kqueue" */
};
-class IPollerFace {
-public:
- struct TChange {
+class IPollerFace {
+public:
+ struct TChange {
SOCKET Fd;
- void* Data;
- ui16 Flags;
- };
-
- struct TEvent {
- void* Data;
- int Status;
- ui16 Filter;
- };
-
+ void* Data;
+ ui16 Flags;
+ };
+
+ struct TEvent {
+ void* Data;
+ int Status;
+ ui16 Filter;
+ };
+
using TEvents = TVector<TEvent>;
-
- virtual ~IPollerFace() {
- }
-
+
+ virtual ~IPollerFace() {
+ }
+
void Set(void* ptr, SOCKET fd, ui16 flags) {
- const TChange c = {fd, ptr, flags};
-
- Set(c);
- }
-
- virtual void Set(const TChange& change) = 0;
- virtual void Wait(TEvents& events, TInstant deadLine) = 0;
+ const TChange c = {fd, ptr, flags};
+
+ Set(c);
+ }
+
+ virtual void Set(const TChange& change) = 0;
+ virtual void Wait(TEvents& events, TInstant deadLine) = 0;
virtual EContPoller PollEngine() const = 0;
-
+
static THolder<IPollerFace> Default();
static THolder<IPollerFace> Construct(TStringBuf name);
static THolder<IPollerFace> Construct(EContPoller poller);
-};
+};
diff --git a/library/cpp/coroutine/engine/sockmap.h b/library/cpp/coroutine/engine/sockmap.h
index aa616da61e..fd189e1774 100644
--- a/library/cpp/coroutine/engine/sockmap.h
+++ b/library/cpp/coroutine/engine/sockmap.h
@@ -1,24 +1,24 @@
-#pragma once
-
-#include <util/generic/hash.h>
-#include <util/generic/vector.h>
-
-template <class T>
-class TSocketMap {
-public:
+#pragma once
+
+#include <util/generic/hash.h>
+#include <util/generic/vector.h>
+
+template <class T>
+class TSocketMap {
+public:
T& Get(size_t idx) {
- if (idx < 128000) {
- if (V_.size() <= idx) {
- V_.resize(idx + 1);
- }
-
- return V_[idx];
- }
-
- return H_[idx];
- }
-
-private:
+ if (idx < 128000) {
+ if (V_.size() <= idx) {
+ V_.resize(idx + 1);
+ }
+
+ return V_[idx];
+ }
+
+ return H_[idx];
+ }
+
+private:
TVector<T> V_;
THashMap<size_t, T> H_;
-};
+};
diff --git a/library/cpp/coroutine/engine/sockpool.cpp b/library/cpp/coroutine/engine/sockpool.cpp
index 895dd686c7..b9482e780f 100644
--- a/library/cpp/coroutine/engine/sockpool.cpp
+++ b/library/cpp/coroutine/engine/sockpool.cpp
@@ -13,7 +13,7 @@ void SetCommonSockOpts(SOCKET sock, const struct sockaddr* sa) {
warn("bind");
}
} else if (sa->sa_family == AF_INET6) {
- sockaddr_in6 s_in6(*(const sockaddr_in6*)sa);
+ sockaddr_in6 s_in6(*(const sockaddr_in6*)sa);
Zero(s_in6.sin6_addr);
s_in6.sin6_port = 0;
diff --git a/library/cpp/coroutine/engine/sockpool.h b/library/cpp/coroutine/engine/sockpool.h
index b34d5ace46..1ebb7e7b38 100644
--- a/library/cpp/coroutine/engine/sockpool.h
+++ b/library/cpp/coroutine/engine/sockpool.h
@@ -1,253 +1,253 @@
#pragma once
-
+
#include "impl.h"
#include "network.h"
-
+
#include <util/network/address.h>
#include <util/network/socket.h>
#include <util/system/mutex.h>
-
+
extern void SetCommonSockOpts(SOCKET sock, const struct sockaddr* sa = nullptr);
-
+
class TSocketPool;
class TPooledSocket {
- class TImpl: public TIntrusiveListItem<TImpl>, public TSimpleRefCount<TImpl, TImpl> {
- public:
+ class TImpl: public TIntrusiveListItem<TImpl>, public TSimpleRefCount<TImpl, TImpl> {
+ public:
TImpl(SOCKET fd, TSocketPool* pool) noexcept
- : Pool_(pool)
- , IsKeepAlive_(false)
- , Fd_(fd)
- {
- Touch();
- }
-
+ : Pool_(pool)
+ , IsKeepAlive_(false)
+ , Fd_(fd)
+ {
+ Touch();
+ }
+
static void Destroy(TImpl* impl) noexcept {
- impl->DoDestroy();
- }
-
+ impl->DoDestroy();
+ }
+
void DoDestroy() noexcept {
- if (!Closed() && IsKeepAlive() && IsInGoodState()) {
- ReturnToPool();
- } else {
- delete this;
- }
- }
-
+ if (!Closed() && IsKeepAlive() && IsInGoodState()) {
+ ReturnToPool();
+ } else {
+ delete this;
+ }
+ }
+
bool IsKeepAlive() const noexcept {
- return IsKeepAlive_;
- }
-
+ return IsKeepAlive_;
+ }
+
void SetKeepAlive(bool ka) {
- ::SetKeepAlive(Fd_, ka);
- IsKeepAlive_ = ka;
- }
-
+ ::SetKeepAlive(Fd_, ka);
+ IsKeepAlive_ = ka;
+ }
+
SOCKET Socket() const noexcept {
- return Fd_;
- }
-
+ return Fd_;
+ }
+
bool Closed() const noexcept {
- return Fd_.Closed();
- }
-
+ return Fd_.Closed();
+ }
+
void Close() noexcept {
- Fd_.Close();
- }
-
+ Fd_.Close();
+ }
+
bool IsInGoodState() const noexcept {
- int err = 0;
- socklen_t len = sizeof(err);
-
- getsockopt(Fd_, SOL_SOCKET, SO_ERROR, (char*)&err, &len);
-
- return !err;
- }
-
+ int err = 0;
+ socklen_t len = sizeof(err);
+
+ getsockopt(Fd_, SOL_SOCKET, SO_ERROR, (char*)&err, &len);
+
+ return !err;
+ }
+
bool IsOpen() const noexcept {
return IsInGoodState() && IsNotSocketClosedByOtherSide(Fd_);
- }
-
+ }
+
void Touch() noexcept {
- TouchTime_ = TInstant::Now();
- }
-
+ TouchTime_ = TInstant::Now();
+ }
+
const TInstant& LastTouch() const noexcept {
- return TouchTime_;
- }
-
- private:
+ return TouchTime_;
+ }
+
+ private:
inline void ReturnToPool() noexcept;
-
- private:
- TSocketPool* Pool_;
- bool IsKeepAlive_;
- TSocketHolder Fd_;
- TInstant TouchTime_;
- };
-
- friend class TSocketPool;
-
-public:
+
+ private:
+ TSocketPool* Pool_;
+ bool IsKeepAlive_;
+ TSocketHolder Fd_;
+ TInstant TouchTime_;
+ };
+
+ friend class TSocketPool;
+
+public:
TPooledSocket()
: Impl_(nullptr)
- {
- }
+ {
+ }
TPooledSocket(TImpl* impl)
- : Impl_(impl)
- {
- }
-
+ : Impl_(impl)
+ {
+ }
+
~TPooledSocket() {
- if (UncaughtException() && !!Impl_) {
- Close();
+ if (UncaughtException() && !!Impl_) {
+ Close();
}
- }
-
+ }
+
operator SOCKET() const noexcept {
- return Impl_->Socket();
- }
-
+ return Impl_->Socket();
+ }
+
void SetKeepAlive(bool ka) {
- Impl_->SetKeepAlive(ka);
- }
-
+ Impl_->SetKeepAlive(ka);
+ }
+
void Close() noexcept {
- Impl_->Close();
- }
-
-private:
- TIntrusivePtr<TImpl> Impl_;
+ Impl_->Close();
+ }
+
+private:
+ TIntrusivePtr<TImpl> Impl_;
};
struct TConnectData {
TConnectData(TCont* cont, const TInstant& deadLine)
- : Cont(cont)
- , DeadLine(deadLine)
- {
- }
-
+ : Cont(cont)
+ , DeadLine(deadLine)
+ {
+ }
+
TConnectData(TCont* cont, const TDuration& timeOut)
- : Cont(cont)
- , DeadLine(TInstant::Now() + timeOut)
- {
- }
-
- TCont* Cont;
- const TInstant DeadLine;
+ : Cont(cont)
+ , DeadLine(TInstant::Now() + timeOut)
+ {
+ }
+
+ TCont* Cont;
+ const TInstant DeadLine;
};
class TSocketPool {
- friend class TPooledSocket::TImpl;
-
-public:
- typedef TAtomicSharedPtr<NAddr::IRemoteAddr> TAddrRef;
+ friend class TPooledSocket::TImpl;
+
+public:
+ typedef TAtomicSharedPtr<NAddr::IRemoteAddr> TAddrRef;
TSocketPool(int ip, int port)
- : Addr_(new NAddr::TIPv4Addr(TIpAddress((ui32)ip, (ui16)port)))
- {
- }
-
+ : Addr_(new NAddr::TIPv4Addr(TIpAddress((ui32)ip, (ui16)port)))
+ {
+ }
+
TSocketPool(const TAddrRef& addr)
- : Addr_(addr)
- {
- }
+ : Addr_(addr)
+ {
+ }
void EraseStale(const TInstant& maxAge) noexcept {
- TSockets toDelete;
-
- {
- TGuard<TMutex> guard(Mutex_);
-
- for (TSockets::TIterator it = Pool_.Begin(); it != Pool_.End();) {
- if (it->LastTouch() < maxAge) {
- toDelete.PushBack(&*(it++));
- } else {
- ++it;
- }
- }
- }
- }
-
+ TSockets toDelete;
+
+ {
+ TGuard<TMutex> guard(Mutex_);
+
+ for (TSockets::TIterator it = Pool_.Begin(); it != Pool_.End();) {
+ if (it->LastTouch() < maxAge) {
+ toDelete.PushBack(&*(it++));
+ } else {
+ ++it;
+ }
+ }
+ }
+ }
+
TPooledSocket Get(TConnectData* conn) {
- TPooledSocket ret;
-
- if (TPooledSocket::TImpl* alive = GetImpl()) {
- ret = TPooledSocket(alive);
- } else {
- ret = AllocateMore(conn);
- }
-
- ret.Impl_->Touch();
-
- return ret;
- }
-
+ TPooledSocket ret;
+
+ if (TPooledSocket::TImpl* alive = GetImpl()) {
+ ret = TPooledSocket(alive);
+ } else {
+ ret = AllocateMore(conn);
+ }
+
+ ret.Impl_->Touch();
+
+ return ret;
+ }
+
bool GetAlive(TPooledSocket& socket) {
- if (TPooledSocket::TImpl* alive = GetImpl()) {
- alive->Touch();
- socket = TPooledSocket(alive);
- return true;
+ if (TPooledSocket::TImpl* alive = GetImpl()) {
+ alive->Touch();
+ socket = TPooledSocket(alive);
+ return true;
}
- return false;
- }
+ return false;
+ }
-private:
+private:
TPooledSocket::TImpl* GetImpl() {
- TGuard<TMutex> guard(Mutex_);
-
- while (!Pool_.Empty()) {
- THolder<TPooledSocket::TImpl> ret(Pool_.PopFront());
-
- if (ret->IsOpen()) {
- return ret.Release();
+ TGuard<TMutex> guard(Mutex_);
+
+ while (!Pool_.Empty()) {
+ THolder<TPooledSocket::TImpl> ret(Pool_.PopFront());
+
+ if (ret->IsOpen()) {
+ return ret.Release();
}
}
return nullptr;
- }
-
+ }
+
void Release(TPooledSocket::TImpl* impl) noexcept {
- TGuard<TMutex> guard(Mutex_);
-
- Pool_.PushFront(impl);
- }
-
- TPooledSocket AllocateMore(TConnectData* conn);
-
-private:
- TAddrRef Addr_;
+ TGuard<TMutex> guard(Mutex_);
+
+ Pool_.PushFront(impl);
+ }
+
+ TPooledSocket AllocateMore(TConnectData* conn);
+
+private:
+ TAddrRef Addr_;
using TSockets = TIntrusiveListWithAutoDelete<TPooledSocket::TImpl, TDelete>;
- TSockets Pool_;
- TMutex Mutex_;
+ TSockets Pool_;
+ TMutex Mutex_;
};
inline void TPooledSocket::TImpl::ReturnToPool() noexcept {
- Pool_->Release(this);
+ Pool_->Release(this);
}
class TContIO: public IInputStream, public IOutputStream {
-public:
+public:
TContIO(SOCKET fd, TCont* cont)
- : Fd_(fd)
- , Cont_(cont)
- {
- }
-
+ : Fd_(fd)
+ , Cont_(cont)
+ {
+ }
+
void DoWrite(const void* buf, size_t len) override {
NCoro::WriteI(Cont_, Fd_, buf, len).Checked();
- }
-
+ }
+
size_t DoRead(void* buf, size_t len) override {
return NCoro::ReadI(Cont_, Fd_, buf, len).Checked();
- }
-
+ }
+
SOCKET Fd() const noexcept {
- return Fd_;
- }
-
-private:
- SOCKET Fd_;
- TCont* Cont_;
+ return Fd_;
+ }
+
+private:
+ SOCKET Fd_;
+ TCont* Cont_;
};
diff --git a/library/cpp/coroutine/engine/trampoline.cpp b/library/cpp/coroutine/engine/trampoline.cpp
index 75254b58a8..10ea69ddc3 100644
--- a/library/cpp/coroutine/engine/trampoline.cpp
+++ b/library/cpp/coroutine/engine/trampoline.cpp
@@ -1,16 +1,16 @@
#include "impl.h"
#include "trampoline.h"
-
+
#include "stack/stack_allocator.h"
#include <util/system/info.h>
#include <util/system/protect.h>
#include <util/system/valgrind.h>
#include <util/system/yassert.h>
-
+
#include <cstdlib>
#include <util/stream/format.h>
-
+
namespace NCoro {
@@ -30,7 +30,7 @@ TTrampoline::TTrampoline(NStack::IAllocator& allocator, ui32 stackSize, TFunc f,
Func_(Cont_);
} catch (...) {}
}
-
+
Cont_->Terminate();
}
diff --git a/library/cpp/coroutine/engine/trampoline.h b/library/cpp/coroutine/engine/trampoline.h
index 23cc587c13..37b61cf015 100644
--- a/library/cpp/coroutine/engine/trampoline.h
+++ b/library/cpp/coroutine/engine/trampoline.h
@@ -1,20 +1,20 @@
#pragma once
-
+
#include "stack/stack_common.h"
#include "stack/stack.h"
#include <util/generic/noncopyable.h>
#include <util/generic/ptr.h>
-#include <util/system/context.h>
-#include <util/system/defaults.h>
-
+#include <util/system/context.h>
+#include <util/system/defaults.h>
+
#if !defined(STACK_GROW_DOWN)
# error "unsupported"
#endif
-class TCont;
+class TCont;
typedef void (*TContFunc)(TCont*, void*);
-
+
namespace NCoro {
namespace NStack {
@@ -31,9 +31,9 @@ namespace NCoro {
TFunc f,
TCont* cont
) noexcept;
-
+
TArrayRef<char> Stack() noexcept;
-
+
TExceptionSafeContext* Context() noexcept {
return &Ctx_;
}
@@ -56,5 +56,5 @@ namespace NCoro {
TExceptionSafeContext Ctx_;
TFunc Func_;
TCont* const Cont_;
- };
-}
+ };
+}
diff --git a/library/cpp/coroutine/engine/ya.make b/library/cpp/coroutine/engine/ya.make
index de48968307..8c20b9afc3 100644
--- a/library/cpp/coroutine/engine/ya.make
+++ b/library/cpp/coroutine/engine/ya.make
@@ -1,25 +1,25 @@
-LIBRARY()
-
+LIBRARY()
+
OWNER(
pg
g:balancer
)
-
+
GENERATE_ENUM_SERIALIZATION(poller.h)
GENERATE_ENUM_SERIALIZATION(stack/stack_common.h)
-PEERDIR(
+PEERDIR(
contrib/libs/libc_compat
library/cpp/containers/intrusive_rb_tree
-)
-
-SRCS(
+)
+
+SRCS(
cont_poller.cpp
helper.cpp
- impl.cpp
- iostatus.cpp
+ impl.cpp
+ iostatus.cpp
network.cpp
- poller.cpp
+ poller.cpp
sockpool.cpp
stack/stack.cpp
stack/stack_allocator.cpp
@@ -27,9 +27,9 @@ SRCS(
stack/stack_storage.cpp
stack/stack_utils.cpp
trampoline.cpp
-)
-
-END()
+)
+
+END()
RECURSE(
stack/benchmark