blob: 47b1029a2f97ef795a09ee83244b4d7c8947e041 (
plain) (
tree)
|
|
#include "equeue.h"
#include <library/cpp/threading/equeue/fast/equeue.h>
#include <library/cpp/testing/unittest/registar.h>
#include <util/system/event.h>
#include <util/datetime/base.h>
#include <util/generic/vector.h>
Y_UNIT_TEST_SUITE(TElasticQueueTest) {
const size_t MaxQueueSize = 20;
const size_t ThreadCount = 10;
template <typename T>
THolder<T> MakeQueue();
template <>
THolder<TElasticQueue> MakeQueue() {
return MakeHolder<TElasticQueue>(MakeHolder<TSimpleThreadPool>());
}
template <>
THolder<TFastElasticQueue> MakeQueue() {
return MakeHolder<TFastElasticQueue>();
}
template <typename T>
struct TEnv {
static inline THolder<T> Queue;
struct TQueueSetup {
TQueueSetup() {
Queue.Reset(MakeQueue<T>());
Queue->Start(ThreadCount, MaxQueueSize);
}
~TQueueSetup() {
Queue->Stop();
}
};
};
struct TCounters {
void Reset() {
Processed = Scheduled = Discarded = Total = 0;
}
TAtomic Processed;
TAtomic Scheduled;
TAtomic Discarded;
TAtomic Total;
};
static TCounters Counters;
//fill test -- fill queue with "endless" jobs
TSystemEvent WaitEvent;
template <typename T>
void FillTest() {
Counters.Reset();
struct TWaitJob: public IObjectInQueue {
void Process(void*) override {
WaitEvent.Wait();
AtomicIncrement(Counters.Processed);
}
} job;
struct TLocalSetup: TEnv<T>::TQueueSetup {
TLocalSetup() {
WaitEvent.Reset();
}
~TLocalSetup() {
WaitEvent.Signal();
}
};
size_t enqueued = 0;
{
TLocalSetup setup;
while (TEnv<T>::Queue->Add(&job) && enqueued < MaxQueueSize + 100) {
++enqueued;
}
UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize);
UNIT_ASSERT_VALUES_EQUAL(enqueued, TEnv<T>::Queue->ObjectCount());
}
UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->ObjectCount());
UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->Size());
UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued);
}
Y_UNIT_TEST(FillTest) {
FillTest<TElasticQueue>();
}
Y_UNIT_TEST(FillTestFast) {
FillTest<TFastElasticQueue>();
}
//concurrent test -- send many jobs from different threads
struct TJob: public IObjectInQueue {
void Process(void*) override {
AtomicIncrement(Counters.Processed);
}
};
static TJob Job;
template <typename T>
static bool TryAdd() {
AtomicIncrement(Counters.Total);
if (TEnv<T>::Queue->Add(&Job)) {
AtomicIncrement(Counters.Scheduled);
return true;
} else {
AtomicIncrement(Counters.Discarded);
return false;
}
}
const size_t N = 100000;
static size_t TryCounter;
template <typename T>
void ConcurrentTest() {
Counters.Reset();
TryCounter = 0;
struct TSender: public IThreadFactory::IThreadAble {
void DoExecute() override {
while ((size_t)AtomicIncrement(TryCounter) <= N) {
if (!TryAdd<T>()) {
Sleep(TDuration::MicroSeconds(50));
}
}
}
} sender;
{
typename TEnv<T>::TQueueSetup setup;
TVector< TAutoPtr<IThreadFactory::IThread> > senders;
for (size_t i = 0; i < ThreadCount; ++i) {
senders.push_back(::SystemThreadFactory()->Run(&sender));
}
for (size_t i = 0; i < senders.size(); ++i) {
senders[i]->Join();
}
}
UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Total, N);
UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled);
UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded);
}
Y_UNIT_TEST(ConcurrentTest) {
ConcurrentTest<TElasticQueue>();
}
Y_UNIT_TEST(ConcurrentTestFast) {
ConcurrentTest<TFastElasticQueue>();
}
}
|