aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/queue/queue_ut.cpp
diff options
context:
space:
mode:
authoragri <agri@yandex-team.ru>2022-02-10 16:48:12 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:12 +0300
commit2909866fbc652492b7d7cab3023cb19489dc4fd8 (patch)
treeb222e5ac2e2e98872661c51ccceee5da0d291e13 /library/cpp/threading/queue/queue_ut.cpp
parentd3530b2692e400bd4d29bd4f07cafaee139164e7 (diff)
downloadydb-2909866fbc652492b7d7cab3023cb19489dc4fd8.tar.gz
Restoring authorship annotation for <agri@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading/queue/queue_ut.cpp')
-rw-r--r--library/cpp/threading/queue/queue_ut.cpp434
1 files changed, 217 insertions, 217 deletions
diff --git a/library/cpp/threading/queue/queue_ut.cpp b/library/cpp/threading/queue/queue_ut.cpp
index 8b36437034..80eca147da 100644
--- a/library/cpp/threading/queue/queue_ut.cpp
+++ b/library/cpp/threading/queue/queue_ut.cpp
@@ -1,242 +1,242 @@
#include <library/cpp/testing/unittest/registar.h>
-#include <util/system/thread.h>
-
-#include "ut_helpers.h"
-
-typedef void* TMsgLink;
-
+#include <util/system/thread.h>
+
+#include "ut_helpers.h"
+
+typedef void* TMsgLink;
+
template <typename TQueueType>
-class TQueueTestProcs: public TTestBase {
-private:
+class TQueueTestProcs: public TTestBase {
+private:
UNIT_TEST_SUITE_DEMANGLE(TQueueTestProcs<TQueueType>);
- UNIT_TEST(Threads2_Push1M_Threads1_Pop2M)
- UNIT_TEST(Threads4_Push1M_Threads1_Pop4M)
- UNIT_TEST(Threads8_RndPush100K_Threads8_Queues)
+ UNIT_TEST(Threads2_Push1M_Threads1_Pop2M)
+ UNIT_TEST(Threads4_Push1M_Threads1_Pop4M)
+ UNIT_TEST(Threads8_RndPush100K_Threads8_Queues)
/*
- UNIT_TEST(Threads24_RndPush100K_Threads24_Queues)
- UNIT_TEST(Threads24_RndPush100K_Threads8_Queues)
- UNIT_TEST(Threads24_RndPush100K_Threads4_Queues)
-*/
- UNIT_TEST_SUITE_END();
-
-public:
- void Push1M_Pop1M() {
+ UNIT_TEST(Threads24_RndPush100K_Threads24_Queues)
+ UNIT_TEST(Threads24_RndPush100K_Threads8_Queues)
+ UNIT_TEST(Threads24_RndPush100K_Threads4_Queues)
+*/
+ UNIT_TEST_SUITE_END();
+
+public:
+ void Push1M_Pop1M() {
TQueueType queue;
- TMsgLink msg = &queue;
-
- auto pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
-
- for (int i = 0; i < 1000000; ++i) {
- queue.Push((char*)msg + i);
- }
-
- for (int i = 0; i < 1000000; ++i) {
- auto popped = queue.Pop();
- UNIT_ASSERT_EQUAL((char*)msg + i, popped);
- }
-
- pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- }
-
- void Threads2_Push1M_Threads1_Pop2M() {
+ TMsgLink msg = &queue;
+
+ auto pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+
+ for (int i = 0; i < 1000000; ++i) {
+ queue.Push((char*)msg + i);
+ }
+
+ for (int i = 0; i < 1000000; ++i) {
+ auto popped = queue.Pop();
+ UNIT_ASSERT_EQUAL((char*)msg + i, popped);
+ }
+
+ pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ }
+
+ void Threads2_Push1M_Threads1_Pop2M() {
TQueueType queue;
-
+
class TPusherThread: public ISimpleThread {
- public:
+ public:
TPusherThread(TQueueType& theQueue, char* start)
: Queue(theQueue)
- , Arg(start)
- {
- }
-
+ , Arg(start)
+ {
+ }
+
TQueueType& Queue;
- char* Arg;
-
- void* ThreadProc() override {
- for (int i = 0; i < 1000000; ++i) {
- Queue.Push(Arg + i);
- }
- return nullptr;
- }
- };
-
- TPusherThread pusher1(queue, (char*)&queue);
- TPusherThread pusher2(queue, (char*)&queue + 2000000);
-
- pusher1.Start();
- pusher2.Start();
-
- for (int i = 0; i < 2000000; ++i) {
- while (queue.Pop() == nullptr) {
- SpinLockPause();
- }
- }
-
- auto pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- }
-
- void Threads4_Push1M_Threads1_Pop4M() {
+ char* Arg;
+
+ void* ThreadProc() override {
+ for (int i = 0; i < 1000000; ++i) {
+ Queue.Push(Arg + i);
+ }
+ return nullptr;
+ }
+ };
+
+ TPusherThread pusher1(queue, (char*)&queue);
+ TPusherThread pusher2(queue, (char*)&queue + 2000000);
+
+ pusher1.Start();
+ pusher2.Start();
+
+ for (int i = 0; i < 2000000; ++i) {
+ while (queue.Pop() == nullptr) {
+ SpinLockPause();
+ }
+ }
+
+ auto pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ }
+
+ void Threads4_Push1M_Threads1_Pop4M() {
TQueueType queue;
-
+
class TPusherThread: public ISimpleThread {
- public:
+ public:
TPusherThread(TQueueType& theQueue, char* start)
: Queue(theQueue)
- , Arg(start)
- {
- }
-
+ , Arg(start)
+ {
+ }
+
TQueueType& Queue;
- char* Arg;
-
- void* ThreadProc() override {
- for (int i = 0; i < 1000000; ++i) {
- Queue.Push(Arg + i);
- }
- return nullptr;
- }
- };
-
- TPusherThread pusher1(queue, (char*)&queue);
- TPusherThread pusher2(queue, (char*)&queue + 2000000);
- TPusherThread pusher3(queue, (char*)&queue + 4000000);
- TPusherThread pusher4(queue, (char*)&queue + 6000000);
-
- pusher1.Start();
- pusher2.Start();
- pusher3.Start();
- pusher4.Start();
-
- for (int i = 0; i < 4000000; ++i) {
- while (queue.Pop() == nullptr) {
- SpinLockPause();
- }
- }
-
- auto pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- }
-
- template <size_t NUMBER_OF_PUSHERS, size_t NUMBER_OF_QUEUES>
- void ManyRndPush100K_ManyQueues() {
+ char* Arg;
+
+ void* ThreadProc() override {
+ for (int i = 0; i < 1000000; ++i) {
+ Queue.Push(Arg + i);
+ }
+ return nullptr;
+ }
+ };
+
+ TPusherThread pusher1(queue, (char*)&queue);
+ TPusherThread pusher2(queue, (char*)&queue + 2000000);
+ TPusherThread pusher3(queue, (char*)&queue + 4000000);
+ TPusherThread pusher4(queue, (char*)&queue + 6000000);
+
+ pusher1.Start();
+ pusher2.Start();
+ pusher3.Start();
+ pusher4.Start();
+
+ for (int i = 0; i < 4000000; ++i) {
+ while (queue.Pop() == nullptr) {
+ SpinLockPause();
+ }
+ }
+
+ auto pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ }
+
+ template <size_t NUMBER_OF_PUSHERS, size_t NUMBER_OF_QUEUES>
+ void ManyRndPush100K_ManyQueues() {
TQueueType queue[NUMBER_OF_QUEUES];
-
+
class TPusherThread: public ISimpleThread {
- public:
+ public:
TPusherThread(TQueueType* queues, char* start)
- : Queues(queues)
- , Arg(start)
- {
- }
-
+ : Queues(queues)
+ , Arg(start)
+ {
+ }
+
TQueueType* Queues;
- char* Arg;
-
- void* ThreadProc() override {
- ui64 counters[NUMBER_OF_QUEUES];
- for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
- counters[i] = 0;
- }
-
- for (int i = 0; i < 100000; ++i) {
- size_t rnd = GetCycleCount() % NUMBER_OF_QUEUES;
- int cookie = counters[rnd]++;
- Queues[rnd].Push(Arg + cookie);
- }
-
- for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
- Queues[i].Push((void*)2ULL);
- }
-
- return nullptr;
- }
- };
-
+ char* Arg;
+
+ void* ThreadProc() override {
+ ui64 counters[NUMBER_OF_QUEUES];
+ for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
+ counters[i] = 0;
+ }
+
+ for (int i = 0; i < 100000; ++i) {
+ size_t rnd = GetCycleCount() % NUMBER_OF_QUEUES;
+ int cookie = counters[rnd]++;
+ Queues[rnd].Push(Arg + cookie);
+ }
+
+ for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
+ Queues[i].Push((void*)2ULL);
+ }
+
+ return nullptr;
+ }
+ };
+
class TPopperThread: public ISimpleThread {
- public:
+ public:
TPopperThread(TQueueType* theQueue, char* base)
: Queue(theQueue)
- , Base(base)
- {
- }
-
+ , Base(base)
+ {
+ }
+
TQueueType* Queue;
- char* Base;
-
- void* ThreadProc() override {
- ui64 counters[NUMBER_OF_PUSHERS];
- for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
- counters[i] = 0;
- }
-
- for (size_t fin = 0; fin < NUMBER_OF_PUSHERS;) {
- auto msg = Queue->Pop();
- if (msg == nullptr) {
- SpinLockPause();
- continue;
- }
- if (msg == (void*)2ULL) {
- ++fin;
- continue;
- }
- ui64 shift = (char*)msg - Base;
- auto pusherNum = shift / 200000000ULL;
- auto msgNum = shift % 200000000ULL;
-
- UNIT_ASSERT_EQUAL(counters[pusherNum], msgNum);
- ++counters[pusherNum];
- }
-
- auto pmsg = Queue->Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
-
- return nullptr;
- }
- };
-
+ char* Base;
+
+ void* ThreadProc() override {
+ ui64 counters[NUMBER_OF_PUSHERS];
+ for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
+ counters[i] = 0;
+ }
+
+ for (size_t fin = 0; fin < NUMBER_OF_PUSHERS;) {
+ auto msg = Queue->Pop();
+ if (msg == nullptr) {
+ SpinLockPause();
+ continue;
+ }
+ if (msg == (void*)2ULL) {
+ ++fin;
+ continue;
+ }
+ ui64 shift = (char*)msg - Base;
+ auto pusherNum = shift / 200000000ULL;
+ auto msgNum = shift % 200000000ULL;
+
+ UNIT_ASSERT_EQUAL(counters[pusherNum], msgNum);
+ ++counters[pusherNum];
+ }
+
+ auto pmsg = Queue->Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+
+ return nullptr;
+ }
+ };
+
TVector<TAutoPtr<TPopperThread>> poppers;
TVector<TAutoPtr<TPusherThread>> pushers;
-
- for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
- poppers.emplace_back(new TPopperThread(&queue[i], (char*)&queue));
- poppers.back()->Start();
- }
-
- for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
- pushers.emplace_back(
- new TPusherThread(queue, (char*)&queue + 200000000ULL * i));
- pushers.back()->Start();
- }
-
- for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
- poppers[i]->Join();
- }
-
- for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
- pushers[i]->Join();
- }
- }
-
- void Threads8_RndPush100K_Threads8_Queues() {
- ManyRndPush100K_ManyQueues<8, 8>();
- }
-
- /*
- void Threads24_RndPush100K_Threads24_Queues() {
- ManyRndPush100K_ManyQueues<24, 24>();
- }
-
- void Threads24_RndPush100K_Threads8_Queues() {
- ManyRndPush100K_ManyQueues<24, 8>();
- }
-
- void Threads24_RndPush100K_Threads4_Queues() {
- ManyRndPush100K_ManyQueues<24, 4>();
- }
- */
-};
-
-REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestProcs);
+
+ for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
+ poppers.emplace_back(new TPopperThread(&queue[i], (char*)&queue));
+ poppers.back()->Start();
+ }
+
+ for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
+ pushers.emplace_back(
+ new TPusherThread(queue, (char*)&queue + 200000000ULL * i));
+ pushers.back()->Start();
+ }
+
+ for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
+ poppers[i]->Join();
+ }
+
+ for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
+ pushers[i]->Join();
+ }
+ }
+
+ void Threads8_RndPush100K_Threads8_Queues() {
+ ManyRndPush100K_ManyQueues<8, 8>();
+ }
+
+ /*
+ void Threads24_RndPush100K_Threads24_Queues() {
+ ManyRndPush100K_ManyQueues<24, 24>();
+ }
+
+ void Threads24_RndPush100K_Threads8_Queues() {
+ ManyRndPush100K_ManyQueues<24, 8>();
+ }
+
+ void Threads24_RndPush100K_Threads4_Queues() {
+ ManyRndPush100K_ManyQueues<24, 4>();
+ }
+ */
+};
+
+REGISTER_TESTS_FOR_ALL_ORDERED_QUEUES(TQueueTestProcs);