summaryrefslogtreecommitdiffstats
path: root/library/cpp/coroutine/util/pipeevent.h
diff options
context:
space:
mode:
authorqrort <[email protected]>2022-11-30 23:47:12 +0300
committerqrort <[email protected]>2022-11-30 23:47:12 +0300
commit22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch)
treebffa27765faf54126ad44bcafa89fadecb7a73d7 /library/cpp/coroutine/util/pipeevent.h
parent332b99e2173f0425444abb759eebcb2fafaa9209 (diff)
validate canons without yatest_common
Diffstat (limited to 'library/cpp/coroutine/util/pipeevent.h')
-rw-r--r--library/cpp/coroutine/util/pipeevent.h93
1 files changed, 93 insertions, 0 deletions
diff --git a/library/cpp/coroutine/util/pipeevent.h b/library/cpp/coroutine/util/pipeevent.h
new file mode 100644
index 00000000000..059f2a34d9a
--- /dev/null
+++ b/library/cpp/coroutine/util/pipeevent.h
@@ -0,0 +1,93 @@
+#pragma once
+
+#include <library/cpp/coroutine/engine/events.h>
+#include <library/cpp/coroutine/engine/impl.h>
+#include <library/cpp/coroutine/engine/network.h>
+
+#include <util/network/socket.h>
+#include <library/cpp/deprecated/atomic/atomic.h>
+#include <util/system/pipe.h>
+
+// TPipeEvent and TPipeSemaphore try to minimize number of coroutines reading from same pipe
+// because its actually quite expensive with >1000 coroutines waiting on same event/semaphore
+
+// Using this class you can block in coroutine on waiting
+// a signal from outer thread.
+// Usual thread synchronization primitives are not appropriate
+// because they will block all coroutines in single thread.
+class TPipeEvent {
+public:
+ explicit TPipeEvent()
+ : Signaled(0)
+ , NumWaiting(0)
+ {
+ TPipeHandle::Pipe(SignalRecvPipe, SignalSendPipe);
+ SetNonBlock(SignalRecvPipe);
+ SetNonBlock(SignalSendPipe);
+ }
+
+ void Signal() {
+ if (AtomicCas(&Signaled, 1, 0)) {
+ char tmp = 1;
+ SignalSendPipe.Write(&tmp, 1);
+ }
+ }
+
+ void Wait(TCont* cont) {
+ if (++NumWaiting > 1) {
+ ToWake.WaitI(cont);
+ }
+ char tmp;
+ NCoro::ReadI(cont, SignalRecvPipe, &tmp, 1).Checked();
+ AtomicSet(Signaled, 0);
+ if (--NumWaiting > 0) {
+ ToWake.Signal();
+ }
+ }
+
+private:
+ TAtomic Signaled;
+ size_t NumWaiting;
+ TContWaitQueue ToWake;
+ TPipeHandle SignalRecvPipe;
+ TPipeHandle SignalSendPipe;
+};
+
+class TPipeSemaphore {
+public:
+ explicit TPipeSemaphore()
+ : Value(0)
+ , NumWaiting(0)
+ {
+ TPipeHandle::Pipe(SignalRecvPipe, SignalSendPipe);
+ SetNonBlock(SignalRecvPipe);
+ SetNonBlock(SignalSendPipe);
+ }
+
+ void Inc() {
+ if (AtomicIncrement(Value) <= 0) {
+ char tmp = 1;
+ SignalSendPipe.Write(&tmp, 1);
+ }
+ }
+
+ void Dec(TCont* cont) {
+ if (AtomicDecrement(Value) < 0) {
+ if (++NumWaiting > 1) {
+ ToWake.WaitI(cont);
+ }
+ char tmp;
+ NCoro::ReadI(cont, SignalRecvPipe, &tmp, 1).Checked();
+ if (--NumWaiting > 0) {
+ ToWake.Signal();
+ }
+ }
+ }
+
+private:
+ TAtomic Value;
+ size_t NumWaiting;
+ TContWaitQueue ToWake;
+ TPipeHandle SignalRecvPipe;
+ TPipeHandle SignalSendPipe;
+};