aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/coroutine/engine/events.h
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/coroutine/engine/events.h
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/coroutine/engine/events.h')
-rw-r--r--library/cpp/coroutine/engine/events.h148
1 files changed, 148 insertions, 0 deletions
diff --git a/library/cpp/coroutine/engine/events.h b/library/cpp/coroutine/engine/events.h
new file mode 100644
index 0000000000..07cc4d25e8
--- /dev/null
+++ b/library/cpp/coroutine/engine/events.h
@@ -0,0 +1,148 @@
+#pragma once
+
+#include "impl.h"
+
+#include <util/datetime/base.h>
+
+class TContEvent {
+public:
+ TContEvent(TCont* current) noexcept
+ : Cont_(current)
+ , Status_(0)
+ {
+ }
+
+ ~TContEvent() {
+ }
+
+ int WaitD(TInstant deadline) {
+ Status_ = 0;
+ const int ret = Cont_->SleepD(deadline);
+
+ return Status_ ? Status_ : ret;
+ }
+
+ int WaitT(TDuration timeout) {
+ return WaitD(timeout.ToDeadLine());
+ }
+
+ int WaitI() {
+ return WaitD(TInstant::Max());
+ }
+
+ void Wake() noexcept {
+ SetStatus(EWAKEDUP);
+ Cont_->ReSchedule();
+ }
+
+ TCont* Cont() noexcept {
+ return Cont_;
+ }
+
+ int Status() const noexcept {
+ return Status_;
+ }
+
+ void SetStatus(int status) noexcept {
+ Status_ = status;
+ }
+
+private:
+ TCont* Cont_;
+ int Status_;
+};
+
+class TContWaitQueue {
+ class TWaiter: public TContEvent, public TIntrusiveListItem<TWaiter> {
+ public:
+ TWaiter(TCont* current) noexcept
+ : TContEvent(current)
+ {
+ }
+
+ ~TWaiter() {
+ }
+ };
+
+public:
+ TContWaitQueue() noexcept {
+ }
+
+ ~TContWaitQueue() {
+ Y_ASSERT(Waiters_.Empty());
+ }
+
+ int WaitD(TCont* current, TInstant deadline) {
+ TWaiter waiter(current);
+
+ Waiters_.PushBack(&waiter);
+
+ return waiter.WaitD(deadline);
+ }
+
+ int WaitT(TCont* current, TDuration timeout) {
+ return WaitD(current, timeout.ToDeadLine());
+ }
+
+ int WaitI(TCont* current) {
+ return WaitD(current, TInstant::Max());
+ }
+
+ void Signal() noexcept {
+ if (!Waiters_.Empty()) {
+ Waiters_.PopFront()->Wake();
+ }
+ }
+
+ void BroadCast() noexcept {
+ while (!Waiters_.Empty()) {
+ Waiters_.PopFront()->Wake();
+ }
+ }
+
+ void BroadCast(size_t number) noexcept {
+ for (size_t i = 0; i < number && !Waiters_.Empty(); ++i) {
+ Waiters_.PopFront()->Wake();
+ }
+ }
+
+private:
+ TIntrusiveList<TWaiter> Waiters_;
+};
+
+
+class TContSimpleEvent {
+public:
+ TContSimpleEvent(TContExecutor* e)
+ : E_(e)
+ {
+ }
+
+ TContExecutor* Executor() const noexcept {
+ return E_;
+ }
+
+ void Signal() noexcept {
+ Q_.Signal();
+ }
+
+ void BroadCast() noexcept {
+ Q_.BroadCast();
+ }
+
+ int WaitD(TInstant deadLine) noexcept {
+ return Q_.WaitD(E_->Running(), deadLine);
+ }
+
+ int WaitT(TDuration timeout) noexcept {
+ return WaitD(timeout.ToDeadLine());
+ }
+
+ int WaitI() noexcept {
+ return WaitD(TInstant::Max());
+ }
+
+private:
+ TContWaitQueue Q_;
+ TContExecutor* E_;
+};