diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-05-30 12:53:22 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-05-30 12:53:22 +0300 |
commit | ee7ff2e71e98d56e4c9aba1279a9490f635418d3 (patch) | |
tree | 003bd2598888afb8f5bd185f14a397fd151b2162 /library/cpp/yt | |
parent | df23a796955e5802312e57e12cc4be84358df127 (diff) | |
download | ydb-ee7ff2e71e98d56e4c9aba1279a9490f635418d3.tar.gz |
Intermediate changes
Diffstat (limited to 'library/cpp/yt')
-rw-r--r-- | library/cpp/yt/threading/count_down_latch.cpp | 71 | ||||
-rw-r--r-- | library/cpp/yt/threading/count_down_latch.h | 42 |
2 files changed, 113 insertions, 0 deletions
diff --git a/library/cpp/yt/threading/count_down_latch.cpp b/library/cpp/yt/threading/count_down_latch.cpp new file mode 100644 index 0000000000..5b750156b9 --- /dev/null +++ b/library/cpp/yt/threading/count_down_latch.cpp @@ -0,0 +1,71 @@ +#include "count_down_latch.h" + +#include "futex.h" + +#include <library/cpp/yt/threading/futex.h> + +#include <library/cpp/yt/assert/assert.h> + +#include <cerrno> + +namespace NYT::NThreading { + +//////////////////////////////////////////////////////////////////////////////// + +TCountDownLatch::TCountDownLatch(int count) + : Count_(count) +{ } + +void TCountDownLatch::CountDown() +{ +#ifndef _linux_ + TGuard<TMutex> guard(Mutex_); +#endif + auto previous = Count_.fetch_sub(1, std::memory_order::release); + if (previous == 1) { +#ifdef _linux_ + int rv = NThreading::FutexWake( + reinterpret_cast<int*>(&Count_), + std::numeric_limits<int>::max()); + YT_VERIFY(rv >= 0); +#else + ConditionVariable_.BroadCast(); +#endif + } +} + +void TCountDownLatch::Wait() const +{ + while (true) { +#ifndef _linux_ + TGuard<TMutex> guard(Mutex_); +#endif + auto count = Count_.load(std::memory_order::acquire); + if (count == 0) { + return; + } +#ifdef _linux_ + int rv = NThreading::FutexWait( + const_cast<int*>(reinterpret_cast<const int*>(&Count_)), + count); + YT_VERIFY(rv >= 0 || errno == EWOULDBLOCK || errno == EINTR); +#else + ConditionVariable_.WaitI(Mutex_); +#endif + } +} + +bool TCountDownLatch::TryWait() const +{ + return Count_.load(std::memory_order::acquire) == 0; +} + +int TCountDownLatch::GetCount() const +{ + return Count_.load(std::memory_order::relaxed); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NThreading + diff --git a/library/cpp/yt/threading/count_down_latch.h b/library/cpp/yt/threading/count_down_latch.h new file mode 100644 index 0000000000..14e344365e --- /dev/null +++ b/library/cpp/yt/threading/count_down_latch.h @@ -0,0 +1,42 @@ +#pragma once + +#include "public.h" + +#ifndef _linux_ + #include <util/system/condvar.h> + #include <util/system/mutex.h> +#endif + +namespace NYT::NThreading { + +//////////////////////////////////////////////////////////////////////////////// + +//! A synchronization aid that allows one or more threads to wait until +//! a set of operations being performed in other threads completes. +/*! + * See https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html + */ +class TCountDownLatch final +{ +public: + explicit TCountDownLatch(int count); + + void CountDown(); + + void Wait() const; + bool TryWait() const; + + int GetCount() const; + +private: + std::atomic<int> Count_; + +#ifndef _linux_ + mutable TCondVar ConditionVariable_; + mutable TMutex Mutex_; +#endif +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NThreading |