diff options
author | lukyan <lukyan@yandex-team.com> | 2024-06-28 15:19:34 +0300 |
---|---|---|
committer | lukyan <lukyan@yandex-team.com> | 2024-06-28 15:31:23 +0300 |
commit | 6a58e5c3d674a249ffcf61d9412d50b6c51bf2af (patch) | |
tree | 653df695710ef860da4e60150ba4d22a860eadd4 | |
parent | 1607efc105286defebdf3e768e4a7ff1288bca8c (diff) | |
download | ydb-6a58e5c3d674a249ffcf61d9412d50b6c51bf2af.tar.gz |
Fix propagating storage check and add logging
71c32a8c3f0ff7d91665d5ea7b6b94ad437d114e
-rw-r--r-- | yt/yt/core/concurrency/fiber_scheduler_thread.cpp | 19 | ||||
-rw-r--r-- | yt/yt/core/tracing/trace_context-inl.h | 17 | ||||
-rw-r--r-- | yt/yt/core/tracing/trace_context.cpp | 22 |
3 files changed, 28 insertions, 30 deletions
diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp index 35558b4eae..52cc77eaa5 100644 --- a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp +++ b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp @@ -502,7 +502,7 @@ Y_FORCE_INLINE TClosure PickCallback(TFiberSchedulerThread* fiberThread) // that the propagating storage created there won't spill into the fiber callbacks. TNullPropagatingStorageGuard guard; - YT_VERIFY(guard.GetOldStorage().IsNull()); + YT_VERIFY(guard.GetOldStorage().IsEmpty()); callback = fiberThread->OnExecute(); return callback; @@ -510,10 +510,6 @@ Y_FORCE_INLINE TClosure PickCallback(TFiberSchedulerThread* fiberThread) //////////////////////////////////////////////////////////////////////////////// -YT_DECLARE_THREAD_LOCAL(TFls*, PerThreadFls); - -//////////////////////////////////////////////////////////////////////////////// - void FiberTrampoline() { RunAfterSwitch(); @@ -528,18 +524,7 @@ void FiberTrampoline() YT_VERIFY(!TryGetResumerFiber()); YT_VERIFY(CurrentFls() == nullptr); - if (auto perThreadFls = NDetail::PerThreadFls()) { - const auto* propStorage = TryGetPropagatingStorage(*perThreadFls); - if (propStorage != nullptr) { - if (!propStorage->IsNull()) { - Cerr << "Unexpected propagating storage" << Endl; - PrintLocationToStderr(); - YT_ABORT(); - } - } - } - - YT_VERIFY(GetCurrentPropagatingStorage().IsNull()); + YT_VERIFY(GetCurrentPropagatingStorage().IsEmpty()); auto callback = PickCallback(fiberThread); diff --git a/yt/yt/core/tracing/trace_context-inl.h b/yt/yt/core/tracing/trace_context-inl.h index de1d767a83..6994110d41 100644 --- a/yt/yt/core/tracing/trace_context-inl.h +++ b/yt/yt/core/tracing/trace_context-inl.h @@ -185,7 +185,7 @@ namespace NDetail { YT_DECLARE_THREAD_LOCAL(TTraceContext*, CurrentTraceContext); -TTraceContextPtr SwapTraceContext(TTraceContextPtr newContext); +TTraceContextPtr SwapTraceContext(TTraceContextPtr newContext, TSourceLocation loc); } // namespace NDetail @@ -193,8 +193,7 @@ Y_FORCE_INLINE TCurrentTraceContextGuard::TCurrentTraceContextGuard(TTraceContex : Active_(static_cast<bool>(traceContext)) { if (Active_) { - OldTraceContext_ = NDetail::SwapTraceContext(std::move(traceContext)); - NConcurrency::GetCurrentPropagatingStorage().RecordLocation(location); + OldTraceContext_ = NDetail::SwapTraceContext(std::move(traceContext), location); } } @@ -218,8 +217,7 @@ Y_FORCE_INLINE bool TCurrentTraceContextGuard::IsActive() const Y_FORCE_INLINE void TCurrentTraceContextGuard::Release() { if (Active_) { - NDetail::SwapTraceContext(std::move(OldTraceContext_)); - NConcurrency::GetCurrentPropagatingStorage().RecordLocation(FROM_HERE); + NDetail::SwapTraceContext(std::move(OldTraceContext_), FROM_HERE); Active_ = false; } } @@ -233,10 +231,8 @@ Y_FORCE_INLINE const TTraceContextPtr& TCurrentTraceContextGuard::GetOldTraceCon Y_FORCE_INLINE TNullTraceContextGuard::TNullTraceContextGuard(TSourceLocation location) : Active_(true) - , OldTraceContext_(NDetail::SwapTraceContext(nullptr)) -{ - NConcurrency::GetCurrentPropagatingStorage().RecordLocation(location); -} + , OldTraceContext_(NDetail::SwapTraceContext(nullptr, location)) +{ } Y_FORCE_INLINE TNullTraceContextGuard::TNullTraceContextGuard(TNullTraceContextGuard&& other) : Active_(other.Active_) @@ -258,8 +254,7 @@ Y_FORCE_INLINE bool TNullTraceContextGuard::IsActive() const Y_FORCE_INLINE void TNullTraceContextGuard::Release() { if (Active_) { - NDetail::SwapTraceContext(std::move(OldTraceContext_)); - NConcurrency::GetCurrentPropagatingStorage().RecordLocation(FROM_HERE); + NDetail::SwapTraceContext(std::move(OldTraceContext_), FROM_HERE); Active_ = false; } } diff --git a/yt/yt/core/tracing/trace_context.cpp b/yt/yt/core/tracing/trace_context.cpp index 46c83d2ea6..0cbb7d828b 100644 --- a/yt/yt/core/tracing/trace_context.cpp +++ b/yt/yt/core/tracing/trace_context.cpp @@ -25,6 +25,14 @@ #include <atomic> #include <mutex> +//////////////////////////////////////////////////////////////////////////////// + +namespace NYT::NConcurrency::NDetail { + +YT_DECLARE_THREAD_LOCAL(TFls*, PerThreadFls); + +} // NYT::NConcurrency::NDetail + namespace NYT::NTracing { using namespace NConcurrency; @@ -116,10 +124,20 @@ void SetCurrentTraceContext(TTraceContext* context) std::atomic_signal_fence(std::memory_order::seq_cst); } -TTraceContextPtr SwapTraceContext(TTraceContextPtr newContext) +TTraceContextPtr SwapTraceContext(TTraceContextPtr newContext, TSourceLocation loc) { + if (NConcurrency::NDetail::PerThreadFls() && newContext) { + YT_LOG_DEBUG("Writing propagating storage in thread FLS (Location: %v)", + loc); + } + auto& propagatingStorage = GetCurrentPropagatingStorage(); - auto oldContext = propagatingStorage.Exchange<TTraceContextPtr>(newContext).value_or(nullptr); + + auto oldContext = newContext + ? propagatingStorage.Exchange<TTraceContextPtr>(newContext).value_or(nullptr) + : propagatingStorage.Remove<TTraceContextPtr>().value_or(nullptr); + + propagatingStorage.RecordLocation(loc); auto now = GetApproximateCpuInstant(); auto& traceContextTimingCheckpoint = TraceContextTimingCheckpoint(); |