diff options
author | kulikov <kulikov@yandex-team.com> | 2023-07-27 12:28:50 +0300 |
---|---|---|
committer | kulikov <kulikov@yandex-team.com> | 2023-07-27 12:28:50 +0300 |
commit | 4a7691c519e6114e013dc1dd0c3b2528154507f9 (patch) | |
tree | af6ee3a210f95bb4532036b1f1a5352ef072068a /library/cpp/threading/equeue | |
parent | dcde92436ae71c3fdf1d6b9916e3858f3b35146e (diff) | |
download | ydb-4a7691c519e6114e013dc1dd0c3b2528154507f9.tar.gz |
revert rXXXXXX (see discusstion in pr), will commit again more pci-dss friendly way
Diffstat (limited to 'library/cpp/threading/equeue')
-rw-r--r-- | library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt | 2 | ||||
-rw-r--r-- | library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt | 2 | ||||
-rw-r--r-- | library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt | 2 | ||||
-rw-r--r-- | library/cpp/threading/equeue/equeue_ut.cpp | 79 | ||||
-rw-r--r-- | library/cpp/threading/equeue/fast.h | 167 | ||||
-rw-r--r-- | library/cpp/threading/equeue/ya.make | 9 |
7 files changed, 20 insertions, 242 deletions
diff --git a/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt index 43ff0adc1c..902b6d7a9a 100644 --- a/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt +++ b/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt @@ -12,8 +12,6 @@ target_link_libraries(cpp-threading-equeue PUBLIC contrib-libs-cxxsupp yutil cpp-deprecated-atomic - cpp-threading-bounded_queue - cpp-yt-threading ) target_sources(cpp-threading-equeue PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp diff --git a/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt b/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt index 805b237943..5653d3d3f3 100644 --- a/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt +++ b/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt @@ -13,7 +13,6 @@ target_link_libraries(cpp-threading-equeue PUBLIC contrib-libs-cxxsupp yutil cpp-deprecated-atomic - cpp-threading-bounded_queue ) target_sources(cpp-threading-equeue PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp diff --git a/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt index 36f8a2632e..5653d3d3f3 100644 --- a/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt +++ b/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt @@ -13,8 +13,6 @@ target_link_libraries(cpp-threading-equeue PUBLIC contrib-libs-cxxsupp yutil cpp-deprecated-atomic - cpp-threading-bounded_queue - cpp-yt-threading ) target_sources(cpp-threading-equeue PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp diff --git a/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt index 43ff0adc1c..902b6d7a9a 100644 --- a/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt +++ b/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt @@ -12,8 +12,6 @@ target_link_libraries(cpp-threading-equeue PUBLIC contrib-libs-cxxsupp yutil cpp-deprecated-atomic - cpp-threading-bounded_queue - cpp-yt-threading ) target_sources(cpp-threading-equeue PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp diff --git a/library/cpp/threading/equeue/equeue_ut.cpp b/library/cpp/threading/equeue/equeue_ut.cpp index 2c7d2c7b1e..8557f63ac0 100644 --- a/library/cpp/threading/equeue/equeue_ut.cpp +++ b/library/cpp/threading/equeue/equeue_ut.cpp @@ -1,5 +1,4 @@ #include "equeue.h" -#include "fast.h" #include <library/cpp/testing/unittest/registar.h> @@ -10,33 +9,18 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { const size_t MaxQueueSize = 20; const size_t ThreadCount = 10; + const size_t N = 100000; - template <typename T> - THolder<T> MakeQueue(); - - template <> - THolder<TElasticQueue> MakeQueue() { - return MakeHolder<TElasticQueue>(MakeHolder<TSimpleThreadPool>()); - } - - template <> - THolder<TFastElasticQueue> MakeQueue() { - return MakeHolder<TFastElasticQueue>(); - } - - template <typename T> - struct TEnv { - static inline THolder<T> Queue; + static THolder<TElasticQueue> Queue; - struct TQueueSetup { - TQueueSetup() { - Queue.Reset(MakeQueue<T>()); - Queue->Start(ThreadCount, MaxQueueSize); - } - ~TQueueSetup() { - Queue->Stop(); - } - }; + struct TQueueSetup { + TQueueSetup() { + Queue.Reset(new TElasticQueue(MakeHolder<TSimpleThreadPool>())); + Queue->Start(ThreadCount, MaxQueueSize); + } + ~TQueueSetup() { + Queue->Stop(); + } }; struct TCounters { @@ -53,9 +37,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { //fill test -- fill queue with "endless" jobs TSystemEvent WaitEvent; - - template <typename T> - void FillTest() { + Y_UNIT_TEST(FillTest) { Counters.Reset(); struct TWaitJob: public IObjectInQueue { @@ -65,10 +47,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { } } job; - struct TLocalSetup: TEnv<T>::TQueueSetup { - TLocalSetup() { - WaitEvent.Reset(); - } + struct TLocalSetup: TQueueSetup { ~TLocalSetup() { WaitEvent.Signal(); } @@ -77,26 +56,19 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { size_t enqueued = 0; { TLocalSetup setup; - while (TEnv<T>::Queue->Add(&job) && enqueued < MaxQueueSize + 100) { + while (Queue->Add(&job) && enqueued < MaxQueueSize + 100) { ++enqueued; } UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize); - UNIT_ASSERT_VALUES_EQUAL(enqueued, TEnv<T>::Queue->ObjectCount()); + UNIT_ASSERT_VALUES_EQUAL(enqueued, Queue->ObjectCount()); } - UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->ObjectCount()); - UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->Size()); + UNIT_ASSERT_VALUES_EQUAL(0u, Queue->ObjectCount()); + UNIT_ASSERT_VALUES_EQUAL(0u, Queue->Size()); UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued); } - Y_UNIT_TEST(FillTest) { - FillTest<TElasticQueue>(); - } - - Y_UNIT_TEST(FillTestFast) { - FillTest<TFastElasticQueue>(); - } //concurrent test -- send many jobs from different threads struct TJob: public IObjectInQueue { @@ -106,10 +78,9 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { }; static TJob Job; - template <typename T> static bool TryAdd() { AtomicIncrement(Counters.Total); - if (TEnv<T>::Queue->Add(&Job)) { + if (Queue->Add(&Job)) { AtomicIncrement(Counters.Scheduled); return true; } else { @@ -118,18 +89,16 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { } } - const size_t N = 100000; static size_t TryCounter; - template <typename T> - void ConcurrentTest() { + Y_UNIT_TEST(ConcurrentTest) { Counters.Reset(); TryCounter = 0; struct TSender: public IThreadFactory::IThreadAble { void DoExecute() override { while ((size_t)AtomicIncrement(TryCounter) <= N) { - if (!TryAdd<T>()) { + if (!TryAdd()) { Sleep(TDuration::MicroSeconds(50)); } } @@ -137,7 +106,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { } sender; { - typename TEnv<T>::TQueueSetup setup; + TQueueSetup setup; TVector< TAutoPtr<IThreadFactory::IThread> > senders; for (size_t i = 0; i < ThreadCount; ++i) { @@ -153,12 +122,4 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled); UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded); } - - Y_UNIT_TEST(ConcurrentTest) { - ConcurrentTest<TElasticQueue>(); - } - - Y_UNIT_TEST(ConcurrentTestFast) { - ConcurrentTest<TFastElasticQueue>(); - } } diff --git a/library/cpp/threading/equeue/fast.h b/library/cpp/threading/equeue/fast.h deleted file mode 100644 index 3f96e279fc..0000000000 --- a/library/cpp/threading/equeue/fast.h +++ /dev/null @@ -1,167 +0,0 @@ -#pragma once - -#include <util/thread/pool.h> - -#include <util/datetime/base.h> -#include <util/thread/lfqueue.h> -#include <util/system/thread.h> -#include <util/generic/vector.h> -#include <util/generic/scope.h> -#include <util/stream/str.h> - -#include <library/cpp/threading/bounded_queue/bounded_queue.h> - -#if defined(_android_) || defined(_arm_) -//by now library/cpp/yt/threading doesn't compile in targets like default-android-armv7a, fallback to ordinal elastic queue -#include "equeue.h" - class TFastElasticQueue - : public TElasticQueue - { - public: - explicit TFastElasticQueue(const TParams& params = {}) - : TElasticQueue(MakeHolder<TSimpleThreadPool>(params)) - { - } - }; -#else - -#include <library/cpp/yt/threading/event_count.h> - -class TFastElasticQueue - : public TThreadPoolBase - , private IThreadFactory::IThreadAble -{ -public: - explicit TFastElasticQueue(const TParams& params = {}) - : TThreadPoolBase(params) - { - Y_ENSURE(!params.Blocking_); - } - - ~TFastElasticQueue() { - Stop(); - } - - void Start(size_t threadCount, size_t maxQueueSize) override { - Y_ENSURE(Threads_.empty()); - Y_ENSURE(maxQueueSize > 0); - - Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events - MaxQueueSize_ = maxQueueSize; - - try { - for (size_t i = 0; i < threadCount; ++i) { - Threads_.push_back(Pool()->Run(this)); - } - } catch (...) { - Stop(); - throw; - } - - Stopped_ = false; - } - - size_t ObjectCount() const { - //GuardCount_ can be temporary incremented above real object count in queue - return Min(GuardCount_.load(), MaxQueueSize_); - } - - bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT { - if (Stopped_ || !obj) { - return false; - } - - if (GuardCount_.fetch_add(1) >= MaxQueueSize_) { - GuardCount_.fetch_sub(1); - return false; - } - - QueueSize_.fetch_add(1); - - if (!Queue_->Enqueue(obj)) { - //Simultaneous Dequeue calls can return not in exact fifo order of items, - //so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of - //the oldest enqueued item is not actually dequeued and ring buffer can't proceed. - GuardCount_.fetch_sub(1); - QueueSize_.fetch_sub(1); - return false; - } - - - Event_.NotifyOne(); - - return true; - } - - size_t Size() const noexcept override { - return QueueSize_.load(); - } - - void Stop() noexcept override { - Stopped_ = true; - - for (size_t i = 0; i < Threads_.size(); ++i) { - while (!Queue_->Enqueue(nullptr)) { - Sleep(TDuration::MilliSeconds(1)); - } - - Event_.NotifyOne(); - } - - while (!Threads_.empty()) { - Threads_.back()->Join(); - Threads_.pop_back(); - } - - Queue_.Reset(); - } - - void DoExecute() override { - TThread::SetCurrentThreadName(Params.ThreadName_.c_str()); - - while (true) { - IObjectInQueue* job = nullptr; - - Event_.Await([&]() { - return Queue_->Dequeue(job); - }); - - if (!job) { - break; - } - - QueueSize_.fetch_sub(1); - - Y_DEFER { - GuardCount_.fetch_sub(1); - }; - - if (Params.Catching_) { - try { - try { - job->Process(nullptr); - } catch (...) { - Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl; - } - } catch (...) { - ; - } - } else { - job->Process(nullptr); - } - } - } -private: - std::atomic<bool> Stopped_ = false; - size_t MaxQueueSize_ = 0; - - alignas(64) std::atomic<size_t> GuardCount_ = 0; - alignas(64) std::atomic<size_t> QueueSize_ = 0; - - TVector<THolder<IThreadFactory::IThread>> Threads_; - - THolder<NThreading::TBoundedQueue<IObjectInQueue*>> Queue_; - NYT::NThreading::TEventCount Event_; -}; - -#endif diff --git a/library/cpp/threading/equeue/ya.make b/library/cpp/threading/equeue/ya.make index 95677366c9..445797aa12 100644 --- a/library/cpp/threading/equeue/ya.make +++ b/library/cpp/threading/equeue/ya.make @@ -3,23 +3,14 @@ LIBRARY() SRCS( equeue.h equeue.cpp - fast.h ) PEERDIR( library/cpp/deprecated/atomic - library/cpp/threading/bounded_queue ) -IF (NOT OS_ANDROID AND NOT ARCH_ARM) - PEERDIR( - library/cpp/yt/threading - ) -ENDIF() - END() RECURSE_FOR_TESTS( ut ) - |