diff options
author | hor911 <hor911@ydb.tech> | 2022-07-16 18:10:10 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-07-16 18:10:10 +0300 |
commit | 7537371746d95bc04135888d09ac2068233932ea (patch) | |
tree | 39204ff36893a0289cd4a100f12840f1647d4ecf /library/cpp | |
parent | 1fa71dfe5f97c247426cdba8779a5850b62b69dd (diff) | |
download | ydb-7537371746d95bc04135888d09ac2068233932ea.tar.gz |
Consistent logging
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/yt/memory/leaky_singleton-inl.h | 34 | ||||
-rw-r--r-- | library/cpp/yt/memory/leaky_singleton.h | 34 | ||||
-rw-r--r-- | library/cpp/yt/threading/at_fork.cpp | 139 | ||||
-rw-r--r-- | library/cpp/yt/threading/at_fork.h | 30 | ||||
-rw-r--r-- | library/cpp/yt/threading/fork_aware_spin_lock.cpp | 116 | ||||
-rw-r--r-- | library/cpp/yt/threading/fork_aware_spin_lock.h | 11 | ||||
-rw-r--r-- | library/cpp/ytalloc/impl/core-inl.h | 46 |
7 files changed, 254 insertions, 156 deletions
diff --git a/library/cpp/yt/memory/leaky_singleton-inl.h b/library/cpp/yt/memory/leaky_singleton-inl.h new file mode 100644 index 00000000000..932747c9213 --- /dev/null +++ b/library/cpp/yt/memory/leaky_singleton-inl.h @@ -0,0 +1,34 @@ +#ifndef LEAKY_SINGLETON_INL_H_ +#error "Direct inclusion of this file is not allowed, include leaky_singleton.h" +// For the sake of sane code completion. +#include "leaky_singleton.h" +#endif + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +template <class T> +TLeakyStorage<T>::TLeakyStorage() +{ + new (Get()) T(); +} + +template <class T> +T* TLeakyStorage<T>::Get() +{ + return reinterpret_cast<T*>(Buffer_); +} + +//////////////////////////////////////////////////////////////////////////////// + +template <class T> +T* LeakySingleton() +{ + static TLeakyStorage<T> Storage; + return Storage.Get(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/leaky_singleton.h b/library/cpp/yt/memory/leaky_singleton.h new file mode 100644 index 00000000000..03b5e51d782 --- /dev/null +++ b/library/cpp/yt/memory/leaky_singleton.h @@ -0,0 +1,34 @@ +#pragma once + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +template <class T> +class TLeakyStorage +{ +public: + TLeakyStorage(); + + T* Get(); + +private: + alignas(T) char Buffer_[sizeof(T)]; +}; + +//////////////////////////////////////////////////////////////////////////////// + +#define DECLARE_LEAKY_SINGLETON_FRIEND() \ + template <class T> \ + friend class ::NYT::TLeakyStorage; + +template <class T> +T* LeakySingleton(); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + +#define LEAKY_SINGLETON_INL_H_ +#include "leaky_singleton-inl.h" +#undef LEAKY_SINGLETON_INL_H_ diff --git a/library/cpp/yt/threading/at_fork.cpp b/library/cpp/yt/threading/at_fork.cpp new file mode 100644 index 00000000000..6cf1a7fbfe1 --- /dev/null +++ b/library/cpp/yt/threading/at_fork.cpp @@ -0,0 +1,139 @@ +#include "at_fork.h" + +#include <library/cpp/yt/memory/leaky_singleton.h> + +#include <library/cpp/yt/assert/assert.h> + +#ifdef _unix_ + #include <pthread.h> +#endif + +#include <atomic> +#include <array> + +namespace NYT::NThreading { + +//////////////////////////////////////////////////////////////////////////////// + +class TAtForkManager +{ +public: + static TAtForkManager* Get() + { + return LeakySingleton<TAtForkManager>(); + } + + void RegisterAtForkHandlers( + TAtForkHandler prepare, + TAtForkHandler parent, + TAtForkHandler child) + { + int index = AtForkHandlerCount_++; + Y_VERIFY(index < MaxAtForkHandlerSets); + auto& set = AtForkHandlerSets_[index]; + set.Prepare = std::move(prepare); + set.Parent = std::move(parent); + set.Child = std::move(child); + set.Initialized.store(true); + } + + TReaderWriterSpinLock* GetForkLock() + { + return &ForkLock_; + } + +private: + DECLARE_LEAKY_SINGLETON_FRIEND() + + TReaderWriterSpinLock ForkLock_; + + struct TAtForkHandlerSet + { + TAtForkHandler Prepare; + TAtForkHandler Parent; + TAtForkHandler Child; + std::atomic<bool> Initialized; + }; + + static constexpr int MaxAtForkHandlerSets = 8; + std::array<TAtForkHandlerSet, MaxAtForkHandlerSets> AtForkHandlerSets_; + std::atomic<int> AtForkHandlerCount_ = 0; + + TAtForkManager() + { +#ifdef _unix_ + pthread_atfork( + [] { Get()->OnPrepare(); }, + [] { Get()->OnParent(); }, + [] { Get()->OnChild(); }); +#endif + } + + void OnPrepare() + { + IterateAtForkHandlerSets([] (const TAtForkHandlerSet& set) { + if (set.Prepare) { + set.Prepare(); + } + }); + ForkLock_.AcquireWriter(); + } + + + void OnParent() + { + ForkLock_.ReleaseWriter(); + IterateAtForkHandlerSets([] (const TAtForkHandlerSet& set) { + if (set.Parent) { + set.Parent(); + } + }); + } + + void OnChild() + { + ForkLock_.ReleaseWriter(); + IterateAtForkHandlerSets([] (const TAtForkHandlerSet& set) { + if (set.Child) { + set.Child(); + } + }); + } + + template <class F> + void IterateAtForkHandlerSets(F func) + { + for (const auto& set : AtForkHandlerSets_) { + if (set.Initialized.load()) { + func(set); + } + } + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +static void* AtForkManagerInitializer = [] { return TAtForkManager::Get(); }(); + +//////////////////////////////////////////////////////////////////////////////// + +void RegisterAtForkHandlers( + TAtForkHandler prepare, + TAtForkHandler parent, + TAtForkHandler child) +{ + return TAtForkManager::Get()->RegisterAtForkHandlers( + std::move(prepare), + std::move(parent), + std::move(child)); +} + +TReaderWriterSpinLock* GetForkLock() +{ + return TAtForkManager::Get()->GetForkLock(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NThreading + diff --git a/library/cpp/yt/threading/at_fork.h b/library/cpp/yt/threading/at_fork.h new file mode 100644 index 00000000000..bb9a2f2ebd5 --- /dev/null +++ b/library/cpp/yt/threading/at_fork.h @@ -0,0 +1,30 @@ +#pragma once + +#include "rw_spin_lock.h" + +#include <functional> + +namespace NYT::NThreading { + +//////////////////////////////////////////////////////////////////////////////// + +using TAtForkHandler = std::function<void()>; + +//! Registers handlers to be invoked at fork time. +//! See |pthread_atfork| for more details. +/*! + * Once all prepare handlers are invoked, fork lock is acquired + * in writer mode. This lock is subsequently released in both child + * and parent processes once fork is complete. + */ +void RegisterAtForkHandlers( + TAtForkHandler prepare, + TAtForkHandler parent, + TAtForkHandler child); + +//! Returns the fork lock. +TReaderWriterSpinLock* GetForkLock(); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NThreading diff --git a/library/cpp/yt/threading/fork_aware_spin_lock.cpp b/library/cpp/yt/threading/fork_aware_spin_lock.cpp index 3a644694853..44a8ab92522 100644 --- a/library/cpp/yt/threading/fork_aware_spin_lock.cpp +++ b/library/cpp/yt/threading/fork_aware_spin_lock.cpp @@ -1,114 +1,21 @@ #include "fork_aware_spin_lock.h" -#include "rw_spin_lock.h" -#ifdef _unix_ - #include <pthread.h> -#endif - -#include <atomic> -#include <array> +#include "at_fork.h" namespace NYT::NThreading { //////////////////////////////////////////////////////////////////////////////// -static constexpr int MaxAtForkHandlers = 8; -using TAtForkHandler = TForkAwareSpinLock::TAtForkHandler; - -struct TAtForkHandlerSet -{ - void* Cookie; - TAtForkHandler Prepare; - TAtForkHandler Parent; - TAtForkHandler Child; - std::atomic<bool> Initialized; -}; - -static std::array<TAtForkHandlerSet, MaxAtForkHandlers> AtForkHandlerSets; -static std::atomic<int> AtForkHandlerCount; - -//////////////////////////////////////////////////////////////////////////////// - -class TForkProtector -{ -public: - static TForkProtector* Get() - { - static TForkProtector Instance; - return &Instance; - } - - TReaderWriterSpinLock& ForkLock() - { - return ForkLock_; - } - -private: - TReaderWriterSpinLock ForkLock_; - - TForkProtector() - { -#ifdef _unix_ - pthread_atfork( - &TForkProtector::OnPrepare, - &TForkProtector::OnParent, - &TForkProtector::OnChild); -#endif - } - - static void OnPrepare() - { - for (const auto& set : AtForkHandlerSets) { - if (set.Initialized.load() && set.Prepare) { - set.Prepare(set.Cookie); - } - } - Get()->ForkLock().AcquireWriter(); - } - - static void OnParent() - { - Get()->ForkLock().ReleaseWriter(); - for (const auto& set : AtForkHandlerSets) { - if (set.Initialized.load() && set.Parent) { - set.Parent(set.Cookie); - } - } - } - - static void OnChild() - { - Get()->ForkLock().ReleaseWriter(); - for (const auto& set : AtForkHandlerSets) { - if (set.Initialized.load() && set.Child) { - set.Child(set.Cookie); - } - } - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -static struct TForkProtectorInitializer -{ - TForkProtectorInitializer() - { - TForkProtector::Get(); - } -} ForkProtectorInitializer; - -//////////////////////////////////////////////////////////////////////////////// - void TForkAwareSpinLock::Acquire() noexcept { - TForkProtector::Get()->ForkLock().AcquireReaderForkFriendly(); + GetForkLock()->AcquireReaderForkFriendly(); SpinLock_.Acquire(); } void TForkAwareSpinLock::Release() noexcept { SpinLock_.Release(); - TForkProtector::Get()->ForkLock().ReleaseReader(); + GetForkLock()->ReleaseReader(); } bool TForkAwareSpinLock::IsLocked() noexcept @@ -116,23 +23,6 @@ bool TForkAwareSpinLock::IsLocked() noexcept return SpinLock_.IsLocked(); } -void TForkAwareSpinLock::AtFork( - void* cookie, - TAtForkHandler prepare, - TAtForkHandler parent, - TAtForkHandler child) -{ - TForkProtector::Get(); - int index = AtForkHandlerCount++; - Y_VERIFY(index < MaxAtForkHandlers); - auto& set = AtForkHandlerSets[index]; - set.Cookie = cookie; - set.Prepare = prepare; - set.Parent = parent; - set.Child = child; - set.Initialized.store(true); -} - //////////////////////////////////////////////////////////////////////////////// } // namespace NYT::NThreading diff --git a/library/cpp/yt/threading/fork_aware_spin_lock.h b/library/cpp/yt/threading/fork_aware_spin_lock.h index 53b0a485464..c122554240c 100644 --- a/library/cpp/yt/threading/fork_aware_spin_lock.h +++ b/library/cpp/yt/threading/fork_aware_spin_lock.h @@ -6,8 +6,8 @@ namespace NYT::NThreading { //////////////////////////////////////////////////////////////////////////////// -//! Wraps TSpinLock and additionally acquires a global read lock preventing -//! concurrent forks from happening. +//! Wraps TSpinLock and additionally acquires a global fork lock (in read mode) +//! preventing concurrent forks from happening. class TForkAwareSpinLock { public: @@ -20,13 +20,6 @@ public: bool IsLocked() noexcept; - using TAtForkHandler = void(*)(void*); - static void AtFork( - void* cookie, - TAtForkHandler prepare, - TAtForkHandler parent, - TAtForkHandler child); - private: TSpinLock SpinLock_; }; diff --git a/library/cpp/ytalloc/impl/core-inl.h b/library/cpp/ytalloc/impl/core-inl.h index c66e8478b5c..e8e5d254422 100644 --- a/library/cpp/ytalloc/impl/core-inl.h +++ b/library/cpp/ytalloc/impl/core-inl.h @@ -7,6 +7,7 @@ #include <library/cpp/yt/containers/intrusive_linked_list.h> +#include <library/cpp/yt/threading/at_fork.h> #include <library/cpp/yt/threading/fork_aware_spin_lock.h> #include <util/system/tls.h> @@ -1847,11 +1848,10 @@ public: { pthread_key_create(&ThreadDtorKey_, DestroyThread); - NThreading::TForkAwareSpinLock::AtFork( - this, + NThreading::RegisterAtForkHandlers( nullptr, nullptr, - &AfterFork); + [=] { AfterFork(); }); } // Returns TThreadState for the current thread; the caller guarantees that this @@ -2022,8 +2022,7 @@ private: void DestroyThreadState(TThreadState* state); - static void AfterFork(void* cookie); - void DoAfterFork(); + void AfterFork(); private: // TThreadState instance for the current thread. @@ -4208,11 +4207,10 @@ public: TBackgroundThreadBase() : State_(new TState()) { - NThreading::TForkAwareSpinLock::AtFork( - static_cast<T*>(this), - &BeforeFork, - &AfterForkParent, - &AfterForkChild); + NThreading::RegisterAtForkHandlers( + [=] { BeforeFork(); }, + [=] { AfterForkParent(); }, + [=] { AfterForkChild(); }); } virtual ~TBackgroundThreadBase() @@ -4240,12 +4238,7 @@ private: TState* State_; private: - static void BeforeFork(void* cookie) - { - static_cast<T*>(cookie)->DoBeforeFork(); - } - - void DoBeforeFork() + void BeforeFork() { bool stopped = Stop(); if (State_->ForkDepth++ == 0) { @@ -4253,12 +4246,7 @@ private: } } - static void AfterForkParent(void* cookie) - { - static_cast<T*>(cookie)->DoAfterForkParent(); - } - - void DoAfterForkParent() + void AfterForkParent() { if (--State_->ForkDepth == 0) { if (State_->RestartAfterFork) { @@ -4267,12 +4255,7 @@ private: } } - static void AfterForkChild(void* cookie) - { - static_cast<T*>(cookie)->DoAfterForkChild(); - } - - void DoAfterForkChild() + void AfterForkChild() { bool restart = State_->RestartAfterFork; State_ = new TState(); @@ -4531,12 +4514,7 @@ void TThreadManager::DestroyThreadState(TThreadState* state) ThreadStatePool_.Free(state); } -void TThreadManager::AfterFork(void* cookie) -{ - static_cast<TThreadManager*>(cookie)->DoAfterFork(); -} - -void TThreadManager::DoAfterFork() +void TThreadManager::AfterFork() { auto guard = GuardWithTiming(ThreadRegistryLock_); ThreadRegistry_.Clear(); |