diff options
author | kulikov <kulikov@yandex-team.com> | 2023-07-27 12:28:50 +0300 |
---|---|---|
committer | kulikov <kulikov@yandex-team.com> | 2023-07-27 12:28:50 +0300 |
commit | 4a7691c519e6114e013dc1dd0c3b2528154507f9 (patch) | |
tree | af6ee3a210f95bb4532036b1f1a5352ef072068a /library/cpp/threading/equeue/fast.h | |
parent | dcde92436ae71c3fdf1d6b9916e3858f3b35146e (diff) | |
download | ydb-4a7691c519e6114e013dc1dd0c3b2528154507f9.tar.gz |
revert rXXXXXX (see discusstion in pr), will commit again more pci-dss friendly way
Diffstat (limited to 'library/cpp/threading/equeue/fast.h')
-rw-r--r-- | library/cpp/threading/equeue/fast.h | 167 |
1 files changed, 0 insertions, 167 deletions
diff --git a/library/cpp/threading/equeue/fast.h b/library/cpp/threading/equeue/fast.h deleted file mode 100644 index 3f96e279fc9..00000000000 --- a/library/cpp/threading/equeue/fast.h +++ /dev/null @@ -1,167 +0,0 @@ -#pragma once - -#include <util/thread/pool.h> - -#include <util/datetime/base.h> -#include <util/thread/lfqueue.h> -#include <util/system/thread.h> -#include <util/generic/vector.h> -#include <util/generic/scope.h> -#include <util/stream/str.h> - -#include <library/cpp/threading/bounded_queue/bounded_queue.h> - -#if defined(_android_) || defined(_arm_) -//by now library/cpp/yt/threading doesn't compile in targets like default-android-armv7a, fallback to ordinal elastic queue -#include "equeue.h" - class TFastElasticQueue - : public TElasticQueue - { - public: - explicit TFastElasticQueue(const TParams& params = {}) - : TElasticQueue(MakeHolder<TSimpleThreadPool>(params)) - { - } - }; -#else - -#include <library/cpp/yt/threading/event_count.h> - -class TFastElasticQueue - : public TThreadPoolBase - , private IThreadFactory::IThreadAble -{ -public: - explicit TFastElasticQueue(const TParams& params = {}) - : TThreadPoolBase(params) - { - Y_ENSURE(!params.Blocking_); - } - - ~TFastElasticQueue() { - Stop(); - } - - void Start(size_t threadCount, size_t maxQueueSize) override { - Y_ENSURE(Threads_.empty()); - Y_ENSURE(maxQueueSize > 0); - - Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events - MaxQueueSize_ = maxQueueSize; - - try { - for (size_t i = 0; i < threadCount; ++i) { - Threads_.push_back(Pool()->Run(this)); - } - } catch (...) { - Stop(); - throw; - } - - Stopped_ = false; - } - - size_t ObjectCount() const { - //GuardCount_ can be temporary incremented above real object count in queue - return Min(GuardCount_.load(), MaxQueueSize_); - } - - bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT { - if (Stopped_ || !obj) { - return false; - } - - if (GuardCount_.fetch_add(1) >= MaxQueueSize_) { - GuardCount_.fetch_sub(1); - return false; - } - - QueueSize_.fetch_add(1); - - if (!Queue_->Enqueue(obj)) { - //Simultaneous Dequeue calls can return not in exact fifo order of items, - //so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of - //the oldest enqueued item is not actually dequeued and ring buffer can't proceed. - GuardCount_.fetch_sub(1); - QueueSize_.fetch_sub(1); - return false; - } - - - Event_.NotifyOne(); - - return true; - } - - size_t Size() const noexcept override { - return QueueSize_.load(); - } - - void Stop() noexcept override { - Stopped_ = true; - - for (size_t i = 0; i < Threads_.size(); ++i) { - while (!Queue_->Enqueue(nullptr)) { - Sleep(TDuration::MilliSeconds(1)); - } - - Event_.NotifyOne(); - } - - while (!Threads_.empty()) { - Threads_.back()->Join(); - Threads_.pop_back(); - } - - Queue_.Reset(); - } - - void DoExecute() override { - TThread::SetCurrentThreadName(Params.ThreadName_.c_str()); - - while (true) { - IObjectInQueue* job = nullptr; - - Event_.Await([&]() { - return Queue_->Dequeue(job); - }); - - if (!job) { - break; - } - - QueueSize_.fetch_sub(1); - - Y_DEFER { - GuardCount_.fetch_sub(1); - }; - - if (Params.Catching_) { - try { - try { - job->Process(nullptr); - } catch (...) { - Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl; - } - } catch (...) { - ; - } - } else { - job->Process(nullptr); - } - } - } -private: - std::atomic<bool> Stopped_ = false; - size_t MaxQueueSize_ = 0; - - alignas(64) std::atomic<size_t> GuardCount_ = 0; - alignas(64) std::atomic<size_t> QueueSize_ = 0; - - TVector<THolder<IThreadFactory::IThread>> Threads_; - - THolder<NThreading::TBoundedQueue<IObjectInQueue*>> Queue_; - NYT::NThreading::TEventCount Event_; -}; - -#endif |