diff options
author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/neh/wfmo.h | |
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz |
fix ya.make
Diffstat (limited to 'library/cpp/neh/wfmo.h')
-rw-r--r-- | library/cpp/neh/wfmo.h | 140 |
1 files changed, 140 insertions, 0 deletions
diff --git a/library/cpp/neh/wfmo.h b/library/cpp/neh/wfmo.h new file mode 100644 index 00000000000..11f32dda22e --- /dev/null +++ b/library/cpp/neh/wfmo.h @@ -0,0 +1,140 @@ +#pragma once + +#include "lfqueue.h" + +#include <library/cpp/threading/atomic/bool.h> + +#include <util/generic/vector.h> +#include <library/cpp/deprecated/atomic/atomic.h> +#include <library/cpp/deprecated/atomic/atomic_ops.h> +#include <util/system/event.h> +#include <util/system/spinlock.h> + +namespace NNeh { + template <class T> + class TBlockedQueue: public TLockFreeQueue<T>, public TSystemEvent { + public: + inline TBlockedQueue() noexcept + : TSystemEvent(TSystemEvent::rAuto) + { + } + + inline void Notify(T t) noexcept { + this->Enqueue(t); + Signal(); + } + }; + + class TWaitQueue { + public: + struct TWaitHandle { + inline TWaitHandle() noexcept + : Signalled(false) + , Parent(nullptr) + { + } + + inline void Signal() noexcept { + TGuard<TSpinLock> lock(M_); + + Signalled = true; + + if (Parent) { + Parent->Notify(this); + } + } + + inline void Register(TWaitQueue* parent) noexcept { + TGuard<TSpinLock> lock(M_); + + Parent = parent; + + if (Signalled) { + if (Parent) { + Parent->Notify(this); + } + } + } + + NAtomic::TBool Signalled; + TWaitQueue* Parent; + TSpinLock M_; + }; + + inline ~TWaitQueue() { + for (size_t i = 0; i < H_.size(); ++i) { + H_[i]->Register(nullptr); + } + } + + inline void Register(TWaitHandle& ev) { + H_.push_back(&ev); + ev.Register(this); + } + + template <class T> + inline void Register(const T& ev) { + Register(static_cast<TWaitHandle&>(*ev)); + } + + inline bool Wait(const TInstant& deadLine) noexcept { + return Q_.WaitD(deadLine); + } + + inline void Notify(TWaitHandle* wq) noexcept { + Q_.Notify(wq); + } + + inline bool Dequeue(TWaitHandle** wq) noexcept { + return Q_.Dequeue(wq); + } + + private: + TBlockedQueue<TWaitHandle*> Q_; + TVector<TWaitHandle*> H_; + }; + + typedef TWaitQueue::TWaitHandle TWaitHandle; + + template <class It, class T> + static inline void WaitForMultipleObj(It b, It e, const TInstant& deadLine, T& func) { + TWaitQueue hndl; + + while (b != e) { + hndl.Register(*b++); + } + + do { + TWaitHandle* ret = nullptr; + + if (hndl.Dequeue(&ret)) { + do { + func(ret); + } while (hndl.Dequeue(&ret)); + + return; + } + } while (hndl.Wait(deadLine)); + } + + struct TSignalled { + inline TSignalled() + : Signalled(false) + { + } + + inline void operator()(const TWaitHandle*) noexcept { + Signalled = true; + } + + bool Signalled; + }; + + static inline bool WaitForOne(TWaitHandle& wh, const TInstant& deadLine) { + TSignalled func; + + WaitForMultipleObj(&wh, &wh + 1, deadLine, func); + + return func.Signalled; + } +} |