aboutsummaryrefslogtreecommitdiffstats
path: root/util/thread
diff options
context:
space:
mode:
authorbulatman <bulatman@yandex-team.ru>2022-02-10 16:45:50 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:50 +0300
commit6560e4993b14d193f8c879e33a3de5e5eba6e21d (patch)
treecfd2e2baa05c3196f2caacbb63c32e1df40bc3de /util/thread
parent7489e4682331202b9c7d863c0898eb83d7b12c2b (diff)
downloadydb-6560e4993b14d193f8c879e33a3de5e5eba6e21d.tar.gz
Restoring authorship annotation for <bulatman@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'util/thread')
-rw-r--r--util/thread/lfqueue.h10
-rw-r--r--util/thread/lfstack.h36
-rw-r--r--util/thread/lfstack_ut.cpp202
3 files changed, 124 insertions, 124 deletions
diff --git a/util/thread/lfqueue.h b/util/thread/lfqueue.h
index ab523631e4..fc606963d3 100644
--- a/util/thread/lfqueue.h
+++ b/util/thread/lfqueue.h
@@ -18,11 +18,11 @@ struct TDefaultLFCounter {
}
};
-// @brief lockfree queue
-// @tparam T - the queue element, should be movable
-// @tparam TCounter, a observer class to count number of items in queue
-// be carifull, IncCount and DecCount can be called on a moved object and
-// it is TCounter class responsibility to check validity of passed object
+// @brief lockfree queue
+// @tparam T - the queue element, should be movable
+// @tparam TCounter, a observer class to count number of items in queue
+// be carifull, IncCount and DecCount can be called on a moved object and
+// it is TCounter class responsibility to check validity of passed object
template <class T, class TCounter>
class TLockFreeQueue: public TNonCopyable {
struct TListNode {
diff --git a/util/thread/lfstack.h b/util/thread/lfstack.h
index ca3d95f3c3..168e2b1d24 100644
--- a/util/thread/lfstack.h
+++ b/util/thread/lfstack.h
@@ -12,10 +12,10 @@ class TLockFreeStack: TNonCopyable {
TNode* Next;
TNode() = default;
-
+
template <class U>
- explicit TNode(U&& val)
- : Value(std::forward<U>(val))
+ explicit TNode(U&& val)
+ : Value(std::forward<U>(val))
, Next(nullptr)
{
}
@@ -49,11 +49,11 @@ class TLockFreeStack: TNonCopyable {
break;
}
}
- template <class U>
- void EnqueueImpl(U&& u) {
+ template <class U>
+ void EnqueueImpl(U&& u) {
TNode* volatile node = new TNode(std::forward<U>(u));
- EnqueueImpl(node, node);
- }
+ EnqueueImpl(node, node);
+ }
public:
TLockFreeStack()
@@ -66,15 +66,15 @@ public:
EraseList(Head);
EraseList(FreePtr);
}
-
+
void Enqueue(const T& t) {
- EnqueueImpl(t);
+ EnqueueImpl(t);
}
-
- void Enqueue(T&& t) {
- EnqueueImpl(std::move(t));
- }
-
+
+ void Enqueue(T&& t) {
+ EnqueueImpl(std::move(t));
+ }
+
template <typename TCollection>
void EnqueueAll(const TCollection& data) {
EnqueueAll(data.begin(), data.end());
@@ -99,7 +99,7 @@ public:
AtomicAdd(DequeueCount, 1);
for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
if (AtomicCas(&Head, AtomicGet(current->Next), current)) {
- *res = std::move(current->Value);
+ *res = std::move(current->Value);
// delete current; // ABA problem
// even more complex node deletion
TryToFreeMemory();
@@ -129,7 +129,7 @@ public:
for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
if (AtomicCas(&Head, (TNode*)nullptr, current)) {
for (TNode* x = current; x;) {
- res->push_back(std::move(x->Value));
+ res->push_back(std::move(x->Value));
x = x->Next;
}
// EraseList(current); // ABA problem
@@ -159,7 +159,7 @@ public:
bool DequeueSingleConsumer(T* res) {
for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
if (AtomicCas(&Head, current->Next, current)) {
- *res = std::move(current->Value);
+ *res = std::move(current->Value);
delete current; // with single consumer thread ABA does not happen
return true;
}
@@ -173,7 +173,7 @@ public:
for (TNode* current = AtomicGet(Head); current; current = AtomicGet(Head)) {
if (AtomicCas(&Head, (TNode*)nullptr, current)) {
for (TNode* x = current; x;) {
- res->push_back(std::move(x->Value));
+ res->push_back(std::move(x->Value));
x = x->Next;
}
EraseList(current); // with single consumer thread ABA does not happen
diff --git a/util/thread/lfstack_ut.cpp b/util/thread/lfstack_ut.cpp
index e20a838f95..00ae2864d7 100644
--- a/util/thread/lfstack_ut.cpp
+++ b/util/thread/lfstack_ut.cpp
@@ -190,55 +190,55 @@ Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
UNIT_ASSERT_VALUES_EQUAL(1, p.RefCount());
}
- Y_UNIT_TEST(NoCopyTest) {
- static unsigned copied = 0;
- struct TCopyCount {
+ Y_UNIT_TEST(NoCopyTest) {
+ static unsigned copied = 0;
+ struct TCopyCount {
TCopyCount(int) {
}
TCopyCount(const TCopyCount&) {
++copied;
}
-
+
TCopyCount(TCopyCount&&) {
}
-
- TCopyCount& operator=(const TCopyCount&) {
- ++copied;
- return *this;
- }
-
- TCopyCount& operator=(TCopyCount&&) {
- return *this;
- }
- };
-
- TLockFreeStack<TCopyCount> stack;
- stack.Enqueue(TCopyCount(1));
- TCopyCount val(0);
- stack.Dequeue(&val);
- UNIT_ASSERT_VALUES_EQUAL(0, copied);
- }
-
- Y_UNIT_TEST(MoveOnlyTest) {
- TLockFreeStack<THolder<bool>> stack;
- stack.Enqueue(MakeHolder<bool>(true));
- THolder<bool> val;
- stack.Dequeue(&val);
- UNIT_ASSERT(val);
- UNIT_ASSERT_VALUES_EQUAL(true, *val);
- }
-
- template <class TTest>
- struct TMultiThreadTester {
- using ThisType = TMultiThreadTester<TTest>;
-
+
+ TCopyCount& operator=(const TCopyCount&) {
+ ++copied;
+ return *this;
+ }
+
+ TCopyCount& operator=(TCopyCount&&) {
+ return *this;
+ }
+ };
+
+ TLockFreeStack<TCopyCount> stack;
+ stack.Enqueue(TCopyCount(1));
+ TCopyCount val(0);
+ stack.Dequeue(&val);
+ UNIT_ASSERT_VALUES_EQUAL(0, copied);
+ }
+
+ Y_UNIT_TEST(MoveOnlyTest) {
+ TLockFreeStack<THolder<bool>> stack;
+ stack.Enqueue(MakeHolder<bool>(true));
+ THolder<bool> val;
+ stack.Dequeue(&val);
+ UNIT_ASSERT(val);
+ UNIT_ASSERT_VALUES_EQUAL(true, *val);
+ }
+
+ template <class TTest>
+ struct TMultiThreadTester {
+ using ThisType = TMultiThreadTester<TTest>;
+
size_t Threads;
size_t OperationsPerThread;
TCountDownLatch StartLatch;
- TLockFreeStack<typename TTest::ValueType> Stack;
+ TLockFreeStack<typename TTest::ValueType> Stack;
- TMultiThreadTester()
+ TMultiThreadTester()
: Threads(10)
, OperationsPerThread(100000)
, StartLatch(Threads)
@@ -249,23 +249,23 @@ Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
StartLatch.CountDown();
StartLatch.Await();
- TVector<typename TTest::ValueType> unused;
+ TVector<typename TTest::ValueType> unused;
for (size_t i = 0; i < OperationsPerThread; ++i) {
switch (GetCycleCount() % 4) {
case 0: {
- TTest::Enqueue(Stack, i);
+ TTest::Enqueue(Stack, i);
break;
}
case 1: {
- TTest::Dequeue(Stack);
+ TTest::Dequeue(Stack);
break;
}
case 2: {
- TTest::EnqueueAll(Stack);
+ TTest::EnqueueAll(Stack);
break;
}
case 3: {
- TTest::DequeueAll(Stack);
+ TTest::DequeueAll(Stack);
break;
}
}
@@ -276,71 +276,71 @@ Y_UNIT_TEST_SUITE(TLockFreeStackTests) {
TDeque<NThreading::TLegacyFuture<>> futures;
for (size_t i = 0; i < Threads; ++i) {
- futures.emplace_back(std::bind(&ThisType::Worker, this));
+ futures.emplace_back(std::bind(&ThisType::Worker, this));
}
futures.clear();
- TTest::DequeueAll(Stack);
- }
- };
-
- struct TFreeListTest {
- using ValueType = int;
-
- static void Enqueue(TLockFreeStack<int>& stack, size_t i) {
- stack.Enqueue(static_cast<int>(i));
- }
-
- static void Dequeue(TLockFreeStack<int>& stack) {
- int value;
- stack.Dequeue(&value);
- }
-
- static void EnqueueAll(TLockFreeStack<int>& stack) {
- TVector<int> values(5);
- stack.EnqueueAll(values);
- }
-
- static void DequeueAll(TLockFreeStack<int>& stack) {
- TVector<int> value;
- stack.DequeueAll(&value);
+ TTest::DequeueAll(Stack);
+ }
+ };
+
+ struct TFreeListTest {
+ using ValueType = int;
+
+ static void Enqueue(TLockFreeStack<int>& stack, size_t i) {
+ stack.Enqueue(static_cast<int>(i));
}
+
+ static void Dequeue(TLockFreeStack<int>& stack) {
+ int value;
+ stack.Dequeue(&value);
+ }
+
+ static void EnqueueAll(TLockFreeStack<int>& stack) {
+ TVector<int> values(5);
+ stack.EnqueueAll(values);
+ }
+
+ static void DequeueAll(TLockFreeStack<int>& stack) {
+ TVector<int> value;
+ stack.DequeueAll(&value);
+ }
};
// Test for catching thread sanitizer problems
Y_UNIT_TEST(TestFreeList) {
- TMultiThreadTester<TFreeListTest>().Run();
- }
-
- struct TMoveTest {
- using ValueType = THolder<int>;
-
- static void Enqueue(TLockFreeStack<ValueType>& stack, size_t i) {
- stack.Enqueue(MakeHolder<int>(static_cast<int>(i)));
- }
-
- static void Dequeue(TLockFreeStack<ValueType>& stack) {
- ValueType value;
- if (stack.Dequeue(&value)) {
- UNIT_ASSERT(value);
- }
- }
-
- static void EnqueueAll(TLockFreeStack<ValueType>& stack) {
- // there is no enqueAll with moving signature in LockFreeStack
- Enqueue(stack, 0);
- }
-
- static void DequeueAll(TLockFreeStack<ValueType>& stack) {
- TVector<ValueType> values;
- stack.DequeueAll(&values);
- for (auto& v : values) {
- UNIT_ASSERT(v);
- }
- }
- };
-
- // Test for catching thread sanitizer problems
- Y_UNIT_TEST(TesMultiThreadMove) {
- TMultiThreadTester<TMoveTest>().Run();
+ TMultiThreadTester<TFreeListTest>().Run();
}
+
+ struct TMoveTest {
+ using ValueType = THolder<int>;
+
+ static void Enqueue(TLockFreeStack<ValueType>& stack, size_t i) {
+ stack.Enqueue(MakeHolder<int>(static_cast<int>(i)));
+ }
+
+ static void Dequeue(TLockFreeStack<ValueType>& stack) {
+ ValueType value;
+ if (stack.Dequeue(&value)) {
+ UNIT_ASSERT(value);
+ }
+ }
+
+ static void EnqueueAll(TLockFreeStack<ValueType>& stack) {
+ // there is no enqueAll with moving signature in LockFreeStack
+ Enqueue(stack, 0);
+ }
+
+ static void DequeueAll(TLockFreeStack<ValueType>& stack) {
+ TVector<ValueType> values;
+ stack.DequeueAll(&values);
+ for (auto& v : values) {
+ UNIT_ASSERT(v);
+ }
+ }
+ };
+
+ // Test for catching thread sanitizer problems
+ Y_UNIT_TEST(TesMultiThreadMove) {
+ TMultiThreadTester<TMoveTest>().Run();
+ }
}