diff options
author | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-06-04 21:05:23 +0300 |
---|---|---|
committer | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-06-04 21:18:26 +0300 |
commit | b6909b2f9fb6e2b7ea15c8f889f1856fc4c3c57d (patch) | |
tree | 5aea95a84b4c26d4b0498dda6b0d638ddeb8342d | |
parent | dabfa1a29d0c5f48f8ff57471e5095124d355bc0 (diff) | |
download | ydb-b6909b2f9fb6e2b7ea15c8f889f1856fc4c3c57d.tar.gz |
YT-21872, YT-21402: Introduce configurable idle fiber pool
c70b25e58264f96e168e8032daf67c63d1fdeede
-rw-r--r-- | yt/yt/core/concurrency/fiber.cpp | 8 | ||||
-rw-r--r-- | yt/yt/core/concurrency/fiber_scheduler_thread.cpp | 165 | ||||
-rw-r--r-- | yt/yt/core/concurrency/fiber_scheduler_thread.h | 5 | ||||
-rw-r--r-- | yt/yt/library/program/config.cpp | 4 | ||||
-rw-r--r-- | yt/yt/library/program/config.h | 1 | ||||
-rw-r--r-- | yt/yt/library/program/helpers.cpp | 3 |
6 files changed, 112 insertions, 74 deletions
diff --git a/yt/yt/core/concurrency/fiber.cpp b/yt/yt/core/concurrency/fiber.cpp index 1edce18402..564d9b29e5 100644 --- a/yt/yt/core/concurrency/fiber.cpp +++ b/yt/yt/core/concurrency/fiber.cpp @@ -17,7 +17,7 @@ #include <util/random/random.h> -#ifndef NDEBUG +#if defined(_asan_enabled_) #include <yt/yt/core/misc/shutdown.h> #endif @@ -154,9 +154,9 @@ private: // cause realistically this is a "problem" // only during the shutdown which means that // process is going to be killed shortly after. -// In debug we cleanup properly so that +// In for asan we cleanup properly so that // there are no actual leaks. -#ifndef NDEBUG +#if defined(_asan_enabled_) TShutdownCookie ShutdownCookie_; void InitializeShutdownCookie() @@ -176,7 +176,7 @@ private: // were observed empty. bool GuardedProcessQueues() { -#ifndef NDEBUG +#if defined(_asan_enabled_) if (!ShutdownCookie_) { InitializeShutdownCookie(); } diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp index 631d0794bc..39efb9f1c3 100644 --- a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp +++ b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp @@ -1,7 +1,8 @@ #include "fiber_scheduler_thread.h" -#include "private.h" #include "fiber.h" +#include "moody_camel_concurrent_queue.h" +#include "private.h" #include <yt/yt/library/profiling/producer.h> @@ -37,10 +38,6 @@ namespace NYT::NConcurrency { -// NB(arkady-e1ppa): Please run core tests with this macro undefined -// if you are changing fibers. -#define YT_REUSE_FIBERS - //////////////////////////////////////////////////////////////////////////////// using namespace NLogging; @@ -140,7 +137,7 @@ struct TFiberContext TFiber* CurrentFiber = nullptr; }; -YT_DEFINE_THREAD_LOCAL(TFiberContext*, FiberContext); +YT_DEFINE_THREAD_LOCAL(TFiberContext*, FiberContext, nullptr); // Forbid inlining these accessors to prevent the compiler from // miss-optimizing TLS access in presence of fiber context switches. @@ -160,17 +157,21 @@ class TFiberContextGuard { public: explicit TFiberContextGuard(TFiberContext* context) + : Prev_(TryGetFiberContext()) { SetFiberContext(context); } ~TFiberContextGuard() { - SetFiberContext(nullptr); + SetFiberContext(Prev_); } TFiberContextGuard(const TFiberContextGuard&) = delete; TFiberContextGuard operator=(const TFiberContextGuard&) = delete; + +private: + TFiberContext* Prev_; }; //////////////////////////////////////////////////////////////////////////////// @@ -357,8 +358,6 @@ private: //////////////////////////////////////////////////////////////////////////////// -#ifdef YT_REUSE_FIBERS - class TIdleFiberPool { public: @@ -371,6 +370,8 @@ public: // Save fiber in AfterSwitch because it can be immediately concurrently reused. void SwichFromFiberAndMakeItIdle(TFiber* currentFiber, TFiber* targetFiber) { + RemoveOverdraftedIdleFibers(); + auto afterSwitch = MakeAfterSwitch([currentFiber, this] { currentFiber->SetIdle(); EnqueueIdleFiber(currentFiber); @@ -388,50 +389,24 @@ public: return TFiber::CreateFiber(); } -private: - const TShutdownCookie ShutdownCookie_ = RegisterShutdownCallback( - "FiberManager", - BIND_NO_PROPAGATE(&TIdleFiberPool::DestroyIdleFibers, this), - /*priority*/ -100); - - TLockFreeStack<TFiber*> IdleFibers_; - std::atomic<bool> DestroyingIdleFibers_ = false; - - void EnqueueIdleFiber(TFiber* fiber) + void UpdateMaxIdleFibers(ui64 maxIdleFibers) { - IdleFibers_.Enqueue(fiber); - if (DestroyingIdleFibers_.load()) { - DoDestroyIdleFibers(); - } + MaxIdleFibers_.store(maxIdleFibers, std::memory_order::relaxed); } - TFiber* TryDequeueIdleFiber() - { - TFiber* fiber = nullptr; - IdleFibers_.Dequeue(&fiber); - return fiber; - } +private: + moodycamel::ConcurrentQueue<TFiber*> IdleFibers_; + std::atomic<ui64> MaxIdleFibers_ = DefaultMaxIdleFibers; - void DestroyIdleFibers() - { - DestroyingIdleFibers_.store(true); - DoDestroyIdleFibers(); - } + // NB(arkady-e1ppa): Construct this last so that every other + // field is initialized if this callback is ran concurrently. + const TShutdownCookie ShutdownCookie_ = RegisterShutdownCallback( + "IdleFiberPool", + BIND_NO_PROPAGATE(&TIdleFiberPool::Shutdown, this), + /*priority*/std::numeric_limits<int>::min() + 1); - void DoDestroyIdleFibers() + void Shutdown() { - auto destroyFibers = [&] { - TFiberContext fiberContext; - TFiberContextGuard fiberContextGuard(&fiberContext); - - std::vector<TFiber*> fibers; - IdleFibers_.DequeueAll(&fibers); - - for (auto fiber : fibers) { - SwitchFromThread(fiber); - } - }; - #ifdef _unix_ // The current thread could be already exiting and MacOS has some issues // with registering new thread-local terminators in this case: @@ -441,22 +416,83 @@ private: std::thread thread([&] { ::TThread::SetCurrentThreadName("IdleFiberDtor"); - destroyFibers(); + JoinAllFibers(); }); thread.join(); #else // Starting threads in exit handlers on Windows causes immediate calling exit // so the routine will not be executed. Moreover, if we try to join this thread we'll get deadlock // because this thread will try to acquire atexit lock which is owned by this thread. - destroyFibers(); + JoinAllFibers(); #endif } + void JoinAllFibers() + { + std::vector<TFiber*> fibers; + + while (true) { + auto size = std::max<size_t>(1, IdleFibers_.size_approx()); + + DequeueBulk(&fibers, size); + if (fibers.empty()) { + break; + } + JoinFibers(std::move(fibers)); + } + } + + void JoinFibers(std::vector<TFiber*>&& fibers) + { + TFiberContext fiberContext; + TFiberContextGuard fiberContextGuard(&fiberContext); + + for (auto fiber : fibers) { + // Fibers will observe nullptr fiberThread + // and switch back with deleter in afterSwitch. + SwitchFromThread(fiber); + } + } + + void RemoveOverdraftedIdleFibers() + { + auto size = IdleFibers_.size_approx(); + if (size <= MaxIdleFibers_.load(std::memory_order::relaxed)) { + return; + } + + auto targetSize = std::max<size_t>(1, MaxIdleFibers_ / 2); + + std::vector<TFiber*> fibers; + DequeueBulk(&fibers, size - targetSize); + if (fibers.empty()) { + return; + } + JoinFibers(std::move(fibers)); + } + + void DequeueBulk(std::vector<TFiber*>* fibers, ui64 count) + { + fibers->resize(count); + auto dequeued = IdleFibers_.try_dequeue_bulk(std::begin(*fibers), count); + fibers->resize(dequeued); + } + + void EnqueueIdleFiber(TFiber* fiber) + { + IdleFibers_.enqueue(fiber); + } + + TFiber* TryDequeueIdleFiber() + { + TFiber* fiber = nullptr; + IdleFibers_.try_dequeue(fiber); + return fiber; + } + DECLARE_LEAKY_SINGLETON_FRIEND() }; -#endif - //////////////////////////////////////////////////////////////////////////////// Y_FORCE_INLINE TClosure PickCallback(TFiberSchedulerThread* fiberThread) @@ -502,26 +538,14 @@ void FiberTrampoline() // not necessarily null. Check them after switch from and returning into current fiber. if (successorFiber = ExtractResumerFiber()) { // Suspend current fiber. -#ifdef YT_REUSE_FIBERS TIdleFiberPool::Get()->SwichFromFiberAndMakeItIdle(currentFiber, successorFiber); -#else - break; -#endif } } YT_LOG_DEBUG("Fiber finished"); - auto afterSwitch = MakeAfterSwitch([currentFiber, successorFiber] () mutable { + auto afterSwitch = MakeAfterSwitch([currentFiber] () mutable { TFiber::ReleaseFiber(currentFiber); - -#ifdef YT_REUSE_FIBERS - Y_UNUSED(successorFiber); -#else - if (successorFiber != nullptr) { - SwitchFromThread(successorFiber); - } -#endif }); // All allocated objects in this frame must be destroyed here. @@ -536,15 +560,9 @@ void YieldFiber(TAfterSwitch afterSwitch) auto targetFiber = ExtractResumerFiber(); // If there is no resumer switch to idle fiber. Or switch to thread main. -#ifdef YT_REUSE_FIBERS if (!targetFiber) { targetFiber = TIdleFiberPool::Get()->GetFiber(); } -#endif - - if (!targetFiber) { - targetFiber = TFiber::CreateFiber(); - } auto waitingFibersCounter = GetWaitingFibersCounter(); waitingFibersCounter->Increment(1); @@ -1051,6 +1069,13 @@ void TFiberSchedulerThread::ThreadMain() //////////////////////////////////////////////////////////////////////////////// +void UpdateMaxIdleFibers(ui64 maxIdleFibers) +{ + NDetail::TIdleFiberPool::Get()->UpdateMaxIdleFibers(maxIdleFibers); +} + +//////////////////////////////////////////////////////////////////////////////// + YT_DEFINE_THREAD_LOCAL(TFiberId, CurrentFiberId); TFiberId GetCurrentFiberId() diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.h b/yt/yt/core/concurrency/fiber_scheduler_thread.h index b368a7910c..87e3ae08dd 100644 --- a/yt/yt/core/concurrency/fiber_scheduler_thread.h +++ b/yt/yt/core/concurrency/fiber_scheduler_thread.h @@ -31,4 +31,9 @@ private: //////////////////////////////////////////////////////////////////////////////// +inline constexpr ui64 DefaultMaxIdleFibers = 5000; +void UpdateMaxIdleFibers(ui64 maxIdleFibers); + +//////////////////////////////////////////////////////////////////////////////// + } //namespace NYT::NConcurrency diff --git a/yt/yt/library/program/config.cpp b/yt/yt/library/program/config.cpp index 27d47c564d..b8957f6708 100644 --- a/yt/yt/library/program/config.cpp +++ b/yt/yt/library/program/config.cpp @@ -1,5 +1,7 @@ #include "config.h" +#include <yt/yt/core/concurrency/fiber_scheduler_thread.h> + namespace NYT { using namespace NYTree; @@ -117,6 +119,8 @@ void TSingletonsDynamicConfig::Register(TRegistrar registrar) { registrar.Parameter("spin_lock_slow_path_logging_threshold", &TThis::SpinWaitSlowPathLoggingThreshold) .Optional(); + registrar.Parameter("max_idle_fibers", &TThis::MaxIdleFibers) + .Default(NConcurrency::DefaultMaxIdleFibers); registrar.Parameter("yt_alloc", &TThis::YTAlloc) .Optional(); registrar.Parameter("tcp_dispatcher", &TThis::TcpDispatcher) diff --git a/yt/yt/library/program/config.h b/yt/yt/library/program/config.h index a54d2fa7ce..6b28208d76 100644 --- a/yt/yt/library/program/config.h +++ b/yt/yt/library/program/config.h @@ -156,6 +156,7 @@ class TSingletonsDynamicConfig { public: std::optional<TDuration> SpinWaitSlowPathLoggingThreshold; + ui64 MaxIdleFibers; NYTAlloc::TYTAllocConfigPtr YTAlloc; NBus::TTcpDispatcherDynamicConfigPtr TcpDispatcher; NRpc::TDispatcherDynamicConfigPtr RpcDispatcher; diff --git a/yt/yt/library/program/helpers.cpp b/yt/yt/library/program/helpers.cpp index eba6dbbaad..a956588735 100644 --- a/yt/yt/library/program/helpers.cpp +++ b/yt/yt/library/program/helpers.cpp @@ -19,6 +19,7 @@ #include <yt/yt/core/logging/log_manager.h> #include <yt/yt/core/concurrency/execution_stack.h> +#include <yt/yt/core/concurrency/fiber_scheduler_thread.h> #include <yt/yt/core/concurrency/periodic_executor.h> #include <tcmalloc/malloc_extension.h> @@ -238,6 +239,8 @@ void ReconfigureSingletons(const TSingletonsConfigPtr& config, const TSingletons { SetSpinWaitSlowPathLoggingThreshold(dynamicConfig->SpinWaitSlowPathLoggingThreshold.value_or(config->SpinWaitSlowPathLoggingThreshold)); + NConcurrency::UpdateMaxIdleFibers(dynamicConfig->MaxIdleFibers); + if (!NYTAlloc::IsConfiguredFromEnv()) { NYTAlloc::Configure(dynamicConfig->YTAlloc ? dynamicConfig->YTAlloc : config->YTAlloc); } |