diff options
author | kulikov <kulikov@yandex-team.com> | 2023-07-21 13:59:33 +0300 |
---|---|---|
committer | kulikov <kulikov@yandex-team.com> | 2023-07-21 13:59:33 +0300 |
commit | 5706cb392271ea40eab053314e7c0f4d9d4547ba (patch) | |
tree | 72bce210dc3747df1d9319fc9f56b848852d2aab /library/cpp/threading/equeue | |
parent | 122a6055cef2bc785407c69b33b858a07b319e66 (diff) | |
download | ydb-5706cb392271ea40eab053314e7c0f4d9d4547ba.tar.gz |
try to get rid of locks and allocations for elastic queue thread pool
Under heavy load and high RPS, the current thread pool implementation seems to have problems, at least with contention on the lock inside the condvar (long futex wait calls from the HTTP server listener thread), so try to implement something more efficient:
- replace the condvar with a TEventCount implementation that has no internal lock (a pthread condvar maintains the waiters' wakeup order, which the thread pool doesn't need);
- introduce a well-known bounded MPMC queue over a ring buffer;
- get rid of TDecrementingWrapper;
- add options to turn on the new pool in library/cpp/http/server and search/daemons (to be removed after adoption);
- make the elastic queue unit tests check both versions;
- work around problems with android/arm build targets.
Diffstat (limited to 'library/cpp/threading/equeue')
-rw-r--r-- | library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt | 2 | ||||
-rw-r--r-- | library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt | 2 | ||||
-rw-r--r-- | library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt | 2 | ||||
-rw-r--r-- | library/cpp/threading/equeue/equeue_ut.cpp | 79 | ||||
-rw-r--r-- | library/cpp/threading/equeue/fast.h | 167 | ||||
-rw-r--r-- | library/cpp/threading/equeue/ya.make | 9 |
7 files changed, 242 insertions, 20 deletions
diff --git a/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt index 902b6d7a9a..43ff0adc1c 100644 --- a/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt +++ b/library/cpp/threading/equeue/CMakeLists.darwin-x86_64.txt @@ -12,6 +12,8 @@ target_link_libraries(cpp-threading-equeue PUBLIC contrib-libs-cxxsupp yutil cpp-deprecated-atomic + cpp-threading-bounded_queue + cpp-yt-threading ) target_sources(cpp-threading-equeue PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp diff --git a/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt b/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt index 5653d3d3f3..805b237943 100644 --- a/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt +++ b/library/cpp/threading/equeue/CMakeLists.linux-aarch64.txt @@ -13,6 +13,7 @@ target_link_libraries(cpp-threading-equeue PUBLIC contrib-libs-cxxsupp yutil cpp-deprecated-atomic + cpp-threading-bounded_queue ) target_sources(cpp-threading-equeue PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp diff --git a/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt index 5653d3d3f3..36f8a2632e 100644 --- a/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt +++ b/library/cpp/threading/equeue/CMakeLists.linux-x86_64.txt @@ -13,6 +13,8 @@ target_link_libraries(cpp-threading-equeue PUBLIC contrib-libs-cxxsupp yutil cpp-deprecated-atomic + cpp-threading-bounded_queue + cpp-yt-threading ) target_sources(cpp-threading-equeue PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp diff --git a/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt b/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt index 902b6d7a9a..43ff0adc1c 100644 --- a/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt +++ b/library/cpp/threading/equeue/CMakeLists.windows-x86_64.txt @@ -12,6 +12,8 @@ 
target_link_libraries(cpp-threading-equeue PUBLIC contrib-libs-cxxsupp yutil cpp-deprecated-atomic + cpp-threading-bounded_queue + cpp-yt-threading ) target_sources(cpp-threading-equeue PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/threading/equeue/equeue.cpp diff --git a/library/cpp/threading/equeue/equeue_ut.cpp b/library/cpp/threading/equeue/equeue_ut.cpp index 8557f63ac0..2c7d2c7b1e 100644 --- a/library/cpp/threading/equeue/equeue_ut.cpp +++ b/library/cpp/threading/equeue/equeue_ut.cpp @@ -1,4 +1,5 @@ #include "equeue.h" +#include "fast.h" #include <library/cpp/testing/unittest/registar.h> @@ -9,18 +10,33 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { const size_t MaxQueueSize = 20; const size_t ThreadCount = 10; - const size_t N = 100000; - static THolder<TElasticQueue> Queue; + template <typename T> + THolder<T> MakeQueue(); - struct TQueueSetup { - TQueueSetup() { - Queue.Reset(new TElasticQueue(MakeHolder<TSimpleThreadPool>())); - Queue->Start(ThreadCount, MaxQueueSize); - } - ~TQueueSetup() { - Queue->Stop(); - } + template <> + THolder<TElasticQueue> MakeQueue() { + return MakeHolder<TElasticQueue>(MakeHolder<TSimpleThreadPool>()); + } + + template <> + THolder<TFastElasticQueue> MakeQueue() { + return MakeHolder<TFastElasticQueue>(); + } + + template <typename T> + struct TEnv { + static inline THolder<T> Queue; + + struct TQueueSetup { + TQueueSetup() { + Queue.Reset(MakeQueue<T>()); + Queue->Start(ThreadCount, MaxQueueSize); + } + ~TQueueSetup() { + Queue->Stop(); + } + }; }; struct TCounters { @@ -37,7 +53,9 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { //fill test -- fill queue with "endless" jobs TSystemEvent WaitEvent; - Y_UNIT_TEST(FillTest) { + + template <typename T> + void FillTest() { Counters.Reset(); struct TWaitJob: public IObjectInQueue { @@ -47,7 +65,10 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { } } job; - struct TLocalSetup: TQueueSetup { + struct TLocalSetup: TEnv<T>::TQueueSetup { + TLocalSetup() { + WaitEvent.Reset(); + } ~TLocalSetup() { 
WaitEvent.Signal(); } @@ -56,19 +77,26 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { size_t enqueued = 0; { TLocalSetup setup; - while (Queue->Add(&job) && enqueued < MaxQueueSize + 100) { + while (TEnv<T>::Queue->Add(&job) && enqueued < MaxQueueSize + 100) { ++enqueued; } UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize); - UNIT_ASSERT_VALUES_EQUAL(enqueued, Queue->ObjectCount()); + UNIT_ASSERT_VALUES_EQUAL(enqueued, TEnv<T>::Queue->ObjectCount()); } - UNIT_ASSERT_VALUES_EQUAL(0u, Queue->ObjectCount()); - UNIT_ASSERT_VALUES_EQUAL(0u, Queue->Size()); + UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->ObjectCount()); + UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->Size()); UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued); } + Y_UNIT_TEST(FillTest) { + FillTest<TElasticQueue>(); + } + + Y_UNIT_TEST(FillTestFast) { + FillTest<TFastElasticQueue>(); + } //concurrent test -- send many jobs from different threads struct TJob: public IObjectInQueue { @@ -78,9 +106,10 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { }; static TJob Job; + template <typename T> static bool TryAdd() { AtomicIncrement(Counters.Total); - if (Queue->Add(&Job)) { + if (TEnv<T>::Queue->Add(&Job)) { AtomicIncrement(Counters.Scheduled); return true; } else { @@ -89,16 +118,18 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { } } + const size_t N = 100000; static size_t TryCounter; - Y_UNIT_TEST(ConcurrentTest) { + template <typename T> + void ConcurrentTest() { Counters.Reset(); TryCounter = 0; struct TSender: public IThreadFactory::IThreadAble { void DoExecute() override { while ((size_t)AtomicIncrement(TryCounter) <= N) { - if (!TryAdd()) { + if (!TryAdd<T>()) { Sleep(TDuration::MicroSeconds(50)); } } @@ -106,7 +137,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { } sender; { - TQueueSetup setup; + typename TEnv<T>::TQueueSetup setup; TVector< TAutoPtr<IThreadFactory::IThread> > senders; for (size_t i = 0; i < ThreadCount; ++i) { @@ -122,4 +153,12 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) { 
UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled); UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded); } + + Y_UNIT_TEST(ConcurrentTest) { + ConcurrentTest<TElasticQueue>(); + } + + Y_UNIT_TEST(ConcurrentTestFast) { + ConcurrentTest<TFastElasticQueue>(); + } } diff --git a/library/cpp/threading/equeue/fast.h b/library/cpp/threading/equeue/fast.h new file mode 100644 index 0000000000..3f96e279fc --- /dev/null +++ b/library/cpp/threading/equeue/fast.h @@ -0,0 +1,167 @@ +#pragma once + +#include <util/thread/pool.h> + +#include <util/datetime/base.h> +#include <util/thread/lfqueue.h> +#include <util/system/thread.h> +#include <util/generic/vector.h> +#include <util/generic/scope.h> +#include <util/stream/str.h> + +#include <library/cpp/threading/bounded_queue/bounded_queue.h> + +#if defined(_android_) || defined(_arm_) +//by now library/cpp/yt/threading doesn't compile in targets like default-android-armv7a, fallback to ordinal elastic queue +#include "equeue.h" + class TFastElasticQueue + : public TElasticQueue + { + public: + explicit TFastElasticQueue(const TParams& params = {}) + : TElasticQueue(MakeHolder<TSimpleThreadPool>(params)) + { + } + }; +#else + +#include <library/cpp/yt/threading/event_count.h> + +class TFastElasticQueue + : public TThreadPoolBase + , private IThreadFactory::IThreadAble +{ +public: + explicit TFastElasticQueue(const TParams& params = {}) + : TThreadPoolBase(params) + { + Y_ENSURE(!params.Blocking_); + } + + ~TFastElasticQueue() { + Stop(); + } + + void Start(size_t threadCount, size_t maxQueueSize) override { + Y_ENSURE(Threads_.empty()); + Y_ENSURE(maxQueueSize > 0); + + Queue_.Reset(new NThreading::TBoundedQueue<IObjectInQueue*>(FastClp2(maxQueueSize + threadCount))); //threadCount is for stop events + MaxQueueSize_ = maxQueueSize; + + try { + for (size_t i = 0; i < threadCount; ++i) { + Threads_.push_back(Pool()->Run(this)); + } + } catch (...) 
{ + Stop(); + throw; + } + + Stopped_ = false; + } + + size_t ObjectCount() const { + //GuardCount_ can be temporary incremented above real object count in queue + return Min(GuardCount_.load(), MaxQueueSize_); + } + + bool Add(IObjectInQueue* obj) override Y_WARN_UNUSED_RESULT { + if (Stopped_ || !obj) { + return false; + } + + if (GuardCount_.fetch_add(1) >= MaxQueueSize_) { + GuardCount_.fetch_sub(1); + return false; + } + + QueueSize_.fetch_add(1); + + if (!Queue_->Enqueue(obj)) { + //Simultaneous Dequeue calls can return not in exact fifo order of items, + //so there can be GuardCount_ < MaxQueueSize_ but Enqueue will fail because of + //the oldest enqueued item is not actually dequeued and ring buffer can't proceed. + GuardCount_.fetch_sub(1); + QueueSize_.fetch_sub(1); + return false; + } + + + Event_.NotifyOne(); + + return true; + } + + size_t Size() const noexcept override { + return QueueSize_.load(); + } + + void Stop() noexcept override { + Stopped_ = true; + + for (size_t i = 0; i < Threads_.size(); ++i) { + while (!Queue_->Enqueue(nullptr)) { + Sleep(TDuration::MilliSeconds(1)); + } + + Event_.NotifyOne(); + } + + while (!Threads_.empty()) { + Threads_.back()->Join(); + Threads_.pop_back(); + } + + Queue_.Reset(); + } + + void DoExecute() override { + TThread::SetCurrentThreadName(Params.ThreadName_.c_str()); + + while (true) { + IObjectInQueue* job = nullptr; + + Event_.Await([&]() { + return Queue_->Dequeue(job); + }); + + if (!job) { + break; + } + + QueueSize_.fetch_sub(1); + + Y_DEFER { + GuardCount_.fetch_sub(1); + }; + + if (Params.Catching_) { + try { + try { + job->Process(nullptr); + } catch (...) { + Cdbg << "[mtp queue] " << CurrentExceptionMessage() << Endl; + } + } catch (...) 
{ + ; + } + } else { + job->Process(nullptr); + } + } + } +private: + std::atomic<bool> Stopped_ = false; + size_t MaxQueueSize_ = 0; + + alignas(64) std::atomic<size_t> GuardCount_ = 0; + alignas(64) std::atomic<size_t> QueueSize_ = 0; + + TVector<THolder<IThreadFactory::IThread>> Threads_; + + THolder<NThreading::TBoundedQueue<IObjectInQueue*>> Queue_; + NYT::NThreading::TEventCount Event_; +}; + +#endif diff --git a/library/cpp/threading/equeue/ya.make b/library/cpp/threading/equeue/ya.make index 445797aa12..95677366c9 100644 --- a/library/cpp/threading/equeue/ya.make +++ b/library/cpp/threading/equeue/ya.make @@ -3,14 +3,23 @@ LIBRARY() SRCS( equeue.h equeue.cpp + fast.h ) PEERDIR( library/cpp/deprecated/atomic + library/cpp/threading/bounded_queue ) +IF (NOT OS_ANDROID AND NOT ARCH_ARM) + PEERDIR( + library/cpp/yt/threading + ) +ENDIF() + END() RECURSE_FOR_TESTS( ut ) + |