aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/blocking_queue
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-06-30 11:13:34 +0300
committermax42 <max42@yandex-team.com>2023-06-30 11:13:34 +0300
commit3e1899838408bbad47622007aa382bc8a2b01f87 (patch)
tree0f21c1e6add187ddb6c3ccc048a7d640ce03fb87 /library/cpp/threading/blocking_queue
parent5463eb3f5e72a86f858a3d27c886470a724ede34 (diff)
downloadydb-3e1899838408bbad47622007aa382bc8a2b01f87.tar.gz
Revert "YT-19324: move YT provider to ydb/library/yql"
This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12.
Diffstat (limited to 'library/cpp/threading/blocking_queue')
-rw-r--r--library/cpp/threading/blocking_queue/blocking_queue.cpp3
-rw-r--r--library/cpp/threading/blocking_queue/blocking_queue.h158
-rw-r--r--library/cpp/threading/blocking_queue/blocking_queue_ut.cpp211
-rw-r--r--library/cpp/threading/blocking_queue/ut/ya.make7
-rw-r--r--library/cpp/threading/blocking_queue/ya.make9
5 files changed, 0 insertions, 388 deletions
diff --git a/library/cpp/threading/blocking_queue/blocking_queue.cpp b/library/cpp/threading/blocking_queue/blocking_queue.cpp
deleted file mode 100644
index db199c80be..0000000000
--- a/library/cpp/threading/blocking_queue/blocking_queue.cpp
+++ /dev/null
@@ -1,3 +0,0 @@
-#include "blocking_queue.h"
-
-// just check compilability
diff --git a/library/cpp/threading/blocking_queue/blocking_queue.h b/library/cpp/threading/blocking_queue/blocking_queue.h
deleted file mode 100644
index 48d3762f68..0000000000
--- a/library/cpp/threading/blocking_queue/blocking_queue.h
+++ /dev/null
@@ -1,158 +0,0 @@
-#pragma once
-
-#include <util/generic/deque.h>
-#include <util/generic/maybe.h>
-#include <util/generic/yexception.h>
-#include <util/system/condvar.h>
-#include <util/system/guard.h>
-#include <util/system/mutex.h>
-
-#include <utility>
-
-namespace NThreading {
- ///
- /// TBlockingQueue is a queue of elements of limited or unlimited size.
- /// Queue provides Push and Pop operations that block if operation can't be executed
- /// (queue is empty or maximum size is reached).
- ///
- /// Queue can be stopped, in that case all blocked operation will return `Nothing` / false.
- ///
- /// All operations are thread safe.
- ///
- ///
- /// Example of usage:
- /// TBlockingQueue<int> queue;
- ///
- /// ...
- ///
- /// // thread 1
- /// queue.Push(42);
- /// queue.Push(100500);
- ///
- /// ...
- ///
- /// // thread 2
- /// while (TMaybe<int> number = queue.Pop()) {
- /// ProcessNumber(number.GetRef());
- /// }
- template <class TElement>
- class TBlockingQueue {
- public:
- ///
- /// Creates blocking queue with given maxSize
- /// if maxSize == 0 then queue is unlimited
- TBlockingQueue(size_t maxSize)
- : MaxSize(maxSize == 0 ? Max<size_t>() : maxSize)
- , Stopped(false)
- {
- }
-
- ///
- /// Blocks until queue has some elements or queue is stopped or deadline is reached.
- /// Returns `Nothing` if queue is stopped or deadline is reached.
- /// Returns element otherwise.
- TMaybe<TElement> Pop(TInstant deadline = TInstant::Max()) {
- TGuard<TMutex> g(Lock);
-
- const auto canPop = [this]() { return CanPop(); };
- if (!CanPopCV.WaitD(Lock, deadline, canPop)) {
- return Nothing();
- }
-
- if (Stopped && Queue.empty()) {
- return Nothing();
- }
- TElement e = std::move(Queue.front());
- Queue.pop_front();
- CanPushCV.Signal();
- return std::move(e);
- }
-
- TMaybe<TElement> Pop(TDuration duration) {
- return Pop(TInstant::Now() + duration);
- }
-
- ///
- /// Blocks until queue has space for new elements or queue is stopped or deadline is reached.
- /// Returns false exception if queue is stopped and push failed or deadline is reached.
- /// Pushes element to queue and returns true otherwise.
- bool Push(const TElement& e, TInstant deadline = TInstant::Max()) {
- return PushRef(e, deadline);
- }
-
- bool Push(TElement&& e, TInstant deadline = TInstant::Max()) {
- return PushRef(std::move(e), deadline);
- }
-
- bool Push(const TElement& e, TDuration duration) {
- return Push(e, TInstant::Now() + duration);
- }
-
- bool Push(TElement&& e, TDuration duration) {
- return Push(std::move(e), TInstant::Now() + duration);
- }
-
- ///
- /// Stops the queue, all blocked operations will be aborted.
- void Stop() {
- TGuard<TMutex> g(Lock);
- Stopped = true;
- CanPopCV.BroadCast();
- CanPushCV.BroadCast();
- }
-
- ///
- /// Checks whether queue is empty.
- bool Empty() const {
- TGuard<TMutex> g(Lock);
- return Queue.empty();
- }
-
- ///
- /// Returns size of the queue.
- size_t Size() const {
- TGuard<TMutex> g(Lock);
- return Queue.size();
- }
-
- ///
- /// Checks whether queue is stopped.
- bool IsStopped() const {
- TGuard<TMutex> g(Lock);
- return Stopped;
- }
-
- private:
- bool CanPush() const {
- return Queue.size() < MaxSize || Stopped;
- }
-
- bool CanPop() const {
- return !Queue.empty() || Stopped;
- }
-
- template <typename Ref>
- bool PushRef(Ref e, TInstant deadline) {
- TGuard<TMutex> g(Lock);
- const auto canPush = [this]() { return CanPush(); };
- if (!CanPushCV.WaitD(Lock, deadline, canPush)) {
- return false;
- }
- if (Stopped) {
- return false;
- }
- Queue.push_back(std::forward<TElement>(e));
- CanPopCV.Signal();
- return true;
- }
-
- private:
- TMutex Lock;
- TCondVar CanPopCV;
- TCondVar CanPushCV;
- TDeque<TElement> Queue;
- size_t MaxSize;
- bool Stopped;
- };
-
-}
diff --git a/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp b/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp
deleted file mode 100644
index 26e125279d..0000000000
--- a/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp
+++ /dev/null
@@ -1,211 +0,0 @@
-#include "blocking_queue.h"
-
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <util/string/builder.h>
-#include <util/system/thread.h>
-
-namespace {
- class TFunctionThread: public ISimpleThread {
- public:
- using TFunc = std::function<void()>;
-
- private:
- TFunc Func;
-
- public:
- TFunctionThread(const TFunc& func)
- : Func(func)
- {
- }
-
- void* ThreadProc() noexcept override {
- Func();
- return nullptr;
- }
- };
-
-}
-
-IOutputStream& operator<<(IOutputStream& o, const TMaybe<int>& val) {
- if (val) {
- o << "TMaybe<int>(" << val.GetRef() << ')';
- } else {
- o << "TMaybe<int>()";
- }
- return o;
-}
-
-Y_UNIT_TEST_SUITE(BlockingQueueTest) {
- Y_UNIT_TEST(SimplePushPopTest) {
- const size_t limit = 100;
-
- NThreading::TBlockingQueue<int> queue(100);
-
- for (int i = 0; i != limit; ++i) {
- queue.Push(i);
- }
-
- for (int i = 0; i != limit; ++i) {
- UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i);
- }
-
- UNIT_ASSERT(queue.Empty());
- }
-
- Y_UNIT_TEST(SimpleStopTest) {
- const size_t limit = 100;
-
- NThreading::TBlockingQueue<int> queue(100);
-
- for (int i = 0; i != limit; ++i) {
- queue.Push(i);
- }
- queue.Stop();
-
- bool ok = queue.Push(100500);
- UNIT_ASSERT_VALUES_EQUAL(ok, false);
-
- for (int i = 0; i != limit; ++i) {
- UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i);
- }
-
- UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>());
- }
-
- Y_UNIT_TEST(BigPushPop) {
- const int limit = 100000;
-
- NThreading::TBlockingQueue<int> queue(10);
-
- TFunctionThread pusher([&] {
- for (int i = 0; i != limit; ++i) {
- if (!queue.Push(i)) {
- break;
- }
- }
- });
-
- pusher.Start();
-
- try {
- for (int i = 0; i != limit; ++i) {
- size_t size = queue.Size();
- UNIT_ASSERT_C(size <= 10, (TStringBuilder() << "Size exceeds 10: " << size).data());
- UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i);
- }
- } catch (...) {
- // gracefull shutdown of pusher thread if assertion fails
- queue.Stop();
- throw;
- }
-
- pusher.Join();
- }
-
- Y_UNIT_TEST(StopWhenMultiplePoppers) {
- NThreading::TBlockingQueue<int> queue(10);
- TFunctionThread popper1([&] {
- UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>());
- });
- TFunctionThread popper2([&] {
- UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>());
- });
- popper1.Start();
- popper2.Start();
-
- queue.Stop();
-
- popper1.Join();
- popper2.Join();
- }
-
- Y_UNIT_TEST(StopWhenMultiplePushers) {
- NThreading::TBlockingQueue<int> queue(1);
- queue.Push(1);
- TFunctionThread pusher1([&] {
- UNIT_ASSERT_VALUES_EQUAL(queue.Push(2), false);
- });
- TFunctionThread pusher2([&] {
- UNIT_ASSERT_VALUES_EQUAL(queue.Push(2), false);
- });
- pusher1.Start();
- pusher2.Start();
-
- queue.Stop();
-
- pusher1.Join();
- pusher2.Join();
- }
-
- Y_UNIT_TEST(InterruptPopByDeadline) {
- NThreading::TBlockingQueue<int> queue1(10);
- NThreading::TBlockingQueue<int> queue2(10);
-
- const auto popper1DeadLine = TInstant::Now();
- const auto popper2DeadLine = TInstant::Now() + TDuration::Seconds(2);
-
- TFunctionThread popper1([&] {
- UNIT_ASSERT_VALUES_EQUAL(queue1.Pop(popper1DeadLine), TMaybe<int>());
- UNIT_ASSERT_VALUES_EQUAL(queue1.IsStopped(), false);
- });
-
- TFunctionThread popper2([&] {
- UNIT_ASSERT_VALUES_EQUAL(queue2.Pop(popper2DeadLine), 2);
- UNIT_ASSERT_VALUES_EQUAL(queue2.IsStopped(), false);
- });
-
- popper1.Start();
- popper2.Start();
-
- Sleep(TDuration::Seconds(1));
-
- queue1.Push(1);
- queue2.Push(2);
-
- Sleep(TDuration::Seconds(1));
-
- queue1.Stop();
- queue2.Stop();
-
- popper1.Join();
- popper2.Join();
- }
-
- Y_UNIT_TEST(InterruptPushByDeadline) {
- NThreading::TBlockingQueue<int> queue1(1);
- NThreading::TBlockingQueue<int> queue2(1);
-
- queue1.Push(0);
- queue2.Push(0);
-
- const auto pusher1DeadLine = TInstant::Now();
- const auto pusher2DeadLine = TInstant::Now() + TDuration::Seconds(2);
-
- TFunctionThread pusher1([&] {
- UNIT_ASSERT_VALUES_EQUAL(queue1.Push(1, pusher1DeadLine), false);
- UNIT_ASSERT_VALUES_EQUAL(queue1.IsStopped(), false);
- });
-
- TFunctionThread pusher2([&] {
- UNIT_ASSERT_VALUES_EQUAL(queue2.Push(2, pusher2DeadLine), true);
- UNIT_ASSERT_VALUES_EQUAL(queue2.IsStopped(), false);
- });
-
- pusher1.Start();
- pusher2.Start();
-
- Sleep(TDuration::Seconds(1));
-
- queue1.Pop();
- queue2.Pop();
-
- Sleep(TDuration::Seconds(1));
-
- queue1.Stop();
- queue2.Stop();
-
- pusher1.Join();
- pusher2.Join();
- }
-}
diff --git a/library/cpp/threading/blocking_queue/ut/ya.make b/library/cpp/threading/blocking_queue/ut/ya.make
deleted file mode 100644
index 50f220d552..0000000000
--- a/library/cpp/threading/blocking_queue/ut/ya.make
+++ /dev/null
@@ -1,7 +0,0 @@
-UNITTEST_FOR(library/cpp/threading/blocking_queue)
-
-SRCS(
- blocking_queue_ut.cpp
-)
-
-END()
diff --git a/library/cpp/threading/blocking_queue/ya.make b/library/cpp/threading/blocking_queue/ya.make
deleted file mode 100644
index ce1104c1c9..0000000000
--- a/library/cpp/threading/blocking_queue/ya.make
+++ /dev/null
@@ -1,9 +0,0 @@
-LIBRARY()
-
-SRCS(
- blocking_queue.cpp
-)
-
-END()
-
-RECURSE_FOR_TESTS(ut)