aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading
diff options
context:
space:
mode:
authorkulikov <kulikov@yandex-team.com>2023-07-27 12:28:50 +0300
committerkulikov <kulikov@yandex-team.com>2023-07-27 12:28:50 +0300
commit4a7691c519e6114e013dc1dd0c3b2528154507f9 (patch)
treeaf6ee3a210f95bb4532036b1f1a5352ef072068a /library/cpp/threading
parentdcde92436ae71c3fdf1d6b9916e3858f3b35146e (diff)
downloadydb-4a7691c519e6114e013dc1dd0c3b2528154507f9.tar.gz
revert rXXXXXX (see discusstion in pr), will commit again more pci-dss friendly way
Diffstat (limited to 'library/cpp/threading')
-rw-r--r--library/cpp/threading/CMakeLists.txt1
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt14
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt15
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt15
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.txt17
-rw-r--r--library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt14
-rw-r--r--library/cpp/threading/bounded_queue/bounded_queue.h89
-rw-r--r--library/cpp/threading/bounded_queue/bounded_queue_ut.cpp106
-rw-r--r--library/cpp/threading/bounded_queue/ut/ya.make8
-rw-r--r--library/cpp/threading/bounded_queue/ya.make9
-rw-r--r--library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt2
-rw-r--r--library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt1
-rw-r--r--library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt2
-rw-r--r--library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt2
-rw-r--r--library/cpp/threading/equeue/equeue_ut.cpp79
-rw-r--r--library/cpp/threading/equeue/fast.h167
-rw-r--r--library/cpp/threading/equeue/ya.make9
17 files changed, 20 insertions, 530 deletions
diff --git a/library/cpp/threading/CMakeLists.txt b/library/cpp/threading/CMakeLists.txt
index f541042208..681ef6b24e 100644
--- a/library/cpp/threading/CMakeLists.txt
+++ b/library/cpp/threading/CMakeLists.txt
@@ -7,7 +7,6 @@
add_subdirectory(atomic)
-add_subdirectory(bounded_queue)
add_subdirectory(chunk_queue)
add_subdirectory(equeue)
add_subdirectory(future)
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt b/library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt
deleted file mode 100644
index bd9b68ab62..0000000000
--- a/library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt
+++ /dev/null
@@ -1,14 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(cpp-threading-bounded_queue INTERFACE)
-target_link_libraries(cpp-threading-bounded_queue INTERFACE
- contrib-libs-cxxsupp
- yutil
-)
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt b/library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt
deleted file mode 100644
index 613a18db10..0000000000
--- a/library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt
+++ /dev/null
@@ -1,15 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(cpp-threading-bounded_queue INTERFACE)
-target_link_libraries(cpp-threading-bounded_queue INTERFACE
- contrib-libs-linux-headers
- contrib-libs-cxxsupp
- yutil
-)
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt b/library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt
deleted file mode 100644
index 613a18db10..0000000000
--- a/library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt
+++ /dev/null
@@ -1,15 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(cpp-threading-bounded_queue INTERFACE)
-target_link_libraries(cpp-threading-bounded_queue INTERFACE
- contrib-libs-linux-headers
- contrib-libs-cxxsupp
- yutil
-)
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.txt b/library/cpp/threading/bounded_queue/CMakeLists.txt
deleted file mode 100644
index f8b31df0c1..0000000000
--- a/library/cpp/threading/bounded_queue/CMakeLists.txt
+++ /dev/null
@@ -1,17 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-aarch64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
- include(CMakeLists.darwin-x86_64.txt)
-elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
- include(CMakeLists.windows-x86_64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-x86_64.txt)
-endif()
diff --git a/library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt b/library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt
deleted file mode 100644
index bd9b68ab62..0000000000
--- a/library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt
+++ /dev/null
@@ -1,14 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(cpp-threading-bounded_queue INTERFACE)
-target_link_libraries(cpp-threading-bounded_queue INTERFACE
- contrib-libs-cxxsupp
- yutil
-)
diff --git a/library/cpp/threading/bounded_queue/bounded_queue.h b/library/cpp/threading/bounded_queue/bounded_queue.h
deleted file mode 100644
index c5c6714086..0000000000
--- a/library/cpp/threading/bounded_queue/bounded_queue.h
+++ /dev/null
@@ -1,89 +0,0 @@
-#pragma once
-
-#include <util/generic/yexception.h>
-
-//https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
-
-namespace NThreading {
- template<typename T>
- class TBoundedQueue {
- public:
- explicit TBoundedQueue(size_t size)
- : Buffer_(new TCell[size])
- , Mask_(size - 1)
- {
- Y_ENSURE(size >= 2 && (size & (size - 1)) == 0);
-
- for (size_t i = 0; i < size; ++i) {
- Buffer_[i].Sequence.store(i, std::memory_order_relaxed);
- }
- }
-
- template <typename T_>
- [[nodiscard]] bool Enqueue(T_&& data) noexcept {
- TCell* cell;
- size_t pos = EnqueuePos_.load(std::memory_order_relaxed);
-
- for (;;) {
- cell = &Buffer_[pos & Mask_];
- size_t seq = cell->Sequence.load(std::memory_order_acquire);
- intptr_t dif = (intptr_t)seq - (intptr_t)pos;
-
- if (dif == 0) {
- if (EnqueuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
- break;
- }
- } else if (dif < 0) {
- return false;
- } else {
- pos = EnqueuePos_.load(std::memory_order_relaxed);
- }
- }
-
- static_assert(noexcept(cell->Data = std::forward<T_>(data)));
- cell->Data = std::forward<T_>(data);
- cell->Sequence.store(pos + 1, std::memory_order_release);
-
- return true;
- }
-
- [[nodiscard]] bool Dequeue(T& data) noexcept {
- TCell* cell;
- size_t pos = DequeuePos_.load(std::memory_order_relaxed);
-
- for (;;) {
- cell = &Buffer_[pos & Mask_];
- size_t seq = cell->Sequence.load(std::memory_order_acquire);
- intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
-
- if (dif == 0) {
- if (DequeuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
- break;
- }
- } else if (dif < 0) {
- return false;
- } else {
- pos = DequeuePos_.load(std::memory_order_relaxed);
- }
- }
-
- static_assert(noexcept(data = std::move(cell->Data)));
- data = std::move(cell->Data);
-
- cell->Sequence.store(pos + Mask_ + 1, std::memory_order_release);
- return true;
- }
- private:
- struct TCell {
- std::atomic<size_t> Sequence;
- T Data;
- };
-
- std::unique_ptr<TCell[]> Buffer_;
- const size_t Mask_;
-
- alignas(64) std::atomic<size_t> EnqueuePos_ = 0;
- alignas(64) std::atomic<size_t> DequeuePos_ = 0;
- };
-}
-
diff --git a/library/cpp/threading/bounded_queue/bounded_queue_ut.cpp b/library/cpp/threading/bounded_queue/bounded_queue_ut.cpp
deleted file mode 100644
index bb5b6eb787..0000000000
--- a/library/cpp/threading/bounded_queue/bounded_queue_ut.cpp
+++ /dev/null
@@ -1,106 +0,0 @@
-#include "bounded_queue.h"
-
-#include <library/cpp/testing/unittest/registar.h>
-#include <util/thread/factory.h>
-
-using namespace NThreading;
-
-Y_UNIT_TEST_SUITE(TBoundedQueueTest) {
- Y_UNIT_TEST(QueueSize) {
- const size_t queueSize = 16;
- TBoundedQueue<size_t> boundedQueue(queueSize);
-
- for (size_t i = 0; i < queueSize; ++i) {
- UNIT_ASSERT(boundedQueue.Enqueue(i));
- }
- UNIT_ASSERT(!boundedQueue.Enqueue(0));
- size_t tmp = 0;
- UNIT_ASSERT(boundedQueue.Dequeue(tmp));
- UNIT_ASSERT(boundedQueue.Enqueue(0));
- UNIT_ASSERT(!boundedQueue.Enqueue(0));
- }
-
- Y_UNIT_TEST(Move) {
- const size_t queueSize = 16;
- TBoundedQueue<TString> boundedQueue(queueSize);
-
- for (size_t i = 0; i < queueSize; ++i) {
- TString v = "xxx";
- UNIT_ASSERT(boundedQueue.Enqueue(std::move(v)));
- UNIT_ASSERT(v.empty());
- }
-
- {
- TString v = "xxx";
- UNIT_ASSERT(!boundedQueue.Enqueue(std::move(v)));
- UNIT_ASSERT(v == "xxx");
- }
-
- TString v;
- UNIT_ASSERT(boundedQueue.Dequeue(v));
- UNIT_ASSERT(v == "xxx");
- }
-
- Y_UNIT_TEST(MPMC) {
- size_t queueSize = 16;
- size_t producers = 10;
- size_t consumers = 10;
- size_t itemsCount = 10000;
-
- TVector<THolder<IThreadFactory::IThread>> threads;
- TBoundedQueue<std::pair<size_t, size_t>> boundedQueue(queueSize);
-
- std::atomic<size_t> itemCounter = 0;
- std::atomic<size_t> consumed = 0;
-
- for (size_t i = 0; i < consumers; ++i) {
- threads.push_back(SystemThreadFactory()->Run(
- [&]() {
- TVector<size_t> prevItems(producers);
- for (;;) {
- std::pair<size_t, size_t> item;
- while (!boundedQueue.Dequeue(item)) {
- ;
- }
-
- if (item.first >= producers) {
- break;
- }
-
- UNIT_ASSERT(item.second > prevItems[item.first]);
- prevItems[item.first] = item.second;
- ++consumed;
- }
- })
- );
- }
-
- for (size_t i = 0; i < producers ; ++i) {
- threads.push_back(SystemThreadFactory()->Run(
- [&, producerNum = i]() {
- for (;;) {
- size_t item = ++itemCounter;
- if (item > itemsCount) {
- break;
- }
-
- while (!boundedQueue.Enqueue(std::make_pair(producerNum, item))) {
- ;
- }
- }
-
- while (!boundedQueue.Enqueue(std::make_pair(producers, size_t(0)))) {
- ;
- }
- })
- );
- }
-
-
- for (auto& t : threads) {
- t->Join();
- }
-
- UNIT_ASSERT_VALUES_EQUAL(consumed.load(), itemsCount);
- }
-}
diff --git a/library/cpp/threading/bounded_queue/ut/ya.make b/library/cpp/threading/bounded_queue/ut/ya.make
deleted file mode 100644
index 5db0a22713..0000000000
--- a/library/cpp/threading/bounded_queue/ut/ya.make
+++ /dev/null
@@ -1,8 +0,0 @@
-UNITTEST_FOR(library/cpp/threading/bounded_queue)
-
-SRCS(
- bounded_queue_ut.cpp
-)
-
-END()
-
diff --git a/library/cpp/threading/bounded_queue/ya.make b/library/cpp/threading/bounded_queue/ya.make
deleted file mode 100644
index 4c5336f031..0000000000
--- a/library/cpp/threading/bounded_queue/ya.make
+++ /dev/null
@@ -1,9 +0,0 @@
-LIBRARY()
-
-SRCS(
- bounded_queue.h
-)
-
-END()
-
-RECURSE_FOR_TESTS(ut)
diff --git a/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt
index 43ff0adc1c..902b6d7a9a 100644
--- a/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt
+++ b/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt
@@ -12,8 +12,6 @@ target_link_libraries(cpp-threading-equeue PUBLIC
contrib-libs-cxxsupp
yutil
cpp-deprecated-atomic
- cpp-threading-bounded_queue
- cpp-yt-threading
)
target_sources(cpp-threading-equeue PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp
diff --git a/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt b/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt
index 805b237943..5653d3d3f3 100644
--- a/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt
+++ b/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt
@@ -13,7 +13,6 @@ target_link_libraries(cpp-threading-equeue PUBLIC
contrib-libs-cxxsupp
yutil
cpp-deprecated-atomic
- cpp-threading-bounded_queue
)
target_sources(cpp-threading-equeue PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp
diff --git a/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt
index 36f8a2632e..5653d3d3f3 100644
--- a/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt
+++ b/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt
@@ -13,8 +13,6 @@ target_link_libraries(cpp-threading-equeue PUBLIC
contrib-libs-cxxsupp
yutil
cpp-deprecated-atomic
- cpp-threading-bounded_queue
- cpp-yt-threading
)
target_sources(cpp-threading-equeue PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp
diff --git a/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt
index 43ff0adc1c..902b6d7a9a 100644
--- a/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt
+++ b/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt
@@ -12,8 +12,6 @@ target_link_libraries(cpp-threading-equeue PUBLIC
contrib-libs-cxxsupp
yutil
cpp-deprecated-atomic
- cpp-threading-bounded_queue
- cpp-yt-threading
)
target_sources(cpp-threading-equeue PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp
diff --git a/library/cpp/threading/equeue/equeue_ut.cpp b/library/cpp/threading/equeue/equeue_ut.cpp
index 2c7d2c7b1e..8557f63ac0 100644
--- a/library/cpp/threading/equeue/equeue_ut.cpp
+++ b/library/cpp/threading/equeue/equeue_ut.cpp
@@ -1,5 +1,4 @@
#include "equeue.h"
-#include "fast.h"
#include <library/cpp/testing/unittest/registar.h>
@@ -10,33 +9,18 @@
Y_UNIT_TEST_SUITE(TElasticQueueTest) {
const size_t MaxQueueSize = 20;
const size_t ThreadCount = 10;
+ const size_t N = 100000;
- template <typename T>
- THolder<T> MakeQueue();
-
- template <>
- THolder<TElasticQueue> MakeQueue() {
- return MakeHolder<TElasticQueue>(MakeHolder<TSimpleThreadPool>());
- }
-
- template <>
- THolder<TFastElasticQueue> MakeQueue() {
- return MakeHolder<TFastElasticQueue>();
- }
-
- template <typename T>
- struct TEnv {
- static inline THolder<T> Queue;
+ static THolder<TElasticQueue> Queue;
- struct TQueueSetup {
- TQueueSetup() {
- Queue.Reset(MakeQueue<T>());
- Queue->Start(ThreadCount, MaxQueueSize);
- }
- ~TQueueSetup() {
- Queue->Stop();
- }
- };
+ struct TQueueSetup {
+ TQueueSetup() {
+ Queue.Reset(new TElasticQueue(MakeHolder<TSimpleThreadPool>()));
+ Queue->Start(ThreadCount, MaxQueueSize);
+ }
+ ~TQueueSetup() {
+ Queue->Stop();
+ }
};
struct TCounters {
@@ -53,9 +37,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
//fill test -- fill queue with "endless" jobs
TSystemEvent WaitEvent;
-
- template <typename T>
- void FillTest() {
+ Y_UNIT_TEST(FillTest) {
Counters.Reset();
struct TWaitJob: public IObjectInQueue {
@@ -65,10 +47,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
}
} job;
- struct TLocalSetup: TEnv<T>::TQueueSetup {
- TLocalSetup() {
- WaitEvent.Reset();
- }
+ struct TLocalSetup: TQueueSetup {
~TLocalSetup() {
WaitEvent.Signal();
}
@@ -77,26 +56,19 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
size_t enqueued = 0;
{
TLocalSetup setup;
- while (TEnv<T>::Queue->Add(&job) && enqueued < MaxQueueSize + 100) {
+ while (Queue->Add(&job) && enqueued < MaxQueueSize + 100) {
++enqueued;
}
UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize);
- UNIT_ASSERT_VALUES_EQUAL(enqueued, TEnv<T>::Queue->ObjectCount());
+ UNIT_ASSERT_VALUES_EQUAL(enqueued, Queue->ObjectCount());
}
- UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->ObjectCount());
- UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->Size());
+ UNIT_ASSERT_VALUES_EQUAL(0u, Queue->ObjectCount());
+ UNIT_ASSERT_VALUES_EQUAL(0u, Queue->Size());
UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued);
}
- Y_UNIT_TEST(FillTest) {
- FillTest<TElasticQueue>();
- }
-
- Y_UNIT_TEST(FillTestFast) {
- FillTest<TFastElasticQueue>();
- }
//concurrent test -- send many jobs from different threads
struct TJob: public IObjectInQueue {
@@ -106,10 +78,9 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
};
static TJob Job;
- template <typename T>
static bool TryAdd() {
AtomicIncrement(Counters.Total);
- if (TEnv<T>::Queue->Add(&Job)) {
+ if (Queue->Add(&Job)) {
AtomicIncrement(Counters.Scheduled);
return true;
} else {
@@ -118,18 +89,16 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
}
}
- const size_t N = 100000;
static size_t TryCounter;
- template <typename T>
- void ConcurrentTest() {
+ Y_UNIT_TEST(ConcurrentTest) {
Counters.Reset();
TryCounter = 0;
struct TSender: public IThreadFactory::IThreadAble {
void DoExecute() override {
while ((size_t)AtomicIncrement(TryCounter) <= N) {
- if (!TryAdd<T>()) {
+ if (!TryAdd()) {
Sleep(TDuration::MicroSeconds(50));
}
}
@@ -137,7 +106,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
} sender;
{
- typename TEnv<T>::TQueueSetup setup;
+ TQueueSetup setup;
TVector< TAutoPtr<IThreadFactory::IThread> > senders;
for (size_t i = 0; i < ThreadCount; ++i) {
@@ -153,12 +122,4 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled);
UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded);
}
-
- Y_UNIT_TEST(ConcurrentTest) {
- ConcurrentTest<TElasticQueue>();
- }
-
- Y_UNIT_TEST(ConcurrentTestFast) {
- ConcurrentTest<TFastElasticQueue>();
- }
}
diff --git a/library/cpp/threading/equeue/fast.h b/library/cpp/threading/equeue/fast.h
deleted file mode 100644
index 3f96e279fc..0000000000
--- a/library/cpp/threading/equeue/fast.h
+++ /dev/null
@@ -1,167 +0,0 @@
-#pragma once
-
-#include <util/thread/pool.h>
-
-#include <util/datetime/base.h>
-#include <util/thread/lfqueue.h>
-#include <util/system/thread.h>
-#include <util/generic/vector.h>
-#include <util/generic/scope.h>
-#include <util/stream/str.h>
-
-#include <library/cpp/threading/bounded_queue/bounded_queue.h>
-
-#if defined(_android_) || defined(_arm_)
-//by now library/cpp/yt/threading doesn't compile in targets like default-android-armv7a, fallback to ordinal elastic queue
-#include "equeue.h"
- class TFastElasticQueue
- : public TElasticQueue
- {
- public:
- explicit TFastElasticQueue(const TParams& params = {})
- : TElasticQueue(MakeHolder<TSimpleThreadPool>(params))
- {
- }
- };
-#else
-
-#include <library/cpp/yt/threading/event_count.h>
-
-class TFastElasticQueue
- : public TThreadPoolBase
- , private IThreadFactory::IThreadAble
-{
-public:
- explicit TFastElasticQueue(const TParams& params = {})
- : TThreadPoolBase(params)
- {
- Y_ENSURE(!params.Blocking_);
- }
-
- ~TFastElasticQueue() {
- Stop();
- }
-
- void Start(size_t threadCount, size_t maxQueueSize) override {
- Y_ENSURE(Threads_.empty());
- Y_ENSURE(maxQueueSize > 0);
-
- Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events
- MaxQueueSize_ = maxQueueSize;
-
- try {
- for (size_t i = 0; i < threadCount; ++i) {
- Threads_.push_back(Pool()->Run(this));
- }
- } catch (...) {
- Stop();
- throw;
- }
-
- Stopped_ = false;
- }
-
- size_t ObjectCount() const {
- //GuardCount_ can be temporary incremented above real object count in queue
- return Min(GuardCount_.load(), MaxQueueSize_);
- }
-
- bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT {
- if (Stopped_ || !obj) {
- return false;
- }
-
- if (GuardCount_.fetch_add(1) >= MaxQueueSize_) {
- GuardCount_.fetch_sub(1);
- return false;
- }
-
- QueueSize_.fetch_add(1);
-
- if (!Queue_->Enqueue(obj)) {
- //Simultaneous Dequeue calls can return not in exact fifo order of items,
- //so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of
- //the oldest enqueued item is not actually dequeued and ring buffer can't proceed.
- GuardCount_.fetch_sub(1);
- QueueSize_.fetch_sub(1);
- return false;
- }
-
-
- Event_.NotifyOne();
-
- return true;
- }
-
- size_t Size() const noexcept override {
- return QueueSize_.load();
- }
-
- void Stop() noexcept override {
- Stopped_ = true;
-
- for (size_t i = 0; i < Threads_.size(); ++i) {
- while (!Queue_->Enqueue(nullptr)) {
- Sleep(TDuration::MilliSeconds(1));
- }
-
- Event_.NotifyOne();
- }
-
- while (!Threads_.empty()) {
- Threads_.back()->Join();
- Threads_.pop_back();
- }
-
- Queue_.Reset();
- }
-
- void DoExecute() override {
- TThread::SetCurrentThreadName(Params.ThreadName_.c_str());
-
- while (true) {
- IObjectInQueue* job = nullptr;
-
- Event_.Await([&]() {
- return Queue_->Dequeue(job);
- });
-
- if (!job) {
- break;
- }
-
- QueueSize_.fetch_sub(1);
-
- Y_DEFER {
- GuardCount_.fetch_sub(1);
- };
-
- if (Params.Catching_) {
- try {
- try {
- job->Process(nullptr);
- } catch (...) {
- Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl;
- }
- } catch (...) {
- ;
- }
- } else {
- job->Process(nullptr);
- }
- }
- }
-private:
- std::atomic<bool> Stopped_ = false;
- size_t MaxQueueSize_ = 0;
-
- alignas(64) std::atomic<size_t> GuardCount_ = 0;
- alignas(64) std::atomic<size_t> QueueSize_ = 0;
-
- TVector<THolder<IThreadFactory::IThread>> Threads_;
-
- THolder<NThreading::TBoundedQueue<IObjectInQueue*>> Queue_;
- NYT::NThreading::TEventCount Event_;
-};
-
-#endif
diff --git a/library/cpp/threading/equeue/ya.make b/library/cpp/threading/equeue/ya.make
index 95677366c9..445797aa12 100644
--- a/library/cpp/threading/equeue/ya.make
+++ b/library/cpp/threading/equeue/ya.make
@@ -3,23 +3,14 @@ LIBRARY()
SRCS(
equeue.h
equeue.cpp
- fast.h
)
PEERDIR(
library/cpp/deprecated/atomic
- library/cpp/threading/bounded_queue
)
-IF (NOT OS_ANDROID AND NOT ARCH_ARM)
- PEERDIR(
- library/cpp/yt/threading
- )
-ENDIF()
-
END()
RECURSE_FOR_TESTS(
ut
)
-