diff options
author | arkady-e1ppa <[email protected]> | 2024-05-20 12:25:58 +0300 |
---|---|---|
committer | arkady-e1ppa <[email protected]> | 2024-05-20 12:39:26 +0300 |
commit | d2a413e97eb8e248a266f0129f6f8fecbc8605b7 (patch) | |
tree | 38ce318e38bbd14d578a65f4bb9809c3ff5d689c | |
parent | 73d783fe0b096861111e19b63fa2259296050916 (diff) |
YT-21402: Fibers refactoring pt.2: Slightly better encapsulation of fiber introspection part and moved fls back to switch handler
In this pr we refactor fiber state machine so that it is more clear what its purpose is: it protects Fls (and some other fields) across fiber-introspector interaction over the fiber life-cycle and also fix some problems related to the shutdown sequence:
-Fls is back to being created at FiberSwitchHandler (although FiberSwitchHandler is still inappropriately placed in the same UT as fiber scheduler, but that issue is intended to be fixed in later prs).
-FiberId generation is also back to FiberSwitchHandler, with its value being cached inside fiber for introspector use. This shouldn't make things worse, but would make it clearer who *owns* the FiberId and who *views* it.
-TFiberRegistry now uses shutdown callback to clear the last batch of fibers instead of doing so in dtor, which is apparently never called.
6f8c7b1c741b775128a3a7a5aeaefbb2563fcf5d
-rw-r--r-- | yt/yt/core/concurrency/fiber.cpp | 329 | ||||
-rw-r--r-- | yt/yt/core/concurrency/fiber.h | 102 | ||||
-rw-r--r-- | yt/yt/core/concurrency/fiber_scheduler_thread.cpp | 54 | ||||
-rw-r--r-- | yt/yt/core/concurrency/private.h | 1 | ||||
-rw-r--r-- | yt/yt/core/misc/intrusive_mpsc_stack-inl.h | 1 | ||||
-rw-r--r-- | yt/yt/library/backtrace_introspector/introspect.cpp | 8 |
6 files changed, 331 insertions, 164 deletions
diff --git a/yt/yt/core/concurrency/fiber.cpp b/yt/yt/core/concurrency/fiber.cpp index 4e2395abb9f..1edce184027 100644 --- a/yt/yt/core/concurrency/fiber.cpp +++ b/yt/yt/core/concurrency/fiber.cpp @@ -6,6 +6,7 @@ #include <yt/yt/core/misc/intrusive_mpsc_stack.h> #include <yt/yt/core/misc/singleton.h> +#include <yt/yt/core/misc/shutdown.h> #include <yt/yt/core/misc/finally.h> #include <yt/yt/library/profiling/producer.h> @@ -30,6 +31,22 @@ static constexpr auto& Logger = ConcurrencyLogger; //////////////////////////////////////////////////////////////////////////////// +#ifdef NDEBUG + #define ATOMIC_EXCHANGE_WITH_VERIFY(Atomic, NewValue, ExpectedValue, MemoryOrder) \ + YT_VERIFY(Atomic.load(std::memory_order::relaxed) == ExpectedValue); \ + Atomic.store(NewValue, MemoryOrder); \ + static_assert(true) +#else + #define ATOMIC_EXCHANGE_WITH_VERIFY(Atomic, NewValue, ExpectedValue, MemoryOrder) \ + { \ + auto observed = Atomic.exchange(NewValue, MemoryOrder); \ + YT_VERIFY(observed == ExpectedValue); \ + } \ + static_assert(true) +#endif + +//////////////////////////////////////////////////////////////////////////////// + class TFiberProfiler : public ISensorProducer { @@ -81,46 +98,8 @@ private: //////////////////////////////////////////////////////////////////////////////// -class TFiberIdGenerator -{ -public: - static TFiberIdGenerator* Get() - { - return LeakySingleton<TFiberIdGenerator>(); - } - - TFiberId Generate() - { - const TFiberId Factor = std::numeric_limits<TFiberId>::max() - 173864; - YT_ASSERT(Factor % 2 == 1); // Factor must be coprime with 2^n. - - while (true) { - auto seed = Seed_++; - auto id = seed * Factor; - if (id != InvalidFiberId) { - return id; - } - } - } - -private: - std::atomic<TFiberId> Seed_; - - DECLARE_LEAKY_SINGLETON_FRIEND() - - TFiberIdGenerator() - { - Seed_.store(static_cast<TFiberId>(::time(nullptr))); - } -}; - -//////////////////////////////////////////////////////////////////////////////// - class TFiberRegistry { - template <class Tag> - using TFiberStack = TIntrusiveMpscStack<TFiber, Tag>; - public: //! Do not rename, change the signature, or drop Y_NO_INLINE. //! Used in devtools/gdb/yt_fibers_printer.py. @@ -129,7 +108,7 @@ public: return LeakySingleton<TFiberRegistry>(); } - void Register(TFiber* fiber) + void Register(TFiber* fiber) noexcept { RegisterQueue_.Push(fiber); @@ -138,7 +117,7 @@ public: } } - void Unregister(TFiber* fiber) + void Unregister(TFiber* fiber) noexcept { UnregisterQueue_.Push(fiber); @@ -159,6 +138,9 @@ public: } private: + template <class Tag> + using TFiberStack = TIntrusiveMpscStack<TFiber, Tag>; + TFiberStack<NDetail::TFiberRegisterTag> RegisterQueue_; TFiberStack<NDetail::TFiberUnregisterTag> UnregisterQueue_; @@ -230,7 +212,7 @@ private: // We have to check ourselves that // PopBack return is a valid one. while (!toUnregister.Empty()) { - toUnregister.PopBack()->UnregisterAndDelete(); + toUnregister.PopBack()->DeleteFiber(); } // NB: Around this line guard is released. We do not properly double check @@ -266,157 +248,260 @@ private: //////////////////////////////////////////////////////////////////////////////// -TFiber* TFiber::CreateFiber(EExecutionStackKind stackKind) +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +EFiberState TFiberIntrospectionBase::TryLockAsIntrospector() noexcept { - return new TFiber(stackKind); + auto state = GetState(); + if (state != EFiberState::Waiting) { + // Locked by running fiber. + return state; + } + + State_.compare_exchange_strong( + state, + EFiberState::Introspecting, + std::memory_order::acquire, + std::memory_order::relaxed); + + return state; } -void TFiber::ReleaseFiber(TFiber* fiber) +bool TFiberIntrospectionBase::TryLockForIntrospection(EFiberState* state, TFunctionView<void()> successHandler) { - YT_VERIFY(fiber); - fiber->SetFinished(); - fiber->Clear(); - TFiberRegistry::Get()->Unregister(fiber); + auto& stateRef = *state; + stateRef = TryLockAsIntrospector(); + if (stateRef != EFiberState::Waiting) { + // Locked by running fiber. + return false; + } + + auto guard = Finally([&] { + // Release lock held by introspector. + ATOMIC_EXCHANGE_WITH_VERIFY( + State_, + EFiberState::Waiting, + EFiberState::Introspecting, + std::memory_order::release); + }); + + successHandler(); + + return true; } -TFiber::TFiber(EExecutionStackKind stackKind) - : Stack_(CreateExecutionStack(stackKind)) - , MachineContext_({ - this, - TArrayRef(static_cast<char*>(Stack_->GetStack()), Stack_->GetSize()), - }) +TFiberId TFiberIntrospectionBase::GetFiberId() const { - TFiberRegistry::Get()->Register(this); - TFiberProfiler::Get()->OnFiberCreated(); - TFiberProfiler::Get()->OnStackAllocated(Stack_->GetSize()); + return FiberId_.load(std::memory_order::relaxed); } -TFiber::~TFiber() +TInstant TFiberIntrospectionBase::GetWaitingSince() const { - YT_VERIFY(GetState() == EFiberState::Finished); - TFiberProfiler::Get()->OnStackFreed(Stack_->GetSize()); + // Locked by introspector + YT_VERIFY(GetState() == EFiberState::Introspecting); + return WaitingSince_; } -bool TFiber::CheckFreeStackSpace(size_t space) const +TFls* TFiberIntrospectionBase::GetFls() { - return reinterpret_cast<char*>(Stack_->GetStack()) + space < __builtin_frame_address(0); + // Locked by introspector + YT_VERIFY(GetState() == EFiberState::Introspecting); + return Fls_; } -TExceptionSafeContext* TFiber::GetMachineContext() +void TFiberIntrospectionBase::OnCallbackExecutionStarted(TFiberId fiberId, TFls* fls) { - return &MachineContext_; + // Locked by running fiber. + YT_VERIFY(GetState() == EFiberState::Running); + + FiberId_.store(fiberId, std::memory_order::relaxed); + Fls_ = fls; } -TFiberId TFiber::GetFiberId() const +void TFiberIntrospectionBase::OnCallbackExecutionFinished() { - return FiberId_.load(std::memory_order::relaxed); + // Locked by running fiber. + YT_VERIFY(GetState() == EFiberState::Running); + + FiberId_.store(InvalidFiberId, std::memory_order::relaxed); + Fls_ = nullptr; } +void TFiberIntrospectionBase::SetWaiting() +{ + WaitingSince_ = CpuInstantToInstant(GetApproximateCpuInstant()); + + // Release lock that should be acquired by running fiber. + ATOMIC_EXCHANGE_WITH_VERIFY( + State_, + EFiberState::Waiting, + EFiberState::Running, + std::memory_order::release); +} -EFiberState TFiber::GetState() const +void TFiberIntrospectionBase::SetIdle() { - return State_.load(std::memory_order::relaxed); + // 1) Locked by running fiber. + // 2) Reading this doesn't cause anything to happen. + // This state is never checked for, so just relaxed. + ATOMIC_EXCHANGE_WITH_VERIFY( + State_, + EFiberState::Idle, + EFiberState::Running, + std::memory_order::relaxed); } -void TFiber::SetRunning() +std::optional<TDuration> TFiberIntrospectionBase::SetRunning() { - auto expectedState = State_.load(std::memory_order::relaxed); + auto observed = GetState(); std::optional<NProfiling::TWallTimer> lockedTimer; + do { - YT_VERIFY(expectedState != EFiberState::Running); - if (expectedState == EFiberState::Introspecting) { + // NB(arkady-e1ppa): We expect SetRunning to have + // either SetIdle, SetWaiting or Ctor in HB relation to this call. + // If we have SetIdle in HB relation then we must + // observed |Idle|. + // If we have SetWaiting in HB then we either observer + // |Waiting| by w-r coherence or |Introspecting| + // written by RWM from TryLockForIntrospection + // which observed mentioned SetWaiting write. + // If Ctor is in HB then this is the first op + // on State_ and thus guaranteed to read |Created|. + // This leaves |Running| and |Finished| impossible. + YT_VERIFY(observed != EFiberState::Running); + if (observed == EFiberState::Introspecting) { if (!lockedTimer) { lockedTimer.emplace(); } ThreadYield(); - expectedState = State_.load(); + observed = GetState(); continue; } - } while (!State_.compare_exchange_weak(expectedState, EFiberState::Running)); - - if (lockedTimer) { - YT_LOG_WARNING("Fiber execution was delayed due to introspection (FiberId: %x, Delay: %v)", - GetFiberId(), - lockedTimer->GetElapsedTime()); - } + // TryAcquire lock. + } while (!State_.compare_exchange_weak( + observed, + EFiberState::Running, + std::memory_order::acquire, + std::memory_order::relaxed)); + + return lockedTimer.has_value() + ? std::optional(lockedTimer->GetElapsedTime()) + : std::nullopt; } -void TFiber::SetWaiting() +void TFiberIntrospectionBase::SetFinished() { - WaitingSince_.store(GetApproximateCpuInstant(), std::memory_order::release); - - auto observed = State_.exchange(EFiberState::Waiting, std::memory_order::release); - YT_VERIFY(observed == EFiberState::Running); + // NB(arkady-e1ppa): This state is relevant for + // two reasons: + // 1) For dtor of fiber -- see ~Fiber for details + // 2) For verifying that fiber is attempted to be + // deleted exactly one. In such case (though we + // check it only in debug mode) exchange reads + // the last modification and if this function + // is called twice regardless of situation + // one of the calls will crash as it should. + ATOMIC_EXCHANGE_WITH_VERIFY( + State_, + EFiberState::Finished, + EFiberState::Running, + std::memory_order::relaxed); } -void TFiber::SetFinished() +EFiberState TFiberIntrospectionBase::GetState() const { - auto observed = State_.exchange(EFiberState::Finished, std::memory_order::relaxed); - YT_VERIFY(observed == EFiberState::Running); + return State_.load(std::memory_order::relaxed); } -void TFiber::SetIdle() +} // namespace NDetail + +//////////////////////////////////////////////////////////////////////////////// + +TFiber* TFiber::CreateFiber(EExecutionStackKind stackKind) { - auto observed = State_.exchange(EFiberState::Idle, std::memory_order::relaxed); - YT_VERIFY(observed == EFiberState::Running); - Clear(); + auto* fiber = new TFiber(stackKind); + TFiberRegistry::Get()->Register(fiber); + return fiber; } -bool TFiber::TryIntrospectWaiting(EFiberState& state, const std::function<void()>& func) +void TFiber::ReleaseFiber(TFiber* fiber) { - state = State_.load(); - if (state != EFiberState::Waiting) { - return false; - } - if (!State_.compare_exchange_strong(state, EFiberState::Introspecting)) { - return false; - } - auto guard = Finally([&] { - YT_VERIFY(State_.exchange(state) == EFiberState::Introspecting); - }); - func(); - return true; + YT_VERIFY(fiber); + fiber->SetFinished(); + TFiberRegistry::Get()->Unregister(fiber); } -TInstant TFiber::GetWaitingSince() const +void TFiber::SetRunning() { - YT_VERIFY(State_.load() == EFiberState::Introspecting); - return CpuInstantToInstant(WaitingSince_.load()); + if (auto delay = TFiberBase::SetRunning()) { + YT_LOG_WARNING( + "Fiber execution was delayed due to introspection (FiberId: %x, Delay: %v)", + GetFiberId(), + *delay); + } } -const TPropagatingStorage& TFiber::GetPropagatingStorage() const +bool TFiber::CheckFreeStackSpace(size_t space) const { - YT_VERIFY(State_.load() == EFiberState::Introspecting); - return NConcurrency::GetPropagatingStorage(*Fls_); + return reinterpret_cast<char*>(Stack_->GetStack()) + space < __builtin_frame_address(0); } -TFls* TFiber::GetFls() const +TExceptionSafeContext* TFiber::GetMachineContext() { - return Fls_.get(); + return &MachineContext_; } -void TFiber::Recreate() +void TFiber::ReadFibers(TFunctionView<void(TFiberList&)> callback) { - FiberId_.store(TFiberIdGenerator::Get()->Generate(), std::memory_order::release); - Fls_ = std::make_unique<TFls>(); + return TFiberRegistry::Get()->ReadFibers(callback); } -void TFiber::Clear() +TFiber::TFiber(EExecutionStackKind stackKind) + : Stack_(CreateExecutionStack(stackKind)) + , MachineContext_({ + .TrampoLine = this, + .Stack = TArrayRef(static_cast<char*>(Stack_->GetStack()), Stack_->GetSize()), + }) { - FiberId_.store(InvalidFiberId); - Fls_.reset(); + TFiberProfiler::Get()->OnFiberCreated(); + TFiberProfiler::Get()->OnStackAllocated(Stack_->GetSize()); } -void TFiber::ReadFibers(TFunctionView<void(TFiberList&)> callback) +TFiber::~TFiber() { - return TFiberRegistry::Get()->ReadFibers(callback); + // What happens: + // SetFinished + // Sequenced-before + // Enqueue in deletion queue + // Synchronizes-with + // Dequeue from deletion queue (can be from another thread) + // Sequenced-before + // DeleteFiber + // Sequence-before + // GetState + // Since this is the only possible chain of events + // we are safe to use |GetState| (which does + // relaxed load). + YT_VERIFY(GetState() == EFiberState::Finished); + TFiberProfiler::Get()->OnStackFreed(Stack_->GetSize()); } -void TFiber::UnregisterAndDelete() noexcept +void TFiber::DeleteFiber() noexcept { YT_VERIFY(static_cast<TUnregisterBase*>(this)->Empty()); YT_VERIFY(!static_cast<TRegisterBase*>(this)->Empty()); + // NB(arkady-e1ppa): Pair of events such as + // RegisterFiber and UnregisterFiber are always + // processes in the mentioned order. + // Since this is the case, every fiber + // is supposed to be inserted in the registry + // exactly once. And then removed from the registry + // also exactly once. This means that at this point + // we must be linked in the FiberList_. + YT_VERIFY(!static_cast<TRegisterBase*>(this)->Empty()); + static_cast<TRegisterBase*>(this)->Unlink(); delete this; } diff --git a/yt/yt/core/concurrency/fiber.h b/yt/yt/core/concurrency/fiber.h index d8aade79e1b..8a014a36f9e 100644 --- a/yt/yt/core/concurrency/fiber.h +++ b/yt/yt/core/concurrency/fiber.h @@ -32,20 +32,80 @@ struct TFiberUnregisterTag //////////////////////////////////////////////////////////////////////////////// -} // namespace NDetail +// Basic invariants: +// Some SetRunning is in hb with SetWaiting/SetIdle/SetFinished. +// Some SetWaiting is in hb with a successful TryStartIntrospection. +// SetRunning stalls while observed state is Introspecting. + +// You can think about this state machine in the following way: +// This is a SpinLock where +// SetRunning is Acquire +// SetWaiting is Release +// SetIdle/SetFinished are basically no-op +// TryLockForIntrospection is TryAcquire + Release on success. +// Thus same sync logic applies. +class TFiberIntrospectionBase +{ +public: + // Used by introspector on any fiber. + bool TryLockForIntrospection(EFiberState* state, TFunctionView<void()> successHandler); + TFiberId GetFiberId() const; + + // Used by introspector on waiting fibers. + TInstant GetWaitingSince() const; + TFls* GetFls(); + + // Used by fiber_schedulers on running fibers. + void OnCallbackExecutionStarted(TFiberId fiberId, TFls* fls); + void OnCallbackExecutionFinished(); + void SetWaiting(); + void SetIdle(); + +protected: + // Used by fiber itself. + + // Returns duration of stalling. + Y_FORCE_INLINE std::optional<TDuration> SetRunning(); + Y_FORCE_INLINE void SetFinished(); + + // NB(arkady-e1ppa): This does a relaxed + // load. We expect that synchronisation + // is done via external means. + Y_FORCE_INLINE EFiberState GetState() const; + +private: + std::atomic<EFiberState> State_ = EFiberState::Created; + std::atomic<TFiberId> FiberId_ = InvalidFiberId; + + // Guarded by State_. + TInstant WaitingSince_ = TInstant::Zero(); + TFls* Fls_ = nullptr; + + EFiberState TryLockAsIntrospector() noexcept; +}; //////////////////////////////////////////////////////////////////////////////// -// Do not change inheritence order or layout. -// Some offsets are hardcoded at devtools/gdb/yt_fibers_printer.py. -class TFiber +class TFiberBase : public TIntrusiveListItem<TFiber, NDetail::TFiberRegisterTag> , public TIntrusiveListItem<TFiber, NDetail::TFiberUnregisterTag> - , public ITrampoLine + , public TFiberIntrospectionBase { +protected: using TRegisterBase = TIntrusiveListItem<TFiber, NDetail::TFiberRegisterTag>; using TUnregisterBase = TIntrusiveListItem<TFiber, NDetail::TFiberUnregisterTag>; +}; + +} // namespace NDetail +//////////////////////////////////////////////////////////////////////////////// + +// Do not change inheritence order or layout. +// Some offsets are hardcoded at devtools/gdb/yt_fibers_printer.py. +class TFiber + : public NDetail::TFiberBase + , public ITrampoLine +{ public: using TFiberList = TIntrusiveList<TFiber, NDetail::TFiberRegisterTag>; @@ -54,47 +114,25 @@ public: // Set this as AfterSwitch to release fiber's resources. static void ReleaseFiber(TFiber* fiber); - ~TFiber(); - - void Recreate(); + // TODO(arkady-e1ppa): Make fiber be in charge of context switches + // so that these methods are automatically inlined instead of manually called. + void SetRunning(); bool CheckFreeStackSpace(size_t space) const; TExceptionSafeContext* GetMachineContext(); - TFiberId GetFiberId() const; - EFiberState GetState() const; - - void SetRunning(); - void SetWaiting(); - void SetIdle(); - - bool TryIntrospectWaiting(EFiberState& state, const std::function<void()>& func); - - TInstant GetWaitingSince() const; - const TPropagatingStorage& GetPropagatingStorage() const; - TFls* GetFls() const; - static void ReadFibers(TFunctionView<void(TFiberList&)> callback); private: const std::shared_ptr<TExecutionStack> Stack_; - TExceptionSafeContext MachineContext_; - std::atomic<TFiberId> FiberId_ = InvalidFiberId; - std::atomic<EFiberState> State_ = EFiberState::Created; - std::atomic<TCpuInstant> WaitingSince_ = 0; - - std::unique_ptr<TFls> Fls_; - explicit TFiber(EExecutionStackKind stackKind = EExecutionStackKind::Small); - - void SetFinished(); - void Clear(); + ~TFiber(); void DoRunNaked() override; - void UnregisterAndDelete() noexcept; + void DeleteFiber() noexcept; friend class ::NYT::NConcurrency::TFiberRegistry; }; diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp index 9ee9f7492ee..f4577a40c27 100644 --- a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp +++ b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp @@ -302,7 +302,6 @@ void SwitchFromFiber(TFiber* targetFiber, TAfterSwitch afterSwitch) auto* targetContext = targetFiber->GetMachineContext(); auto currentFiber = SwapCurrentFiber(targetFiber); - YT_VERIFY(currentFiber->GetState() != EFiberState::Waiting); auto* currentContext = currentFiber->GetMachineContext(); SetAfterSwitch(afterSwitch); @@ -313,6 +312,41 @@ void SwitchFromFiber(TFiber* targetFiber, TAfterSwitch afterSwitch) //////////////////////////////////////////////////////////////////////////////// +class TFiberIdGenerator +{ +public: + static TFiberIdGenerator* Get() + { + return LeakySingleton<TFiberIdGenerator>(); + } + + TFiberId Generate() + { + const TFiberId Factor = std::numeric_limits<TFiberId>::max() - 173864; + YT_ASSERT(Factor % 2 == 1); // Factor must be coprime with 2^n. + + while (true) { + auto seed = Seed_++; + auto id = seed * Factor; + if (id != InvalidFiberId) { + return id; + } + } + } + +private: + std::atomic<TFiberId> Seed_; + + DECLARE_LEAKY_SINGLETON_FRIEND() + + TFiberIdGenerator() + { + Seed_.store(static_cast<TFiberId>(::time(nullptr))); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + #ifdef YT_REUSE_FIBERS class TIdleFiberPool @@ -714,11 +748,14 @@ public: // On start fiber running. explicit TFiberSwitchHandler(TFiber* fiber) : Fiber_(fiber) + , FiberId_(TFiberIdGenerator::Get()->Generate()) { SavedThis_ = std::exchange(CurrentFiberSwitchHandler(), this); - YT_VERIFY(SwapCurrentFiberId(fiber->GetFiberId()) == InvalidFiberId); - YT_VERIFY(!SwapCurrentFls(fiber->GetFls())); + Fiber_->OnCallbackExecutionStarted(FiberId_, &Fls_); + + YT_VERIFY(SwapCurrentFiberId(FiberId_) == InvalidFiberId); + YT_VERIFY(!SwapCurrentFls(&Fls_)); } // On finish fiber running. @@ -727,8 +764,10 @@ public: YT_VERIFY(CurrentFiberSwitchHandler() == this); YT_VERIFY(UserHandlers_.empty()); - YT_VERIFY(SwapCurrentFiberId(InvalidFiberId) == Fiber_->GetFiberId()); - YT_VERIFY(SwapCurrentFls(nullptr) == Fiber_->GetFls()); + Fiber_->OnCallbackExecutionFinished(); + + YT_VERIFY(SwapCurrentFiberId(InvalidFiberId) == FiberId_); + YT_VERIFY(SwapCurrentFls(nullptr) == &Fls_); // Support case when current fiber has been resumed, but finished without WaitFor. // There is preserved context of resumer fiber saved in switchHandler. Restore it. @@ -769,7 +808,9 @@ public: private: friend TContextSwitchGuard; - const TFiber* const Fiber_; + TFiber* const Fiber_; + const TFiberId FiberId_; + TFls Fls_; TFiberSwitchHandler* SavedThis_; @@ -834,7 +875,6 @@ TFiberSwitchHandler* GetFiberSwitchHandler() // See devtools/gdb/yt_fibers_printer.py. Y_NO_INLINE void RunInFiberContext(TFiber* fiber, TClosure callback) { - fiber->Recreate(); TFiberSwitchHandler switchHandler(fiber); TNullPropagatingStorageGuard nullPropagatingStorageGuard; callback(); diff --git a/yt/yt/core/concurrency/private.h b/yt/yt/core/concurrency/private.h index 6a48878538f..e6ad8eaf547 100644 --- a/yt/yt/core/concurrency/private.h +++ b/yt/yt/core/concurrency/private.h @@ -51,6 +51,7 @@ using TMpscSuspendableSingleQueueSchedulerThreadPtr = TIntrusivePtr<TMpscSuspend //////////////////////////////////////////////////////////////////////////////// class TFiber; +class TFls; DECLARE_REFCOUNTED_CLASS(TSchedulerThread) diff --git a/yt/yt/core/misc/intrusive_mpsc_stack-inl.h b/yt/yt/core/misc/intrusive_mpsc_stack-inl.h index 03c97a12a7a..4c22486d426 100644 --- a/yt/yt/core/misc/intrusive_mpsc_stack-inl.h +++ b/yt/yt/core/misc/intrusive_mpsc_stack-inl.h @@ -19,6 +19,7 @@ TIntrusiveMpscStack<T, Tag>::TIntrusiveMpscStack() noexcept template <class T, class Tag> void TIntrusiveMpscStack<T, Tag>::Push(TNode* item) noexcept { + YT_VERIFY(item->Empty()); // Past this line item is not a valid instance of TInstrusiveListItem. // NB: This saves up extra CAS in case of non-empty stack. diff --git a/yt/yt/library/backtrace_introspector/introspect.cpp b/yt/yt/library/backtrace_introspector/introspect.cpp index 6b5e0081069..941dbaed65f 100644 --- a/yt/yt/library/backtrace_introspector/introspect.cpp +++ b/yt/yt/library/backtrace_introspector/introspect.cpp @@ -58,11 +58,12 @@ std::vector<TFiberIntrospectionInfo> IntrospectFibers() EmplaceOrCrash(fiberStates, fiberId, EFiberState::Introspecting); EFiberState state; - if (!fiber->TryIntrospectWaiting(state, [&] { + + auto onIntrospectionLockAcquired = [&] { YT_LOG_DEBUG("Waiting fiber is successfully locked for introspection (FiberId: %x)", fiberId); - const auto& propagatingStorage = fiber->GetPropagatingStorage(); + const auto& propagatingStorage = NConcurrency::GetPropagatingStorage(*fiber->GetFls()); const auto* traceContext = TryGetTraceContextFromPropagatingStorage(propagatingStorage); TFiberIntrospectionInfo info{ @@ -91,7 +92,8 @@ std::vector<TFiberIntrospectionInfo> IntrospectFibers() YT_LOG_DEBUG("Fiber introspection completed (FiberId: %x)", info.FiberId); - })) { + }; + if (!fiber->TryLockForIntrospection(&state, onIntrospectionLockAcquired)) { YT_LOG_DEBUG("Failed to lock fiber for introspection (FiberId: %x, State: %v)", fiberId, state); |