diff options
| author | robot-piglet <[email protected]> | 2025-12-17 20:38:49 +0300 |
|---|---|---|
| committer | robot-piglet <[email protected]> | 2025-12-17 20:49:43 +0300 |
| commit | 86bd733ab3b7b99852186e3f8a58f6ab71799b27 (patch) | |
| tree | 18e4258e935e74078d986cf073c8640886f499aa /library/cpp/threading/equeue | |
| parent | d6a1311abd6696c21e5df445a4176ff378d53e8a (diff) | |
Intermediate changes
commit_hash:b8e5120d089fefb3429f136699614ed9ea813c21
Diffstat (limited to 'library/cpp/threading/equeue')
| -rw-r--r-- | library/cpp/threading/equeue/fast/equeue.h | 28 |
1 files changed, 24 insertions, 4 deletions
diff --git a/library/cpp/threading/equeue/fast/equeue.h b/library/cpp/threading/equeue/fast/equeue.h index f294a4513d5..a24f0e83a9f 100644 --- a/library/cpp/threading/equeue/fast/equeue.h +++ b/library/cpp/threading/equeue/fast/equeue.h @@ -34,6 +34,8 @@ public: Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events MaxQueueSize_ = maxQueueSize; + CurrentMaxQueueSize_ = maxQueueSize; + ActiveThreadCount_ = threadCount; try { for (size_t i = 0; i < threadCount; ++i) { @@ -48,8 +50,8 @@ public: } size_t ObjectCount() const { - //GuardCount_ can be temporary incremented above real object count in queue - return Min(GuardCount_.load(), MaxQueueSize_); + //May return extra +1 for real object count if near CurrentMaxQueueSize_ + return GuardCount_.load(); } bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT { @@ -57,7 +59,7 @@ public: return false; } - if (GuardCount_.fetch_add(1) >= MaxQueueSize_) { + if (GuardCount_.fetch_add(1) >= CurrentMaxQueueSize_) { GuardCount_.fetch_sub(1); return false; } @@ -66,7 +68,7 @@ public: if (!Queue_->Enqueue(obj)) { //Simultaneous Dequeue calls can return not in exact fifo order of items, - //so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of + //so there can be GuardCount_ < CurrentMaxQueueSize_ but Enqueue will fail because of //the oldest enqueued item is not actually dequeued and ring buffer can't proceed. GuardCount_.fetch_sub(1); QueueSize_.fetch_sub(1); @@ -104,10 +106,15 @@ public: void DoExecute() override { TThread::SetCurrentThreadName(Params.ThreadName_.c_str()); + const size_t thisThreadId = ThreadCount_++; while (true) { IObjectInQueue* job = nullptr; + while (thisThreadId >= ActiveThreadCount_) { + Sleep(TDuration::Seconds(1)); + } + Event_.Await([&]() { return Queue_->Dequeue(job); }); @@ -137,9 +144,22 @@ public: } } } + + void SetCurrentMaxQueueSize(size_t v) { + Y_ENSURE(v < MaxQueueSize_); + CurrentMaxQueueSize_ = v; + } + + void SetActiveThreadCount(size_t v) { + Y_ENSURE(v <= ThreadCount_); + ActiveThreadCount_ = v; + } private: std::atomic<bool> Stopped_ = false; size_t MaxQueueSize_ = 0; + std::atomic<size_t> CurrentMaxQueueSize_ = 0; + std::atomic<size_t> ThreadCount_ = 0; + std::atomic<size_t> ActiveThreadCount_ = 0; alignas(64) std::atomic<size_t> GuardCount_ = 0; alignas(64) std::atomic<size_t> QueueSize_ = 0; |
