aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/equeue/fast.h
diff options
context:
space:
mode:
authorkulikov <kulikov@yandex-team.com>2023-07-27 12:28:50 +0300
committerkulikov <kulikov@yandex-team.com>2023-07-27 12:28:50 +0300
commit4a7691c519e6114e013dc1dd0c3b2528154507f9 (patch)
treeaf6ee3a210f95bb4532036b1f1a5352ef072068a /library/cpp/threading/equeue/fast.h
parentdcde92436ae71c3fdf1d6b9916e3858f3b35146e (diff)
downloadydb-4a7691c519e6114e013dc1dd0c3b2528154507f9.tar.gz
revert rXXXXXX (see discusstion in pr), will commit again more pci-dss friendly way
Diffstat (limited to 'library/cpp/threading/equeue/fast.h')
-rw-r--r--library/cpp/threading/equeue/fast.h167
1 files changed, 0 insertions, 167 deletions
diff --git a/library/cpp/threading/equeue/fast.h b/library/cpp/threading/equeue/fast.h
deleted file mode 100644
index 3f96e279fc9..00000000000
--- a/library/cpp/threading/equeue/fast.h
+++ /dev/null
@@ -1,167 +0,0 @@
-#pragma once
-
-#include <util/thread/pool.h>
-
-#include <util/datetime/base.h>
-#include <util/thread/lfqueue.h>
-#include <util/system/thread.h>
-#include <util/generic/vector.h>
-#include <util/generic/scope.h>
-#include <util/stream/str.h>
-
-#include <library/cpp/threading/bounded_queue/bounded_queue.h>
-
-#if defined(_android_) || defined(_arm_)
-//by now library/cpp/yt/threading doesn't compile in targets like default-android-armv7a, fallback to ordinal elastic queue
-#include "equeue.h"
- class TFastElasticQueue
- : public TElasticQueue
- {
- public:
- explicit TFastElasticQueue(const TParams& params = {})
- : TElasticQueue(MakeHolder<TSimpleThreadPool>(params))
- {
- }
- };
-#else
-
-#include <library/cpp/yt/threading/event_count.h>
-
-class TFastElasticQueue
- : public TThreadPoolBase
- , private IThreadFactory::IThreadAble
-{
-public:
- explicit TFastElasticQueue(const TParams& params = {})
- : TThreadPoolBase(params)
- {
- Y_ENSURE(!params.Blocking_);
- }
-
- ~TFastElasticQueue() {
- Stop();
- }
-
- void Start(size_t threadCount, size_t maxQueueSize) override {
- Y_ENSURE(Threads_.empty());
- Y_ENSURE(maxQueueSize > 0);
-
- Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events
- MaxQueueSize_ = maxQueueSize;
-
- try {
- for (size_t i = 0; i < threadCount; ++i) {
- Threads_.push_back(Pool()->Run(this));
- }
- } catch (...) {
- Stop();
- throw;
- }
-
- Stopped_ = false;
- }
-
- size_t ObjectCount() const {
- //GuardCount_ can be temporary incremented above real object count in queue
- return Min(GuardCount_.load(), MaxQueueSize_);
- }
-
- bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT {
- if (Stopped_ || !obj) {
- return false;
- }
-
- if (GuardCount_.fetch_add(1) >= MaxQueueSize_) {
- GuardCount_.fetch_sub(1);
- return false;
- }
-
- QueueSize_.fetch_add(1);
-
- if (!Queue_->Enqueue(obj)) {
- //Simultaneous Dequeue calls can return not in exact fifo order of items,
- //so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of
- //the oldest enqueued item is not actually dequeued and ring buffer can't proceed.
- GuardCount_.fetch_sub(1);
- QueueSize_.fetch_sub(1);
- return false;
- }
-
-
- Event_.NotifyOne();
-
- return true;
- }
-
- size_t Size() const noexcept override {
- return QueueSize_.load();
- }
-
- void Stop() noexcept override {
- Stopped_ = true;
-
- for (size_t i = 0; i < Threads_.size(); ++i) {
- while (!Queue_->Enqueue(nullptr)) {
- Sleep(TDuration::MilliSeconds(1));
- }
-
- Event_.NotifyOne();
- }
-
- while (!Threads_.empty()) {
- Threads_.back()->Join();
- Threads_.pop_back();
- }
-
- Queue_.Reset();
- }
-
- void DoExecute() override {
- TThread::SetCurrentThreadName(Params.ThreadName_.c_str());
-
- while (true) {
- IObjectInQueue* job = nullptr;
-
- Event_.Await([&]() {
- return Queue_->Dequeue(job);
- });
-
- if (!job) {
- break;
- }
-
- QueueSize_.fetch_sub(1);
-
- Y_DEFER {
- GuardCount_.fetch_sub(1);
- };
-
- if (Params.Catching_) {
- try {
- try {
- job->Process(nullptr);
- } catch (...) {
- Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
- }
- } catch (...) {
- ;
- }
- } else {
- job->Process(nullptr);
- }
- }
- }
-private:
- std::atomic<bool> Stopped_ = false;
- size_t MaxQueueSize_ = 0;
-
- alignas(64) std::atomic<size_t> GuardCount_ = 0;
- alignas(64) std::atomic<size_t> QueueSize_ = 0;
-
- TVector<THolder<IThreadFactory::IThread>> Threads_;
-
- THolder<NThreading::TBoundedQueue<IObjectInQueue*>> Queue_;
- NYT::NThreading::TEventCount Event_;
-};
-
-#endif