diff options
author | svidyuk <svidyuk@yandex-team.com> | 2022-11-14 12:03:26 +0300 |
---|---|---|
committer | svidyuk <svidyuk@yandex-team.com> | 2022-11-14 12:03:26 +0300 |
commit | 27d8c0c44daf7a5aba953248a3682667db688669 (patch) | |
tree | c7b10fe156ae6deef1d6ae19cc8ef509925ac463 /library/cpp/neh | |
parent | 1d1f495e76f87ee31598b60ea254edce5e6c78f5 (diff) | |
download | ydb-27d8c0c44daf7a5aba953248a3682667db688669.tar.gz |
rework pom generation to support EXCLUDES
Diffstat (limited to 'library/cpp/neh')
-rw-r--r-- | library/cpp/neh/neh.cpp | 16 | ||||
-rw-r--r-- | library/cpp/neh/wfmo.h | 36 |
2 files changed, 21 insertions, 31 deletions
diff --git a/library/cpp/neh/neh.cpp b/library/cpp/neh/neh.cpp index 2a3eef5023..8b3092ad57 100644 --- a/library/cpp/neh/neh.cpp +++ b/library/cpp/neh/neh.cpp @@ -43,12 +43,20 @@ namespace { }; public: + ~TMultiRequester() { + for (auto& req : Reqs_) { + req->Register(nullptr); + } + } + void Add(const THandleRef& req) override { Reqs_.insert(req); + req->Register(&WaitQueue_); } void Del(const THandleRef& req) override { Reqs_.erase(req); + req->Register(nullptr); } bool Wait(THandleRef& req, TInstant deadLine) override { @@ -56,11 +64,8 @@ namespace { if (Reqs_.empty()) { return false; } - TOnComplete cb(this); - - WaitForMultipleObj(Reqs_.begin(), Reqs_.end(), deadLine, cb); - + WaitForMultipleObj(WaitQueue_, deadLine, cb); if (!cb.Signalled) { return false; } @@ -78,12 +83,13 @@ namespace { inline void OnComplete(const THandleRef& req) { Complete_.push_back(req); - Reqs_.erase(req); + Del(req); } private: typedef THashSet<THandleRef, TOps, TOps> TReqs; typedef TList<THandleRef> TComplete; + TWaitQueue WaitQueue_; TReqs Reqs_; TComplete Complete_; }; diff --git a/library/cpp/neh/wfmo.h b/library/cpp/neh/wfmo.h index 11f32dda22..a51d391612 100644 --- a/library/cpp/neh/wfmo.h +++ b/library/cpp/neh/wfmo.h @@ -5,6 +5,7 @@ #include <library/cpp/threading/atomic/bool.h> #include <util/generic/vector.h> +#include <util/generic/scope.h> #include <library/cpp/deprecated/atomic/atomic.h> #include <library/cpp/deprecated/atomic/atomic_ops.h> #include <util/system/event.h> @@ -61,22 +62,6 @@ namespace NNeh { TSpinLock M_; }; - inline ~TWaitQueue() { - for (size_t i = 0; i < H_.size(); ++i) { - H_[i]->Register(nullptr); - } - } - - inline void Register(TWaitHandle& ev) { - H_.push_back(&ev); - ev.Register(this); - } - - template <class T> - inline void Register(const T& ev) { - Register(static_cast<TWaitHandle&>(*ev)); - } - inline bool Wait(const TInstant& deadLine) noexcept { return Q_.WaitD(deadLine); } @@ -91,19 +76,12 @@ namespace NNeh { private: TBlockedQueue<TWaitHandle*> Q_; - TVector<TWaitHandle*> H_; }; typedef TWaitQueue::TWaitHandle TWaitHandle; - template <class It, class T> - static inline void WaitForMultipleObj(It b, It e, const TInstant& deadLine, T& func) { - TWaitQueue hndl; - - while (b != e) { - hndl.Register(*b++); - } - + template <class T> + static inline void WaitForMultipleObj(TWaitQueue& hndl, const TInstant& deadLine, T& func) { do { TWaitHandle* ret = nullptr; @@ -132,8 +110,14 @@ namespace NNeh { static inline bool WaitForOne(TWaitHandle& wh, const TInstant& deadLine) { TSignalled func; + TWaitQueue hndl; + wh.Register(&hndl); + + Y_DEFER { + wh.Register(nullptr); + }; - WaitForMultipleObj(&wh, &wh + 1, deadLine, func); + WaitForMultipleObj(hndl, deadLine, func); return func.Signalled; } |