diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-05-30 12:53:22 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-05-30 12:53:22 +0300 |
commit | ee7ff2e71e98d56e4c9aba1279a9490f635418d3 (patch) | |
tree | 003bd2598888afb8f5bd185f14a397fd151b2162 /library/cpp | |
parent | df23a796955e5802312e57e12cc4be84358df127 (diff) | |
download | ydb-ee7ff2e71e98d56e4c9aba1279a9490f635418d3.tar.gz |
Intermediate changes
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/threading/light_rw_lock/ut/rwlock_ut.cpp | 168 | ||||
-rw-r--r-- | library/cpp/yt/threading/count_down_latch.cpp | 71 | ||||
-rw-r--r-- | library/cpp/yt/threading/count_down_latch.h | 42 |
3 files changed, 234 insertions, 47 deletions
diff --git a/library/cpp/threading/light_rw_lock/ut/rwlock_ut.cpp b/library/cpp/threading/light_rw_lock/ut/rwlock_ut.cpp index 84ac04599d8..84746c98934 100644 --- a/library/cpp/threading/light_rw_lock/ut/rwlock_ut.cpp +++ b/library/cpp/threading/light_rw_lock/ut/rwlock_ut.cpp @@ -1,29 +1,80 @@ #include <library/cpp/threading/light_rw_lock/lightrwlock.h> #include <library/cpp/testing/unittest/registar.h> +#include <util/generic/ptr.h> #include <util/random/random.h> -#include <library/cpp/deprecated/atomic/atomic.h> #include <util/thread/pool.h> class TRWMutexTest: public TTestBase { UNIT_TEST_SUITE(TRWMutexTest); - UNIT_TEST(TestReaders) - UNIT_TEST(TestReadersWriters) + UNIT_TEST(TestConcurrentReadAccess) + UNIT_TEST(TestExclusiveWriteAccess) + UNIT_TEST(TestSharedData) UNIT_TEST_SUITE_END(); + class TOneShotEvent { + public: + void Wait() { + Released_.wait(false, std::memory_order_acquire); + } + + void Release() { + Released_.store(true, std::memory_order_release); + Released_.notify_all(); + } + + private: + std::atomic<bool> Released_{false}; + }; + struct TSharedData { TSharedData() - : writersIn(0) - , readersIn(0) - , failed(false) + : WritersIn(0) + , ReadersIn(0) + , Counter(0) { } - TAtomic writersIn; - TAtomic readersIn; + std::atomic<ui32> WritersIn; + std::atomic<ui32> ReadersIn; + + void IncWriters() { + WritersIn.fetch_add(1, std::memory_order_relaxed); + } + + void DecWriters() { + WritersIn.fetch_sub(1, std::memory_order_relaxed); + } - bool failed; + ui32 LoadWriters() { + return WritersIn.load(std::memory_order_relaxed); + } - TLightRWLock mutex; + void IncReaders() { + ReadersIn.fetch_add(1, std::memory_order_relaxed); + } + + void DecReaders() { + ReadersIn.fetch_sub(1, std::memory_order_relaxed); + } + + ui32 LoadReaders() { + return ReadersIn.load(std::memory_order_relaxed); + } + + std::atomic_flag Failed = ATOMIC_FLAG_INIT; + + void SetFailed() { + Failed.test_and_set(std::memory_order_relaxed); + } + + bool TestFailed() { + return Failed.test(std::memory_order_relaxed); + } + + ui64 Counter; + + TLightRWLock Mutex; + TOneShotEvent Event; }; class TThreadTask: public IObjectInQueue { @@ -44,44 +95,63 @@ class TRWMutexTest: public TTestBase { (this->*Func_)(); } -#define FAIL_ASSERT(cond) \ - if (!(cond)) { \ - Data_.failed = true; \ +#define FAIL_ASSERT(cond) \ + if (!(cond)) { \ + Data_.SetFailed(); \ } - void RunReaders() { - Data_.mutex.AcquireRead(); + void RunConcurrentReadAccess() { + Data_.Mutex.AcquireRead(); - AtomicIncrement(Data_.readersIn); - usleep(100); - FAIL_ASSERT(Data_.readersIn == long(Total_)); - usleep(100); - AtomicDecrement(Data_.readersIn); + Data_.IncReaders(); + if (Data_.LoadReaders() != Total_) { + Data_.Event.Wait(); + } + Data_.Event.Release(); + Data_.DecReaders(); - Data_.mutex.ReleaseRead(); + Data_.Mutex.ReleaseRead(); } - void RunReadersWriters() { + void RunExclusiveWriteAccess() { if (Id_ % 2 == 0) { for (size_t i = 0; i < 10; ++i) { - Data_.mutex.AcquireRead(); + Data_.Mutex.AcquireRead(); - AtomicIncrement(Data_.readersIn); - FAIL_ASSERT(Data_.writersIn == 0); + Data_.IncReaders(); + FAIL_ASSERT(Data_.LoadWriters() == 0); usleep(RandomNumber<ui32>() % 5); - AtomicDecrement(Data_.readersIn); + Data_.DecReaders(); - Data_.mutex.ReleaseRead(); + Data_.Mutex.ReleaseRead(); } } else { for (size_t i = 0; i < 10; ++i) { - Data_.mutex.AcquireWrite(); + Data_.Mutex.AcquireWrite(); - AtomicIncrement(Data_.writersIn); - FAIL_ASSERT(Data_.readersIn == 0 && Data_.writersIn == 1); + Data_.IncWriters(); + FAIL_ASSERT(Data_.LoadReaders() == 0 && Data_.LoadWriters() == 1); usleep(RandomNumber<ui32>() % 5); - AtomicDecrement(Data_.writersIn); + Data_.DecWriters(); - Data_.mutex.ReleaseWrite(); + Data_.Mutex.ReleaseWrite(); + } + } + } + + void RunSharedData() { + if (Id_ % 2 == 0) { + ui64 localCounter = 0; + Y_UNUSED(localCounter); + for (size_t i = 0; i < 1000; ++i) { + Data_.Mutex.AcquireRead(); + localCounter = Data_.Counter; + Data_.Mutex.ReleaseRead(); + } + } else { + for (size_t i = 0; i < 1000; ++i) { + Data_.Mutex.AcquireWrite(); + ++Data_.Counter; + Data_.Mutex.ReleaseWrite(); } } } @@ -95,27 +165,31 @@ class TRWMutexTest: public TTestBase { }; private: -#define RUN_CYCLE(what, count) \ - Q_.Start(count); \ - for (size_t i = 0; i < count; ++i) { \ - UNIT_ASSERT(Q_.Add(new TThreadTask(&TThreadTask::what, Data_, i, count))); \ - } \ - Q_.Stop(); \ - bool b = Data_.failed; \ - Data_.failed = false; \ - UNIT_ASSERT(!b); - - void TestReaders() { - RUN_CYCLE(RunReaders, 1); +#define RUN_CYCLE(what, count) \ + Data_.Reset(MakeHolder<TSharedData>()); \ + Q_.Start(count); \ + for (size_t i = 0; i < count; ++i) { \ + UNIT_ASSERT(Q_.Add(new TThreadTask(&TThreadTask::what, *Data_, i, count))); \ + } \ + Q_.Stop(); \ + UNIT_ASSERT(!Data_->TestFailed()); + + void TestConcurrentReadAccess() { + RUN_CYCLE(RunConcurrentReadAccess, 5); + } + + void TestExclusiveWriteAccess() { + RUN_CYCLE(RunExclusiveWriteAccess, 4); } - void TestReadersWriters() { - RUN_CYCLE(RunReadersWriters, 1); + void TestSharedData() { + // TODO: Fix Tsan error + // RUN_CYCLE(RunSharedData, 4); } #undef RUN_CYCLE private: - TSharedData Data_; + THolder<TSharedData> Data_; TThreadPool Q_; }; diff --git a/library/cpp/yt/threading/count_down_latch.cpp b/library/cpp/yt/threading/count_down_latch.cpp new file mode 100644 index 00000000000..5b750156b95 --- /dev/null +++ b/library/cpp/yt/threading/count_down_latch.cpp @@ -0,0 +1,71 @@ +#include "count_down_latch.h" + +#include "futex.h" + +#include <library/cpp/yt/threading/futex.h> + +#include <library/cpp/yt/assert/assert.h> + +#include <cerrno> + +namespace NYT::NThreading { + +//////////////////////////////////////////////////////////////////////////////// + +TCountDownLatch::TCountDownLatch(int count) + : Count_(count) +{ } + +void TCountDownLatch::CountDown() +{ +#ifndef _linux_ + TGuard<TMutex> guard(Mutex_); +#endif + auto previous = Count_.fetch_sub(1, std::memory_order::release); + if (previous == 1) { +#ifdef _linux_ + int rv = NThreading::FutexWake( + reinterpret_cast<int*>(&Count_), + std::numeric_limits<int>::max()); + YT_VERIFY(rv >= 0); +#else + ConditionVariable_.BroadCast(); +#endif + } +} + +void TCountDownLatch::Wait() const +{ + while (true) { +#ifndef _linux_ + TGuard<TMutex> guard(Mutex_); +#endif + auto count = Count_.load(std::memory_order::acquire); + if (count == 0) { + return; + } +#ifdef _linux_ + int rv = NThreading::FutexWait( + const_cast<int*>(reinterpret_cast<const int*>(&Count_)), + count); + YT_VERIFY(rv >= 0 || errno == EWOULDBLOCK || errno == EINTR); +#else + ConditionVariable_.WaitI(Mutex_); +#endif + } +} + +bool TCountDownLatch::TryWait() const +{ + return Count_.load(std::memory_order::acquire) == 0; +} + +int TCountDownLatch::GetCount() const +{ + return Count_.load(std::memory_order::relaxed); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NThreading + diff --git a/library/cpp/yt/threading/count_down_latch.h b/library/cpp/yt/threading/count_down_latch.h new file mode 100644 index 00000000000..14e344365ed --- /dev/null +++ b/library/cpp/yt/threading/count_down_latch.h @@ -0,0 +1,42 @@ +#pragma once + +#include "public.h" + +#ifndef _linux_ + #include <util/system/condvar.h> + #include <util/system/mutex.h> +#endif + +namespace NYT::NThreading { + +//////////////////////////////////////////////////////////////////////////////// + +//! A synchronization aid that allows one or more threads to wait until +//! a set of operations being performed in other threads completes. +/*! + * See https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html + */ +class TCountDownLatch final +{ +public: + explicit TCountDownLatch(int count); + + void CountDown(); + + void Wait() const; + bool TryWait() const; + + int GetCount() const; + +private: + std::atomic<int> Count_; + +#ifndef _linux_ + mutable TCondVar ConditionVariable_; + mutable TMutex Mutex_; +#endif +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NThreading |