aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/equeue/fast
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-07-28 06:50:19 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-07-28 06:50:19 +0300
commit85ef4ee49c3edbb700d0ef903d01177bf9984018 (patch)
tree37825f0e393cbad3b58f4c209082871753c6e02d /library/cpp/threading/equeue/fast
parent5ea97cfd8a8f61d96636778ed64de3cb003e1589 (diff)
downloadydb-85ef4ee49c3edbb700d0ef903d01177bf9984018.tar.gz
Intermediate changes
Diffstat (limited to 'library/cpp/threading/equeue/fast')
-rw-r--r--library/cpp/threading/equeue/fast/equeue.h151
-rw-r--r--library/cpp/threading/equeue/fast/ya.make12
2 files changed, 163 insertions, 0 deletions
diff --git a/library/cpp/threading/equeue/fast/equeue.h b/library/cpp/threading/equeue/fast/equeue.h
new file mode 100644
index 0000000000..0a3ba47184
--- /dev/null
+++ b/library/cpp/threading/equeue/fast/equeue.h
@@ -0,0 +1,151 @@
+#pragma once
+
+#include <util/thread/pool.h>
+
+#include <util/datetime/base.h>
+#include <util/thread/lfqueue.h>
+#include <util/system/thread.h>
+#include <util/generic/vector.h>
+#include <util/generic/scope.h>
+#include <util/stream/str.h>
+
+#include <library/cpp/threading/bounded_queue/bounded_queue.h>
+#include <library/cpp/yt/threading/event_count.h>
+
+class TFastElasticQueue
+ : public TThreadPoolBase
+ , private IThreadFactory::IThreadAble
+{
+public:
+ explicit TFastElasticQueue(const TParams& params = {})
+ : TThreadPoolBase(params)
+ {
+ Y_ENSURE(!params.Blocking_);
+ }
+
+ ~TFastElasticQueue() {
+ Stop();
+ }
+
+ void Start(size_t threadCount, size_t maxQueueSize) override {
+ Y_ENSURE(Threads_.empty());
+ Y_ENSURE(maxQueueSize > 0);
+
+ Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events
+ MaxQueueSize_ = maxQueueSize;
+
+ try {
+ for (size_t i = 0; i < threadCount; ++i) {
+ Threads_.push_back(Pool()->Run(this));
+ }
+ } catch (...) {
+ Stop();
+ throw;
+ }
+
+ Stopped_ = false;
+ }
+
+ size_t ObjectCount() const {
+ //GuardCount_ can be temporary incremented above real object count in queue
+ return Min(GuardCount_.load(), MaxQueueSize_);
+ }
+
+ bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT {
+ if (Stopped_ || !obj) {
+ return false;
+ }
+
+ if (GuardCount_.fetch_add(1) >= MaxQueueSize_) {
+ GuardCount_.fetch_sub(1);
+ return false;
+ }
+
+ QueueSize_.fetch_add(1);
+
+ if (!Queue_->Enqueue(obj)) {
+ //Simultaneous Dequeue calls can return not in exact fifo order of items,
+ //so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of
+ //the oldest enqueued item is not actually dequeued and ring buffer can't proceed.
+ GuardCount_.fetch_sub(1);
+ QueueSize_.fetch_sub(1);
+ return false;
+ }
+
+
+ Event_.NotifyOne();
+
+ return true;
+ }
+
+ size_t Size() const noexcept override {
+ return QueueSize_.load();
+ }
+
+ void Stop() noexcept override {
+ Stopped_ = true;
+
+ for (size_t i = 0; i < Threads_.size(); ++i) {
+ while (!Queue_->Enqueue(nullptr)) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
+
+ Event_.NotifyOne();
+ }
+
+ while (!Threads_.empty()) {
+ Threads_.back()->Join();
+ Threads_.pop_back();
+ }
+
+ Queue_.Reset();
+ }
+
+ void DoExecute() override {
+ TThread::SetCurrentThreadName(Params.ThreadName_.c_str());
+
+ while (true) {
+ IObjectInQueue* job = nullptr;
+
+ Event_.Await([&]() {
+ return Queue_->Dequeue(job);
+ });
+
+ if (!job) {
+ break;
+ }
+
+ QueueSize_.fetch_sub(1);
+
+ Y_DEFER {
+ GuardCount_.fetch_sub(1);
+ };
+
+ if (Params.Catching_) {
+ try {
+ try {
+ job->Process(nullptr);
+ } catch (...) {
+ Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
+ }
+ } catch (...) {
+ ;
+ }
+ } else {
+ job->Process(nullptr);
+ }
+ }
+ }
+private:
+ std::atomic<bool> Stopped_ = false;
+ size_t MaxQueueSize_ = 0;
+
+ alignas(64) std::atomic<size_t> GuardCount_ = 0;
+ alignas(64) std::atomic<size_t> QueueSize_ = 0;
+
+ TVector<THolder<IThreadFactory::IThread>> Threads_;
+
+ THolder<NThreading::TBoundedQueue<IObjectInQueue*>> Queue_;
+ NYT::NThreading::TEventCount Event_;
+};
+
diff --git a/library/cpp/threading/equeue/fast/ya.make b/library/cpp/threading/equeue/fast/ya.make
new file mode 100644
index 0000000000..4a93a6b5ba
--- /dev/null
+++ b/library/cpp/threading/equeue/fast/ya.make
@@ -0,0 +1,12 @@
+LIBRARY()
+
+SRCS(
+ equeue.h
+)
+
+PEERDIR(
+ library/cpp/threading/bounded_queue
+ library/cpp/yt/threading
+)
+
+END()