aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorlukyan <lukyan@yandex-team.com>2024-06-28 15:19:34 +0300
committerlukyan <lukyan@yandex-team.com>2024-06-28 15:31:23 +0300
commit6a58e5c3d674a249ffcf61d9412d50b6c51bf2af (patch)
tree653df695710ef860da4e60150ba4d22a860eadd4
parent1607efc105286defebdf3e768e4a7ff1288bca8c (diff)
downloadydb-6a58e5c3d674a249ffcf61d9412d50b6c51bf2af.tar.gz
Fix propagating storage check and add logging
71c32a8c3f0ff7d91665d5ea7b6b94ad437d114e
-rw-r--r--yt/yt/core/concurrency/fiber_scheduler_thread.cpp19
-rw-r--r--yt/yt/core/tracing/trace_context-inl.h17
-rw-r--r--yt/yt/core/tracing/trace_context.cpp22
3 files changed, 28 insertions, 30 deletions
diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
index 35558b4eae..52cc77eaa5 100644
--- a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
+++ b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp
@@ -502,7 +502,7 @@ Y_FORCE_INLINE TClosure PickCallback(TFiberSchedulerThread* fiberThread)
// that the propagating storage created there won't spill into the fiber callbacks.
TNullPropagatingStorageGuard guard;
- YT_VERIFY(guard.GetOldStorage().IsNull());
+ YT_VERIFY(guard.GetOldStorage().IsEmpty());
callback = fiberThread->OnExecute();
return callback;
@@ -510,10 +510,6 @@ Y_FORCE_INLINE TClosure PickCallback(TFiberSchedulerThread* fiberThread)
////////////////////////////////////////////////////////////////////////////////
-YT_DECLARE_THREAD_LOCAL(TFls*, PerThreadFls);
-
-////////////////////////////////////////////////////////////////////////////////
-
void FiberTrampoline()
{
RunAfterSwitch();
@@ -528,18 +524,7 @@ void FiberTrampoline()
YT_VERIFY(!TryGetResumerFiber());
YT_VERIFY(CurrentFls() == nullptr);
- if (auto perThreadFls = NDetail::PerThreadFls()) {
- const auto* propStorage = TryGetPropagatingStorage(*perThreadFls);
- if (propStorage != nullptr) {
- if (!propStorage->IsNull()) {
- Cerr << "Unexpected propagating storage" << Endl;
- PrintLocationToStderr();
- YT_ABORT();
- }
- }
- }
-
- YT_VERIFY(GetCurrentPropagatingStorage().IsNull());
+ YT_VERIFY(GetCurrentPropagatingStorage().IsEmpty());
auto callback = PickCallback(fiberThread);
diff --git a/yt/yt/core/tracing/trace_context-inl.h b/yt/yt/core/tracing/trace_context-inl.h
index de1d767a83..6994110d41 100644
--- a/yt/yt/core/tracing/trace_context-inl.h
+++ b/yt/yt/core/tracing/trace_context-inl.h
@@ -185,7 +185,7 @@ namespace NDetail {
YT_DECLARE_THREAD_LOCAL(TTraceContext*, CurrentTraceContext);
-TTraceContextPtr SwapTraceContext(TTraceContextPtr newContext);
+TTraceContextPtr SwapTraceContext(TTraceContextPtr newContext, TSourceLocation loc);
} // namespace NDetail
@@ -193,8 +193,7 @@ Y_FORCE_INLINE TCurrentTraceContextGuard::TCurrentTraceContextGuard(TTraceContex
: Active_(static_cast<bool>(traceContext))
{
if (Active_) {
- OldTraceContext_ = NDetail::SwapTraceContext(std::move(traceContext));
- NConcurrency::GetCurrentPropagatingStorage().RecordLocation(location);
+ OldTraceContext_ = NDetail::SwapTraceContext(std::move(traceContext), location);
}
}
@@ -218,8 +217,7 @@ Y_FORCE_INLINE bool TCurrentTraceContextGuard::IsActive() const
Y_FORCE_INLINE void TCurrentTraceContextGuard::Release()
{
if (Active_) {
- NDetail::SwapTraceContext(std::move(OldTraceContext_));
- NConcurrency::GetCurrentPropagatingStorage().RecordLocation(FROM_HERE);
+ NDetail::SwapTraceContext(std::move(OldTraceContext_), FROM_HERE);
Active_ = false;
}
}
@@ -233,10 +231,8 @@ Y_FORCE_INLINE const TTraceContextPtr& TCurrentTraceContextGuard::GetOldTraceCon
Y_FORCE_INLINE TNullTraceContextGuard::TNullTraceContextGuard(TSourceLocation location)
: Active_(true)
- , OldTraceContext_(NDetail::SwapTraceContext(nullptr))
-{
- NConcurrency::GetCurrentPropagatingStorage().RecordLocation(location);
-}
+ , OldTraceContext_(NDetail::SwapTraceContext(nullptr, location))
+{ }
Y_FORCE_INLINE TNullTraceContextGuard::TNullTraceContextGuard(TNullTraceContextGuard&& other)
: Active_(other.Active_)
@@ -258,8 +254,7 @@ Y_FORCE_INLINE bool TNullTraceContextGuard::IsActive() const
Y_FORCE_INLINE void TNullTraceContextGuard::Release()
{
if (Active_) {
- NDetail::SwapTraceContext(std::move(OldTraceContext_));
- NConcurrency::GetCurrentPropagatingStorage().RecordLocation(FROM_HERE);
+ NDetail::SwapTraceContext(std::move(OldTraceContext_), FROM_HERE);
Active_ = false;
}
}
diff --git a/yt/yt/core/tracing/trace_context.cpp b/yt/yt/core/tracing/trace_context.cpp
index 46c83d2ea6..0cbb7d828b 100644
--- a/yt/yt/core/tracing/trace_context.cpp
+++ b/yt/yt/core/tracing/trace_context.cpp
@@ -25,6 +25,14 @@
#include <atomic>
#include <mutex>
+////////////////////////////////////////////////////////////////////////////////
+
+namespace NYT::NConcurrency::NDetail {
+
+YT_DECLARE_THREAD_LOCAL(TFls*, PerThreadFls);
+
+} // NYT::NConcurrency::NDetail
+
namespace NYT::NTracing {
using namespace NConcurrency;
@@ -116,10 +124,20 @@ void SetCurrentTraceContext(TTraceContext* context)
std::atomic_signal_fence(std::memory_order::seq_cst);
}
-TTraceContextPtr SwapTraceContext(TTraceContextPtr newContext)
+TTraceContextPtr SwapTraceContext(TTraceContextPtr newContext, TSourceLocation loc)
{
+ if (NConcurrency::NDetail::PerThreadFls() && newContext) {
+ YT_LOG_DEBUG("Writing propagating storage in thread FLS (Location: %v)",
+ loc);
+ }
+
auto& propagatingStorage = GetCurrentPropagatingStorage();
- auto oldContext = propagatingStorage.Exchange<TTraceContextPtr>(newContext).value_or(nullptr);
+
+ auto oldContext = newContext
+ ? propagatingStorage.Exchange<TTraceContextPtr>(newContext).value_or(nullptr)
+ : propagatingStorage.Remove<TTraceContextPtr>().value_or(nullptr);
+
+ propagatingStorage.RecordLocation(loc);
auto now = GetApproximateCpuInstant();
auto& traceContextTimingCheckpoint = TraceContextTimingCheckpoint();