#include <library/cpp/threading/future/future.h>
#include <library/cpp/testing/unittest/registar.h>
#include <util/generic/algorithm.h>
#include <util/generic/vector.h>
#include <util/generic/ptr.h>
#include <util/thread/pool.h>
#include "lfqueue.h"
class TMoveTest {
public:
TMoveTest(int marker = 0, int value = 0)
: Marker_(marker)
, Value_(value)
{
}
TMoveTest(const TMoveTest& other) {
*this = other;
}
TMoveTest(TMoveTest&& other) {
*this = std::move(other);
}
TMoveTest& operator=(const TMoveTest& other) {
Value_ = other.Value_;
Marker_ = other.Marker_ + 1024;
return *this;
}
TMoveTest& operator=(TMoveTest&& other) {
Value_ = other.Value_;
Marker_ = other.Marker_;
other.Marker_ = 0;
return *this;
}
int Marker() const {
return Marker_;
}
int Value() const {
return Value_;
}
private:
int Marker_ = 0;
int Value_ = 0;
};
class TOperationsChecker {
public:
TOperationsChecker() {
++DefaultCtor_;
}
TOperationsChecker(TOperationsChecker&&) {
++MoveCtor_;
}
TOperationsChecker(const TOperationsChecker&) {
++CopyCtor_;
}
TOperationsChecker& operator=(TOperationsChecker&&) {
++MoveAssign_;
return *this;
}
TOperationsChecker& operator=(const TOperationsChecker&) {
++CopyAssign_;
return *this;
}
static void Check(int defaultCtor, int moveCtor, int copyCtor, int moveAssign, int copyAssign) {
UNIT_ASSERT_VALUES_EQUAL(defaultCtor, DefaultCtor_);
UNIT_ASSERT_VALUES_EQUAL(moveCtor, MoveCtor_);
UNIT_ASSERT_VALUES_EQUAL(copyCtor, CopyCtor_);
UNIT_ASSERT_VALUES_EQUAL(moveAssign, MoveAssign_);
UNIT_ASSERT_VALUES_EQUAL(copyAssign, CopyAssign_);
Clear();
}
private:
static void Clear() {
DefaultCtor_ = MoveCtor_ = CopyCtor_ = MoveAssign_ = CopyAssign_ = 0;
}
static int DefaultCtor_;
static int MoveCtor_;
static int CopyCtor_;
static int MoveAssign_;
static int CopyAssign_;
};
int TOperationsChecker::DefaultCtor_ = 0;
int TOperationsChecker::MoveCtor_ = 0;
int TOperationsChecker::CopyCtor_ = 0;
int TOperationsChecker::MoveAssign_ = 0;
int TOperationsChecker::CopyAssign_ = 0;
Y_UNIT_TEST_SUITE(TLockFreeQueueTests) {
Y_UNIT_TEST(TestMoveEnqueue) {
TMoveTest value(0xFF, 0xAA);
TMoveTest tmp;
TLockFreeQueue<TMoveTest> queue;
queue.Enqueue(value);
UNIT_ASSERT_VALUES_EQUAL(value.Marker(), 0xFF);
UNIT_ASSERT(queue.Dequeue(&tmp));
UNIT_ASSERT_VALUES_UNEQUAL(tmp.Marker(), 0xFF);
UNIT_ASSERT_VALUES_EQUAL(tmp.Value(), 0xAA);
queue.Enqueue(std::move(value));
UNIT_ASSERT_VALUES_EQUAL(value.Marker(), 0);
UNIT_ASSERT(queue.Dequeue(&tmp));
UNIT_ASSERT_VALUES_EQUAL(tmp.Value(), 0xAA);
}
Y_UNIT_TEST(TestSimpleEnqueueDequeue) {
TLockFreeQueue<int> queue;
int i = -1;
UNIT_ASSERT(!queue.Dequeue(&i));
UNIT_ASSERT_VALUES_EQUAL(i, -1);
queue.Enqueue(10);
queue.Enqueue(11);
queue.Enqueue(12);
UNIT_ASSERT(queue.Dequeue(&i));
UNIT_ASSERT_VALUES_EQUAL(10, i);
UNIT_ASSERT(queue.Dequeue(&i));
UNIT_ASSERT_VALUES_EQUAL(11, i);
queue.Enqueue(13);
UNIT_ASSERT(queue.Dequeue(&i));
UNIT_ASSERT_VALUES_EQUAL(12, i);
UNIT_ASSERT(queue.Dequeue(&i));
UNIT_ASSERT_VALUES_EQUAL(13, i);
UNIT_ASSERT(!queue.Dequeue(&i));
const int tmp = 100;
queue.Enqueue(tmp);
UNIT_ASSERT(queue.Dequeue(&i));
UNIT_ASSERT_VALUES_EQUAL(i, tmp);
}
Y_UNIT_TEST(TestSimpleEnqueueAllDequeue) {
TLockFreeQueue<int> queue;
int i = -1;
UNIT_ASSERT(!queue.Dequeue(&i));
UNIT_ASSERT_VALUES_EQUAL(i, -1);
TVector<int> v;
v.push_back(20);
v.push_back(21);
queue.EnqueueAll(v);
v.clear();
v.push_back(22);
v.push_back(23);
v.push_back(24);
queue.EnqueueAll(v);
v.clear();
queue.EnqueueAll(v);
v.clear();
v.push_back(25);
queue.EnqueueAll(v);
for (int j = 20; j <= 25; ++j) {
UNIT_ASSERT(queue.Dequeue(&i));
UNIT_ASSERT_VALUES_EQUAL(j, i);
}
UNIT_ASSERT(!queue.Dequeue(&i));
}
void DequeueAllRunner(TLockFreeQueue<int>& queue, bool singleConsumer) {
size_t threadsNum = 4;
size_t enqueuesPerThread = 10'000;
TThreadPool p;
p.Start(threadsNum, 0);
TVector<NThreading::TFuture<void>> futures;
for (size_t i = 0; i < threadsNum; ++i) {
NThreading::TPromise<void> promise = NThreading::NewPromise();
futures.emplace_back(promise.GetFuture());
p.SafeAddFunc([enqueuesPerThread, &queue, promise]() mutable {
for (size_t i = 0; i != enqueuesPerThread; ++i) {
queue.Enqueue(i);
}
promise.SetValue();
});
}
std::atomic<size_t> elementsLeft = threadsNum * enqueuesPerThread;
ui64 numOfConsumers = singleConsumer ? 1 : threadsNum;
TVector<TVector<int>> dataBuckets(numOfConsumers);
for (size_t i = 0; i < numOfConsumers; ++i) {
NThreading::TPromise<void> promise = NThreading::NewPromise();
futures.emplace_back(promise.GetFuture());
p.SafeAddFunc([&queue, &elementsLeft, promise, consumerData{&dataBuckets[i]}]() mutable {
TVector<int> vec;
while (static_cast<i64>(elementsLeft.load()) > 0) {
for (size_t i = 0; i != 100; ++i) {
vec.clear();
queue.DequeueAll(&vec);
elementsLeft -= vec.size();
consumerData->insert(consumerData->end(), vec.begin(), vec.end());
}
}
promise.SetValue();
});
}
NThreading::WaitExceptionOrAll(futures).GetValueSync();
p.Stop();
TVector<int> left;
queue.DequeueAll(&left);
UNIT_ASSERT(left.empty());
TVector<int> data;
for (auto& dataBucket : dataBuckets) {
data.insert(data.end(), dataBucket.begin(), dataBucket.end());
}
UNIT_ASSERT_EQUAL(data.size(), threadsNum * enqueuesPerThread);
size_t threadIdx = 0;
size_t cntValue = 0;
Sort(data.begin(), data.end());
for (size_t i = 0; i != data.size(); ++i) {
UNIT_ASSERT_VALUES_EQUAL(cntValue, data[i]);
++threadIdx;
if (threadIdx == threadsNum) {
++cntValue;
threadIdx = 0;
}
}
}
Y_UNIT_TEST(TestDequeueAllSingleConsumer) {
TLockFreeQueue<int> queue;
DequeueAllRunner(queue, true);
}
Y_UNIT_TEST(TestDequeueAllMultipleConsumers) {
TLockFreeQueue<int> queue;
DequeueAllRunner(queue, false);
}
Y_UNIT_TEST(TestDequeueAllEmptyQueue) {
TLockFreeQueue<int> queue;
TVector<int> vec;
queue.DequeueAll(&vec);
UNIT_ASSERT(vec.empty());
}
Y_UNIT_TEST(TestDequeueAllQueueOrder) {
TLockFreeQueue<int> queue;
queue.Enqueue(1);
queue.Enqueue(2);
queue.Enqueue(3);
TVector<int> v;
queue.DequeueAll(&v);
UNIT_ASSERT_VALUES_EQUAL(v.size(), 3);
UNIT_ASSERT_VALUES_EQUAL(v[0], 1);
UNIT_ASSERT_VALUES_EQUAL(v[1], 2);
UNIT_ASSERT_VALUES_EQUAL(v[2], 3);
}
Y_UNIT_TEST(CleanInDestructor) {
TSimpleSharedPtr<bool> p(new bool);
UNIT_ASSERT_VALUES_EQUAL(1u, p.RefCount());
{
TLockFreeQueue<TSimpleSharedPtr<bool>> stack;
stack.Enqueue(p);
stack.Enqueue(p);
UNIT_ASSERT_VALUES_EQUAL(3u, p.RefCount());
}
UNIT_ASSERT_VALUES_EQUAL(1, p.RefCount());
}
Y_UNIT_TEST(CheckOperationsCount) {
TOperationsChecker o;
o.Check(1, 0, 0, 0, 0);
TLockFreeQueue<TOperationsChecker> queue;
o.Check(0, 0, 0, 0, 0);
queue.Enqueue(std::move(o));
o.Check(0, 1, 0, 0, 0);
queue.Enqueue(o);
o.Check(0, 0, 1, 0, 0);
queue.Dequeue(&o);
o.Check(0, 0, 2, 1, 0);
}
}