aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/equeue/equeue.cpp
diff options
context:
space:
mode:
authorkulikov <kulikov@yandex-team.ru>2022-02-10 16:49:34 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:49:34 +0300
commitc707901605d7b7c6cba0998cd52e1ae619c97762 (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/threading/equeue/equeue.cpp
parent65e5266709e7ff94b14ae128309e229de714b0df (diff)
downloadydb-c707901605d7b7c6cba0998cd52e1ae619c97762.tar.gz
Restoring authorship annotation for <kulikov@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/threading/equeue/equeue.cpp')
-rw-r--r--library/cpp/threading/equeue/equeue.cpp152
1 files changed, 76 insertions, 76 deletions
diff --git a/library/cpp/threading/equeue/equeue.cpp b/library/cpp/threading/equeue/equeue.cpp
index aaec19daa6..54a848e912 100644
--- a/library/cpp/threading/equeue/equeue.cpp
+++ b/library/cpp/threading/equeue/equeue.cpp
@@ -1,80 +1,80 @@
-#include "equeue.h"
-
-TElasticQueue::TElasticQueue(THolder<IThreadPool> slaveQueue)
- : SlaveQueue_(std::move(slaveQueue))
-{
-}
-
-size_t TElasticQueue::ObjectCount() const {
- return (size_t)AtomicGet(ObjectCount_);
-}
-
-bool TElasticQueue::TryIncCounter() {
- if ((size_t)AtomicIncrement(GuardCount_) > MaxQueueSize_) {
- AtomicDecrement(GuardCount_);
- return false;
- }
-
- return true;
-}
-
-
-
-class TElasticQueue::TDecrementingWrapper: TNonCopyable, public IObjectInQueue {
-public:
- TDecrementingWrapper(IObjectInQueue* realObject, TElasticQueue* queue)
- : RealObject_(realObject)
- , Queue_(queue)
- {
- AtomicIncrement(Queue_->ObjectCount_);
- }
-
+#include "equeue.h"
+
+TElasticQueue::TElasticQueue(THolder<IThreadPool> slaveQueue)
+ : SlaveQueue_(std::move(slaveQueue))
+{
+}
+
+size_t TElasticQueue::ObjectCount() const {
+ return (size_t)AtomicGet(ObjectCount_);
+}
+
+bool TElasticQueue::TryIncCounter() {
+ if ((size_t)AtomicIncrement(GuardCount_) > MaxQueueSize_) {
+ AtomicDecrement(GuardCount_);
+ return false;
+ }
+
+ return true;
+}
+
+
+
+class TElasticQueue::TDecrementingWrapper: TNonCopyable, public IObjectInQueue {
+public:
+ TDecrementingWrapper(IObjectInQueue* realObject, TElasticQueue* queue)
+ : RealObject_(realObject)
+ , Queue_(queue)
+ {
+ AtomicIncrement(Queue_->ObjectCount_);
+ }
+
~TDecrementingWrapper() override {
- AtomicDecrement(Queue_->ObjectCount_);
- AtomicDecrement(Queue_->GuardCount_);
- }
-private:
- void Process(void *tsr) override {
- THolder<TDecrementingWrapper> self(this);
- RealObject_->Process(tsr);
- }
-private:
- IObjectInQueue* const RealObject_;
- TElasticQueue* const Queue_;
-};
-
-
-
-bool TElasticQueue::Add(IObjectInQueue* obj) {
- if (!TryIncCounter()) {
- return false;
- }
-
- THolder<TDecrementingWrapper> wrapper;
- try {
- wrapper.Reset(new TDecrementingWrapper(obj, this));
- } catch (...) {
- AtomicDecrement(GuardCount_);
- throw;
- }
-
- if (SlaveQueue_->Add(wrapper.Get())) {
+ AtomicDecrement(Queue_->ObjectCount_);
+ AtomicDecrement(Queue_->GuardCount_);
+ }
+private:
+ void Process(void *tsr) override {
+ THolder<TDecrementingWrapper> self(this);
+ RealObject_->Process(tsr);
+ }
+private:
+ IObjectInQueue* const RealObject_;
+ TElasticQueue* const Queue_;
+};
+
+
+
+bool TElasticQueue::Add(IObjectInQueue* obj) {
+ if (!TryIncCounter()) {
+ return false;
+ }
+
+ THolder<TDecrementingWrapper> wrapper;
+ try {
+ wrapper.Reset(new TDecrementingWrapper(obj, this));
+ } catch (...) {
+ AtomicDecrement(GuardCount_);
+ throw;
+ }
+
+ if (SlaveQueue_->Add(wrapper.Get())) {
Y_UNUSED(wrapper.Release());
- return true;
- } else {
- return false;
- }
-}
-
-void TElasticQueue::Start(size_t threadCount, size_t maxQueueSize) {
- MaxQueueSize_ = maxQueueSize;
- SlaveQueue_->Start(threadCount, maxQueueSize);
-}
-
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void TElasticQueue::Start(size_t threadCount, size_t maxQueueSize) {
+ MaxQueueSize_ = maxQueueSize;
+ SlaveQueue_->Start(threadCount, maxQueueSize);
+}
+
void TElasticQueue::Stop() noexcept {
- return SlaveQueue_->Stop();
-}
-
+ return SlaveQueue_->Stop();
+}
+
size_t TElasticQueue::Size() const noexcept {
- return SlaveQueue_->Size();
-}
+ return SlaveQueue_->Size();
+}