aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/queue/unordered_ut.cpp
diff options
context:
space:
mode:
authoragri <agri@yandex-team.ru>2022-02-10 16:48:12 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:48:12 +0300
commitd3530b2692e400bd4d29bd4f07cafaee139164e7 (patch)
treeb7ae636a74490e649a2ed0fdd5361f1bec83b9f9 /library/cpp/threading/queue/unordered_ut.cpp
parent0f4c5d1e8c0672bf0a1f2f2d8acac5ba24772435 (diff)
downloadydb-d3530b2692e400bd4d29bd4f07cafaee139164e7.tar.gz
Restoring authorship annotation for <agri@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/threading/queue/unordered_ut.cpp')
-rw-r--r--library/cpp/threading/queue/unordered_ut.cpp274
1 files changed, 137 insertions, 137 deletions
diff --git a/library/cpp/threading/queue/unordered_ut.cpp b/library/cpp/threading/queue/unordered_ut.cpp
index a43b7f520e..2018538bf7 100644
--- a/library/cpp/threading/queue/unordered_ut.cpp
+++ b/library/cpp/threading/queue/unordered_ut.cpp
@@ -1,154 +1,154 @@
#include <library/cpp/testing/unittest/registar.h>
-#include <util/system/thread.h>
-#include <algorithm>
-#include <util/generic/vector.h>
-#include <util/random/fast.h>
-
-#include "ut_helpers.h"
-
+#include <util/system/thread.h>
+#include <algorithm>
+#include <util/generic/vector.h>
+#include <util/random/fast.h>
+
+#include "ut_helpers.h"
+
template <typename TQueueType>
-class TTestUnorderedQueue: public TTestBase {
-private:
- using TLink = TIntrusiveLink;
-
+class TTestUnorderedQueue: public TTestBase {
+private:
+ using TLink = TIntrusiveLink;
+
UNIT_TEST_SUITE_DEMANGLE(TTestUnorderedQueue<TQueueType>);
- UNIT_TEST(Push1M_Pop1M_Unordered)
- UNIT_TEST_SUITE_END();
-
-public:
- void Push1M_Pop1M_Unordered() {
- constexpr int REPEAT = 1000000;
+ UNIT_TEST(Push1M_Pop1M_Unordered)
+ UNIT_TEST_SUITE_END();
+
+public:
+ void Push1M_Pop1M_Unordered() {
+ constexpr int REPEAT = 1000000;
TQueueType queue;
- TLink msg[REPEAT];
-
- auto pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
-
- for (int i = 0; i < REPEAT; ++i) {
- queue.Push(&msg[i]);
- }
-
+ TLink msg[REPEAT];
+
+ auto pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+
+ for (int i = 0; i < REPEAT; ++i) {
+ queue.Push(&msg[i]);
+ }
+
TVector<TLink*> popped;
- popped.reserve(REPEAT);
- for (int i = 0; i < REPEAT; ++i) {
- popped.push_back((TLink*)queue.Pop());
- }
-
- pmsg = queue.Pop();
- UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
-
- std::sort(popped.begin(), popped.end());
- for (int i = 0; i < REPEAT; ++i) {
- UNIT_ASSERT_VALUES_EQUAL(&msg[i], popped[i]);
- }
- }
-};
-
+ popped.reserve(REPEAT);
+ for (int i = 0; i < REPEAT; ++i) {
+ popped.push_back((TLink*)queue.Pop());
+ }
+
+ pmsg = queue.Pop();
+ UNIT_ASSERT_VALUES_EQUAL(pmsg, nullptr);
+
+ std::sort(popped.begin(), popped.end());
+ for (int i = 0; i < REPEAT; ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(&msg[i], popped[i]);
+ }
+ }
+};
+
template <typename TQueueType>
-class TTestWeakQueue: public TTestBase {
-private:
+class TTestWeakQueue: public TTestBase {
+private:
UNIT_TEST_SUITE_DEMANGLE(TTestWeakQueue<TQueueType>);
- UNIT_TEST(Threads8_Rnd_Exchange)
- UNIT_TEST_SUITE_END();
-
-public:
- template <ui16 COUNT = 48, ui32 MSG_COUNT = 10000>
- void ManyThreadsRndExchange() {
+ UNIT_TEST(Threads8_Rnd_Exchange)
+ UNIT_TEST_SUITE_END();
+
+public:
+ template <ui16 COUNT = 48, ui32 MSG_COUNT = 10000>
+ void ManyThreadsRndExchange() {
TQueueType queues[COUNT];
-
+
class TWorker: public ISimpleThread {
- public:
- TWorker(
+ public:
+ TWorker(
TQueueType* queues_,
ui16 mine,
TAtomic* pushDone)
- : Queues(queues_)
- , MineQueue(mine)
- , PushDone(pushDone)
- {
- }
-
+ : Queues(queues_)
+ , MineQueue(mine)
+ , PushDone(pushDone)
+ {
+ }
+
TQueueType* Queues;
- ui16 MineQueue;
+ ui16 MineQueue;
TVector<uintptr_t> Received;
- TAtomic* PushDone;
-
- void* ThreadProc() override {
- TReallyFastRng32 rng(GetCycleCount());
- Received.reserve(MSG_COUNT * 2);
-
- for (ui32 loop = 1; loop <= MSG_COUNT; ++loop) {
- for (;;) {
- auto msg = Queues[MineQueue].Pop();
- if (msg == nullptr) {
- break;
- }
-
- Received.push_back((uintptr_t)msg);
- }
-
- ui16 rnd = rng.GenRand64() % COUNT;
- ui64 msg = ((ui64)MineQueue << 32) + loop;
- while (!Queues[rnd].Push((void*)msg)) {
- }
- }
-
- AtomicIncrement(*PushDone);
-
- for (;;) {
- bool isItLast = AtomicGet(*PushDone) == COUNT;
- auto msg = Queues[MineQueue].Pop();
- if (msg != nullptr) {
- Received.push_back((uintptr_t)msg);
- } else {
- if (isItLast) {
- break;
- }
- SpinLockPause();
- }
- }
-
- for (ui64 last = 0;;) {
- auto msg = Queues[MineQueue].UnsafeScanningPop(&last);
- if (msg == nullptr) {
- break;
- }
- Received.push_back((uintptr_t)msg);
- }
-
- return nullptr;
- }
- };
-
+ TAtomic* PushDone;
+
+ void* ThreadProc() override {
+ TReallyFastRng32 rng(GetCycleCount());
+ Received.reserve(MSG_COUNT * 2);
+
+ for (ui32 loop = 1; loop <= MSG_COUNT; ++loop) {
+ for (;;) {
+ auto msg = Queues[MineQueue].Pop();
+ if (msg == nullptr) {
+ break;
+ }
+
+ Received.push_back((uintptr_t)msg);
+ }
+
+ ui16 rnd = rng.GenRand64() % COUNT;
+ ui64 msg = ((ui64)MineQueue << 32) + loop;
+ while (!Queues[rnd].Push((void*)msg)) {
+ }
+ }
+
+ AtomicIncrement(*PushDone);
+
+ for (;;) {
+ bool isItLast = AtomicGet(*PushDone) == COUNT;
+ auto msg = Queues[MineQueue].Pop();
+ if (msg != nullptr) {
+ Received.push_back((uintptr_t)msg);
+ } else {
+ if (isItLast) {
+ break;
+ }
+ SpinLockPause();
+ }
+ }
+
+ for (ui64 last = 0;;) {
+ auto msg = Queues[MineQueue].UnsafeScanningPop(&last);
+ if (msg == nullptr) {
+ break;
+ }
+ Received.push_back((uintptr_t)msg);
+ }
+
+ return nullptr;
+ }
+ };
+
TVector<TAutoPtr<TWorker>> workers;
- TAtomic pushDone = 0;
-
- for (ui32 i = 0; i < COUNT; ++i) {
- workers.emplace_back(new TWorker(&queues[0], i, &pushDone));
- workers.back()->Start();
- }
-
+ TAtomic pushDone = 0;
+
+ for (ui32 i = 0; i < COUNT; ++i) {
+ workers.emplace_back(new TWorker(&queues[0], i, &pushDone));
+ workers.back()->Start();
+ }
+
TVector<uintptr_t> all;
- for (ui32 i = 0; i < COUNT; ++i) {
- workers[i]->Join();
- all.insert(all.begin(),
+ for (ui32 i = 0; i < COUNT; ++i) {
+ workers[i]->Join();
+ all.insert(all.begin(),
workers[i]->Received.begin(), workers[i]->Received.end());
- }
-
- std::sort(all.begin(), all.end());
- auto iter = all.begin();
- for (ui32 i = 0; i < COUNT; ++i) {
- for (ui32 k = 1; k <= MSG_COUNT; ++k) {
- UNIT_ASSERT_VALUES_EQUAL(((ui64)i << 32) + k, *iter);
- ++iter;
- }
- }
- }
-
- void Threads8_Rnd_Exchange() {
- ManyThreadsRndExchange<8>();
- }
-};
-
-REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TTestUnorderedQueue);
-UNIT_TEST_SUITE_REGISTRATION(TTestWeakQueue<TMPMCUnorderedRing>);
+ }
+
+ std::sort(all.begin(), all.end());
+ auto iter = all.begin();
+ for (ui32 i = 0; i < COUNT; ++i) {
+ for (ui32 k = 1; k <= MSG_COUNT; ++k) {
+ UNIT_ASSERT_VALUES_EQUAL(((ui64)i << 32) + k, *iter);
+ ++iter;
+ }
+ }
+ }
+
+ void Threads8_Rnd_Exchange() {
+ ManyThreadsRndExchange<8>();
+ }
+};
+
+REGISTER_TESTS_FOR_ALL_UNORDERED_QUEUES(TTestUnorderedQueue);
+UNIT_TEST_SUITE_REGISTRATION(TTestWeakQueue<TMPMCUnorderedRing>);