author     kulikov <kulikov@yandex-team.com> 2023-07-21 13:59:33 +0300
committer  kulikov <kulikov@yandex-team.com> 2023-07-21 13:59:33 +0300
commit     5706cb392271ea40eab053314e7c0f4d9d4547ba (patch)
tree       72bce210dc3747df1d9319fc9f56b848852d2aab /library/cpp/threading/bounded_queue/bounded_queue.h
parent     122a6055cef2bc785407c69b33b858a07b319e66 (diff)
download   ydb-5706cb392271ea40eab053314e7c0f4d9d4547ba.tar.gz
try to get rid of locks and allocations for elastic queue thread pool
Under heavy load and high RPS the current thread pool implementation seems to have problems, at least with contention on the lock inside the condvar (long futex wait calls from the http server listener thread), so try to implement something more efficient:
- replace the condvar with a TEventCounter implementation without an internal lock (a pthread condvar maintains the waiters' wakeup order, which the thread pool doesn't need);
- introduce the well-known bounded MPMC queue over a ring buffer;
- get rid of TDecrementingWrapper;
- add options to turn on the new pool in library/cpp/http/server and search/daemons (will remove after adoption);
- make the elastic queue ut check both versions;
- work around problems with android/arm build targets.
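To make the intended usage concrete, here is a minimal sketch (not the actual elastic queue code) of the producer/consumer shape the new pool is built around, using the TBoundedQueue added in this commit. The job type, the capacity and the yield-based idle loop are illustrative assumptions; the real pool parks idle workers on TEventCounter instead of spinning.

#include <library/cpp/threading/bounded_queue/bounded_queue.h>

#include <atomic>
#include <cstdio>
#include <thread>

// A plain function pointer keeps the element type noexcept-assignable,
// which Enqueue/Dequeue static_assert on; the real pool passes its own job pointers.
using TJob = void (*)();

static void HandleRequest() {
    std::puts("handled one request");
}

int main() {
    NThreading::TBoundedQueue<TJob> jobs(1024); // capacity must be a power of two, at least 2
    std::atomic<bool> stop{false};

    // Worker thread: dequeue and run jobs; yield when the queue is empty.
    std::thread worker([&] {
        TJob job = nullptr;
        while (!stop.load(std::memory_order_acquire)) {
            if (jobs.Dequeue(job)) {
                job();
            } else {
                std::this_thread::yield();
            }
        }
        while (jobs.Dequeue(job)) { // drain whatever is left after stop was requested
            job();
        }
    });

    // Producer (e.g. the http listener thread): Enqueue fails instead of blocking
    // when the ring is full, so the caller chooses the overflow policy.
    for (int i = 0; i < 100; ++i) {
        while (!jobs.Enqueue(&HandleRequest)) {
            std::this_thread::yield();
        }
    }

    stop.store(true, std::memory_order_release);
    worker.join();
    return 0;
}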
Diffstat (limited to 'library/cpp/threading/bounded_queue/bounded_queue.h')
-rw-r--r--  library/cpp/threading/bounded_queue/bounded_queue.h  |  89
1 file changed, 89 insertions(+), 0 deletions(-)
diff --git a/library/cpp/threading/bounded_queue/bounded_queue.h b/library/cpp/threading/bounded_queue/bounded_queue.h
new file mode 100644
index 0000000000..c5c6714086
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/bounded_queue.h
@@ -0,0 +1,89 @@
+#pragma once
+
+#include <util/generic/yexception.h>
+
+#include <atomic>
+#include <memory>
+
+//https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
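+//
+// Each cell's Sequence encodes whose turn it is: producers claim tickets from
+// EnqueuePos_ and consumers from DequeuePos_, and a thread may touch a cell only
+// when that cell's Sequence matches the value its ticket expects.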
+
+namespace NThreading {
+ template<typename T>
+ class TBoundedQueue {
+ public:
+ explicit TBoundedQueue(size_t size)
+ : Buffer_(new TCell[size])
+ , Mask_(size - 1)
+ {
+ Y_ENSURE(size >= 2 && (size & (size - 1)) == 0);
+
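+            // Seed every cell with its own index: the producer that claims ticket pos == i
+            // will find Sequence == pos and treat the cell as free.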
+ for (size_t i = 0; i < size; ++i) {
+ Buffer_[i].Sequence.store(i, std::memory_order_relaxed);
+ }
+ }
+
+ template <typename T_>
+ [[nodiscard]] bool Enqueue(T_&& data) noexcept {
+ TCell* cell;
+ size_t pos = EnqueuePos_.load(std::memory_order_relaxed);
+
+ for (;;) {
+ cell = &Buffer_[pos & Mask_];
+ size_t seq = cell->Sequence.load(std::memory_order_acquire);
+ intptr_t dif = (intptr_t)seq - (intptr_t)pos;
+
+ if (dif == 0) {
+ if (EnqueuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
+ break;
+ }
+ } else if (dif < 0) {
+ return false;
+ } else {
+ pos = EnqueuePos_.load(std::memory_order_relaxed);
+ }
+ }
+
+ static_assert(noexcept(cell->Data = std::forward<T_>(data)));
+ cell->Data = std::forward<T_>(data);
+ cell->Sequence.store(pos + 1, std::memory_order_release);
+
+ return true;
+ }
+
+ [[nodiscard]] bool Dequeue(T& data) noexcept {
+ TCell* cell;
+ size_t pos = DequeuePos_.load(std::memory_order_relaxed);
+
+ for (;;) {
+ cell = &Buffer_[pos & Mask_];
+ size_t seq = cell->Sequence.load(std::memory_order_acquire);
+ intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
+
+ if (dif == 0) {
+ if (DequeuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
+ break;
+ }
+ } else if (dif < 0) {
+ return false;
+ } else {
+ pos = DequeuePos_.load(std::memory_order_relaxed);
+ }
+ }
+
+ static_assert(noexcept(data = std::move(cell->Data)));
+ data = std::move(cell->Data);
+
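+            // Free the cell for the next lap around the ring: the producer holding
+            // ticket pos + Mask_ + 1 (i.e. pos + size) will see its expected sequence value.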
+ cell->Sequence.store(pos + Mask_ + 1, std::memory_order_release);
+ return true;
+ }
+ private:
+ struct TCell {
+ std::atomic<size_t> Sequence;
+ T Data;
+ };
+
+ std::unique_ptr<TCell[]> Buffer_;
+ const size_t Mask_;
+
+ alignas(64) std::atomic<size_t> EnqueuePos_ = 0;
+ alignas(64) std::atomic<size_t> DequeuePos_ = 0;
+ };
+}
+
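For reference, a minimal sketch of the Enqueue/Dequeue contract of the class above, assuming only that this header is on the include path (the capacity of 4 and the values are arbitrary):

#include <library/cpp/threading/bounded_queue/bounded_queue.h>

#include <util/generic/yexception.h>

int main() {
    NThreading::TBoundedQueue<int> queue(4); // Y_ENSURE rejects sizes that are not powers of two >= 2

    int value = 0;
    Y_ENSURE(!queue.Dequeue(value));              // empty: Dequeue fails instead of blocking

    Y_ENSURE(queue.Enqueue(1));
    Y_ENSURE(queue.Enqueue(2));
    Y_ENSURE(queue.Enqueue(3));
    Y_ENSURE(queue.Enqueue(4));
    Y_ENSURE(!queue.Enqueue(5));                  // all four cells occupied: Enqueue fails

    Y_ENSURE(queue.Dequeue(value) && value == 1); // items come out in FIFO order
    Y_ENSURE(queue.Enqueue(5));                   // a cell was freed, so the ring accepts data again
    return 0;
}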