summaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/equeue
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2025-12-17 20:38:49 +0300
committerrobot-piglet <[email protected]>2025-12-17 20:49:43 +0300
commit86bd733ab3b7b99852186e3f8a58f6ab71799b27 (patch)
tree18e4258e935e74078d986cf073c8640886f499aa /library/cpp/threading/equeue
parentd6a1311abd6696c21e5df445a4176ff378d53e8a (diff)
Intermediate changes
commit_hash:b8e5120d089fefb3429f136699614ed9ea813c21
Diffstat (limited to 'library/cpp/threading/equeue')
-rw-r--r--library/cpp/threading/equeue/fast/equeue.h28
1 files changed, 24 insertions, 4 deletions
diff --git a/library/cpp/threading/equeue/fast/equeue.h b/library/cpp/threading/equeue/fast/equeue.h
index f294a4513d5..a24f0e83a9f 100644
--- a/library/cpp/threading/equeue/fast/equeue.h
+++ b/library/cpp/threading/equeue/fast/equeue.h
@@ -34,6 +34,8 @@ public:
Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events
MaxQueueSize_ = maxQueueSize;
+ CurrentMaxQueueSize_ = maxQueueSize;
+ ActiveThreadCount_ = threadCount;
try {
for (size_t i = 0; i < threadCount; ++i) {
@@ -48,8 +50,8 @@ public:
}
size_t ObjectCount() const {
- //GuardCount_ can be temporary incremented above real object count in queue
- return Min(GuardCount_.load(), MaxQueueSize_);
+ //May return extra +1 for real object count if near CurrentMaxQueueSize_
+ return GuardCount_.load();
}
bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT {
@@ -57,7 +59,7 @@ public:
return false;
}
- if (GuardCount_.fetch_add(1) >= MaxQueueSize_) {
+ if (GuardCount_.fetch_add(1) >= CurrentMaxQueueSize_) {
GuardCount_.fetch_sub(1);
return false;
}
@@ -66,7 +68,7 @@ public:
if (!Queue_->Enqueue(obj)) {
//Simultaneous Dequeue calls can return not in exact fifo order of items,
- //so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of
+ //so there can be GuardCount_ < CurrentMaxQueueSize_ but Enqueue will fail because of
//the oldest enqueued item is not actually dequeued and ring buffer can't proceed.
GuardCount_.fetch_sub(1);
QueueSize_.fetch_sub(1);
@@ -104,10 +106,15 @@ public:
void DoExecute() override {
TThread::SetCurrentThreadName(Params.ThreadName_.c_str());
+ const size_t thisThreadId = ThreadCount_++;
while (true) {
IObjectInQueue* job = nullptr;
+ while (thisThreadId >= ActiveThreadCount_) {
+ Sleep(TDuration::Seconds(1));
+ }
+
Event_.Await([&]() {
return Queue_->Dequeue(job);
});
@@ -137,9 +144,22 @@ public:
}
}
}
+
+ void SetCurrentMaxQueueSize(size_t v) {
+ Y_ENSURE(v < MaxQueueSize_);
+ CurrentMaxQueueSize_ = v;
+ }
+
+ void SetActiveThreadCount(size_t v) {
+ Y_ENSURE(v <= ThreadCount_);
+ ActiveThreadCount_ = v;
+ }
private:
std::atomic<bool> Stopped_ = false;
size_t MaxQueueSize_ = 0;
+ std::atomic<size_t> CurrentMaxQueueSize_ = 0;
+ std::atomic<size_t> ThreadCount_ = 0;
+ std::atomic<size_t> ActiveThreadCount_ = 0;
alignas(64) std::atomic<size_t> GuardCount_ = 0;
alignas(64) std::atomic<size_t> QueueSize_ = 0;