diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 11:13:34 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 11:13:34 +0300 |
commit | 3e1899838408bbad47622007aa382bc8a2b01f87 (patch) | |
tree | 0f21c1e6add187ddb6c3ccc048a7d640ce03fb87 /library/cpp/threading | |
parent | 5463eb3f5e72a86f858a3d27c886470a724ede34 (diff) | |
download | ydb-3e1899838408bbad47622007aa382bc8a2b01f87.tar.gz |
Revert "YT-19324: move YT provider to ydb/library/yql"
This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing
changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12.
Diffstat (limited to 'library/cpp/threading')
-rw-r--r-- | library/cpp/threading/blocking_queue/blocking_queue.cpp | 3 | ||||
-rw-r--r-- | library/cpp/threading/blocking_queue/blocking_queue.h | 158 | ||||
-rw-r--r-- | library/cpp/threading/blocking_queue/blocking_queue_ut.cpp | 211 | ||||
-rw-r--r-- | library/cpp/threading/blocking_queue/ut/ya.make | 7 | ||||
-rw-r--r-- | library/cpp/threading/blocking_queue/ya.make | 9 | ||||
-rw-r--r-- | library/cpp/threading/cron/cron.cpp | 69 | ||||
-rw-r--r-- | library/cpp/threading/cron/cron.h | 18 | ||||
-rw-r--r-- | library/cpp/threading/cron/ya.make | 11 |
8 files changed, 0 insertions, 486 deletions
diff --git a/library/cpp/threading/blocking_queue/blocking_queue.cpp b/library/cpp/threading/blocking_queue/blocking_queue.cpp deleted file mode 100644 index db199c80be5..00000000000 --- a/library/cpp/threading/blocking_queue/blocking_queue.cpp +++ /dev/null @@ -1,3 +0,0 @@ -#include "blocking_queue.h" - -// just check compilability diff --git a/library/cpp/threading/blocking_queue/blocking_queue.h b/library/cpp/threading/blocking_queue/blocking_queue.h deleted file mode 100644 index 48d3762f68a..00000000000 --- a/library/cpp/threading/blocking_queue/blocking_queue.h +++ /dev/null @@ -1,158 +0,0 @@ -#pragma once - -#include <util/generic/deque.h> -#include <util/generic/maybe.h> -#include <util/generic/yexception.h> -#include <util/system/condvar.h> -#include <util/system/guard.h> -#include <util/system/mutex.h> - -#include <utility> - -namespace NThreading { - /// - /// TBlockingQueue is a queue of elements of limited or unlimited size. - /// Queue provides Push and Pop operations that block if operation can't be executed - /// (queue is empty or maximum size is reached). - /// - /// Queue can be stopped, in that case all blocked operation will return `Nothing` / false. - /// - /// All operations are thread safe. - /// - /// - /// Example of usage: - /// TBlockingQueue<int> queue; - /// - /// ... - /// - /// // thread 1 - /// queue.Push(42); - /// queue.Push(100500); - /// - /// ... - /// - /// // thread 2 - /// while (TMaybe<int> number = queue.Pop()) { - /// ProcessNumber(number.GetRef()); - /// } - template <class TElement> - class TBlockingQueue { - public: - /// - /// Creates blocking queue with given maxSize - /// if maxSize == 0 then queue is unlimited - TBlockingQueue(size_t maxSize) - : MaxSize(maxSize == 0 ? Max<size_t>() : maxSize) - , Stopped(false) - { - } - - /// - /// Blocks until queue has some elements or queue is stopped or deadline is reached. - /// Returns `Nothing` if queue is stopped or deadline is reached. - /// Returns element otherwise. - TMaybe<TElement> Pop(TInstant deadline = TInstant::Max()) { - TGuard<TMutex> g(Lock); - - const auto canPop = [this]() { return CanPop(); }; - if (!CanPopCV.WaitD(Lock, deadline, canPop)) { - return Nothing(); - } - - if (Stopped && Queue.empty()) { - return Nothing(); - } - TElement e = std::move(Queue.front()); - Queue.pop_front(); - CanPushCV.Signal(); - return std::move(e); - } - - TMaybe<TElement> Pop(TDuration duration) { - return Pop(TInstant::Now() + duration); - } - - /// - /// Blocks until queue has space for new elements or queue is stopped or deadline is reached. - /// Returns false exception if queue is stopped and push failed or deadline is reached. - /// Pushes element to queue and returns true otherwise. - bool Push(const TElement& e, TInstant deadline = TInstant::Max()) { - return PushRef(e, deadline); - } - - bool Push(TElement&& e, TInstant deadline = TInstant::Max()) { - return PushRef(std::move(e), deadline); - } - - bool Push(const TElement& e, TDuration duration) { - return Push(e, TInstant::Now() + duration); - } - - bool Push(TElement&& e, TDuration duration) { - return Push(std::move(e), TInstant::Now() + duration); - } - - /// - /// Stops the queue, all blocked operations will be aborted. - void Stop() { - TGuard<TMutex> g(Lock); - Stopped = true; - CanPopCV.BroadCast(); - CanPushCV.BroadCast(); - } - - /// - /// Checks whether queue is empty. - bool Empty() const { - TGuard<TMutex> g(Lock); - return Queue.empty(); - } - - /// - /// Returns size of the queue. - size_t Size() const { - TGuard<TMutex> g(Lock); - return Queue.size(); - } - - /// - /// Checks whether queue is stopped. - bool IsStopped() const { - TGuard<TMutex> g(Lock); - return Stopped; - } - - private: - bool CanPush() const { - return Queue.size() < MaxSize || Stopped; - } - - bool CanPop() const { - return !Queue.empty() || Stopped; - } - - template <typename Ref> - bool PushRef(Ref e, TInstant deadline) { - TGuard<TMutex> g(Lock); - const auto canPush = [this]() { return CanPush(); }; - if (!CanPushCV.WaitD(Lock, deadline, canPush)) { - return false; - } - if (Stopped) { - return false; - } - Queue.push_back(std::forward<TElement>(e)); - CanPopCV.Signal(); - return true; - } - - private: - TMutex Lock; - TCondVar CanPopCV; - TCondVar CanPushCV; - TDeque<TElement> Queue; - size_t MaxSize; - bool Stopped; - }; - -} diff --git a/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp b/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp deleted file mode 100644 index 26e125279d2..00000000000 --- a/library/cpp/threading/blocking_queue/blocking_queue_ut.cpp +++ /dev/null @@ -1,211 +0,0 @@ -#include "blocking_queue.h" - -#include <library/cpp/testing/unittest/registar.h> - -#include <util/string/builder.h> -#include <util/system/thread.h> - -namespace { - class TFunctionThread: public ISimpleThread { - public: - using TFunc = std::function<void()>; - - private: - TFunc Func; - - public: - TFunctionThread(const TFunc& func) - : Func(func) - { - } - - void* ThreadProc() noexcept override { - Func(); - return nullptr; - } - }; - -} - -IOutputStream& operator<<(IOutputStream& o, const TMaybe<int>& val) { - if (val) { - o << "TMaybe<int>(" << val.GetRef() << ')'; - } else { - o << "TMaybe<int>()"; - } - return o; -} - -Y_UNIT_TEST_SUITE(BlockingQueueTest) { - Y_UNIT_TEST(SimplePushPopTest) { - const size_t limit = 100; - - NThreading::TBlockingQueue<int> queue(100); - - for (int i = 0; i != limit; ++i) { - queue.Push(i); - } - - for (int i = 0; i != limit; ++i) { - UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i); - } - - UNIT_ASSERT(queue.Empty()); - } - - Y_UNIT_TEST(SimpleStopTest) { - const size_t limit = 100; - - NThreading::TBlockingQueue<int> queue(100); - - for (int i = 0; i != limit; ++i) { - queue.Push(i); - } - queue.Stop(); - - bool ok = queue.Push(100500); - UNIT_ASSERT_VALUES_EQUAL(ok, false); - - for (int i = 0; i != limit; ++i) { - UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i); - } - - UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>()); - } - - Y_UNIT_TEST(BigPushPop) { - const int limit = 100000; - - NThreading::TBlockingQueue<int> queue(10); - - TFunctionThread pusher([&] { - for (int i = 0; i != limit; ++i) { - if (!queue.Push(i)) { - break; - } - } - }); - - pusher.Start(); - - try { - for (int i = 0; i != limit; ++i) { - size_t size = queue.Size(); - UNIT_ASSERT_C(size <= 10, (TStringBuilder() << "Size exceeds 10: " << size).data()); - UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), i); - } - } catch (...) { - // gracefull shutdown of pusher thread if assertion fails - queue.Stop(); - throw; - } - - pusher.Join(); - } - - Y_UNIT_TEST(StopWhenMultiplePoppers) { - NThreading::TBlockingQueue<int> queue(10); - TFunctionThread popper1([&] { - UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>()); - }); - TFunctionThread popper2([&] { - UNIT_ASSERT_VALUES_EQUAL(queue.Pop(), TMaybe<int>()); - }); - popper1.Start(); - popper2.Start(); - - queue.Stop(); - - popper1.Join(); - popper2.Join(); - } - - Y_UNIT_TEST(StopWhenMultiplePushers) { - NThreading::TBlockingQueue<int> queue(1); - queue.Push(1); - TFunctionThread pusher1([&] { - UNIT_ASSERT_VALUES_EQUAL(queue.Push(2), false); - }); - TFunctionThread pusher2([&] { - UNIT_ASSERT_VALUES_EQUAL(queue.Push(2), false); - }); - pusher1.Start(); - pusher2.Start(); - - queue.Stop(); - - pusher1.Join(); - pusher2.Join(); - } - - Y_UNIT_TEST(InterruptPopByDeadline) { - NThreading::TBlockingQueue<int> queue1(10); - NThreading::TBlockingQueue<int> queue2(10); - - const auto popper1DeadLine = TInstant::Now(); - const auto popper2DeadLine = TInstant::Now() + TDuration::Seconds(2); - - TFunctionThread popper1([&] { - UNIT_ASSERT_VALUES_EQUAL(queue1.Pop(popper1DeadLine), TMaybe<int>()); - UNIT_ASSERT_VALUES_EQUAL(queue1.IsStopped(), false); - }); - - TFunctionThread popper2([&] { - UNIT_ASSERT_VALUES_EQUAL(queue2.Pop(popper2DeadLine), 2); - UNIT_ASSERT_VALUES_EQUAL(queue2.IsStopped(), false); - }); - - popper1.Start(); - popper2.Start(); - - Sleep(TDuration::Seconds(1)); - - queue1.Push(1); - queue2.Push(2); - - Sleep(TDuration::Seconds(1)); - - queue1.Stop(); - queue2.Stop(); - - popper1.Join(); - popper2.Join(); - } - - Y_UNIT_TEST(InterruptPushByDeadline) { - NThreading::TBlockingQueue<int> queue1(1); - NThreading::TBlockingQueue<int> queue2(1); - - queue1.Push(0); - queue2.Push(0); - - const auto pusher1DeadLine = TInstant::Now(); - const auto pusher2DeadLine = TInstant::Now() + TDuration::Seconds(2); - - TFunctionThread pusher1([&] { - UNIT_ASSERT_VALUES_EQUAL(queue1.Push(1, pusher1DeadLine), false); - UNIT_ASSERT_VALUES_EQUAL(queue1.IsStopped(), false); - }); - - TFunctionThread pusher2([&] { - UNIT_ASSERT_VALUES_EQUAL(queue2.Push(2, pusher2DeadLine), true); - UNIT_ASSERT_VALUES_EQUAL(queue2.IsStopped(), false); - }); - - pusher1.Start(); - pusher2.Start(); - - Sleep(TDuration::Seconds(1)); - - queue1.Pop(); - queue2.Pop(); - - Sleep(TDuration::Seconds(1)); - - queue1.Stop(); - queue2.Stop(); - - pusher1.Join(); - pusher2.Join(); - } -} diff --git a/library/cpp/threading/blocking_queue/ut/ya.make b/library/cpp/threading/blocking_queue/ut/ya.make deleted file mode 100644 index 50f220d5526..00000000000 --- a/library/cpp/threading/blocking_queue/ut/ya.make +++ /dev/null @@ -1,7 +0,0 @@ -UNITTEST_FOR(library/cpp/threading/blocking_queue) - -SRCS( - blocking_queue_ut.cpp -) - -END() diff --git a/library/cpp/threading/blocking_queue/ya.make b/library/cpp/threading/blocking_queue/ya.make deleted file mode 100644 index ce1104c1c99..00000000000 --- a/library/cpp/threading/blocking_queue/ya.make +++ /dev/null @@ -1,9 +0,0 @@ -LIBRARY() - -SRCS( - blocking_queue.cpp -) - -END() - -RECURSE_FOR_TESTS(ut) diff --git a/library/cpp/threading/cron/cron.cpp b/library/cpp/threading/cron/cron.cpp deleted file mode 100644 index e7c1c59735b..00000000000 --- a/library/cpp/threading/cron/cron.cpp +++ /dev/null @@ -1,69 +0,0 @@ -#include "cron.h" - -#include <library/cpp/deprecated/atomic/atomic.h> - -#include <util/system/thread.h> -#include <util/system/event.h> - -using namespace NCron; - -namespace { - struct TPeriodicHandle: public IHandle { - inline TPeriodicHandle(TJob job, TDuration interval, const TString& threadName) - : Job(job) - , Interval(interval) - , Done(false) - { - TThread::TParams params(DoRun, this); - if (!threadName.empty()) { - params.SetName(threadName); - } - Thread = MakeHolder<TThread>(params); - Thread->Start(); - } - - static inline void* DoRun(void* data) noexcept { - ((TPeriodicHandle*)data)->Run(); - - return nullptr; - } - - inline void Run() noexcept { - while (true) { - Job(); - - Event.WaitT(Interval); - - if (AtomicGet(Done)) { - return; - } - } - } - - ~TPeriodicHandle() override { - AtomicSet(Done, true); - Event.Signal(); - Thread->Join(); - } - - TJob Job; - TDuration Interval; - TManualEvent Event; - TAtomic Done; - THolder<TThread> Thread; - }; -} - -IHandlePtr NCron::StartPeriodicJob(TJob job) { - return NCron::StartPeriodicJob(job, TDuration::Seconds(0), ""); -} - -IHandlePtr NCron::StartPeriodicJob(TJob job, TDuration interval) { - return NCron::StartPeriodicJob(job, interval, ""); -} - -IHandlePtr NCron::StartPeriodicJob(TJob job, TDuration interval, const TString& threadName) { - return new TPeriodicHandle(job, interval, threadName); -} - -IHandle::~IHandle() = default; diff --git a/library/cpp/threading/cron/cron.h b/library/cpp/threading/cron/cron.h deleted file mode 100644 index 77fa40c5e20..00000000000 --- a/library/cpp/threading/cron/cron.h +++ /dev/null @@ -1,18 +0,0 @@ -#pragma once - -#include <util/generic/ptr.h> -#include <util/generic/function.h> -#include <util/datetime/base.h> - -namespace NCron { - struct IHandle { - virtual ~IHandle(); - }; - - using TJob = std::function<void()>; - using IHandlePtr = TAutoPtr<IHandle>; - - IHandlePtr StartPeriodicJob(TJob job); - IHandlePtr StartPeriodicJob(TJob job, TDuration interval); - IHandlePtr StartPeriodicJob(TJob job, TDuration interval, const TString& threadName); -} diff --git a/library/cpp/threading/cron/ya.make b/library/cpp/threading/cron/ya.make deleted file mode 100644 index ead272e8373..00000000000 --- a/library/cpp/threading/cron/ya.make +++ /dev/null @@ -1,11 +0,0 @@ -LIBRARY() - -SRCS( - cron.cpp -) - -PEERDIR( - library/cpp/deprecated/atomic -) - -END() |