aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2022-07-16 18:10:10 +0300
committerhor911 <hor911@ydb.tech>2022-07-16 18:10:10 +0300
commit7537371746d95bc04135888d09ac2068233932ea (patch)
tree39204ff36893a0289cd4a100f12840f1647d4ecf /library/cpp
parent1fa71dfe5f97c247426cdba8779a5850b62b69dd (diff)
downloadydb-7537371746d95bc04135888d09ac2068233932ea.tar.gz
Consistent logging
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/yt/memory/leaky_singleton-inl.h34
-rw-r--r--library/cpp/yt/memory/leaky_singleton.h34
-rw-r--r--library/cpp/yt/threading/at_fork.cpp139
-rw-r--r--library/cpp/yt/threading/at_fork.h30
-rw-r--r--library/cpp/yt/threading/fork_aware_spin_lock.cpp116
-rw-r--r--library/cpp/yt/threading/fork_aware_spin_lock.h11
-rw-r--r--library/cpp/ytalloc/impl/core-inl.h46
7 files changed, 254 insertions, 156 deletions
diff --git a/library/cpp/yt/memory/leaky_singleton-inl.h b/library/cpp/yt/memory/leaky_singleton-inl.h
new file mode 100644
index 00000000000..932747c9213
--- /dev/null
+++ b/library/cpp/yt/memory/leaky_singleton-inl.h
@@ -0,0 +1,34 @@
+#ifndef LEAKY_SINGLETON_INL_H_
+#error "Direct inclusion of this file is not allowed, include leaky_singleton.h"
+// For the sake of sane code completion.
+#include "leaky_singleton.h"
+#endif
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T>
+TLeakyStorage<T>::TLeakyStorage()
+{
+ new (Get()) T();
+}
+
+template <class T>
+T* TLeakyStorage<T>::Get()
+{
+ return reinterpret_cast<T*>(Buffer_);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T>
+T* LeakySingleton()
+{
+ static TLeakyStorage<T> Storage;
+ return Storage.Get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/library/cpp/yt/memory/leaky_singleton.h b/library/cpp/yt/memory/leaky_singleton.h
new file mode 100644
index 00000000000..03b5e51d782
--- /dev/null
+++ b/library/cpp/yt/memory/leaky_singleton.h
@@ -0,0 +1,34 @@
+#pragma once
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T>
+class TLeakyStorage
+{
+public:
+ TLeakyStorage();
+
+ T* Get();
+
+private:
+ alignas(T) char Buffer_[sizeof(T)];
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+#define DECLARE_LEAKY_SINGLETON_FRIEND() \
+ template <class T> \
+ friend class ::NYT::TLeakyStorage;
+
+template <class T>
+T* LeakySingleton();
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
+
+#define LEAKY_SINGLETON_INL_H_
+#include "leaky_singleton-inl.h"
+#undef LEAKY_SINGLETON_INL_H_
diff --git a/library/cpp/yt/threading/at_fork.cpp b/library/cpp/yt/threading/at_fork.cpp
new file mode 100644
index 00000000000..6cf1a7fbfe1
--- /dev/null
+++ b/library/cpp/yt/threading/at_fork.cpp
@@ -0,0 +1,139 @@
+#include "at_fork.h"
+
+#include <library/cpp/yt/memory/leaky_singleton.h>
+
+#include <library/cpp/yt/assert/assert.h>
+
+#ifdef _unix_
+ #include <pthread.h>
+#endif
+
+#include <atomic>
+#include <array>
+
+namespace NYT::NThreading {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TAtForkManager
+{
+public:
+ static TAtForkManager* Get()
+ {
+ return LeakySingleton<TAtForkManager>();
+ }
+
+ void RegisterAtForkHandlers(
+ TAtForkHandler prepare,
+ TAtForkHandler parent,
+ TAtForkHandler child)
+ {
+ int index = AtForkHandlerCount_++;
+ Y_VERIFY(index < MaxAtForkHandlerSets);
+ auto& set = AtForkHandlerSets_[index];
+ set.Prepare = std::move(prepare);
+ set.Parent = std::move(parent);
+ set.Child = std::move(child);
+ set.Initialized.store(true);
+ }
+
+ TReaderWriterSpinLock* GetForkLock()
+ {
+ return &ForkLock_;
+ }
+
+private:
+ DECLARE_LEAKY_SINGLETON_FRIEND()
+
+ TReaderWriterSpinLock ForkLock_;
+
+ struct TAtForkHandlerSet
+ {
+ TAtForkHandler Prepare;
+ TAtForkHandler Parent;
+ TAtForkHandler Child;
+ std::atomic<bool> Initialized;
+ };
+
+ static constexpr int MaxAtForkHandlerSets = 8;
+ std::array<TAtForkHandlerSet, MaxAtForkHandlerSets> AtForkHandlerSets_;
+ std::atomic<int> AtForkHandlerCount_ = 0;
+
+ TAtForkManager()
+ {
+#ifdef _unix_
+ pthread_atfork(
+ [] { Get()->OnPrepare(); },
+ [] { Get()->OnParent(); },
+ [] { Get()->OnChild(); });
+#endif
+ }
+
+ void OnPrepare()
+ {
+ IterateAtForkHandlerSets([] (const TAtForkHandlerSet& set) {
+ if (set.Prepare) {
+ set.Prepare();
+ }
+ });
+ ForkLock_.AcquireWriter();
+ }
+
+
+ void OnParent()
+ {
+ ForkLock_.ReleaseWriter();
+ IterateAtForkHandlerSets([] (const TAtForkHandlerSet& set) {
+ if (set.Parent) {
+ set.Parent();
+ }
+ });
+ }
+
+ void OnChild()
+ {
+ ForkLock_.ReleaseWriter();
+ IterateAtForkHandlerSets([] (const TAtForkHandlerSet& set) {
+ if (set.Child) {
+ set.Child();
+ }
+ });
+ }
+
+ template <class F>
+ void IterateAtForkHandlerSets(F func)
+ {
+ for (const auto& set : AtForkHandlerSets_) {
+ if (set.Initialized.load()) {
+ func(set);
+ }
+ }
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+static void* AtForkManagerInitializer = [] { return TAtForkManager::Get(); }();
+
+////////////////////////////////////////////////////////////////////////////////
+
+void RegisterAtForkHandlers(
+ TAtForkHandler prepare,
+ TAtForkHandler parent,
+ TAtForkHandler child)
+{
+ return TAtForkManager::Get()->RegisterAtForkHandlers(
+ std::move(prepare),
+ std::move(parent),
+ std::move(child));
+}
+
+TReaderWriterSpinLock* GetForkLock()
+{
+ return TAtForkManager::Get()->GetForkLock();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NThreading
+
diff --git a/library/cpp/yt/threading/at_fork.h b/library/cpp/yt/threading/at_fork.h
new file mode 100644
index 00000000000..bb9a2f2ebd5
--- /dev/null
+++ b/library/cpp/yt/threading/at_fork.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include "rw_spin_lock.h"
+
+#include <functional>
+
+namespace NYT::NThreading {
+
+////////////////////////////////////////////////////////////////////////////////
+
+using TAtForkHandler = std::function<void()>;
+
+//! Registers handlers to be invoked at fork time.
+//! See |pthread_atfork| for more details.
+/*!
+ * Once all prepare handlers are invoked, fork lock is acquired
+ * in writer mode. This lock is subsequently released in both child
+ * and parent processes once fork is complete.
+ */
+void RegisterAtForkHandlers(
+ TAtForkHandler prepare,
+ TAtForkHandler parent,
+ TAtForkHandler child);
+
+//! Returns the fork lock.
+TReaderWriterSpinLock* GetForkLock();
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NThreading
diff --git a/library/cpp/yt/threading/fork_aware_spin_lock.cpp b/library/cpp/yt/threading/fork_aware_spin_lock.cpp
index 3a644694853..44a8ab92522 100644
--- a/library/cpp/yt/threading/fork_aware_spin_lock.cpp
+++ b/library/cpp/yt/threading/fork_aware_spin_lock.cpp
@@ -1,114 +1,21 @@
#include "fork_aware_spin_lock.h"
-#include "rw_spin_lock.h"
-#ifdef _unix_
- #include <pthread.h>
-#endif
-
-#include <atomic>
-#include <array>
+#include "at_fork.h"
namespace NYT::NThreading {
////////////////////////////////////////////////////////////////////////////////
-static constexpr int MaxAtForkHandlers = 8;
-using TAtForkHandler = TForkAwareSpinLock::TAtForkHandler;
-
-struct TAtForkHandlerSet
-{
- void* Cookie;
- TAtForkHandler Prepare;
- TAtForkHandler Parent;
- TAtForkHandler Child;
- std::atomic<bool> Initialized;
-};
-
-static std::array<TAtForkHandlerSet, MaxAtForkHandlers> AtForkHandlerSets;
-static std::atomic<int> AtForkHandlerCount;
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TForkProtector
-{
-public:
- static TForkProtector* Get()
- {
- static TForkProtector Instance;
- return &Instance;
- }
-
- TReaderWriterSpinLock& ForkLock()
- {
- return ForkLock_;
- }
-
-private:
- TReaderWriterSpinLock ForkLock_;
-
- TForkProtector()
- {
-#ifdef _unix_
- pthread_atfork(
- &TForkProtector::OnPrepare,
- &TForkProtector::OnParent,
- &TForkProtector::OnChild);
-#endif
- }
-
- static void OnPrepare()
- {
- for (const auto& set : AtForkHandlerSets) {
- if (set.Initialized.load() && set.Prepare) {
- set.Prepare(set.Cookie);
- }
- }
- Get()->ForkLock().AcquireWriter();
- }
-
- static void OnParent()
- {
- Get()->ForkLock().ReleaseWriter();
- for (const auto& set : AtForkHandlerSets) {
- if (set.Initialized.load() && set.Parent) {
- set.Parent(set.Cookie);
- }
- }
- }
-
- static void OnChild()
- {
- Get()->ForkLock().ReleaseWriter();
- for (const auto& set : AtForkHandlerSets) {
- if (set.Initialized.load() && set.Child) {
- set.Child(set.Cookie);
- }
- }
- }
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-static struct TForkProtectorInitializer
-{
- TForkProtectorInitializer()
- {
- TForkProtector::Get();
- }
-} ForkProtectorInitializer;
-
-////////////////////////////////////////////////////////////////////////////////
-
void TForkAwareSpinLock::Acquire() noexcept
{
- TForkProtector::Get()->ForkLock().AcquireReaderForkFriendly();
+ GetForkLock()->AcquireReaderForkFriendly();
SpinLock_.Acquire();
}
void TForkAwareSpinLock::Release() noexcept
{
SpinLock_.Release();
- TForkProtector::Get()->ForkLock().ReleaseReader();
+ GetForkLock()->ReleaseReader();
}
bool TForkAwareSpinLock::IsLocked() noexcept
@@ -116,23 +23,6 @@ bool TForkAwareSpinLock::IsLocked() noexcept
return SpinLock_.IsLocked();
}
-void TForkAwareSpinLock::AtFork(
- void* cookie,
- TAtForkHandler prepare,
- TAtForkHandler parent,
- TAtForkHandler child)
-{
- TForkProtector::Get();
- int index = AtForkHandlerCount++;
- Y_VERIFY(index < MaxAtForkHandlers);
- auto& set = AtForkHandlerSets[index];
- set.Cookie = cookie;
- set.Prepare = prepare;
- set.Parent = parent;
- set.Child = child;
- set.Initialized.store(true);
-}
-
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT::NThreading
diff --git a/library/cpp/yt/threading/fork_aware_spin_lock.h b/library/cpp/yt/threading/fork_aware_spin_lock.h
index 53b0a485464..c122554240c 100644
--- a/library/cpp/yt/threading/fork_aware_spin_lock.h
+++ b/library/cpp/yt/threading/fork_aware_spin_lock.h
@@ -6,8 +6,8 @@ namespace NYT::NThreading {
////////////////////////////////////////////////////////////////////////////////
-//! Wraps TSpinLock and additionally acquires a global read lock preventing
-//! concurrent forks from happening.
+//! Wraps TSpinLock and additionally acquires a global fork lock (in read mode)
+//! preventing concurrent forks from happening.
class TForkAwareSpinLock
{
public:
@@ -20,13 +20,6 @@ public:
bool IsLocked() noexcept;
- using TAtForkHandler = void(*)(void*);
- static void AtFork(
- void* cookie,
- TAtForkHandler prepare,
- TAtForkHandler parent,
- TAtForkHandler child);
-
private:
TSpinLock SpinLock_;
};
diff --git a/library/cpp/ytalloc/impl/core-inl.h b/library/cpp/ytalloc/impl/core-inl.h
index c66e8478b5c..e8e5d254422 100644
--- a/library/cpp/ytalloc/impl/core-inl.h
+++ b/library/cpp/ytalloc/impl/core-inl.h
@@ -7,6 +7,7 @@
#include <library/cpp/yt/containers/intrusive_linked_list.h>
+#include <library/cpp/yt/threading/at_fork.h>
#include <library/cpp/yt/threading/fork_aware_spin_lock.h>
#include <util/system/tls.h>
@@ -1847,11 +1848,10 @@ public:
{
pthread_key_create(&ThreadDtorKey_, DestroyThread);
- NThreading::TForkAwareSpinLock::AtFork(
- this,
+ NThreading::RegisterAtForkHandlers(
nullptr,
nullptr,
- &AfterFork);
+ [=] { AfterFork(); });
}
// Returns TThreadState for the current thread; the caller guarantees that this
@@ -2022,8 +2022,7 @@ private:
void DestroyThreadState(TThreadState* state);
- static void AfterFork(void* cookie);
- void DoAfterFork();
+ void AfterFork();
private:
// TThreadState instance for the current thread.
@@ -4208,11 +4207,10 @@ public:
TBackgroundThreadBase()
: State_(new TState())
{
- NThreading::TForkAwareSpinLock::AtFork(
- static_cast<T*>(this),
- &BeforeFork,
- &AfterForkParent,
- &AfterForkChild);
+ NThreading::RegisterAtForkHandlers(
+ [=] { BeforeFork(); },
+ [=] { AfterForkParent(); },
+ [=] { AfterForkChild(); });
}
virtual ~TBackgroundThreadBase()
@@ -4240,12 +4238,7 @@ private:
TState* State_;
private:
- static void BeforeFork(void* cookie)
- {
- static_cast<T*>(cookie)->DoBeforeFork();
- }
-
- void DoBeforeFork()
+ void BeforeFork()
{
bool stopped = Stop();
if (State_->ForkDepth++ == 0) {
@@ -4253,12 +4246,7 @@ private:
}
}
- static void AfterForkParent(void* cookie)
- {
- static_cast<T*>(cookie)->DoAfterForkParent();
- }
-
- void DoAfterForkParent()
+ void AfterForkParent()
{
if (--State_->ForkDepth == 0) {
if (State_->RestartAfterFork) {
@@ -4267,12 +4255,7 @@ private:
}
}
- static void AfterForkChild(void* cookie)
- {
- static_cast<T*>(cookie)->DoAfterForkChild();
- }
-
- void DoAfterForkChild()
+ void AfterForkChild()
{
bool restart = State_->RestartAfterFork;
State_ = new TState();
@@ -4531,12 +4514,7 @@ void TThreadManager::DestroyThreadState(TThreadState* state)
ThreadStatePool_.Free(state);
}
-void TThreadManager::AfterFork(void* cookie)
-{
- static_cast<TThreadManager*>(cookie)->DoAfterFork();
-}
-
-void TThreadManager::DoAfterFork()
+void TThreadManager::AfterFork()
{
auto guard = GuardWithTiming(ThreadRegistryLock_);
ThreadRegistry_.Clear();