diff options
author | kulikov <kulikov@yandex-team.com> | 2023-07-27 12:28:50 +0300 |
---|---|---|
committer | kulikov <kulikov@yandex-team.com> | 2023-07-27 12:28:50 +0300 |
commit | 4a7691c519e6114e013dc1dd0c3b2528154507f9 (patch) | |
tree | af6ee3a210f95bb4532036b1f1a5352ef072068a /library/cpp/threading | |
parent | dcde92436ae71c3fdf1d6b9916e3858f3b35146e (diff) | |
download | ydb-4a7691c519e6114e013dc1dd0c3b2528154507f9.tar.gz |
revert rXXXXXX (see discusstion in pr), will commit again more pci-dss friendly way
Diffstat (limited to 'library/cpp/threading')
17 files changed, 20 insertions, 530 deletions
diff --git a/library/cpp/threading/CMakeLists.txt b/library/cpp/threading/CMakeLists.txt index f541042208..681ef6b24e 100644 --- a/library/cpp/threading/CMakeLists.txt +++ b/library/cpp/threading/CMakeLists.txt @@ -7,7 +7,6 @@ add_subdirectory(atomic) -add_subdirectory(bounded_queue) add_subdirectory(chunk_queue) add_subdirectory(equeue) add_subdirectory(future) diff --git a/library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt b/library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt deleted file mode 100644 index bd9b68ab62..0000000000 --- a/library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt +++ /dev/null @@ -1,14 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(cpp-threading-bounded_queue INTERFACE) -target_link_libraries(cpp-threading-bounded_queue INTERFACE - contrib-libs-cxxsupp - yutil -) diff --git a/library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt b/library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt deleted file mode 100644 index 613a18db10..0000000000 --- a/library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt +++ /dev/null @@ -1,15 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(cpp-threading-bounded_queue INTERFACE) -target_link_libraries(cpp-threading-bounded_queue INTERFACE - contrib-libs-linux-headers - contrib-libs-cxxsupp - yutil -) diff --git a/library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt b/library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt deleted file mode 100644 index 613a18db10..0000000000 --- a/library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt +++ /dev/null @@ -1,15 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(cpp-threading-bounded_queue INTERFACE) -target_link_libraries(cpp-threading-bounded_queue INTERFACE - contrib-libs-linux-headers - contrib-libs-cxxsupp - yutil -) diff --git a/library/cpp/threading/bounded_queue/CMakeLists.txt b/library/cpp/threading/bounded_queue/CMakeLists.txt deleted file mode 100644 index f8b31df0c1..0000000000 --- a/library/cpp/threading/bounded_queue/CMakeLists.txt +++ /dev/null @@ -1,17 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - -if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) - include(CMakeLists.linux-aarch64.txt) -elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") - include(CMakeLists.darwin-x86_64.txt) -elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) - include(CMakeLists.windows-x86_64.txt) -elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) - include(CMakeLists.linux-x86_64.txt) -endif() diff --git a/library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt b/library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt deleted file mode 100644 index bd9b68ab62..0000000000 --- a/library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt +++ /dev/null @@ -1,14 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(cpp-threading-bounded_queue INTERFACE) -target_link_libraries(cpp-threading-bounded_queue INTERFACE - contrib-libs-cxxsupp - yutil -) diff --git a/library/cpp/threading/bounded_queue/bounded_queue.h b/library/cpp/threading/bounded_queue/bounded_queue.h deleted file mode 100644 index c5c6714086..0000000000 --- a/library/cpp/threading/bounded_queue/bounded_queue.h +++ /dev/null @@ -1,89 +0,0 @@ -#pragma once - -#include <util/generic/yexception.h> - -//https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue - -namespace NThreading { - template<typename T> - class TBoundedQueue { - public: - explicit TBoundedQueue(size_t size) - : Buffer_(new TCell[size]) - , Mask_(size - 1) - { - Y_ENSURE(size >= 2 && (size & (size - 1)) == 0); - - for (size_t i = 0; i < size; ++i) { - Buffer_[i].Sequence.store(i, std::memory_order_relaxed); - } - } - - template <typename T_> - [[nodiscard]] bool Enqueue(T_&& data) noexcept { - TCell* cell; - size_t pos = EnqueuePos_.load(std::memory_order_relaxed); - - for (;;) { - cell = &Buffer_[pos & Mask_]; - size_t seq = cell->Sequence.load(std::memory_order_acquire); - intptr_t dif = (intptr_t)seq - (intptr_t)pos; - - if (dif == 0) { - if (EnqueuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) { - break; - } - } else if (dif < 0) { - return false; - } else { - pos = EnqueuePos_.load(std::memory_order_relaxed); - } - } - - static_assert(noexcept(cell->Data = std::forward<T_>(data))); - cell->Data = std::forward<T_>(data); - cell->Sequence.store(pos + 1, std::memory_order_release); - - return true; - } - - [[nodiscard]] bool Dequeue(T& data) noexcept { - TCell* cell; - size_t pos = DequeuePos_.load(std::memory_order_relaxed); - - for (;;) { - cell = &Buffer_[pos & Mask_]; - size_t seq = cell->Sequence.load(std::memory_order_acquire); - intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1); - - if (dif == 0) { - if (DequeuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) { - break; - } - } else if (dif < 0) { - return false; - } else { - pos = DequeuePos_.load(std::memory_order_relaxed); - } - } - - static_assert(noexcept(data = std::move(cell->Data))); - data = std::move(cell->Data); - - cell->Sequence.store(pos + Mask_ + 1, std::memory_order_release); - return true; - } - private: - struct TCell { - std::atomic<size_t> Sequence; - T Data; - }; - - std::unique_ptr<TCell[]> Buffer_; - const size_t Mask_; - - alignas(64) std::atomic<size_t> EnqueuePos_ = 0; - alignas(64) std::atomic<size_t> DequeuePos_ = 0; - }; -} - diff --git a/library/cpp/threading/bounded_queue/bounded_queue_ut.cpp b/library/cpp/threading/bounded_queue/bounded_queue_ut.cpp deleted file mode 100644 index bb5b6eb787..0000000000 --- a/library/cpp/threading/bounded_queue/bounded_queue_ut.cpp +++ /dev/null @@ -1,106 +0,0 @@ -#include "bounded_queue.h" - -#include <library/cpp/testing/unittest/registar.h> -#include <util/thread/factory.h> - -using namespace NThreading; - -Y_UNIT_TEST_SUITE(TBoundedQueueTest) { - Y_UNIT_TEST(QueueSize) { - const size_t queueSize = 16; - TBoundedQueue<size_t> boundedQueue(queueSize); - - for (size_t i = 0; i < queueSize; ++i) { - UNIT_ASSERT(boundedQueue.Enqueue(i)); - } - UNIT_ASSERT(!boundedQueue.Enqueue(0)); - size_t tmp = 0; - UNIT_ASSERT(boundedQueue.Dequeue(tmp)); - UNIT_ASSERT(boundedQueue.Enqueue(0)); - UNIT_ASSERT(!boundedQueue.Enqueue(0)); - } - - Y_UNIT_TEST(Move) { - const size_t queueSize = 16; - TBoundedQueue<TString> boundedQueue(queueSize); - - for (size_t i = 0; i < queueSize; ++i) { - TString v = "xxx"; - UNIT_ASSERT(boundedQueue.Enqueue(std::move(v))); - UNIT_ASSERT(v.empty()); - } - - { - TString v = "xxx"; - UNIT_ASSERT(!boundedQueue.Enqueue(std::move(v))); - UNIT_ASSERT(v == "xxx"); - } - - TString v; - UNIT_ASSERT(boundedQueue.Dequeue(v)); - UNIT_ASSERT(v == "xxx"); - } - - Y_UNIT_TEST(MPMC) { - size_t queueSize = 16; - size_t producers = 10; - size_t consumers = 10; - size_t itemsCount = 10000; - - TVector<THolder<IThreadFactory::IThread>> threads; - TBoundedQueue<std::pair<size_t, size_t>> boundedQueue(queueSize); - - std::atomic<size_t> itemCounter = 0; - std::atomic<size_t> consumed = 0; - - for (size_t i = 0; i < consumers; ++i) { - threads.push_back(SystemThreadFactory()->Run( - [&]() { - TVector<size_t> prevItems(producers); - for (;;) { - std::pair<size_t, size_t> item; - while (!boundedQueue.Dequeue(item)) { - ; - } - - if (item.first >= producers) { - break; - } - - UNIT_ASSERT(item.second > prevItems[item.first]); - prevItems[item.first] = item.second; - ++consumed; - } - }) - ); - } - - for (size_t i = 0; i < producers ; ++i) { - threads.push_back(SystemThreadFactory()->Run( - [&, producerNum = i]() { - for (;;) { - size_t item = ++itemCounter; - if (item > itemsCount) { - break; - } - - while (!boundedQueue.Enqueue(std::make_pair(producerNum, item))) { - ; - } - } - - while (!boundedQueue.Enqueue(std::make_pair(producers, size_t(0)))) { - ; - } - }) - ); - } - - - for (auto& t : threads) { - t->Join(); - } - - UNIT_ASSERT_VALUES_EQUAL(consumed.load(), itemsCount); - } -} diff --git a/library/cpp/threading/bounded_queue/ut/ya.make b/library/cpp/threading/bounded_queue/ut/ya.make deleted file mode 100644 index 5db0a22713..0000000000 --- a/library/cpp/threading/bounded_queue/ut/ya.make +++ /dev/null @@ -1,8 +0,0 @@ -UNITTEST_FOR(library/cpp/threading/bounded_queue) - -SRCS( - bounded_queue_ut.cpp -) - -END() - diff --git a/library/cpp/threading/bounded_queue/ya.make b/library/cpp/threading/bounded_queue/ya.make deleted file mode 100644 index 4c5336f031..0000000000 --- a/library/cpp/threading/bounded_queue/ya.make +++ /dev/null @@ -1,9 +0,0 @@ -LIBRARY() - -SRCS( - bounded_queue.h -) - -END() - -RECURSE_FOR_TESTS(ut) diff --git a/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt index 43ff0adc1c..902b6d7a9a 100644 --- a/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt +++ b/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt @@ -12,8 +12,6 @@ target_link_libraries(cpp-threading-equeue PUBLIC contrib-libs-cxxsupp yutil cpp-deprecated-atomic - cpp-threading-bounded_queue - cpp-yt-threading ) target_sources(cpp-threading-equeue PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp diff --git a/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt b/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt index 805b237943..5653d3d3f3 100644 --- a/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt +++ b/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt @@ -13,7 +13,6 @@ target_link_libraries(cpp-threading-equeue PUBLIC contrib-libs-cxxsupp yutil cpp-deprecated-atomic - cpp-threading-bounded_queue ) target_sources(cpp-threading-equeue PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp diff --git a/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt index 36f8a2632e..5653d3d3f3 100644 --- a/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt +++ b/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt @@ -13,8 +13,6 @@ target_link_libraries(cpp-threading-equeue PUBLIC contrib-libs-cxxsupp yutil cpp-deprecated-atomic - cpp-threading-bounded_queue - cpp-yt-threading ) target_sources(cpp-threading-equeue PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp diff --git a/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt index 43ff0adc1c..902b6d7a9a 100644 --- a/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt +++ b/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt @@ -12,8 +12,6 @@ target_link_libraries(cpp-threading-equeue PUBLIC contrib-libs-cxxsupp yutil cpp-deprecated-atomic - cpp-threading-bounded_queue - cpp-yt-threading ) target_sources(cpp-threading-equeue PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp diff --git a/library/cpp/threading/equeue/equeue_ut.cpp b/library/cpp/threading/equeue/equeue_ut.cpp index 2c7d2c7b1e..8557f63ac0 100644 --- a/library/cpp/threading/equeue/equeue_ut.cpp +++ b/library/cpp/threading/equeue/equeue_ut.cpp @@ -1,5 +1,4 @@ #include "equeue.h" -#include "fast.h" #include <library/cpp/testing/unittest/registar.h> @@ -10,33 +9,18 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { const size_t MaxQueueSize = 20; const size_t ThreadCount = 10; + const size_t N = 100000; - template <typename T> - THolder<T> MakeQueue(); - - template <> - THolder<TElasticQueue> MakeQueue() { - return MakeHolder<TElasticQueue>(MakeHolder<TSimpleThreadPool>()); - } - - template <> - THolder<TFastElasticQueue> MakeQueue() { - return MakeHolder<TFastElasticQueue>(); - } - - template <typename T> - struct TEnv { - static inline THolder<T> Queue; + static THolder<TElasticQueue> Queue; - struct TQueueSetup { - TQueueSetup() { - Queue.Reset(MakeQueue<T>()); - Queue->Start(ThreadCount, MaxQueueSize); - } - ~TQueueSetup() { - Queue->Stop(); - } - }; + struct TQueueSetup { + TQueueSetup() { + Queue.Reset(new TElasticQueue(MakeHolder<TSimpleThreadPool>())); + Queue->Start(ThreadCount, MaxQueueSize); + } + ~TQueueSetup() { + Queue->Stop(); + } }; struct TCounters { @@ -53,9 +37,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { //fill test -- fill queue with "endless" jobs TSystemEvent WaitEvent; - - template <typename T> - void FillTest() { + Y_UNIT_TEST(FillTest) { Counters.Reset(); struct TWaitJob: public IObjectInQueue { @@ -65,10 +47,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { } } job; - struct TLocalSetup: TEnv<T>::TQueueSetup { - TLocalSetup() { - WaitEvent.Reset(); - } + struct TLocalSetup: TQueueSetup { ~TLocalSetup() { WaitEvent.Signal(); } @@ -77,26 +56,19 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { size_t enqueued = 0; { TLocalSetup setup; - while (TEnv<T>::Queue->Add(&job) && enqueued < MaxQueueSize + 100) { + while (Queue->Add(&job) && enqueued < MaxQueueSize + 100) { ++enqueued; } UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize); - UNIT_ASSERT_VALUES_EQUAL(enqueued, TEnv<T>::Queue->ObjectCount()); + UNIT_ASSERT_VALUES_EQUAL(enqueued, Queue->ObjectCount()); } - UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->ObjectCount()); - UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->Size()); + UNIT_ASSERT_VALUES_EQUAL(0u, Queue->ObjectCount()); + UNIT_ASSERT_VALUES_EQUAL(0u, Queue->Size()); UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued); } - Y_UNIT_TEST(FillTest) { - FillTest<TElasticQueue>(); - } - - Y_UNIT_TEST(FillTestFast) { - FillTest<TFastElasticQueue>(); - } //concurrent test -- send many jobs from different threads struct TJob: public IObjectInQueue { @@ -106,10 +78,9 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { }; static TJob Job; - template <typename T> static bool TryAdd() { AtomicIncrement(Counters.Total); - if (TEnv<T>::Queue->Add(&Job)) { + if (Queue->Add(&Job)) { AtomicIncrement(Counters.Scheduled); return true; } else { @@ -118,18 +89,16 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { } } - const size_t N = 100000; static size_t TryCounter; - template <typename T> - void ConcurrentTest() { + Y_UNIT_TEST(ConcurrentTest) { Counters.Reset(); TryCounter = 0; struct TSender: public IThreadFactory::IThreadAble { void DoExecute() override { while ((size_t)AtomicIncrement(TryCounter) <= N) { - if (!TryAdd<T>()) { + if (!TryAdd()) { Sleep(TDuration::MicroSeconds(50)); } } @@ -137,7 +106,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { } sender; { - typename TEnv<T>::TQueueSetup setup; + TQueueSetup setup; TVector< TAutoPtr<IThreadFactory::IThread> > senders; for (size_t i = 0; i < ThreadCount; ++i) { @@ -153,12 +122,4 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled); UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded); } - - Y_UNIT_TEST(ConcurrentTest) { - ConcurrentTest<TElasticQueue>(); - } - - Y_UNIT_TEST(ConcurrentTestFast) { - ConcurrentTest<TFastElasticQueue>(); - } } diff --git a/library/cpp/threading/equeue/fast.h b/library/cpp/threading/equeue/fast.h deleted file mode 100644 index 3f96e279fc..0000000000 --- a/library/cpp/threading/equeue/fast.h +++ /dev/null @@ -1,167 +0,0 @@ -#pragma once - -#include <util/thread/pool.h> - -#include <util/datetime/base.h> -#include <util/thread/lfqueue.h> -#include <util/system/thread.h> -#include <util/generic/vector.h> -#include <util/generic/scope.h> -#include <util/stream/str.h> - -#include <library/cpp/threading/bounded_queue/bounded_queue.h> - -#if defined(_android_) || defined(_arm_) -//by now library/cpp/yt/threading doesn't compile in targets like default-android-armv7a, fallback to ordinal elastic queue -#include "equeue.h" - class TFastElasticQueue - : public TElasticQueue - { - public: - explicit TFastElasticQueue(const TParams& params = {}) - : TElasticQueue(MakeHolder<TSimpleThreadPool>(params)) - { - } - }; -#else - -#include <library/cpp/yt/threading/event_count.h> - -class TFastElasticQueue - : public TThreadPoolBase - , private IThreadFactory::IThreadAble -{ -public: - explicit TFastElasticQueue(const TParams& params = {}) - : TThreadPoolBase(params) - { - Y_ENSURE(!params.Blocking_); - } - - ~TFastElasticQueue() { - Stop(); - } - - void Start(size_t threadCount, size_t maxQueueSize) override { - Y_ENSURE(Threads_.empty()); - Y_ENSURE(maxQueueSize > 0); - - Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events - MaxQueueSize_ = maxQueueSize; - - try { - for (size_t i = 0; i < threadCount; ++i) { - Threads_.push_back(Pool()->Run(this)); - } - } catch (...) { - Stop(); - throw; - } - - Stopped_ = false; - } - - size_t ObjectCount() const { - //GuardCount_ can be temporary incremented above real object count in queue - return Min(GuardCount_.load(), MaxQueueSize_); - } - - bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT { - if (Stopped_ || !obj) { - return false; - } - - if (GuardCount_.fetch_add(1) >= MaxQueueSize_) { - GuardCount_.fetch_sub(1); - return false; - } - - QueueSize_.fetch_add(1); - - if (!Queue_->Enqueue(obj)) { - //Simultaneous Dequeue calls can return not in exact fifo order of items, - //so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of - //the oldest enqueued item is not actually dequeued and ring buffer can't proceed. - GuardCount_.fetch_sub(1); - QueueSize_.fetch_sub(1); - return false; - } - - - Event_.NotifyOne(); - - return true; - } - - size_t Size() const noexcept override { - return QueueSize_.load(); - } - - void Stop() noexcept override { - Stopped_ = true; - - for (size_t i = 0; i < Threads_.size(); ++i) { - while (!Queue_->Enqueue(nullptr)) { - Sleep(TDuration::MilliSeconds(1)); - } - - Event_.NotifyOne(); - } - - while (!Threads_.empty()) { - Threads_.back()->Join(); - Threads_.pop_back(); - } - - Queue_.Reset(); - } - - void DoExecute() override { - TThread::SetCurrentThreadName(Params.ThreadName_.c_str()); - - while (true) { - IObjectInQueue* job = nullptr; - - Event_.Await([&]() { - return Queue_->Dequeue(job); - }); - - if (!job) { - break; - } - - QueueSize_.fetch_sub(1); - - Y_DEFER { - GuardCount_.fetch_sub(1); - }; - - if (Params.Catching_) { - try { - try { - job->Process(nullptr); - } catch (...) { - Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl; - } - } catch (...) { - ; - } - } else { - job->Process(nullptr); - } - } - } -private: - std::atomic<bool> Stopped_ = false; - size_t MaxQueueSize_ = 0; - - alignas(64) std::atomic<size_t> GuardCount_ = 0; - alignas(64) std::atomic<size_t> QueueSize_ = 0; - - TVector<THolder<IThreadFactory::IThread>> Threads_; - - THolder<NThreading::TBoundedQueue<IObjectInQueue*>> Queue_; - NYT::NThreading::TEventCount Event_; -}; - -#endif diff --git a/library/cpp/threading/equeue/ya.make b/library/cpp/threading/equeue/ya.make index 95677366c9..445797aa12 100644 --- a/library/cpp/threading/equeue/ya.make +++ b/library/cpp/threading/equeue/ya.make @@ -3,23 +3,14 @@ LIBRARY() SRCS( equeue.h equeue.cpp - fast.h ) PEERDIR( library/cpp/deprecated/atomic - library/cpp/threading/bounded_queue ) -IF (NOT OS_ANDROID AND NOT ARCH_ARM) - PEERDIR( - library/cpp/yt/threading - ) -ENDIF() - END() RECURSE_FOR_TESTS( ut ) - |