aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh
diff options
context:
space:
mode:
authorsvidyuk <svidyuk@yandex-team.com>2022-11-14 12:03:26 +0300
committersvidyuk <svidyuk@yandex-team.com>2022-11-14 12:03:26 +0300
commit27d8c0c44daf7a5aba953248a3682667db688669 (patch)
treec7b10fe156ae6deef1d6ae19cc8ef509925ac463 /library/cpp/neh
parent1d1f495e76f87ee31598b60ea254edce5e6c78f5 (diff)
downloadydb-27d8c0c44daf7a5aba953248a3682667db688669.tar.gz
rework pom generation to support EXCLUDES
Diffstat (limited to 'library/cpp/neh')
-rw-r--r--library/cpp/neh/neh.cpp16
-rw-r--r--library/cpp/neh/wfmo.h36
2 files changed, 21 insertions, 31 deletions
diff --git a/library/cpp/neh/neh.cpp b/library/cpp/neh/neh.cpp
index 2a3eef5023..8b3092ad57 100644
--- a/library/cpp/neh/neh.cpp
+++ b/library/cpp/neh/neh.cpp
@@ -43,12 +43,20 @@ namespace {
};
public:
+ ~TMultiRequester() {
+ for (auto& req : Reqs_) {
+ req->Register(nullptr);
+ }
+ }
+
void Add(const THandleRef& req) override {
Reqs_.insert(req);
+ req->Register(&WaitQueue_);
}
void Del(const THandleRef& req) override {
Reqs_.erase(req);
+ req->Register(nullptr);
}
bool Wait(THandleRef& req, TInstant deadLine) override {
@@ -56,11 +64,8 @@ namespace {
if (Reqs_.empty()) {
return false;
}
-
TOnComplete cb(this);
-
- WaitForMultipleObj(Reqs_.begin(), Reqs_.end(), deadLine, cb);
-
+ WaitForMultipleObj(WaitQueue_, deadLine, cb);
if (!cb.Signalled) {
return false;
}
@@ -78,12 +83,13 @@ namespace {
inline void OnComplete(const THandleRef& req) {
Complete_.push_back(req);
- Reqs_.erase(req);
+ Del(req);
}
private:
typedef THashSet<THandleRef, TOps, TOps> TReqs;
typedef TList<THandleRef> TComplete;
+ TWaitQueue WaitQueue_;
TReqs Reqs_;
TComplete Complete_;
};
diff --git a/library/cpp/neh/wfmo.h b/library/cpp/neh/wfmo.h
index 11f32dda22..a51d391612 100644
--- a/library/cpp/neh/wfmo.h
+++ b/library/cpp/neh/wfmo.h
@@ -5,6 +5,7 @@
#include <library/cpp/threading/atomic/bool.h>
#include <util/generic/vector.h>
+#include <util/generic/scope.h>
#include <library/cpp/deprecated/atomic/atomic.h>
#include <library/cpp/deprecated/atomic/atomic_ops.h>
#include <util/system/event.h>
@@ -61,22 +62,6 @@ namespace NNeh {
TSpinLock M_;
};
- inline ~TWaitQueue() {
- for (size_t i = 0; i < H_.size(); ++i) {
- H_[i]->Register(nullptr);
- }
- }
-
- inline void Register(TWaitHandle& ev) {
- H_.push_back(&ev);
- ev.Register(this);
- }
-
- template <class T>
- inline void Register(const T& ev) {
- Register(static_cast<TWaitHandle&>(*ev));
- }
-
inline bool Wait(const TInstant& deadLine) noexcept {
return Q_.WaitD(deadLine);
}
@@ -91,19 +76,12 @@ namespace NNeh {
private:
TBlockedQueue<TWaitHandle*> Q_;
- TVector<TWaitHandle*> H_;
};
typedef TWaitQueue::TWaitHandle TWaitHandle;
- template <class It, class T>
- static inline void WaitForMultipleObj(It b, It e, const TInstant& deadLine, T& func) {
- TWaitQueue hndl;
-
- while (b != e) {
- hndl.Register(*b++);
- }
-
+ template <class T>
+ static inline void WaitForMultipleObj(TWaitQueue& hndl, const TInstant& deadLine, T& func) {
do {
TWaitHandle* ret = nullptr;
@@ -132,8 +110,14 @@ namespace NNeh {
static inline bool WaitForOne(TWaitHandle& wh, const TInstant& deadLine) {
TSignalled func;
+ TWaitQueue hndl;
+ wh.Register(&hndl);
+
+ Y_DEFER {
+ wh.Register(nullptr);
+ };
- WaitForMultipleObj(&wh, &wh + 1, deadLine, func);
+ WaitForMultipleObj(hndl, deadLine, func);
return func.Signalled;
}