#include "equeue.h" #include <library/cpp/threading/equeue/fast/equeue.h> #include <library/cpp/testing/unittest/registar.h> #include <util/system/event.h> #include <util/datetime/base.h> #include <util/generic/vector.h> Y_UNIT_TEST_SUITE(TElasticQueueTest) { const size_t MaxQueueSize = 20; const size_t ThreadCount = 10; template <typename T> THolder<T> MakeQueue(); template <> THolder<TElasticQueue> MakeQueue() { return MakeHolder<TElasticQueue>(MakeHolder<TSimpleThreadPool>()); } template <> THolder<TFastElasticQueue> MakeQueue() { return MakeHolder<TFastElasticQueue>(); } template <typename T> struct TEnv { static inline THolder<T> Queue; struct TQueueSetup { TQueueSetup() { Queue.Reset(MakeQueue<T>()); Queue->Start(ThreadCount, MaxQueueSize); } ~TQueueSetup() { Queue->Stop(); } }; }; struct TCounters { void Reset() { Processed = Scheduled = Discarded = Total = 0; } TAtomic Processed; TAtomic Scheduled; TAtomic Discarded; TAtomic Total; }; static TCounters Counters; //fill test -- fill queue with "endless" jobs TSystemEvent WaitEvent; template <typename T> void FillTest() { Counters.Reset(); struct TWaitJob: public IObjectInQueue { void Process(void*) override { WaitEvent.Wait(); AtomicIncrement(Counters.Processed); } } job; struct TLocalSetup: TEnv<T>::TQueueSetup { TLocalSetup() { WaitEvent.Reset(); } ~TLocalSetup() { WaitEvent.Signal(); } }; size_t enqueued = 0; { TLocalSetup setup; while (TEnv<T>::Queue->Add(&job) && enqueued < MaxQueueSize + 100) { ++enqueued; } UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize); UNIT_ASSERT_VALUES_EQUAL(enqueued, TEnv<T>::Queue->ObjectCount()); } UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->ObjectCount()); UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->Size()); UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued); } Y_UNIT_TEST(FillTest) { FillTest<TElasticQueue>(); } Y_UNIT_TEST(FillTestFast) { FillTest<TFastElasticQueue>(); } //concurrent test -- send many jobs from different threads struct TJob: public IObjectInQueue { void Process(void*) override { AtomicIncrement(Counters.Processed); } }; static TJob Job; template <typename T> static bool TryAdd() { AtomicIncrement(Counters.Total); if (TEnv<T>::Queue->Add(&Job)) { AtomicIncrement(Counters.Scheduled); return true; } else { AtomicIncrement(Counters.Discarded); return false; } } const size_t N = 100000; static size_t TryCounter; template <typename T> void ConcurrentTest() { Counters.Reset(); TryCounter = 0; struct TSender: public IThreadFactory::IThreadAble { void DoExecute() override { while ((size_t)AtomicIncrement(TryCounter) <= N) { if (!TryAdd<T>()) { Sleep(TDuration::MicroSeconds(50)); } } } } sender; { typename TEnv<T>::TQueueSetup setup; TVector< TAutoPtr<IThreadFactory::IThread> > senders; for (size_t i = 0; i < ThreadCount; ++i) { senders.push_back(::SystemThreadFactory()->Run(&sender)); } for (size_t i = 0; i < senders.size(); ++i) { senders[i]->Join(); } } UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Total, N); UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled); UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded); } Y_UNIT_TEST(ConcurrentTest) { ConcurrentTest<TElasticQueue>(); } Y_UNIT_TEST(ConcurrentTestFast) { ConcurrentTest<TFastElasticQueue>(); } }