diff options
author | agri <agri@yandex-team.ru> | 2022-02-10 16:48:12 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:12 +0300 |
commit | 2909866fbc652492b7d7cab3023cb19489dc4fd8 (patch) | |
tree | b222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/threading/queue/queue_ut.cpp | |
parent | d3530b2692e400bd4d29bd4f07cafaee139164e7 (diff) | |
download | ydb-2909866fbc652492b7d7cab3023cb19489dc4fd8.tar.gz |
Restoring authorship annotation for <agri@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading/queue/queue_ut.cpp')
-rw-r--r-- | library/cpp/threading/queue/queue_ut.cpp | 434 |
1 files changed, 217 insertions, 217 deletions
diff --git a/library/cpp/threading/queue/queue_ut.cpp b/library/cpp/threading/queue/queue_ut.cpp index 8b36437034..80eca147da 100644 --- a/library/cpp/threading/queue/queue_ut.cpp +++ b/library/cpp/threading/queue/queue_ut.cpp @@ -1,242 +1,242 @@ #include <library/cpp/testing/unittest/registar.h> -#include <util/system/thread.h> - -#include "ut_helpers.h" - -typedef void* TMsgLink; - +#include <util/system/thread.h> + +#include "ut_helpers.h" + +typedef void* TMsgLink; + template <typename TQueueType> -class TQueueTestProcs: public TTestBase { -private: +class TQueueTestProcs: public TTestBase { +private: UNIT_TEST_SUITE_DEMANGLE(TQueueTestProcs<TQueueType>); - UNIT_TEST(Threads2_Push1M_Threads1_Pop2M) - UNIT_TEST(Threads4_Push1M_Threads1_Pop4M) - UNIT_TEST(Threads8_RndPush100K_Threads8_Queues) + UNIT_TEST(Threads2_Push1M_Threads1_Pop2M) + UNIT_TEST(Threads4_Push1M_Threads1_Pop4M) + UNIT_TEST(Threads8_RndPush100K_Threads8_Queues) /* - UNIT_TEST(Threads24_RndPush100K_Threads24_Queues) - UNIT_TEST(Threads24_RndPush100K_Threads8_Queues) - UNIT_TEST(Threads24_RndPush100K_Threads4_Queues) -*/ - UNIT_TEST_SUITE_END(); - -public: - void Push1M_Pop1M() { + UNIT_TEST(Threads24_RndPush100K_Threads24_Queues) + UNIT_TEST(Threads24_RndPush100K_Threads8_Queues) + UNIT_TEST(Threads24_RndPush100K_Threads4_Queues) +*/ + UNIT_TEST_SUITE_END(); + +public: + void Push1M_Pop1M() { TQueueType queue; - TMsgLink msg = &queue; - - auto pmsg = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); - - for (int i = 0; i < 1000000; ++i) { - queue.Push((char*)msg + i); - } - - for (int i = 0; i < 1000000; ++i) { - auto popped = queue.Pop(); - UNIT_ASSERT_EQUAL((char*)msg + i, popped); - } - - pmsg = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); - } - - void Threads2_Push1M_Threads1_Pop2M() { + TMsgLink msg = &queue; + + auto pmsg = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); + + for (int i = 0; i < 1000000; ++i) { + queue.Push((char*)msg + i); + } + + for (int i = 0; i < 1000000; ++i) { + auto popped = queue.Pop(); + UNIT_ASSERT_EQUAL((char*)msg + i, popped); + } + + pmsg = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); + } + + void Threads2_Push1M_Threads1_Pop2M() { TQueueType queue; - + class TPusherThread: public ISimpleThread { - public: + public: TPusherThread(TQueueType& theQueue, char* start) : Queue(theQueue) - , Arg(start) - { - } - + , Arg(start) + { + } + TQueueType& Queue; - char* Arg; - - void* ThreadProc() override { - for (int i = 0; i < 1000000; ++i) { - Queue.Push(Arg + i); - } - return nullptr; - } - }; - - TPusherThread pusher1(queue, (char*)&queue); - TPusherThread pusher2(queue, (char*)&queue + 2000000); - - pusher1.Start(); - pusher2.Start(); - - for (int i = 0; i < 2000000; ++i) { - while (queue.Pop() == nullptr) { - SpinLockPause(); - } - } - - auto pmsg = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); - } - - void Threads4_Push1M_Threads1_Pop4M() { + char* Arg; + + void* ThreadProc() override { + for (int i = 0; i < 1000000; ++i) { + Queue.Push(Arg + i); + } + return nullptr; + } + }; + + TPusherThread pusher1(queue, (char*)&queue); + TPusherThread pusher2(queue, (char*)&queue + 2000000); + + pusher1.Start(); + pusher2.Start(); + + for (int i = 0; i < 2000000; ++i) { + while (queue.Pop() == nullptr) { + SpinLockPause(); + } + } + + auto pmsg = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); + } + + void Threads4_Push1M_Threads1_Pop4M() { TQueueType queue; - + class TPusherThread: public ISimpleThread { - public: + public: TPusherThread(TQueueType& theQueue, char* start) : Queue(theQueue) - , Arg(start) - { - } - + , Arg(start) + { + } + TQueueType& Queue; - char* Arg; - - void* ThreadProc() override { - for (int i = 0; i < 1000000; ++i) { - Queue.Push(Arg + i); - } - return nullptr; - } - }; - - TPusherThread pusher1(queue, (char*)&queue); - TPusherThread pusher2(queue, (char*)&queue + 2000000); - TPusherThread pusher3(queue, (char*)&queue + 4000000); - TPusherThread pusher4(queue, (char*)&queue + 6000000); - - pusher1.Start(); - pusher2.Start(); - pusher3.Start(); - pusher4.Start(); - - for (int i = 0; i < 4000000; ++i) { - while (queue.Pop() == nullptr) { - SpinLockPause(); - } - } - - auto pmsg = queue.Pop(); - UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); - } - - template <size_t NUMBER_OF_PUSHERS, size_t NUMBER_OF_QUEUES> - void ManyRndPush100K_ManyQueues() { + char* Arg; + + void* ThreadProc() override { + for (int i = 0; i < 1000000; ++i) { + Queue.Push(Arg + i); + } + return nullptr; + } + }; + + TPusherThread pusher1(queue, (char*)&queue); + TPusherThread pusher2(queue, (char*)&queue + 2000000); + TPusherThread pusher3(queue, (char*)&queue + 4000000); + TPusherThread pusher4(queue, (char*)&queue + 6000000); + + pusher1.Start(); + pusher2.Start(); + pusher3.Start(); + pusher4.Start(); + + for (int i = 0; i < 4000000; ++i) { + while (queue.Pop() == nullptr) { + SpinLockPause(); + } + } + + auto pmsg = queue.Pop(); + UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); + } + + template <size_t NUMBER_OF_PUSHERS, size_t NUMBER_OF_QUEUES> + void ManyRndPush100K_ManyQueues() { TQueueType queue[NUMBER_OF_QUEUES]; - + class TPusherThread: public ISimpleThread { - public: + public: TPusherThread(TQueueType* queues, char* start) - : Queues(queues) - , Arg(start) - { - } - + : Queues(queues) + , Arg(start) + { + } + TQueueType* Queues; - char* Arg; - - void* ThreadProc() override { - ui64 counters[NUMBER_OF_QUEUES]; - for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { - counters[i] = 0; - } - - for (int i = 0; i < 100000; ++i) { - size_t rnd = GetCycleCount() % NUMBER_OF_QUEUES; - int cookie = counters[rnd]++; - Queues[rnd].Push(Arg + cookie); - } - - for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { - Queues[i].Push((void*)2ULL); - } - - return nullptr; - } - }; - + char* Arg; + + void* ThreadProc() override { + ui64 counters[NUMBER_OF_QUEUES]; + for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { + counters[i] = 0; + } + + for (int i = 0; i < 100000; ++i) { + size_t rnd = GetCycleCount() % NUMBER_OF_QUEUES; + int cookie = counters[rnd]++; + Queues[rnd].Push(Arg + cookie); + } + + for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { + Queues[i].Push((void*)2ULL); + } + + return nullptr; + } + }; + class TPopperThread: public ISimpleThread { - public: + public: TPopperThread(TQueueType* theQueue, char* base) : Queue(theQueue) - , Base(base) - { - } - + , Base(base) + { + } + TQueueType* Queue; - char* Base; - - void* ThreadProc() override { - ui64 counters[NUMBER_OF_PUSHERS]; - for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) { - counters[i] = 0; - } - - for (size_t fin = 0; fin < NUMBER_OF_PUSHERS;) { - auto msg = Queue->Pop(); - if (msg == nullptr) { - SpinLockPause(); - continue; - } - if (msg == (void*)2ULL) { - ++fin; - continue; - } - ui64 shift = (char*)msg - Base; - auto pusherNum = shift / 200000000ULL; - auto msgNum = shift % 200000000ULL; - - UNIT_ASSERT_EQUAL(counters[pusherNum], msgNum); - ++counters[pusherNum]; - } - - auto pmsg = Queue->Pop(); - UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); - - return nullptr; - } - }; - + char* Base; + + void* ThreadProc() override { + ui64 counters[NUMBER_OF_PUSHERS]; + for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) { + counters[i] = 0; + } + + for (size_t fin = 0; fin < NUMBER_OF_PUSHERS;) { + auto msg = Queue->Pop(); + if (msg == nullptr) { + SpinLockPause(); + continue; + } + if (msg == (void*)2ULL) { + ++fin; + continue; + } + ui64 shift = (char*)msg - Base; + auto pusherNum = shift / 200000000ULL; + auto msgNum = shift % 200000000ULL; + + UNIT_ASSERT_EQUAL(counters[pusherNum], msgNum); + ++counters[pusherNum]; + } + + auto pmsg = Queue->Pop(); + UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr); + + return nullptr; + } + }; + TVector<TAutoPtr<TPopperThread>> poppers; TVector<TAutoPtr<TPusherThread>> pushers; - - for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { - poppers.emplace_back(new TPopperThread(&queue[i], (char*)&queue)); - poppers.back()->Start(); - } - - for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) { - pushers.emplace_back( - new TPusherThread(queue, (char*)&queue + 200000000ULL * i)); - pushers.back()->Start(); - } - - for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { - poppers[i]->Join(); - } - - for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) { - pushers[i]->Join(); - } - } - - void Threads8_RndPush100K_Threads8_Queues() { - ManyRndPush100K_ManyQueues<8, 8>(); - } - - /* - void Threads24_RndPush100K_Threads24_Queues() { - ManyRndPush100K_ManyQueues<24, 24>(); - } - - void Threads24_RndPush100K_Threads8_Queues() { - ManyRndPush100K_ManyQueues<24, 8>(); - } - - void Threads24_RndPush100K_Threads4_Queues() { - ManyRndPush100K_ManyQueues<24, 4>(); - } - */ -}; - -REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestProcs); + + for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { + poppers.emplace_back(new TPopperThread(&queue[i], (char*)&queue)); + poppers.back()->Start(); + } + + for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) { + pushers.emplace_back( + new TPusherThread(queue, (char*)&queue + 200000000ULL * i)); + pushers.back()->Start(); + } + + for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) { + poppers[i]->Join(); + } + + for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) { + pushers[i]->Join(); + } + } + + void Threads8_RndPush100K_Threads8_Queues() { + ManyRndPush100K_ManyQueues<8, 8>(); + } + + /* + void Threads24_RndPush100K_Threads24_Queues() { + ManyRndPush100K_ManyQueues<24, 24>(); + } + + void Threads24_RndPush100K_Threads8_Queues() { + ManyRndPush100K_ManyQueues<24, 8>(); + } + + void Threads24_RndPush100K_Threads4_Queues() { + ManyRndPush100K_ManyQueues<24, 4>(); + } + */ +}; + +REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestProcs); |