aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/yt
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-05-30 12:53:22 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-05-30 12:53:22 +0300
commitee7ff2e71e98d56e4c9aba1279a9490f635418d3 (patch)
tree003bd2598888afb8f5bd185f14a397fd151b2162 /library/cpp/yt
parentdf23a796955e5802312e57e12cc4be84358df127 (diff)
downloadydb-ee7ff2e71e98d56e4c9aba1279a9490f635418d3.tar.gz
Intermediate changes
Diffstat (limited to 'library/cpp/yt')
-rw-r--r--library/cpp/yt/threading/count_down_latch.cpp71
-rw-r--r--library/cpp/yt/threading/count_down_latch.h42
2 files changed, 113 insertions, 0 deletions
diff --git a/library/cpp/yt/threading/count_down_latch.cpp b/library/cpp/yt/threading/count_down_latch.cpp
new file mode 100644
index 0000000000..5b750156b9
--- /dev/null
+++ b/library/cpp/yt/threading/count_down_latch.cpp
@@ -0,0 +1,71 @@
+#include "count_down_latch.h"
+
+#include "futex.h"
+
+#include <library/cpp/yt/threading/futex.h>
+
+#include <library/cpp/yt/assert/assert.h>
+
+#include <cerrno>
+
+namespace NYT::NThreading {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TCountDownLatch::TCountDownLatch(int count)
+ : Count_(count)
+{ }
+
+void TCountDownLatch::CountDown()
+{
+#ifndef _linux_
+ TGuard<TMutex> guard(Mutex_);
+#endif
+ auto previous = Count_.fetch_sub(1, std::memory_order::release);
+ if (previous == 1) {
+#ifdef _linux_
+ int rv = NThreading::FutexWake(
+ reinterpret_cast<int*>(&Count_),
+ std::numeric_limits<int>::max());
+ YT_VERIFY(rv >= 0);
+#else
+ ConditionVariable_.BroadCast();
+#endif
+ }
+}
+
+void TCountDownLatch::Wait() const
+{
+ while (true) {
+#ifndef _linux_
+ TGuard<TMutex> guard(Mutex_);
+#endif
+ auto count = Count_.load(std::memory_order::acquire);
+ if (count == 0) {
+ return;
+ }
+#ifdef _linux_
+ int rv = NThreading::FutexWait(
+ const_cast<int*>(reinterpret_cast<const int*>(&Count_)),
+ count);
+ YT_VERIFY(rv >= 0 || errno == EWOULDBLOCK || errno == EINTR);
+#else
+ ConditionVariable_.WaitI(Mutex_);
+#endif
+ }
+}
+
+bool TCountDownLatch::TryWait() const
+{
+ return Count_.load(std::memory_order::acquire) == 0;
+}
+
+int TCountDownLatch::GetCount() const
+{
+ return Count_.load(std::memory_order::relaxed);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NThreading
+
diff --git a/library/cpp/yt/threading/count_down_latch.h b/library/cpp/yt/threading/count_down_latch.h
new file mode 100644
index 0000000000..14e344365e
--- /dev/null
+++ b/library/cpp/yt/threading/count_down_latch.h
@@ -0,0 +1,42 @@
+#pragma once
+
+#include "public.h"
+
+#ifndef _linux_
+ #include <util/system/condvar.h>
+ #include <util/system/mutex.h>
+#endif
+
+namespace NYT::NThreading {
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! A synchronization aid that allows one or more threads to wait until
+//! a set of operations being performed in other threads completes.
+/*!
+ * See https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html
+ */
+class TCountDownLatch final
+{
+public:
+ explicit TCountDownLatch(int count);
+
+ void CountDown();
+
+ void Wait() const;
+ bool TryWait() const;
+
+ int GetCount() const;
+
+private:
+ std::atomic<int> Count_;
+
+#ifndef _linux_
+ mutable TCondVar ConditionVariable_;
+ mutable TMutex Mutex_;
+#endif
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NThreading