aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authorudovichenko-r <rvu@ydb.tech>2022-12-23 19:37:11 +0300
committerudovichenko-r <rvu@ydb.tech>2022-12-23 19:37:11 +0300
commit0ac2c6be996fb394146ce7ad0d310fe4b6956138 (patch)
treec43d89ba9354c1a03a3871b00533f988212a7475 /library
parentf6fd83a9bc9c2d6bb9502d3dd77adb4f5c31ceae (diff)
downloadydb-0ac2c6be996fb394146ce7ad0d310fe4b6956138.tar.gz
[yql] FuzzyEquals doc
Diffstat (limited to 'library')
-rw-r--r--library/cpp/neh/neh.cpp13
-rw-r--r--library/cpp/neh/wfmo.h48
2 files changed, 30 insertions, 31 deletions
diff --git a/library/cpp/neh/neh.cpp b/library/cpp/neh/neh.cpp
index 8b3092ad57..dded731bb7 100644
--- a/library/cpp/neh/neh.cpp
+++ b/library/cpp/neh/neh.cpp
@@ -43,20 +43,13 @@ namespace {
};
public:
- ~TMultiRequester() {
- for (auto& req : Reqs_) {
- req->Register(nullptr);
- }
- }
-
void Add(const THandleRef& req) override {
Reqs_.insert(req);
- req->Register(&WaitQueue_);
+ req->Register(WaitQueue_);
}
void Del(const THandleRef& req) override {
Reqs_.erase(req);
- req->Register(nullptr);
}
bool Wait(THandleRef& req, TInstant deadLine) override {
@@ -65,7 +58,7 @@ namespace {
return false;
}
TOnComplete cb(this);
- WaitForMultipleObj(WaitQueue_, deadLine, cb);
+ WaitForMultipleObj(*WaitQueue_, deadLine, cb);
if (!cb.Signalled) {
return false;
}
@@ -89,7 +82,7 @@ namespace {
private:
typedef THashSet<THandleRef, TOps, TOps> TReqs;
typedef TList<THandleRef> TComplete;
- TWaitQueue WaitQueue_;
+ TIntrusivePtr<TWaitQueue> WaitQueue_ = MakeIntrusive<TWaitQueue>();
TReqs Reqs_;
TComplete Complete_;
};
diff --git a/library/cpp/neh/wfmo.h b/library/cpp/neh/wfmo.h
index 15d92e3ef5..8e2cd6052a 100644
--- a/library/cpp/neh/wfmo.h
+++ b/library/cpp/neh/wfmo.h
@@ -26,28 +26,36 @@ namespace NNeh {
}
};
- class TWaitQueue {
+ class TWaitQueue: public TThrRefBase {
public:
class TWaitHandle {
public:
- void Signal() noexcept {
- TGuard<TSpinLock> lock(M_);
+ ~TWaitHandle() {
+ SwapWaitQueue(nullptr);
+ }
+
+ void Signal() noexcept {
Signalled_ = true;
- if (Parent_) {
- Parent_->Notify(this);
+ if (TIntrusivePtr<TWaitQueue> q = SwapWaitQueue(nullptr)) {
+ q->Notify(this);
}
}
- void Register(TWaitQueue* parent) noexcept {
- TGuard<TSpinLock> lock(M_);
+ void Register(TIntrusivePtr<TWaitQueue>& waitQueue) noexcept {
+ if (Signalled_) {
+ waitQueue->Notify(this);
+ SwapWaitQueue(nullptr);
+ return;
+ }
- Parent_ = parent;
+ waitQueue->Ref();
+ SwapWaitQueue(waitQueue.Get());
if (Signalled_) {
- if (Parent_) {
- Parent_->Notify(this);
+ if (TIntrusivePtr<TWaitQueue> q = SwapWaitQueue(nullptr)) {
+ q->Notify(this);
}
}
}
@@ -57,14 +65,16 @@ namespace NNeh {
}
void ResetState() {
- TGuard<TSpinLock> lock(M_);
-
Signalled_ = false;
+ SwapWaitQueue(nullptr);
+ }
+ private:
+ TIntrusivePtr<TWaitQueue> SwapWaitQueue(TWaitQueue* newQueue) noexcept {
+ return TIntrusivePtr<TWaitQueue>(AtomicSwap(&WaitQueue_, newQueue), TIntrusivePtr<TWaitQueue>::TNoIncrement());
}
private:
NAtomic::TBool Signalled_ = false;
- TWaitQueue* Parent_ = nullptr;
- TSpinLock M_;
+ TWaitQueue* WaitQueue_ = nullptr;
};
inline bool Wait(const TInstant& deadLine) noexcept {
@@ -115,14 +125,10 @@ namespace NNeh {
static inline bool WaitForOne(TWaitHandle& wh, const TInstant& deadLine) {
TSignalled func;
- TWaitQueue hndl;
- wh.Register(&hndl);
-
- Y_DEFER {
- wh.Register(nullptr);
- };
+ auto hndl = MakeIntrusive<TWaitQueue>();
+ wh.Register(hndl);
- WaitForMultipleObj(hndl, deadLine, func);
+ WaitForMultipleObj(*hndl, deadLine, func);
return func.Signalled;
}