aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/yt/threading/count_down_latch.cpp
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-05-30 12:53:22 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-05-30 12:53:22 +0300
commitee7ff2e71e98d56e4c9aba1279a9490f635418d3 (patch)
tree003bd2598888afb8f5bd185f14a397fd151b2162 /library/cpp/yt/threading/count_down_latch.cpp
parentdf23a796955e5802312e57e12cc4be84358df127 (diff)
downloadydb-ee7ff2e71e98d56e4c9aba1279a9490f635418d3.tar.gz
Intermediate changes
Diffstat (limited to 'library/cpp/yt/threading/count_down_latch.cpp')
-rw-r--r--library/cpp/yt/threading/count_down_latch.cpp71
1 files changed, 71 insertions, 0 deletions
diff --git a/library/cpp/yt/threading/count_down_latch.cpp b/library/cpp/yt/threading/count_down_latch.cpp
new file mode 100644
index 0000000000..5b750156b9
--- /dev/null
+++ b/library/cpp/yt/threading/count_down_latch.cpp
@@ -0,0 +1,71 @@
+#include "count_down_latch.h"
+
+#include "futex.h"
+
+#include <library/cpp/yt/threading/futex.h>
+
+#include <library/cpp/yt/assert/assert.h>
+
+#include <cerrno>
+
+namespace NYT::NThreading {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TCountDownLatch::TCountDownLatch(int count)
+ : Count_(count)
+{ }
+
+void TCountDownLatch::CountDown()
+{
+#ifndef _linux_
+ TGuard<TMutex> guard(Mutex_);
+#endif
+ auto previous = Count_.fetch_sub(1, std::memory_order::release);
+ if (previous == 1) {
+#ifdef _linux_
+ int rv = NThreading::FutexWake(
+ reinterpret_cast<int*>(&Count_),
+ std::numeric_limits<int>::max());
+ YT_VERIFY(rv >= 0);
+#else
+ ConditionVariable_.BroadCast();
+#endif
+ }
+}
+
+void TCountDownLatch::Wait() const
+{
+ while (true) {
+#ifndef _linux_
+ TGuard<TMutex> guard(Mutex_);
+#endif
+ auto count = Count_.load(std::memory_order::acquire);
+ if (count == 0) {
+ return;
+ }
+#ifdef _linux_
+ int rv = NThreading::FutexWait(
+ const_cast<int*>(reinterpret_cast<const int*>(&Count_)),
+ count);
+ YT_VERIFY(rv >= 0 || errno == EWOULDBLOCK || errno == EINTR);
+#else
+ ConditionVariable_.WaitI(Mutex_);
+#endif
+ }
+}
+
+bool TCountDownLatch::TryWait() const
+{
+ return Count_.load(std::memory_order::acquire) == 0;
+}
+
+int TCountDownLatch::GetCount() const
+{
+ return Count_.load(std::memory_order::relaxed);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NThreading
+