aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/equeue/equeue_ut.cpp
diff options
context:
space:
mode:
authorkulikov <kulikov@yandex-team.com>2023-07-21 13:59:33 +0300
committerkulikov <kulikov@yandex-team.com>2023-07-21 13:59:33 +0300
commit5706cb392271ea40eab053314e7c0f4d9d4547ba (patch)
tree72bce210dc3747df1d9319fc9f56b848852d2aab /library/cpp/threading/equeue/equeue_ut.cpp
parent122a6055cef2bc785407c69b33b858a07b319e66 (diff)
downloadydb-5706cb392271ea40eab053314e7c0f4d9d4547ba.tar.gz
try to get rid of locks and allocations for elastic queue thread pool
In case of heavy load and high rps current thread pool implementation seems to have problems at least with contention on lock inside condvar (long futex wait calls from http server listener thread), so try to implement something more efficient: - replace condvar with TEventCounter implementation without internal lock (pthread condvar maintains waiters wakeup order, thread pool doesn't need it); - introduce well-known bounded mpmc queue over ring buffer; - get rid of TDecrementingWrapper; - add options to turn on new pool in library/cpp/http/server and search/daemons (will remove after adoption); - make elastic queue ut check both versions; - workaround problems with android/arm build targets.
Diffstat (limited to 'library/cpp/threading/equeue/equeue_ut.cpp')
-rw-r--r--library/cpp/threading/equeue/equeue_ut.cpp79
1 files changed, 59 insertions, 20 deletions
diff --git a/library/cpp/threading/equeue/equeue_ut.cpp b/library/cpp/threading/equeue/equeue_ut.cpp
index 8557f63ac0..2c7d2c7b1e 100644
--- a/library/cpp/threading/equeue/equeue_ut.cpp
+++ b/library/cpp/threading/equeue/equeue_ut.cpp
@@ -1,4 +1,5 @@
#include "equeue.h"
+#include "fast.h"
#include <library/cpp/testing/unittest/registar.h>
@@ -9,18 +10,33 @@
Y_UNIT_TEST_SUITE(TElasticQueueTest) {
const size_t MaxQueueSize = 20;
const size_t ThreadCount = 10;
- const size_t N = 100000;
- static THolder<TElasticQueue> Queue;
+ template <typename T>
+ THolder<T> MakeQueue();
- struct TQueueSetup {
- TQueueSetup() {
- Queue.Reset(new TElasticQueue(MakeHolder<TSimpleThreadPool>()));
- Queue->Start(ThreadCount, MaxQueueSize);
- }
- ~TQueueSetup() {
- Queue->Stop();
- }
+ template <>
+ THolder<TElasticQueue> MakeQueue() {
+ return MakeHolder<TElasticQueue>(MakeHolder<TSimpleThreadPool>());
+ }
+
+ template <>
+ THolder<TFastElasticQueue> MakeQueue() {
+ return MakeHolder<TFastElasticQueue>();
+ }
+
+ template <typename T>
+ struct TEnv {
+ static inline THolder<T> Queue;
+
+ struct TQueueSetup {
+ TQueueSetup() {
+ Queue.Reset(MakeQueue<T>());
+ Queue->Start(ThreadCount, MaxQueueSize);
+ }
+ ~TQueueSetup() {
+ Queue->Stop();
+ }
+ };
};
struct TCounters {
@@ -37,7 +53,9 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
//fill test -- fill queue with "endless" jobs
TSystemEvent WaitEvent;
- Y_UNIT_TEST(FillTest) {
+
+ template <typename T>
+ void FillTest() {
Counters.Reset();
struct TWaitJob: public IObjectInQueue {
@@ -47,7 +65,10 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
}
} job;
- struct TLocalSetup: TQueueSetup {
+ struct TLocalSetup: TEnv<T>::TQueueSetup {
+ TLocalSetup() {
+ WaitEvent.Reset();
+ }
~TLocalSetup() {
WaitEvent.Signal();
}
@@ -56,19 +77,26 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
size_t enqueued = 0;
{
TLocalSetup setup;
- while (Queue->Add(&job) && enqueued < MaxQueueSize + 100) {
+ while (TEnv<T>::Queue->Add(&job) && enqueued < MaxQueueSize + 100) {
++enqueued;
}
UNIT_ASSERT_VALUES_EQUAL(enqueued, MaxQueueSize);
- UNIT_ASSERT_VALUES_EQUAL(enqueued, Queue->ObjectCount());
+ UNIT_ASSERT_VALUES_EQUAL(enqueued, TEnv<T>::Queue->ObjectCount());
}
- UNIT_ASSERT_VALUES_EQUAL(0u, Queue->ObjectCount());
- UNIT_ASSERT_VALUES_EQUAL(0u, Queue->Size());
+ UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->ObjectCount());
+ UNIT_ASSERT_VALUES_EQUAL(0u, TEnv<T>::Queue->Size());
UNIT_ASSERT_VALUES_EQUAL((size_t)Counters.Processed, enqueued);
}
+ Y_UNIT_TEST(FillTest) {
+ FillTest<TElasticQueue>();
+ }
+
+ Y_UNIT_TEST(FillTestFast) {
+ FillTest<TFastElasticQueue>();
+ }
//concurrent test -- send many jobs from different threads
struct TJob: public IObjectInQueue {
@@ -78,9 +106,10 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
};
static TJob Job;
+ template <typename T>
static bool TryAdd() {
AtomicIncrement(Counters.Total);
- if (Queue->Add(&Job)) {
+ if (TEnv<T>::Queue->Add(&Job)) {
AtomicIncrement(Counters.Scheduled);
return true;
} else {
@@ -89,16 +118,18 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
}
}
+ const size_t N = 100000;
static size_t TryCounter;
- Y_UNIT_TEST(ConcurrentTest) {
+ template <typename T>
+ void ConcurrentTest() {
Counters.Reset();
TryCounter = 0;
struct TSender: public IThreadFactory::IThreadAble {
void DoExecute() override {
while ((size_t)AtomicIncrement(TryCounter) <= N) {
- if (!TryAdd()) {
+ if (!TryAdd<T>()) {
Sleep(TDuration::MicroSeconds(50));
}
}
@@ -106,7 +137,7 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
} sender;
{
- TQueueSetup setup;
+ typename TEnv<T>::TQueueSetup setup;
TVector< TAutoPtr<IThreadFactory::IThread> > senders;
for (size_t i = 0; i < ThreadCount; ++i) {
@@ -122,4 +153,12 @@ Y_UNIT_TEST_SUITE(TElasticQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(Counters.Processed, Counters.Scheduled);
UNIT_ASSERT_VALUES_EQUAL(Counters.Total, Counters.Scheduled + Counters.Discarded);
}
+
+ Y_UNIT_TEST(ConcurrentTest) {
+ ConcurrentTest<TElasticQueue>();
+ }
+
+ Y_UNIT_TEST(ConcurrentTestFast) {
+ ConcurrentTest<TFastElasticQueue>();
+ }
}