blob: f294a4513d52ad5d9b9543694f30181e227af932 (
plain) (
tree)
|
|
#pragma once
#include <util/thread/pool.h>
#include <util/datetime/base.h>
#include <util/thread/lfqueue.h>
#include <util/system/thread.h>
#include <util/generic/bitops.h>
#include <util/generic/vector.h>
#include <util/generic/scope.h>
#include <util/stream/str.h>
#include <library/cpp/threading/bounded_queue/bounded_queue.h>
#include <library/cpp/yt/threading/event_count.h>
class TFastElasticQueue
: public TThreadPoolBase
, private IThreadFactory::IThreadAble
{
public:
explicit TFastElasticQueue(const TParams& params = {})
: TThreadPoolBase(params)
{
Y_ENSURE(!params.Blocking_);
}
~TFastElasticQueue() {
Stop();
}
void Start(size_t threadCount, size_t maxQueueSize) override {
Y_ENSURE(Threads_.empty());
Y_ENSURE(maxQueueSize > 0);
Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events
MaxQueueSize_ = maxQueueSize;
try {
for (size_t i = 0; i < threadCount; ++i) {
Threads_.push_back(Pool()->Run(this));
}
} catch (...) {
Stop();
throw;
}
Stopped_ = false;
}
size_t ObjectCount() const {
//GuardCount_ can be temporary incremented above real object count in queue
return Min(GuardCount_.load(), MaxQueueSize_);
}
bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT {
if (Stopped_ || !obj) {
return false;
}
if (GuardCount_.fetch_add(1) >= MaxQueueSize_) {
GuardCount_.fetch_sub(1);
return false;
}
QueueSize_.fetch_add(1);
if (!Queue_->Enqueue(obj)) {
//Simultaneous Dequeue calls can return not in exact fifo order of items,
//so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of
//the oldest enqueued item is not actually dequeued and ring buffer can't proceed.
GuardCount_.fetch_sub(1);
QueueSize_.fetch_sub(1);
return false;
}
Event_.NotifyOne();
return true;
}
size_t Size() const noexcept override {
return QueueSize_.load();
}
void Stop() noexcept override {
Stopped_ = true;
for (size_t i = 0; i < Threads_.size(); ++i) {
while (!Queue_->Enqueue(nullptr)) {
Sleep(TDuration::MilliSeconds(1));
}
Event_.NotifyOne();
}
while (!Threads_.empty()) {
Threads_.back()->Join();
Threads_.pop_back();
}
Queue_.Reset();
}
void DoExecute() override {
TThread::SetCurrentThreadName(Params.ThreadName_.c_str());
while (true) {
IObjectInQueue* job = nullptr;
Event_.Await([&]() {
return Queue_->Dequeue(job);
});
if (!job) {
break;
}
QueueSize_.fetch_sub(1);
Y_DEFER {
GuardCount_.fetch_sub(1);
};
if (Params.Catching_) {
try {
try {
job->Process(nullptr);
} catch (...) {
Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
}
} catch (...) {
;
}
} else {
job->Process(nullptr);
}
}
}
private:
std::atomic<bool> Stopped_ = false;
size_t MaxQueueSize_ = 0;
alignas(64) std::atomic<size_t> GuardCount_ = 0;
alignas(64) std::atomic<size_t> QueueSize_ = 0;
TVector<THolder<IThreadFactory::IThread>> Threads_;
THolder<NThreading::TBoundedQueue<IObjectInQueue*>> Queue_;
NYT::NThreading::TEventCount Event_;
};
|