summaryrefslogtreecommitdiffstats
path: root/library/cpp/threading
diff options
context:
space:
mode:
Diffstat (limited to 'library/cpp/threading')
-rw-r--r--library/cpp/threading/CMakeLists.txt1
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt14
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt15
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt15
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.txt17
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt14
-rw-r--r--library/cpp/threading/bounded_queue/bounded_queue.h89
-rw-r--r--library/cpp/threading/bounded_queue/bounded_queue_ut.cpp106
-rw-r--r--library/cpp/threading/bounded_queue/ut/ya.make8
-rw-r--r--library/cpp/threading/bounded_queue/ya.make9
-rw-r--r--library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt2
-rw-r--r--library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt1
-rw-r--r--library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt2
-rw-r--r--library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt2
-rw-r--r--library/cpp/threading/equeue/equeue_ut.cpp79
-rw-r--r--library/cpp/threading/equeue/fast.h167
-rw-r--r--library/cpp/threading/equeue/ya.make9
17 files changed, 530 insertions, 20 deletions
diff --git a/library/cpp/threading/CMakeLists.txt b/library/cpp/threading/CMakeLists.txt
index 681ef6b24e3..f5410422081 100644
--- a/library/cpp/threading/CMakeLists.txt
+++ b/library/cpp/threading/CMakeLists.txt
@@ -7,6 +7,7 @@
add_subdirectory(atomic)
+add_subdirectory(bounded_queue)
add_subdirectory(chunk_queue)
add_subdirectory(equeue)
add_subdirectory(future)
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt b/library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..bd9b68ab62b
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,14 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-threading-bounded_queue INTERFACE)
+target_link_libraries(cpp-threading-bounded_queue INTERFACE
+ contrib-libs-cxxsupp
+ yutil
+)
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt b/library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..613a18db103
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-threading-bounded_queue INTERFACE)
+target_link_libraries(cpp-threading-bounded_queue INTERFACE
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+)
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt b/library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..613a18db103
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-threading-bounded_queue INTERFACE)
+target_link_libraries(cpp-threading-bounded_queue INTERFACE
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+)
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.txt b/library/cpp/threading/bounded_queue/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt b/library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..bd9b68ab62b
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,14 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-threading-bounded_queue INTERFACE)
+target_link_libraries(cpp-threading-bounded_queue INTERFACE
+ contrib-libs-cxxsupp
+ yutil
+)
diff --git a/library/cpp/threading/bounded_queue/bounded_queue.h b/library/cpp/threading/bounded_queue/bounded_queue.h
new file mode 100644
index 00000000000..c5c67140865
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/bounded_queue.h
@@ -0,0 +1,89 @@
+#pragma once
+
+#include <util/generic/yexception.h>
+
+//https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
+
+namespace NThreading {
+ template<typename T>
+ class TBoundedQueue {
+ public:
+ explicit TBoundedQueue(size_t size)
+ : Buffer_(new TCell[size])
+ , Mask_(size - 1)
+ {
+ Y_ENSURE(size >= 2 && (size & (size - 1)) == 0);
+
+ for (size_t i = 0; i < size; ++i) {
+ Buffer_[i].Sequence.store(i, std::memory_order_relaxed);
+ }
+ }
+
+ template <typename T_>
+ [[nodiscard]] bool Enqueue(T_&& data) noexcept {
+ TCell* cell;
+ size_t pos = EnqueuePos_.load(std::memory_order_relaxed);
+
+ for (;;) {
+ cell = &Buffer_[pos & Mask_];
+ size_t seq = cell->Sequence.load(std::memory_order_acquire);
+ intptr_t dif = (intptr_t)seq - (intptr_t)pos;
+
+ if (dif == 0) {
+ if (EnqueuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
+ break;
+ }
+ } else if (dif < 0) {
+ return false;
+ } else {
+ pos = EnqueuePos_.load(std::memory_order_relaxed);
+ }
+ }
+
+ static_assert(noexcept(cell->Data = std::forward<T_>(data)));
+ cell->Data = std::forward<T_>(data);
+ cell->Sequence.store(pos + 1, std::memory_order_release);
+
+ return true;
+ }
+
+ [[nodiscard]] bool Dequeue(T& data) noexcept {
+ TCell* cell;
+ size_t pos = DequeuePos_.load(std::memory_order_relaxed);
+
+ for (;;) {
+ cell = &Buffer_[pos & Mask_];
+ size_t seq = cell->Sequence.load(std::memory_order_acquire);
+ intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
+
+ if (dif == 0) {
+ if (DequeuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
+ break;
+ }
+ } else if (dif < 0) {
+ return false;
+ } else {
+ pos = DequeuePos_.load(std::memory_order_relaxed);
+ }
+ }
+
+ static_assert(noexcept(data = std::move(cell->Data)));
+ data = std::move(cell->Data);
+
+ cell->Sequence.store(pos + Mask_ + 1, std::memory_order_release);
+ return true;
+ }
+ private:
+ struct TCell {
+ std::atomic<size_t> Sequence;
+ T Data;
+ };
+
+ std::unique_ptr<TCell[]> Buffer_;
+ const size_t Mask_;
+
+ alignas(64) std::atomic<size_t> EnqueuePos_ = 0;
+ alignas(64) std::atomic<size_t> DequeuePos_ = 0;
+ };
+}
+
diff --git a/library/cpp/threading/bounded_queue/bounded_queue_ut.cpp b/library/cpp/threading/bounded_queue/bounded_queue_ut.cpp
new file mode 100644
index 00000000000..bb5b6eb787b
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/bounded_queue_ut.cpp
@@ -0,0 +1,106 @@
+#include "bounded_queue.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <util/thread/factory.h>
+
+using namespace NThreading;
+
+Y_UNIT_TEST_SUITE(TBoundedQueueTest) {
+ Y_UNIT_TEST(QueueSize) {
+ const size_t queueSize = 16;
+ TBoundedQueue<size_t> boundedQueue(queueSize);
+
+ for (size_t i = 0; i < queueSize; ++i) {
+ UNIT_ASSERT(boundedQueue.Enqueue(i));
+ }
+ UNIT_ASSERT(!boundedQueue.Enqueue(0));
+ size_t tmp = 0;
+ UNIT_ASSERT(boundedQueue.Dequeue(tmp));
+ UNIT_ASSERT(boundedQueue.Enqueue(0));
+ UNIT_ASSERT(!boundedQueue.Enqueue(0));
+ }
+
+ Y_UNIT_TEST(Move) {
+ const size_t queueSize = 16;
+ TBoundedQueue<TString> boundedQueue(queueSize);
+
+ for (size_t i = 0; i < queueSize; ++i) {
+ TString v = "xxx";
+ UNIT_ASSERT(boundedQueue.Enqueue(std::move(v)));
+ UNIT_ASSERT(v.empty());
+ }
+
+ {
+ TString v = "xxx";
+ UNIT_ASSERT(!boundedQueue.Enqueue(std::move(v)));
+ UNIT_ASSERT(v == "xxx");
+ }
+
+ TString v;
+ UNIT_ASSERT(boundedQueue.Dequeue(v));
+ UNIT_ASSERT(v == "xxx");
+ }
+
+ Y_UNIT_TEST(MPMC) {
+ size_t queueSize = 16;
+ size_t producers = 10;
+ size_t consumers = 10;
+ size_t itemsCount = 10000;
+
+ TVector<THolder<IThreadFactory::IThread>> threads;
+ TBoundedQueue<std::pair<size_t, size_t>> boundedQueue(queueSize);
+
+ std::atomic<size_t> itemCounter = 0;
+ std::atomic<size_t> consumed = 0;
+
+ for (size_t i = 0; i < consumers; ++i) {
+ threads.push_back(SystemThreadFactory()->Run(
+ [&]() {
+ TVector<size_t> prevItems(producers);
+ for (;;) {
+ std::pair<size_t, size_t> item;
+ while (!boundedQueue.Dequeue(item)) {
+ ;
+ }
+
+ if (item.first >= producers) {
+ break;
+ }
+
+ UNIT_ASSERT(item.second > prevItems[item.first]);
+ prevItems[item.first] = item.second;
+ ++consumed;
+ }
+ })
+ );
+ }
+
+ for (size_t i = 0; i < producers ; ++i) {
+ threads.push_back(SystemThreadFactory()->Run(
+ [&, producerNum = i]() {
+ for (;;) {
+ size_t item = ++itemCounter;
+ if (item > itemsCount) {
+ break;
+ }
+
+ while (!boundedQueue.Enqueue(std::make_pair(producerNum, item))) {
+ ;
+ }
+ }
+
+ while (!boundedQueue.Enqueue(std::make_pair(producers, size_t(0)))) {
+ ;
+ }
+ })
+ );
+ }
+
+
+ for (auto& t : threads) {
+ t->Join();
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(consumed.load(), itemsCount);
+ }
+}
diff --git a/library/cpp/threading/bounded_queue/ut/ya.make b/library/cpp/threading/bounded_queue/ut/ya.make
new file mode 100644
index 00000000000..5db0a227131
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/ut/ya.make
@@ -0,0 +1,8 @@
+UNITTEST_FOR(library/cpp/threading/bounded_queue)
+
+SRCS(
+ bounded_queue_ut.cpp
+)
+
+END()
+
diff --git a/library/cpp/threading/bounded_queue/ya.make b/library/cpp/threading/bounded_queue/ya.make
new file mode 100644
index 00000000000..4c5336f0311
--- /dev/null
+++ b/library/cpp/threading/bounded_queue/ya.make
@@ -0,0 +1,9 @@
+LIBRARY()
+
+SRCS(
+ bounded_queue.h
+)
+
+END()
+
+RECURSE_FOR_TESTS(ut)
diff --git a/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt
index 902b6d7a9ac..43ff0adc1c0 100644
--- a/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt
+++ b/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt
@@ -12,6 +12,8 @@ target_link_libraries(cpp-threading-equeue PUBLIC
contrib-libs-cxxsupp
yutil
cpp-deprecated-atomic
+ cpp-threading-bounded_queue
+ cpp-yt-threading
)
target_sources(cpp-threading-equeue PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp
diff --git a/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt b/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt
index 5653d3d3f3f..805b2379439 100644
--- a/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt
+++ b/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt
@@ -13,6 +13,7 @@ target_link_libraries(cpp-threading-equeue PUBLIC
contrib-libs-cxxsupp
yutil
cpp-deprecated-atomic
+ cpp-threading-bounded_queue
)
target_sources(cpp-threading-equeue PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp
diff --git a/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt
index 5653d3d3f3f..36f8a2632e3 100644
--- a/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt
+++ b/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt
@@ -13,6 +13,8 @@ target_link_libraries(cpp-threading-equeue PUBLIC
contrib-libs-cxxsupp
yutil
cpp-deprecated-atomic
+ cpp-threading-bounded_queue
+ cpp-yt-threading
)
target_sources(cpp-threading-equeue PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp
diff --git a/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt
index 902b6d7a9ac..43ff0adc1c0 100644
--- a/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt
+++ b/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt
@@ -12,6 +12,8 @@ target_link_libraries(cpp-threading-equeue PUBLIC
contrib-libs-cxxsupp
yutil
cpp-deprecated-atomic
+ cpp-threading-bounded_queue
+ cpp-yt-threading
)
target_sources(cpp-threading-equeue PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp
diff --git a/library/cpp/threading/equeue/equeue_ut.cpp b/library/cpp/threading/equeue/equeue_ut.cpp
index 8557f63ac06..2c7d2c7b1e8 100644
--- a/library/cpp/threading/equeue/equeue_ut.cpp
+++ b/library/cpp/threading/equeue/equeue_ut.cpp
@@ -1,4 +1,5 @@
#include "equeue.h"
+#include "fast.h"
#include <library/cpp/testing/unittest/registar.h>
@@ -9,18 +10,33 @@
Y_UNIT_TEST_SUITE(TElasticQueueTest) {
const size_t MaxQueueSize = 20;
const size_t ThreadCount = 10;
- const size_t N = 100000;
- static THolder<TElasticQueue> Queue;
+ template <typename T>
+ THolder<T> MakeQueue();
- struct TQueueSetup {
- TQueueSetup() {
- Queue.Reset(new TElasticQueue(MakeHolder<TSimpleThreadPool>()));
- Queue->Start(ThreadCount, MaxQueueSize);
- }
- ~TQueueSetup() {
- Queue->Stop();
- }
+ template <>
+ THolder<TElasticQueue> MakeQueue() {
+ return MakeHolder<TElasticQueue>(MakeHolder<TSimpleThreadPool>());
+ }
+
+ template <>
+ THolder<TFastElasticQueue> MakeQueue() {
+ return MakeHolder<TFastElasticQueue>();
+ }
+
+ template <typename T>
+ struct TEnv {
+ static inline THolder<T> Queue;
+
+ struct TQueueSetup {
+ TQueueSetup() {
+ Queue.Reset(MakeQueue<T>());
+ Queue->Start(ThreadCount, MaxQueueSize);
+ }
+ ~TQueueSetup() {
+ Queue->Stop();
+ }
+ };
};
struct TCounters {
@@ -37,7 +53,9 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
//fill test -- fill queue with "endless" jobs
TSystemEvent WaitEvent;
- Y_UNIT_TEST(FillTest) {
+
+ template <typename T>
+ void FillTest() {
Counters.Reset();
struct TWaitJob: public IObjectInQueue {
@@ -47,7 +65,10 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
}
} job;
- struct TLocalSetup: TQueueSetup {
+ struct TLocalSetup: TEnv<T>::TQueueSetup {
+ TLocalSetup() {
+ WaitEvent.Reset();
+ }
~TLocalSetup() {
WaitEvent.Signal();
}
@@ -56,19 +77,26 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
size_t enqueued = 0;
{
TLocalSetup setup;
- while (Queue->Add(&job) && enqueued < MaxQueueSize + 100) {
+ while (TEnv<T>::Queue->Add(&job) && enqueued < MaxQueueSize + 100) {
++enqueued;
}
UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize);
- UNIT_ASSERT_VALUES_EQUAL(enqueued, Queue->ObjectCount());
+ UNIT_ASSERT_VALUES_EQUAL(enqueued, TEnv<T>::Queue->ObjectCount());
}
- UNIT_ASSERT_VALUES_EQUAL(0u, Queue->ObjectCount());
- UNIT_ASSERT_VALUES_EQUAL(0u, Queue->Size());
+ UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->ObjectCount());
+ UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->Size());
UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued);
}
+ Y_UNIT_TEST(FillTest) {
+ FillTest<TElasticQueue>();
+ }
+
+ Y_UNIT_TEST(FillTestFast) {
+ FillTest<TFastElasticQueue>();
+ }
//concurrent test -- send many jobs from different threads
struct TJob: public IObjectInQueue {
@@ -78,9 +106,10 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
};
static TJob Job;
+ template <typename T>
static bool TryAdd() {
AtomicIncrement(Counters.Total);
- if (Queue->Add(&Job)) {
+ if (TEnv<T>::Queue->Add(&Job)) {
AtomicIncrement(Counters.Scheduled);
return true;
} else {
@@ -89,16 +118,18 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
}
}
+ const size_t N = 100000;
static size_t TryCounter;
- Y_UNIT_TEST(ConcurrentTest) {
+ template <typename T>
+ void ConcurrentTest() {
Counters.Reset();
TryCounter = 0;
struct TSender: public IThreadFactory::IThreadAble {
void DoExecute() override {
while ((size_t)AtomicIncrement(TryCounter) <= N) {
- if (!TryAdd()) {
+ if (!TryAdd<T>()) {
Sleep(TDuration::MicroSeconds(50));
}
}
@@ -106,7 +137,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
} sender;
{
- TQueueSetup setup;
+ typename TEnv<T>::TQueueSetup setup;
TVector< TAutoPtr<IThreadFactory::IThread> > senders;
for (size_t i = 0; i < ThreadCount; ++i) {
@@ -122,4 +153,12 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled);
UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded);
}
+
+ Y_UNIT_TEST(ConcurrentTest) {
+ ConcurrentTest<TElasticQueue>();
+ }
+
+ Y_UNIT_TEST(ConcurrentTestFast) {
+ ConcurrentTest<TFastElasticQueue>();
+ }
}
diff --git a/library/cpp/threading/equeue/fast.h b/library/cpp/threading/equeue/fast.h
new file mode 100644
index 00000000000..3f96e279fc9
--- /dev/null
+++ b/library/cpp/threading/equeue/fast.h
@@ -0,0 +1,167 @@
+#pragma once
+
+#include <util/thread/pool.h>
+
+#include <util/datetime/base.h>
+#include <util/thread/lfqueue.h>
+#include <util/system/thread.h>
+#include <util/generic/vector.h>
+#include <util/generic/scope.h>
+#include <util/stream/str.h>
+
+#include <library/cpp/threading/bounded_queue/bounded_queue.h>
+
+#if defined(_android_) || defined(_arm_)
+//by now library/cpp/yt/threading doesn't compile in targets like default-android-armv7a, fallback to ordinal elastic queue
+#include "equeue.h"
+ class TFastElasticQueue
+ : public TElasticQueue
+ {
+ public:
+ explicit TFastElasticQueue(const TParams& params = {})
+ : TElasticQueue(MakeHolder<TSimpleThreadPool>(params))
+ {
+ }
+ };
+#else
+
+#include <library/cpp/yt/threading/event_count.h>
+
+class TFastElasticQueue
+ : public TThreadPoolBase
+ , private IThreadFactory::IThreadAble
+{
+public:
+ explicit TFastElasticQueue(const TParams& params = {})
+ : TThreadPoolBase(params)
+ {
+ Y_ENSURE(!params.Blocking_);
+ }
+
+ ~TFastElasticQueue() {
+ Stop();
+ }
+
+ void Start(size_t threadCount, size_t maxQueueSize) override {
+ Y_ENSURE(Threads_.empty());
+ Y_ENSURE(maxQueueSize > 0);
+
+ Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events
+ MaxQueueSize_ = maxQueueSize;
+
+ try {
+ for (size_t i = 0; i < threadCount; ++i) {
+ Threads_.push_back(Pool()->Run(this));
+ }
+ } catch (...) {
+ Stop();
+ throw;
+ }
+
+ Stopped_ = false;
+ }
+
+ size_t ObjectCount() const {
+ //GuardCount_ can be temporary incremented above real object count in queue
+ return Min(GuardCount_.load(), MaxQueueSize_);
+ }
+
+ bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT {
+ if (Stopped_ || !obj) {
+ return false;
+ }
+
+ if (GuardCount_.fetch_add(1) >= MaxQueueSize_) {
+ GuardCount_.fetch_sub(1);
+ return false;
+ }
+
+ QueueSize_.fetch_add(1);
+
+ if (!Queue_->Enqueue(obj)) {
+ //Simultaneous Dequeue calls can return not in exact fifo order of items,
+ //so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of
+ //the oldest enqueued item is not actually dequeued and ring buffer can't proceed.
+ GuardCount_.fetch_sub(1);
+ QueueSize_.fetch_sub(1);
+ return false;
+ }
+
+
+ Event_.NotifyOne();
+
+ return true;
+ }
+
+ size_t Size() const noexcept override {
+ return QueueSize_.load();
+ }
+
+ void Stop() noexcept override {
+ Stopped_ = true;
+
+ for (size_t i = 0; i < Threads_.size(); ++i) {
+ while (!Queue_->Enqueue(nullptr)) {
+ Sleep(TDuration::MilliSeconds(1));
+ }
+
+ Event_.NotifyOne();
+ }
+
+ while (!Threads_.empty()) {
+ Threads_.back()->Join();
+ Threads_.pop_back();
+ }
+
+ Queue_.Reset();
+ }
+
+ void DoExecute() override {
+ TThread::SetCurrentThreadName(Params.ThreadName_.c_str());
+
+ while (true) {
+ IObjectInQueue* job = nullptr;
+
+ Event_.Await([&]() {
+ return Queue_->Dequeue(job);
+ });
+
+ if (!job) {
+ break;
+ }
+
+ QueueSize_.fetch_sub(1);
+
+ Y_DEFER {
+ GuardCount_.fetch_sub(1);
+ };
+
+ if (Params.Catching_) {
+ try {
+ try {
+ job->Process(nullptr);
+ } catch (...) {
+ Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
+ }
+ } catch (...) {
+ ;
+ }
+ } else {
+ job->Process(nullptr);
+ }
+ }
+ }
+private:
+ std::atomic<bool> Stopped_ = false;
+ size_t MaxQueueSize_ = 0;
+
+ alignas(64) std::atomic<size_t> GuardCount_ = 0;
+ alignas(64) std::atomic<size_t> QueueSize_ = 0;
+
+ TVector<THolder<IThreadFactory::IThread>> Threads_;
+
+ THolder<NThreading::TBoundedQueue<IObjectInQueue*>> Queue_;
+ NYT::NThreading::TEventCount Event_;
+};
+
+#endif
diff --git a/library/cpp/threading/equeue/ya.make b/library/cpp/threading/equeue/ya.make
index 445797aa121..95677366c93 100644
--- a/library/cpp/threading/equeue/ya.make
+++ b/library/cpp/threading/equeue/ya.make
@@ -3,14 +3,23 @@ LIBRARY()
SRCS(
equeue.h
equeue.cpp
+ fast.h
)
PEERDIR(
library/cpp/deprecated/atomic
+ library/cpp/threading/bounded_queue
)
+IF (NOT OS_ANDROID AND NOT ARCH_ARM)
+ PEERDIR(
+ library/cpp/yt/threading
+ )
+ENDIF()
+
END()
RECURSE_FOR_TESTS(
ut
)
+