diff options
author | tytskiy <tytskiy@yandex-team.com> | 2023-12-11 12:13:09 +0300 |
---|---|---|
committer | tytskiy <tytskiy@yandex-team.com> | 2023-12-11 12:48:09 +0300 |
commit | 28270b9563018fdd7860768a92b12cacba4b1fa9 (patch) | |
tree | a8592df7c2c16b3dcb1530c15f9c1cc829fe013e /library/cpp/threading/blocking_queue | |
parent | 5ad01fbb4ebdd0f09304aaefc56dbce3d58dae6d (diff) | |
download | ydb-28270b9563018fdd7860768a92b12cacba4b1fa9.tar.gz |
Add Drain method for TBlockingQueue
Diffstat (limited to 'library/cpp/threading/blocking_queue')
-rw-r--r-- | library/cpp/threading/blocking_queue/blocking_queue.h | 24 | ||||
-rw-r--r-- | library/cpp/threading/blocking_queue/blocking_queue_ut.cpp | 85 |
2 files changed, 109 insertions, 0 deletions
diff --git a/library/cpp/threading/blocking_queue/blocking_queue.h b/library/cpp/threading/blocking_queue/blocking_queue.h index 48d3762f68..fa9b1d2f91 100644 --- a/library/cpp/threading/blocking_queue/blocking_queue.h +++ b/library/cpp/threading/blocking_queue/blocking_queue.h @@ -73,6 +73,30 @@ namespace NThreading { } /// + /// Blocks until queue has some elements or queue is stopped or deadline is reached. + /// Returns empty internal deque if queue is stopped or deadline is reached. + /// Returns iternal deque element otherwise. + TDeque<TElement> Drain(TInstant deadline = TInstant::Max()) { + TGuard<TMutex> g(Lock); + + const auto canPop = [this]() { return CanPop(); }; + if (!CanPopCV.WaitD(Lock, deadline, canPop)) { + return {}; + } + + TDeque<TElement> result; + std::swap(result, Queue); + + CanPushCV.BroadCast(); + + return result; + } + + TDeque<TElement> Drain(TDuration duration) { + return Drain(TInstant::Now() + duration); + } + + /// /// Blocks until queue has space for new elements or queue is stopped or deadline is reached. /// Returns false exception if queue is stopped and push failed or deadline is reached. /// Pushes element to queue and returns true otherwise. diff --git a/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp b/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp index 26e125279d..3b4bbc518f 100644 --- a/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp +++ b/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp @@ -1,5 +1,6 @@ #include "blocking_queue.h" +#include <library/cpp/iterator/enumerate.h> #include <library/cpp/testing/unittest/registar.h> #include <util/string/builder.h> @@ -53,6 +54,25 @@ Y_UNIT_TEST_SUITE(BlockingQueueTest) { UNIT_ASSERT(queue.Empty()); } + Y_UNIT_TEST(SimplePushDrainTest) { + const size_t limit = 100; + + NThreading::TBlockingQueue<int> queue(100); + + for (int i = 0; i != limit; ++i) { + queue.Push(i); + } + + auto res = queue.Drain(); + + UNIT_ASSERT_VALUES_EQUAL(queue.Empty(), true); + UNIT_ASSERT_VALUES_EQUAL(res.size(), limit); + + for (auto [i, elem] : Enumerate(res)) { + UNIT_ASSERT_VALUES_EQUAL(elem, i); + } + } + Y_UNIT_TEST(SimpleStopTest) { const size_t limit = 100; @@ -71,6 +91,7 @@ Y_UNIT_TEST_SUITE(BlockingQueueTest) { } UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>()); + UNIT_ASSERT_VALUES_EQUAL(queue.Drain().empty(), true); } Y_UNIT_TEST(BigPushPop) { @@ -111,13 +132,18 @@ Y_UNIT_TEST_SUITE(BlockingQueueTest) { TFunctionThread popper2([&] { UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>()); }); + TFunctionThread drainer([&] { + UNIT_ASSERT_VALUES_EQUAL(queue.Drain().empty(), true); + }); popper1.Start(); popper2.Start(); + drainer.Start(); queue.Stop(); popper1.Join(); popper2.Join(); + drainer.Join(); } Y_UNIT_TEST(StopWhenMultiplePushers) { @@ -138,6 +164,28 @@ Y_UNIT_TEST_SUITE(BlockingQueueTest) { pusher2.Join(); } + Y_UNIT_TEST(WakeUpAllProducers) { + NThreading::TBlockingQueue<int> queue(2); + queue.Push(1); + queue.Push(2); + + TFunctionThread pusher1([&] { + UNIT_ASSERT_VALUES_EQUAL(queue.Push(3), true); + }); + + TFunctionThread pusher2([&] { + UNIT_ASSERT_VALUES_EQUAL(queue.Push(4), true); + }); + + pusher1.Start(); + pusher2.Start(); + + queue.Drain(); + + pusher1.Join(); + pusher2.Join(); + } + Y_UNIT_TEST(InterruptPopByDeadline) { NThreading::TBlockingQueue<int> queue1(10); NThreading::TBlockingQueue<int> queue2(10); @@ -172,6 +220,43 @@ Y_UNIT_TEST_SUITE(BlockingQueueTest) { popper2.Join(); } + + Y_UNIT_TEST(InterruptDrainByDeadline) { + NThreading::TBlockingQueue<int> queue1(10); + NThreading::TBlockingQueue<int> queue2(10); + + const auto drainer1DeadLine = TInstant::Now(); + const auto drainer2DeadLine = TInstant::Now() + TDuration::Seconds(2); + + TFunctionThread drainer1([&] { + UNIT_ASSERT_VALUES_EQUAL(queue1.Drain(drainer1DeadLine).empty(), true); + UNIT_ASSERT_VALUES_EQUAL(queue1.IsStopped(), false); + }); + + TFunctionThread drainer2([&] { + auto res = queue2.Drain(drainer2DeadLine); + UNIT_ASSERT_VALUES_EQUAL(res.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(res.front(), 2); + UNIT_ASSERT_VALUES_EQUAL(queue2.IsStopped(), false); + }); + + drainer1.Start(); + drainer2.Start(); + + Sleep(TDuration::Seconds(1)); + + queue1.Push(1); + queue2.Push(2); + + Sleep(TDuration::Seconds(1)); + + queue1.Stop(); + queue2.Stop(); + + drainer1.Join(); + drainer2.Join(); + } + Y_UNIT_TEST(InterruptPushByDeadline) { NThreading::TBlockingQueue<int> queue1(1); NThreading::TBlockingQueue<int> queue2(1); |