diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-07-28 06:50:19 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-07-28 06:50:19 +0300 |
commit | 85ef4ee49c3edbb700d0ef903d01177bf9984018 (patch) | |
tree | 37825f0e393cbad3b58f4c209082871753c6e02d /library/cpp/threading/equeue | |
parent | 5ea97cfd8a8f61d96636778ed64de3cb003e1589 (diff) | |
download | ydb-85ef4ee49c3edbb700d0ef903d01177bf9984018.tar.gz |
Intermediate changes
Diffstat (limited to 'library/cpp/threading/equeue')
-rw-r--r-- | library/cpp/threading/equeue/equeue_ut.cpp | 79 | ||||
-rw-r--r-- | library/cpp/threading/equeue/fast/equeue.h | 151 | ||||
-rw-r--r-- | library/cpp/threading/equeue/fast/ya.make | 12 | ||||
-rw-r--r-- | library/cpp/threading/equeue/ut/ya.make | 1 |
4 files changed, 223 insertions, 20 deletions
diff --git a/library/cpp/threading/equeue/equeue_ut.cpp b/library/cpp/threading/equeue/equeue_ut.cpp index 8557f63ac0..47b1029a2f 100644 --- a/library/cpp/threading/equeue/equeue_ut.cpp +++ b/library/cpp/threading/equeue/equeue_ut.cpp @@ -1,4 +1,5 @@ #include "equeue.h" +#include <library/cpp/threading/equeue/fast/equeue.h> #include <library/cpp/testing/unittest/registar.h> @@ -9,18 +10,33 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { const size_t MaxQueueSize = 20; const size_t ThreadCount = 10; - const size_t N = 100000; - static THolder<TElasticQueue> Queue; + template <typename T> + THolder<T> MakeQueue(); - struct TQueueSetup { - TQueueSetup() { - Queue.Reset(new TElasticQueue(MakeHolder<TSimpleThreadPool>())); - Queue->Start(ThreadCount, MaxQueueSize); - } - ~TQueueSetup() { - Queue->Stop(); - } + template <> + THolder<TElasticQueue> MakeQueue() { + return MakeHolder<TElasticQueue>(MakeHolder<TSimpleThreadPool>()); + } + + template <> + THolder<TFastElasticQueue> MakeQueue() { + return MakeHolder<TFastElasticQueue>(); + } + + template <typename T> + struct TEnv { + static inline THolder<T> Queue; + + struct TQueueSetup { + TQueueSetup() { + Queue.Reset(MakeQueue<T>()); + Queue->Start(ThreadCount, MaxQueueSize); + } + ~TQueueSetup() { + Queue->Stop(); + } + }; }; struct TCounters { @@ -37,7 +53,9 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { //fill test -- fill queue with "endless" jobs TSystemEvent WaitEvent; - Y_UNIT_TEST(FillTest) { + + template <typename T> + void FillTest() { Counters.Reset(); struct TWaitJob: public IObjectInQueue { @@ -47,7 +65,10 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { } } job; - struct TLocalSetup: TQueueSetup { + struct TLocalSetup: TEnv<T>::TQueueSetup { + TLocalSetup() { + WaitEvent.Reset(); + } ~TLocalSetup() { WaitEvent.Signal(); } @@ -56,19 +77,26 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { size_t enqueued = 0; { TLocalSetup setup; - while (Queue->Add(&job) && enqueued < MaxQueueSize + 100) { + while (TEnv<T>::Queue->Add(&job) && enqueued < MaxQueueSize + 100) { ++enqueued; } UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize); - UNIT_ASSERT_VALUES_EQUAL(enqueued, Queue->ObjectCount()); + UNIT_ASSERT_VALUES_EQUAL(enqueued, TEnv<T>::Queue->ObjectCount()); } - UNIT_ASSERT_VALUES_EQUAL(0u, Queue->ObjectCount()); - UNIT_ASSERT_VALUES_EQUAL(0u, Queue->Size()); + UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->ObjectCount()); + UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->Size()); UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued); } + Y_UNIT_TEST(FillTest) { + FillTest<TElasticQueue>(); + } + + Y_UNIT_TEST(FillTestFast) { + FillTest<TFastElasticQueue>(); + } //concurrent test -- send many jobs from different threads struct TJob: public IObjectInQueue { @@ -78,9 +106,10 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { }; static TJob Job; + template <typename T> static bool TryAdd() { AtomicIncrement(Counters.Total); - if (Queue->Add(&Job)) { + if (TEnv<T>::Queue->Add(&Job)) { AtomicIncrement(Counters.Scheduled); return true; } else { @@ -89,16 +118,18 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { } } + const size_t N = 100000; static size_t TryCounter; - Y_UNIT_TEST(ConcurrentTest) { + template <typename T> + void ConcurrentTest() { Counters.Reset(); TryCounter = 0; struct TSender: public IThreadFactory::IThreadAble { void DoExecute() override { while ((size_t)AtomicIncrement(TryCounter) <= N) { - if (!TryAdd()) { + if (!TryAdd<T>()) { Sleep(TDuration::MicroSeconds(50)); } } @@ -106,7 +137,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { } sender; { - TQueueSetup setup; + typename TEnv<T>::TQueueSetup setup; TVector< TAutoPtr<IThreadFactory::IThread> > senders; for (size_t i = 0; i < ThreadCount; ++i) { @@ -122,4 +153,12 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled); UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded); } + + Y_UNIT_TEST(ConcurrentTest) { + ConcurrentTest<TElasticQueue>(); + } + + Y_UNIT_TEST(ConcurrentTestFast) { + ConcurrentTest<TFastElasticQueue>(); + } } diff --git a/library/cpp/threading/equeue/fast/equeue.h b/library/cpp/threading/equeue/fast/equeue.h new file mode 100644 index 0000000000..0a3ba47184 --- /dev/null +++ b/library/cpp/threading/equeue/fast/equeue.h @@ -0,0 +1,151 @@ +#pragma once + +#include <util/thread/pool.h> + +#include <util/datetime/base.h> +#include <util/thread/lfqueue.h> +#include <util/system/thread.h> +#include <util/generic/vector.h> +#include <util/generic/scope.h> +#include <util/stream/str.h> + +#include <library/cpp/threading/bounded_queue/bounded_queue.h> +#include <library/cpp/yt/threading/event_count.h> + +class TFastElasticQueue + : public TThreadPoolBase + , private IThreadFactory::IThreadAble +{ +public: + explicit TFastElasticQueue(const TParams& params = {}) + : TThreadPoolBase(params) + { + Y_ENSURE(!params.Blocking_); + } + + ~TFastElasticQueue() { + Stop(); + } + + void Start(size_t threadCount, size_t maxQueueSize) override { + Y_ENSURE(Threads_.empty()); + Y_ENSURE(maxQueueSize > 0); + + Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events + MaxQueueSize_ = maxQueueSize; + + try { + for (size_t i = 0; i < threadCount; ++i) { + Threads_.push_back(Pool()->Run(this)); + } + } catch (...) { + Stop(); + throw; + } + + Stopped_ = false; + } + + size_t ObjectCount() const { + //GuardCount_ can be temporary incremented above real object count in queue + return Min(GuardCount_.load(), MaxQueueSize_); + } + + bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT { + if (Stopped_ || !obj) { + return false; + } + + if (GuardCount_.fetch_add(1) >= MaxQueueSize_) { + GuardCount_.fetch_sub(1); + return false; + } + + QueueSize_.fetch_add(1); + + if (!Queue_->Enqueue(obj)) { + //Simultaneous Dequeue calls can return not in exact fifo order of items, + //so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of + //the oldest enqueued item is not actually dequeued and ring buffer can't proceed. + GuardCount_.fetch_sub(1); + QueueSize_.fetch_sub(1); + return false; + } + + + Event_.NotifyOne(); + + return true; + } + + size_t Size() const noexcept override { + return QueueSize_.load(); + } + + void Stop() noexcept override { + Stopped_ = true; + + for (size_t i = 0; i < Threads_.size(); ++i) { + while (!Queue_->Enqueue(nullptr)) { + Sleep(TDuration::MilliSeconds(1)); + } + + Event_.NotifyOne(); + } + + while (!Threads_.empty()) { + Threads_.back()->Join(); + Threads_.pop_back(); + } + + Queue_.Reset(); + } + + void DoExecute() override { + TThread::SetCurrentThreadName(Params.ThreadName_.c_str()); + + while (true) { + IObjectInQueue* job = nullptr; + + Event_.Await([&]() { + return Queue_->Dequeue(job); + }); + + if (!job) { + break; + } + + QueueSize_.fetch_sub(1); + + Y_DEFER { + GuardCount_.fetch_sub(1); + }; + + if (Params.Catching_) { + try { + try { + job->Process(nullptr); + } catch (...) { + Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl; + } + } catch (...) { + ; + } + } else { + job->Process(nullptr); + } + } + } +private: + std::atomic<bool> Stopped_ = false; + size_t MaxQueueSize_ = 0; + + alignas(64) std::atomic<size_t> GuardCount_ = 0; + alignas(64) std::atomic<size_t> QueueSize_ = 0; + + TVector<THolder<IThreadFactory::IThread>> Threads_; + + THolder<NThreading::TBoundedQueue<IObjectInQueue*>> Queue_; + NYT::NThreading::TEventCount Event_; +}; + diff --git a/library/cpp/threading/equeue/fast/ya.make b/library/cpp/threading/equeue/fast/ya.make new file mode 100644 index 0000000000..4a93a6b5ba --- /dev/null +++ b/library/cpp/threading/equeue/fast/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + equeue.h +) + +PEERDIR( + library/cpp/threading/bounded_queue + library/cpp/yt/threading +) + +END() diff --git a/library/cpp/threading/equeue/ut/ya.make b/library/cpp/threading/equeue/ut/ya.make index 60bc0aa604..2cfcb4ae00 100644 --- a/library/cpp/threading/equeue/ut/ya.make +++ b/library/cpp/threading/equeue/ut/ya.make @@ -2,6 +2,7 @@ UNITTEST() PEERDIR( ADDINCL library/cpp/threading/equeue + library/cpp/threading/equeue/fast ) SRCDIR(library/cpp/threading/equeue) |