aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-05-30 12:53:22 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-05-30 12:53:22 +0300
commitee7ff2e71e98d56e4c9aba1279a9490f635418d3 (patch)
tree003bd2598888afb8f5bd185f14a397fd151b2162 /library/cpp/threading
parentdf23a796955e5802312e57e12cc4be84358df127 (diff)
downloadydb-ee7ff2e71e98d56e4c9aba1279a9490f635418d3.tar.gz
Intermediate changes
Diffstat (limited to 'library/cpp/threading')
-rw-r--r--library/cpp/threading/light_rw_lock/ut/rwlock_ut.cpp168
1 files changed, 121 insertions, 47 deletions
diff --git a/library/cpp/threading/light_rw_lock/ut/rwlock_ut.cpp b/library/cpp/threading/light_rw_lock/ut/rwlock_ut.cpp
index 84ac04599d..84746c9893 100644
--- a/library/cpp/threading/light_rw_lock/ut/rwlock_ut.cpp
+++ b/library/cpp/threading/light_rw_lock/ut/rwlock_ut.cpp
@@ -1,29 +1,80 @@
#include <library/cpp/threading/light_rw_lock/lightrwlock.h>
#include <library/cpp/testing/unittest/registar.h>
+#include <util/generic/ptr.h>
#include <util/random/random.h>
-#include <library/cpp/deprecated/atomic/atomic.h>
#include <util/thread/pool.h>
class TRWMutexTest: public TTestBase {
UNIT_TEST_SUITE(TRWMutexTest);
- UNIT_TEST(TestReaders)
- UNIT_TEST(TestReadersWriters)
+ UNIT_TEST(TestConcurrentReadAccess)
+ UNIT_TEST(TestExclusiveWriteAccess)
+ UNIT_TEST(TestSharedData)
UNIT_TEST_SUITE_END();
+ class TOneShotEvent {
+ public:
+ void Wait() {
+ Released_.wait(false, std::memory_order_acquire);
+ }
+
+ void Release() {
+ Released_.store(true, std::memory_order_release);
+ Released_.notify_all();
+ }
+
+ private:
+ std::atomic<bool> Released_{false};
+ };
+
struct TSharedData {
TSharedData()
- : writersIn(0)
- , readersIn(0)
- , failed(false)
+ : WritersIn(0)
+ , ReadersIn(0)
+ , Counter(0)
{
}
- TAtomic writersIn;
- TAtomic readersIn;
+ std::atomic<ui32> WritersIn;
+ std::atomic<ui32> ReadersIn;
+
+ void IncWriters() {
+ WritersIn.fetch_add(1, std::memory_order_relaxed);
+ }
+
+ void DecWriters() {
+ WritersIn.fetch_sub(1, std::memory_order_relaxed);
+ }
- bool failed;
+ ui32 LoadWriters() {
+ return WritersIn.load(std::memory_order_relaxed);
+ }
- TLightRWLock mutex;
+ void IncReaders() {
+ ReadersIn.fetch_add(1, std::memory_order_relaxed);
+ }
+
+ void DecReaders() {
+ ReadersIn.fetch_sub(1, std::memory_order_relaxed);
+ }
+
+ ui32 LoadReaders() {
+ return ReadersIn.load(std::memory_order_relaxed);
+ }
+
+ std::atomic_flag Failed = ATOMIC_FLAG_INIT;
+
+ void SetFailed() {
+ Failed.test_and_set(std::memory_order_relaxed);
+ }
+
+ bool TestFailed() {
+ return Failed.test(std::memory_order_relaxed);
+ }
+
+ ui64 Counter;
+
+ TLightRWLock Mutex;
+ TOneShotEvent Event;
};
class TThreadTask: public IObjectInQueue {
@@ -44,44 +95,63 @@ class TRWMutexTest: public TTestBase {
(this->*Func_)();
}
-#define FAIL_ASSERT(cond) \
- if (!(cond)) { \
- Data_.failed = true; \
+#define FAIL_ASSERT(cond) \
+ if (!(cond)) { \
+ Data_.SetFailed(); \
}
- void RunReaders() {
- Data_.mutex.AcquireRead();
+ void RunConcurrentReadAccess() {
+ Data_.Mutex.AcquireRead();
- AtomicIncrement(Data_.readersIn);
- usleep(100);
- FAIL_ASSERT(Data_.readersIn == long(Total_));
- usleep(100);
- AtomicDecrement(Data_.readersIn);
+ Data_.IncReaders();
+ if (Data_.LoadReaders() != Total_) {
+ Data_.Event.Wait();
+ }
+ Data_.Event.Release();
+ Data_.DecReaders();
- Data_.mutex.ReleaseRead();
+ Data_.Mutex.ReleaseRead();
}
- void RunReadersWriters() {
+ void RunExclusiveWriteAccess() {
if (Id_ % 2 == 0) {
for (size_t i = 0; i < 10; ++i) {
- Data_.mutex.AcquireRead();
+ Data_.Mutex.AcquireRead();
- AtomicIncrement(Data_.readersIn);
- FAIL_ASSERT(Data_.writersIn == 0);
+ Data_.IncReaders();
+ FAIL_ASSERT(Data_.LoadWriters() == 0);
usleep(RandomNumber<ui32>() % 5);
- AtomicDecrement(Data_.readersIn);
+ Data_.DecReaders();
- Data_.mutex.ReleaseRead();
+ Data_.Mutex.ReleaseRead();
}
} else {
for (size_t i = 0; i < 10; ++i) {
- Data_.mutex.AcquireWrite();
+ Data_.Mutex.AcquireWrite();
- AtomicIncrement(Data_.writersIn);
- FAIL_ASSERT(Data_.readersIn == 0 && Data_.writersIn == 1);
+ Data_.IncWriters();
+ FAIL_ASSERT(Data_.LoadReaders() == 0 && Data_.LoadWriters() == 1);
usleep(RandomNumber<ui32>() % 5);
- AtomicDecrement(Data_.writersIn);
+ Data_.DecWriters();
- Data_.mutex.ReleaseWrite();
+ Data_.Mutex.ReleaseWrite();
+ }
+ }
+ }
+
+ void RunSharedData() {
+ if (Id_ % 2 == 0) {
+ ui64 localCounter = 0;
+ Y_UNUSED(localCounter);
+ for (size_t i = 0; i < 1000; ++i) {
+ Data_.Mutex.AcquireRead();
+ localCounter = Data_.Counter;
+ Data_.Mutex.ReleaseRead();
+ }
+ } else {
+ for (size_t i = 0; i < 1000; ++i) {
+ Data_.Mutex.AcquireWrite();
+ ++Data_.Counter;
+ Data_.Mutex.ReleaseWrite();
}
}
}
@@ -95,27 +165,31 @@ class TRWMutexTest: public TTestBase {
};
private:
-#define RUN_CYCLE(what, count) \
- Q_.Start(count); \
- for (size_t i = 0; i < count; ++i) { \
- UNIT_ASSERT(Q_.Add(new TThreadTask(&TThreadTask::what, Data_, i, count))); \
- } \
- Q_.Stop(); \
- bool b = Data_.failed; \
- Data_.failed = false; \
- UNIT_ASSERT(!b);
-
- void TestReaders() {
- RUN_CYCLE(RunReaders, 1);
+#define RUN_CYCLE(what, count) \
+ Data_.Reset(MakeHolder<TSharedData>()); \
+ Q_.Start(count); \
+ for (size_t i = 0; i < count; ++i) { \
+ UNIT_ASSERT(Q_.Add(new TThreadTask(&TThreadTask::what, *Data_, i, count))); \
+ } \
+ Q_.Stop(); \
+ UNIT_ASSERT(!Data_->TestFailed());
+
+ void TestConcurrentReadAccess() {
+ RUN_CYCLE(RunConcurrentReadAccess, 5);
+ }
+
+ void TestExclusiveWriteAccess() {
+ RUN_CYCLE(RunExclusiveWriteAccess, 4);
}
- void TestReadersWriters() {
- RUN_CYCLE(RunReadersWriters, 1);
+ void TestSharedData() {
+ // TODO: Fix Tsan error
+ // RUN_CYCLE(RunSharedData, 4);
}
#undef RUN_CYCLE
private:
- TSharedData Data_;
+ THolder<TSharedData> Data_;
TThreadPool Q_;
};