aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/blocking_queue
diff options
context:
space:
mode:
authortytskiy <tytskiy@yandex-team.com>2023-12-11 12:13:09 +0300
committertytskiy <tytskiy@yandex-team.com>2023-12-11 12:48:09 +0300
commit28270b9563018fdd7860768a92b12cacba4b1fa9 (patch)
treea8592df7c2c16b3dcb1530c15f9c1cc829fe013e /library/cpp/threading/blocking_queue
parent5ad01fbb4ebdd0f09304aaefc56dbce3d58dae6d (diff)
downloadydb-28270b9563018fdd7860768a92b12cacba4b1fa9.tar.gz
Add Drain method for TBlockingQueue
Diffstat (limited to 'library/cpp/threading/blocking_queue')
-rw-r--r--library/cpp/threading/blocking_queue/blocking_queue.h24
-rw-r--r--library/cpp/threading/blocking_queue/blocking_queue_ut.cpp85
2 files changed, 109 insertions, 0 deletions
diff --git a/library/cpp/threading/blocking_queue/blocking_queue.h b/library/cpp/threading/blocking_queue/blocking_queue.h
index 48d3762f68..fa9b1d2f91 100644
--- a/library/cpp/threading/blocking_queue/blocking_queue.h
+++ b/library/cpp/threading/blocking_queue/blocking_queue.h
@@ -73,6 +73,30 @@ namespace NThreading {
}
///
+ /// Blocks until queue has some elements or queue is stopped or deadline is reached.
+ /// Returns empty internal deque if queue is stopped or deadline is reached.
+ /// Returns iternal deque element otherwise.
+ TDeque<TElement> Drain(TInstant deadline = TInstant::Max()) {
+ TGuard<TMutex> g(Lock);
+
+ const auto canPop = [this]() { return CanPop(); };
+ if (!CanPopCV.WaitD(Lock, deadline, canPop)) {
+ return {};
+ }
+
+ TDeque<TElement> result;
+ std::swap(result, Queue);
+
+ CanPushCV.BroadCast();
+
+ return result;
+ }
+
+ TDeque<TElement> Drain(TDuration duration) {
+ return Drain(TInstant::Now() + duration);
+ }
+
+ ///
/// Blocks until queue has space for new elements or queue is stopped or deadline is reached.
/// Returns false exception if queue is stopped and push failed or deadline is reached.
/// Pushes element to queue and returns true otherwise.
diff --git a/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp b/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp
index 26e125279d..3b4bbc518f 100644
--- a/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp
+++ b/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp
@@ -1,5 +1,6 @@
#include "blocking_queue.h"
+#include <library/cpp/iterator/enumerate.h>
#include <library/cpp/testing/unittest/registar.h>
#include <util/string/builder.h>
@@ -53,6 +54,25 @@ Y_UNIT_TEST_SUITE(BlockingQueueTest) {
UNIT_ASSERT(queue.Empty());
}
+ Y_UNIT_TEST(SimplePushDrainTest) {
+ const size_t limit = 100;
+
+ NThreading::TBlockingQueue<int> queue(100);
+
+ for (int i = 0; i != limit; ++i) {
+ queue.Push(i);
+ }
+
+ auto res = queue.Drain();
+
+ UNIT_ASSERT_VALUES_EQUAL(queue.Empty(), true);
+ UNIT_ASSERT_VALUES_EQUAL(res.size(), limit);
+
+ for (auto [i, elem] : Enumerate(res)) {
+ UNIT_ASSERT_VALUES_EQUAL(elem, i);
+ }
+ }
+
Y_UNIT_TEST(SimpleStopTest) {
const size_t limit = 100;
@@ -71,6 +91,7 @@ Y_UNIT_TEST_SUITE(BlockingQueueTest) {
}
UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>());
+ UNIT_ASSERT_VALUES_EQUAL(queue.Drain().empty(), true);
}
Y_UNIT_TEST(BigPushPop) {
@@ -111,13 +132,18 @@ Y_UNIT_TEST_SUITE(BlockingQueueTest) {
TFunctionThread popper2([&] {
UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>());
});
+ TFunctionThread drainer([&] {
+ UNIT_ASSERT_VALUES_EQUAL(queue.Drain().empty(), true);
+ });
popper1.Start();
popper2.Start();
+ drainer.Start();
queue.Stop();
popper1.Join();
popper2.Join();
+ drainer.Join();
}
Y_UNIT_TEST(StopWhenMultiplePushers) {
@@ -138,6 +164,28 @@ Y_UNIT_TEST_SUITE(BlockingQueueTest) {
pusher2.Join();
}
+ Y_UNIT_TEST(WakeUpAllProducers) {
+ NThreading::TBlockingQueue<int> queue(2);
+ queue.Push(1);
+ queue.Push(2);
+
+ TFunctionThread pusher1([&] {
+ UNIT_ASSERT_VALUES_EQUAL(queue.Push(3), true);
+ });
+
+ TFunctionThread pusher2([&] {
+ UNIT_ASSERT_VALUES_EQUAL(queue.Push(4), true);
+ });
+
+ pusher1.Start();
+ pusher2.Start();
+
+ queue.Drain();
+
+ pusher1.Join();
+ pusher2.Join();
+ }
+
Y_UNIT_TEST(InterruptPopByDeadline) {
NThreading::TBlockingQueue<int> queue1(10);
NThreading::TBlockingQueue<int> queue2(10);
@@ -172,6 +220,43 @@ Y_UNIT_TEST_SUITE(BlockingQueueTest) {
popper2.Join();
}
+
+ Y_UNIT_TEST(InterruptDrainByDeadline) {
+ NThreading::TBlockingQueue<int> queue1(10);
+ NThreading::TBlockingQueue<int> queue2(10);
+
+ const auto drainer1DeadLine = TInstant::Now();
+ const auto drainer2DeadLine = TInstant::Now() + TDuration::Seconds(2);
+
+ TFunctionThread drainer1([&] {
+ UNIT_ASSERT_VALUES_EQUAL(queue1.Drain(drainer1DeadLine).empty(), true);
+ UNIT_ASSERT_VALUES_EQUAL(queue1.IsStopped(), false);
+ });
+
+ TFunctionThread drainer2([&] {
+ auto res = queue2.Drain(drainer2DeadLine);
+ UNIT_ASSERT_VALUES_EQUAL(res.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(res.front(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(queue2.IsStopped(), false);
+ });
+
+ drainer1.Start();
+ drainer2.Start();
+
+ Sleep(TDuration::Seconds(1));
+
+ queue1.Push(1);
+ queue2.Push(2);
+
+ Sleep(TDuration::Seconds(1));
+
+ queue1.Stop();
+ queue2.Stop();
+
+ drainer1.Join();
+ drainer2.Join();
+ }
+
Y_UNIT_TEST(InterruptPushByDeadline) {
NThreading::TBlockingQueue<int> queue1(1);
NThreading::TBlockingQueue<int> queue2(1);