diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 11:13:34 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 11:13:34 +0300 |
commit | 3e1899838408bbad47622007aa382bc8a2b01f87 (patch) | |
tree | 0f21c1e6add187ddb6c3ccc048a7d640ce03fb87 /library/cpp/threading/blocking_queue | |
parent | 5463eb3f5e72a86f858a3d27c886470a724ede34 (diff) | |
download | ydb-3e1899838408bbad47622007aa382bc8a2b01f87.tar.gz |
Revert "YT-19324: move YT provider to ydb/library/yql"
This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing
changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12.
Diffstat (limited to 'library/cpp/threading/blocking_queue')
5 files changed, 0 insertions, 388 deletions
diff --git a/library/cpp/threading/blocking_queue/blocking_queue.cpp b/library/cpp/threading/blocking_queue/blocking_queue.cpp deleted file mode 100644 index db199c80be..0000000000 --- a/library/cpp/threading/blocking_queue/blocking_queue.cpp +++ /dev/null @@ -1,3 +0,0 @@ -#include "blocking_queue.h" - -// just check compilability diff --git a/library/cpp/threading/blocking_queue/blocking_queue.h b/library/cpp/threading/blocking_queue/blocking_queue.h deleted file mode 100644 index 48d3762f68..0000000000 --- a/library/cpp/threading/blocking_queue/blocking_queue.h +++ /dev/null @@ -1,158 +0,0 @@ -#pragma once - -#include <util/generic/deque.h> -#include <util/generic/maybe.h> -#include <util/generic/yexception.h> -#include <util/system/condvar.h> -#include <util/system/guard.h> -#include <util/system/mutex.h> - -#include <utility> - -namespace NThreading { - /// - /// TBlockingQueue is a queue of elements of limited or unlimited size. - /// Queue provides Push and Pop operations that block if operation can't be executed - /// (queue is empty or maximum size is reached). - /// - /// Queue can be stopped, in that case all blocked operation will return `Nothing` / false. - /// - /// All operations are thread safe. - /// - /// - /// Example of usage: - /// TBlockingQueue<int> queue; - /// - /// ... - /// - /// // thread 1 - /// queue.Push(42); - /// queue.Push(100500); - /// - /// ... - /// - /// // thread 2 - /// while (TMaybe<int> number = queue.Pop()) { - /// ProcessNumber(number.GetRef()); - /// } - template <class TElement> - class TBlockingQueue { - public: - /// - /// Creates blocking queue with given maxSize - /// if maxSize == 0 then queue is unlimited - TBlockingQueue(size_t maxSize) - : MaxSize(maxSize == 0 ? Max<size_t>() : maxSize) - , Stopped(false) - { - } - - /// - /// Blocks until queue has some elements or queue is stopped or deadline is reached. - /// Returns `Nothing` if queue is stopped or deadline is reached. - /// Returns element otherwise. - TMaybe<TElement> Pop(TInstant deadline = TInstant::Max()) { - TGuard<TMutex> g(Lock); - - const auto canPop = [this]() { return CanPop(); }; - if (!CanPopCV.WaitD(Lock, deadline, canPop)) { - return Nothing(); - } - - if (Stopped && Queue.empty()) { - return Nothing(); - } - TElement e = std::move(Queue.front()); - Queue.pop_front(); - CanPushCV.Signal(); - return std::move(e); - } - - TMaybe<TElement> Pop(TDuration duration) { - return Pop(TInstant::Now() + duration); - } - - /// - /// Blocks until queue has space for new elements or queue is stopped or deadline is reached. - /// Returns false exception if queue is stopped and push failed or deadline is reached. - /// Pushes element to queue and returns true otherwise. - bool Push(const TElement& e, TInstant deadline = TInstant::Max()) { - return PushRef(e, deadline); - } - - bool Push(TElement&& e, TInstant deadline = TInstant::Max()) { - return PushRef(std::move(e), deadline); - } - - bool Push(const TElement& e, TDuration duration) { - return Push(e, TInstant::Now() + duration); - } - - bool Push(TElement&& e, TDuration duration) { - return Push(std::move(e), TInstant::Now() + duration); - } - - /// - /// Stops the queue, all blocked operations will be aborted. - void Stop() { - TGuard<TMutex> g(Lock); - Stopped = true; - CanPopCV.BroadCast(); - CanPushCV.BroadCast(); - } - - /// - /// Checks whether queue is empty. - bool Empty() const { - TGuard<TMutex> g(Lock); - return Queue.empty(); - } - - /// - /// Returns size of the queue. - size_t Size() const { - TGuard<TMutex> g(Lock); - return Queue.size(); - } - - /// - /// Checks whether queue is stopped. - bool IsStopped() const { - TGuard<TMutex> g(Lock); - return Stopped; - } - - private: - bool CanPush() const { - return Queue.size() < MaxSize || Stopped; - } - - bool CanPop() const { - return !Queue.empty() || Stopped; - } - - template <typename Ref> - bool PushRef(Ref e, TInstant deadline) { - TGuard<TMutex> g(Lock); - const auto canPush = [this]() { return CanPush(); }; - if (!CanPushCV.WaitD(Lock, deadline, canPush)) { - return false; - } - if (Stopped) { - return false; - } - Queue.push_back(std::forward<TElement>(e)); - CanPopCV.Signal(); - return true; - } - - private: - TMutex Lock; - TCondVar CanPopCV; - TCondVar CanPushCV; - TDeque<TElement> Queue; - size_t MaxSize; - bool Stopped; - }; - -} diff --git a/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp b/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp deleted file mode 100644 index 26e125279d..0000000000 --- a/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp +++ /dev/null @@ -1,211 +0,0 @@ -#include "blocking_queue.h" - -#include <library/cpp/testing/unittest/registar.h> - -#include <util/string/builder.h> -#include <util/system/thread.h> - -namespace { - class TFunctionThread: public ISimpleThread { - public: - using TFunc = std::function<void()>; - - private: - TFunc Func; - - public: - TFunctionThread(const TFunc& func) - : Func(func) - { - } - - void* ThreadProc() noexcept override { - Func(); - return nullptr; - } - }; - -} - -IOutputStream& operator<<(IOutputStream& o, const TMaybe<int>& val) { - if (val) { - o << "TMaybe<int>(" << val.GetRef() << ')'; - } else { - o << "TMaybe<int>()"; - } - return o; -} - -Y_UNIT_TEST_SUITE(BlockingQueueTest) { - Y_UNIT_TEST(SimplePushPopTest) { - const size_t limit = 100; - - NThreading::TBlockingQueue<int> queue(100); - - for (int i = 0; i != limit; ++i) { - queue.Push(i); - } - - for (int i = 0; i != limit; ++i) { - UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i); - } - - UNIT_ASSERT(queue.Empty()); - } - - Y_UNIT_TEST(SimpleStopTest) { - const size_t limit = 100; - - NThreading::TBlockingQueue<int> queue(100); - - for (int i = 0; i != limit; ++i) { - queue.Push(i); - } - queue.Stop(); - - bool ok = queue.Push(100500); - UNIT_ASSERT_VALUES_EQUAL(ok, false); - - for (int i = 0; i != limit; ++i) { - UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i); - } - - UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>()); - } - - Y_UNIT_TEST(BigPushPop) { - const int limit = 100000; - - NThreading::TBlockingQueue<int> queue(10); - - TFunctionThread pusher([&] { - for (int i = 0; i != limit; ++i) { - if (!queue.Push(i)) { - break; - } - } - }); - - pusher.Start(); - - try { - for (int i = 0; i != limit; ++i) { - size_t size = queue.Size(); - UNIT_ASSERT_C(size <= 10, (TStringBuilder() << "Size exceeds 10: " << size).data()); - UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i); - } - } catch (...) { - // gracefull shutdown of pusher thread if assertion fails - queue.Stop(); - throw; - } - - pusher.Join(); - } - - Y_UNIT_TEST(StopWhenMultiplePoppers) { - NThreading::TBlockingQueue<int> queue(10); - TFunctionThread popper1([&] { - UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>()); - }); - TFunctionThread popper2([&] { - UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>()); - }); - popper1.Start(); - popper2.Start(); - - queue.Stop(); - - popper1.Join(); - popper2.Join(); - } - - Y_UNIT_TEST(StopWhenMultiplePushers) { - NThreading::TBlockingQueue<int> queue(1); - queue.Push(1); - TFunctionThread pusher1([&] { - UNIT_ASSERT_VALUES_EQUAL(queue.Push(2), false); - }); - TFunctionThread pusher2([&] { - UNIT_ASSERT_VALUES_EQUAL(queue.Push(2), false); - }); - pusher1.Start(); - pusher2.Start(); - - queue.Stop(); - - pusher1.Join(); - pusher2.Join(); - } - - Y_UNIT_TEST(InterruptPopByDeadline) { - NThreading::TBlockingQueue<int> queue1(10); - NThreading::TBlockingQueue<int> queue2(10); - - const auto popper1DeadLine = TInstant::Now(); - const auto popper2DeadLine = TInstant::Now() + TDuration::Seconds(2); - - TFunctionThread popper1([&] { - UNIT_ASSERT_VALUES_EQUAL(queue1.Pop(popper1DeadLine), TMaybe<int>()); - UNIT_ASSERT_VALUES_EQUAL(queue1.IsStopped(), false); - }); - - TFunctionThread popper2([&] { - UNIT_ASSERT_VALUES_EQUAL(queue2.Pop(popper2DeadLine), 2); - UNIT_ASSERT_VALUES_EQUAL(queue2.IsStopped(), false); - }); - - popper1.Start(); - popper2.Start(); - - Sleep(TDuration::Seconds(1)); - - queue1.Push(1); - queue2.Push(2); - - Sleep(TDuration::Seconds(1)); - - queue1.Stop(); - queue2.Stop(); - - popper1.Join(); - popper2.Join(); - } - - Y_UNIT_TEST(InterruptPushByDeadline) { - NThreading::TBlockingQueue<int> queue1(1); - NThreading::TBlockingQueue<int> queue2(1); - - queue1.Push(0); - queue2.Push(0); - - const auto pusher1DeadLine = TInstant::Now(); - const auto pusher2DeadLine = TInstant::Now() + TDuration::Seconds(2); - - TFunctionThread pusher1([&] { - UNIT_ASSERT_VALUES_EQUAL(queue1.Push(1, pusher1DeadLine), false); - UNIT_ASSERT_VALUES_EQUAL(queue1.IsStopped(), false); - }); - - TFunctionThread pusher2([&] { - UNIT_ASSERT_VALUES_EQUAL(queue2.Push(2, pusher2DeadLine), true); - UNIT_ASSERT_VALUES_EQUAL(queue2.IsStopped(), false); - }); - - pusher1.Start(); - pusher2.Start(); - - Sleep(TDuration::Seconds(1)); - - queue1.Pop(); - queue2.Pop(); - - Sleep(TDuration::Seconds(1)); - - queue1.Stop(); - queue2.Stop(); - - pusher1.Join(); - pusher2.Join(); - } -} diff --git a/library/cpp/threading/blocking_queue/ut/ya.make b/library/cpp/threading/blocking_queue/ut/ya.make deleted file mode 100644 index 50f220d552..0000000000 --- a/library/cpp/threading/blocking_queue/ut/ya.make +++ /dev/null @@ -1,7 +0,0 @@ -UNITTEST_FOR(library/cpp/threading/blocking_queue) - -SRCS( - blocking_queue_ut.cpp -) - -END() diff --git a/library/cpp/threading/blocking_queue/ya.make b/library/cpp/threading/blocking_queue/ya.make deleted file mode 100644 index ce1104c1c9..0000000000 --- a/library/cpp/threading/blocking_queue/ya.make +++ /dev/null @@ -1,9 +0,0 @@ -LIBRARY() - -SRCS( - blocking_queue.cpp -) - -END() - -RECURSE_FOR_TESTS(ut) |