diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/system/condvar_ut.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/system/condvar_ut.cpp')
-rw-r--r-- | util/system/condvar_ut.cpp | 200 |
1 files changed, 200 insertions, 0 deletions
diff --git a/util/system/condvar_ut.cpp b/util/system/condvar_ut.cpp new file mode 100644 index 0000000000..5130a18d32 --- /dev/null +++ b/util/system/condvar_ut.cpp @@ -0,0 +1,200 @@ +#include "mutex.h" +#include "guard.h" +#include "condvar.h" + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/system/atomic.h> +#include <util/system/atomic_ops.h> +#include <util/thread/pool.h> + +class TCondVarTest: public TTestBase { + UNIT_TEST_SUITE(TCondVarTest); + UNIT_TEST(TestBasics) + UNIT_TEST(TestSyncronize) + UNIT_TEST_SUITE_END(); + + struct TSharedData { + TSharedData() + : stopWaiting(false) + , in(0) + , out(0) + , waited(0) + , failed(false) + { + } + + TMutex mutex; + TCondVar condVar1; + TCondVar condVar2; + + TAtomic stopWaiting; + + TAtomic in; + TAtomic out; + + TAtomic waited; + + bool failed; + }; + + class TThreadTask: public IObjectInQueue { + public: + using PFunc = void (TThreadTask::*)(void); + + TThreadTask(PFunc func, size_t id, size_t totalIds, TSharedData& data) + : Func_(func) + , Id_(id) + , TotalIds_(totalIds) + , Data_(data) + { + } + + void Process(void*) override { + THolder<TThreadTask> This(this); + + (this->*Func_)(); + } + +#define FAIL_ASSERT(cond) \ + if (!(cond)) { \ + Data_.failed = true; \ + } + void RunBasics() { + Y_ASSERT(TotalIds_ == 3); + + if (Id_ < 2) { + TGuard<TMutex> guard(Data_.mutex); + while (!AtomicGet(Data_.stopWaiting)) { + bool res = Data_.condVar1.WaitT(Data_.mutex, TDuration::Seconds(1)); + FAIL_ASSERT(res == true); + } + } else { + usleep(100000); + AtomicSet(Data_.stopWaiting, true); + + TGuard<TMutex> guard(Data_.mutex); + Data_.condVar1.Signal(); + Data_.condVar1.Signal(); + } + } + + void RunBasicsWithPredicate() { + Y_ASSERT(TotalIds_ == 3); + + if (Id_ < 2) { + TGuard<TMutex> guard(Data_.mutex); + const auto res = Data_.condVar1.WaitT(Data_.mutex, TDuration::Seconds(1), [&] { + return AtomicGet(Data_.stopWaiting); + }); + FAIL_ASSERT(res == true); + } else { + usleep(100000); + AtomicSet(Data_.stopWaiting, true); + + TGuard<TMutex> guard(Data_.mutex); + Data_.condVar1.Signal(); + Data_.condVar1.Signal(); + } + } + + void RunSyncronize() { + for (size_t i = 0; i < 10; ++i) { + TGuard<TMutex> guard(Data_.mutex); + AtomicIncrement(Data_.in); + if (AtomicGet(Data_.in) == TotalIds_) { + AtomicSet(Data_.out, 0); + Data_.condVar1.BroadCast(); + } else { + AtomicIncrement(Data_.waited); + while (AtomicGet(Data_.in) < TotalIds_) { + bool res = Data_.condVar1.WaitT(Data_.mutex, TDuration::Seconds(1)); + FAIL_ASSERT(res == true); + } + } + + AtomicIncrement(Data_.out); + if (AtomicGet(Data_.out) == TotalIds_) { + AtomicSet(Data_.in, 0); + Data_.condVar2.BroadCast(); + } else { + while (AtomicGet(Data_.out) < TotalIds_) { + bool res = Data_.condVar2.WaitT(Data_.mutex, TDuration::Seconds(1)); + FAIL_ASSERT(res == true); + } + } + } + + FAIL_ASSERT(AtomicGet(Data_.waited) == (TotalIds_ - 1) * 10); + } + + void RunSyncronizeWithPredicate() { + for (size_t i = 0; i < 10; ++i) { + TGuard<TMutex> guard(Data_.mutex); + AtomicIncrement(Data_.in); + if (AtomicGet(Data_.in) == TotalIds_) { + AtomicSet(Data_.out, 0); + Data_.condVar1.BroadCast(); + } else { + AtomicIncrement(Data_.waited); + const auto res = Data_.condVar1.WaitT(Data_.mutex, TDuration::Seconds(1), [&] { + return AtomicGet(Data_.in) >= TotalIds_; + }); + FAIL_ASSERT(res == true); + } + + AtomicIncrement(Data_.out); + if (AtomicGet(Data_.out) == TotalIds_) { + AtomicSet(Data_.in, 0); + Data_.condVar2.BroadCast(); + } else { + const auto res = Data_.condVar2.WaitT(Data_.mutex, TDuration::Seconds(1), [&] { + return AtomicGet(Data_.out) >= TotalIds_; + }); + FAIL_ASSERT(res == true); + } + } + + FAIL_ASSERT(Data_.waited == (TotalIds_ - 1) * 10); + } +#undef FAIL_ASSERT + + private: + PFunc Func_; + size_t Id_; + TAtomicBase TotalIds_; + TSharedData& Data_; + }; + +private: +#define RUN_CYCLE(what, count) \ + Q_.Start(count); \ + for (size_t i = 0; i < count; ++i) { \ + UNIT_ASSERT(Q_.Add(new TThreadTask(&TThreadTask::what, i, count, Data_))); \ + } \ + Q_.Stop(); \ + bool b = Data_.failed; \ + Data_.failed = false; \ + UNIT_ASSERT(!b); + + inline void TestBasics() { + RUN_CYCLE(RunBasics, 3); + } + + inline void TestBasicsWithPredicate() { + RUN_CYCLE(RunBasicsWithPredicate, 3); + } + + inline void TestSyncronize() { + RUN_CYCLE(RunSyncronize, 6); + } + + inline void TestSyncronizeWithPredicate() { + RUN_CYCLE(RunSyncronizeWithPredicate, 6); + } +#undef RUN_CYCLE + TSharedData Data_; + TThreadPool Q_; +}; + +UNIT_TEST_SUITE_REGISTRATION(TCondVarTest); |