diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-07-28 06:50:19 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-07-28 06:50:19 +0300 |
commit | 85ef4ee49c3edbb700d0ef903d01177bf9984018 (patch) | |
tree | 37825f0e393cbad3b58f4c209082871753c6e02d /library/cpp/threading/equeue/fast | |
parent | 5ea97cfd8a8f61d96636778ed64de3cb003e1589 (diff) | |
download | ydb-85ef4ee49c3edbb700d0ef903d01177bf9984018.tar.gz |
Intermediate changes
Diffstat (limited to 'library/cpp/threading/equeue/fast')
-rw-r--r-- | library/cpp/threading/equeue/fast/equeue.h | 151 | ||||
-rw-r--r-- | library/cpp/threading/equeue/fast/ya.make | 12 |
2 files changed, 163 insertions, 0 deletions
diff --git a/library/cpp/threading/equeue/fast/equeue.h b/library/cpp/threading/equeue/fast/equeue.h new file mode 100644 index 0000000000..0a3ba47184 --- /dev/null +++ b/library/cpp/threading/equeue/fast/equeue.h @@ -0,0 +1,151 @@ +#pragma once + +#include <util/thread/pool.h> + +#include <util/datetime/base.h> +#include <util/thread/lfqueue.h> +#include <util/system/thread.h> +#include <util/generic/vector.h> +#include <util/generic/scope.h> +#include <util/stream/str.h> + +#include <library/cpp/threading/bounded_queue/bounded_queue.h> +#include <library/cpp/yt/threading/event_count.h> + +class TFastElasticQueue + : public TThreadPoolBase + , private IThreadFactory::IThreadAble +{ +public: + explicit TFastElasticQueue(const TParams& params = {}) + : TThreadPoolBase(params) + { + Y_ENSURE(!params.Blocking_); + } + + ~TFastElasticQueue() { + Stop(); + } + + void Start(size_t threadCount, size_t maxQueueSize) override { + Y_ENSURE(Threads_.empty()); + Y_ENSURE(maxQueueSize > 0); + + Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events + MaxQueueSize_ = maxQueueSize; + + try { + for (size_t i = 0; i < threadCount; ++i) { + Threads_.push_back(Pool()->Run(this)); + } + } catch (...) { + Stop(); + throw; + } + + Stopped_ = false; + } + + size_t ObjectCount() const { + //GuardCount_ can be temporary incremented above real object count in queue + return Min(GuardCount_.load(), MaxQueueSize_); + } + + bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT { + if (Stopped_ || !obj) { + return false; + } + + if (GuardCount_.fetch_add(1) >= MaxQueueSize_) { + GuardCount_.fetch_sub(1); + return false; + } + + QueueSize_.fetch_add(1); + + if (!Queue_->Enqueue(obj)) { + //Simultaneous Dequeue calls can return not in exact fifo order of items, + //so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of + //the oldest enqueued item is not actually dequeued and ring buffer can't proceed. + GuardCount_.fetch_sub(1); + QueueSize_.fetch_sub(1); + return false; + } + + + Event_.NotifyOne(); + + return true; + } + + size_t Size() const noexcept override { + return QueueSize_.load(); + } + + void Stop() noexcept override { + Stopped_ = true; + + for (size_t i = 0; i < Threads_.size(); ++i) { + while (!Queue_->Enqueue(nullptr)) { + Sleep(TDuration::MilliSeconds(1)); + } + + Event_.NotifyOne(); + } + + while (!Threads_.empty()) { + Threads_.back()->Join(); + Threads_.pop_back(); + } + + Queue_.Reset(); + } + + void DoExecute() override { + TThread::SetCurrentThreadName(Params.ThreadName_.c_str()); + + while (true) { + IObjectInQueue* job = nullptr; + + Event_.Await([&]() { + return Queue_->Dequeue(job); + }); + + if (!job) { + break; + } + + QueueSize_.fetch_sub(1); + + Y_DEFER { + GuardCount_.fetch_sub(1); + }; + + if (Params.Catching_) { + try { + try { + job->Process(nullptr); + } catch (...) { + Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl; + } + } catch (...) { + ; + } + } else { + job->Process(nullptr); + } + } + } +private: + std::atomic<bool> Stopped_ = false; + size_t MaxQueueSize_ = 0; + + alignas(64) std::atomic<size_t> GuardCount_ = 0; + alignas(64) std::atomic<size_t> QueueSize_ = 0; + + TVector<THolder<IThreadFactory::IThread>> Threads_; + + THolder<NThreading::TBoundedQueue<IObjectInQueue*>> Queue_; + NYT::NThreading::TEventCount Event_; +}; + diff --git a/library/cpp/threading/equeue/fast/ya.make b/library/cpp/threading/equeue/fast/ya.make new file mode 100644 index 0000000000..4a93a6b5ba --- /dev/null +++ b/library/cpp/threading/equeue/fast/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + equeue.h +) + +PEERDIR( + library/cpp/threading/bounded_queue + library/cpp/yt/threading +) + +END() |