aboutsummaryrefslogtreecommitdiffstats
path: root/util/system/condvar_ut.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /util/system/condvar_ut.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'util/system/condvar_ut.cpp')
-rw-r--r--util/system/condvar_ut.cpp200
1 files changed, 200 insertions, 0 deletions
diff --git a/util/system/condvar_ut.cpp b/util/system/condvar_ut.cpp
new file mode 100644
index 0000000000..5130a18d32
--- /dev/null
+++ b/util/system/condvar_ut.cpp
@@ -0,0 +1,200 @@
+#include "mutex.h"
+#include "guard.h"
+#include "condvar.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/system/atomic.h>
+#include <util/system/atomic_ops.h>
+#include <util/thread/pool.h>
+
+class TCondVarTest: public TTestBase {
+ UNIT_TEST_SUITE(TCondVarTest);
+ UNIT_TEST(TestBasics)
+ UNIT_TEST(TestSyncronize)
+ UNIT_TEST_SUITE_END();
+
+ struct TSharedData {
+ TSharedData()
+ : stopWaiting(false)
+ , in(0)
+ , out(0)
+ , waited(0)
+ , failed(false)
+ {
+ }
+
+ TMutex mutex;
+ TCondVar condVar1;
+ TCondVar condVar2;
+
+ TAtomic stopWaiting;
+
+ TAtomic in;
+ TAtomic out;
+
+ TAtomic waited;
+
+ bool failed;
+ };
+
+ class TThreadTask: public IObjectInQueue {
+ public:
+ using PFunc = void (TThreadTask::*)(void);
+
+ TThreadTask(PFunc func, size_t id, size_t totalIds, TSharedData& data)
+ : Func_(func)
+ , Id_(id)
+ , TotalIds_(totalIds)
+ , Data_(data)
+ {
+ }
+
+ void Process(void*) override {
+ THolder<TThreadTask> This(this);
+
+ (this->*Func_)();
+ }
+
+#define FAIL_ASSERT(cond) \
+ if (!(cond)) { \
+ Data_.failed = true; \
+ }
+ void RunBasics() {
+ Y_ASSERT(TotalIds_ == 3);
+
+ if (Id_ < 2) {
+ TGuard<TMutex> guard(Data_.mutex);
+ while (!AtomicGet(Data_.stopWaiting)) {
+ bool res = Data_.condVar1.WaitT(Data_.mutex, TDuration::Seconds(1));
+ FAIL_ASSERT(res == true);
+ }
+ } else {
+ usleep(100000);
+ AtomicSet(Data_.stopWaiting, true);
+
+ TGuard<TMutex> guard(Data_.mutex);
+ Data_.condVar1.Signal();
+ Data_.condVar1.Signal();
+ }
+ }
+
+ void RunBasicsWithPredicate() {
+ Y_ASSERT(TotalIds_ == 3);
+
+ if (Id_ < 2) {
+ TGuard<TMutex> guard(Data_.mutex);
+ const auto res = Data_.condVar1.WaitT(Data_.mutex, TDuration::Seconds(1), [&] {
+ return AtomicGet(Data_.stopWaiting);
+ });
+ FAIL_ASSERT(res == true);
+ } else {
+ usleep(100000);
+ AtomicSet(Data_.stopWaiting, true);
+
+ TGuard<TMutex> guard(Data_.mutex);
+ Data_.condVar1.Signal();
+ Data_.condVar1.Signal();
+ }
+ }
+
+ void RunSyncronize() {
+ for (size_t i = 0; i < 10; ++i) {
+ TGuard<TMutex> guard(Data_.mutex);
+ AtomicIncrement(Data_.in);
+ if (AtomicGet(Data_.in) == TotalIds_) {
+ AtomicSet(Data_.out, 0);
+ Data_.condVar1.BroadCast();
+ } else {
+ AtomicIncrement(Data_.waited);
+ while (AtomicGet(Data_.in) < TotalIds_) {
+ bool res = Data_.condVar1.WaitT(Data_.mutex, TDuration::Seconds(1));
+ FAIL_ASSERT(res == true);
+ }
+ }
+
+ AtomicIncrement(Data_.out);
+ if (AtomicGet(Data_.out) == TotalIds_) {
+ AtomicSet(Data_.in, 0);
+ Data_.condVar2.BroadCast();
+ } else {
+ while (AtomicGet(Data_.out) < TotalIds_) {
+ bool res = Data_.condVar2.WaitT(Data_.mutex, TDuration::Seconds(1));
+ FAIL_ASSERT(res == true);
+ }
+ }
+ }
+
+ FAIL_ASSERT(AtomicGet(Data_.waited) == (TotalIds_ - 1) * 10);
+ }
+
+ void RunSyncronizeWithPredicate() {
+ for (size_t i = 0; i < 10; ++i) {
+ TGuard<TMutex> guard(Data_.mutex);
+ AtomicIncrement(Data_.in);
+ if (AtomicGet(Data_.in) == TotalIds_) {
+ AtomicSet(Data_.out, 0);
+ Data_.condVar1.BroadCast();
+ } else {
+ AtomicIncrement(Data_.waited);
+ const auto res = Data_.condVar1.WaitT(Data_.mutex, TDuration::Seconds(1), [&] {
+ return AtomicGet(Data_.in) >= TotalIds_;
+ });
+ FAIL_ASSERT(res == true);
+ }
+
+ AtomicIncrement(Data_.out);
+ if (AtomicGet(Data_.out) == TotalIds_) {
+ AtomicSet(Data_.in, 0);
+ Data_.condVar2.BroadCast();
+ } else {
+ const auto res = Data_.condVar2.WaitT(Data_.mutex, TDuration::Seconds(1), [&] {
+ return AtomicGet(Data_.out) >= TotalIds_;
+ });
+ FAIL_ASSERT(res == true);
+ }
+ }
+
+ FAIL_ASSERT(Data_.waited == (TotalIds_ - 1) * 10);
+ }
+#undef FAIL_ASSERT
+
+ private:
+ PFunc Func_;
+ size_t Id_;
+ TAtomicBase TotalIds_;
+ TSharedData& Data_;
+ };
+
+private:
+#define RUN_CYCLE(what, count) \
+ Q_.Start(count); \
+ for (size_t i = 0; i < count; ++i) { \
+ UNIT_ASSERT(Q_.Add(new TThreadTask(&TThreadTask::what, i, count, Data_))); \
+ } \
+ Q_.Stop(); \
+ bool b = Data_.failed; \
+ Data_.failed = false; \
+ UNIT_ASSERT(!b);
+
+ inline void TestBasics() {
+ RUN_CYCLE(RunBasics, 3);
+ }
+
+ inline void TestBasicsWithPredicate() {
+ RUN_CYCLE(RunBasicsWithPredicate, 3);
+ }
+
+ inline void TestSyncronize() {
+ RUN_CYCLE(RunSyncronize, 6);
+ }
+
+ inline void TestSyncronizeWithPredicate() {
+ RUN_CYCLE(RunSyncronizeWithPredicate, 6);
+ }
+#undef RUN_CYCLE
+ TSharedData Data_;
+ TThreadPool Q_;
+};
+
+UNIT_TEST_SUITE_REGISTRATION(TCondVarTest);