diff options
author | lukyan <lukyan@yandex-team.com> | 2023-11-07 15:55:00 +0300 |
---|---|---|
committer | lukyan <lukyan@yandex-team.com> | 2023-11-07 16:22:41 +0300 |
commit | d519b0904939506b9881fb2a5cfb1d2ec3b5e6eb (patch) | |
tree | 60d3d7aec7a065dcda1e57a3b85281aa04513673 /yt | |
parent | 03ffc7e5fc8734ac44705fc1d4fbdc74a5b7accd (diff) | |
download | ydb-d519b0904939506b9881fb2a5cfb1d2ec3b5e6eb.tar.gz |
Make thread local variables fiber friendly
Diffstat (limited to 'yt')
26 files changed, 132 insertions, 78 deletions
diff --git a/yt/yt/client/unittests/named_yson_token_ut.cpp b/yt/yt/client/unittests/named_yson_token_ut.cpp index c330595c62..60f3793e3b 100644 --- a/yt/yt/client/unittests/named_yson_token_ut.cpp +++ b/yt/yt/client/unittests/named_yson_token_ut.cpp @@ -8,6 +8,7 @@ #include <yt/yt/core/yson/parser.h> #include <library/cpp/yt/misc/variant.h> +#include <library/cpp/yt/misc/tls.h> #include <util/stream/mem.h> @@ -29,20 +30,20 @@ const auto IntStringVariant = VariantStructLogicalType({ {"string", SimpleLogicalType(ESimpleLogicalValueType::String)}, }); -thread_local TYsonConverterConfig PositionalToNamedConfigInstance; +YT_THREAD_LOCAL(TYsonConverterConfig) PositionalToNamedConfigInstance; class TWithConfig { public: TWithConfig(const TYsonConverterConfig& config) - : OldConfig_(PositionalToNamedConfigInstance) + : OldConfig_(GetTlsRef(PositionalToNamedConfigInstance)) { - PositionalToNamedConfigInstance = config; + GetTlsRef(PositionalToNamedConfigInstance) = config; } ~TWithConfig() { - PositionalToNamedConfigInstance = OldConfig_; + GetTlsRef(PositionalToNamedConfigInstance) = OldConfig_; } private: TYsonConverterConfig OldConfig_; @@ -73,7 +74,7 @@ TString ConvertYson( }; converter = CreateYsonClientToServerConverter(descriptor, config); } else { - converter = CreateYsonServerToClientConverter(descriptor, PositionalToNamedConfigInstance); + converter = CreateYsonServerToClientConverter(descriptor, GetTlsRef(PositionalToNamedConfigInstance)); } } catch (const std::exception& ex) { ADD_FAILURE() << "cannot create converter: " << ex.what(); diff --git a/yt/yt/core/actions/current_invoker.cpp b/yt/yt/core/actions/current_invoker.cpp index 3ed572a257..074965353d 100644 --- a/yt/yt/core/actions/current_invoker.cpp +++ b/yt/yt/core/actions/current_invoker.cpp @@ -2,11 +2,13 @@ #include "invoker_util.h" +#include <library/cpp/yt/misc/tls.h> + namespace NYT { //////////////////////////////////////////////////////////////////////////////// -thread_local IInvoker* CurrentInvoker; +YT_THREAD_LOCAL(IInvoker*) CurrentInvoker; IInvoker* GetCurrentInvoker() { @@ -30,7 +32,7 @@ TCurrentInvokerGuard::TCurrentInvokerGuard(IInvoker* invoker) , Active_(true) , SavedInvoker_(std::move(invoker)) { - std::swap(CurrentInvoker, SavedInvoker_); + std::swap(GetTlsRef(CurrentInvoker), SavedInvoker_); } void TCurrentInvokerGuard::Restore() diff --git a/yt/yt/core/concurrency/action_queue.cpp b/yt/yt/core/concurrency/action_queue.cpp index 14f9d2d8e8..0377297265 100644 --- a/yt/yt/core/concurrency/action_queue.cpp +++ b/yt/yt/core/concurrency/action_queue.cpp @@ -15,6 +15,8 @@ #include <yt/yt/core/misc/ring_queue.h> #include <yt/yt/core/misc/shutdown.h> +#include <library/cpp/yt/misc/tls.h> + #include <util/thread/lfqueue.h> namespace NYT::NConcurrency { @@ -427,7 +429,7 @@ private: TRingQueue<TClosure> Queue_; int Semaphore_ = 0; - static thread_local TBoundedConcurrencyInvoker* CurrentSchedulingInvoker_; + static YT_THREAD_LOCAL(TBoundedConcurrencyInvoker*) CurrentSchedulingInvoker_; private: class TInvocationGuard @@ -494,7 +496,7 @@ private: } }; -thread_local TBoundedConcurrencyInvoker* TBoundedConcurrencyInvoker::CurrentSchedulingInvoker_; +YT_THREAD_LOCAL(TBoundedConcurrencyInvoker*) TBoundedConcurrencyInvoker::CurrentSchedulingInvoker_; IInvokerPtr CreateBoundedConcurrencyInvoker( IInvokerPtr underlyingInvoker, diff --git a/yt/yt/core/concurrency/execution_stack.cpp b/yt/yt/core/concurrency/execution_stack.cpp index 618eb3bbee..15e3b7ec57 100644 --- a/yt/yt/core/concurrency/execution_stack.cpp +++ b/yt/yt/core/concurrency/execution_stack.cpp @@ -17,6 +17,8 @@ #include <library/cpp/yt/memory/ref.h> +#include <library/cpp/yt/misc/tls.h> + #include <util/system/sanitizers.h> namespace NYT::NConcurrency { @@ -129,7 +131,7 @@ TExecutionStack::~TExecutionStack() ::DeleteFiber(Handle_); } -static thread_local void* FiberTrampolineOpaque; +static YT_THREAD_LOCAL(void*) FiberTrampolineOpaque; void TExecutionStack::SetOpaque(void* opaque) { diff --git a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp index 47036ef8f6..d8e80afd48 100644 --- a/yt/yt/core/concurrency/fiber_scheduler_thread.cpp +++ b/yt/yt/core/concurrency/fiber_scheduler_thread.cpp @@ -87,16 +87,16 @@ struct TFiberContext TFiberPtr CurrentFiber; }; -static thread_local TFiberContext* FiberContext; +static YT_THREAD_LOCAL(TFiberContext*) FiberContext; // Forbid inlining these accessors to prevent the compiler from // miss-optimizing TLS access in presence of fiber context switches. -Y_NO_INLINE TFiberContext* TryGetFiberContext() +TFiberContext* TryGetFiberContext() { return FiberContext; } -Y_NO_INLINE void SetFiberContext(TFiberContext* context) +void SetFiberContext(TFiberContext* context) { FiberContext = context; } @@ -716,7 +716,7 @@ private: const TFiber* const Fiber_; TFiberSwitchHandler* SavedThis_; - static thread_local TFiberSwitchHandler* This_; + static YT_THREAD_LOCAL(TFiberSwitchHandler*) This_; struct TContextSwitchHandlers { @@ -736,7 +736,7 @@ private: TBaseSwitchHandler::OnSwitch(); - std::swap(SavedThis_, This_); + std::swap(SavedThis_, GetTlsRef(This_)); } // On finish fiber running. @@ -768,7 +768,7 @@ private: } }; -thread_local TFiberSwitchHandler* TFiberSwitchHandler::This_; +YT_THREAD_LOCAL(TFiberSwitchHandler*) TFiberSwitchHandler::This_; TFiberSwitchHandler* TryGetFiberSwitchHandler() { @@ -875,7 +875,7 @@ void TFiberSchedulerThread::ThreadMain() //////////////////////////////////////////////////////////////////////////////// -thread_local TFiberId CurrentFiberId; +YT_THREAD_LOCAL(TFiberId) CurrentFiberId; TFiberId GetCurrentFiberId() { @@ -889,7 +889,7 @@ void SetCurrentFiberId(TFiberId id) //////////////////////////////////////////////////////////////////////////////// -thread_local bool ContextSwitchForbidden; +YT_THREAD_LOCAL(bool) ContextSwitchForbidden; bool IsContextSwitchForbidden() { diff --git a/yt/yt/core/concurrency/fls-inl.h b/yt/yt/core/concurrency/fls-inl.h index eb588c0511..47c8b2d90d 100644 --- a/yt/yt/core/concurrency/fls-inl.h +++ b/yt/yt/core/concurrency/fls-inl.h @@ -7,6 +7,8 @@ #include <library/cpp/yt/memory/memory_tag.h> +#include <library/cpp/yt/misc/tls.h> + namespace NYT::NConcurrency { //////////////////////////////////////////////////////////////////////////////// @@ -19,7 +21,7 @@ using TFlsSlotDtor = void(*)(TFls::TCookie cookie); int AllocateFlsSlot(TFlsSlotDtor dtor); TFls* GetPerThreadFls(); -extern thread_local TFls* CurrentFls; +extern YT_THREAD_LOCAL(TFls*) CurrentFls; } // namespace NDetail diff --git a/yt/yt/core/concurrency/fls.cpp b/yt/yt/core/concurrency/fls.cpp index 447aae688d..9cedcadcfa 100644 --- a/yt/yt/core/concurrency/fls.cpp +++ b/yt/yt/core/concurrency/fls.cpp @@ -2,6 +2,8 @@ #include <library/cpp/yt/threading/fork_aware_spin_lock.h> +#include <library/cpp/yt/misc/tls.h> + #include <util/system/sanitizers.h> #include <array> @@ -18,8 +20,8 @@ std::atomic<int> FlsSize; NThreading::TForkAwareSpinLock FlsLock; std::array<TFlsSlotDtor, MaxFlsSize> FlsDtors; -thread_local TFls* PerThreadFls; -thread_local TFls* CurrentFls; +YT_THREAD_LOCAL(TFls*) PerThreadFls; +YT_THREAD_LOCAL(TFls*) CurrentFls; int AllocateFlsSlot(TFlsSlotDtor dtor) { @@ -76,6 +78,7 @@ void TFls::Set(int index, TCookie cookie) TFls* SwapCurrentFls(TFls* newFls) { + return std::exchange(NDetail::CurrentFls, newFls); } diff --git a/yt/yt/core/concurrency/invoker_queue.cpp b/yt/yt/core/concurrency/invoker_queue.cpp index 5b77a048df..5c0607e6dd 100644 --- a/yt/yt/core/concurrency/invoker_queue.cpp +++ b/yt/yt/core/concurrency/invoker_queue.cpp @@ -6,6 +6,8 @@ #include <yt/yt/core/profiling/tscp.h> +#include <library/cpp/yt/misc/tls.h> + namespace NYT::NConcurrency { using namespace NProfiling; @@ -18,7 +20,7 @@ static const auto& Logger = ConcurrencyLogger; //////////////////////////////////////////////////////////////////////////////// -constinit thread_local TCpuProfilerTagGuard CpuProfilerTagGuard; +constinit YT_THREAD_LOCAL(TCpuProfilerTagGuard) CpuProfilerTagGuard; //////////////////////////////////////////////////////////////////////////////// @@ -501,9 +503,9 @@ bool TInvokerQueue<TQueueImpl>::BeginExecute(TEnqueuedAction* action, typename T updateCounters(CumulativeCounters_); if (const auto& profilerTag = action->ProfilerTag) { - CpuProfilerTagGuard = TCpuProfilerTagGuard(profilerTag); + GetTlsRef(CpuProfilerTagGuard) = TCpuProfilerTagGuard(profilerTag); } else { - CpuProfilerTagGuard = {}; + GetTlsRef(CpuProfilerTagGuard) = {}; } SetCurrentInvoker(GetProfilingTagSettingInvoker(action->ProfilingTag)); @@ -514,7 +516,7 @@ bool TInvokerQueue<TQueueImpl>::BeginExecute(TEnqueuedAction* action, typename T template <class TQueueImpl> void TInvokerQueue<TQueueImpl>::EndExecute(TEnqueuedAction* action) { - CpuProfilerTagGuard = TCpuProfilerTagGuard{}; + GetTlsRef(CpuProfilerTagGuard) = TCpuProfilerTagGuard{}; SetCurrentInvoker(nullptr); YT_ASSERT(action); diff --git a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp index 56d0bd633c..c53bc1bc7f 100644 --- a/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp +++ b/yt/yt/core/concurrency/new_fair_share_thread_pool.cpp @@ -17,6 +17,8 @@ #include <library/cpp/yt/memory/public.h> +#include <library/cpp/yt/misc/tls.h> + #include <util/system/spinlock.h> #include <util/generic/xrange.h> @@ -37,7 +39,7 @@ DECLARE_REFCOUNTED_CLASS(TBucket) struct TExecutionPool; // High 16 bits is thread index and 48 bits for thread pool ptr. -thread_local TPackedPtr ThreadCookie = 0; +YT_THREAD_LOCAL(TPackedPtr) ThreadCookie = 0; static constexpr auto LogDurationThreshold = TDuration::Seconds(1); diff --git a/yt/yt/core/logging/log_manager.cpp b/yt/yt/core/logging/log_manager.cpp index e254032e20..5098fff2a6 100644 --- a/yt/yt/core/logging/log_manager.cpp +++ b/yt/yt/core/logging/log_manager.cpp @@ -40,6 +40,7 @@ #include <library/cpp/yt/misc/hash.h> #include <library/cpp/yt/misc/variant.h> +#include <library/cpp/yt/misc/tls.h> #include <library/cpp/yt/string/raw_formatter.h> @@ -357,7 +358,7 @@ TCpuInstant GetEventInstant(const TLoggerQueueItem& item) using TThreadLocalQueue = TSpscQueue<TLoggerQueueItem>; static constexpr uintptr_t ThreadQueueDestroyedSentinel = -1; -static thread_local TThreadLocalQueue* PerThreadQueue; +static YT_THREAD_LOCAL(TThreadLocalQueue*) PerThreadQueue; ///////////////////////////////////////////////////////////////////////////// @@ -1020,7 +1021,7 @@ private: { if (!PerThreadQueue) { PerThreadQueue = new TThreadLocalQueue(); - RegisteredLocalQueues_.Enqueue(PerThreadQueue); + RegisteredLocalQueues_.Enqueue(GetTlsRef(PerThreadQueue)); } ++EnqueuedEvents_; @@ -1455,13 +1456,13 @@ struct TLocalQueueReclaimer { if (PerThreadQueue) { auto logManager = TLogManager::Get()->Impl_; - logManager->UnregisteredLocalQueues_.Enqueue(PerThreadQueue); + logManager->UnregisteredLocalQueues_.Enqueue(GetTlsRef(PerThreadQueue)); PerThreadQueue = reinterpret_cast<TThreadLocalQueue*>(ThreadQueueDestroyedSentinel); } } }; -static thread_local TLocalQueueReclaimer LocalQueueReclaimer; +static YT_THREAD_LOCAL(TLocalQueueReclaimer) LocalQueueReclaimer; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/core/misc/error.cpp b/yt/yt/core/misc/error.cpp index 649a594ad0..2a6f7dcb14 100644 --- a/yt/yt/core/misc/error.cpp +++ b/yt/yt/core/misc/error.cpp @@ -22,6 +22,7 @@ #include <library/cpp/yt/exception/exception.h> #include <library/cpp/yt/misc/thread_name.h> +#include <library/cpp/yt/misc/tls.h> #include <util/string/subst.h> @@ -60,15 +61,15 @@ TString ToString(TErrorCode code) //////////////////////////////////////////////////////////////////////////////// -thread_local bool ErrorSanitizerEnabled = false; -thread_local TInstant ErrorSanitizerDatetimeOverride = {}; +YT_THREAD_LOCAL(bool) ErrorSanitizerEnabled = false; +YT_THREAD_LOCAL(TInstant) ErrorSanitizerDatetimeOverride = {}; TErrorSanitizerGuard::TErrorSanitizerGuard(TInstant datetimeOverride) : SavedEnabled_(ErrorSanitizerEnabled) - , SavedDatetimeOverride_(ErrorSanitizerDatetimeOverride) + , SavedDatetimeOverride_(GetTlsRef(ErrorSanitizerDatetimeOverride)) { ErrorSanitizerEnabled = true; - ErrorSanitizerDatetimeOverride = datetimeOverride; + GetTlsRef(ErrorSanitizerDatetimeOverride) = datetimeOverride; } TErrorSanitizerGuard::~TErrorSanitizerGuard() @@ -76,7 +77,7 @@ TErrorSanitizerGuard::~TErrorSanitizerGuard() YT_ASSERT(ErrorSanitizerEnabled); ErrorSanitizerEnabled = SavedEnabled_; - ErrorSanitizerDatetimeOverride = SavedDatetimeOverride_; + GetTlsRef(ErrorSanitizerDatetimeOverride) = SavedDatetimeOverride_; } //////////////////////////////////////////////////////////////////////////////// @@ -282,7 +283,7 @@ private: void CaptureOriginAttributes() { if (ErrorSanitizerEnabled) { - Datetime_ = ErrorSanitizerDatetimeOverride; + Datetime_ = GetTlsRef(ErrorSanitizerDatetimeOverride); return; } diff --git a/yt/yt/core/misc/hazard_ptr-inl.h b/yt/yt/core/misc/hazard_ptr-inl.h index 4e5a4fc04c..44ae2c69d5 100644 --- a/yt/yt/core/misc/hazard_ptr-inl.h +++ b/yt/yt/core/misc/hazard_ptr-inl.h @@ -5,6 +5,8 @@ #endif #undef HAZARD_PTR_INL_H_ +#include <library/cpp/yt/misc/tls.h> + #include <array> namespace NYT { @@ -18,10 +20,10 @@ namespace NDetail { constexpr int MaxHazardPointersPerThread = 2; using THazardPointerSet = std::array<std::atomic<void*>, MaxHazardPointersPerThread>; -extern thread_local THazardPointerSet HazardPointers; +extern YT_THREAD_LOCAL(THazardPointerSet) HazardPointers; struct THazardThreadState; -extern thread_local THazardThreadState* HazardThreadState; +extern YT_THREAD_LOCAL(THazardThreadState*) HazardThreadState; void InitHazardThreadState(); @@ -87,8 +89,10 @@ THazardPtr<T> THazardPtr<T>::Acquire(TPtrLoader&& ptrLoader, T* ptr) return {}; } - auto* hazardPtr = [] { - for (auto it = NYT::NDetail::HazardPointers.begin(); it != NYT::NDetail::HazardPointers.end(); ++it) { + auto& hazardPointers = GetTlsRef(NYT::NDetail::HazardPointers); + + auto* hazardPtr = [&] { + for (auto it = hazardPointers.begin(); it != hazardPointers.end(); ++it) { auto& ptr = *it; if (!ptr.load(std::memory_order::relaxed)) { return &ptr; diff --git a/yt/yt/core/misc/hazard_ptr.cpp b/yt/yt/core/misc/hazard_ptr.cpp index 0c52c68bd4..6ba356bf0b 100644 --- a/yt/yt/core/misc/hazard_ptr.cpp +++ b/yt/yt/core/misc/hazard_ptr.cpp @@ -15,6 +15,8 @@ #include <library/cpp/yt/memory/free_list.h> +#include <library/cpp/yt/misc/tls.h> + namespace NYT { using namespace NConcurrency; @@ -30,7 +32,7 @@ namespace NDetail { //////////////////////////////////////////////////////////////////////////// -thread_local THazardPointerSet HazardPointers; +YT_THREAD_LOCAL(THazardPointerSet) HazardPointers; //! A simple container based on free list which supports only Enqueue and DequeueAll. template <class T> @@ -110,8 +112,8 @@ struct THazardThreadState { } }; -thread_local THazardThreadState* HazardThreadState; -thread_local bool HazardThreadStateDestroyed; +YT_THREAD_LOCAL(THazardThreadState*) HazardThreadState; +YT_THREAD_LOCAL(bool) HazardThreadStateDestroyed; //////////////////////////////////////////////////////////////////////////////// @@ -258,7 +260,7 @@ void THazardPointerManager::InitThreadState() THazardThreadState* THazardPointerManager::AllocateThreadState() { - auto* threadState = new THazardThreadState(&HazardPointers); + auto* threadState = new THazardThreadState(&GetTlsRef(HazardPointers)); struct THazardThreadStateDestroyer { @@ -271,7 +273,7 @@ THazardThreadState* THazardPointerManager::AllocateThreadState() }; // Unregisters thread from hazard ptr manager on thread exit. - static thread_local THazardThreadStateDestroyer destroyer{threadState}; + static YT_THREAD_LOCAL(THazardThreadStateDestroyer) destroyer{threadState}; { auto guard = WriterGuard(ThreadRegistryLock_); diff --git a/yt/yt/core/misc/pool_allocator-inl.h b/yt/yt/core/misc/pool_allocator-inl.h index 5602258bc6..f538228244 100644 --- a/yt/yt/core/misc/pool_allocator-inl.h +++ b/yt/yt/core/misc/pool_allocator-inl.h @@ -4,6 +4,8 @@ #include "pool_allocator.h" #endif +#include <library/cpp/yt/misc/tls.h> + #include <util/system/align.h> namespace NYT { @@ -70,13 +72,13 @@ std::unique_ptr<T> TPoolAllocator::New(TArgs&&... args) struct TChunkTag { }; constexpr auto ChunkSize = 64_KB; - static thread_local TPoolAllocator Allocator( + static YT_THREAD_LOCAL(TPoolAllocator) Allocator( sizeof(T), alignof(T), ChunkSize, GetRefCountedTypeCookie<TChunkTag>()); - return std::unique_ptr<T>(new(&Allocator) T(std::forward<TArgs>(args)...)); + return std::unique_ptr<T>(new(&GetTlsRef(Allocator)) T(std::forward<TArgs>(args)...)); } inline void TPoolAllocator::DoFree(void* ptr) diff --git a/yt/yt/core/misc/ref_counted_tracker.cpp b/yt/yt/core/misc/ref_counted_tracker.cpp index 617ce2cd26..6cc0dd2a2d 100644 --- a/yt/yt/core/misc/ref_counted_tracker.cpp +++ b/yt/yt/core/misc/ref_counted_tracker.cpp @@ -8,6 +8,8 @@ #include <library/cpp/yt/memory/memory_tag.h> +#include <library/cpp/yt/misc/tls.h> + #include <algorithm> namespace NYT { @@ -142,14 +144,14 @@ size_t TRefCountedTracker::TNamedSlot::ClampNonnegative(size_t allocated, size_t //////////////////////////////////////////////////////////////////////////////// // nullptr if not initialized or already destroyed -thread_local TRefCountedTracker::TLocalSlots* TRefCountedTracker::LocalSlots_; +YT_THREAD_LOCAL(TRefCountedTracker::TLocalSlots*) TRefCountedTracker::LocalSlots_; // nullptr if not initialized or already destroyed -thread_local TRefCountedTracker::TLocalSlot* TRefCountedTracker::LocalSlotsBegin_; +YT_THREAD_LOCAL(TRefCountedTracker::TLocalSlot*) TRefCountedTracker::LocalSlotsBegin_; // 0 if not initialized // -1 if already destroyed -thread_local int TRefCountedTracker::LocalSlotsSize_; +YT_THREAD_LOCAL(int) TRefCountedTracker::LocalSlotsSize_; int TRefCountedTracker::GetTrackedThreadCount() const { @@ -410,15 +412,17 @@ TRefCountedTracker::TLocalSlot* TRefCountedTracker::GetLocalSlot(TRefCountedType auto guard = Guard(this_->SpinLock_); - if (this_->GlobalSlots_.size() < LocalSlots_->size()) { - this_->GlobalSlots_.resize(std::max(LocalSlots_->size(), this_->GlobalSlots_.size())); + auto& localSlots = GetTlsRef(LocalSlots_); + + if (this_->GlobalSlots_.size() < localSlots->size()) { + this_->GlobalSlots_.resize(std::max(localSlots->size(), this_->GlobalSlots_.size())); } - for (auto index = 0; index < std::ssize(*LocalSlots_); ++index) { - this_->GlobalSlots_[index] += (*LocalSlots_)[index]; + for (auto index = 0; index < std::ssize(*localSlots); ++index) { + this_->GlobalSlots_[index] += (*localSlots)[index]; } - YT_VERIFY(this_->AllLocalSlots_.erase(LocalSlots_) == 1); + YT_VERIFY(this_->AllLocalSlots_.erase(localSlots) == 1); delete LocalSlots_; LocalSlots_ = nullptr; @@ -427,7 +431,7 @@ TRefCountedTracker::TLocalSlot* TRefCountedTracker::GetLocalSlot(TRefCountedType } }; - static thread_local TReclaimer Reclaimer; + static YT_THREAD_LOCAL(TReclaimer) Reclaimer; YT_VERIFY(LocalSlotsSize_ >= 0); @@ -435,7 +439,7 @@ TRefCountedTracker::TLocalSlot* TRefCountedTracker::GetLocalSlot(TRefCountedType if (!LocalSlots_) { LocalSlots_ = new TLocalSlots(); - YT_VERIFY(AllLocalSlots_.insert(LocalSlots_).second); + YT_VERIFY(AllLocalSlots_.insert(GetTlsRef(LocalSlots_)).second); } auto index = cookie.Underlying(); diff --git a/yt/yt/core/misc/ref_counted_tracker.h b/yt/yt/core/misc/ref_counted_tracker.h index fd5bb0a887..89c161151d 100644 --- a/yt/yt/core/misc/ref_counted_tracker.h +++ b/yt/yt/core/misc/ref_counted_tracker.h @@ -6,6 +6,8 @@ #include <library/cpp/yt/threading/fork_aware_spin_lock.h> +#include <library/cpp/yt/misc/tls.h> + namespace NYT { //////////////////////////////////////////////////////////////////////////////// @@ -98,14 +100,14 @@ private: using TNamedStatistics = std::vector<TNamedSlot>; // nullptr if not initialized or already destroyed - static thread_local TLocalSlots* LocalSlots_; + static YT_THREAD_LOCAL(TLocalSlots*) LocalSlots_; // nullptr if not initialized or already destroyed - static thread_local TLocalSlot* LocalSlotsBegin_; + static YT_THREAD_LOCAL(TLocalSlot*) LocalSlotsBegin_; // 0 if not initialized // -1 if already destroyed - static thread_local int LocalSlotsSize_; + static YT_THREAD_LOCAL(int) LocalSlotsSize_; mutable NThreading::TForkAwareSpinLock SpinLock_; std::map<TKey, TRefCountedTypeCookie> KeyToCookie_; diff --git a/yt/yt/core/misc/shutdown.cpp b/yt/yt/core/misc/shutdown.cpp index 0d46fcfdb8..758980909b 100644 --- a/yt/yt/core/misc/shutdown.cpp +++ b/yt/yt/core/misc/shutdown.cpp @@ -7,6 +7,8 @@ #include <library/cpp/yt/threading/fork_aware_spin_lock.h> #include <library/cpp/yt/threading/event_count.h> +#include <library/cpp/yt/misc/tls.h> + #include <util/generic/algorithm.h> #include <util/system/env.h> @@ -280,7 +282,7 @@ static const void* ShutdownGuardInitializer = [] { } }; - static thread_local TShutdownGuard Guard; + static YT_THREAD_LOCAL(TShutdownGuard) Guard; return nullptr; }(); diff --git a/yt/yt/core/rpc/bus/channel.cpp b/yt/yt/core/rpc/bus/channel.cpp index ae030b5615..ec8c6fd04d 100644 --- a/yt/yt/core/rpc/bus/channel.cpp +++ b/yt/yt/core/rpc/bus/channel.cpp @@ -26,6 +26,8 @@ #include <library/cpp/yt/threading/rw_spin_lock.h> #include <library/cpp/yt/threading/spin_lock.h> +#include <library/cpp/yt/misc/tls.h> + #include <array> namespace NYT::NRpc::NBus { @@ -404,7 +406,7 @@ private: } // YT-1639: Avoid long chain of recursive calls. - thread_local int Depth = 0; + YT_THREAD_LOCAL(int) Depth = 0; constexpr int MaxDepth = 10; if (Depth < MaxDepth) { ++Depth; diff --git a/yt/yt/core/rpc/service_detail.cpp b/yt/yt/core/rpc/service_detail.cpp index 74ebf6b4b4..067a057e75 100644 --- a/yt/yt/core/rpc/service_detail.cpp +++ b/yt/yt/core/rpc/service_detail.cpp @@ -30,6 +30,8 @@ #include <yt/yt/core/profiling/timing.h> +#include <library/cpp/yt/misc/tls.h> + namespace NYT::NRpc { using namespace NBus; @@ -1295,7 +1297,7 @@ void TRequestQueue::OnRequestFinished() // Prevents reentrant invocations. // One case is: RunRequest calling the handler synchronously, which replies the // context, which calls context->Finish, and we're back here again. -static thread_local bool ScheduleRequestsLatch = false; +static YT_THREAD_LOCAL(bool) ScheduleRequestsLatch = false; void TRequestQueue::ScheduleRequestsFromQueue() { diff --git a/yt/yt/core/threading/thread.cpp b/yt/yt/core/threading/thread.cpp index 59a0b66f4d..1aecb9242e 100644 --- a/yt/yt/core/threading/thread.cpp +++ b/yt/yt/core/threading/thread.cpp @@ -6,6 +6,8 @@ #include <yt/yt/core/misc/proc.h> +#include <library/cpp/yt/misc/tls.h> + #ifdef _linux_ #include <sched.h> #endif @@ -14,7 +16,7 @@ namespace NYT::NThreading { //////////////////////////////////////////////////////////////////////////////// -static thread_local TThreadId CurrentUniqueThreadId; +static YT_THREAD_LOCAL(TThreadId) CurrentUniqueThreadId; static std::atomic<TThreadId> UniqueThreadIdGenerator; static const auto& Logger = ThreadingLogger; @@ -218,11 +220,11 @@ void TThread::ThreadMainTrampoline() bool Armed_ = true; }; - static thread_local TExitInterceptor Interceptor; + static YT_THREAD_LOCAL(TExitInterceptor) Interceptor; ThreadMain(); - Interceptor.Disarm(); + GetTlsRef(Interceptor).Disarm(); StoppedEvent_.NotifyAll(); } diff --git a/yt/yt/core/tracing/trace_context-inl.h b/yt/yt/core/tracing/trace_context-inl.h index dcb2bb78d7..228498b548 100644 --- a/yt/yt/core/tracing/trace_context-inl.h +++ b/yt/yt/core/tracing/trace_context-inl.h @@ -8,6 +8,8 @@ #include <yt/yt/core/concurrency/thread_affinity.h> +#include <library/cpp/yt/misc/tls.h> + #include <atomic> namespace NYT::NTracing { @@ -179,7 +181,7 @@ std::optional<TTag> TTraceContext::SetAllocationTag(const TString& key, TTag new namespace NDetail { -extern thread_local TTraceContext* CurrentTraceContext; +extern YT_THREAD_LOCAL(TTraceContext*) CurrentTraceContext; TTraceContextPtr SwapTraceContext(TTraceContextPtr newContext); diff --git a/yt/yt/core/tracing/trace_context.cpp b/yt/yt/core/tracing/trace_context.cpp index 7aedfb5dfe..b4d1a2db67 100644 --- a/yt/yt/core/tracing/trace_context.cpp +++ b/yt/yt/core/tracing/trace_context.cpp @@ -20,6 +20,8 @@ #include <library/cpp/yt/memory/atomic_intrusive_ptr.h> +#include <library/cpp/yt/misc/tls.h> + #include <atomic> #include <mutex> @@ -100,8 +102,8 @@ TTracingConfigPtr GetTracingConfig() namespace NDetail { -thread_local TTraceContext* CurrentTraceContext; -thread_local TCpuInstant TraceContextTimingCheckpoint; +YT_THREAD_LOCAL(TTraceContext*) CurrentTraceContext; +YT_THREAD_LOCAL(TCpuInstant) TraceContextTimingCheckpoint; TSpanId GenerateSpanId() { diff --git a/yt/yt/core/ytree/yson_struct.h b/yt/yt/core/ytree/yson_struct.h index 0b26ca40b0..0d7eae1949 100644 --- a/yt/yt/core/ytree/yson_struct.h +++ b/yt/yt/core/ytree/yson_struct.h @@ -156,7 +156,7 @@ public: void InitializeStruct(TStruct* target); private: - static inline thread_local IYsonStructMeta* CurrentlyInitializingMeta_ = nullptr; + static inline YT_THREAD_LOCAL(IYsonStructMeta*) CurrentlyInitializingMeta_ = nullptr; template <class TStruct> friend class TYsonStructRegistrar; diff --git a/yt/yt/library/ytprof/api/api.cpp b/yt/yt/library/ytprof/api/api.cpp index debee0630c..f00d11d88c 100644 --- a/yt/yt/library/ytprof/api/api.cpp +++ b/yt/yt/library/ytprof/api/api.cpp @@ -1,5 +1,7 @@ #include "api.h" +#include <library/cpp/yt/misc/tls.h> + namespace NYT::NYTProf { //////////////////////////////////////////////////////////////////////////////// @@ -9,7 +11,7 @@ DEFINE_REFCOUNTED_TYPE(TProfilerTag) struct TCpuProfilerTags; // This variable is referenced from signal handler. -constinit thread_local std::atomic<TCpuProfilerTags*> CpuProfilerTagsPtr = nullptr; +constinit YT_THREAD_LOCAL(std::atomic<TCpuProfilerTags*>) CpuProfilerTagsPtr = nullptr; struct TCpuProfilerTags { @@ -28,7 +30,7 @@ struct TCpuProfilerTags // We can't reference CpuProfilerTags from signal handler, // since it may trigger lazy initialization. -thread_local TCpuProfilerTags CpuProfilerTags; +YT_THREAD_LOCAL(TCpuProfilerTags) CpuProfilerTags; std::array<TAtomicSignalPtr<TProfilerTag>, MaxActiveTags>* GetCpuProfilerTags() { @@ -44,9 +46,11 @@ std::array<TAtomicSignalPtr<TProfilerTag>, MaxActiveTags>* GetCpuProfilerTags() TCpuProfilerTagGuard::TCpuProfilerTagGuard(TProfilerTagPtr tag) { + auto& cpuProfilerTags = GetTlsRef(CpuProfilerTags); + for (int i = 0; i < MaxActiveTags; i++) { - if (!CpuProfilerTags.Tags[i].IsSetFromThread()) { - CpuProfilerTags.Tags[i].StoreFromThread(std::move(tag)); + if (!cpuProfilerTags.Tags[i].IsSetFromThread()) { + cpuProfilerTags.Tags[i].StoreFromThread(std::move(tag)); TagIndex_ = i; return; } @@ -55,8 +59,10 @@ TCpuProfilerTagGuard::TCpuProfilerTagGuard(TProfilerTagPtr tag) TCpuProfilerTagGuard::~TCpuProfilerTagGuard() { + auto& cpuProfilerTags = GetTlsRef(CpuProfilerTags); + if (TagIndex_ != -1) { - CpuProfilerTags.Tags[TagIndex_].StoreFromThread(nullptr); + cpuProfilerTags.Tags[TagIndex_].StoreFromThread(nullptr); } } @@ -73,7 +79,8 @@ TCpuProfilerTagGuard& TCpuProfilerTagGuard::operator = (TCpuProfilerTagGuard&& o } if (TagIndex_ != -1) { - CpuProfilerTags.Tags[TagIndex_].StoreFromThread(nullptr); + auto& cpuProfilerTags = GetTlsRef(CpuProfilerTags); + cpuProfilerTags.Tags[TagIndex_].StoreFromThread(nullptr); } TagIndex_ = other.TagIndex_; diff --git a/yt/yt/library/ytprof/api/api.h b/yt/yt/library/ytprof/api/api.h index f69ccc4c32..c04b1ebcf5 100644 --- a/yt/yt/library/ytprof/api/api.h +++ b/yt/yt/library/ytprof/api/api.h @@ -6,6 +6,7 @@ #include <yt/yt/library/ytprof/api/atomic_signal_ptr.h> +#include <library/cpp/yt/misc/port.h> #include <library/cpp/yt/cpu_clock/public.h> #include <library/cpp/yt/memory/intrusive_ptr.h> diff --git a/yt/yt/library/ytprof/spinlock_profiler.cpp b/yt/yt/library/ytprof/spinlock_profiler.cpp index a9b3b6b248..f6fc756703 100644 --- a/yt/yt/library/ytprof/spinlock_profiler.cpp +++ b/yt/yt/library/ytprof/spinlock_profiler.cpp @@ -2,6 +2,8 @@ #include <library/cpp/yt/backtrace/cursors/interop/interop.h> +#include <library/cpp/yt/misc/tls.h> + #include <absl/base/internal/spinlock.h> #include <absl/base/internal/cycleclock.h> @@ -64,7 +66,7 @@ void TSpinlockProfiler::RecordEvent(const void* /*lock*/, int64_t waitCycles) RecordSample(&fpCursor, waitCycles); } -static thread_local int SpinlockEventCount; +static YT_THREAD_LOCAL(int) SpinlockEventCount; void TSpinlockProfiler::OnEvent(const void* lock, int64_t waitCycles) { @@ -169,7 +171,7 @@ void TBlockingProfiler::RecordEvent( RecordSample(&fpCursor, cpuDelay); } -static thread_local int YTSpinlockEventCount; +static YT_THREAD_LOCAL(int) YTSpinlockEventCount; void TBlockingProfiler::OnEvent( TCpuDuration cpuDelay, |