diff options
author | lukyan <lukyan@yandex-team.com> | 2024-04-26 02:21:44 +0300 |
---|---|---|
committer | lukyan <lukyan@yandex-team.com> | 2024-04-26 02:41:13 +0300 |
commit | 5bbe44ff4e12b6d5496d56ecca97b0c4db340509 (patch) | |
tree | 511f2114250a8a3da539995a2da71782c3f82883 /yt | |
parent | 7bde5f1f7732fb9e9103ac1f54fe1de99bdb6be5 (diff) | |
download | ydb-5bbe44ff4e12b6d5496d56ecca97b0c4db340509.tar.gz |
YT-21566: Access thread local variables via noinline functions
970c33b44a7bd166b2716d86d3d2053dcaf05d7d
Diffstat (limited to 'yt')
31 files changed, 256 insertions, 247 deletions
diff --git a/yt/yt/client/unittests/named_yson_token_ut.cpp b/yt/yt/client/unittests/named_yson_token_ut.cpp index 60f3793e3b..e9e55c5842 100644 --- a/yt/yt/client/unittests/named_yson_token_ut.cpp +++ b/yt/yt/client/unittests/named_yson_token_ut.cpp @@ -30,20 +30,20 @@ const auto IntStringVariant = VariantStructLogicalType({ {"string", SimpleLogicalType(ESimpleLogicalValueType::String)}, }); -YT_THREAD_LOCAL(TYsonConverterConfig) PositionalToNamedConfigInstance; +YT_DEFINE_THREAD_LOCAL(TYsonConverterConfig, PositionalToNamedConfigInstance); class TWithConfig { public: TWithConfig(const TYsonConverterConfig& config) - : OldConfig_(GetTlsRef(PositionalToNamedConfigInstance)) + : OldConfig_(PositionalToNamedConfigInstance()) { - GetTlsRef(PositionalToNamedConfigInstance) = config; + PositionalToNamedConfigInstance() = config; } ~TWithConfig() { - GetTlsRef(PositionalToNamedConfigInstance) = OldConfig_; + PositionalToNamedConfigInstance() = OldConfig_; } private: TYsonConverterConfig OldConfig_; @@ -74,7 +74,7 @@ TString ConvertYson( }; converter = CreateYsonClientToServerConverter(descriptor, config); } else { - converter = CreateYsonServerToClientConverter(descriptor, GetTlsRef(PositionalToNamedConfigInstance)); + converter = CreateYsonServerToClientConverter(descriptor, PositionalToNamedConfigInstance()); } } catch (const std::exception& ex) { ADD_FAILURE() << "cannot create converter: " << ex.what(); diff --git a/yt/yt/core/actions/current_invoker.cpp b/yt/yt/core/actions/current_invoker.cpp index 074965353d..904f746969 100644 --- a/yt/yt/core/actions/current_invoker.cpp +++ b/yt/yt/core/actions/current_invoker.cpp @@ -8,19 +8,19 @@ namespace NYT { //////////////////////////////////////////////////////////////////////////////// -YT_THREAD_LOCAL(IInvoker*) CurrentInvoker; +YT_DEFINE_THREAD_LOCAL(IInvoker*, CurrentInvoker); IInvoker* GetCurrentInvoker() { - if (CurrentInvoker) { - return CurrentInvoker; + if (CurrentInvoker()) { + return CurrentInvoker(); } return GetSyncInvoker().Get(); } void SetCurrentInvoker(IInvoker* invoker) { - CurrentInvoker = invoker; + CurrentInvoker() = invoker; } TCurrentInvokerGuard::TCurrentInvokerGuard(IInvoker* invoker) @@ -32,7 +32,7 @@ TCurrentInvokerGuard::TCurrentInvokerGuard(IInvoker* invoker) , Active_(true) , SavedInvoker_(std::move(invoker)) { - std::swap(GetTlsRef(CurrentInvoker), SavedInvoker_); + std::swap(CurrentInvoker(), SavedInvoker_); } void TCurrentInvokerGuard::Restore() @@ -41,7 +41,7 @@ void TCurrentInvokerGuard::Restore() return; } Active_ = false; - CurrentInvoker = std::move(SavedInvoker_); + CurrentInvoker() = std::move(SavedInvoker_); } TCurrentInvokerGuard::~TCurrentInvokerGuard() diff --git a/yt/yt/core/concurrency/action_queue.cpp b/yt/yt/core/concurrency/action_queue.cpp index 057773cf36..5ff68e0dcc 100644 --- a/yt/yt/core/concurrency/action_queue.cpp +++ b/yt/yt/core/concurrency/action_queue.cpp @@ -398,6 +398,10 @@ IInvokerPtr CreateFixedPriorityInvoker( //////////////////////////////////////////////////////////////////////////////// +class TBoundedConcurrencyInvoker; + +YT_DEFINE_THREAD_LOCAL(TBoundedConcurrencyInvoker*, CurrentBoundedConcurrencyInvoker); + class TBoundedConcurrencyInvoker : public TInvokerWrapper { @@ -429,8 +433,6 @@ private: TRingQueue<TClosure> Queue_; int Semaphore_ = 0; - static YT_THREAD_LOCAL(TBoundedConcurrencyInvoker*) CurrentSchedulingInvoker_; - private: class TInvocationGuard { @@ -463,7 +465,7 @@ private: { // If UnderlyingInvoker_ is already terminated, Invoke may drop the guard right away. // Protect by setting CurrentSchedulingInvoker_ and checking it on entering ScheduleMore. - CurrentSchedulingInvoker_ = this; + CurrentBoundedConcurrencyInvoker() = this; UnderlyingInvoker_->Invoke(BIND_NO_PROPAGATE( &TBoundedConcurrencyInvoker::DoRunCallback, @@ -472,7 +474,7 @@ private: Passed(TInvocationGuard(this)))); // Don't leave a dangling pointer behind. - CurrentSchedulingInvoker_ = nullptr; + CurrentBoundedConcurrencyInvoker() = nullptr; } void DoRunCallback(const TClosure& callback, TInvocationGuard /*invocationGuard*/) @@ -485,7 +487,7 @@ private: { auto guard = Guard(SpinLock_); // See RunCallback. - if (Queue_.empty() || CurrentSchedulingInvoker_ == this) { + if (Queue_.empty() || CurrentBoundedConcurrencyInvoker() == this) { IncrementSemaphore(-1); } else { auto callback = std::move(Queue_.front()); @@ -496,8 +498,6 @@ private: } }; -YT_THREAD_LOCAL(TBoundedConcurrencyInvoker*) TBoundedConcurrencyInvoker::CurrentSchedulingInvoker_; - IInvokerPtr CreateBoundedConcurrencyInvoker( IInvokerPtr underlyingInvoker, int maxConcurrentInvocations) diff --git a/yt/yt/core/concurrency/execution_stack.cpp b/yt/yt/core/concurrency/execution_stack.cpp index 3fcf1a11d4..3d2c826296 100644 --- a/yt/yt/core/concurrency/execution_stack.cpp +++ b/yt/yt/core/concurrency/execution_stack.cpp @@ -130,16 +130,16 @@ TExecutionStack::~TExecutionStack() ::DeleteFiber(Handle_); } -static YT_THREAD_LOCAL(void*) FiberTrampolineOpaque; +YT_DEFINE_THREAD_LOCAL(void*, FiberTrampolineOpaque); void TExecutionStack::SetOpaque(void* opaque) { - FiberTrampolineOpaque = opaque; + FiberTrampolineOpaque() = opaque; } void* TExecutionStack::GetOpaque() { - return FiberTrampolineOpaque; + return FiberTrampolineOpaque(); } void TExecutionStack::SetTrampoline(void (*trampoline)(void*)) @@ -151,7 +151,7 @@ void TExecutionStack::SetTrampoline(void (*trampoline)(void*)) VOID CALLBACK TExecutionStack::FiberTrampoline(PVOID opaque) { auto* stack = reinterpret_cast<TExecutionStack*>(opaque); - stack->Trampoline_(FiberTrampolineOpaque); + stack->Trampoline_(FiberTrampolineOpaque()); } #else diff --git a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp index 1f2f26fc78..46d4845f40 100644 --- a/yt/yt/core/concurrency/fair_share_invoker_pool.cpp +++ b/yt/yt/core/concurrency/fair_share_invoker_pool.cpp @@ -31,7 +31,7 @@ using namespace NYTProf; //////////////////////////////////////////////////////////////////////////////// -constinit YT_THREAD_LOCAL(TCpuProfilerTagGuard) FairShareInvokerPoolProfilerTagGuard; +YT_DEFINE_THREAD_LOCAL(TCpuProfilerTagGuard, FairShareInvokerPoolProfilerTagGuard); //////////////////////////////////////////////////////////////////////////////// @@ -290,12 +290,12 @@ private: counters->WaitTimer.Record(waitTime); } - GetTlsRef(FairShareInvokerPoolProfilerTagGuard) = TCpuProfilerTagGuard(BucketProfilerTags_[index]); + FairShareInvokerPoolProfilerTagGuard() = TCpuProfilerTagGuard(BucketProfilerTags_[index]); } void ProfileExecutionFinish(int index, TDuration execTime, TDuration totalTime) { - GetTlsRef(FairShareInvokerPoolProfilerTagGuard) = TCpuProfilerTagGuard{}; + FairShareInvokerPoolProfilerTagGuard() = TCpuProfilerTagGuard{}; auto& counters = Counters_[index]; if (counters) { diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp index 25e380d4a9..9b4345b0bc 100644 --- a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp +++ b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp @@ -130,18 +130,18 @@ struct TFiberContext TFiber* CurrentFiber = nullptr; }; -YT_THREAD_LOCAL(TFiberContext*) FiberContext; +YT_DEFINE_THREAD_LOCAL(TFiberContext*, FiberContext); // Forbid inlining these accessors to prevent the compiler from // miss-optimizing TLS access in presence of fiber context switches. -Y_NO_INLINE TFiberContext* TryGetFiberContext() +TFiberContext* TryGetFiberContext() { - return FiberContext; + return FiberContext(); } -Y_NO_INLINE void SetFiberContext(TFiberContext* context) +void SetFiberContext(TFiberContext* context) { - FiberContext = context; + FiberContext() = context; } //////////////////////////////////////////////////////////////////////////////// @@ -188,7 +188,7 @@ Y_FORCE_INLINE ELogLevel SwapMinLogLevel(ELogLevel minLogLevel) Y_FORCE_INLINE TExceptionSafeContext* GetMachineContext() { - return &TryGetFiberContext()->MachineContext; + return &FiberContext()->MachineContext; } Y_FORCE_INLINE void SetAfterSwitch(TAfterSwitch afterSwitch) @@ -200,53 +200,53 @@ Y_FORCE_INLINE void SetAfterSwitch(TAfterSwitch afterSwitch) Y_FORCE_INLINE TAfterSwitch ExtractAfterSwitch() { - auto* context = TryGetFiberContext(); + auto* context = FiberContext(); return context->AfterSwitch.Release(); } Y_FORCE_INLINE void SetResumerFiber(TFiber* fiber) { - auto* context = TryGetFiberContext(); + auto* context = FiberContext(); YT_VERIFY(!context->ResumerFiber); context->ResumerFiber = fiber; } Y_FORCE_INLINE TFiber* ExtractResumerFiber() { - return std::exchange(TryGetFiberContext()->ResumerFiber, nullptr); + return std::exchange(FiberContext()->ResumerFiber, nullptr); } Y_FORCE_INLINE TFiber* TryGetResumerFiber() { - return TryGetFiberContext()->ResumerFiber; + return FiberContext()->ResumerFiber; } Y_FORCE_INLINE TFiber* SwapCurrentFiber(TFiber* fiber) { - return std::exchange(TryGetFiberContext()->CurrentFiber, fiber); + return std::exchange(FiberContext()->CurrentFiber, fiber); } Y_FORCE_INLINE TFiber* TryGetCurrentFiber() { - auto* context = TryGetFiberContext(); + auto* context = FiberContext(); return context ? context->CurrentFiber : nullptr; } Y_FORCE_INLINE TFiber* GetCurrentFiber() { - auto* fiber = TryGetFiberContext()->CurrentFiber; + auto* fiber = FiberContext()->CurrentFiber; YT_VERIFY(fiber); return fiber; } Y_FORCE_INLINE TFiberSchedulerThread* TryGetFiberThread() { - return TryGetFiberContext()->FiberThread; + return FiberContext()->FiberThread; } Y_FORCE_INLINE TRefCountedGaugePtr GetWaitingFibersCounter() { - return TryGetFiberContext()->WaitingFibersCounter; + return FiberContext()->WaitingFibersCounter; } //////////////////////////////////////////////////////////////////////////////// @@ -525,9 +525,6 @@ void ResumeFiber(TFiber* targetFiber) YT_VERIFY(!TryGetResumerFiber()); } -class TFiberSwitchHandler; -TFiberSwitchHandler* TryGetFiberSwitchHandler(); - //////////////////////////////////////////////////////////////////////////////// DECLARE_REFCOUNTED_CLASS(TCanceler) @@ -706,6 +703,10 @@ private: ELogLevel MinLogLevel_ = ELogLevel::Minimum; }; +class TFiberSwitchHandler; + +YT_DEFINE_THREAD_LOCAL(TFiberSwitchHandler*, CurrentFiberSwitchHandler); + class TFiberSwitchHandler : public TBaseSwitchHandler { @@ -714,7 +715,7 @@ public: explicit TFiberSwitchHandler(TFiber* fiber) : Fiber_(fiber) { - SavedThis_ = std::exchange(This_, this); + SavedThis_ = std::exchange(CurrentFiberSwitchHandler(), this); YT_VERIFY(SwapCurrentFiberId(fiber->GetFiberId()) == InvalidFiberId); YT_VERIFY(!SwapCurrentFls(fiber->GetFls())); @@ -723,7 +724,7 @@ public: // On finish fiber running. ~TFiberSwitchHandler() { - YT_VERIFY(This_ == this); + YT_VERIFY(CurrentFiberSwitchHandler() == this); YT_VERIFY(UserHandlers_.empty()); YT_VERIFY(SwapCurrentFiberId(InvalidFiberId) == Fiber_->GetFiberId()); @@ -750,7 +751,7 @@ public: TGuard(TGuard&&) = delete; TGuard() - : SwitchHandler_(This_) + : SwitchHandler_(CurrentFiberSwitchHandler()) { YT_VERIFY(SwitchHandler_); SwitchHandler_->OnOut(); @@ -767,12 +768,10 @@ public: private: friend TContextSwitchGuard; - friend TFiberSwitchHandler* TryGetFiberSwitchHandler(); const TFiber* const Fiber_; TFiberSwitchHandler* SavedThis_; - static YT_THREAD_LOCAL(TFiberSwitchHandler*) This_; struct TContextSwitchHandlers { @@ -792,7 +791,7 @@ private: TBaseSwitchHandler::OnSwitch(); - std::swap(SavedThis_, GetTlsRef(This_)); + std::swap(SavedThis_, CurrentFiberSwitchHandler()); } // On finish fiber running. @@ -824,16 +823,9 @@ private: } }; -YT_THREAD_LOCAL(TFiberSwitchHandler*) TFiberSwitchHandler::This_; - -TFiberSwitchHandler* TryGetFiberSwitchHandler() -{ - return TFiberSwitchHandler::This_; -} - TFiberSwitchHandler* GetFiberSwitchHandler() { - auto* switchHandler = TryGetFiberSwitchHandler(); + auto* switchHandler = CurrentFiberSwitchHandler(); YT_VERIFY(switchHandler); return switchHandler; } @@ -938,34 +930,34 @@ void TFiberSchedulerThread::ThreadMain() //////////////////////////////////////////////////////////////////////////////// -YT_THREAD_LOCAL(TFiberId) CurrentFiberId; +YT_DEFINE_THREAD_LOCAL(TFiberId, CurrentFiberId); TFiberId GetCurrentFiberId() { - return CurrentFiberId; + return CurrentFiberId(); } void SetCurrentFiberId(TFiberId id) { - CurrentFiberId = id; + CurrentFiberId() = id; } //////////////////////////////////////////////////////////////////////////////// -YT_THREAD_LOCAL(bool) ContextSwitchForbidden; +YT_DEFINE_THREAD_LOCAL(bool, ContextSwitchForbidden); bool IsContextSwitchForbidden() { - return ContextSwitchForbidden; + return ContextSwitchForbidden(); } TForbidContextSwitchGuard::TForbidContextSwitchGuard() - : OldValue_(std::exchange(ContextSwitchForbidden, true)) + : OldValue_(std::exchange(ContextSwitchForbidden(), true)) { } TForbidContextSwitchGuard::~TForbidContextSwitchGuard() { - ContextSwitchForbidden = OldValue_; + ContextSwitchForbidden() = OldValue_; } //////////////////////////////////////////////////////////////////////////////// @@ -978,7 +970,7 @@ bool CheckFreeStackSpace(size_t space) TFiberCanceler GetCurrentFiberCanceler() { - auto* switchHandler = NDetail::TryGetFiberSwitchHandler(); + auto* switchHandler = NDetail::CurrentFiberSwitchHandler(); if (!switchHandler) { // Not in fiber context. return {}; @@ -1077,14 +1069,14 @@ TContextSwitchGuard::TContextSwitchGuard( TContextSwitchHandler outHandler, TContextSwitchHandler inHandler) { - if (auto* context = NDetail::TryGetFiberSwitchHandler()) { + if (auto* context = NDetail::CurrentFiberSwitchHandler()) { context->UserHandlers_.push_back({std::move(outHandler), std::move(inHandler)}); } } TContextSwitchGuard::~TContextSwitchGuard() { - if (auto* context = NDetail::TryGetFiberSwitchHandler()) { + if (auto* context = NDetail::CurrentFiberSwitchHandler()) { YT_VERIFY(!context->UserHandlers_.empty()); context->UserHandlers_.pop_back(); } diff --git a/yt/yt/core/concurrency/fls-inl.h b/yt/yt/core/concurrency/fls-inl.h index 47c8b2d90d..4f8eba6b6f 100644 --- a/yt/yt/core/concurrency/fls-inl.h +++ b/yt/yt/core/concurrency/fls-inl.h @@ -21,7 +21,7 @@ using TFlsSlotDtor = void(*)(TFls::TCookie cookie); int AllocateFlsSlot(TFlsSlotDtor dtor); TFls* GetPerThreadFls(); -extern YT_THREAD_LOCAL(TFls*) CurrentFls; +YT_DECLARE_THREAD_LOCAL(TFls*, CurrentFls); } // namespace NDetail @@ -44,7 +44,7 @@ Y_FORCE_INLINE TFls::TCookie TFls::Get(int index) const inline TFls* GetCurrentFls() { - auto* fls = NDetail::CurrentFls; + auto* fls = NDetail::CurrentFls(); if (Y_UNLIKELY(!fls)) { fls = NDetail::GetPerThreadFls(); } diff --git a/yt/yt/core/concurrency/fls.cpp b/yt/yt/core/concurrency/fls.cpp index 9cedcadcfa..6856ac9754 100644 --- a/yt/yt/core/concurrency/fls.cpp +++ b/yt/yt/core/concurrency/fls.cpp @@ -20,8 +20,8 @@ std::atomic<int> FlsSize; NThreading::TForkAwareSpinLock FlsLock; std::array<TFlsSlotDtor, MaxFlsSize> FlsDtors; -YT_THREAD_LOCAL(TFls*) PerThreadFls; -YT_THREAD_LOCAL(TFls*) CurrentFls; +YT_DEFINE_THREAD_LOCAL(TFls*, PerThreadFls); +YT_DEFINE_THREAD_LOCAL(TFls*, CurrentFls); int AllocateFlsSlot(TFlsSlotDtor dtor) { @@ -42,13 +42,14 @@ void DestructFlsSlot(int index, TFls::TCookie cookie) TFls* GetPerThreadFls() { - if (!PerThreadFls) { + auto& perThreadFls = PerThreadFls(); + if (!perThreadFls) { // This is only needed when some code attempts to interact with FLS outside of a fiber context. // Unfortunately there's no safe place to destroy this FLS upon thread shutdown. - PerThreadFls = new TFls(); - NSan::MarkAsIntentionallyLeaked(PerThreadFls); + perThreadFls = new TFls(); + NSan::MarkAsIntentionallyLeaked(perThreadFls); } - return PerThreadFls; + return perThreadFls; } } // namespace NDetail @@ -79,7 +80,7 @@ void TFls::Set(int index, TCookie cookie) TFls* SwapCurrentFls(TFls* newFls) { - return std::exchange(NDetail::CurrentFls, newFls); + return std::exchange(NDetail::CurrentFls(), newFls); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/concurrency/invoker_queue.cpp b/yt/yt/core/concurrency/invoker_queue.cpp index fec32d6c86..22fa826733 100644 --- a/yt/yt/core/concurrency/invoker_queue.cpp +++ b/yt/yt/core/concurrency/invoker_queue.cpp @@ -20,7 +20,7 @@ static const auto& Logger = ConcurrencyLogger; //////////////////////////////////////////////////////////////////////////////// -constinit YT_THREAD_LOCAL(TCpuProfilerTagGuard) CpuProfilerTagGuard; +YT_DEFINE_THREAD_LOCAL(TCpuProfilerTagGuard, CpuProfilerTagGuard); //////////////////////////////////////////////////////////////////////////////// @@ -490,9 +490,9 @@ bool TInvokerQueue<TQueueImpl>::BeginExecute(TEnqueuedAction* action, typename T } if (const auto& profilerTag = action->ProfilerTag) { - GetTlsRef(CpuProfilerTagGuard) = TCpuProfilerTagGuard(profilerTag); + CpuProfilerTagGuard() = TCpuProfilerTagGuard(profilerTag); } else { - GetTlsRef(CpuProfilerTagGuard) = {}; + CpuProfilerTagGuard() = {}; } SetCurrentInvoker(GetProfilingTagSettingInvoker(action->ProfilingTag)); @@ -503,7 +503,7 @@ bool TInvokerQueue<TQueueImpl>::BeginExecute(TEnqueuedAction* action, typename T template <class TQueueImpl> void TInvokerQueue<TQueueImpl>::EndExecute(TEnqueuedAction* action) { - GetTlsRef(CpuProfilerTagGuard) = TCpuProfilerTagGuard{}; + CpuProfilerTagGuard() = TCpuProfilerTagGuard{}; SetCurrentInvoker(nullptr); YT_ASSERT(action); diff --git a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp index fcfc64eec7..7482f49b1d 100644 --- a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp +++ b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp @@ -43,7 +43,7 @@ DECLARE_REFCOUNTED_CLASS(TBucket) struct TExecutionPool; // High 16 bits is thread index and 48 bits for thread pool ptr. -YT_THREAD_LOCAL(TPackedPtr) ThreadCookie = 0; +YT_DEFINE_THREAD_LOCAL(TPackedPtr, ThreadCookie, 0); static constexpr auto LogDurationThreshold = TDuration::Seconds(1); @@ -636,7 +636,7 @@ public: // Callback keeps raw ptr to bucket to minimize bucket ref count. action.Callback = BIND(&TBucket::RunCallback, Unretained(bucket), std::move(callback), cpuInstant); action.BucketHolder = MakeStrong(bucket); - action.EnqueuedThreadCookie = ThreadCookie; + action.EnqueuedThreadCookie = ThreadCookie(); InvokeQueue_.Enqueue(std::move(action)); @@ -1201,7 +1201,7 @@ protected: void OnStart() override { - ThreadCookie = TTaggedPtr(Queue_.Get(), static_cast<ui16>(Index_)).Pack(); + ThreadCookie() = TTaggedPtr(Queue_.Get(), static_cast<ui16>(Index_)).Pack(); } void StopPrologue() override diff --git a/yt/yt/core/logging/log_manager.cpp b/yt/yt/core/logging/log_manager.cpp index 00bfe4cb1b..e23759b816 100644 --- a/yt/yt/core/logging/log_manager.cpp +++ b/yt/yt/core/logging/log_manager.cpp @@ -355,7 +355,7 @@ TCpuInstant GetEventInstant(const TLoggerQueueItem& item) using TThreadLocalQueue = TSpscQueue<TLoggerQueueItem>; static constexpr uintptr_t ThreadQueueDestroyedSentinel = -1; -YT_THREAD_LOCAL(TThreadLocalQueue*) PerThreadQueue; +YT_DEFINE_THREAD_LOCAL(TThreadLocalQueue*, PerThreadQueue); ///////////////////////////////////////////////////////////////////////////// @@ -364,7 +364,7 @@ struct TLocalQueueReclaimer ~TLocalQueueReclaimer(); }; -YT_THREAD_LOCAL(TLocalQueueReclaimer) LocalQueueReclaimer; +YT_DEFINE_THREAD_LOCAL(TLocalQueueReclaimer, LocalQueueReclaimer); ///////////////////////////////////////////////////////////////////////////// @@ -1041,17 +1041,17 @@ private: void PushEvent(TLoggerQueueItem&& event) { - if (!PerThreadQueue) { - PerThreadQueue = new TThreadLocalQueue(); - RegisteredLocalQueues_.Enqueue(GetTlsRef(PerThreadQueue)); - Y_UNUSED(LocalQueueReclaimer); // Touch thread-local variable so that its destructor is called. + auto& perThreadQueue = PerThreadQueue(); + if (!perThreadQueue) { + perThreadQueue = new TThreadLocalQueue(); + RegisteredLocalQueues_.Enqueue(perThreadQueue); } ++EnqueuedEvents_; - if (PerThreadQueue == reinterpret_cast<TThreadLocalQueue*>(ThreadQueueDestroyedSentinel)) { + if (perThreadQueue == reinterpret_cast<TThreadLocalQueue*>(ThreadQueueDestroyedSentinel)) { GlobalQueue_.Enqueue(std::move(event)); } else { - PerThreadQueue->Push(std::move(event)); + perThreadQueue->Push(std::move(event)); } } @@ -1476,10 +1476,10 @@ private: TLocalQueueReclaimer::~TLocalQueueReclaimer() { - if (PerThreadQueue) { + if (auto& perThreadQueue = PerThreadQueue()) { auto logManager = TLogManager::Get()->Impl_; - logManager->UnregisteredLocalQueues_.Enqueue(GetTlsRef(PerThreadQueue)); - PerThreadQueue = reinterpret_cast<TThreadLocalQueue*>(ThreadQueueDestroyedSentinel); + logManager->UnregisteredLocalQueues_.Enqueue(perThreadQueue); + perThreadQueue = reinterpret_cast<TThreadLocalQueue*>(ThreadQueueDestroyedSentinel); } } diff --git a/yt/yt/core/misc/error.cpp b/yt/yt/core/misc/error.cpp index 6aabbb1ecc..7bb2100763 100644 --- a/yt/yt/core/misc/error.cpp +++ b/yt/yt/core/misc/error.cpp @@ -71,27 +71,27 @@ TString ToString(TErrorCode code) //////////////////////////////////////////////////////////////////////////////// -YT_THREAD_LOCAL(bool) ErrorSanitizerEnabled = false; -YT_THREAD_LOCAL(TInstant) ErrorSanitizerDatetimeOverride = {}; -YT_THREAD_LOCAL(TSharedRef) ErrorSanitizerLocalHostNameOverride = {}; +YT_DEFINE_THREAD_LOCAL(bool, ErrorSanitizerEnabled, false); +YT_DEFINE_THREAD_LOCAL(TInstant, ErrorSanitizerDatetimeOverride); +YT_DEFINE_THREAD_LOCAL(TSharedRef, ErrorSanitizerLocalHostNameOverride); TErrorSanitizerGuard::TErrorSanitizerGuard(TInstant datetimeOverride, TSharedRef localHostNameOverride) - : SavedEnabled_(ErrorSanitizerEnabled) - , SavedDatetimeOverride_(GetTlsRef(ErrorSanitizerDatetimeOverride)) - , SavedLocalHostNameOverride_(GetTlsRef(ErrorSanitizerLocalHostNameOverride)) + : SavedEnabled_(ErrorSanitizerEnabled()) + , SavedDatetimeOverride_(ErrorSanitizerDatetimeOverride()) + , SavedLocalHostNameOverride_(ErrorSanitizerLocalHostNameOverride()) { - ErrorSanitizerEnabled = true; - GetTlsRef(ErrorSanitizerDatetimeOverride) = datetimeOverride; - GetTlsRef(ErrorSanitizerLocalHostNameOverride) = std::move(localHostNameOverride); + ErrorSanitizerEnabled() = true; + ErrorSanitizerDatetimeOverride() = datetimeOverride; + ErrorSanitizerLocalHostNameOverride() = std::move(localHostNameOverride); } TErrorSanitizerGuard::~TErrorSanitizerGuard() { - YT_ASSERT(ErrorSanitizerEnabled); + YT_ASSERT(ErrorSanitizerEnabled()); - ErrorSanitizerEnabled = SavedEnabled_; - GetTlsRef(ErrorSanitizerDatetimeOverride) = SavedDatetimeOverride_; - GetTlsRef(ErrorSanitizerLocalHostNameOverride) = std::move(SavedLocalHostNameOverride_); + ErrorSanitizerEnabled() = SavedEnabled_; + ErrorSanitizerDatetimeOverride() = SavedDatetimeOverride_; + ErrorSanitizerLocalHostNameOverride() = std::move(SavedLocalHostNameOverride_); } //////////////////////////////////////////////////////////////////////////////// @@ -301,9 +301,9 @@ private: void CaptureOriginAttributes() { - if (ErrorSanitizerEnabled) { - Datetime_ = GetTlsRef(ErrorSanitizerDatetimeOverride); - HostHolder_ = GetTlsRef(ErrorSanitizerLocalHostNameOverride); + if (ErrorSanitizerEnabled()) { + Datetime_ = ErrorSanitizerDatetimeOverride(); + HostHolder_ = ErrorSanitizerLocalHostNameOverride(); Host_ = HostHolder_.empty() ? TStringBuf() : TStringBuf(HostHolder_.Begin(), HostHolder_.End()); return; } @@ -1034,7 +1034,7 @@ void AppendError(TStringBuilderBase* builder, const TError& error, int indent) (!error.GetThreadName().empty() ? error.GetThreadName() : ToString(error.GetTid())), error.GetFid()), indent); - } else if (ErrorSanitizerEnabled && error.HasHost()) { + } else if (ErrorSanitizerEnabled() && error.HasHost()) { AppendAttribute( builder, "host", @@ -1151,7 +1151,7 @@ void ToProto(NYT::NProto::TError* protoError, const TError& error) static const TString FidKey("fid"); addAttribute(FidKey, error.GetFid()); - } else if (ErrorSanitizerEnabled && error.HasHost()) { + } else if (ErrorSanitizerEnabled() && error.HasHost()) { static const TString HostKey("host"); addAttribute(HostKey, error.GetHost()); } @@ -1243,6 +1243,7 @@ void Serialize( const std::function<void(IYsonConsumer*)>* valueProducer, int depth) { + auto& errorSanitizerEnabled = ErrorSanitizerEnabled(); BuildYsonFluently(consumer) .BeginMap() .Item("code").Value(error.GetCode()) @@ -1255,7 +1256,7 @@ void Serialize( .Item("tid").Value(error.GetTid()) .Item("thread").Value(error.GetThreadName()) .Item("fid").Value(error.GetFid()); - } else if (ErrorSanitizerEnabled && error.HasHost()) { + } else if (errorSanitizerEnabled && error.HasHost()) { fluent .Item("host").Value(error.GetHost()); } diff --git a/yt/yt/core/misc/hazard_ptr-inl.h b/yt/yt/core/misc/hazard_ptr-inl.h index ab9ab05a58..a202d07cd3 100644 --- a/yt/yt/core/misc/hazard_ptr-inl.h +++ b/yt/yt/core/misc/hazard_ptr-inl.h @@ -20,10 +20,10 @@ namespace NDetail { constexpr int MaxHazardPointersPerThread = 2; using THazardPointerSet = std::array<std::atomic<void*>, MaxHazardPointersPerThread>; -extern YT_THREAD_LOCAL(THazardPointerSet) HazardPointers; +YT_DECLARE_THREAD_LOCAL(THazardPointerSet, HazardPointers); struct THazardThreadState; -extern YT_THREAD_LOCAL(THazardThreadState*) HazardThreadState; +YT_DECLARE_THREAD_LOCAL(THazardThreadState*, HazardThreadState); void InitHazardThreadState(); @@ -89,7 +89,7 @@ THazardPtr<T> THazardPtr<T>::Acquire(TPtrLoader&& ptrLoader, T* ptr) return {}; } - auto& hazardPointers = GetTlsRef(NYT::NDetail::HazardPointers); + auto& hazardPointers = NYT::NDetail::HazardPointers(); auto* hazardPtr = [&] { for (auto it = hazardPointers.begin(); it != hazardPointers.end(); ++it) { @@ -103,7 +103,7 @@ THazardPtr<T> THazardPtr<T>::Acquire(TPtrLoader&& ptrLoader, T* ptr) YT_ABORT(); }(); - if (Y_UNLIKELY(!NYT::NDetail::HazardThreadState)) { + if (Y_UNLIKELY(!NYT::NDetail::HazardThreadState())) { NYT::NDetail::InitHazardThreadState(); } diff --git a/yt/yt/core/misc/hazard_ptr.cpp b/yt/yt/core/misc/hazard_ptr.cpp index 97f6da87e2..07b3f5c80a 100644 --- a/yt/yt/core/misc/hazard_ptr.cpp +++ b/yt/yt/core/misc/hazard_ptr.cpp @@ -32,7 +32,7 @@ namespace NDetail { //////////////////////////////////////////////////////////////////////////// -YT_THREAD_LOCAL(THazardPointerSet) HazardPointers; +YT_DEFINE_THREAD_LOCAL(THazardPointerSet, HazardPointers); //! A simple container based on free list which supports only Enqueue and DequeueAll. template <class T> @@ -112,8 +112,8 @@ struct THazardThreadState { } }; -YT_THREAD_LOCAL(THazardThreadState*) HazardThreadState; -YT_THREAD_LOCAL(bool) HazardThreadStateDestroyed; +YT_DEFINE_THREAD_LOCAL(THazardThreadState*, HazardThreadState); +YT_DEFINE_THREAD_LOCAL(bool, HazardThreadStateDestroyed); //////////////////////////////////////////////////////////////////////////////// @@ -204,15 +204,15 @@ void THazardPointerManager::Shutdown() void THazardPointerManager::RetireHazardPointer(TPackedPtr packedPtr, THazardPtrReclaimer reclaimer) { - auto* threadState = HazardThreadState; + auto* threadState = HazardThreadState(); if (Y_UNLIKELY(!threadState)) { - if (HazardThreadStateDestroyed) { + if (HazardThreadStateDestroyed()) { // Looks like a global shutdown. reclaimer(packedPtr); return; } InitThreadState(); - threadState = HazardThreadState; + threadState = HazardThreadState(); } threadState->RetireList.push({packedPtr, reclaimer}); @@ -229,7 +229,7 @@ void THazardPointerManager::RetireHazardPointer(TPackedPtr packedPtr, THazardPtr bool THazardPointerManager::TryReclaimHazardPointers() { - auto* threadState = HazardThreadState; + auto* threadState = HazardThreadState(); if (!threadState || threadState->RetireList.empty()) { return false; } @@ -254,15 +254,15 @@ void THazardPointerManager::ReclaimHazardPointers(bool flush) void THazardPointerManager::InitThreadState() { - if (!HazardThreadState) { - YT_VERIFY(!HazardThreadStateDestroyed); - HazardThreadState = AllocateThreadState(); + if (!HazardThreadState()) { + YT_VERIFY(!HazardThreadStateDestroyed()); + HazardThreadState() = AllocateThreadState(); } } -THazardThreadState* THazardPointerManager::AllocateThreadState() +YT_PREVENT_TLS_CACHING THazardThreadState* THazardPointerManager::AllocateThreadState() { - auto* threadState = new THazardThreadState(&GetTlsRef(HazardPointers)); + auto* threadState = new THazardThreadState(&HazardPointers()); struct THazardThreadStateDestroyer { @@ -275,7 +275,7 @@ THazardThreadState* THazardPointerManager::AllocateThreadState() }; // Unregisters thread from hazard ptr manager on thread exit. - YT_THREAD_LOCAL(THazardThreadStateDestroyer) destroyer{threadState}; + thread_local THazardThreadStateDestroyer destroyer{threadState}; { auto guard = WriterGuard(ThreadRegistryLock_); @@ -385,8 +385,8 @@ void THazardPointerManager::DestroyThreadState(THazardThreadState* threadState) delete threadState; - HazardThreadState = nullptr; - HazardThreadStateDestroyed = true; + HazardThreadState() = nullptr; + HazardThreadStateDestroyed() = true; } void THazardPointerManager::BeforeFork() @@ -404,8 +404,8 @@ void THazardPointerManager::AfterForkChild() ThreadRegistry_.Clear(); ThreadCount_ = 0; - if (HazardThreadState) { - ThreadRegistry_.PushBack(HazardThreadState); + if (HazardThreadState()) { + ThreadRegistry_.PushBack(HazardThreadState()); ThreadCount_ = 1; } diff --git a/yt/yt/core/misc/pool_allocator-inl.h b/yt/yt/core/misc/pool_allocator-inl.h index 4d33816df4..95a9e2cf40 100644 --- a/yt/yt/core/misc/pool_allocator-inl.h +++ b/yt/yt/core/misc/pool_allocator-inl.h @@ -67,18 +67,18 @@ inline void TPoolAllocator::Free(void* ptr) noexcept } template <std::derived_from<TPoolAllocator::TObjectBase> T, class... TArgs> -std::unique_ptr<T> TPoolAllocator::New(TArgs&&... args) +YT_PREVENT_TLS_CACHING std::unique_ptr<T> TPoolAllocator::New(TArgs&&... args) { struct TChunkTag { }; constexpr auto ChunkSize = 64_KB; - YT_THREAD_LOCAL(TPoolAllocator) Allocator( + thread_local TPoolAllocator Allocator( sizeof(T), alignof(T), ChunkSize, GetRefCountedTypeCookie<TChunkTag>()); - return std::unique_ptr<T>(new(&GetTlsRef(Allocator)) T(std::forward<TArgs>(args)...)); + return std::unique_ptr<T>(new(&Allocator) T(std::forward<TArgs>(args)...)); } inline void TPoolAllocator::DoFree(void* ptr) diff --git a/yt/yt/core/misc/ref_counted_tracker-inl.h b/yt/yt/core/misc/ref_counted_tracker-inl.h index e7c4b6c52b..8e132d59a5 100644 --- a/yt/yt/core/misc/ref_counted_tracker-inl.h +++ b/yt/yt/core/misc/ref_counted_tracker-inl.h @@ -108,6 +108,9 @@ private: //////////////////////////////////////////////////////////////////////////////// +YT_DECLARE_THREAD_LOCAL(TRefCountedTracker::TLocalSlot*, RefCountedTrackerLocalSlotsBegin); +YT_DECLARE_THREAD_LOCAL(int, RefCountedTrackerLocalSlotsSize); + Y_FORCE_INLINE TRefCountedTracker* TRefCountedTracker::Get() { return LeakySingleton<TRefCountedTracker>(); @@ -116,10 +119,10 @@ Y_FORCE_INLINE TRefCountedTracker* TRefCountedTracker::Get() #define INCREMENT_COUNTER(fallback, name, delta) \ auto index = cookie.Underlying(); \ YT_ASSERT(index >= 0); \ - if (Y_UNLIKELY(index >= LocalSlotsSize_)) { \ + if (Y_UNLIKELY(index >= RefCountedTrackerLocalSlotsSize())) { \ Get()->fallback; \ } else { \ - LocalSlotsBegin_[index].name += delta; \ + RefCountedTrackerLocalSlotsBegin()[index].name += delta; \ } Y_FORCE_INLINE void TRefCountedTracker::AllocateInstance(TRefCountedTypeCookie cookie) diff --git a/yt/yt/core/misc/ref_counted_tracker.cpp b/yt/yt/core/misc/ref_counted_tracker.cpp index c15ceb840d..af06706eb3 100644 --- a/yt/yt/core/misc/ref_counted_tracker.cpp +++ b/yt/yt/core/misc/ref_counted_tracker.cpp @@ -144,14 +144,14 @@ size_t TRefCountedTracker::TNamedSlot::ClampNonnegative(size_t allocated, size_t //////////////////////////////////////////////////////////////////////////////// // nullptr if not initialized or already destroyed -YT_THREAD_LOCAL(TRefCountedTracker::TLocalSlots*) TRefCountedTracker::LocalSlots_; +YT_DEFINE_THREAD_LOCAL(TRefCountedTracker::TLocalSlots*, RefCountedTrackerLocalSlots); // nullptr if not initialized or already destroyed -YT_THREAD_LOCAL(TRefCountedTracker::TLocalSlot*) TRefCountedTracker::LocalSlotsBegin_; +YT_DEFINE_THREAD_LOCAL(TRefCountedTracker::TLocalSlot*, RefCountedTrackerLocalSlotsBegin); // 0 if not initialized // -1 if already destroyed -YT_THREAD_LOCAL(int) TRefCountedTracker::LocalSlotsSize_; +YT_DEFINE_THREAD_LOCAL(int, RefCountedTrackerLocalSlotsSize); int TRefCountedTracker::GetTrackedThreadCount() const { @@ -361,7 +361,7 @@ TRefCountedTracker::TNamedSlot TRefCountedTracker::GetSlot(TRefCountedTypeKey ty } #define INCREMENT_COUNTER_SLOW(name, delta) \ - if (LocalSlotsSize_ < 0) { \ + if (RefCountedTrackerLocalSlotsSize() < 0) { \ auto guard = Guard(SpinLock_); \ GetGlobalSlot(cookie)->name += delta; \ } else { \ @@ -412,7 +412,7 @@ TRefCountedTracker::TLocalSlot* TRefCountedTracker::GetLocalSlot(TRefCountedType auto guard = Guard(this_->SpinLock_); - auto& localSlots = GetTlsRef(LocalSlots_); + auto& localSlots = RefCountedTrackerLocalSlots(); if (this_->GlobalSlots_.size() < localSlots->size()) { this_->GlobalSlots_.resize(std::max(localSlots->size(), this_->GlobalSlots_.size())); @@ -424,33 +424,38 @@ TRefCountedTracker::TLocalSlot* TRefCountedTracker::GetLocalSlot(TRefCountedType YT_VERIFY(this_->AllLocalSlots_.erase(localSlots) == 1); - delete LocalSlots_; - LocalSlots_ = nullptr; - LocalSlotsBegin_ = nullptr; - LocalSlotsSize_ = -1; + delete localSlots; + localSlots = nullptr; + RefCountedTrackerLocalSlotsBegin() = nullptr; + RefCountedTrackerLocalSlotsSize() = -1; } }; - YT_THREAD_LOCAL(TReclaimer) Reclaimer; + thread_local TReclaimer Reclaimer; - YT_VERIFY(LocalSlotsSize_ >= 0); + auto& refCountedTrackerLocalSlotsSize = RefCountedTrackerLocalSlotsSize(); + + YT_VERIFY(refCountedTrackerLocalSlotsSize >= 0); auto guard = Guard(SpinLock_); - if (!LocalSlots_) { - LocalSlots_ = new TLocalSlots(); - YT_VERIFY(AllLocalSlots_.insert(GetTlsRef(LocalSlots_)).second); + auto& localSlotsBegin = RefCountedTrackerLocalSlotsBegin(); + auto& localSlots = RefCountedTrackerLocalSlots(); + + if (!localSlots) { + localSlots = new TLocalSlots(); + YT_VERIFY(AllLocalSlots_.insert(localSlots).second); } auto index = cookie.Underlying(); - if (index >= std::ssize(*LocalSlots_)) { - LocalSlots_->resize(static_cast<size_t>(index) + 1); + if (index >= std::ssize(*localSlots)) { + localSlots->resize(static_cast<size_t>(index) + 1); } - LocalSlotsBegin_ = LocalSlots_->data(); - LocalSlotsSize_ = std::ssize(*LocalSlots_); + localSlotsBegin = localSlots->data(); + refCountedTrackerLocalSlotsSize = std::ssize(*localSlots); - return LocalSlotsBegin_ + index; + return localSlotsBegin + index; } TRefCountedTracker::TGlobalSlot* TRefCountedTracker::GetGlobalSlot(TRefCountedTypeCookie cookie) diff --git a/yt/yt/core/misc/ref_counted_tracker.h b/yt/yt/core/misc/ref_counted_tracker.h index 89c161151d..485a7ee6e1 100644 --- a/yt/yt/core/misc/ref_counted_tracker.h +++ b/yt/yt/core/misc/ref_counted_tracker.h @@ -50,6 +50,15 @@ class TRefCountedTracker : private TNonCopyable { public: + struct TLocalSlot; + using TLocalSlots = std::vector<TLocalSlot>; + + struct TGlobalSlot; + using TGlobalSlots = std::vector<TGlobalSlot>; + + class TNamedSlot; + using TNamedStatistics = std::vector<TNamedSlot>; + static TRefCountedTracker* Get(); TRefCountedTypeCookie GetCookie( @@ -90,25 +99,6 @@ private: bool operator == (const TKey& other) const; }; - struct TLocalSlot; - using TLocalSlots = std::vector<TLocalSlot>; - - struct TGlobalSlot; - using TGlobalSlots = std::vector<TGlobalSlot>; - - class TNamedSlot; - using TNamedStatistics = std::vector<TNamedSlot>; - - // nullptr if not initialized or already destroyed - static YT_THREAD_LOCAL(TLocalSlots*) LocalSlots_; - - // nullptr if not initialized or already destroyed - static YT_THREAD_LOCAL(TLocalSlot*) LocalSlotsBegin_; - - // 0 if not initialized - // -1 if already destroyed - static YT_THREAD_LOCAL(int) LocalSlotsSize_; - mutable NThreading::TForkAwareSpinLock SpinLock_; std::map<TKey, TRefCountedTypeCookie> KeyToCookie_; std::map<TRefCountedTypeKey, size_t> TypeKeyToObjectSize_; diff --git a/yt/yt/core/misc/shutdown.cpp b/yt/yt/core/misc/shutdown.cpp index 25a772a2d6..d509350c07 100644 --- a/yt/yt/core/misc/shutdown.cpp +++ b/yt/yt/core/misc/shutdown.cpp @@ -290,7 +290,7 @@ static const void* ShutdownGuardInitializer = [] { } }; - static YT_THREAD_LOCAL(TShutdownGuard) Guard; + static thread_local TShutdownGuard Guard; return nullptr; }(); diff --git a/yt/yt/core/misc/unittests/tls_destructor_ut.cpp b/yt/yt/core/misc/unittests/tls_destructor_ut.cpp index 943f31ce72..b2f02e1617 100644 --- a/yt/yt/core/misc/unittests/tls_destructor_ut.cpp +++ b/yt/yt/core/misc/unittests/tls_destructor_ut.cpp @@ -19,7 +19,7 @@ struct TTlsGuard } }; -YT_THREAD_LOCAL(TTlsGuard) TlsGuard; +YT_DEFINE_THREAD_LOCAL(TTlsGuard, TlsGuard); TEST(TTlsDestructorTest, DestructorIsCalled) { @@ -27,7 +27,7 @@ TEST(TTlsDestructorTest, DestructorIsCalled) std::thread thread{[] { // Important moment. TLS must be touched to be initialized and destructed later. - Y_UNUSED(TlsGuard); + Y_UNUSED(TlsGuard()); }}; thread.join(); diff --git a/yt/yt/core/rpc/bus/channel.cpp b/yt/yt/core/rpc/bus/channel.cpp index b085986df7..ba47fba03d 100644 --- a/yt/yt/core/rpc/bus/channel.cpp +++ b/yt/yt/core/rpc/bus/channel.cpp @@ -389,7 +389,7 @@ private: return requestControl; } - void Cancel(const TClientRequestControlPtr& requestControl) + YT_PREVENT_TLS_CACHING void Cancel(const TClientRequestControlPtr& requestControl) { VERIFY_THREAD_AFFINITY_ANY(); @@ -419,7 +419,7 @@ private: } // YT-1639: Avoid long chain of recursive calls. - YT_THREAD_LOCAL(int) Depth = 0; + thread_local int Depth = 0; constexpr int MaxDepth = 10; if (Depth < MaxDepth) { ++Depth; diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp index 23c7284dab..e5b061424f 100644 --- a/yt/yt/core/rpc/service_detail.cpp +++ b/yt/yt/core/rpc/service_detail.cpp @@ -1453,17 +1453,17 @@ void TRequestQueue::OnRequestFinished(i64 requestTotalSize) // Prevents reentrant invocations. // One case is: RunRequest calling the handler synchronously, which replies the // context, which calls context->Finish, and we're back here again. -YT_THREAD_LOCAL(bool) ScheduleRequestsLatch = false; +YT_DEFINE_THREAD_LOCAL(bool, ScheduleRequestsLatch, false); void TRequestQueue::ScheduleRequestsFromQueue() { - if (ScheduleRequestsLatch) { + if (ScheduleRequestsLatch()) { return; } - ScheduleRequestsLatch = true; + ScheduleRequestsLatch() = true; auto latchGuard = Finally([&] { - ScheduleRequestsLatch = false; + ScheduleRequestsLatch() = false; }); #ifndef NDEBUG diff --git a/yt/yt/core/threading/thread.cpp b/yt/yt/core/threading/thread.cpp index 7bd7049493..41688af355 100644 --- a/yt/yt/core/threading/thread.cpp +++ b/yt/yt/core/threading/thread.cpp @@ -20,7 +20,7 @@ namespace NYT::NThreading { //////////////////////////////////////////////////////////////////////////////// -YT_THREAD_LOCAL(TThreadId) CurrentUniqueThreadId; +YT_DEFINE_THREAD_LOCAL(TThreadId, CurrentUniqueThreadId) ; static std::atomic<TThreadId> UniqueThreadIdGenerator; static const auto& Logger = ThreadingLogger; @@ -110,7 +110,7 @@ bool TThread::StartSlow() bool TThread::CanWaitForThreadShutdown() const { return - CurrentUniqueThreadId != UniqueThreadId_ && + CurrentUniqueThreadId() != UniqueThreadId_ && GetShutdownThreadId() != ThreadId_; } @@ -195,14 +195,14 @@ void* TThread::StaticThreadMainTrampoline(void* opaque) return nullptr; } -void TThread::ThreadMainTrampoline() +YT_PREVENT_TLS_CACHING void TThread::ThreadMainTrampoline() { auto this_ = MakeStrong(this); ::TThread::SetCurrentThreadName(ThreadName_.c_str()); ThreadId_ = GetCurrentThreadId(); - CurrentUniqueThreadId = UniqueThreadId_; + CurrentUniqueThreadId() = UniqueThreadId_; SetThreadPriority(); ConfigureSignalHandlerStack(); @@ -233,7 +233,7 @@ void TThread::ThreadMainTrampoline() bool Armed_ = true; }; - YT_THREAD_LOCAL(TExitInterceptor) Interceptor; + thread_local TExitInterceptor Interceptor; if (Options_.ThreadInitializer) { Options_.ThreadInitializer(); @@ -241,7 +241,7 @@ void TThread::ThreadMainTrampoline() ThreadMain(); - GetTlsRef(Interceptor).Disarm(); + Interceptor.Disarm(); StoppedEvent_.NotifyAll(); } @@ -282,23 +282,23 @@ void TThread::SetThreadPriority() #endif } -void TThread::ConfigureSignalHandlerStack() +YT_PREVENT_TLS_CACHING void TThread::ConfigureSignalHandlerStack() { #if !defined(_asan_enabled_) && !defined(_msan_enabled_) && \ (_XOPEN_SOURCE >= 500 || \ /* Since glibc 2.12: */ _POSIX_C_SOURCE >= 200809L || \ /* glibc <= 2.19: */ _BSD_SOURCE) - YT_THREAD_LOCAL(bool) Configured; + thread_local bool Configured; if (std::exchange(Configured, true)) { return; } // The size of of the custom stack to be provided for signal handlers. constexpr size_t SignalHandlerStackSize = 16_KB; - YT_THREAD_LOCAL(std::unique_ptr<char[]>) Stack = std::make_unique<char[]>(SignalHandlerStackSize); + thread_local std::unique_ptr<char[]> Stack = std::make_unique<char[]>(SignalHandlerStackSize); stack_t stack{ - .ss_sp = GetTlsRef(Stack).get(), + .ss_sp = Stack.get(), .ss_flags = 0, .ss_size = SignalHandlerStackSize, }; diff --git a/yt/yt/core/tracing/trace_context-inl.h b/yt/yt/core/tracing/trace_context-inl.h index 0b74bf56d9..83d87b3f0d 100644 --- a/yt/yt/core/tracing/trace_context-inl.h +++ b/yt/yt/core/tracing/trace_context-inl.h @@ -181,7 +181,7 @@ std::optional<TTag> TTraceContext::SetAllocationTag(const TString& key, TTag new namespace NDetail { -extern YT_THREAD_LOCAL(TTraceContext*) CurrentTraceContext; +YT_DECLARE_THREAD_LOCAL(TTraceContext*, CurrentTraceContext); TTraceContextPtr SwapTraceContext(TTraceContextPtr newContext); @@ -322,13 +322,13 @@ inline void TTraceContextFinishGuard::Release() Y_FORCE_INLINE TTraceContext* TryGetCurrentTraceContext() { - return NDetail::CurrentTraceContext; + return NDetail::CurrentTraceContext(); } Y_FORCE_INLINE TTraceContext* GetCurrentTraceContext() { - YT_ASSERT(NDetail::CurrentTraceContext); - return NDetail::CurrentTraceContext; + YT_ASSERT(NDetail::CurrentTraceContext()); + return NDetail::CurrentTraceContext(); } Y_FORCE_INLINE TTraceContextPtr CreateTraceContextFromCurrent(TString spanName) diff --git a/yt/yt/core/tracing/trace_context.cpp b/yt/yt/core/tracing/trace_context.cpp index 4fe7a615d0..6feb04aed2 100644 --- a/yt/yt/core/tracing/trace_context.cpp +++ b/yt/yt/core/tracing/trace_context.cpp @@ -102,8 +102,8 @@ TTracingTransportConfigPtr GetTracingTransportConfig() namespace NDetail { -YT_THREAD_LOCAL(TTraceContext*) CurrentTraceContext; -YT_THREAD_LOCAL(TCpuInstant) TraceContextTimingCheckpoint; +YT_DEFINE_THREAD_LOCAL(TTraceContext*, CurrentTraceContext); +YT_DEFINE_THREAD_LOCAL(TCpuInstant, TraceContextTimingCheckpoint); TSpanId GenerateSpanId() { @@ -112,7 +112,7 @@ TSpanId GenerateSpanId() void SetCurrentTraceContext(TTraceContext* context) { - CurrentTraceContext = context; + CurrentTraceContext() = context; std::atomic_signal_fence(std::memory_order::seq_cst); } @@ -122,8 +122,9 @@ TTraceContextPtr SwapTraceContext(TTraceContextPtr newContext) auto oldContext = propagatingStorage.Exchange<TTraceContextPtr>(newContext).value_or(nullptr); auto now = GetApproximateCpuInstant(); + auto& traceContextTimingCheckpoint = TraceContextTimingCheckpoint(); // Invalid if no oldContext. - auto delta = now - TraceContextTimingCheckpoint; + auto delta = now - traceContextTimingCheckpoint; if (oldContext && newContext) { YT_LOG_TRACE("Switching context (OldContext: %v, NewContext: %v, CpuTimeDelta: %v)", @@ -144,7 +145,7 @@ TTraceContextPtr SwapTraceContext(TTraceContextPtr newContext) } SetCurrentTraceContext(newContext.Get()); - TraceContextTimingCheckpoint = now; + traceContextTimingCheckpoint = now; return oldContext; } @@ -152,10 +153,11 @@ TTraceContextPtr SwapTraceContext(TTraceContextPtr newContext) void OnContextSwitchOut() { if (auto* context = TryGetCurrentTraceContext()) { + auto& traceContextTimingCheckpoint = TraceContextTimingCheckpoint(); auto now = GetApproximateCpuInstant(); - context->IncrementElapsedCpuTime(now - TraceContextTimingCheckpoint); + context->IncrementElapsedCpuTime(now - traceContextTimingCheckpoint); SetCurrentTraceContext(nullptr); - TraceContextTimingCheckpoint = 0; + traceContextTimingCheckpoint = 0; } } @@ -163,10 +165,10 @@ void OnContextSwitchIn() { if (auto* context = TryGetTraceContextFromPropagatingStorage(GetCurrentPropagatingStorage())) { SetCurrentTraceContext(context); - TraceContextTimingCheckpoint = GetApproximateCpuInstant(); + TraceContextTimingCheckpoint() = GetApproximateCpuInstant(); } else { SetCurrentTraceContext(nullptr); - TraceContextTimingCheckpoint = 0; + TraceContextTimingCheckpoint() = 0; } } @@ -175,12 +177,13 @@ void OnPropagatingStorageSwitch( const TPropagatingStorage& newStorage) { TCpuInstant now = 0; + auto& traceContextTimingCheckpoint = TraceContextTimingCheckpoint(); if (auto* oldContext = TryGetCurrentTraceContext()) { YT_ASSERT(oldContext == TryGetTraceContextFromPropagatingStorage(oldStorage)); - YT_ASSERT(TraceContextTimingCheckpoint != 0); + YT_ASSERT(traceContextTimingCheckpoint != 0); now = GetApproximateCpuInstant(); - oldContext->IncrementElapsedCpuTime(now - TraceContextTimingCheckpoint); + oldContext->IncrementElapsedCpuTime(now - traceContextTimingCheckpoint); } if (auto* newContext = TryGetTraceContextFromPropagatingStorage(newStorage)) { @@ -188,10 +191,10 @@ void OnPropagatingStorageSwitch( if (now == 0) { now = GetApproximateCpuInstant(); } - TraceContextTimingCheckpoint = now; + traceContextTimingCheckpoint = now; } else { SetCurrentTraceContext(nullptr); - TraceContextTimingCheckpoint = 0; + traceContextTimingCheckpoint = 0; } } @@ -712,13 +715,15 @@ void FlushCurrentTraceContextElapsedTime() return; } + auto& traceContextTimingCheckpoint = NDetail::TraceContextTimingCheckpoint(); + auto now = GetApproximateCpuInstant(); - auto delta = std::max(now - NDetail::TraceContextTimingCheckpoint, static_cast<TCpuInstant>(0)); + auto delta = std::max(now - traceContextTimingCheckpoint, static_cast<TCpuInstant>(0)); YT_LOG_TRACE("Flushing context time (Context: %v, CpuTimeDelta: %v)", context, NProfiling::CpuDurationToDuration(delta)); context->IncrementElapsedCpuTime(delta); - NDetail::TraceContextTimingCheckpoint = now; + traceContextTimingCheckpoint = now; } bool IsCurrentTraceContextRecorded() @@ -770,7 +775,7 @@ void ReleaseFiberTagStorage(void* storage) TCpuInstant GetTraceContextTimingCheckpoint() { - return NTracing::NDetail::TraceContextTimingCheckpoint; + return NTracing::NDetail::TraceContextTimingCheckpoint(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/ya.make b/yt/yt/core/ya.make index 9e820939ac..6806f169c6 100644 --- a/yt/yt/core/ya.make +++ b/yt/yt/core/ya.make @@ -9,6 +9,8 @@ IF (ARCH_X86_64) CFLAGS(-mpclmul) ENDIF() +NO_LTO() + SRCS( actions/cancelable_context.cpp actions/current_invoker.cpp diff --git a/yt/yt/core/ytree/yson_struct-inl.h b/yt/yt/core/ytree/yson_struct-inl.h index 88fafd913d..75f5b1db1d 100644 --- a/yt/yt/core/ytree/yson_struct-inl.h +++ b/yt/yt/core/ytree/yson_struct-inl.h @@ -80,7 +80,7 @@ TSerializer TExternalizedYsonStruct::CreateReadOnly(const TStruct& readOnly) //! We need some writable instance of TStruct to refer to in order //! to have a default constructor required by TYsonStructRegistry::InitializeStruct. template <std::default_initializable TStruct> -TStruct* TExternalizedYsonStruct::GetDefault() noexcept +YT_PREVENT_TLS_CACHING TStruct* TExternalizedYsonStruct::GetDefault() noexcept { thread_local TStruct defaultThat = {}; //! NB: We reset default after every invocation @@ -101,11 +101,11 @@ void TYsonStructRegistry::InitializeStruct(TStruct* target) TForbidCachedDynamicCastGuard guard(target); // It takes place only inside special constructor call inside lambda below. - if (CurrentlyInitializingMeta_) { + if (CurrentlyInitializingYsonMeta()) { // TODO(renadeen): assert target is from the same type hierarchy. // Call initialization method that is provided by user. - if (RegistryDepth_ <= 1) { - TStruct::Register(TYsonStructRegistrar<TStruct>(CurrentlyInitializingMeta_)); + if (YsonMetaRegistryDepth() <= 1) { + TStruct::Register(TYsonStructRegistrar<TStruct>(CurrentlyInitializingYsonMeta())); } return; } @@ -122,14 +122,14 @@ void TYsonStructRegistry::InitializeStruct(TStruct* target) // where registration of yson parameters takes place. // This way all parameters of the whole type hierarchy will fill `CurrentlyInitializingMeta_`. // We prevent context switch cause we don't want another fiber to use `CurrentlyInitializingMeta_` before we finish initialization. - YT_VERIFY(!CurrentlyInitializingMeta_); - CurrentlyInitializingMeta_ = result; + YT_VERIFY(!CurrentlyInitializingYsonMeta()); + CurrentlyInitializingYsonMeta() = result; { NConcurrency::TForbidContextSwitchGuard contextSwitchGuard; const std::type_info& typeInfo = CallCtor<TStruct>(); result->FinishInitialization(typeInfo); } - CurrentlyInitializingMeta_ = nullptr; + CurrentlyInitializingYsonMeta() = nullptr; return result; }; diff --git a/yt/yt/core/ytree/yson_struct.cpp b/yt/yt/core/ytree/yson_struct.cpp index 2afabecb0b..b939d17b9a 100644 --- a/yt/yt/core/ytree/yson_struct.cpp +++ b/yt/yt/core/ytree/yson_struct.cpp @@ -157,6 +157,11 @@ void TYsonStruct::InitializeRefCounted() //////////////////////////////////////////////////////////////////////////////// +YT_DEFINE_THREAD_LOCAL(IYsonStructMeta*, CurrentlyInitializingYsonMeta, nullptr); +YT_DEFINE_THREAD_LOCAL(i64, YsonMetaRegistryDepth, 0); + +//////////////////////////////////////////////////////////////////////////////// + TYsonStructRegistry* TYsonStructRegistry::Get() { return LeakySingleton<TYsonStructRegistry>(); @@ -164,20 +169,20 @@ TYsonStructRegistry* TYsonStructRegistry::Get() bool TYsonStructRegistry::InitializationInProgress() { - return CurrentlyInitializingMeta_ != nullptr; + return CurrentlyInitializingYsonMeta() != nullptr; } void TYsonStructRegistry::OnBaseCtorCalled() { - if (CurrentlyInitializingMeta_ != nullptr) { - ++RegistryDepth_; + if (CurrentlyInitializingYsonMeta() != nullptr) { + ++YsonMetaRegistryDepth(); } } void TYsonStructRegistry::OnFinalCtorCalled() { - if (CurrentlyInitializingMeta_ != nullptr) { - --RegistryDepth_; + if (CurrentlyInitializingYsonMeta() != nullptr) { + --YsonMetaRegistryDepth(); } } diff --git a/yt/yt/core/ytree/yson_struct.h b/yt/yt/core/ytree/yson_struct.h index e6efb571a2..19300099bf 100644 --- a/yt/yt/core/ytree/yson_struct.h +++ b/yt/yt/core/ytree/yson_struct.h @@ -11,6 +11,7 @@ #include <yt/yt/library/syncmap/map.h> #include <library/cpp/yt/misc/enum.h> +#include <library/cpp/yt/misc/tls.h> #include <util/generic/algorithm.h> @@ -179,6 +180,9 @@ protected: //////////////////////////////////////////////////////////////////////////////// +YT_DECLARE_THREAD_LOCAL(IYsonStructMeta*, CurrentlyInitializingYsonMeta); +YT_DECLARE_THREAD_LOCAL(i64, YsonMetaRegistryDepth); + class TYsonStructRegistry { public: @@ -194,9 +198,6 @@ public: void OnFinalCtorCalled(); private: - static inline YT_THREAD_LOCAL(IYsonStructMeta*) CurrentlyInitializingMeta_ = nullptr; - static inline YT_THREAD_LOCAL(i64) RegistryDepth_ = 0; - template <class TStruct> friend class TYsonStructRegistrar; diff --git a/yt/yt/library/ytprof/api/api.cpp b/yt/yt/library/ytprof/api/api.cpp index f00d11d88c..4f379232c9 100644 --- a/yt/yt/library/ytprof/api/api.cpp +++ b/yt/yt/library/ytprof/api/api.cpp @@ -11,18 +11,18 @@ DEFINE_REFCOUNTED_TYPE(TProfilerTag) struct TCpuProfilerTags; // This variable is referenced from signal handler. -constinit YT_THREAD_LOCAL(std::atomic<TCpuProfilerTags*>) CpuProfilerTagsPtr = nullptr; +YT_DEFINE_THREAD_LOCAL(std::atomic<TCpuProfilerTags*>, CpuProfilerTagsPtr, nullptr); struct TCpuProfilerTags { TCpuProfilerTags() { - CpuProfilerTagsPtr = this; + CpuProfilerTagsPtr() = this; } ~TCpuProfilerTags() { - CpuProfilerTagsPtr = nullptr; + CpuProfilerTagsPtr() = nullptr; } std::array<TAtomicSignalPtr<TProfilerTag>, MaxActiveTags> Tags; @@ -30,11 +30,11 @@ struct TCpuProfilerTags // We can't reference CpuProfilerTags from signal handler, // since it may trigger lazy initialization. -YT_THREAD_LOCAL(TCpuProfilerTags) CpuProfilerTags; +YT_DEFINE_THREAD_LOCAL(TCpuProfilerTags, CpuProfilerTags); std::array<TAtomicSignalPtr<TProfilerTag>, MaxActiveTags>* GetCpuProfilerTags() { - auto tags = CpuProfilerTagsPtr.load(); + auto tags = CpuProfilerTagsPtr().load(); if (tags) { return &(tags->Tags); } @@ -46,7 +46,7 @@ std::array<TAtomicSignalPtr<TProfilerTag>, MaxActiveTags>* GetCpuProfilerTags() TCpuProfilerTagGuard::TCpuProfilerTagGuard(TProfilerTagPtr tag) { - auto& cpuProfilerTags = GetTlsRef(CpuProfilerTags); + auto& cpuProfilerTags = CpuProfilerTags(); for (int i = 0; i < MaxActiveTags; i++) { if (!cpuProfilerTags.Tags[i].IsSetFromThread()) { @@ -59,7 +59,7 @@ TCpuProfilerTagGuard::TCpuProfilerTagGuard(TProfilerTagPtr tag) TCpuProfilerTagGuard::~TCpuProfilerTagGuard() { - auto& cpuProfilerTags = GetTlsRef(CpuProfilerTags); + auto& cpuProfilerTags = CpuProfilerTags(); if (TagIndex_ != -1) { cpuProfilerTags.Tags[TagIndex_].StoreFromThread(nullptr); @@ -79,7 +79,7 @@ TCpuProfilerTagGuard& TCpuProfilerTagGuard::operator = (TCpuProfilerTagGuard&& o } if (TagIndex_ != -1) { - auto& cpuProfilerTags = GetTlsRef(CpuProfilerTags); + auto& cpuProfilerTags = CpuProfilerTags(); cpuProfilerTags.Tags[TagIndex_].StoreFromThread(nullptr); } diff --git a/yt/yt/library/ytprof/spinlock_profiler.cpp b/yt/yt/library/ytprof/spinlock_profiler.cpp index e1a2fa58e3..a5a03461da 100644 --- a/yt/yt/library/ytprof/spinlock_profiler.cpp +++ b/yt/yt/library/ytprof/spinlock_profiler.cpp @@ -66,7 +66,7 @@ void TSpinlockProfiler::RecordEvent(const void* /*lock*/, int64_t waitCycles) RecordSample(&fpCursor, waitCycles); } -YT_THREAD_LOCAL(int) SpinlockEventCount; +YT_DEFINE_THREAD_LOCAL(int, SpinlockEventCount); void TSpinlockProfiler::OnEvent(const void* lock, int64_t waitCycles) { @@ -75,12 +75,14 @@ void TSpinlockProfiler::OnEvent(const void* lock, int64_t waitCycles) return; } - if (SpinlockEventCount < samplingRate) { - SpinlockEventCount++; + auto& spinlockEventCount = SpinlockEventCount(); + + if (spinlockEventCount < samplingRate) { + spinlockEventCount++; return; } - SpinlockEventCount = 0; + spinlockEventCount = 0; while (HandlingEvent_.exchange(true)) { SchedYield(); } @@ -171,7 +173,7 @@ void TBlockingProfiler::RecordEvent( RecordSample(&fpCursor, cpuDelay); } -YT_THREAD_LOCAL(int) YTSpinlockEventCount; +YT_DEFINE_THREAD_LOCAL(int, YTSpinlockEventCount); void TBlockingProfiler::OnEvent( TCpuDuration cpuDelay, @@ -183,12 +185,14 @@ void TBlockingProfiler::OnEvent( return; } - if (YTSpinlockEventCount < samplingRate) { - YTSpinlockEventCount++; + auto& ytSpinlockEventCount = YTSpinlockEventCount(); + + if (ytSpinlockEventCount < samplingRate) { + ytSpinlockEventCount++; return; } - YTSpinlockEventCount = 0; + ytSpinlockEventCount = 0; while (HandlingEvent_.exchange(true)) { SchedYield(); } |