diff options
| author | qrort <[email protected]> | 2022-11-30 23:47:12 +0300 | 
|---|---|---|
| committer | qrort <[email protected]> | 2022-11-30 23:47:12 +0300 | 
| commit | 22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch) | |
| tree | bffa27765faf54126ad44bcafa89fadecb7a73d7 /library/cpp/coroutine/util/pipeevent.h | |
| parent | 332b99e2173f0425444abb759eebcb2fafaa9209 (diff) | |
validate canons without yatest_common
Diffstat (limited to 'library/cpp/coroutine/util/pipeevent.h')
| -rw-r--r-- | library/cpp/coroutine/util/pipeevent.h | 93 | 
1 files changed, 93 insertions, 0 deletions
diff --git a/library/cpp/coroutine/util/pipeevent.h b/library/cpp/coroutine/util/pipeevent.h new file mode 100644 index 00000000000..059f2a34d9a --- /dev/null +++ b/library/cpp/coroutine/util/pipeevent.h @@ -0,0 +1,93 @@ +#pragma once + +#include <library/cpp/coroutine/engine/events.h> +#include <library/cpp/coroutine/engine/impl.h> +#include <library/cpp/coroutine/engine/network.h> + +#include <util/network/socket.h> +#include <library/cpp/deprecated/atomic/atomic.h> +#include <util/system/pipe.h> + +// TPipeEvent and TPipeSemaphore try to minimize number of coroutines reading from same pipe +// because its actually quite expensive with >1000 coroutines waiting on same event/semaphore + +// Using this class you can block in coroutine on waiting +// a signal from outer thread. +// Usual thread synchronization primitives are not appropriate +// because they will block all coroutines in single thread. +class TPipeEvent { +public: +    explicit TPipeEvent() +        : Signaled(0) +        , NumWaiting(0) +    { +        TPipeHandle::Pipe(SignalRecvPipe, SignalSendPipe); +        SetNonBlock(SignalRecvPipe); +        SetNonBlock(SignalSendPipe); +    } + +    void Signal() { +        if (AtomicCas(&Signaled, 1, 0)) { +            char tmp = 1; +            SignalSendPipe.Write(&tmp, 1); +        } +    } + +    void Wait(TCont* cont) { +        if (++NumWaiting > 1) { +            ToWake.WaitI(cont); +        } +        char tmp; +        NCoro::ReadI(cont, SignalRecvPipe, &tmp, 1).Checked(); +        AtomicSet(Signaled, 0); +        if (--NumWaiting > 0) { +            ToWake.Signal(); +        } +    } + +private: +    TAtomic Signaled; +    size_t NumWaiting; +    TContWaitQueue ToWake; +    TPipeHandle SignalRecvPipe; +    TPipeHandle SignalSendPipe; +}; + +class TPipeSemaphore { +public: +    explicit TPipeSemaphore() +        : Value(0) +        , NumWaiting(0) +    { +        TPipeHandle::Pipe(SignalRecvPipe, SignalSendPipe); +        SetNonBlock(SignalRecvPipe); +        SetNonBlock(SignalSendPipe); +    } + +    void Inc() { +        if (AtomicIncrement(Value) <= 0) { +            char tmp = 1; +            SignalSendPipe.Write(&tmp, 1); +        } +    } + +    void Dec(TCont* cont) { +        if (AtomicDecrement(Value) < 0) { +            if (++NumWaiting > 1) { +                ToWake.WaitI(cont); +            } +            char tmp; +            NCoro::ReadI(cont, SignalRecvPipe, &tmp, 1).Checked(); +            if (--NumWaiting > 0) { +                ToWake.Signal(); +            } +        } +    } + +private: +    TAtomic Value; +    size_t NumWaiting; +    TContWaitQueue ToWake; +    TPipeHandle SignalRecvPipe; +    TPipeHandle SignalSendPipe; +};  | 
