aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-06-04 21:05:23 +0300
committerarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-06-04 21:18:26 +0300
commitb6909b2f9fb6e2b7ea15c8f889f1856fc4c3c57d (patch)
tree5aea95a84b4c26d4b0498dda6b0d638ddeb8342d
parentdabfa1a29d0c5f48f8ff57471e5095124d355bc0 (diff)
downloadydb-b6909b2f9fb6e2b7ea15c8f889f1856fc4c3c57d.tar.gz
YT-21872, YT-21402: Introduce configurable idle fiber pool
c70b25e58264f96e168e8032daf67c63d1fdeede
-rw-r--r--yt/yt/core/concurrency/fiber.cpp8
-rw-r--r--yt/yt/core/concurrency/fiber_scheduler_thread.cpp165
-rw-r--r--yt/yt/core/concurrency/fiber_scheduler_thread.h5
-rw-r--r--yt/yt/library/program/config.cpp4
-rw-r--r--yt/yt/library/program/config.h1
-rw-r--r--yt/yt/library/program/helpers.cpp3
6 files changed, 112 insertions, 74 deletions
diff --git a/yt/yt/core/concurrency/fiber.cpp b/yt/yt/core/concurrency/fiber.cpp
index 1edce18402..564d9b29e5 100644
--- a/yt/yt/core/concurrency/fiber.cpp
+++ b/yt/yt/core/concurrency/fiber.cpp
@@ -17,7 +17,7 @@
#include <util/random/random.h>
-#ifndef NDEBUG
+#if defined(_asan_enabled_)
#include <yt/yt/core/misc/shutdown.h>
#endif
@@ -154,9 +154,9 @@ private:
// cause realistically this is a "problem"
// only during the shutdown which means that
// process is going to be killed shortly after.
-// In debug we cleanup properly so that
+// In for asan we cleanup properly so that
// there are no actual leaks.
-#ifndef NDEBUG
+#if defined(_asan_enabled_)
TShutdownCookie ShutdownCookie_;
void InitializeShutdownCookie()
@@ -176,7 +176,7 @@ private:
// were observed empty.
bool GuardedProcessQueues()
{
-#ifndef NDEBUG
+#if defined(_asan_enabled_)
if (!ShutdownCookie_) {
InitializeShutdownCookie();
}
diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
index 631d0794bc..39efb9f1c3 100644
--- a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
+++ b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
@@ -1,7 +1,8 @@
#include "fiber_scheduler_thread.h"
-#include "private.h"
#include "fiber.h"
+#include "moody_camel_concurrent_queue.h"
+#include "private.h"
#include <yt/yt/library/profiling/producer.h>
@@ -37,10 +38,6 @@
namespace NYT::NConcurrency {
-// NB(arkady-e1ppa): Please run core tests with this macro undefined
-// if you are changing fibers.
-#define YT_REUSE_FIBERS
-
////////////////////////////////////////////////////////////////////////////////
using namespace NLogging;
@@ -140,7 +137,7 @@ struct TFiberContext
TFiber* CurrentFiber = nullptr;
};
-YT_DEFINE_THREAD_LOCAL(TFiberContext*, FiberContext);
+YT_DEFINE_THREAD_LOCAL(TFiberContext*, FiberContext, nullptr);
// Forbid inlining these accessors to prevent the compiler from
// miss-optimizing TLS access in presence of fiber context switches.
@@ -160,17 +157,21 @@ class TFiberContextGuard
{
public:
explicit TFiberContextGuard(TFiberContext* context)
+ : Prev_(TryGetFiberContext())
{
SetFiberContext(context);
}
~TFiberContextGuard()
{
- SetFiberContext(nullptr);
+ SetFiberContext(Prev_);
}
TFiberContextGuard(const TFiberContextGuard&) = delete;
TFiberContextGuard operator=(const TFiberContextGuard&) = delete;
+
+private:
+ TFiberContext* Prev_;
};
////////////////////////////////////////////////////////////////////////////////
@@ -357,8 +358,6 @@ private:
////////////////////////////////////////////////////////////////////////////////
-#ifdef YT_REUSE_FIBERS
-
class TIdleFiberPool
{
public:
@@ -371,6 +370,8 @@ public:
// Save fiber in AfterSwitch because it can be immediately concurrently reused.
void SwichFromFiberAndMakeItIdle(TFiber* currentFiber, TFiber* targetFiber)
{
+ RemoveOverdraftedIdleFibers();
+
auto afterSwitch = MakeAfterSwitch([currentFiber, this] {
currentFiber->SetIdle();
EnqueueIdleFiber(currentFiber);
@@ -388,50 +389,24 @@ public:
return TFiber::CreateFiber();
}
-private:
- const TShutdownCookie ShutdownCookie_ = RegisterShutdownCallback(
- "FiberManager",
- BIND_NO_PROPAGATE(&TIdleFiberPool::DestroyIdleFibers, this),
- /*priority*/ -100);
-
- TLockFreeStack<TFiber*> IdleFibers_;
- std::atomic<bool> DestroyingIdleFibers_ = false;
-
- void EnqueueIdleFiber(TFiber* fiber)
+ void UpdateMaxIdleFibers(ui64 maxIdleFibers)
{
- IdleFibers_.Enqueue(fiber);
- if (DestroyingIdleFibers_.load()) {
- DoDestroyIdleFibers();
- }
+ MaxIdleFibers_.store(maxIdleFibers, std::memory_order::relaxed);
}
- TFiber* TryDequeueIdleFiber()
- {
- TFiber* fiber = nullptr;
- IdleFibers_.Dequeue(&fiber);
- return fiber;
- }
+private:
+ moodycamel::ConcurrentQueue<TFiber*> IdleFibers_;
+ std::atomic<ui64> MaxIdleFibers_ = DefaultMaxIdleFibers;
- void DestroyIdleFibers()
- {
- DestroyingIdleFibers_.store(true);
- DoDestroyIdleFibers();
- }
+ // NB(arkady-e1ppa): Construct this last so that every other
+ // field is initialized if this callback is ran concurrently.
+ const TShutdownCookie ShutdownCookie_ = RegisterShutdownCallback(
+ "IdleFiberPool",
+ BIND_NO_PROPAGATE(&TIdleFiberPool::Shutdown, this),
+ /*priority*/std::numeric_limits<int>::min() + 1);
- void DoDestroyIdleFibers()
+ void Shutdown()
{
- auto destroyFibers = [&] {
- TFiberContext fiberContext;
- TFiberContextGuard fiberContextGuard(&fiberContext);
-
- std::vector<TFiber*> fibers;
- IdleFibers_.DequeueAll(&fibers);
-
- for (auto fiber : fibers) {
- SwitchFromThread(fiber);
- }
- };
-
#ifdef _unix_
// The current thread could be already exiting and MacOS has some issues
// with registering new thread-local terminators in this case:
@@ -441,22 +416,83 @@ private:
std::thread thread([&] {
::TThread::SetCurrentThreadName("IdleFiberDtor");
- destroyFibers();
+ JoinAllFibers();
});
thread.join();
#else
// Starting threads in exit handlers on Windows causes immediate calling exit
// so the routine will not be executed. Moreover, if we try to join this thread we'll get deadlock
// because this thread will try to acquire atexit lock which is owned by this thread.
- destroyFibers();
+ JoinAllFibers();
#endif
}
+ void JoinAllFibers()
+ {
+ std::vector<TFiber*> fibers;
+
+ while (true) {
+ auto size = std::max<size_t>(1, IdleFibers_.size_approx());
+
+ DequeueBulk(&fibers, size);
+ if (fibers.empty()) {
+ break;
+ }
+ JoinFibers(std::move(fibers));
+ }
+ }
+
+ void JoinFibers(std::vector<TFiber*>&& fibers)
+ {
+ TFiberContext fiberContext;
+ TFiberContextGuard fiberContextGuard(&fiberContext);
+
+ for (auto fiber : fibers) {
+ // Fibers will observe nullptr fiberThread
+ // and switch back with deleter in afterSwitch.
+ SwitchFromThread(fiber);
+ }
+ }
+
+ void RemoveOverdraftedIdleFibers()
+ {
+ auto size = IdleFibers_.size_approx();
+ if (size <= MaxIdleFibers_.load(std::memory_order::relaxed)) {
+ return;
+ }
+
+ auto targetSize = std::max<size_t>(1, MaxIdleFibers_ / 2);
+
+ std::vector<TFiber*> fibers;
+ DequeueBulk(&fibers, size - targetSize);
+ if (fibers.empty()) {
+ return;
+ }
+ JoinFibers(std::move(fibers));
+ }
+
+ void DequeueBulk(std::vector<TFiber*>* fibers, ui64 count)
+ {
+ fibers->resize(count);
+ auto dequeued = IdleFibers_.try_dequeue_bulk(std::begin(*fibers), count);
+ fibers->resize(dequeued);
+ }
+
+ void EnqueueIdleFiber(TFiber* fiber)
+ {
+ IdleFibers_.enqueue(fiber);
+ }
+
+ TFiber* TryDequeueIdleFiber()
+ {
+ TFiber* fiber = nullptr;
+ IdleFibers_.try_dequeue(fiber);
+ return fiber;
+ }
+
DECLARE_LEAKY_SINGLETON_FRIEND()
};
-#endif
-
////////////////////////////////////////////////////////////////////////////////
Y_FORCE_INLINE TClosure PickCallback(TFiberSchedulerThread* fiberThread)
@@ -502,26 +538,14 @@ void FiberTrampoline()
// not necessarily null. Check them after switch from and returning into current fiber.
if (successorFiber = ExtractResumerFiber()) {
// Suspend current fiber.
-#ifdef YT_REUSE_FIBERS
TIdleFiberPool::Get()->SwichFromFiberAndMakeItIdle(currentFiber, successorFiber);
-#else
- break;
-#endif
}
}
YT_LOG_DEBUG("Fiber finished");
- auto afterSwitch = MakeAfterSwitch([currentFiber, successorFiber] () mutable {
+ auto afterSwitch = MakeAfterSwitch([currentFiber] () mutable {
TFiber::ReleaseFiber(currentFiber);
-
-#ifdef YT_REUSE_FIBERS
- Y_UNUSED(successorFiber);
-#else
- if (successorFiber != nullptr) {
- SwitchFromThread(successorFiber);
- }
-#endif
});
// All allocated objects in this frame must be destroyed here.
@@ -536,15 +560,9 @@ void YieldFiber(TAfterSwitch afterSwitch)
auto targetFiber = ExtractResumerFiber();
// If there is no resumer switch to idle fiber. Or switch to thread main.
-#ifdef YT_REUSE_FIBERS
if (!targetFiber) {
targetFiber = TIdleFiberPool::Get()->GetFiber();
}
-#endif
-
- if (!targetFiber) {
- targetFiber = TFiber::CreateFiber();
- }
auto waitingFibersCounter = GetWaitingFibersCounter();
waitingFibersCounter->Increment(1);
@@ -1051,6 +1069,13 @@ void TFiberSchedulerThread::ThreadMain()
////////////////////////////////////////////////////////////////////////////////
+void UpdateMaxIdleFibers(ui64 maxIdleFibers)
+{
+ NDetail::TIdleFiberPool::Get()->UpdateMaxIdleFibers(maxIdleFibers);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
YT_DEFINE_THREAD_LOCAL(TFiberId, CurrentFiberId);
TFiberId GetCurrentFiberId()
diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.h b/yt/yt/core/concurrency/fiber_scheduler_thread.h
index b368a7910c..87e3ae08dd 100644
--- a/yt/yt/core/concurrency/fiber_scheduler_thread.h
+++ b/yt/yt/core/concurrency/fiber_scheduler_thread.h
@@ -31,4 +31,9 @@ private:
////////////////////////////////////////////////////////////////////////////////
+inline constexpr ui64 DefaultMaxIdleFibers = 5000;
+void UpdateMaxIdleFibers(ui64 maxIdleFibers);
+
+////////////////////////////////////////////////////////////////////////////////
+
} //namespace NYT::NConcurrency
diff --git a/yt/yt/library/program/config.cpp b/yt/yt/library/program/config.cpp
index 27d47c564d..b8957f6708 100644
--- a/yt/yt/library/program/config.cpp
+++ b/yt/yt/library/program/config.cpp
@@ -1,5 +1,7 @@
#include "config.h"
+#include <yt/yt/core/concurrency/fiber_scheduler_thread.h>
+
namespace NYT {
using namespace NYTree;
@@ -117,6 +119,8 @@ void TSingletonsDynamicConfig::Register(TRegistrar registrar)
{
registrar.Parameter("spin_lock_slow_path_logging_threshold", &TThis::SpinWaitSlowPathLoggingThreshold)
.Optional();
+ registrar.Parameter("max_idle_fibers", &TThis::MaxIdleFibers)
+ .Default(NConcurrency::DefaultMaxIdleFibers);
registrar.Parameter("yt_alloc", &TThis::YTAlloc)
.Optional();
registrar.Parameter("tcp_dispatcher", &TThis::TcpDispatcher)
diff --git a/yt/yt/library/program/config.h b/yt/yt/library/program/config.h
index a54d2fa7ce..6b28208d76 100644
--- a/yt/yt/library/program/config.h
+++ b/yt/yt/library/program/config.h
@@ -156,6 +156,7 @@ class TSingletonsDynamicConfig
{
public:
std::optional<TDuration> SpinWaitSlowPathLoggingThreshold;
+ ui64 MaxIdleFibers;
NYTAlloc::TYTAllocConfigPtr YTAlloc;
NBus::TTcpDispatcherDynamicConfigPtr TcpDispatcher;
NRpc::TDispatcherDynamicConfigPtr RpcDispatcher;
diff --git a/yt/yt/library/program/helpers.cpp b/yt/yt/library/program/helpers.cpp
index eba6dbbaad..a956588735 100644
--- a/yt/yt/library/program/helpers.cpp
+++ b/yt/yt/library/program/helpers.cpp
@@ -19,6 +19,7 @@
#include <yt/yt/core/logging/log_manager.h>
#include <yt/yt/core/concurrency/execution_stack.h>
+#include <yt/yt/core/concurrency/fiber_scheduler_thread.h>
#include <yt/yt/core/concurrency/periodic_executor.h>
#include <tcmalloc/malloc_extension.h>
@@ -238,6 +239,8 @@ void ReconfigureSingletons(const TSingletonsConfigPtr& config, const TSingletons
{
SetSpinWaitSlowPathLoggingThreshold(dynamicConfig->SpinWaitSlowPathLoggingThreshold.value_or(config->SpinWaitSlowPathLoggingThreshold));
+ NConcurrency::UpdateMaxIdleFibers(dynamicConfig->MaxIdleFibers);
+
if (!NYTAlloc::IsConfiguredFromEnv()) {
NYTAlloc::Configure(dynamicConfig->YTAlloc ? dynamicConfig->YTAlloc : config->YTAlloc);
}