diff options
author | babenko <babenko@yandex-team.com> | 2025-05-18 22:22:03 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2025-05-18 22:35:17 +0300 |
commit | 73b1a17c2846f46642b407dd4086bf027005c0ce (patch) | |
tree | ceb1b478e2b40d38dc2022f9b5cf8d0eb230c52a | |
parent | bacb383edfb0c125f9bedc35dd6631766b410cf9 (diff) | |
download | ydb-73b1a17c2846f46642b407dd4086bf027005c0ce.tar.gz |
YT-25084: Reduce FLS leaks
1) Reduce the number of cases when a (leaky) per-thread FLS is being created.
2) Drop source location diagnostics (no longer needed).
3) API change: `GetCurrentPropagatingStorage -> CurrentPropagatingStorage` for mutable access
commit_hash:518102e8b6f49ca27828fece1be0effce2746bb5
-rw-r--r-- | yt/yt/core/actions/bind-inl.h | 6 | ||||
-rw-r--r-- | yt/yt/core/concurrency/fiber_scheduler_thread.cpp | 6 | ||||
-rw-r--r-- | yt/yt/core/concurrency/fls-inl.h | 15 | ||||
-rw-r--r-- | yt/yt/core/concurrency/fls.cpp | 1 | ||||
-rw-r--r-- | yt/yt/core/concurrency/propagating_storage-inl.h | 4 | ||||
-rw-r--r-- | yt/yt/core/concurrency/propagating_storage.cpp | 105 | ||||
-rw-r--r-- | yt/yt/core/concurrency/propagating_storage.h | 18 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/propagating_storage_ut.cpp | 16 | ||||
-rw-r--r-- | yt/yt/core/tracing/trace_context-inl.h | 14 | ||||
-rw-r--r-- | yt/yt/core/tracing/trace_context.cpp | 11 | ||||
-rw-r--r-- | yt/yt/core/tracing/trace_context.h | 6 |
11 files changed, 77 insertions, 125 deletions
diff --git a/yt/yt/core/actions/bind-inl.h b/yt/yt/core/actions/bind-inl.h index 2b7cde50133..5b2118b9042 100644 --- a/yt/yt/core/actions/bind-inl.h +++ b/yt/yt/core/actions/bind-inl.h @@ -535,11 +535,7 @@ public: NConcurrency::TPropagatingStorageGuard MakePropagatingStorageGuard() { - return NConcurrency::TPropagatingStorageGuard(Storage_ -#ifdef YT_ENABLE_BIND_LOCATION_TRACKING - , Location_ -#endif - ); + return NConcurrency::TPropagatingStorageGuard(Storage_); } private: diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp index cd85ca23c8c..a36dc10637a 100644 --- a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp +++ b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp @@ -490,9 +490,9 @@ private: Y_FORCE_INLINE TClosure PickCallback(TFiberSchedulerThread* fiberThread) { TCallback<void()> callback; + // We wrap fiberThread->OnExecute() into a propagating storage guard to ensure // that the propagating storage created there won't spill into the fiber callbacks. - TNullPropagatingStorageGuard guard; YT_VERIFY(guard.GetOldStorage().IsEmpty()); callback = fiberThread->OnExecute(); @@ -513,12 +513,10 @@ void FiberTrampoline() // Break loop to terminate fiber while (auto* fiberThread = TryGetFiberThread()) { YT_VERIFY(!TryGetResumerFiber()); - YT_VERIFY(CurrentFls() == nullptr); - + YT_VERIFY(!CurrentFls()); YT_VERIFY(GetCurrentPropagatingStorage().IsEmpty()); auto callback = PickCallback(fiberThread); - if (!callback) { break; } diff --git a/yt/yt/core/concurrency/fls-inl.h b/yt/yt/core/concurrency/fls-inl.h index a192a1dcb0c..17b31828101 100644 --- a/yt/yt/core/concurrency/fls-inl.h +++ b/yt/yt/core/concurrency/fls-inl.h @@ -20,6 +20,7 @@ int AllocateFlsSlot(TFlsSlotDtor dtor); TFls* GetPerThreadFls(); YT_DECLARE_THREAD_LOCAL(TFls*, CurrentFls); +YT_DECLARE_THREAD_LOCAL(TFls*, PerThreadFls); } // namespace NDetail @@ -49,6 +50,17 @@ inline TFls* GetCurrentFls() return fls; } +inline TFls* TryGetCurrentFls() +{ + if (auto* fls = NDetail::CurrentFls()) { + return fls; + } + if (auto* fls = NDetail::PerThreadFls()) { + return fls; + } + return nullptr; +} + //////////////////////////////////////////////////////////////////////////////// template <class T> @@ -115,7 +127,8 @@ const T* TFlsSlot<T>::Get(const TFls& fls) const template <class T> Y_FORCE_INLINE bool TFlsSlot<T>::IsInitialized() const { - return static_cast<bool>(GetCurrentFls()->Get(Index_)); + const auto* fls = TryGetCurrentFls(); + return fls && fls->Get(Index_); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/concurrency/fls.cpp b/yt/yt/core/concurrency/fls.cpp index 6856ac9754d..0ed82ff9ab1 100644 --- a/yt/yt/core/concurrency/fls.cpp +++ b/yt/yt/core/concurrency/fls.cpp @@ -79,7 +79,6 @@ void TFls::Set(int index, TCookie cookie) TFls* SwapCurrentFls(TFls* newFls) { - return std::exchange(NDetail::CurrentFls(), newFls); } diff --git a/yt/yt/core/concurrency/propagating_storage-inl.h b/yt/yt/core/concurrency/propagating_storage-inl.h index 50497d3e0d5..70343a7714d 100644 --- a/yt/yt/core/concurrency/propagating_storage-inl.h +++ b/yt/yt/core/concurrency/propagating_storage-inl.h @@ -49,14 +49,14 @@ std::optional<T> TPropagatingStorage::Remove() template <class T> TPropagatingValueGuard<T>::TPropagatingValueGuard(T value) { - auto& storage = GetCurrentPropagatingStorage(); + auto& storage = CurrentPropagatingStorage(); OldValue_ = storage.Exchange<T>(std::move(value)); } template <class T> TPropagatingValueGuard<T>::~TPropagatingValueGuard() { - auto& storage = GetCurrentPropagatingStorage(); + auto& storage = CurrentPropagatingStorage(); if (OldValue_) { storage.Exchange<T>(std::move(*OldValue_)); } else { diff --git a/yt/yt/core/concurrency/propagating_storage.cpp b/yt/yt/core/concurrency/propagating_storage.cpp index 604816cdbef..fce321c277f 100644 --- a/yt/yt/core/concurrency/propagating_storage.cpp +++ b/yt/yt/core/concurrency/propagating_storage.cpp @@ -4,6 +4,8 @@ #include <library/cpp/yt/threading/fork_aware_spin_lock.h> +#include <library/cpp/yt/memory/leaky_singleton.h> + #include <yt/yt/core/misc/static_ring_queue.h> namespace NYT::NConcurrency { @@ -51,26 +53,8 @@ public: DEFINE_SIGNAL_SIMPLE(void(), OnBeforeUninstall); DEFINE_SIGNAL_SIMPLE(void(), OnAfterInstall); - void RecordLocation(TSourceLocation loc) - { - Locations_.Append(&loc, &loc + 1); - } - - void PrintModificationLocationsToStderr() - { - size_t size = Locations_.Size(); - TSourceLocation lastLocations[MaxSize]; - Locations_.CopyTailTo(size, &lastLocations[0]); - for (size_t i = 0; i < size; ++i) { - Cerr << NYT::ToString(lastLocations[i]) << Endl; - } - } - private: TStorage Data_; - - static constexpr int MaxSize = 8; - TStaticRingQueue<TSourceLocation, MaxSize> Locations_; }; //////////////////////////////////////////////////////////////////////////////// @@ -196,24 +180,7 @@ void TPropagatingStorage::EnsureUnique() Impl_ = Impl_->Clone(); } -void TPropagatingStorage::RecordLocation(TSourceLocation loc) -{ - Impl_->RecordLocation(loc); -} - -void TPropagatingStorage::PrintModificationLocationsToStderr() -{ - Impl_->PrintModificationLocationsToStderr(); -} - -struct TPropagatingStorageInfo -{ - TPropagatingStorage Storage; - TSourceLocation Location; - TSourceLocation PrevLocation; -}; - -static YT_DEFINE_GLOBAL(TFlsSlot<TPropagatingStorageInfo>, PropagatingStorageSlot); +static YT_DEFINE_GLOBAL(TFlsSlot<TPropagatingStorage>, PropagatingStorageSlot); //////////////////////////////////////////////////////////////////////////////// @@ -222,18 +189,27 @@ class TPropagatingStorageManager public: static TPropagatingStorageManager* Get() { - return Singleton<TPropagatingStorageManager>(); + return LeakySingleton<TPropagatingStorageManager>(); } - TPropagatingStorage& GetCurrentPropagatingStorage() + TPropagatingStorage& CurrentPropagatingStorage() { - return PropagatingStorageSlot()->Storage; + return *PropagatingStorageSlot(); + } + + const TPropagatingStorage& GetCurrentPropagatingStorage() + { + if (const auto& slot = PropagatingStorageSlot(); slot.IsInitialized()) { + return *slot; + } else { + static const TPropagatingStorage empty; + return empty; + } } const TPropagatingStorage* TryGetPropagatingStorage(const TFls& fls) { - auto* info = PropagatingStorageSlot().Get(fls); - return info != nullptr ? &info->Storage : nullptr; + return PropagatingStorageSlot().Get(fls); } void InstallGlobalSwitchHandler(TPropagatingStorageGlobalSwitchHandler handler) @@ -247,15 +223,20 @@ public: TPropagatingStorage SwitchPropagatingStorage(TPropagatingStorage newStorage) { - auto& storage = GetCurrentPropagatingStorage(); + const auto& oldStorage = GetCurrentPropagatingStorage(); + if (oldStorage.IsNull() && newStorage.IsNull()) { + return TPropagatingStorage(); + } int count = SwitchHandlerCount_.load(std::memory_order::acquire); for (int index = 0; index < count; ++index) { - SwitchHandlers_[index](storage, newStorage); + SwitchHandlers_[index](oldStorage, newStorage); } - return std::exchange(storage, std::move(newStorage)); + return std::exchange(CurrentPropagatingStorage(), std::move(newStorage)); } private: + DECLARE_LEAKY_SINGLETON_FRIEND() + NThreading::TForkAwareSpinLock Lock_; static constexpr int MaxSwitchHandlerCount = 16; @@ -266,7 +247,12 @@ private: Y_DECLARE_SINGLETON_FRIEND() }; -TPropagatingStorage& GetCurrentPropagatingStorage() +TPropagatingStorage& CurrentPropagatingStorage() +{ + return TPropagatingStorageManager::Get()->CurrentPropagatingStorage(); +} + +const TPropagatingStorage& GetCurrentPropagatingStorage() { return TPropagatingStorageManager::Get()->GetCurrentPropagatingStorage(); } @@ -283,34 +269,13 @@ void InstallGlobalPropagatingStorageSwitchHandler(TPropagatingStorageGlobalSwitc //////////////////////////////////////////////////////////////////////////////// -TSourceLocation SwitchPropagatingStorageLocation(TSourceLocation loc) -{ - PropagatingStorageSlot()->PrevLocation = PropagatingStorageSlot()->Location; - return std::exchange(PropagatingStorageSlot()->Location, loc); -} - -void PrintLocationToStderr() -{ - Cerr << Format( - "PropagatingStorageLocation: %v, PrevLocation: %v, ModificationLocations:", - PropagatingStorageSlot()->Location, - PropagatingStorageSlot()->PrevLocation) << Endl; - - PropagatingStorageSlot()->Storage.PrintModificationLocationsToStderr(); -} - -TPropagatingStorageGuard::TPropagatingStorageGuard(TPropagatingStorage storage, TSourceLocation loc) +TPropagatingStorageGuard::TPropagatingStorageGuard(TPropagatingStorage storage) : OldStorage_(TPropagatingStorageManager::Get()->SwitchPropagatingStorage(std::move(storage))) - , OldLocation_(SwitchPropagatingStorageLocation(loc)) -{ - YT_VERIFY((OldLocation_.GetFileName() == nullptr) == (OldLocation_.GetLine() == -1)); - YT_VERIFY((loc.GetFileName() == nullptr) == (loc.GetLine() == -1)); -} +{ } TPropagatingStorageGuard::~TPropagatingStorageGuard() { TPropagatingStorageManager::Get()->SwitchPropagatingStorage(std::move(OldStorage_)); - SwitchPropagatingStorageLocation(OldLocation_); } const TPropagatingStorage& TPropagatingStorageGuard::GetOldStorage() const @@ -320,8 +285,8 @@ const TPropagatingStorage& TPropagatingStorageGuard::GetOldStorage() const //////////////////////////////////////////////////////////////////////////////// -TNullPropagatingStorageGuard::TNullPropagatingStorageGuard(TSourceLocation loc) - : TPropagatingStorageGuard(TPropagatingStorage(), loc) +TNullPropagatingStorageGuard::TNullPropagatingStorageGuard() + : TPropagatingStorageGuard(TPropagatingStorage()) { } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/concurrency/propagating_storage.h b/yt/yt/core/concurrency/propagating_storage.h index 4d8fd78e838..c328b893a03 100644 --- a/yt/yt/core/concurrency/propagating_storage.h +++ b/yt/yt/core/concurrency/propagating_storage.h @@ -67,9 +67,6 @@ public: template <class T> std::optional<T> Remove(); - void RecordLocation(TSourceLocation loc); - void PrintModificationLocationsToStderr(); - DECLARE_SIGNAL(void(), OnAfterInstall); DECLARE_SIGNAL(void(), OnBeforeUninstall); @@ -88,7 +85,8 @@ private: //////////////////////////////////////////////////////////////////////////////// -TPropagatingStorage& GetCurrentPropagatingStorage(); +TPropagatingStorage& CurrentPropagatingStorage(); +const TPropagatingStorage& GetCurrentPropagatingStorage(); const TPropagatingStorage* TryGetPropagatingStorage(const NConcurrency::TFls& fls); //////////////////////////////////////////////////////////////////////////////// @@ -106,8 +104,7 @@ void InstallGlobalPropagatingStorageSwitchHandler( class TPropagatingStorageGuard { public: - explicit TPropagatingStorageGuard( - TPropagatingStorage storage, TSourceLocation loc = YT_CURRENT_SOURCE_LOCATION); + explicit TPropagatingStorageGuard(TPropagatingStorage storage); ~TPropagatingStorageGuard(); TPropagatingStorageGuard(const TPropagatingStorageGuard& other) = delete; @@ -119,7 +116,6 @@ public: private: TPropagatingStorage OldStorage_; - TSourceLocation OldLocation_; }; //////////////////////////////////////////////////////////////////////////////// @@ -128,7 +124,7 @@ class TNullPropagatingStorageGuard : public TPropagatingStorageGuard { public: - TNullPropagatingStorageGuard(TSourceLocation loc = YT_CURRENT_SOURCE_LOCATION); + TNullPropagatingStorageGuard(); }; //////////////////////////////////////////////////////////////////////////////// @@ -151,12 +147,6 @@ private: //////////////////////////////////////////////////////////////////////////////// -TSourceLocation SwitchPropagatingStorageLocation(TSourceLocation loc); - -void PrintLocationToStderr(); - -//////////////////////////////////////////////////////////////////////////////// - } // namespace NYT::NConcurrency #define PROPAGATING_STORAGE_INL_H_ diff --git a/yt/yt/core/concurrency/unittests/propagating_storage_ut.cpp b/yt/yt/core/concurrency/unittests/propagating_storage_ut.cpp index 8202dd0619d..8b615422947 100644 --- a/yt/yt/core/concurrency/unittests/propagating_storage_ut.cpp +++ b/yt/yt/core/concurrency/unittests/propagating_storage_ut.cpp @@ -29,13 +29,13 @@ TEST(TPropagatingStorageTest, Simple) { auto actionQueue = New<TActionQueue>(); - auto& storage = GetCurrentPropagatingStorage(); + auto& storage = CurrentPropagatingStorage(); storage.Exchange<TFirst>({"hello"}); ASSERT_EQ(storage.GetOrCrash<TFirst>().Value, "hello"); WaitFor( BIND([actionQueue] { - auto& storage = GetCurrentPropagatingStorage(); + auto& storage = CurrentPropagatingStorage(); ASSERT_EQ(storage.GetOrCrash<TFirst>().Value, "hello"); storage.Exchange<TFirst>({"inner"}); ASSERT_EQ(storage.GetOrCrash<TFirst>().Value, "inner"); @@ -43,7 +43,7 @@ TEST(TPropagatingStorageTest, Simple) WaitFor( BIND([] { - auto& storage = GetCurrentPropagatingStorage(); + auto& storage = CurrentPropagatingStorage(); ASSERT_EQ(storage.GetOrCrash<TFirst>().Value, "inner"); storage.Remove<TFirst>(); ASSERT_FALSE(storage.Has<TFirst>()); @@ -68,7 +68,7 @@ TEST(TPropagatingStorageTest, Cow) { auto actionQueue = New<TActionQueue>(); - auto& storage = GetCurrentPropagatingStorage(); + auto& storage = CurrentPropagatingStorage(); storage.Exchange<TFirst>({"hello"}); ASSERT_EQ(storage.GetOrCrash<TFirst>().Value, "hello"); @@ -76,7 +76,7 @@ TEST(TPropagatingStorageTest, Cow) for (int i = 0; i < 10; ++i) { futures.push_back( BIND([] { - auto& storage = GetCurrentPropagatingStorage(); + auto& storage = CurrentPropagatingStorage(); ASSERT_EQ(storage.GetOrCrash<TFirst>().Value, "hello"); TDelayedExecutor::WaitForDuration(TDuration::Seconds(1)); ASSERT_EQ(storage.GetOrCrash<TFirst>().Value, "hello"); @@ -87,7 +87,7 @@ TEST(TPropagatingStorageTest, Cow) futures.push_back( BIND([] { - auto& storage = GetCurrentPropagatingStorage(); + auto& storage = CurrentPropagatingStorage(); ASSERT_EQ(storage.GetOrCrash<TFirst>().Value, "hello"); storage.Exchange<TFirst>({"goodbye"}); ASSERT_EQ(storage.GetOrCrash<TFirst>().Value, "goodbye"); @@ -105,7 +105,7 @@ TEST(TPropagatingStorageTest, Null) { TNullPropagatingStorageGuard guard; - auto &storage = GetCurrentPropagatingStorage(); + auto& storage = CurrentPropagatingStorage(); ASSERT_TRUE(storage.IsNull()); ASSERT_TRUE(storage.IsEmpty()); @@ -121,7 +121,7 @@ TEST(TPropagatingStorageTest, Null) TEST(TPropagatingStorageTest, PropagatingValue) { - auto& storage = GetCurrentPropagatingStorage(); + auto& storage = CurrentPropagatingStorage(); storage.Exchange<TFirst>({"hello"}); storage.Exchange<TSecond>({"world"}); diff --git a/yt/yt/core/tracing/trace_context-inl.h b/yt/yt/core/tracing/trace_context-inl.h index dbcdf46a630..33aaed40377 100644 --- a/yt/yt/core/tracing/trace_context-inl.h +++ b/yt/yt/core/tracing/trace_context-inl.h @@ -140,15 +140,15 @@ namespace NDetail { YT_DECLARE_THREAD_LOCAL(TTraceContext*, CurrentTraceContext); -TTraceContextPtr SwapTraceContext(TTraceContextPtr newContext, TSourceLocation loc); +TTraceContextPtr SwapTraceContext(TTraceContextPtr newContext); } // namespace NDetail -Y_FORCE_INLINE TCurrentTraceContextGuard::TCurrentTraceContextGuard(TTraceContextPtr traceContext, TSourceLocation location) +Y_FORCE_INLINE TCurrentTraceContextGuard::TCurrentTraceContextGuard(TTraceContextPtr traceContext) : Active_(static_cast<bool>(traceContext)) { if (Active_) { - OldTraceContext_ = NDetail::SwapTraceContext(std::move(traceContext), location); + OldTraceContext_ = NDetail::SwapTraceContext(std::move(traceContext)); } } @@ -172,7 +172,7 @@ Y_FORCE_INLINE bool TCurrentTraceContextGuard::IsActive() const Y_FORCE_INLINE void TCurrentTraceContextGuard::Release() { if (Active_) { - NDetail::SwapTraceContext(std::move(OldTraceContext_), YT_CURRENT_SOURCE_LOCATION); + NDetail::SwapTraceContext(std::move(OldTraceContext_)); Active_ = false; } } @@ -184,9 +184,9 @@ Y_FORCE_INLINE const TTraceContextPtr& TCurrentTraceContextGuard::GetOldTraceCon //////////////////////////////////////////////////////////////////////////////// -Y_FORCE_INLINE TNullTraceContextGuard::TNullTraceContextGuard(TSourceLocation location) +Y_FORCE_INLINE TNullTraceContextGuard::TNullTraceContextGuard() : Active_(true) - , OldTraceContext_(NDetail::SwapTraceContext(nullptr, location)) + , OldTraceContext_(NDetail::SwapTraceContext(nullptr)) { } Y_FORCE_INLINE TNullTraceContextGuard::TNullTraceContextGuard(TNullTraceContextGuard&& other) @@ -209,7 +209,7 @@ Y_FORCE_INLINE bool TNullTraceContextGuard::IsActive() const Y_FORCE_INLINE void TNullTraceContextGuard::Release() { if (Active_) { - NDetail::SwapTraceContext(std::move(OldTraceContext_), YT_CURRENT_SOURCE_LOCATION); + NDetail::SwapTraceContext(std::move(OldTraceContext_)); Active_ = false; } } diff --git a/yt/yt/core/tracing/trace_context.cpp b/yt/yt/core/tracing/trace_context.cpp index 8c9fc4295e0..c8f37739a33 100644 --- a/yt/yt/core/tracing/trace_context.cpp +++ b/yt/yt/core/tracing/trace_context.cpp @@ -111,21 +111,14 @@ void SetCurrentTraceContext(TTraceContext* context) std::atomic_signal_fence(std::memory_order::seq_cst); } -TTraceContextPtr SwapTraceContext(TTraceContextPtr newContext, TSourceLocation loc) +TTraceContextPtr SwapTraceContext(TTraceContextPtr newContext) { - if (NConcurrency::NDetail::PerThreadFls() == NConcurrency::NDetail::CurrentFls() && newContext) { - YT_LOG_TRACE("Writing propagating storage in thread FLS (Location: %v)", - loc); - } - - auto& propagatingStorage = GetCurrentPropagatingStorage(); + auto& propagatingStorage = CurrentPropagatingStorage(); auto oldContext = newContext ? propagatingStorage.Exchange<TTraceContextPtr>(newContext).value_or(nullptr) : propagatingStorage.Remove<TTraceContextPtr>().value_or(nullptr); - propagatingStorage.RecordLocation(loc); - auto now = GetApproximateCpuInstant(); auto& traceContextTimingCheckpoint = TraceContextTimingCheckpoint(); // Invalid if no oldContext. diff --git a/yt/yt/core/tracing/trace_context.h b/yt/yt/core/tracing/trace_context.h index 05b8772be11..c97dc391249 100644 --- a/yt/yt/core/tracing/trace_context.h +++ b/yt/yt/core/tracing/trace_context.h @@ -290,9 +290,7 @@ TTraceContextPtr CreateTraceContextFromCurrent(const std::string& spanName); class TCurrentTraceContextGuard { public: - explicit TCurrentTraceContextGuard( - TTraceContextPtr traceContext, - TSourceLocation location = YT_CURRENT_SOURCE_LOCATION); + explicit TCurrentTraceContextGuard(TTraceContextPtr traceContext); TCurrentTraceContextGuard(TCurrentTraceContextGuard&& other); ~TCurrentTraceContextGuard(); @@ -312,7 +310,7 @@ private: class TNullTraceContextGuard { public: - TNullTraceContextGuard(TSourceLocation location = YT_CURRENT_SOURCE_LOCATION); + TNullTraceContextGuard(); TNullTraceContextGuard(TNullTraceContextGuard&& other); ~TNullTraceContextGuard(); |