aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading
diff options
context:
space:
mode:
authorkulikov <kulikov@yandex-team.com>2023-07-21 13:59:33 +0300
committerkulikov <kulikov@yandex-team.com>2023-07-21 13:59:33 +0300
commit5706cb392271ea40eab053314e7c0f4d9d4547ba (patch)
tree72bce210dc3747df1d9319fc9f56b848852d2aab /library/cpp/threading
parent122a6055cef2bc785407c69b33b858a07b319e66 (diff)
downloadydb-5706cb392271ea40eab053314e7c0f4d9d4547ba.tar.gz
try to get rid of locks and allocations for elastic queue thread pool
In case of heavy load and high rps current thread pool implementation seems to have problems at least with contention on lock inside condvar (long futex wait calls from http server listener thread), so try to implement something more efficient: - replace condvar with TEventCounter implementation without internal lock (pthread condvar maintains waiters wakeup order, thread pool doesn't need it); - introduce well-known bounded mpmc queue over ring buffer; - get rid of TDecrementingWrapper; - add options to turn on new pool in library/cpp/http/server and search/daemons (will remove after adoption); - make elastic queue ut check both versions; - workaround problems with android/arm build targets.
Diffstat (limited to 'library/cpp/threading')
-rw-r--r--library/cpp/threading/CMakeLists.txt1
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt14
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt15
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt15
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.txt17
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt14
-rw-r--r--library/cpp/threading/bounded_queue/bounded_queue.h89
-rw-r--r--library/cpp/threading/bounded_queue/bounded_queue_ut.cpp106
-rw-r--r--library/cpp/threading/bounded_queue/ut/ya.make8
-rw-r--r--library/cpp/threading/bounded_queue/ya.make9
-rw-r--r--library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt2
-rw-r--r--library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt1
-rw-r--r--library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt2
-rw-r--r--library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt2
-rw-r--r--library/cpp/threading/equeue/equeue_ut.cpp79
-rw-r--r--library/cpp/threading/equeue/fast.h167
-rw-r--r--library/cpp/threading/equeue/ya.make9
17 files changed, 530 insertions, 20 deletions
diff --git a/library/cpp/threading/CMakeLists.txt b/library/cpp/threading/CMakeLists.txt
index 681ef6b24e..f541042208 100644
--- a/library/cpp/threading/CMakeLists.txt
+++ b/library/cpp/threading/CMakeLists.txt
@@ -7,6 +7,7 @@
add_subdirectory(atomic)
+add_subdirectory(bounded_queue)
add_subdirectory(chunk_queue)
add_subdirectory(equeue)
add_subdirectory(future)
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt b/library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..bd9b68ab62
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,14 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-threading-bounded_queue INTERFACE)
+target_link_libraries(cpp-threading-bounded_queue INTERFACE
+ contrib-libs-cxxsupp
+ yutil
+)
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt b/library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..613a18db10
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-threading-bounded_queue INTERFACE)
+target_link_libraries(cpp-threading-bounded_queue INTERFACE
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+)
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt b/library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..613a18db10
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-threading-bounded_queue INTERFACE)
+target_link_libraries(cpp-threading-bounded_queue INTERFACE
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+)
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.txt b/library/cpp/threading/bounded_queue/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt b/library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..bd9b68ab62
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,14 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-threading-bounded_queue INTERFACE)
+target_link_libraries(cpp-threading-bounded_queue INTERFACE
+ contrib-libs-cxxsupp
+ yutil
+)
diff --git a/library/cpp/threading/bounded_queue/bounded_queue.h b/library/cpp/threading/bounded_queue/bounded_queue.h
new file mode 100644
index 0000000000..c5c6714086
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/bounded_queue.h
@@ -0,0 +1,89 @@
+#pragma once
+
+#include <util/generic/yexception.h>
+
+//https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
+
+namespace NThreading {
+ template<typename T>
+ class TBoundedQueue {
+ public:
+ explicit TBoundedQueue(size_t size)
+ : Buffer_(new TCell[size])
+ , Mask_(size - 1)
+ {
+ Y_ENSURE(size >= 2 && (size & (size - 1)) == 0);
+
+ for (size_t i = 0; i < size; ++i) {
+ Buffer_[i].Sequence.store(i, std::memory_order_relaxed);
+ }
+ }
+
+ template <typename T_>
+ [[nodiscard]] bool Enqueue(T_&& data) noexcept {
+ TCell* cell;
+ size_t pos = EnqueuePos_.load(std::memory_order_relaxed);
+
+ for (;;) {
+ cell = &Buffer_[pos & Mask_];
+ size_t seq = cell->Sequence.load(std::memory_order_acquire);
+ intptr_t dif = (intptr_t)seq - (intptr_t)pos;
+
+ if (dif == 0) {
+ if (EnqueuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
+ break;
+ }
+ } else if (dif < 0) {
+ return false;
+ } else {
+ pos = EnqueuePos_.load(std::memory_order_relaxed);
+ }
+ }
+
+ static_assert(noexcept(cell->Data = std::forward<T_>(data)));
+ cell->Data = std::forward<T_>(data);
+ cell->Sequence.store(pos + 1, std::memory_order_release);
+
+ return true;
+ }
+
+ [[nodiscard]] bool Dequeue(T& data) noexcept {
+ TCell* cell;
+ size_t pos = DequeuePos_.load(std::memory_order_relaxed);
+
+ for (;;) {
+ cell = &Buffer_[pos & Mask_];
+ size_t seq = cell->Sequence.load(std::memory_order_acquire);
+ intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
+
+ if (dif == 0) {
+ if (DequeuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
+ break;
+ }
+ } else if (dif < 0) {
+ return false;
+ } else {
+ pos = DequeuePos_.load(std::memory_order_relaxed);
+ }
+ }
+
+ static_assert(noexcept(data = std::move(cell->Data)));
+ data = std::move(cell->Data);
+
+ cell->Sequence.store(pos + Mask_ + 1, std::memory_order_release);
+ return true;
+ }
+ private:
+ struct TCell {
+ std::atomic<size_t> Sequence;
+ T Data;
+ };
+
+ std::unique_ptr<TCell[]> Buffer_;
+ const size_t Mask_;
+
+ alignas(64) std::atomic<size_t> EnqueuePos_ = 0;
+ alignas(64) std::atomic<size_t> DequeuePos_ = 0;
+ };
+}
+
diff --git a/library/cpp/threading/bounded_queue/bounded_queue_ut.cpp b/library/cpp/threading/bounded_queue/bounded_queue_ut.cpp
new file mode 100644
index 0000000000..bb5b6eb787
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/bounded_queue_ut.cpp
@@ -0,0 +1,106 @@
+#include "bounded_queue.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/thread/factory.h>
+
+using namespace NThreading;
+
+Y_UNIT_TEST_SUITE(TBoundedQueueTest) {
+ Y_UNIT_TEST(QueueSize) {
+ const size_t queueSize = 16;
+ TBoundedQueue<size_t> boundedQueue(queueSize);
+
+ for (size_t i = 0; i < queueSize; ++i) {
+ UNIT_ASSERT(boundedQueue.Enqueue(i));
+ }
+ UNIT_ASSERT(!boundedQueue.Enqueue(0));
+ size_t tmp = 0;
+ UNIT_ASSERT(boundedQueue.Dequeue(tmp));
+ UNIT_ASSERT(boundedQueue.Enqueue(0));
+ UNIT_ASSERT(!boundedQueue.Enqueue(0));
+ }
+
+ Y_UNIT_TEST(Move) {
+ const size_t queueSize = 16;
+ TBoundedQueue<TString> boundedQueue(queueSize);
+
+ for (size_t i = 0; i < queueSize; ++i) {
+ TString v = "xxx";
+ UNIT_ASSERT(boundedQueue.Enqueue(std::move(v)));
+ UNIT_ASSERT(v.empty());
+ }
+
+ {
+ TString v = "xxx";
+ UNIT_ASSERT(!boundedQueue.Enqueue(std::move(v)));
+ UNIT_ASSERT(v == "xxx");
+ }
+
+ TString v;
+ UNIT_ASSERT(boundedQueue.Dequeue(v));
+ UNIT_ASSERT(v == "xxx");
+ }
+
+ Y_UNIT_TEST(MPMC) {
+ size_t queueSize = 16;
+ size_t producers = 10;
+ size_t consumers = 10;
+ size_t itemsCount = 10000;
+
+ TVector<THolder<IThreadFactory::IThread>> threads;
+ TBoundedQueue<std::pair<size_t, size_t>> boundedQueue(queueSize);
+
+ std::atomic<size_t> itemCounter = 0;
+ std::atomic<size_t> consumed = 0;
+
+ for (size_t i = 0; i < consumers; ++i) {
+ threads.push_back(SystemThreadFactory()->Run(
+ [&]() {
+ TVector<size_t> prevItems(producers);
+ for (;;) {
+ std::pair<size_t, size_t> item;
+ while (!boundedQueue.Dequeue(item)) {
+ ;
+ }
+
+ if (item.first >= producers) {
+ break;
+ }
+
+ UNIT_ASSERT(item.second > prevItems[item.first]);
+ prevItems[item.first] = item.second;
+ ++consumed;
+ }
+ })
+ );
+ }
+
+ for (size_t i = 0; i < producers ; ++i) {
+ threads.push_back(SystemThreadFactory()->Run(
+ [&, producerNum = i]() {
+ for (;;) {
+ size_t item = ++itemCounter;
+ if (item > itemsCount) {
+ break;
+ }
+
+ while (!boundedQueue.Enqueue(std::make_pair(producerNum, item))) {
+ ;
+ }
+ }
+
+ while (!boundedQueue.Enqueue(std::make_pair(producers, size_t(0)))) {
+ ;
+ }
+ })
+ );
+ }
+
+
+ for (auto& t : threads) {
+ t->Join();
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(consumed.load(), itemsCount);
+ }
+}
diff --git a/library/cpp/threading/bounded_queue/ut/ya.make b/library/cpp/threading/bounded_queue/ut/ya.make
new file mode 100644
index 0000000000..5db0a22713
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/ut/ya.make
@@ -0,0 +1,8 @@
+UNITTEST_FOR(library/cpp/threading/bounded_queue)
+
+SRCS(
+ bounded_queue_ut.cpp
+)
+
+END()
+
diff --git a/library/cpp/threading/bounded_queue/ya.make b/library/cpp/threading/bounded_queue/ya.make
new file mode 100644
index 0000000000..4c5336f031
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/ya.make
@@ -0,0 +1,9 @@
+LIBRARY()
+
+SRCS(
+ bounded_queue.h
+)
+
+END()
+
+RECURSE_FOR_TESTS(ut)
diff --git a/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt
index 902b6d7a9a..43ff0adc1c 100644
--- a/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt
+++ b/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt
@@ -12,6 +12,8 @@ target_link_libraries(cpp-threading-equeue PUBLIC
contrib-libs-cxxsupp
yutil
cpp-deprecated-atomic
+ cpp-threading-bounded_queue
+ cpp-yt-threading
)
target_sources(cpp-threading-equeue PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp
diff --git a/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt b/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt
index 5653d3d3f3..805b237943 100644
--- a/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt
+++ b/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt
@@ -13,6 +13,7 @@ target_link_libraries(cpp-threading-equeue PUBLIC
contrib-libs-cxxsupp
yutil
cpp-deprecated-atomic
+ cpp-threading-bounded_queue
)
target_sources(cpp-threading-equeue PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp
diff --git a/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt
index 5653d3d3f3..36f8a2632e 100644
--- a/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt
+++ b/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt
@@ -13,6 +13,8 @@ target_link_libraries(cpp-threading-equeue PUBLIC
contrib-libs-cxxsupp
yutil
cpp-deprecated-atomic
+ cpp-threading-bounded_queue
+ cpp-yt-threading
)
target_sources(cpp-threading-equeue PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp
diff --git a/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt
index 902b6d7a9a..43ff0adc1c 100644
--- a/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt
+++ b/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt
@@ -12,6 +12,8 @@ target_link_libraries(cpp-threading-equeue PUBLIC
contrib-libs-cxxsupp
yutil
cpp-deprecated-atomic
+ cpp-threading-bounded_queue
+ cpp-yt-threading
)
target_sources(cpp-threading-equeue PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp
diff --git a/library/cpp/threading/equeue/equeue_ut.cpp b/library/cpp/threading/equeue/equeue_ut.cpp
index 8557f63ac0..2c7d2c7b1e 100644
--- a/library/cpp/threading/equeue/equeue_ut.cpp
+++ b/library/cpp/threading/equeue/equeue_ut.cpp
@@ -1,4 +1,5 @@
#include "equeue.h"
+#include "fast.h"
#include <library/cpp/testing/unittest/registar.h>
@@ -9,18 +10,33 @@
Y_UNIT_TEST_SUITE(TElasticQueueTest) {
const size_t MaxQueueSize = 20;
const size_t ThreadCount = 10;
- const size_t N = 100000;
- static THolder<TElasticQueue> Queue;
+ template <typename T>
+ THolder<T> MakeQueue();
- struct TQueueSetup {
- TQueueSetup() {
- Queue.Reset(new TElasticQueue(MakeHolder<TSimpleThreadPool>()));
- Queue->Start(ThreadCount, MaxQueueSize);
- }
- ~TQueueSetup() {
- Queue->Stop();
- }
+ template <>
+ THolder<TElasticQueue> MakeQueue() {
+ return MakeHolder<TElasticQueue>(MakeHolder<TSimpleThreadPool>());
+ }
+
+ template <>
+ THolder<TFastElasticQueue> MakeQueue() {
+ return MakeHolder<TFastElasticQueue>();
+ }
+
+ template <typename T>
+ struct TEnv {
+ static inline THolder<T> Queue;
+
+ struct TQueueSetup {
+ TQueueSetup() {
+ Queue.Reset(MakeQueue<T>());
+ Queue->Start(ThreadCount, MaxQueueSize);
+ }
+ ~TQueueSetup() {
+ Queue->Stop();
+ }
+ };
};
struct TCounters {
@@ -37,7 +53,9 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
//fill test -- fill queue with "endless" jobs
TSystemEvent WaitEvent;
- Y_UNIT_TEST(FillTest) {
+
+ template <typename T>
+ void FillTest() {
Counters.Reset();
struct TWaitJob: public IObjectInQueue {
@@ -47,7 +65,10 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
}
} job;
- struct TLocalSetup: TQueueSetup {
+ struct TLocalSetup: TEnv<T>::TQueueSetup {
+ TLocalSetup() {
+ WaitEvent.Reset();
+ }
~TLocalSetup() {
WaitEvent.Signal();
}
@@ -56,19 +77,26 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
size_t enqueued = 0;
{
TLocalSetup setup;
- while (Queue->Add(&job) && enqueued < MaxQueueSize + 100) {
+ while (TEnv<T>::Queue->Add(&job) && enqueued < MaxQueueSize + 100) {
++enqueued;
}
UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize);
- UNIT_ASSERT_VALUES_EQUAL(enqueued, Queue->ObjectCount());
+ UNIT_ASSERT_VALUES_EQUAL(enqueued, TEnv<T>::Queue->ObjectCount());
}
- UNIT_ASSERT_VALUES_EQUAL(0u, Queue->ObjectCount());
- UNIT_ASSERT_VALUES_EQUAL(0u, Queue->Size());
+ UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->ObjectCount());
+ UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->Size());
UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued);
}
+ Y_UNIT_TEST(FillTest) {
+ FillTest<TElasticQueue>();
+ }
+
+ Y_UNIT_TEST(FillTestFast) {
+ FillTest<TFastElasticQueue>();
+ }
//concurrent test -- send many jobs from different threads
struct TJob: public IObjectInQueue {
@@ -78,9 +106,10 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
};
static TJob Job;
+ template <typename T>
static bool TryAdd() {
AtomicIncrement(Counters.Total);
- if (Queue->Add(&Job)) {
+ if (TEnv<T>::Queue->Add(&Job)) {
AtomicIncrement(Counters.Scheduled);
return true;
} else {
@@ -89,16 +118,18 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
}
}
+ const size_t N = 100000;
static size_t TryCounter;
- Y_UNIT_TEST(ConcurrentTest) {
+ template <typename T>
+ void ConcurrentTest() {
Counters.Reset();
TryCounter = 0;
struct TSender: public IThreadFactory::IThreadAble {
void DoExecute() override {
while ((size_t)AtomicIncrement(TryCounter) <= N) {
- if (!TryAdd()) {
+ if (!TryAdd<T>()) {
Sleep(TDuration::MicroSeconds(50));
}
}
@@ -106,7 +137,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
} sender;
{
- TQueueSetup setup;
+ typename TEnv<T>::TQueueSetup setup;
TVector< TAutoPtr<IThreadFactory::IThread> > senders;
for (size_t i = 0; i < ThreadCount; ++i) {
@@ -122,4 +153,12 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled);
UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded);
}
+
+ Y_UNIT_TEST(ConcurrentTest) {
+ ConcurrentTest<TElasticQueue>();
+ }
+
+ Y_UNIT_TEST(ConcurrentTestFast) {
+ ConcurrentTest<TFastElasticQueue>();
+ }
}
diff --git a/library/cpp/threading/equeue/fast.h b/library/cpp/threading/equeue/fast.h
new file mode 100644
index 0000000000..3f96e279fc
--- /dev/null
+++ b/library/cpp/threading/equeue/fast.h
@@ -0,0 +1,167 @@
+#pragma once
+
+#include <util/thread/pool.h>
+
+#include <util/datetime/base.h>
+#include <util/thread/lfqueue.h>
+#include <util/system/thread.h>
+#include <util/generic/vector.h>
+#include <util/generic/scope.h>
+#include <util/stream/str.h>
+
+#include <library/cpp/threading/bounded_queue/bounded_queue.h>
+
+#if defined(_android_) || defined(_arm_)
+//by now library/cpp/yt/threading doesn't compile in targets like default-android-armv7a, fallback to ordinal elastic queue
+#include "equeue.h"
+ class TFastElasticQueue
+ : public TElasticQueue
+ {
+ public:
+ explicit TFastElasticQueue(const TParams& params = {})
+ : TElasticQueue(MakeHolder<TSimpleThreadPool>(params))
+ {
+ }
+ };
+#else
+
+#include <library/cpp/yt/threading/event_count.h>
+
+class TFastElasticQueue
+ : public TThreadPoolBase
+ , private IThreadFactory::IThreadAble
+{
+public:
+ explicit TFastElasticQueue(const TParams& params = {})
+ : TThreadPoolBase(params)
+ {
+ Y_ENSURE(!params.Blocking_);
+ }
+
+ ~TFastElasticQueue() {
+ Stop();
+ }
+
+ void Start(size_t threadCount, size_t maxQueueSize) override {
+ Y_ENSURE(Threads_.empty());
+ Y_ENSURE(maxQueueSize > 0);
+
+ Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events
+ MaxQueueSize_ = maxQueueSize;
+
+ try {
+ for (size_t i = 0; i < threadCount; ++i) {
+ Threads_.push_back(Pool()->Run(this));
+ }
+ } catch (...) {
+ Stop();
+ throw;
+ }
+
+ Stopped_ = false;
+ }
+
+ size_t ObjectCount() const {
+ //GuardCount_ can be temporary incremented above real object count in queue
+ return Min(GuardCount_.load(), MaxQueueSize_);
+ }
+
+ bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT {
+ if (Stopped_ || !obj) {
+ return false;
+ }
+
+ if (GuardCount_.fetch_add(1) >= MaxQueueSize_) {
+ GuardCount_.fetch_sub(1);
+ return false;
+ }
+
+ QueueSize_.fetch_add(1);
+
+ if (!Queue_->Enqueue(obj)) {
+ //Simultaneous Dequeue calls can return not in exact fifo order of items,
+ //so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of
+ //the oldest enqueued item is not actually dequeued and ring buffer can't proceed.
+ GuardCount_.fetch_sub(1);
+ QueueSize_.fetch_sub(1);
+ return false;
+ }
+
+
+ Event_.NotifyOne();
+
+ return true;
+ }
+
+ size_t Size() const noexcept override {
+ return QueueSize_.load();
+ }
+
+ void Stop() noexcept override {
+ Stopped_ = true;
+
+ for (size_t i = 0; i < Threads_.size(); ++i) {
+ while (!Queue_->Enqueue(nullptr)) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
+
+ Event_.NotifyOne();
+ }
+
+ while (!Threads_.empty()) {
+ Threads_.back()->Join();
+ Threads_.pop_back();
+ }
+
+ Queue_.Reset();
+ }
+
+ void DoExecute() override {
+ TThread::SetCurrentThreadName(Params.ThreadName_.c_str());
+
+ while (true) {
+ IObjectInQueue* job = nullptr;
+
+ Event_.Await([&]() {
+ return Queue_->Dequeue(job);
+ });
+
+ if (!job) {
+ break;
+ }
+
+ QueueSize_.fetch_sub(1);
+
+ Y_DEFER {
+ GuardCount_.fetch_sub(1);
+ };
+
+ if (Params.Catching_) {
+ try {
+ try {
+ job->Process(nullptr);
+ } catch (...) {
+ Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
+ }
+ } catch (...) {
+ ;
+ }
+ } else {
+ job->Process(nullptr);
+ }
+ }
+ }
+private:
+ std::atomic<bool> Stopped_ = false;
+ size_t MaxQueueSize_ = 0;
+
+ alignas(64) std::atomic<size_t> GuardCount_ = 0;
+ alignas(64) std::atomic<size_t> QueueSize_ = 0;
+
+ TVector<THolder<IThreadFactory::IThread>> Threads_;
+
+ THolder<NThreading::TBoundedQueue<IObjectInQueue*>> Queue_;
+ NYT::NThreading::TEventCount Event_;
+};
+
+#endif
diff --git a/library/cpp/threading/equeue/ya.make b/library/cpp/threading/equeue/ya.make
index 445797aa12..95677366c9 100644
--- a/library/cpp/threading/equeue/ya.make
+++ b/library/cpp/threading/equeue/ya.make
@@ -3,14 +3,23 @@ LIBRARY()
SRCS(
equeue.h
equeue.cpp
+ fast.h
)
PEERDIR(
library/cpp/deprecated/atomic
+ library/cpp/threading/bounded_queue
)
+IF (NOT OS_ANDROID AND NOT ARCH_ARM)
+ PEERDIR(
+ library/cpp/yt/threading
+ )
+ENDIF()
+
END()
RECURSE_FOR_TESTS(
ut
)
+