aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-04-05 15:42:36 +0300
committerarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-04-05 15:51:06 +0300
commitbeb1725856691d2bd11a0e2ec8ce7b45cebb9804 (patch)
tree36347f8f8b8c3797fcab9747115ab53ed668b358
parent431eb68d0e52caf69928b4581df3555d18e706b3 (diff)
downloadydb-beb1725856691d2bd11a0e2ec8ce7b45cebb9804.tar.gz
YT-21402: Fibers Refactoring pt.1: Introduce FunctionView to use it as AfterSwitch and improved registry algorithm
1) Added FunctionView -- non-owning type-erasure container. If we know that lambda lifetime is long enough, we can save up allocation by using this instead of TCallback. 2) Used FunctionView as AfterSwitch inside FiberSchedulerThread. We saved up a bunch of allocations (e.g. net worst-case allocations per suspend changed from 4 (x2 after switch + fiber allocation + enqueue to idle pool lf stack) to 2 (fiber allocation + enqueue to idle pool lf stack). 3) Fiber is not longer RefCounted. Its lifetime is managed via contract with TFiberRegistry. 4) TFiberRegistry is now lock-free for fiber insertion and deletion. For introspector it is still blocking. 5) "Introduced" SimpleIntrusiveList and IntrusiveMPSCStack to work be used in aforementioned TFiberRegistry. 6) elsedef branch of YT_REUSE_FIBERS was broken for about 3 years cause of double SetAfterSwitch. Now fixed. 7) (3), (4) and (5) caused some changes in yt_fiber_printers because some stuff was hardcoded there. Compat is in place. d6cf2ae5801c87813a21ca3e7243e1b2baa09f35
-rw-r--r--library/cpp/yt/misc/function_view-inl.h71
-rw-r--r--library/cpp/yt/misc/function_view.h139
-rw-r--r--yt/yt/core/concurrency/fiber.cpp110
-rw-r--r--yt/yt/core/concurrency/fiber.h55
-rw-r--r--yt/yt/core/concurrency/fiber_scheduler_thread.cpp257
-rw-r--r--yt/yt/core/concurrency/private.h2
-rw-r--r--yt/yt/core/concurrency/public.h7
-rw-r--r--yt/yt/core/misc/intrusive_list-inl.h152
-rw-r--r--yt/yt/core/misc/intrusive_list.h87
-rw-r--r--yt/yt/core/misc/intrusive_mpsc_stack-inl.h52
-rw-r--r--yt/yt/core/misc/intrusive_mpsc_stack.h33
-rw-r--r--yt/yt/library/backtrace_introspector/introspect.cpp97
12 files changed, 884 insertions, 178 deletions
diff --git a/library/cpp/yt/misc/function_view-inl.h b/library/cpp/yt/misc/function_view-inl.h
new file mode 100644
index 0000000000..ececfdf335
--- /dev/null
+++ b/library/cpp/yt/misc/function_view-inl.h
@@ -0,0 +1,71 @@
+#pragma once
+#ifndef FUNCTION_VIEW_INL_H_
+#error "Direct inclusion of this file is not allowed, include function_view.h"
+// For the sake of sane code completion.
+#include "function_view.h"
+#endif
+
+#include <library/cpp/yt/assert/assert.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class TResult, bool NoExcept, class... TArgs>
+template <CTypeErasable<TResult(TArgs...) noexcept(NoExcept)> TConcrete>
+TFunctionView<TResult(TArgs...) noexcept(NoExcept)>::TFunctionView(TConcrete& concreteRef) noexcept
+ : TFunctionView(&concreteRef)
+{ }
+
+template <class TResult, bool NoExcept, class... TArgs>
+template <CTypeErasable<TResult(TArgs...) noexcept(NoExcept)> TConcrete>
+TFunctionView<TResult(TArgs...) noexcept(NoExcept)>::TFunctionView(TConcrete* concretePtr) noexcept
+{
+ Ptr_ = reinterpret_cast<void*>(concretePtr);
+ Invoke_ = &TFunctionView::ConcreteInvoke<TConcrete>;
+}
+
+template <class TResult, bool NoExcept, class... TArgs>
+TFunctionView<TResult(TArgs...) noexcept(NoExcept)>
+TFunctionView<TResult(TArgs...) noexcept(NoExcept)>::Release() noexcept
+{
+ auto copy = *this;
+ Reset();
+ return copy;
+}
+
+template <class TResult, bool NoExcept, class... TArgs>
+TResult TFunctionView<TResult(TArgs...) noexcept(NoExcept)>::operator()(TArgs... args) noexcept(NoExcept)
+{
+ YT_VERIFY(Ptr_);
+ return Invoke_(std::forward<TArgs>(args)..., Ptr_);
+}
+
+template <class TResult, bool NoExcept, class... TArgs>
+template <class TConcrete>
+TResult TFunctionView<TResult(TArgs...) noexcept(NoExcept)>::ConcreteInvoke(TArgs... args, TErasedPtr ptr) noexcept(NoExcept)
+{
+ return (*reinterpret_cast<TConcrete*>(ptr))(std::forward<TArgs>(args)...);
+}
+
+template <class TResult, bool NoExcept, class... TArgs>
+TFunctionView<TResult(TArgs...) noexcept(NoExcept)>::operator bool() const noexcept
+{
+ return IsValid();
+}
+
+template <class TResult, bool NoExcept, class... TArgs>
+bool TFunctionView<TResult(TArgs...) noexcept(NoExcept)>::IsValid() const noexcept
+{
+ return Ptr_ != nullptr;
+}
+
+template <class TResult, bool NoExcept, class... TArgs>
+void TFunctionView<TResult(TArgs...) noexcept(NoExcept)>::Reset() noexcept
+{
+ Ptr_ = nullptr;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/library/cpp/yt/misc/function_view.h b/library/cpp/yt/misc/function_view.h
new file mode 100644
index 0000000000..259238521f
--- /dev/null
+++ b/library/cpp/yt/misc/function_view.h
@@ -0,0 +1,139 @@
+#pragma once
+
+#include <concepts>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace NDetail {
+
+template <class TSignature>
+struct TTypeErasureTraits;
+
+template <class TResult, bool NoExcept, class... TArgs>
+struct TTypeErasureTraits<TResult(TArgs...) noexcept(NoExcept)>
+{
+ using TSignature = TResult(TArgs...) noexcept(NoExcept);
+
+ // TODO(arkady-e1ppa): Support pointer-to-member-function?
+ template <class T>
+ static constexpr bool IsInvocable = NoExcept
+ ? requires (T obj, TArgs... args) {
+ { obj(std::forward<TArgs>(args)...) } noexcept -> std::same_as<TResult>;
+ }
+ : requires (T obj, TArgs... args) {
+ { obj(std::forward<TArgs>(args)...) } -> std::same_as<TResult>;
+ };
+};
+
+} // namespace NDetail
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Non-owning type-erasure container.
+/*
+ Example:
+
+ template <class T>
+ class TSerializedObject
+ {
+ public:
+ explicit TSerializedObject(T value)
+ : Object_(value)
+ { }
+
+ void Lock(TFunctionView<void(const T&)> callback)
+ {
+ auto guard = Guard(SpinLock_);
+ callback(Object_);
+ }
+
+ private:
+ TSpinLock SpinLock_;
+ T Object_;
+ };
+
+ int main()
+ {
+ TSerializedObject<int> object(42);
+
+ // object.Lock([] (const int& value) {
+ // fmt::println("Value is {}", value);
+ // });
+ // ^ CE -- cannot pass rvalue.
+
+ auto callback = [] (const int& value) {
+ fmt::println("Value is {}", value);
+ };
+
+ object.Lock(callback); // <- prints "Value is 42".
+ }
+*/
+template <class TSignature>
+class TFunctionView;
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T, class TSignature>
+concept CTypeErasable =
+ NDetail::TTypeErasureTraits<TSignature>::template IsInvocable<T> &&
+ (!std::same_as<T, TFunctionView<TSignature>>);
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class TResult, bool NoExcept, class... TArgs>
+class TFunctionView<TResult(TArgs...) noexcept(NoExcept)>
+{
+public:
+ using TSignature = TResult(TArgs...) noexcept(NoExcept);
+
+ TFunctionView() = default;
+
+ template <CTypeErasable<TSignature> TConcrete>
+ TFunctionView(TConcrete& concreteRef) noexcept;
+
+ template <CTypeErasable<TSignature> TConcrete>
+ TFunctionView(TConcrete* concretePtr) noexcept;
+
+ TResult operator()(TArgs... args) noexcept(NoExcept);
+
+ explicit operator bool() const noexcept;
+
+ TFunctionView Release() noexcept;
+
+ bool IsValid() const noexcept;
+ void Reset() noexcept;
+
+ // bool operator==(const TFunctionView& other) const & = default;
+
+private:
+ // NB: Technically, this is UB according to C standard, which
+ // was not changed for C++ standard.
+ // This is so because it is allowed to have
+ // function pointers to be modelled by entities
+ // different from object pointers.
+ // No reasonable system architecture (e.g. x86 or ARM
+ // or any other POSIX compliant one) does this.
+ // No reasonable compiler (clang/gcc) does anything with this.
+ // Accounting for such requirement would cause this class
+ // to have std::variant-like storage which would make this class
+ // weight more. Thus, we have decided to keep it this way,
+ // since we are safe on x86 or ARM + clang.
+ using TErasedPtr = void*;
+ using TErasedInvoke = TResult(*)(TArgs..., TErasedPtr);
+
+ TErasedPtr Ptr_ = nullptr;
+ TErasedInvoke Invoke_ = nullptr;
+
+ template <class TConcrete>
+ static TResult ConcreteInvoke(TArgs... args, TErasedPtr ptr) noexcept(NoExcept);
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
+#define FUNCTION_VIEW_INL_H_
+#include "function_view-inl.h"
+#undef FUNCTION_VIEW_INL_H_
diff --git a/yt/yt/core/concurrency/fiber.cpp b/yt/yt/core/concurrency/fiber.cpp
index 6da8552aef..9bdb6fb35c 100644
--- a/yt/yt/core/concurrency/fiber.cpp
+++ b/yt/yt/core/concurrency/fiber.cpp
@@ -4,6 +4,7 @@
#include <yt/yt/core/profiling/timing.h>
+#include <yt/yt/core/misc/intrusive_mpsc_stack.h>
#include <yt/yt/core/misc/singleton.h>
#include <yt/yt/core/misc/finally.h>
@@ -113,6 +114,9 @@ private:
class TFiberRegistry
{
+ template <class Tag>
+ using TFiberStack = TIntrusiveMPSCStack<TFiber, Tag>;
+
public:
//! Do not rename, change the signature, or drop Y_NO_INLINE.
//! Used in devtools/gdb/yt_fibers_printer.py.
@@ -121,43 +125,107 @@ public:
return LeakySingleton<TFiberRegistry>();
}
- TFiber::TCookie Register(TFiber* fiber)
+ void Register(TFiber* fiber)
{
- auto guard = Guard(Lock_);
- return Fibers_.insert(Fibers_.begin(), fiber);
+ RegisterQueue_.Push(fiber);
+
+ if (auto guard = TTryGuard(Lock_)) {
+ GuardedProcessQueues();
+ }
}
- void Unregister(TFiber::TCookie cookie)
+ void Unregister(TFiber* fiber)
{
- auto guard = Guard(Lock_);
- Fibers_.erase(cookie);
+ UnregisterQueue_.Push(fiber);
+
+ if (auto guard = TTryGuard(Lock_)) {
+ GuardedProcessQueues();
+ }
}
- std::vector<TFiberPtr> List()
+ void ReadFibers(TFunctionView<void(TFiber::TFiberList&)> callback)
{
auto guard = Guard(Lock_);
- std::vector<TFiberPtr> fibers;
- for (const auto& fiber : Fibers_) {
- fibers.push_back(fiber);
- }
- return fibers;
+
+ GuardedProcessQueues();
+
+ callback(Fibers_);
+
+ GuardedProcessQueues();
+ }
+
+ ~TFiberRegistry()
+ {
+ GuardedProcessQueues();
}
private:
+ TFiberStack<NDetail::TFiberRegisterTag> RegisterQueue_;
+ TFiberStack<NDetail::TFiberUnregisterTag> UnregisterQueue_;
+
NThreading::TForkAwareSpinLock Lock_;
- std::list<TFiber*> Fibers_;
+ TFiber::TFiberList Fibers_;
+
+ void GuardedProcessQueues()
+ {
+ Fibers_.Append(RegisterQueue_.PopAll());
+
+ auto toUnregister = UnregisterQueue_.PopAll();
+
+ while (auto fiber = toUnregister.PopBack()) {
+ fiber->UnregisterAndDelete();
+ }
+
+ // NB: Around this line guard is released. We do not properly double check
+ // if queues are actually empty after this.
+ // We are okay with this since we expect to have occasional calls of this method
+ // which would unstuck most of the fibers. In dtor of this singleton we
+ // release the last batch of stuck fibers.
+ };
+
+ void DebugPrint()
+ {
+ Cerr << "Debug print begin\n";
+ Cerr << "---------------------------------------------------------------" << '\n';
+ for (auto* iter = Fibers_.Begin(); iter != Fibers_.End(); iter = iter->Next) {
+ auto* fiber = static_cast<TFiber*>(iter);
+ auto* regNode = static_cast<TIntrusiveNode<TFiber, NDetail::TFiberRegisterTag>*>(fiber);
+ auto* delNode = static_cast<TIntrusiveNode<TFiber, NDetail::TFiberUnregisterTag>*>(fiber);
+
+ Cerr << Format("Fiber node at %v. Next is %v, Prev is %v", iter, iter->Next, iter->Prev) << '\n';
+ Cerr << Format("Fiber address after cast is %v", fiber) << '\n';
+ Cerr << Format("Fiber registration queue status: Next: %v, Prev: %v", regNode->Next, regNode->Prev) << '\n';
+ // NB: Reading deletion queue is data race. Don't do this under tsan.
+ Cerr << Format("Fiber deletion queue status: Next: %v, Prev: %v", delNode->Next, delNode->Prev) << '\n';
+ Cerr << "---------------------------------------------------------------" << '\n';
+ }
+ Cerr << "Debug print end\n";
+ }
};
////////////////////////////////////////////////////////////////////////////////
+TFiber* TFiber::CreateFiber(EExecutionStackKind stackKind)
+{
+ return new TFiber(stackKind);
+}
+
+void TFiber::ReleaseFiber(TFiber* fiber)
+{
+ YT_VERIFY(fiber);
+ fiber->SetFinished();
+ fiber->Clear();
+ TFiberRegistry::Get()->Unregister(fiber);
+}
+
TFiber::TFiber(EExecutionStackKind stackKind)
: Stack_(CreateExecutionStack(stackKind))
- , RegistryCookie_(TFiberRegistry::Get()->Register(this))
, MachineContext_({
this,
TArrayRef(static_cast<char*>(Stack_->GetStack()), Stack_->GetSize()),
})
{
+ TFiberRegistry::Get()->Register(this);
TFiberProfiler::Get()->OnFiberCreated();
TFiberProfiler::Get()->OnStackAllocated(Stack_->GetSize());
}
@@ -166,7 +234,6 @@ TFiber::~TFiber()
{
YT_VERIFY(GetState() == EFiberState::Finished);
TFiberProfiler::Get()->OnStackFreed(Stack_->GetSize());
- TFiberRegistry::Get()->Unregister(RegistryCookie_);
}
bool TFiber::CheckFreeStackSpace(size_t space) const
@@ -222,7 +289,6 @@ void TFiber::SetWaiting()
void TFiber::SetFinished()
{
State_.store(EFiberState::Finished);
- Clear();
}
void TFiber::SetIdle()
@@ -276,9 +342,17 @@ void TFiber::Clear()
Fls_.reset();
}
-std::vector<TFiberPtr> TFiber::List()
+void TFiber::ReadFibers(TFunctionView<void(TFiberList&)> callback)
{
- return TFiberRegistry::Get()->List();
+ return TFiberRegistry::Get()->ReadFibers(callback);
+}
+
+void TFiber::UnregisterAndDelete() noexcept
+{
+ YT_VERIFY(!static_cast<TUnregisterBase*>(this)->IsLinked());
+
+ static_cast<TRegisterBase*>(this)->Unlink();
+ delete this;
}
namespace NDetail {
diff --git a/yt/yt/core/concurrency/fiber.h b/yt/yt/core/concurrency/fiber.h
index 1212fd9752..e42510b26a 100644
--- a/yt/yt/core/concurrency/fiber.h
+++ b/yt/yt/core/concurrency/fiber.h
@@ -4,23 +4,56 @@
#include "propagating_storage.h"
#include "fls.h"
+#include <yt/yt/core/misc/intrusive_mpsc_stack.h>
+
+#include <library/cpp/yt/misc/function_view.h>
+
#include <util/system/context.h>
#include <atomic>
-#include <list>
namespace NYT::NConcurrency {
////////////////////////////////////////////////////////////////////////////////
+class TFiberRegistry;
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TFiberRegisterTag
+{ };
+
+struct TFiberUnregisterTag
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Do not change inheritence order or layout.
+// Some offsets are hardcoded at devtools/gdb/yt_fibers_printer.py.
class TFiber
- : public TRefCounted
+ : public TIntrusiveNode<TFiber, NDetail::TFiberRegisterTag>
+ , public TIntrusiveNode<TFiber, NDetail::TFiberUnregisterTag>
, public ITrampoLine
{
+ using TRegisterBase = TIntrusiveNode<TFiber, NDetail::TFiberRegisterTag>;
+ using TUnregisterBase = TIntrusiveNode<TFiber, NDetail::TFiberUnregisterTag>;
+
public:
- using TCookie = std::list<TFiber*>::iterator;
+ using TFiberList = TSimpleIntrusiveList<TFiber, NDetail::TFiberRegisterTag>;
+
+ static TFiber* CreateFiber(EExecutionStackKind stackKind = EExecutionStackKind::Small);
+
+ // Set this as AfterSwitch to release fiber's resources.
+ static void ReleaseFiber(TFiber* fiber);
- explicit TFiber(EExecutionStackKind stackKind = EExecutionStackKind::Small);
~TFiber();
void Recreate();
@@ -33,7 +66,6 @@ public:
void SetRunning();
void SetWaiting();
- void SetFinished();
void SetIdle();
bool TryIntrospectWaiting(EFiberState& state, const std::function<void()>& func);
@@ -42,11 +74,11 @@ public:
const TPropagatingStorage& GetPropagatingStorage() const;
TFls* GetFls() const;
- static std::vector<TFiberPtr> List();
+ static void ReadFibers(TFunctionView<void(TFiberList&)> callback);
private:
const std::shared_ptr<TExecutionStack> Stack_;
- const TCookie RegistryCookie_;
+
TExceptionSafeContext MachineContext_;
std::atomic<TFiberId> FiberId_ = InvalidFiberId;
@@ -55,12 +87,17 @@ private:
std::unique_ptr<TFls> Fls_;
+ explicit TFiber(EExecutionStackKind stackKind = EExecutionStackKind::Small);
+
+ void SetFinished();
void Clear();
void DoRunNaked() override;
-};
-DEFINE_REFCOUNTED_TYPE(TFiber)
+ void UnregisterAndDelete() noexcept;
+
+ friend class ::NYT::NConcurrency::TFiberRegistry;
+};
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
index 62c0e8ad68..62f8afd79a 100644
--- a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
+++ b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
@@ -17,6 +17,8 @@
#include <library/cpp/yt/memory/memory_tag.h>
+#include <library/cpp/yt/misc/function_view.h>
+
#include <library/cpp/yt/threading/fork_aware_spin_lock.h>
#include <util/thread/lfstack.h>
@@ -25,6 +27,12 @@
namespace NYT::NConcurrency {
+// NB(arkady-e1ppa): Please run core tests with this macro undefined
+// if you are changing fibers.
+#define YT_REUSE_FIBERS
+
+////////////////////////////////////////////////////////////////////////////////
+
using namespace NLogging;
using namespace NProfiling;
@@ -61,8 +69,41 @@ DEFINE_REFCOUNTED_TYPE(TRefCountedGauge)
////////////////////////////////////////////////////////////////////////////////
+// TODO(arkady-e1ppa): Add noexcept perhaps?
+using TAfterSwitch = TFunctionView<void()>;
+
+// NB: We use MakeAfterSwitch to wrap our lambda in order to move closure
+// on the caller's stack (initially it is on the suspended fiber's stack).
+// We do that because callback can resume fiber which will destroy the
+// closure on its stack creating the risk of stack-use-after-scope.
+// The only safe place at that moment is caller's stack frame.
+template <CInvocable<void()> T>
+auto MakeAfterSwitch(T&& lambda)
+{
+ class TMoveOnCall
+ {
+ public:
+ explicit TMoveOnCall(T&& lambda)
+ : Lambda_(std::move(lambda))
+ { }
+
+ void operator()()
+ {
+ auto lambda = std::move(Lambda_);
+ lambda();
+ }
+
+ private:
+ T Lambda_;
+ };
+
+ return TMoveOnCall(std::move(lambda));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
void RunInFiberContext(TFiber* fiber, TClosure callback);
-void SwitchFromThread(TFiberPtr targetFiber);
+void SwitchFromThread(TFiber* targetFiber);
////////////////////////////////////////////////////////////////////////////////
@@ -84,9 +125,9 @@ struct TFiberContext
const TRefCountedGaugePtr WaitingFibersCounter;
TExceptionSafeContext MachineContext;
- TClosure AfterSwitch;
- TFiberPtr ResumerFiber;
- TFiberPtr CurrentFiber;
+ TAfterSwitch AfterSwitch;
+ TFiber* ResumerFiber = nullptr;
+ TFiber* CurrentFiber = nullptr;
};
YT_THREAD_LOCAL(TFiberContext*) FiberContext;
@@ -150,50 +191,50 @@ Y_FORCE_INLINE TExceptionSafeContext* GetMachineContext()
return &TryGetFiberContext()->MachineContext;
}
-Y_FORCE_INLINE void SetAfterSwitch(TClosure&& closure)
+Y_FORCE_INLINE void SetAfterSwitch(TAfterSwitch afterSwitch)
{
auto* context = TryGetFiberContext();
- YT_VERIFY(!context->AfterSwitch);
- context->AfterSwitch = std::move(closure);
+ YT_VERIFY(!context->AfterSwitch.IsValid());
+ context->AfterSwitch = afterSwitch;
}
-Y_FORCE_INLINE TClosure ExtractAfterSwitch()
+Y_FORCE_INLINE TAfterSwitch ExtractAfterSwitch()
{
auto* context = TryGetFiberContext();
- return std::move(context->AfterSwitch);
+ return context->AfterSwitch.Release();
}
-Y_FORCE_INLINE void SetResumerFiber(TFiberPtr fiber)
+Y_FORCE_INLINE void SetResumerFiber(TFiber* fiber)
{
auto* context = TryGetFiberContext();
YT_VERIFY(!context->ResumerFiber);
- context->ResumerFiber = std::move(fiber);
+ context->ResumerFiber = fiber;
}
-Y_FORCE_INLINE TFiberPtr ExtractResumerFiber()
+Y_FORCE_INLINE TFiber* ExtractResumerFiber()
{
- return std::move(TryGetFiberContext()->ResumerFiber);
+ return std::exchange(TryGetFiberContext()->ResumerFiber, nullptr);
}
Y_FORCE_INLINE TFiber* TryGetResumerFiber()
{
- return TryGetFiberContext()->ResumerFiber.Get();
+ return TryGetFiberContext()->ResumerFiber;
}
-Y_FORCE_INLINE TFiberPtr SwapCurrentFiber(TFiberPtr fiber)
+Y_FORCE_INLINE TFiber* SwapCurrentFiber(TFiber* fiber)
{
- return std::exchange(TryGetFiberContext()->CurrentFiber, std::move(fiber));
+ return std::exchange(TryGetFiberContext()->CurrentFiber, fiber);
}
Y_FORCE_INLINE TFiber* TryGetCurrentFiber()
{
auto* context = TryGetFiberContext();
- return context ? context->CurrentFiber.Get() : nullptr;
+ return context ? context->CurrentFiber : nullptr;
}
Y_FORCE_INLINE TFiber* GetCurrentFiber()
{
- auto* fiber = TryGetFiberContext()->CurrentFiber.Get();
+ auto* fiber = TryGetFiberContext()->CurrentFiber;
YT_VERIFY(fiber);
return fiber;
}
@@ -227,15 +268,14 @@ void SwitchMachineContext(TExceptionSafeContext* from, TExceptionSafeContext* to
YT_VERIFY(!ExtractAfterSwitch());
}
-void SwitchFromThread(TFiberPtr targetFiber)
+void SwitchFromThread(TFiber* targetFiber)
{
YT_ASSERT(targetFiber);
targetFiber->SetRunning();
auto* targetContext = targetFiber->GetMachineContext();
-
- auto currentFiber = SwapCurrentFiber(std::move(targetFiber));
+ auto currentFiber = SwapCurrentFiber(targetFiber);
YT_VERIFY(!currentFiber);
SwitchMachineContext(GetMachineContext(), targetContext);
@@ -243,28 +283,29 @@ void SwitchFromThread(TFiberPtr targetFiber)
YT_VERIFY(!TryGetCurrentFiber());
}
-[[noreturn]] void SwitchToThread()
+[[noreturn]] void SwitchToThread(TAfterSwitch afterSwitch)
{
auto currentFiber = SwapCurrentFiber(nullptr);
auto* currentContext = currentFiber->GetMachineContext();
- currentFiber.Reset();
+ SetAfterSwitch(afterSwitch);
SwitchMachineContext(currentContext, GetMachineContext());
YT_ABORT();
}
-void SwitchFromFiber(TFiberPtr targetFiber)
+void SwitchFromFiber(TFiber* targetFiber, TAfterSwitch afterSwitch)
{
YT_ASSERT(targetFiber);
targetFiber->SetRunning();
auto* targetContext = targetFiber->GetMachineContext();
- auto currentFiber = SwapCurrentFiber(std::move(targetFiber));
+ auto currentFiber = SwapCurrentFiber(targetFiber);
YT_VERIFY(currentFiber->GetState() != EFiberState::Waiting);
auto* currentContext = currentFiber->GetMachineContext();
+ SetAfterSwitch(afterSwitch);
SwitchMachineContext(currentContext, targetContext);
YT_VERIFY(TryGetCurrentFiber() == currentFiber);
@@ -282,19 +323,25 @@ public:
return LeakySingleton<TIdleFiberPool>();
}
- void EnqueueIdleFiber(TFiberPtr fiber)
+ // NB(lukyan): Switch out and add fiber to idle fibers.
+ // Save fiber in AfterSwitch because it can be immediately concurrently reused.
+ void SwichFromFiberAndMakeItIdle(TFiber* currentFiber, TFiber* targetFiber)
{
- IdleFibers_.Enqueue(std::move(fiber));
- if (DestroyingIdleFibers_.load()) {
- DoDestroyIdleFibers();
- }
+ auto afterSwitch = MakeAfterSwitch([currentFiber, this] {
+ currentFiber->SetIdle();
+ EnqueueIdleFiber(currentFiber);
+ });
+
+ SwitchFromFiber(targetFiber, afterSwitch);
}
- TFiberPtr TryDequeueIdleFiber()
+ TFiber* GetFiber()
{
- TFiberPtr fiber;
- IdleFibers_.Dequeue(&fiber);
- return fiber;
+ if (auto* fiber = TryDequeueIdleFiber()) {
+ return fiber;
+ }
+
+ return TFiber::CreateFiber();
}
private:
@@ -303,9 +350,23 @@ private:
BIND_NO_PROPAGATE(&TIdleFiberPool::DestroyIdleFibers, this),
/*priority*/ -100);
- TLockFreeStack<TFiberPtr> IdleFibers_;
+ TLockFreeStack<TFiber*> IdleFibers_;
std::atomic<bool> DestroyingIdleFibers_ = false;
+ void EnqueueIdleFiber(TFiber* fiber)
+ {
+ IdleFibers_.Enqueue(fiber);
+ if (DestroyingIdleFibers_.load()) {
+ DoDestroyIdleFibers();
+ }
+ }
+
+ TFiber* TryDequeueIdleFiber()
+ {
+ TFiber* fiber = nullptr;
+ IdleFibers_.Dequeue(&fiber);
+ return fiber;
+ }
void DestroyIdleFibers()
{
@@ -319,11 +380,11 @@ private:
TFiberContext fiberContext;
TFiberContextGuard fiberContextGuard(&fiberContext);
- std::vector<TFiberPtr> fibers;
+ std::vector<TFiber*> fibers;
IdleFibers_.DequeueAll(&fibers);
- for (const auto& fiber : fibers) {
- SwitchFromThread(std::move(fiber));
+ for (auto fiber : fibers) {
+ SwitchFromThread(fiber);
}
};
@@ -354,6 +415,20 @@ private:
////////////////////////////////////////////////////////////////////////////////
+Y_FORCE_INLINE TClosure PickCallback(TFiberSchedulerThread* fiberThread)
+{
+ TCallback<void()> callback;
+ // We wrap fiberThread->OnExecute() into a propagating storage guard to ensure
+ // that the propagating storage created there won't spill into the fiber callbacks.
+ TNullPropagatingStorageGuard guard;
+ YT_VERIFY(guard.GetOldStorage().IsNull());
+ callback = fiberThread->OnExecute();
+
+ return callback;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
void FiberTrampoline()
{
RunAfterSwitch();
@@ -361,19 +436,13 @@ void FiberTrampoline()
YT_LOG_DEBUG("Fiber started");
auto* currentFiber = GetCurrentFiber();
+ TFiber* successorFiber = nullptr;
// Break loop to terminate fiber
while (auto* fiberThread = TryGetFiberThread()) {
YT_VERIFY(!TryGetResumerFiber());
- TCallback<void()> callback;
- {
- // We wrap fiberThread->OnExecute() into a propagating storage guard to ensure
- // that the propagating storage created there won't spill into the fiber callbacks.
- TNullPropagatingStorageGuard guard;
- YT_VERIFY(guard.GetOldStorage().IsNull());
- callback = fiberThread->OnExecute();
- }
+ auto callback = PickCallback(fiberThread);
if (!callback) {
break;
@@ -387,30 +456,11 @@ void FiberTrampoline()
// Trace context can be restored for resumer fiber, so current trace context and memory tag are
// not necessarily null. Check them after switch from and returning into current fiber.
- if (auto resumerFiber = ExtractResumerFiber()) {
+ if (successorFiber = ExtractResumerFiber()) {
// Suspend current fiber.
#ifdef YT_REUSE_FIBERS
- {
- // TODO(lukyan): Use simple callbacks without memory allocation.
- // Make TFiber::MakeIdle method instead of lambda function.
- // Switch out and add fiber to idle fibers.
- // Save fiber in AfterSwitch because it can be immediately concurrently reused.
- SetAfterSwitch(BIND_NO_PROPAGATE([currentFiber = MakeStrong(currentFiber)] () mutable {
- currentFiber->SetIdle();
- TIdleFiberPool::Get()->EnqueueIdleFiber(std::move(currentFiber));
- }));
- }
-
- // Switched to ResumerFiber or thread main.
- SwitchFromFiber(std::move(resumerFiber));
+ TIdleFiberPool::Get()->SwichFromFiberAndMakeItIdle(currentFiber, successorFiber);
#else
- SetAfterSwitch(BIND_NO_PROPAGATE([
- currentFiber = MakeStrong(currentFiber),
- resumerFiber = std::move(resumerFiber)
- ] () mutable {
- currentFiber.Reset();
- SwitchFromThread(std::move(resumerFiber));
- }));
break;
#endif
}
@@ -418,56 +468,60 @@ void FiberTrampoline()
YT_LOG_DEBUG("Fiber finished");
- SetAfterSwitch(BIND_NO_PROPAGATE([currentFiber = MakeStrong(currentFiber)] () mutable {
- currentFiber->SetFinished();
- currentFiber.Reset();
- }));
+ auto afterSwitch = MakeAfterSwitch([currentFiber, successorFiber] () mutable {
+ TFiber::ReleaseFiber(currentFiber);
+
+#ifdef YT_REUSE_FIBERS
+ Y_UNUSED(successorFiber);
+#else
+ if (successorFiber != nullptr) {
+ SwitchFromThread(successorFiber);
+ }
+#endif
+ });
// All allocated objects in this frame must be destroyed here.
- SwitchToThread();
+ SwitchToThread(afterSwitch);
}
-void YieldFiber(TClosure afterSwitch)
+void YieldFiber(TAfterSwitch afterSwitch)
{
YT_VERIFY(TryGetCurrentFiber());
- SetAfterSwitch(std::move(afterSwitch));
-
// Try to get resumer.
auto targetFiber = ExtractResumerFiber();
// If there is no resumer switch to idle fiber. Or switch to thread main.
#ifdef YT_REUSE_FIBERS
if (!targetFiber) {
- targetFiber = TIdleFiberPool::Get()->TryDequeueIdleFiber();
+ targetFiber = TIdleFiberPool::Get()->GetFiber();
}
#endif
if (!targetFiber) {
- targetFiber = New<TFiber>();
+ targetFiber = TFiber::CreateFiber();
}
auto waitingFibersCounter = GetWaitingFibersCounter();
waitingFibersCounter->Increment(1);
- SwitchFromFiber(std::move(targetFiber));
+ SwitchFromFiber(targetFiber, afterSwitch);
YT_VERIFY(TryGetResumerFiber());
waitingFibersCounter->Increment(-1);
}
-void ResumeFiber(TFiberPtr targetFiber)
+void ResumeFiber(TFiber* targetFiber)
{
- TMemoryTagGuard guard(NullMemoryTag);
-
- auto currentFiber = MakeStrong(GetCurrentFiber());
+ auto currentFiber = GetCurrentFiber();
SetResumerFiber(currentFiber);
- SetAfterSwitch(BIND_NO_PROPAGATE([currentFiber = std::move(currentFiber)] {
+ auto afterSwitch = MakeAfterSwitch([currentFiber] {
currentFiber->SetWaiting();
- }));
+ });
+
+ SwitchFromFiber(targetFiber, afterSwitch);
- SwitchFromFiber(std::move(targetFiber));
YT_VERIFY(!TryGetResumerFiber());
}
@@ -800,13 +854,13 @@ Y_NO_INLINE void RunInFiberContext(TFiber* fiber, TClosure callback)
class TResumeGuard
{
public:
- TResumeGuard(TFiberPtr fiber, TCancelerPtr canceler)
- : Fiber_(std::move(fiber))
+ TResumeGuard(TFiber* fiber, TCancelerPtr canceler) noexcept
+ : Fiber_(fiber)
, Canceler_(std::move(canceler))
{ }
- explicit TResumeGuard(TResumeGuard&& other)
- : Fiber_(std::move(other.Fiber_))
+ explicit TResumeGuard(TResumeGuard&& other) noexcept
+ : Fiber_(other.Release())
, Canceler_(std::move(other.Canceler_))
{ }
@@ -819,7 +873,7 @@ public:
{
YT_VERIFY(Fiber_);
Canceler_.Reset();
- NDetail::ResumeFiber(std::move(Fiber_));
+ NDetail::ResumeFiber(Release());
}
~TResumeGuard()
@@ -831,13 +885,20 @@ public:
Canceler_.Reset();
GetFinalizerInvoker()->Invoke(
- BIND_NO_PROPAGATE(&NDetail::ResumeFiber, Passed(std::move(Fiber_))));
+ BIND_NO_PROPAGATE([fiber = Release()] {
+ NDetail::ResumeFiber(fiber);
+ }));
}
}
private:
- TFiberPtr Fiber_;
+ TFiber* Fiber_;
TCancelerPtr Canceler_;
+
+ TFiber* Release()
+ {
+ return std::exchange(Fiber_, nullptr);
+ }
};
} // namespace NDetail
@@ -865,7 +926,7 @@ void TFiberSchedulerThread::ThreadMain()
NDetail::TFiberContext fiberContext(this, ThreadGroupName_);
NDetail::TFiberContextGuard fiberContextGuard(&fiberContext);
- NDetail::SwitchFromThread(New<TFiber>());
+ NDetail::SwitchFromThread(TFiber::CreateFiber());
YT_LOG_DEBUG("Thread stopped (Name: %v)",
GetThreadName());
@@ -968,29 +1029,29 @@ void WaitUntilSet(TFuture<void> future, IInvokerPtr invoker)
// TODO(lukyan): transfer resumer as argument of AfterSwitch.
// Use CallOnTop like in boost.
- auto afterSwitch = BIND_NO_PROPAGATE([
+ auto afterSwitch = NDetail::MakeAfterSwitch([
canceler,
invoker = std::move(invoker),
future = std::move(future),
- currentFiber = MakeStrong(currentFiber)
+ currentFiber
] () mutable {
currentFiber->SetWaiting();
future.Subscribe(BIND_NO_PROPAGATE([
invoker = std::move(invoker),
- currentFiber = std::move(currentFiber),
+ currentFiber,
canceler = std::move(canceler)
] (const TError&) mutable {
YT_LOG_DEBUG("Waking up fiber (TargetFiberId: %x)",
canceler->GetFiberId());
invoker->Invoke(
- BIND_NO_PROPAGATE(NDetail::TResumeGuard(std::move(currentFiber), std::move(canceler))));
+ BIND_NO_PROPAGATE(NDetail::TResumeGuard(currentFiber, std::move(canceler))));
}));
});
{
NDetail::TFiberSwitchHandler::TGuard switchGuard;
- NDetail::YieldFiber(std::move(afterSwitch));
+ NDetail::YieldFiber(afterSwitch);
}
if (canceler->IsCanceled()) {
diff --git a/yt/yt/core/concurrency/private.h b/yt/yt/core/concurrency/private.h
index 5d300edee3..7c1558e0ad 100644
--- a/yt/yt/core/concurrency/private.h
+++ b/yt/yt/core/concurrency/private.h
@@ -50,7 +50,7 @@ using TMpscSuspendableSingleQueueSchedulerThreadPtr = TIntrusivePtr<TMpscSuspend
////////////////////////////////////////////////////////////////////////////////
-DECLARE_REFCOUNTED_CLASS(TFiber)
+class TFiber;
DECLARE_REFCOUNTED_CLASS(TSchedulerThread)
diff --git a/yt/yt/core/concurrency/public.h b/yt/yt/core/concurrency/public.h
index ee0b58635a..7bcc13437f 100644
--- a/yt/yt/core/concurrency/public.h
+++ b/yt/yt/core/concurrency/public.h
@@ -6,11 +6,6 @@ namespace NYT::NConcurrency {
////////////////////////////////////////////////////////////////////////////////
-// Enables fiber instances reuse for improved performance.
-#define YT_REUSE_FIBERS
-
-////////////////////////////////////////////////////////////////////////////////
-
DECLARE_REFCOUNTED_CLASS(TActionQueue)
DECLARE_REFCOUNTED_STRUCT(IThreadPool)
@@ -107,7 +102,7 @@ DECLARE_REFCOUNTED_STRUCT(IPoolWeightProvider)
DECLARE_REFCOUNTED_STRUCT(ITwoLevelFairShareThreadPool)
-DECLARE_REFCOUNTED_CLASS(TFiber)
+class TFiber;
DECLARE_REFCOUNTED_STRUCT(TFairThrottlerConfig)
DECLARE_REFCOUNTED_STRUCT(TFairThrottlerBucketConfig)
diff --git a/yt/yt/core/misc/intrusive_list-inl.h b/yt/yt/core/misc/intrusive_list-inl.h
new file mode 100644
index 0000000000..9bf00d4eb1
--- /dev/null
+++ b/yt/yt/core/misc/intrusive_list-inl.h
@@ -0,0 +1,152 @@
+#ifndef INTRUSIVE_LIST_INL_H_
+#error "Direct inclusion of this file is not allowed, include intrusive_list.h"
+// For the sake of sane code completion.
+#include "intrusive_list.h"
+#endif
+
+#include <concepts>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T, class Tag>
+void TIntrusiveNode<T, Tag>::Unlink() noexcept
+{
+ if (Next) {
+ Next->Prev = Prev;
+ }
+
+ if (Prev) {
+ Prev->Next = Next;
+ }
+
+ Prev = Next = nullptr;
+}
+
+template <class T, class Tag>
+void TIntrusiveNode<T, Tag>::LinkBefore(TIntrusiveNode* next) noexcept
+{
+ YT_VERIFY(!IsLinked());
+
+ Prev = next->Prev;
+ Prev->Next = this;
+ Next = next;
+ next->Prev = this;
+}
+
+template <class T, class Tag>
+bool TIntrusiveNode<T, Tag>::IsLinked() const noexcept
+{
+ return (Next != nullptr) || (Prev != nullptr);
+}
+
+template <class T, class Tag>
+T* TIntrusiveNode<T, Tag>::AsItem() noexcept
+{
+ return static_cast<T*>(this);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T, class Tag>
+TSimpleIntrusiveList<T, Tag>::TSimpleIntrusiveList() noexcept
+{
+ InitializeEmpty();
+}
+
+template <class T, class Tag>
+TSimpleIntrusiveList<T, Tag>::TSimpleIntrusiveList(TList&& other) noexcept
+{
+ InitializeEmpty();
+ Append(std::move(other));
+}
+
+template <class T, class Tag>
+void TSimpleIntrusiveList<T, Tag>::PushBack(TNode* node) noexcept
+{
+ node->LinkBefore(&Head_);
+}
+
+template <class T, class Tag>
+void TSimpleIntrusiveList<T, Tag>::PushFront(TNode* node) noexcept
+{
+ node->LinkBefore(Head_.Next);
+}
+
+template <class T, class Tag>
+T*TSimpleIntrusiveList<T, Tag>::PopBack() noexcept
+{
+ if (IsEmpty()) {
+ return nullptr;
+ }
+
+ TNode* back = Head_.Prev;
+ back->Unlink();
+ return back->AsItem();
+}
+
+template <class T, class Tag>
+T* TSimpleIntrusiveList<T, Tag>::PopFront() noexcept
+{
+ if (IsEmpty()) {
+ return nullptr;
+ }
+
+ TNode* front = Head_.Next;
+ front->Unlink();
+ return front->AsItem();
+}
+
+template <class T, class Tag>
+void TSimpleIntrusiveList<T, Tag>::Append(TList&& other) noexcept
+{
+ if (other.IsEmpty()) {
+ return;
+ }
+
+ auto* other_front = other.Head_.Next;
+ auto* current_back = Head_.Prev;
+ current_back->Next = other_front;
+ other_front->Prev = current_back;
+
+ auto* other_back = other.Head_.Prev;
+ other_back->Next = &Head_;
+ Head_.Prev = other_back;
+
+ other.InitializeEmpty();
+}
+
+template <class T, class Tag>
+bool TSimpleIntrusiveList<T, Tag>::IsEmpty() const noexcept
+{
+ return Head_.Next == &Head_;
+}
+
+template <class T, class Tag>
+TIntrusiveNode<T, Tag>* TSimpleIntrusiveList<T, Tag>::Begin()
+{
+ return Head_.Next;
+}
+
+template <class T, class Tag>
+TIntrusiveNode<T, Tag>* TSimpleIntrusiveList<T, Tag>::End()
+{
+ return &Head_;
+}
+
+template <class T, class Tag>
+TSimpleIntrusiveList<T, Tag>::~TSimpleIntrusiveList()
+{
+ YT_VERIFY(IsEmpty());
+}
+
+template <class T, class Tag>
+void TSimpleIntrusiveList<T, Tag>::InitializeEmpty()
+{
+ Head_.Next = Head_.Prev = &Head_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/yt/core/misc/intrusive_list.h b/yt/yt/core/misc/intrusive_list.h
new file mode 100644
index 0000000000..208e5fea18
--- /dev/null
+++ b/yt/yt/core/misc/intrusive_list.h
@@ -0,0 +1,87 @@
+#pragma once
+
+#include <cstdlib>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TIntrusiveNodeDefaultTag
+{ };
+
+////////////////////////////////////////////////////////////////////////////////
+
+// NB1: util/intrlist.h inits pointers differently
+// and also doesn't provide a way to directly modify
+// Next/Prev making it unusable in lockfree.
+// NB2: yt/containers/intrusive_linked_list.h doesn't provide tag
+// and gives quite a bit of overhead in the list (ItemToNode field, extra pointer
+// and size field). It would be a bit more annoying to hardcode in introspection.
+// TODO(arkady-e1ppa): Change util/intrlist.h to support lf algos and use it one day?
+template <class T, class Tag = TIntrusiveNodeDefaultTag>
+class TIntrusiveNode
+{
+public:
+ TIntrusiveNode* Next = nullptr;
+ TIntrusiveNode* Prev = nullptr;
+
+ TIntrusiveNode() = default;
+
+ TIntrusiveNode(const TIntrusiveNode& other) = delete;
+ TIntrusiveNode& operator=(const TIntrusiveNode& other) = delete;
+
+ void Unlink() noexcept;
+
+ void LinkBefore(TIntrusiveNode* next) noexcept;
+
+ bool IsLinked() const noexcept;
+
+ T* AsItem() noexcept;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T, class Tag = TIntrusiveNodeDefaultTag>
+class TSimpleIntrusiveList
+{
+ using TNode = TIntrusiveNode<T, Tag>;
+ using TList = TSimpleIntrusiveList<T, Tag>;
+
+public:
+ TSimpleIntrusiveList() noexcept;
+ TSimpleIntrusiveList(TSimpleIntrusiveList&& other) noexcept;
+
+ TSimpleIntrusiveList(const TSimpleIntrusiveList& other) = delete;
+ TSimpleIntrusiveList& operator=(const TSimpleIntrusiveList& other) = delete;
+ TSimpleIntrusiveList& operator=(TSimpleIntrusiveList&& other) = delete;
+
+ ~TSimpleIntrusiveList();
+
+ void PushBack(TNode* node) noexcept;
+ void PushFront(TNode* node) noexcept;
+
+ T* PopBack() noexcept;
+ T* PopFront() noexcept;
+
+ void Append(TList&& other) noexcept;
+
+ bool IsEmpty() const noexcept;
+
+ TNode* Begin();
+
+ TNode* End();
+
+private:
+ // Sentinel node.
+ TNode Head_;
+
+ void InitializeEmpty();
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
+#define INTRUSIVE_LIST_INL_H_
+#include "intrusive_list-inl.h"
+#undef INTRUSIVE_LIST_INL_H_
diff --git a/yt/yt/core/misc/intrusive_mpsc_stack-inl.h b/yt/yt/core/misc/intrusive_mpsc_stack-inl.h
new file mode 100644
index 0000000000..e4c553587e
--- /dev/null
+++ b/yt/yt/core/misc/intrusive_mpsc_stack-inl.h
@@ -0,0 +1,52 @@
+#ifndef INTRUSIVE_MPSC_STACK_INL_H_
+#error "Direct inclusion of this file is not allowed, include intrusive_mpsc_stack.h"
+// For the sake of sane code completion.
+#include "intrusive_mpsc_stack.h"
+#endif
+
+#include <concepts>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T, class Tag>
+TIntrusiveMPSCStack<T, Tag>::TIntrusiveMPSCStack() noexcept
+{
+ static_assert(std::derived_from<T, TIntrusiveNode<T, Tag>>, "Class must inherit from CRTP-base TIntrusiveNode");
+}
+
+template <class T, class Tag>
+void TIntrusiveMPSCStack<T, Tag>::Push(TNode* item) noexcept
+{
+ // NB: This saves up extra CAS in case of non-empty stack.
+ item->Next = Head_.load(std::memory_order::relaxed);
+
+ while (!Head_.compare_exchange_weak(
+ item->Next,
+ item,
+ std::memory_order::release,
+ std::memory_order::relaxed))
+ { }
+}
+
+template <class T, class Tag>
+TSimpleIntrusiveList<T, Tag> TIntrusiveMPSCStack<T, Tag>::PopAll() noexcept
+{
+ TNode* head = Head_.exchange(nullptr, std::memory_order::acquire);
+
+ TSimpleIntrusiveList<T, Tag> list;
+
+ while (head != nullptr) {
+ auto tmp = head;
+ head = head->Next;
+ tmp->Next = nullptr;
+ list.PushFront(tmp);
+ }
+
+ return list;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/yt/core/misc/intrusive_mpsc_stack.h b/yt/yt/core/misc/intrusive_mpsc_stack.h
new file mode 100644
index 0000000000..80d460553e
--- /dev/null
+++ b/yt/yt/core/misc/intrusive_mpsc_stack.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include "intrusive_list.h"
+
+#include <atomic>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T, class Tag = TIntrusiveNodeDefaultTag>
+class TIntrusiveMPSCStack
+{
+ using TNode = TIntrusiveNode<T, Tag>;
+
+public:
+ TIntrusiveMPSCStack() noexcept;
+
+ void Push(TNode* item) noexcept;
+
+ TSimpleIntrusiveList<T, Tag> PopAll() noexcept;
+
+private:
+ std::atomic<TNode*> Head_ = nullptr;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
+#define INTRUSIVE_MPSC_STACK_INL_H_
+#include "intrusive_mpsc_stack-inl.h"
+#undef INTRUSIVE_MPSC_STACK_INL_H_
diff --git a/yt/yt/library/backtrace_introspector/introspect.cpp b/yt/yt/library/backtrace_introspector/introspect.cpp
index a555a8a548..38e146ab08 100644
--- a/yt/yt/library/backtrace_introspector/introspect.cpp
+++ b/yt/yt/library/backtrace_introspector/introspect.cpp
@@ -40,61 +40,67 @@ std::vector<TFiberIntrospectionInfo> IntrospectFibers()
{
YT_LOG_INFO("Fiber introspection started");
- auto fibers = TFiber::List();
-
YT_LOG_INFO("Collecting waiting fibers backtraces");
std::vector<TFiberIntrospectionInfo> infos;
THashSet<TFiberId> waitingFiberIds;
- THashSet<TFiberId> fiberIds;
- for (const auto& fiber : fibers) {
- auto fiberId = fiber->GetFiberId();
- if (fiberId == InvalidFiberId) {
- continue;
- }
-
- InsertOrCrash(fiberIds, fiberId);
+ THashMap<TFiberId, EFiberState> fiberStates;
- EFiberState state;
- if (!fiber->TryIntrospectWaiting(state, [&] {
- YT_LOG_DEBUG("Waiting fiber is successfully locked for introspection (FiberId: %x)",
- fiberId);
+ auto introspectionAction = [&] (NYT::NConcurrency::TFiber::TFiberList& fibers) {
+ for (auto* iter = fibers.Begin(); iter != fibers.End(); iter = iter->Next) {
+ auto* fiber = iter->AsItem();
- const auto& propagatingStorage = fiber->GetPropagatingStorage();
- const auto* traceContext = TryGetTraceContextFromPropagatingStorage(propagatingStorage);
+ auto fiberId = fiber->GetFiberId();
+ if (fiberId == InvalidFiberId) {
+ continue;
+ }
- TFiberIntrospectionInfo info{
- .State = EFiberState::Waiting,
- .FiberId = fiberId,
- .WaitingSince = fiber->GetWaitingSince(),
- .TraceId = traceContext ? traceContext->GetTraceId() : TTraceId(),
- .TraceLoggingTag = traceContext ? traceContext->GetLoggingTag() : TString(),
- };
+ EmplaceOrCrash(fiberStates, fiberId, EFiberState::Introspecting);
- auto optionalContext = TrySynthesizeLibunwindContextFromMachineContext(*fiber->GetMachineContext());
- if (!optionalContext) {
- YT_LOG_WARNING("Failed to synthesize libunwind context (FiberId: %x)",
+ EFiberState state;
+ if (!fiber->TryIntrospectWaiting(state, [&] {
+ YT_LOG_DEBUG("Waiting fiber is successfully locked for introspection (FiberId: %x)",
fiberId);
- return;
- }
- TLibunwindCursor cursor(*optionalContext);
- while (!cursor.IsFinished()) {
- info.Backtrace.push_back(cursor.GetCurrentIP());
- cursor.MoveNext();
+ const auto& propagatingStorage = fiber->GetPropagatingStorage();
+ const auto* traceContext = TryGetTraceContextFromPropagatingStorage(propagatingStorage);
+
+ TFiberIntrospectionInfo info{
+ .State = EFiberState::Waiting,
+ .FiberId = fiberId,
+ .WaitingSince = fiber->GetWaitingSince(),
+ .TraceId = traceContext ? traceContext->GetTraceId() : TTraceId(),
+ .TraceLoggingTag = traceContext ? traceContext->GetLoggingTag() : TString(),
+ };
+
+ auto optionalContext = TrySynthesizeLibunwindContextFromMachineContext(*fiber->GetMachineContext());
+ if (!optionalContext) {
+ YT_LOG_WARNING("Failed to synthesize libunwind context (FiberId: %x)",
+ fiberId);
+ return;
+ }
+
+ TLibunwindCursor cursor(*optionalContext);
+ while (!cursor.IsFinished()) {
+ info.Backtrace.push_back(cursor.GetCurrentIP());
+ cursor.MoveNext();
+ }
+
+ infos.push_back(std::move(info));
+ InsertOrCrash(waitingFiberIds, fiberId);
+
+ YT_LOG_DEBUG("Fiber introspection completed (FiberId: %x)",
+ info.FiberId);
+ })) {
+ YT_LOG_DEBUG("Failed to lock fiber for introspection (FiberId: %x, State: %v)",
+ fiberId,
+ state);
+ fiberStates[fiberId] = state;
}
-
- infos.push_back(std::move(info));
- InsertOrCrash(waitingFiberIds, fiberId);
-
- YT_LOG_DEBUG("Fiber introspection completed (FiberId: %x)",
- info.FiberId);
- })) {
- YT_LOG_DEBUG("Failed to lock fiber for introspection (FiberId: %x, State: %v)",
- fiberId,
- state);
}
- }
+ };
+
+ TFiber::ReadFibers(introspectionAction);
YT_LOG_INFO("Collecting running fibers backtraces");
@@ -123,8 +129,7 @@ std::vector<TFiberIntrospectionInfo> IntrospectFibers()
});
}
- for (const auto& fiber : fibers) {
- auto fiberId = fiber->GetFiberId();
+ for (const auto& [fiberId, fiberState] : fiberStates) {
if (fiberId == InvalidFiberId) {
continue;
}
@@ -136,7 +141,7 @@ std::vector<TFiberIntrospectionInfo> IntrospectFibers()
}
infos.push_back(TFiberIntrospectionInfo{
- .State = fiber->GetState(),
+ .State = fiberState,
.FiberId = fiberId,
});
}