diff options
| author | tytskiy <[email protected]> | 2023-12-11 12:13:09 +0300 | 
|---|---|---|
| committer | tytskiy <[email protected]> | 2023-12-11 12:48:09 +0300 | 
| commit | 28270b9563018fdd7860768a92b12cacba4b1fa9 (patch) | |
| tree | a8592df7c2c16b3dcb1530c15f9c1cc829fe013e /library/cpp | |
| parent | 5ad01fbb4ebdd0f09304aaefc56dbce3d58dae6d (diff) | |
Add Drain method for TBlockingQueue
Diffstat (limited to 'library/cpp')
| -rw-r--r-- | library/cpp/threading/blocking_queue/blocking_queue.h | 24 | ||||
| -rw-r--r-- | library/cpp/threading/blocking_queue/blocking_queue_ut.cpp | 85 | 
2 files changed, 109 insertions, 0 deletions
diff --git a/library/cpp/threading/blocking_queue/blocking_queue.h b/library/cpp/threading/blocking_queue/blocking_queue.h index 48d3762f68a..fa9b1d2f917 100644 --- a/library/cpp/threading/blocking_queue/blocking_queue.h +++ b/library/cpp/threading/blocking_queue/blocking_queue.h @@ -73,6 +73,30 @@ namespace NThreading {          }          /// +        /// Blocks until queue has some elements or queue is stopped or deadline is reached. +        /// Returns empty internal deque if queue is stopped or deadline is reached. +        /// Returns iternal deque element otherwise. +        TDeque<TElement> Drain(TInstant deadline = TInstant::Max()) { +            TGuard<TMutex> g(Lock); + +            const auto canPop = [this]() { return CanPop(); }; +            if (!CanPopCV.WaitD(Lock, deadline, canPop)) { +                return {}; +            } + +            TDeque<TElement> result; +            std::swap(result, Queue); + +            CanPushCV.BroadCast(); + +            return result; +        } + +        TDeque<TElement> Drain(TDuration duration) { +            return Drain(TInstant::Now() + duration); +        } + +        ///          /// Blocks until queue has space for new elements or queue is stopped or deadline is reached.          /// Returns false exception if queue is stopped and push failed or deadline is reached.          /// Pushes element to queue and returns true otherwise. diff --git a/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp b/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp index 26e125279d2..3b4bbc518f8 100644 --- a/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp +++ b/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp @@ -1,5 +1,6 @@  #include "blocking_queue.h" +#include <library/cpp/iterator/enumerate.h>  #include <library/cpp/testing/unittest/registar.h>  #include <util/string/builder.h> @@ -53,6 +54,25 @@ Y_UNIT_TEST_SUITE(BlockingQueueTest) {          UNIT_ASSERT(queue.Empty());      } +    Y_UNIT_TEST(SimplePushDrainTest) { +        const size_t limit = 100; + +        NThreading::TBlockingQueue<int> queue(100); + +        for (int i = 0; i != limit; ++i) { +            queue.Push(i); +        } + +        auto res = queue.Drain(); + +        UNIT_ASSERT_VALUES_EQUAL(queue.Empty(), true); +        UNIT_ASSERT_VALUES_EQUAL(res.size(), limit); + +        for (auto [i, elem] : Enumerate(res)) { +            UNIT_ASSERT_VALUES_EQUAL(elem, i); +        } +    } +      Y_UNIT_TEST(SimpleStopTest) {          const size_t limit = 100; @@ -71,6 +91,7 @@ Y_UNIT_TEST_SUITE(BlockingQueueTest) {          }          UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>()); +        UNIT_ASSERT_VALUES_EQUAL(queue.Drain().empty(), true);      }      Y_UNIT_TEST(BigPushPop) { @@ -111,13 +132,18 @@ Y_UNIT_TEST_SUITE(BlockingQueueTest) {          TFunctionThread popper2([&] {              UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>());          }); +        TFunctionThread drainer([&] { +            UNIT_ASSERT_VALUES_EQUAL(queue.Drain().empty(), true); +        });          popper1.Start();          popper2.Start(); +        drainer.Start();          queue.Stop();          popper1.Join();          popper2.Join(); +        drainer.Join();      }      Y_UNIT_TEST(StopWhenMultiplePushers) { @@ -138,6 +164,28 @@ Y_UNIT_TEST_SUITE(BlockingQueueTest) {          pusher2.Join();      } +    Y_UNIT_TEST(WakeUpAllProducers) { +        NThreading::TBlockingQueue<int> queue(2); +        queue.Push(1); +        queue.Push(2); + +        TFunctionThread pusher1([&] { +            UNIT_ASSERT_VALUES_EQUAL(queue.Push(3), true); +        }); + +        TFunctionThread pusher2([&] { +            UNIT_ASSERT_VALUES_EQUAL(queue.Push(4), true); +        }); + +        pusher1.Start(); +        pusher2.Start(); + +        queue.Drain(); + +        pusher1.Join(); +        pusher2.Join(); +    } +      Y_UNIT_TEST(InterruptPopByDeadline) {          NThreading::TBlockingQueue<int> queue1(10);          NThreading::TBlockingQueue<int> queue2(10); @@ -172,6 +220,43 @@ Y_UNIT_TEST_SUITE(BlockingQueueTest) {          popper2.Join();      } + +     Y_UNIT_TEST(InterruptDrainByDeadline) { +        NThreading::TBlockingQueue<int> queue1(10); +        NThreading::TBlockingQueue<int> queue2(10); + +        const auto drainer1DeadLine = TInstant::Now(); +        const auto drainer2DeadLine = TInstant::Now() + TDuration::Seconds(2); + +        TFunctionThread drainer1([&] { +            UNIT_ASSERT_VALUES_EQUAL(queue1.Drain(drainer1DeadLine).empty(), true); +            UNIT_ASSERT_VALUES_EQUAL(queue1.IsStopped(), false); +        }); + +        TFunctionThread drainer2([&] { +            auto res = queue2.Drain(drainer2DeadLine); +            UNIT_ASSERT_VALUES_EQUAL(res.size(), 1); +            UNIT_ASSERT_VALUES_EQUAL(res.front(), 2); +            UNIT_ASSERT_VALUES_EQUAL(queue2.IsStopped(), false); +        }); + +        drainer1.Start(); +        drainer2.Start(); + +        Sleep(TDuration::Seconds(1)); + +        queue1.Push(1); +        queue2.Push(2); + +        Sleep(TDuration::Seconds(1)); + +        queue1.Stop(); +        queue2.Stop(); + +        drainer1.Join(); +        drainer2.Join(); +    } +      Y_UNIT_TEST(InterruptPushByDeadline) {          NThreading::TBlockingQueue<int> queue1(1);          NThreading::TBlockingQueue<int> queue2(1);  | 
