aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authordtyo <dtyo@yandex-team.com>2023-05-03 15:29:08 +0300
committerdtyo <dtyo@yandex-team.com>2023-05-03 15:29:08 +0300
commit200c355ded38bd5a84be4da890b492ac56602273 (patch)
tree7a0831a72e1252edbbe10f2a87bd99e5cd42accd /library/cpp
parent103968390dd41b88d8a33fc64d3c730961bf3199 (diff)
downloadydb-200c355ded38bd5a84be4da890b492ac56602273.tar.gz
Quick sort benchmark for actor system vs thread pool
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/core/benchmark_ut.cpp416
-rw-r--r--library/cpp/actors/core/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--library/cpp/actors/core/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--library/cpp/actors/core/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--library/cpp/actors/core/ut/CMakeLists.windows-x86_64.txt1
5 files changed, 420 insertions, 0 deletions
diff --git a/library/cpp/actors/core/benchmark_ut.cpp b/library/cpp/actors/core/benchmark_ut.cpp
new file mode 100644
index 0000000000..b00c172044
--- /dev/null
+++ b/library/cpp/actors/core/benchmark_ut.cpp
@@ -0,0 +1,416 @@
+#include "actorsystem.h"
+#include "actor_bootstrapped.h"
+#include "config.h"
+#include "executor_pool_basic.h"
+#include "hfunc.h"
+#include "scheduler_basic.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/thread/pool.h>
+
+#include <algorithm>
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <iostream>
+#include <memory>
+#include <mutex>
+#include <span>
+#include <utility>
+#include <unordered_set>
+#include <vector>
+
+#define BENCH_START(label) auto label##Start = std::chrono::steady_clock::now()
+#define BENCH_END(label) std::chrono::steady_clock::now() - label##Start
+
+using namespace NActors;
+using namespace std::chrono_literals;
+
+Y_UNIT_TEST_SUITE(ActorSystemBenchmark) {
+
+ enum ESimpleEventType {
+ EvQuickSort,
+ EvActorStatus
+ };
+
+ using TContainer = std::vector<i32>;
+ using TActorIds = std::span<TActorId>;
+
+ template <typename TId>
+ class TActiveEntityRegistry {
+ private:
+ std::unordered_set<TId> ActiveIds_;
+ std::mutex ActiveIdsMutex_;
+ std::condition_variable ActiveIdsCv_;
+
+ public:
+ void SetActive(TId id) {
+ std::unique_lock lock(ActiveIdsMutex_);
+ ActiveIds_.insert(std::move(id));
+ }
+
+ void SetInactive(const TId& id) {
+ std::unique_lock lock(ActiveIdsMutex_);
+ ActiveIds_.erase(id);
+ ActiveIdsCv_.notify_all();
+ }
+
+ bool WaitForAllInactive(std::chrono::microseconds timeout = 1ms) {
+ std::unique_lock lock(ActiveIdsMutex_);
+ ActiveIdsCv_.wait_for(lock, timeout, [this] {
+ return ActiveIds_.empty();
+ });
+ return ActiveIds_.empty();
+ }
+ };
+
+ class TQuickSortEngine {
+ public:
+ struct TParameters {
+ TContainer &Container;
+ i32 Left;
+ i32 Right;
+ void* CustomData = nullptr;
+
+ TParameters() = delete;
+ TParameters(TContainer& container, i32 left, i32 right, void* customData)
+ : Container(container)
+ , Left(left)
+ , Right(right)
+ , CustomData(customData)
+ {}
+ };
+
+ public:
+ void Sort(TQuickSortEngine::TParameters& params) {
+ auto [newRight, newLeft] = Partition(params.Container, params.Left, params.Right);
+ if (!(params.Left < newRight || newLeft < params.Right)) {
+ return;
+ }
+
+ auto [leftParams, rightParams] = SplitParameters(params, newRight, newLeft);
+
+ bool ranAsync = false;
+
+ if (newLeft < params.Right) {
+ ranAsync = TryRunAsync(rightParams);
+ if (ranAsync) {
+ Sort(rightParams);
+ }
+ }
+ if (params.Left < newRight) {
+ if (!ranAsync && !TryRunAsync(leftParams)) {
+ Sort(leftParams);
+ }
+ }
+ }
+
+ // returns bounds of left and right sub-arrays for the next iteration
+ std::pair<i32, i32> Partition(TContainer& container, i32 left, i32 right) {
+ ui32 pivotIndex = (left + right) / 2;
+ auto pivot = container[pivotIndex];
+ while (left <= right) {
+ while (container[left] < pivot) {
+ left++;
+ }
+ while (container[right] > pivot) {
+ right--;
+ }
+
+ if (left <= right) {
+ std::swap(container[left++], container[right--]);
+ }
+ }
+ return {right, left};
+ }
+
+ protected:
+ virtual std::pair<TParameters, TParameters> SplitParameters(const TParameters& params, i32 newRight, i32 newLeft) = 0;
+ virtual bool TryRunAsync(const TParameters& params) = 0;
+ };
+
+ class TQuickSortTask : public TQuickSortEngine, public IObjectInQueue {
+ public:
+ using TParameters = TQuickSortEngine::TParameters;
+ struct TThreadPoolParameters {
+ const ui32 ThreadsLimit = 0;
+ std::atomic<ui32> ThreadsUsed = 0;
+ TThreadPool& ThreadPool;
+
+ TThreadPoolParameters(ui32 threadsLimit, ui32 threadsUsed, TThreadPool &threadPool)
+ : ThreadsLimit(threadsLimit)
+ , ThreadsUsed(threadsUsed)
+ , ThreadPool(threadPool)
+ {}
+ };
+ TParameters Params;
+ TActiveEntityRegistry<TQuickSortTask*>& ActiveThreadRegistry;
+
+ public:
+ TQuickSortTask() = delete;
+ TQuickSortTask(TParameters params, TActiveEntityRegistry<TQuickSortTask*>& activeThreadRegistry)
+ : Params(params)
+ , ActiveThreadRegistry(activeThreadRegistry)
+ {
+ ActiveThreadRegistry.SetActive(this);
+ }
+
+ void Process(void*) override {
+ Sort(Params);
+ ActiveThreadRegistry.SetInactive(this);
+ }
+
+ protected:
+ std::pair<TParameters, TParameters> SplitParameters(const TParameters& params, i32 newRight, i32 newLeft) override {
+ return {
+ {params.Container, params.Left, newRight, params.CustomData},
+ {params.Container, newLeft, params.Right, params.CustomData}
+ };
+ }
+
+ bool TryRunAsync(const TParameters& params) override {
+ auto threadPoolParams = static_cast<TThreadPoolParameters*>(params.CustomData);
+ if (threadPoolParams->ThreadsUsed++ >= threadPoolParams->ThreadsLimit) {
+ threadPoolParams->ThreadsUsed--;
+ return false;
+ }
+ return threadPoolParams->ThreadPool.AddAndOwn(THolder(new TQuickSortTask(params, ActiveThreadRegistry)));
+ }
+ };
+
+ class TEvQuickSort : public TEventLocal<TEvQuickSort, EvQuickSort> {
+ public:
+ using TParameters = TQuickSortEngine::TParameters;
+ struct TActorSystemParameters {
+ TActorIds ActorIds;
+ std::atomic<ui32> ActorIdsUsed = 0;
+ TActorSystemParameters() = delete;
+ TActorSystemParameters(const TActorIds& actorIds, ui32 actorIdsUsed = 0)
+ : ActorIds(actorIds)
+ , ActorIdsUsed(actorIdsUsed)
+ {}
+ };
+
+ TQuickSortEngine::TParameters Params;
+
+ public:
+ TEvQuickSort() = delete;
+ TEvQuickSort(TParameters params, TActiveEntityRegistry<TEvQuickSort*>& activeEventRegistry)
+ : Params(params)
+ , ActiveEventRegistry_(activeEventRegistry)
+ {
+ Y_VERIFY(!Params.Container.empty());
+ Y_VERIFY(Params.Right - Params.Left + 1 <= static_cast<i32>(Params.Container.size()),
+ "left: %d, right: %d, cont.size: %d", Params.Left, Params.Right, static_cast<i32>(Params.Container.size()));
+ ActiveEventRegistry_.SetActive(this);
+ }
+
+ virtual ~TEvQuickSort() {
+ ActiveEventRegistry_.SetInactive(this);
+ }
+
+ private:
+ TActiveEntityRegistry<TEvQuickSort*>& ActiveEventRegistry_;
+ };
+
+ class TQuickSortActor : public TQuickSortEngine, public TActorBootstrapped<TQuickSortActor> {
+ public:
+ using TParameters = TQuickSortEngine::TParameters;
+
+ private:
+ TActiveEntityRegistry<TEvQuickSort*>& ActiveEventRegistry_;
+
+ public:
+ TQuickSortActor() = delete;
+ TQuickSortActor(TActiveEntityRegistry<TEvQuickSort*>& activeEventRegistry)
+ : TActorBootstrapped<TQuickSortActor>()
+ , ActiveEventRegistry_(activeEventRegistry)
+ {}
+
+ STFUNC(StateInit) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvQuickSort, Handle);
+ default:
+ Y_VERIFY(false);
+ }
+ }
+
+ void Bootstrap() {
+ Become(&TThis::StateInit);
+ }
+
+ protected:
+ std::pair<TParameters, TParameters> SplitParameters(const TParameters& params, i32 newRight, i32 newLeft) override {
+ return {
+ {params.Container, params.Left, newRight, params.CustomData},
+ {params.Container, newLeft, params.Right, params.CustomData}
+ };
+ }
+
+ bool TryRunAsync(const TParameters& params) override {
+ auto actorSystemParams = static_cast<TEvQuickSort::TActorSystemParameters*>(params.CustomData);
+ const auto actorIdIndex = actorSystemParams->ActorIdsUsed++;
+ if (actorIdIndex >= actorSystemParams->ActorIds.size()) {
+ actorSystemParams->ActorIdsUsed--;
+ return false;
+ }
+ auto targetActorId = actorSystemParams->ActorIds[actorIdIndex];
+ Send(targetActorId, new TEvQuickSort(params, ActiveEventRegistry_));
+ return true;
+ }
+
+ private:
+ void Handle(TEvQuickSort::TPtr& ev) {
+ auto evPtr = ev->Get();
+ Sort(evPtr->Params);
+ }
+ };
+
+ std::vector<i32> PrepareVectorToSort(ui32 n) {
+ std::vector<i32> numbers(n);
+ for (ui32 i = 0; i < numbers.size(); i++) {
+ numbers[i] = numbers.size() - i;
+ }
+ return numbers;
+ }
+
+ std::unique_ptr<TActorSystem> PrepareActorSystem(ui32 poolThreads) {
+ auto setup = MakeHolder<TActorSystemSetup>();
+ setup->NodeId = 1;
+
+ setup->ExecutorsCount = 1;
+ setup->Executors.Reset(new TAutoPtr<IExecutorPool>[setup->ExecutorsCount]);
+
+ ui32 poolId = 0;
+ ui64 poolSpinThreashold = 20;
+ setup->Executors[0].Reset(new TBasicExecutorPool(poolId, poolThreads, poolSpinThreashold));
+
+ TSchedulerConfig schedulerConfig;
+ schedulerConfig.ResolutionMicroseconds = 512;
+ schedulerConfig.SpinThreshold = 100;
+ setup->Scheduler.Reset(new TBasicSchedulerThread(schedulerConfig));
+
+ return std::make_unique<TActorSystem>(setup);
+ }
+
+ std::vector<TActorId> prepareQuickSortActors(
+ TActorSystem* actorSystem, ui32 actorsNum, TActiveEntityRegistry<TEvQuickSort*>& activeEventRegistry
+ ) {
+ std::vector<TActorId> actorIds;
+ actorIds.reserve(actorsNum);
+
+ for (ui32 i = 0; i < actorsNum; i++) {
+ auto actor = new TQuickSortActor(activeEventRegistry);
+ auto actorId = actorSystem->Register(actor);
+ actorIds.push_back(actorId);
+ }
+
+ return actorIds;
+ }
+
+ std::pair<std::chrono::microseconds, std::vector<i32>> BenchmarkQuickSortActor(
+ ui32 threads,
+ ui32 iterations,
+ ui32 vectorSize
+ ) {
+ auto actorSystem = PrepareActorSystem(threads);
+ actorSystem->Start();
+
+ TActiveEntityRegistry<TEvQuickSort*> activeEventRegistry;
+ auto actorIds = prepareQuickSortActors(actorSystem.get(), threads, activeEventRegistry);
+
+ std::vector<i32> actorSortResult;
+ auto actorQsDurationTotal = 0us;
+ for (ui32 i = 0; i < iterations; i++) {
+ auto numbers = PrepareVectorToSort(vectorSize);
+
+ TEvQuickSort::TActorSystemParameters actorSystemParams(actorIds, 1);
+ TEvQuickSort::TParameters params(numbers, 0, numbers.size() - 1, &actorSystemParams);
+ auto ev3 = new TEvQuickSort(params, activeEventRegistry);
+
+ BENCH_START(qs);
+
+ actorSystem->Send(actorIds.front(), ev3);
+ UNIT_ASSERT_C(activeEventRegistry.WaitForAllInactive(60s), "timeout");
+
+ actorQsDurationTotal += std::chrono::duration_cast<std::chrono::microseconds>(BENCH_END(qs));
+
+ if (i + 1 == iterations) {
+ actorSortResult = numbers;
+ }
+ }
+
+ return {actorQsDurationTotal / iterations, actorSortResult};
+ }
+
+ std::pair<std::chrono::microseconds, std::vector<i32>> BenchmarkQuickSortThreadPool(
+ ui32 threads,
+ ui32 iterations,
+ ui32 vectorSize
+ ) {
+ TThreadPool threadPool;
+ threadPool.Start(threads);
+ TActiveEntityRegistry<TQuickSortTask*> activeThreadRegistry;
+
+ auto threaPoolSortDurationTotal = 0us;
+ std::vector<i32> threadPoolSortResult;
+ for (ui32 i = 0; i < iterations; i++) {
+ auto numbers = PrepareVectorToSort(vectorSize);
+
+ TQuickSortTask::TThreadPoolParameters threadPoolParams(threads, 1, threadPool);
+ TQuickSortTask::TParameters params(numbers, 0, numbers.size() - 1, &threadPoolParams);
+
+ BENCH_START(thread);
+
+ Y_VERIFY(threadPoolParams.ThreadPool.AddAndOwn(THolder(new TQuickSortTask(params, activeThreadRegistry))));
+ UNIT_ASSERT_C(activeThreadRegistry.WaitForAllInactive(60s), "timeout");
+
+ threaPoolSortDurationTotal += std::chrono::duration_cast<std::chrono::microseconds>(BENCH_END(thread));
+
+ if (i + 1 == iterations) {
+ threadPoolSortResult = numbers;
+ }
+ }
+
+ threadPool.Stop();
+
+ return {threaPoolSortDurationTotal / iterations, threadPoolSortResult};
+ }
+
+ Y_UNIT_TEST(QuickSortActor) {
+ const std::vector<ui32> threadss{1, 4};
+ const std::vector<ui32> vectorSizes{100, 1'000, 1'000'000};
+ const ui32 iterations = 3;
+
+ std::cout << "sep=," << std::endl;
+ std::cout << "size,threads,actor_time(us),thread_pool_time(us)" << std::endl;
+
+ for (auto vectorSize : vectorSizes) {
+ for (auto threads : threadss) {
+ std::cerr << "vector size: " << vectorSize << ", threads: " << threads << std::endl;
+
+ auto [actorSortDuration, actorSortResult] = BenchmarkQuickSortActor(threads, iterations, vectorSize);
+ std::cerr << "actor sort duration: " << actorSortDuration.count() << "us" << std::endl;
+
+ auto [threadPoolSortDuration, threadPoolSortResult] = BenchmarkQuickSortThreadPool(threads, iterations, vectorSize);
+ std::cerr << "thread pool sort duration: " << threadPoolSortDuration.count() << "us" << std::endl;
+
+ auto referenceVector = PrepareVectorToSort(vectorSize);
+ std::sort(referenceVector.begin(), referenceVector.end());
+
+ UNIT_ASSERT_EQUAL_C(actorSortResult, referenceVector,
+ "vector size: " << vectorSize << "; threads: " << threads);
+ UNIT_ASSERT_EQUAL_C(threadPoolSortResult, referenceVector,
+ "vector size: " << vectorSize << "; threads: " << threads);
+
+ std::cout << vectorSize << ","
+ << threads << ","
+ << actorSortDuration.count() << ","
+ << threadPoolSortDuration.count() << std::endl;
+ }
+
+ std::cerr << "-----" << std::endl << std::endl;
+ }
+ }
+}
diff --git a/library/cpp/actors/core/ut/CMakeLists.darwin-x86_64.txt b/library/cpp/actors/core/ut/CMakeLists.darwin-x86_64.txt
index ef83a150ec..e56713f557 100644
--- a/library/cpp/actors/core/ut/CMakeLists.darwin-x86_64.txt
+++ b/library/cpp/actors/core/ut/CMakeLists.darwin-x86_64.txt
@@ -29,6 +29,7 @@ target_link_options(library-cpp-actors-core-ut PRIVATE
)
target_sources(library-cpp-actors-core-ut PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/actor_coroutine_ut.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/benchmark_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/actor_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/actorsystem_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/performance_ut.cpp
diff --git a/library/cpp/actors/core/ut/CMakeLists.linux-aarch64.txt b/library/cpp/actors/core/ut/CMakeLists.linux-aarch64.txt
index 768f9af719..bfd1f0f226 100644
--- a/library/cpp/actors/core/ut/CMakeLists.linux-aarch64.txt
+++ b/library/cpp/actors/core/ut/CMakeLists.linux-aarch64.txt
@@ -32,6 +32,7 @@ target_link_options(library-cpp-actors-core-ut PRIVATE
)
target_sources(library-cpp-actors-core-ut PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/actor_coroutine_ut.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/benchmark_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/actor_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/actorsystem_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/performance_ut.cpp
diff --git a/library/cpp/actors/core/ut/CMakeLists.linux-x86_64.txt b/library/cpp/actors/core/ut/CMakeLists.linux-x86_64.txt
index f2b6f3e305..145a291b45 100644
--- a/library/cpp/actors/core/ut/CMakeLists.linux-x86_64.txt
+++ b/library/cpp/actors/core/ut/CMakeLists.linux-x86_64.txt
@@ -33,6 +33,7 @@ target_link_options(library-cpp-actors-core-ut PRIVATE
)
target_sources(library-cpp-actors-core-ut PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/actor_coroutine_ut.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/benchmark_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/actor_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/actorsystem_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/performance_ut.cpp
diff --git a/library/cpp/actors/core/ut/CMakeLists.windows-x86_64.txt b/library/cpp/actors/core/ut/CMakeLists.windows-x86_64.txt
index bc26cc7c5d..8e7e6bd499 100644
--- a/library/cpp/actors/core/ut/CMakeLists.windows-x86_64.txt
+++ b/library/cpp/actors/core/ut/CMakeLists.windows-x86_64.txt
@@ -22,6 +22,7 @@ target_link_libraries(library-cpp-actors-core-ut PUBLIC
)
target_sources(library-cpp-actors-core-ut PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/actor_coroutine_ut.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/core/benchmark_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/actor_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/actorsystem_ut.cpp
${CMAKE_SOURCE_DIR}/library/cpp/actors/core/performance_ut.cpp