From 85ef4ee49c3edbb700d0ef903d01177bf9984018 Mon Sep 17 00:00:00 2001
From: robot-piglet <robot-piglet@yandex-team.com>
Date: Fri, 28 Jul 2023 06:50:19 +0300
Subject: Intermediate changes

---
 library/cpp/threading/equeue/equeue_ut.cpp | 79 ++++++++++++++++++++++--------
 1 file changed, 59 insertions(+), 20 deletions(-)

(limited to 'library/cpp/threading/equeue/equeue_ut.cpp')

diff --git a/library/cpp/threading/equeue/equeue_ut.cpp b/library/cpp/threading/equeue/equeue_ut.cpp
index 8557f63ac0..47b1029a2f 100644
--- a/library/cpp/threading/equeue/equeue_ut.cpp
+++ b/library/cpp/threading/equeue/equeue_ut.cpp
@@ -1,4 +1,5 @@
 #include "equeue.h"
+#include <library/cpp/threading/equeue/fast/equeue.h>
 
 #include <library/cpp/testing/unittest/registar.h>
 
@@ -9,18 +10,33 @@
 Y_UNIT_TEST_SUITE(TElasticQueueTest) {
     const size_t MaxQueueSize = 20;
     const size_t ThreadCount = 10;
-    const size_t N = 100000;
 
-    static THolder<TElasticQueue> Queue;
+    template <typename T>
+    THolder<T> MakeQueue();
 
-    struct TQueueSetup {
-        TQueueSetup() {
-            Queue.Reset(new TElasticQueue(MakeHolder<TSimpleThreadPool>()));
-            Queue->Start(ThreadCount, MaxQueueSize);
-        }
-        ~TQueueSetup() {
-            Queue->Stop();
-        }
+    template <>
+    THolder<TElasticQueue> MakeQueue() {
+        return MakeHolder<TElasticQueue>(MakeHolder<TSimpleThreadPool>());
+    }
+
+    template <>
+    THolder<TFastElasticQueue> MakeQueue() {
+        return MakeHolder<TFastElasticQueue>();
+    }
+
+    template <typename T>
+    struct TEnv {
+        static inline THolder<T> Queue;
+
+        struct TQueueSetup {
+            TQueueSetup() {
+                Queue.Reset(MakeQueue<T>());
+                Queue->Start(ThreadCount, MaxQueueSize);
+            }
+            ~TQueueSetup() {
+                Queue->Stop();
+            }
+        };
     };
 
     struct TCounters {
@@ -37,7 +53,9 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
 
 //fill test -- fill queue with "endless" jobs
     TSystemEvent WaitEvent;
-    Y_UNIT_TEST(FillTest) {
+
+    template <typename T>
+    void FillTest() {
         Counters.Reset();
 
         struct TWaitJob: public IObjectInQueue {
@@ -47,7 +65,10 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
             }
         } job;
 
-        struct TLocalSetup: TQueueSetup {
+        struct TLocalSetup: TEnv<T>::TQueueSetup {
+            TLocalSetup() {
+                WaitEvent.Reset();
+            }
             ~TLocalSetup() {
                 WaitEvent.Signal();
             }
@@ -56,19 +77,26 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
         size_t enqueued = 0;
         {
             TLocalSetup setup;
-            while (Queue->Add(&job) && enqueued < MaxQueueSize + 100) {
+            while (TEnv<T>::Queue->Add(&job) && enqueued < MaxQueueSize + 100) {
                 ++enqueued;
             }
 
             UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize);
-            UNIT_ASSERT_VALUES_EQUAL(enqueued, Queue->ObjectCount());
+            UNIT_ASSERT_VALUES_EQUAL(enqueued, TEnv<T>::Queue->ObjectCount());
         }
 
-        UNIT_ASSERT_VALUES_EQUAL(0u, Queue->ObjectCount());
-        UNIT_ASSERT_VALUES_EQUAL(0u, Queue->Size());
+        UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->ObjectCount());
+        UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->Size());
         UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued);
     }
 
+    Y_UNIT_TEST(FillTest) {
+        FillTest<TElasticQueue>();
+    }
+
+    Y_UNIT_TEST(FillTestFast) {
+        FillTest<TFastElasticQueue>();
+    }
 
 //concurrent test -- send many jobs from different threads
     struct TJob: public IObjectInQueue {
@@ -78,9 +106,10 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
     };
     static TJob Job;
 
+    template <typename T>
     static bool TryAdd() {
         AtomicIncrement(Counters.Total);
-        if (Queue->Add(&Job)) {
+        if (TEnv<T>::Queue->Add(&Job)) {
             AtomicIncrement(Counters.Scheduled);
             return true;
         } else {
@@ -89,16 +118,18 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
         }
     }
 
+    const size_t N = 100000;
     static size_t TryCounter;
 
-    Y_UNIT_TEST(ConcurrentTest) {
+    template <typename T>
+    void ConcurrentTest() {
         Counters.Reset();
         TryCounter = 0;
 
         struct TSender: public IThreadFactory::IThreadAble {
             void DoExecute() override {
                 while ((size_t)AtomicIncrement(TryCounter) <= N) {
-                    if (!TryAdd()) {
+                    if (!TryAdd<T>()) {
                         Sleep(TDuration::MicroSeconds(50));
                     }
                 }
@@ -106,7 +137,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
         } sender;
 
         {
-            TQueueSetup setup;
+            typename TEnv<T>::TQueueSetup setup;
 
             TVector< TAutoPtr<IThreadFactory::IThread> > senders;
             for (size_t i = 0; i < ThreadCount; ++i) {
@@ -122,4 +153,12 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
         UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled);
         UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded);
     }
+
+    Y_UNIT_TEST(ConcurrentTest) {
+        ConcurrentTest<TElasticQueue>();
+    }
+
+    Y_UNIT_TEST(ConcurrentTestFast) {
+        ConcurrentTest<TFastElasticQueue>();
+    }
 }
-- 
cgit v1.2.3