path: root/library/cpp/threading/queue/queue_ut.cpp
diff options
authoragri <agri@yandex-team.ru>2022-02-10 16:48:12 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:12 +0300
commitd3530b2692e400bd4d29bd4f07cafaee139164e7 (patch)
treeb7ae636a74490e649a2ed0fdd5361f1bec83b9f9 /library/cpp/threading/queue/queue_ut.cpp
parent0f4c5d1e8c0672bf0a1f2f2d8acac5ba24772435 (diff)
Restoring authorship annotation for <agri@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/queue/queue_ut.cpp')
1 files changed, 217 insertions, 217 deletions
diff --git a/library/cpp/threading/queue/queue_ut.cpp b/library/cpp/threading/queue/queue_ut.cpp
index 80eca147da..8b36437034 100644
--- a/library/cpp/threading/queue/queue_ut.cpp
+++ b/library/cpp/threading/queue/queue_ut.cpp
@@ -1,242 +1,242 @@
#include <library/cpp/testing/unittest/registar.h>
-#include <util/system/thread.h>
-#include "ut_helpers.h"
-typedef void* TMsgLink;
+#include <util/system/thread.h>
+#include "ut_helpers.h"
+typedef void* TMsgLink;
template <typename TQueueType>
-class TQueueTestProcs: public TTestBase {
+class TQueueTestProcs: public TTestBase {
- UNIT_TEST(Threads2_Push1M_Threads1_Pop2M)
- UNIT_TEST(Threads4_Push1M_Threads1_Pop4M)
- UNIT_TEST(Threads8_RndPush100K_Threads8_Queues)
+ UNIT_TEST(Threads2_Push1M_Threads1_Pop2M)
+ UNIT_TEST(Threads4_Push1M_Threads1_Pop4M)
+ UNIT_TEST(Threads8_RndPush100K_Threads8_Queues)
- UNIT_TEST(Threads24_RndPush100K_Threads24_Queues)
- UNIT_TEST(Threads24_RndPush100K_Threads8_Queues)
- UNIT_TEST(Threads24_RndPush100K_Threads4_Queues)
- void Push1M_Pop1M() {
+ UNIT_TEST(Threads24_RndPush100K_Threads24_Queues)
+ UNIT_TEST(Threads24_RndPush100K_Threads8_Queues)
+ UNIT_TEST(Threads24_RndPush100K_Threads4_Queues)
+ void Push1M_Pop1M() {
TQueueType queue;
- TMsgLink msg = &queue;
- auto pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- for (int i = 0; i < 1000000; ++i) {
- queue.Push((char*)msg + i);
- }
- for (int i = 0; i < 1000000; ++i) {
- auto popped = queue.Pop();
- UNIT_ASSERT_EQUAL((char*)msg + i, popped);
- }
- pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- }
- void Threads2_Push1M_Threads1_Pop2M() {
+ TMsgLink msg = &queue;
+ auto pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ for (int i = 0; i < 1000000; ++i) {
+ queue.Push((char*)msg + i);
+ }
+ for (int i = 0; i < 1000000; ++i) {
+ auto popped = queue.Pop();
+ UNIT_ASSERT_EQUAL((char*)msg + i, popped);
+ }
+ pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ }
+ void Threads2_Push1M_Threads1_Pop2M() {
TQueueType queue;
class TPusherThread: public ISimpleThread {
- public:
+ public:
TPusherThread(TQueueType& theQueue, char* start)
: Queue(theQueue)
- , Arg(start)
- {
- }
+ , Arg(start)
+ {
+ }
TQueueType& Queue;
- char* Arg;
- void* ThreadProc() override {
- for (int i = 0; i < 1000000; ++i) {
- Queue.Push(Arg + i);
- }
- return nullptr;
- }
- };
- TPusherThread pusher1(queue, (char*)&queue);
- TPusherThread pusher2(queue, (char*)&queue + 2000000);
- pusher1.Start();
- pusher2.Start();
- for (int i = 0; i < 2000000; ++i) {
- while (queue.Pop() == nullptr) {
- SpinLockPause();
- }
- }
- auto pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- }
- void Threads4_Push1M_Threads1_Pop4M() {
+ char* Arg;
+ void* ThreadProc() override {
+ for (int i = 0; i < 1000000; ++i) {
+ Queue.Push(Arg + i);
+ }
+ return nullptr;
+ }
+ };
+ TPusherThread pusher1(queue, (char*)&queue);
+ TPusherThread pusher2(queue, (char*)&queue + 2000000);
+ pusher1.Start();
+ pusher2.Start();
+ for (int i = 0; i < 2000000; ++i) {
+ while (queue.Pop() == nullptr) {
+ SpinLockPause();
+ }
+ }
+ auto pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ }
+ void Threads4_Push1M_Threads1_Pop4M() {
TQueueType queue;
class TPusherThread: public ISimpleThread {
- public:
+ public:
TPusherThread(TQueueType& theQueue, char* start)
: Queue(theQueue)
- , Arg(start)
- {
- }
+ , Arg(start)
+ {
+ }
TQueueType& Queue;
- char* Arg;
- void* ThreadProc() override {
- for (int i = 0; i < 1000000; ++i) {
- Queue.Push(Arg + i);
- }
- return nullptr;
- }
- };
- TPusherThread pusher1(queue, (char*)&queue);
- TPusherThread pusher2(queue, (char*)&queue + 2000000);
- TPusherThread pusher3(queue, (char*)&queue + 4000000);
- TPusherThread pusher4(queue, (char*)&queue + 6000000);
- pusher1.Start();
- pusher2.Start();
- pusher3.Start();
- pusher4.Start();
- for (int i = 0; i < 4000000; ++i) {
- while (queue.Pop() == nullptr) {
- SpinLockPause();
- }
- }
- auto pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- }
- template <size_t NUMBER_OF_PUSHERS, size_t NUMBER_OF_QUEUES>
- void ManyRndPush100K_ManyQueues() {
+ char* Arg;
+ void* ThreadProc() override {
+ for (int i = 0; i < 1000000; ++i) {
+ Queue.Push(Arg + i);
+ }
+ return nullptr;
+ }
+ };
+ TPusherThread pusher1(queue, (char*)&queue);
+ TPusherThread pusher2(queue, (char*)&queue + 2000000);
+ TPusherThread pusher3(queue, (char*)&queue + 4000000);
+ TPusherThread pusher4(queue, (char*)&queue + 6000000);
+ pusher1.Start();
+ pusher2.Start();
+ pusher3.Start();
+ pusher4.Start();
+ for (int i = 0; i < 4000000; ++i) {
+ while (queue.Pop() == nullptr) {
+ SpinLockPause();
+ }
+ }
+ auto pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ }
+ template <size_t NUMBER_OF_PUSHERS, size_t NUMBER_OF_QUEUES>
+ void ManyRndPush100K_ManyQueues() {
TQueueType queue[NUMBER_OF_QUEUES];
class TPusherThread: public ISimpleThread {
- public:
+ public:
TPusherThread(TQueueType* queues, char* start)
- : Queues(queues)
- , Arg(start)
- {
- }
+ : Queues(queues)
+ , Arg(start)
+ {
+ }
TQueueType* Queues;
- char* Arg;
- void* ThreadProc() override {
- ui64 counters[NUMBER_OF_QUEUES];
- for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
- counters[i] = 0;
- }
- for (int i = 0; i < 100000; ++i) {
- size_t rnd = GetCycleCount() % NUMBER_OF_QUEUES;
- int cookie = counters[rnd]++;
- Queues[rnd].Push(Arg + cookie);
- }
- for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
- Queues[i].Push((void*)2ULL);
- }
- return nullptr;
- }
- };
+ char* Arg;
+ void* ThreadProc() override {
+ ui64 counters[NUMBER_OF_QUEUES];
+ for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
+ counters[i] = 0;
+ }
+ for (int i = 0; i < 100000; ++i) {
+ size_t rnd = GetCycleCount() % NUMBER_OF_QUEUES;
+ int cookie = counters[rnd]++;
+ Queues[rnd].Push(Arg + cookie);
+ }
+ for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
+ Queues[i].Push((void*)2ULL);
+ }
+ return nullptr;
+ }
+ };
class TPopperThread: public ISimpleThread {
- public:
+ public:
TPopperThread(TQueueType* theQueue, char* base)
: Queue(theQueue)
- , Base(base)
- {
- }
+ , Base(base)
+ {
+ }
TQueueType* Queue;
- char* Base;
- void* ThreadProc() override {
- ui64 counters[NUMBER_OF_PUSHERS];
- for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
- counters[i] = 0;
- }
- for (size_t fin = 0; fin < NUMBER_OF_PUSHERS;) {
- auto msg = Queue->Pop();
- if (msg == nullptr) {
- SpinLockPause();
- continue;
- }
- if (msg == (void*)2ULL) {
- ++fin;
- continue;
- }
- ui64 shift = (char*)msg - Base;
- auto pusherNum = shift / 200000000ULL;
- auto msgNum = shift % 200000000ULL;
- UNIT_ASSERT_EQUAL(counters[pusherNum], msgNum);
- ++counters[pusherNum];
- }
- auto pmsg = Queue->Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
- return nullptr;
- }
- };
+ char* Base;
+ void* ThreadProc() override {
+ ui64 counters[NUMBER_OF_PUSHERS];
+ for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
+ counters[i] = 0;
+ }
+ for (size_t fin = 0; fin < NUMBER_OF_PUSHERS;) {
+ auto msg = Queue->Pop();
+ if (msg == nullptr) {
+ SpinLockPause();
+ continue;
+ }
+ if (msg == (void*)2ULL) {
+ ++fin;
+ continue;
+ }
+ ui64 shift = (char*)msg - Base;
+ auto pusherNum = shift / 200000000ULL;
+ auto msgNum = shift % 200000000ULL;
+ UNIT_ASSERT_EQUAL(counters[pusherNum], msgNum);
+ ++counters[pusherNum];
+ }
+ auto pmsg = Queue->Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+ return nullptr;
+ }
+ };
TVector<TAutoPtr<TPopperThread>> poppers;
TVector<TAutoPtr<TPusherThread>> pushers;
- for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
- poppers.emplace_back(new TPopperThread(&queue[i], (char*)&queue));
- poppers.back()->Start();
- }
- for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
- pushers.emplace_back(
- new TPusherThread(queue, (char*)&queue + 200000000ULL * i));
- pushers.back()->Start();
- }
- for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
- poppers[i]->Join();
- }
- for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
- pushers[i]->Join();
- }
- }
- void Threads8_RndPush100K_Threads8_Queues() {
- ManyRndPush100K_ManyQueues<8, 8>();
- }
- /*
- void Threads24_RndPush100K_Threads24_Queues() {
- ManyRndPush100K_ManyQueues<24, 24>();
- }
- void Threads24_RndPush100K_Threads8_Queues() {
- ManyRndPush100K_ManyQueues<24, 8>();
- }
- void Threads24_RndPush100K_Threads4_Queues() {
- ManyRndPush100K_ManyQueues<24, 4>();
- }
- */
+ for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
+ poppers.emplace_back(new TPopperThread(&queue[i], (char*)&queue));
+ poppers.back()->Start();
+ }
+ for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
+ pushers.emplace_back(
+ new TPusherThread(queue, (char*)&queue + 200000000ULL * i));
+ pushers.back()->Start();
+ }
+ for (size_t i = 0; i < NUMBER_OF_QUEUES; ++i) {
+ poppers[i]->Join();
+ }
+ for (size_t i = 0; i < NUMBER_OF_PUSHERS; ++i) {
+ pushers[i]->Join();
+ }
+ }
+ void Threads8_RndPush100K_Threads8_Queues() {
+ ManyRndPush100K_ManyQueues<8, 8>();
+ }
+ /*
+ void Threads24_RndPush100K_Threads24_Queues() {
+ ManyRndPush100K_ManyQueues<24, 24>();
+ }
+ void Threads24_RndPush100K_Threads8_Queues() {
+ ManyRndPush100K_ManyQueues<24, 8>();
+ }
+ void Threads24_RndPush100K_Threads4_Queues() {
+ ManyRndPush100K_ManyQueues<24, 4>();
+ }
+ */