path: root/library/cpp/ytalloc/impl/core-inl.h
author     monster <monster@ydb.tech>  2022-07-07 14:41:37 +0300
committer  monster <monster@ydb.tech>  2022-07-07 14:41:37 +0300
commit     06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree       75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/ytalloc/impl/core-inl.h
parent     03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
download   ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/ytalloc/impl/core-inl.h')
-rw-r--r--  library/cpp/ytalloc/impl/core-inl.h  4951
1 file changed, 4951 insertions(+), 0 deletions(-)
diff --git a/library/cpp/ytalloc/impl/core-inl.h b/library/cpp/ytalloc/impl/core-inl.h
new file mode 100644
index 0000000000..c66e8478b5
--- /dev/null
+++ b/library/cpp/ytalloc/impl/core-inl.h
@@ -0,0 +1,4951 @@
+#pragma once
+
+// This file contains the core parts of YTAlloc but not the malloc/free bridge.
+// The latter is placed in alloc.cpp, which includes (sic!) core-inl.h.
+// This ensures that AllocateInline/FreeInline calls are properly inlined into malloc/free.
+// Also, core-inl.h can be included directly in, e.g., benchmarks.
+
+#include <library/cpp/yt/containers/intrusive_linked_list.h>
+
+#include <library/cpp/yt/threading/fork_aware_spin_lock.h>
+
+#include <util/system/tls.h>
+#include <util/system/align.h>
+#include <util/system/thread.h>
+
+#include <util/string/printf.h>
+
+#include <util/generic/singleton.h>
+#include <util/generic/size_literals.h>
+#include <util/generic/utility.h>
+
+#include <util/digest/numeric.h>
+
+#include <library/cpp/ytalloc/api/ytalloc.h>
+
+#include <atomic>
+#include <array>
+#include <vector>
+#include <mutex>
+#include <thread>
+#include <condition_variable>
+#include <cstdio>
+#include <optional>
+
+#include <sys/mman.h>
+
+#ifdef _linux_
+ #include <sys/utsname.h>
+#endif
+
+#include <errno.h>
+#include <pthread.h>
+#include <time.h>
+
+#ifndef MAP_POPULATE
+ #define MAP_POPULATE 0x08000
+#endif
+
+// A variant of MAP_FIXED that doesn't unmap the underlying mapping.
+// Linux kernels older than 4.17 silently ignore this flag.
+#ifndef MAP_FIXED_NOREPLACE
+ #ifdef _linux_
+ #define MAP_FIXED_NOREPLACE 0x100000
+ #else
+ #define MAP_FIXED_NOREPLACE 0
+ #endif
+#endif
+
+#ifndef MADV_POPULATE
+ #define MADV_POPULATE 0x59410003
+#endif
+
+#ifndef MADV_STOCKPILE
+ #define MADV_STOCKPILE 0x59410004
+#endif
+
+#ifndef MADV_FREE
+ #define MADV_FREE 8
+#endif
+
+#ifndef MADV_DONTDUMP
+ #define MADV_DONTDUMP 16
+#endif
+
+#ifndef NDEBUG
+ #define YTALLOC_PARANOID
+#endif
+
+#ifdef YTALLOC_PARANOID
+ #define YTALLOC_NERVOUS
+#endif
+
+#define YTALLOC_VERIFY(condition) \
+ do { \
+ if (Y_UNLIKELY(!(condition))) { \
+ ::NYT::NYTAlloc::AssertTrap("Assertion failed: " #condition, __FILE__, __LINE__); \
+ } \
+ } while (false)
+
+#ifdef NDEBUG
+    #define YTALLOC_ASSERT(condition) (void)(0)
+#else
+    #define YTALLOC_ASSERT(condition) YTALLOC_VERIFY(condition)
+#endif
+
+#ifdef YTALLOC_PARANOID
+ #define YTALLOC_PARANOID_ASSERT(condition) YTALLOC_VERIFY(condition)
+#else
+ #define YTALLOC_PARANOID_ASSERT(condition) (true || (condition))
+#endif
+
+#define YTALLOC_TRAP(message) ::NYT::NYTAlloc::AssertTrap(message, __FILE__, __LINE__)
+
+namespace NYT::NYTAlloc {
+
+////////////////////////////////////////////////////////////////////////////////
+// Allocations are classified into three types:
+//
+// a) Small chunks (less than LargeAllocationSizeThreshold)
+// These are the fastest and are extensively cached (both per-thread and globally).
+// Memory claimed for these allocations is never reclaimed back.
+//    Code dealing with such allocations is heavily optimized with all hot paths
+// as streamlined as possible. The implementation is mostly inspired by LFAlloc.
+//
+// b) Large blobs (from LargeAllocationSizeThreshold to HugeAllocationSizeThreshold)
+// These are cached as well. We expect such allocations to be less frequent
+// than small ones but still do our best to provide good scalability.
+//    In particular, thread-sharded concurrent data structures are used to provide access to
+// cached blobs. Memory is claimed via madvise(MADV_POPULATE) and reclaimed back
+// via madvise(MADV_FREE).
+//
+// c) Huge blobs (from HugeAllocationSizeThreshold)
+// These should be rare; we delegate directly to mmap and munmap for each allocation.
+//
+// We also provide a separate allocator for all system allocations (that are needed by YTAlloc itself).
+// These are rare and also delegate to mmap/munmap.
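+//
+// As an illustrative sketch (not part of the allocator itself), the classification by
+// request size can be summarized as follows (the thresholds come from the YTAlloc API header):
+//
+//   if (size < LargeAllocationSizeThreshold)       -> small chunk path
+//   else if (size < HugeAllocationSizeThreshold)   -> large blob path
+//   else                                           -> huge blob path (direct mmap/munmap)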
+
+// Periods between background activities.
+constexpr auto BackgroundInterval = TDuration::Seconds(1);
+
+static_assert(LargeRankCount - MinLargeRank <= 16, "Too many large ranks");
+static_assert(SmallRankCount <= 32, "Too many small ranks");
+
+constexpr size_t SmallZoneSize = 1_TB;
+constexpr size_t LargeZoneSize = 16_TB;
+constexpr size_t HugeZoneSize = 1_TB;
+constexpr size_t SystemZoneSize = 1_TB;
+
+constexpr size_t MaxCachedChunksPerRank = 256;
+
+constexpr uintptr_t UntaggedSmallZonesStart = 0;
+constexpr uintptr_t UntaggedSmallZonesEnd = UntaggedSmallZonesStart + 32 * SmallZoneSize;
+constexpr uintptr_t MinUntaggedSmallPtr = UntaggedSmallZonesStart + SmallZoneSize * 1;
+constexpr uintptr_t MaxUntaggedSmallPtr = UntaggedSmallZonesStart + SmallZoneSize * SmallRankCount;
+
+constexpr uintptr_t TaggedSmallZonesStart = UntaggedSmallZonesEnd;
+constexpr uintptr_t TaggedSmallZonesEnd = TaggedSmallZonesStart + 32 * SmallZoneSize;
+constexpr uintptr_t MinTaggedSmallPtr = TaggedSmallZonesStart + SmallZoneSize * 1;
+constexpr uintptr_t MaxTaggedSmallPtr = TaggedSmallZonesStart + SmallZoneSize * SmallRankCount;
+
+constexpr uintptr_t DumpableLargeZoneStart = TaggedSmallZonesEnd;
+constexpr uintptr_t DumpableLargeZoneEnd = DumpableLargeZoneStart + LargeZoneSize;
+
+constexpr uintptr_t UndumpableLargeZoneStart = DumpableLargeZoneEnd;
+constexpr uintptr_t UndumpableLargeZoneEnd = UndumpableLargeZoneStart + LargeZoneSize;
+
+constexpr uintptr_t LargeZoneStart(bool dumpable)
+{
+ return dumpable ? DumpableLargeZoneStart : UndumpableLargeZoneStart;
+}
+constexpr uintptr_t LargeZoneEnd(bool dumpable)
+{
+ return dumpable ? DumpableLargeZoneEnd : UndumpableLargeZoneEnd;
+}
+
+constexpr uintptr_t HugeZoneStart = UndumpableLargeZoneEnd;
+constexpr uintptr_t HugeZoneEnd = HugeZoneStart + HugeZoneSize;
+
+constexpr uintptr_t SystemZoneStart = HugeZoneEnd;
+constexpr uintptr_t SystemZoneEnd = SystemZoneStart + SystemZoneSize;
+
+// We leave 64_KB at the end of each 256_MB extent allocation and never use it.
+// That serves two purposes:
+// 1. SmallExtentSize % SmallSegmentSize == 0
+// 2. Every small object satisfies RightReadableArea requirement.
+constexpr size_t SmallExtentAllocSize = 256_MB;
+constexpr size_t SmallExtentSize = SmallExtentAllocSize - 64_KB;
+constexpr size_t SmallSegmentSize = 96_KB; // LCM(SmallRankToSize)
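+// Illustrative arithmetic for the constants above: SmallExtentAllocSize is 256_MB = 262144_KB;
+// trimming 64_KB leaves SmallExtentSize = 262080_KB = 2730 * 96_KB, so an extent holds a whole
+// number of small segments (cf. the static_asserts below).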
+
+constexpr ui16 SmallRankBatchSize[SmallRankCount] = {
+ 0, 256, 256, 256, 256, 256, 256, 256, 256, 256, 192, 128, 96, 64, 48, 32, 24, 16, 12, 8, 6, 4, 3
+};
+
+constexpr bool CheckSmallSizes()
+{
+ for (size_t rank = 0; rank < SmallRankCount; rank++) {
+ auto size = SmallRankToSize[rank];
+ if (size == 0) {
+ continue;
+ }
+
+ if (SmallSegmentSize % size != 0) {
+ return false;
+ }
+
+ if (SmallRankBatchSize[rank] > MaxCachedChunksPerRank) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+static_assert(CheckSmallSizes());
+static_assert(SmallExtentSize % SmallSegmentSize == 0);
+static_assert(SmallSegmentSize % PageSize == 0);
+
+constexpr size_t LargeExtentSize = 1_GB;
+static_assert(LargeExtentSize >= LargeAllocationSizeThreshold, "LargeExtentSize < LargeAllocationSizeThreshold");
+
+constexpr const char* BackgroundThreadName = "YTAllocBack";
+constexpr const char* StockpileThreadName = "YTAllocStock";
+
+DEFINE_ENUM(EAllocationKind,
+ (Untagged)
+ (Tagged)
+);
+
+// Forward declarations.
+struct TThreadState;
+struct TLargeArena;
+struct TLargeBlobExtent;
+
+////////////////////////////////////////////////////////////////////////////////
+// Traps and assertions
+
+[[noreturn]]
+void OomTrap()
+{
+ _exit(9);
+}
+
+[[noreturn]]
+void AssertTrap(const char* message, const char* file, int line)
+{
+ ::fprintf(stderr, "*** YTAlloc has detected an internal trap at %s:%d\n*** %s\n",
+ file,
+ line,
+ message);
+ __builtin_trap();
+}
+
+template <class T, class E>
+void AssertBlobState(T* header, E expectedState)
+{
+ auto actualState = header->State;
+ if (Y_UNLIKELY(actualState != expectedState)) {
+ char message[256];
+        ::snprintf(message, sizeof(message), "Invalid blob header state at %p: expected %" PRIx64 ", actual %" PRIx64,
+ header,
+ static_cast<ui64>(expectedState),
+ static_cast<ui64>(actualState));
+ YTALLOC_TRAP(message);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Provides a never-dying singleton with explicit construction.
+template <class T>
+class TExplicitlyConstructableSingleton
+{
+public:
+ TExplicitlyConstructableSingleton()
+ { }
+
+ ~TExplicitlyConstructableSingleton()
+ { }
+
+ template <class... Ts>
+ void Construct(Ts&&... args)
+ {
+ new (&Storage_) T(std::forward<Ts>(args)...);
+#ifndef NDEBUG
+ Constructed_ = true;
+#endif
+ }
+
+ Y_FORCE_INLINE T* Get()
+ {
+#ifndef NDEBUG
+ YTALLOC_PARANOID_ASSERT(Constructed_);
+#endif
+ return &Storage_;
+ }
+
+ Y_FORCE_INLINE const T* Get() const
+ {
+#ifndef NDEBUG
+ YTALLOC_PARANOID_ASSERT(Constructed_);
+#endif
+ return &Storage_;
+ }
+
+ Y_FORCE_INLINE T* operator->()
+ {
+ return Get();
+ }
+
+ Y_FORCE_INLINE const T* operator->() const
+ {
+ return Get();
+ }
+
+ Y_FORCE_INLINE T& operator*()
+ {
+ return *Get();
+ }
+
+ Y_FORCE_INLINE const T& operator*() const
+ {
+ return *Get();
+ }
+
+private:
+ union {
+ T Storage_;
+ };
+
+#ifndef NDEBUG
+ bool Constructed_;
+#endif
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Initializes all singletons.
+// Safe to call multiple times.
+// Guaranteed to not allocate.
+void InitializeGlobals();
+
+// Spawns the background thread, if it's time.
+// Safe to call multiple times.
+// Must be called on allocation slow path.
+void StartBackgroundThread();
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TLogManager
+{
+public:
+ // Sets the handler to be invoked for each log event produced by YTAlloc.
+ void EnableLogging(TLogHandler logHandler)
+ {
+ LogHandler_.store(logHandler);
+ }
+
+ // Checks (in a racy way) that logging is enabled.
+ bool IsLoggingEnabled()
+ {
+ return LogHandler_.load() != nullptr;
+ }
+
+ // Logs the message via log handler (if any).
+ template <class... Ts>
+ void LogMessage(ELogEventSeverity severity, const char* format, Ts&&... args)
+ {
+ auto logHandler = LogHandler_.load();
+ if (!logHandler) {
+ return;
+ }
+
+ std::array<char, 16_KB> buffer;
+        auto len = ::snprintf(buffer.data(), buffer.size(), format, std::forward<Ts>(args)...);
+        if (len < 0) {
+            return;
+        }
+
+        TLogEvent event;
+        event.Severity = severity;
+        // NB: snprintf returns the would-be length; clamp it in case the message was truncated.
+        event.Message = TStringBuf(buffer.data(), Min<size_t>(len, buffer.size() - 1));
+ logHandler(event);
+ }
+
+ // A special case of zero args.
+ void LogMessage(ELogEventSeverity severity, const char* message)
+ {
+ LogMessage(severity, "%s", message);
+ }
+
+private:
+    std::atomic<TLogHandler> LogHandler_ = nullptr;
+
+};
+
+TExplicitlyConstructableSingleton<TLogManager> LogManager;
+
+#define YTALLOC_LOG_EVENT(...) LogManager->LogMessage(__VA_ARGS__)
+#define YTALLOC_LOG_DEBUG(...) YTALLOC_LOG_EVENT(ELogEventSeverity::Debug, __VA_ARGS__)
+#define YTALLOC_LOG_INFO(...) YTALLOC_LOG_EVENT(ELogEventSeverity::Info, __VA_ARGS__)
+#define YTALLOC_LOG_WARNING(...) YTALLOC_LOG_EVENT(ELogEventSeverity::Warning, __VA_ARGS__)
+#define YTALLOC_LOG_ERROR(...) YTALLOC_LOG_EVENT(ELogEventSeverity::Error, __VA_ARGS__)
+
+////////////////////////////////////////////////////////////////////////////////
+
+Y_FORCE_INLINE size_t GetUsed(ssize_t allocated, ssize_t freed)
+{
+ return allocated >= freed ? static_cast<size_t>(allocated - freed) : 0;
+}
+
+template <class T>
+Y_FORCE_INLINE void* HeaderToPtr(T* header)
+{
+ return header + 1;
+}
+
+template <class T>
+Y_FORCE_INLINE T* PtrToHeader(void* ptr)
+{
+ return static_cast<T*>(ptr) - 1;
+}
+
+template <class T>
+Y_FORCE_INLINE const T* PtrToHeader(const void* ptr)
+{
+ return static_cast<const T*>(ptr) - 1;
+}
+
+Y_FORCE_INLINE size_t PtrToSmallRank(const void* ptr)
+{
+ return (reinterpret_cast<uintptr_t>(ptr) >> 40) & 0x1f;
+}
+
+Y_FORCE_INLINE char* AlignDownToSmallSegment(char* extent, char* ptr)
+{
+ auto offset = static_cast<uintptr_t>(ptr - extent);
+    // NB: Since SmallSegmentSize is a compile-time constant, the compiler lowers this modulo into a multiplication (no division).
+ offset -= (offset % SmallSegmentSize);
+ return extent + offset;
+}
+
+Y_FORCE_INLINE char* AlignUpToSmallSegment(char* extent, char* ptr)
+{
+ return AlignDownToSmallSegment(extent, ptr + SmallSegmentSize - 1);
+}
+
+template <class T>
+static Y_FORCE_INLINE void UnalignPtr(void*& ptr)
+{
+ if (reinterpret_cast<uintptr_t>(ptr) % PageSize == 0) {
+ reinterpret_cast<char*&>(ptr) -= PageSize - sizeof (T);
+ }
+ YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) % PageSize == sizeof (T));
+}
+
+template <class T>
+static Y_FORCE_INLINE void UnalignPtr(const void*& ptr)
+{
+ if (reinterpret_cast<uintptr_t>(ptr) % PageSize == 0) {
+ reinterpret_cast<const char*&>(ptr) -= PageSize - sizeof (T);
+ }
+ YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) % PageSize == sizeof (T));
+}
+
+template <class T>
+Y_FORCE_INLINE size_t GetRawBlobSize(size_t size)
+{
+ return AlignUp(size + sizeof (T) + RightReadableAreaSize, PageSize);
+}
+
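+// Returns the largest payload size that still fits into GetRawBlobSize(size) bytes,
+// i.e. the page-aligned raw blob size minus the header and the right readable area.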
+template <class T>
+Y_FORCE_INLINE size_t GetBlobAllocationSize(size_t size)
+{
+ size += sizeof(T);
+ size += RightReadableAreaSize;
+ size = AlignUp(size, PageSize);
+ size -= sizeof(T);
+ size -= RightReadableAreaSize;
+ return size;
+}
+
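+// Returns the rank of a large blob, i.e. ceil(log2(size)); e.g. (illustrative)
+// GetLargeRank(32_KB) == 15 since 32_KB is exactly 1 << 15, while GetLargeRank(32_KB + 1) == 16.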
+Y_FORCE_INLINE size_t GetLargeRank(size_t size)
+{
+ size_t rank = 64 - __builtin_clzl(size);
+ if (size == (1ULL << (rank - 1))) {
+ --rank;
+ }
+ return rank;
+}
+
+Y_FORCE_INLINE void PoisonRange(void* ptr, size_t size, ui32 magic)
+{
+#ifdef YTALLOC_PARANOID
+ size = ::AlignUp<size_t>(size, 4);
+ std::fill(static_cast<ui32*>(ptr), static_cast<ui32*>(ptr) + size / 4, magic);
+#else
+ Y_UNUSED(ptr);
+ Y_UNUSED(size);
+ Y_UNUSED(magic);
+#endif
+}
+
+Y_FORCE_INLINE void PoisonFreedRange(void* ptr, size_t size)
+{
+ PoisonRange(ptr, size, 0xdeadbeef);
+}
+
+Y_FORCE_INLINE void PoisonUninitializedRange(void* ptr, size_t size)
+{
+ PoisonRange(ptr, size, 0xcafebabe);
+}
+
+// Checks that the header size is divisible by 16 (as needed due to alignment restrictions).
+#define CHECK_HEADER_ALIGNMENT(T) static_assert(sizeof(T) % 16 == 0, "sizeof(" #T ") % 16 != 0");
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class T>
+struct TFreeListItem
+{
+ T* Next = nullptr;
+};
+
+constexpr size_t CacheLineSize = 64;
+
+// A lock-free stack of items (derived from TFreeListItem).
+// Supports multiple producers and multiple consumers.
+// Internally uses DCAS with tagged pointers to defeat ABA.
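+//
+// A minimal usage sketch (illustrative; TItem is a hypothetical type):
+//
+//   struct TItem : public TFreeListItem<TItem> { ... };
+//   TFreeList<TItem> list;
+//   list.Put(item);                  // push an item (thread-safe)
+//   auto* one = list.Extract();      // pop a single item or nullptr
+//   auto* chain = list.ExtractAll(); // detach the whole stack; traverse via ->Next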
+template <class T>
+class TFreeList
+{
+public:
+ void Put(T* item)
+ {
+ TTaggedPointer currentTaggedHead{};
+ TTaggedPointer newTaggedHead;
+ do {
+ item->Next = currentTaggedHead.first;
+ newTaggedHead = std::make_pair(item, currentTaggedHead.second + 1);
+ } while (!CompareAndSet(&TaggedHead_, currentTaggedHead, newTaggedHead));
+ }
+
+ T* Extract()
+ {
+ T* item;
+ TTaggedPointer currentTaggedHead{};
+ TTaggedPointer newTaggedHead{};
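+        // NB: This lone CAS merely loads the current tagged head into currentTaggedHead:
+        // cmpxchg16b writes the observed value back into the expected operand on failure,
+        // and if the list is empty it trivially succeeds with a null head.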
+ CompareAndSet(&TaggedHead_, currentTaggedHead, newTaggedHead);
+ do {
+ item = currentTaggedHead.first;
+ if (!item) {
+ break;
+ }
+ newTaggedHead = std::make_pair(item->Next, currentTaggedHead.second + 1);
+ } while (!CompareAndSet(&TaggedHead_, currentTaggedHead, newTaggedHead));
+ return item;
+ }
+
+ T* ExtractAll()
+ {
+ T* item;
+ TTaggedPointer currentTaggedHead{};
+ TTaggedPointer newTaggedHead;
+ do {
+ item = currentTaggedHead.first;
+ newTaggedHead = std::make_pair(nullptr, currentTaggedHead.second + 1);
+ } while (!CompareAndSet(&TaggedHead_, currentTaggedHead, newTaggedHead));
+ return item;
+ }
+
+private:
+ using TAtomicUint128 = volatile unsigned __int128 __attribute__((aligned(16)));
+ using TTag = ui64;
+ using TTaggedPointer = std::pair<T*, TTag>;
+
+ TAtomicUint128 TaggedHead_ = 0;
+
+ // Avoid false sharing.
+ char Padding[CacheLineSize - sizeof(TAtomicUint128)];
+
+private:
+ static Y_FORCE_INLINE bool CompareAndSet(TAtomicUint128* atomic, TTaggedPointer& expectedValue, TTaggedPointer newValue)
+ {
+ bool success;
+ __asm__ __volatile__
+ (
+ "lock cmpxchg16b %1\n"
+ "setz %0"
+ : "=q"(success)
+ , "+m"(*atomic)
+ , "+a"(expectedValue.first)
+ , "+d"(expectedValue.second)
+ : "b"(newValue.first)
+ , "c"(newValue.second)
+ : "cc"
+ );
+ return success;
+ }
+};
+
+static_assert(sizeof(TFreeList<void>) == CacheLineSize, "sizeof(TFreeList) != CacheLineSize");
+
+////////////////////////////////////////////////////////////////////////////////
+
+constexpr size_t ShardCount = 16;
+std::atomic<size_t> GlobalCurrentShardIndex;
+
+// Provides a context for working with sharded data structures.
+// Captures the initial shard index upon construction (indicating the shard
+// where all insertions go). Maintains the current shard index (round-robin,
+// indicating the shard currently used for extraction).
+// Can be thread-safe or not, depending on TCounter.
+template <class TCounter>
+class TShardedState
+{
+public:
+ TShardedState()
+ : InitialShardIndex_(GlobalCurrentShardIndex++ % ShardCount)
+ , CurrentShardIndex_(InitialShardIndex_)
+ { }
+
+ Y_FORCE_INLINE size_t GetInitialShardIndex() const
+ {
+ return InitialShardIndex_;
+ }
+
+ Y_FORCE_INLINE size_t GetNextShardIndex()
+ {
+ return ++CurrentShardIndex_ % ShardCount;
+ }
+
+private:
+ const size_t InitialShardIndex_;
+ TCounter CurrentShardIndex_;
+};
+
+using TLocalShardedState = TShardedState<size_t>;
+using TGlobalShardedState = TShardedState<std::atomic<size_t>>;
+
+// Implemented as a collection of free lists (each called a shard).
+// One needs TShardedState to access the sharded data structure.
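+//
+// Usage sketch (illustrative): each accessor supplies a TShardedState; insertions go to the
+// state's initial shard, extractions probe that shard first and then the rest round-robin:
+//
+//   TGlobalShardedState state;
+//   TShardedFreeList<TItem> list;     // TItem derives from TFreeListItem<TItem>
+//   list.Put(&state, item);
+//   auto* one = list.Extract(&state);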
+template <class T>
+class TShardedFreeList
+{
+public:
+ // First tries to extract an item from the initial shard;
+ // if failed then proceeds to all shards in round-robin fashion.
+ template <class TState>
+ T* Extract(TState* state)
+ {
+ if (auto* item = Shards_[state->GetInitialShardIndex()].Extract()) {
+ return item;
+ }
+ return ExtractRoundRobin(state);
+ }
+
+ // Attempts to extract an item from all shards in round-robin fashion.
+ template <class TState>
+ T* ExtractRoundRobin(TState* state)
+ {
+ for (size_t index = 0; index < ShardCount; ++index) {
+ if (auto* item = Shards_[state->GetNextShardIndex()].Extract()) {
+ return item;
+ }
+ }
+ return nullptr;
+ }
+
+ // Extracts items from all shards linking them together.
+ T* ExtractAll()
+ {
+ T* head = nullptr;
+ T* tail = nullptr;
+ for (auto& shard : Shards_) {
+ auto* item = shard.ExtractAll();
+ if (!head) {
+ head = item;
+ }
+ if (tail) {
+ YTALLOC_PARANOID_ASSERT(!tail->Next);
+ tail->Next = item;
+ } else {
+ tail = item;
+ }
+ while (tail && tail->Next) {
+ tail = tail->Next;
+ }
+ }
+ return head;
+ }
+
+ template <class TState>
+ void Put(TState* state, T* item)
+ {
+ Shards_[state->GetInitialShardIndex()].Put(item);
+ }
+
+private:
+ std::array<TFreeList<T>, ShardCount> Shards_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Holds YTAlloc control knobs.
+// Thread safe.
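+//
+// Illustrative usage of the knobs below (a sketch, not a prescribed configuration;
+// normally these setters are invoked through the public YTAlloc configuration API):
+//
+//   ConfigurationManager->SetLargeUnreclaimableCoeff(0.1);
+//   ConfigurationManager->SetTimingEventThreshold(TDuration::MilliSeconds(20));
+//   ConfigurationManager->EnableStockpile();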
+class TConfigurationManager
+{
+public:
+ void SetLargeUnreclaimableCoeff(double value)
+ {
+ LargeUnreclaimableCoeff_.store(value);
+ }
+
+ double GetLargeUnreclaimableCoeff() const
+ {
+ return LargeUnreclaimableCoeff_.load(std::memory_order_relaxed);
+ }
+
+
+ void SetMinLargeUnreclaimableBytes(size_t value)
+ {
+ MinLargeUnreclaimableBytes_.store(value);
+ }
+
+ void SetMaxLargeUnreclaimableBytes(size_t value)
+ {
+ MaxLargeUnreclaimableBytes_.store(value);
+ }
+
+ size_t GetMinLargeUnreclaimableBytes() const
+ {
+ return MinLargeUnreclaimableBytes_.load(std::memory_order_relaxed);
+ }
+
+ size_t GetMaxLargeUnreclaimableBytes() const
+ {
+ return MaxLargeUnreclaimableBytes_.load(std::memory_order_relaxed);
+ }
+
+
+ void SetTimingEventThreshold(TDuration value)
+ {
+ TimingEventThresholdNs_.store(value.MicroSeconds() * 1000);
+ }
+
+ i64 GetTimingEventThresholdNs() const
+ {
+ return TimingEventThresholdNs_.load(std::memory_order_relaxed);
+ }
+
+
+ void SetAllocationProfilingEnabled(bool value);
+
+ bool IsAllocationProfilingEnabled() const
+ {
+ return AllocationProfilingEnabled_.load();
+ }
+
+
+    Y_FORCE_INLINE double GetAllocationProfilingSamplingRate()
+ {
+ return AllocationProfilingSamplingRate_.load();
+ }
+
+ void SetAllocationProfilingSamplingRate(double rate)
+ {
+ if (rate < 0) {
+ rate = 0;
+ }
+ if (rate > 1) {
+ rate = 1;
+ }
+ i64 rateX64K = static_cast<i64>(rate * (1ULL << 16));
+ AllocationProfilingSamplingRateX64K_.store(ClampVal<ui32>(rateX64K, 0, std::numeric_limits<ui16>::max() + 1));
+ AllocationProfilingSamplingRate_.store(rate);
+ }
+
+
+ Y_FORCE_INLINE bool IsSmallArenaAllocationProfilingEnabled(size_t rank)
+ {
+ return SmallArenaAllocationProfilingEnabled_[rank].load(std::memory_order_relaxed);
+ }
+
+ Y_FORCE_INLINE bool IsSmallArenaAllocationProfiled(size_t rank)
+ {
+ return IsSmallArenaAllocationProfilingEnabled(rank) && IsAllocationSampled();
+ }
+
+ void SetSmallArenaAllocationProfilingEnabled(size_t rank, bool value)
+ {
+ if (rank >= SmallRankCount) {
+ return;
+ }
+ SmallArenaAllocationProfilingEnabled_[rank].store(value);
+ }
+
+
+ Y_FORCE_INLINE bool IsLargeArenaAllocationProfilingEnabled(size_t rank)
+ {
+ return LargeArenaAllocationProfilingEnabled_[rank].load(std::memory_order_relaxed);
+ }
+
+ Y_FORCE_INLINE bool IsLargeArenaAllocationProfiled(size_t rank)
+ {
+ return IsLargeArenaAllocationProfilingEnabled(rank) && IsAllocationSampled();
+ }
+
+ void SetLargeArenaAllocationProfilingEnabled(size_t rank, bool value)
+ {
+ if (rank >= LargeRankCount) {
+ return;
+ }
+ LargeArenaAllocationProfilingEnabled_[rank].store(value);
+ }
+
+
+ Y_FORCE_INLINE int GetProfilingBacktraceDepth()
+ {
+ return ProfilingBacktraceDepth_.load();
+ }
+
+ void SetProfilingBacktraceDepth(int depth)
+ {
+ if (depth < 1) {
+ return;
+ }
+ if (depth > MaxAllocationProfilingBacktraceDepth) {
+ depth = MaxAllocationProfilingBacktraceDepth;
+ }
+ ProfilingBacktraceDepth_.store(depth);
+ }
+
+
+ Y_FORCE_INLINE size_t GetMinProfilingBytesUsedToReport()
+ {
+ return MinProfilingBytesUsedToReport_.load();
+ }
+
+ void SetMinProfilingBytesUsedToReport(size_t size)
+ {
+ MinProfilingBytesUsedToReport_.store(size);
+ }
+
+ void SetEnableEagerMemoryRelease(bool value)
+ {
+ EnableEagerMemoryRelease_.store(value);
+ }
+
+ bool GetEnableEagerMemoryRelease()
+ {
+ return EnableEagerMemoryRelease_.load(std::memory_order_relaxed);
+ }
+
+ void SetEnableMadvisePopulate(bool value)
+ {
+ EnableMadvisePopulate_.store(value);
+ }
+
+ bool GetEnableMadvisePopulate()
+ {
+ return EnableMadvisePopulate_.load(std::memory_order_relaxed);
+ }
+
+ void EnableStockpile()
+ {
+ StockpileEnabled_.store(true);
+ }
+
+ bool IsStockpileEnabled()
+ {
+ return StockpileEnabled_.load();
+ }
+
+ void SetStockpileInterval(TDuration value)
+ {
+ StockpileInterval_.store(value);
+ }
+
+ TDuration GetStockpileInterval()
+ {
+ return StockpileInterval_.load();
+ }
+
+ void SetStockpileThreadCount(int count)
+ {
+ StockpileThreadCount_.store(count);
+ }
+
+ int GetStockpileThreadCount()
+ {
+ return ClampVal(StockpileThreadCount_.load(), 0, MaxStockpileThreadCount);
+ }
+
+ void SetStockpileSize(size_t value)
+ {
+ StockpileSize_.store(value);
+ }
+
+ size_t GetStockpileSize()
+ {
+ return StockpileSize_.load();
+ }
+
+private:
+ std::atomic<double> LargeUnreclaimableCoeff_ = 0.05;
+ std::atomic<size_t> MinLargeUnreclaimableBytes_ = 128_MB;
+ std::atomic<size_t> MaxLargeUnreclaimableBytes_ = 10_GB;
+ std::atomic<i64> TimingEventThresholdNs_ = 10000000; // in ns, 10 ms by default
+
+ std::atomic<bool> AllocationProfilingEnabled_ = false;
+ std::atomic<double> AllocationProfilingSamplingRate_ = 1.0;
+ std::atomic<ui32> AllocationProfilingSamplingRateX64K_ = std::numeric_limits<ui32>::max();
+ std::array<std::atomic<bool>, SmallRankCount> SmallArenaAllocationProfilingEnabled_ = {};
+ std::array<std::atomic<bool>, LargeRankCount> LargeArenaAllocationProfilingEnabled_ = {};
+ std::atomic<int> ProfilingBacktraceDepth_ = 10;
+ std::atomic<size_t> MinProfilingBytesUsedToReport_ = 1_MB;
+
+ std::atomic<bool> EnableEagerMemoryRelease_ = true;
+ std::atomic<bool> EnableMadvisePopulate_ = false;
+
+ std::atomic<bool> StockpileEnabled_ = false;
+ std::atomic<TDuration> StockpileInterval_ = TDuration::MilliSeconds(10);
+ static constexpr int MaxStockpileThreadCount = 8;
+ std::atomic<int> StockpileThreadCount_ = 4;
+ std::atomic<size_t> StockpileSize_ = 1_GB;
+
+private:
+ bool IsAllocationSampled()
+ {
+ Y_POD_STATIC_THREAD(ui16) Counter;
+ return Counter++ < AllocationProfilingSamplingRateX64K_.load();
+ }
+};
+
+TExplicitlyConstructableSingleton<TConfigurationManager> ConfigurationManager;
+
+////////////////////////////////////////////////////////////////////////////////
+
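+// Base class for event log managers (timing events and mmap observations below).
+// Accumulates up to EventBufferSize events under a fork-aware spin lock and flushes
+// them to the log handler from RunBackgroundTasks.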
+template <class TEvent, class TManager>
+class TEventLogManagerBase
+{
+public:
+ void DisableForCurrentThread()
+ {
+ TManager::DisabledForCurrentThread_ = true;
+ }
+
+ template <class... TArgs>
+ void EnqueueEvent(TArgs&&... args)
+ {
+ if (TManager::DisabledForCurrentThread_) {
+ return;
+ }
+
+ auto timestamp = TInstant::Now();
+ auto fiberId = NYTAlloc::GetCurrentFiberId();
+ auto guard = Guard(EventLock_);
+
+        auto event = TEvent(std::forward<TArgs>(args)...);
+ OnEvent(event);
+
+ if (EventCount_ >= EventBufferSize) {
+ return;
+ }
+
+ auto& enqueuedEvent = Events_[EventCount_++];
+ enqueuedEvent = std::move(event);
+ enqueuedEvent.Timestamp = timestamp;
+ enqueuedEvent.FiberId = fiberId;
+ }
+
+ void RunBackgroundTasks()
+ {
+ if (LogManager->IsLoggingEnabled()) {
+ for (const auto& event : PullEvents()) {
+ ProcessEvent(event);
+ }
+ }
+ }
+
+protected:
+ NThreading::TForkAwareSpinLock EventLock_;
+
+ virtual void OnEvent(const TEvent& event) = 0;
+
+ virtual void ProcessEvent(const TEvent& event) = 0;
+
+private:
+ static constexpr size_t EventBufferSize = 1000;
+ size_t EventCount_ = 0;
+ std::array<TEvent, EventBufferSize> Events_;
+
+ std::vector<TEvent> PullEvents()
+ {
+ std::vector<TEvent> events;
+ events.reserve(EventBufferSize);
+
+ auto guard = Guard(EventLock_);
+ for (size_t index = 0; index < EventCount_; ++index) {
+ events.push_back(Events_[index]);
+ }
+ EventCount_ = 0;
+ return events;
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TTimingEvent
+{
+ ETimingEventType Type;
+ TDuration Duration;
+ size_t Size;
+ TInstant Timestamp;
+ TFiberId FiberId;
+
+ TTimingEvent()
+ { }
+
+ TTimingEvent(
+ ETimingEventType type,
+ TDuration duration,
+ size_t size)
+ : Type(type)
+ , Duration(duration)
+ , Size(size)
+ { }
+};
+
+class TTimingManager
+ : public TEventLogManagerBase<TTimingEvent, TTimingManager>
+{
+public:
+ TEnumIndexedVector<ETimingEventType, TTimingEventCounters> GetTimingEventCounters()
+ {
+ auto guard = Guard(EventLock_);
+ return EventCounters_;
+ }
+
+private:
+ TEnumIndexedVector<ETimingEventType, TTimingEventCounters> EventCounters_;
+
+ Y_POD_STATIC_THREAD(bool) DisabledForCurrentThread_;
+
+ friend class TEventLogManagerBase<TTimingEvent, TTimingManager>;
+
+ virtual void OnEvent(const TTimingEvent& event) override
+ {
+ auto& counters = EventCounters_[event.Type];
+ counters.Count += 1;
+ counters.Size += event.Size;
+ }
+
+ virtual void ProcessEvent(const TTimingEvent& event) override
+ {
+ YTALLOC_LOG_DEBUG("Timing event logged (Type: %s, Duration: %s, Size: %zu, Timestamp: %s, FiberId: %" PRIu64 ")",
+ ToString(event.Type).c_str(),
+ ToString(event.Duration).c_str(),
+ event.Size,
+ ToString(event.Timestamp).c_str(),
+ event.FiberId);
+ }
+};
+
+Y_POD_THREAD(bool) TTimingManager::DisabledForCurrentThread_;
+
+TExplicitlyConstructableSingleton<TTimingManager> TimingManager;
+
+////////////////////////////////////////////////////////////////////////////////
+
+i64 GetElapsedNs(const struct timespec& startTime, const struct timespec& endTime)
+{
+ if (Y_LIKELY(startTime.tv_sec == endTime.tv_sec)) {
+ return static_cast<i64>(endTime.tv_nsec) - static_cast<i64>(startTime.tv_nsec);
+ }
+
+ return
+ static_cast<i64>(endTime.tv_nsec) - static_cast<i64>(startTime.tv_nsec) +
+ (static_cast<i64>(endTime.tv_sec) - static_cast<i64>(startTime.tv_sec)) * 1000000000;
+}
+
+// Used to log statistics about long-running syscalls and lock acquisitions.
+class TTimingGuard
+ : public TNonCopyable
+{
+public:
+ explicit TTimingGuard(ETimingEventType eventType, size_t size = 0)
+ : EventType_(eventType)
+ , Size_(size)
+ {
+ ::clock_gettime(CLOCK_MONOTONIC, &StartTime_);
+ }
+
+ ~TTimingGuard()
+ {
+ auto elapsedNs = GetElapsedNs();
+ if (elapsedNs > ConfigurationManager->GetTimingEventThresholdNs()) {
+ TimingManager->EnqueueEvent(EventType_, TDuration::MicroSeconds(elapsedNs / 1000), Size_);
+ }
+ }
+
+private:
+ const ETimingEventType EventType_;
+ const size_t Size_;
+ struct timespec StartTime_;
+
+ i64 GetElapsedNs() const
+ {
+ struct timespec endTime;
+ ::clock_gettime(CLOCK_MONOTONIC, &endTime);
+ return NYTAlloc::GetElapsedNs(StartTime_, endTime);
+ }
+};
+
+template <class T>
+Y_FORCE_INLINE TGuard<T> GuardWithTiming(const T& lock)
+{
+ TTimingGuard timingGuard(ETimingEventType::Locking);
+ TGuard<T> lockingGuard(lock);
+ return lockingGuard;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// A wrapper for mmap, munmap, and madvise calls.
+// The latter are invoked with MADV_POPULATE (if enabled) and MADV_FREE flags
+// and may fail if the OS support is missing. These failures are logged (once) and
+// handled as follows:
+// * if MADV_POPULATE fails then we fallback to manual per-page prefault
+// for all subsequent attempts;
+// * if MADV_FREE fails then it (and all subsequent attempts) is replaced with MADV_DONTNEED
+//   (which is non-lazy and less efficient but still works).
+// Also this class mlocks all VMAs on startup to prevent pagefaults in our heavy binaries
+// from disturbing latency tails.
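+//
+// Illustrative decision table derived from the rules above:
+//   Populate: madvise(MADV_POPULATE) if enabled and not known to be unavailable,
+//             otherwise a manual per-page prefault;
+//   Release:  madvise(MADV_FREE) if supported and eager release is disabled,
+//             otherwise madvise(MADV_DONTNEED).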
+class TMappedMemoryManager
+{
+public:
+ void* Map(uintptr_t hint, size_t size, int flags)
+ {
+ TTimingGuard timingGuard(ETimingEventType::Mmap, size);
+ auto* result = ::mmap(
+ reinterpret_cast<void*>(hint),
+ size,
+ PROT_READ | PROT_WRITE,
+ MAP_PRIVATE | MAP_ANONYMOUS | flags,
+ -1,
+ 0);
+ if (result == MAP_FAILED) {
+ auto error = errno;
+ if (error == EEXIST && (flags & MAP_FIXED_NOREPLACE)) {
+ // Caller must retry with different hint address.
+ return result;
+ }
+ YTALLOC_VERIFY(error == ENOMEM);
+ ::fprintf(stderr, "*** YTAlloc has received ENOMEM error while trying to mmap %zu bytes\n",
+ size);
+ OomTrap();
+ }
+ return result;
+ }
+
+ void Unmap(void* ptr, size_t size)
+ {
+ TTimingGuard timingGuard(ETimingEventType::Munmap, size);
+ auto result = ::munmap(ptr, size);
+ YTALLOC_VERIFY(result == 0);
+ }
+
+ void DontDump(void* ptr, size_t size)
+ {
+ auto result = ::madvise(ptr, size, MADV_DONTDUMP);
+ // Must not fail.
+ YTALLOC_VERIFY(result == 0);
+ }
+
+ void PopulateFile(void* ptr, size_t size)
+ {
+ TTimingGuard timingGuard(ETimingEventType::FilePrefault, size);
+
+ auto* begin = static_cast<volatile char*>(ptr);
+ for (auto* current = begin; current < begin + size; current += PageSize) {
+ *current;
+ }
+ }
+
+ void PopulateReadOnly(void* ptr, size_t size)
+ {
+ if (!MadvisePopulateUnavailable_.load(std::memory_order_relaxed) &&
+ ConfigurationManager->GetEnableMadvisePopulate())
+ {
+ if (!TryMadvisePopulate(ptr, size)) {
+ MadvisePopulateUnavailable_.store(true);
+ }
+ }
+ }
+
+ void Populate(void* ptr, size_t size)
+ {
+ if (MadvisePopulateUnavailable_.load(std::memory_order_relaxed) ||
+ !ConfigurationManager->GetEnableMadvisePopulate())
+ {
+ DoPrefault(ptr, size);
+ } else if (!TryMadvisePopulate(ptr, size)) {
+ MadvisePopulateUnavailable_.store(true);
+ DoPrefault(ptr, size);
+ }
+ }
+
+ void Release(void* ptr, size_t size)
+ {
+ if (CanUseMadviseFree() && !ConfigurationManager->GetEnableEagerMemoryRelease()) {
+ DoMadviseFree(ptr, size);
+ } else {
+ DoMadviseDontNeed(ptr, size);
+ }
+ }
+
+ bool Stockpile(size_t size)
+ {
+ if (MadviseStockpileUnavailable_.load(std::memory_order_relaxed)) {
+ return false;
+ }
+ if (!TryMadviseStockpile(size)) {
+ MadviseStockpileUnavailable_.store(true);
+ return false;
+ }
+ return true;
+ }
+
+ void RunBackgroundTasks()
+ {
+ if (!LogManager->IsLoggingEnabled()) {
+ return;
+ }
+ if (IsBuggyKernel() && !BuggyKernelLogged_) {
+ YTALLOC_LOG_WARNING("Kernel is buggy; see KERNEL-118");
+ BuggyKernelLogged_ = true;
+ }
+ if (MadviseFreeSupported_ && !MadviseFreeSupportedLogged_) {
+ YTALLOC_LOG_INFO("MADV_FREE is supported");
+ MadviseFreeSupportedLogged_ = true;
+ }
+ if (MadviseFreeNotSupported_ && !MadviseFreeNotSupportedLogged_) {
+ YTALLOC_LOG_WARNING("MADV_FREE is not supported");
+ MadviseFreeNotSupportedLogged_ = true;
+ }
+ if (MadvisePopulateUnavailable_.load() && !MadvisePopulateUnavailableLogged_) {
+ YTALLOC_LOG_WARNING("MADV_POPULATE is not supported");
+ MadvisePopulateUnavailableLogged_ = true;
+ }
+ if (MadviseStockpileUnavailable_.load() && !MadviseStockpileUnavailableLogged_) {
+ YTALLOC_LOG_WARNING("MADV_STOCKPILE is not supported");
+ MadviseStockpileUnavailableLogged_ = true;
+ }
+ }
+
+private:
+ bool BuggyKernelLogged_ = false;
+
+ std::atomic<bool> MadviseFreeSupported_ = false;
+ bool MadviseFreeSupportedLogged_ = false;
+
+ std::atomic<bool> MadviseFreeNotSupported_ = false;
+ bool MadviseFreeNotSupportedLogged_ = false;
+
+ std::atomic<bool> MadvisePopulateUnavailable_ = false;
+ bool MadvisePopulateUnavailableLogged_ = false;
+
+ std::atomic<bool> MadviseStockpileUnavailable_ = false;
+ bool MadviseStockpileUnavailableLogged_ = false;
+
+private:
+ bool TryMadvisePopulate(void* ptr, size_t size)
+ {
+ TTimingGuard timingGuard(ETimingEventType::MadvisePopulate, size);
+ auto result = ::madvise(ptr, size, MADV_POPULATE);
+ if (result != 0) {
+ auto error = errno;
+ YTALLOC_VERIFY(error == EINVAL || error == ENOMEM);
+ if (error == ENOMEM) {
+ ::fprintf(stderr, "*** YTAlloc has received ENOMEM error while trying to madvise(MADV_POPULATE) %zu bytes\n",
+ size);
+ OomTrap();
+ }
+ return false;
+ }
+ return true;
+ }
+
+ void DoPrefault(void* ptr, size_t size)
+ {
+ TTimingGuard timingGuard(ETimingEventType::Prefault, size);
+ auto* begin = static_cast<char*>(ptr);
+ for (auto* current = begin; current < begin + size; current += PageSize) {
+ *current = 0;
+ }
+ }
+
+ bool CanUseMadviseFree()
+ {
+ if (MadviseFreeSupported_.load()) {
+ return true;
+ }
+ if (MadviseFreeNotSupported_.load()) {
+ return false;
+ }
+
+ if (IsBuggyKernel()) {
+ MadviseFreeNotSupported_.store(true);
+ } else {
+ auto* ptr = Map(0, PageSize, 0);
+ if (::madvise(ptr, PageSize, MADV_FREE) == 0) {
+ MadviseFreeSupported_.store(true);
+ } else {
+ MadviseFreeNotSupported_.store(true);
+ }
+ Unmap(ptr, PageSize);
+ }
+
+        // The recursive call returns immediately: one of the flags has just been set.
+ return CanUseMadviseFree();
+ }
+
+ void DoMadviseDontNeed(void* ptr, size_t size)
+ {
+ TTimingGuard timingGuard(ETimingEventType::MadviseDontNeed, size);
+ auto result = ::madvise(ptr, size, MADV_DONTNEED);
+ if (result != 0) {
+ auto error = errno;
+ // Failure is possible for locked pages.
+ Y_VERIFY(error == EINVAL);
+ }
+ }
+
+ void DoMadviseFree(void* ptr, size_t size)
+ {
+ TTimingGuard timingGuard(ETimingEventType::MadviseFree, size);
+ auto result = ::madvise(ptr, size, MADV_FREE);
+ if (result != 0) {
+ auto error = errno;
+ // Failure is possible for locked pages.
+ YTALLOC_VERIFY(error == EINVAL);
+ }
+ }
+
+ bool TryMadviseStockpile(size_t size)
+ {
+ auto result = ::madvise(nullptr, size, MADV_STOCKPILE);
+ if (result != 0) {
+ auto error = errno;
+ if (error == ENOMEM || error == EAGAIN || error == EINTR) {
+ // The call is advisory, ignore ENOMEM, EAGAIN, and EINTR.
+ return true;
+ }
+ YTALLOC_VERIFY(error == EINVAL);
+ return false;
+ }
+ return true;
+ }
+
+ // Some kernels are known to contain bugs in MADV_FREE; see https://st.yandex-team.ru/KERNEL-118.
+ bool IsBuggyKernel()
+ {
+#ifdef _linux_
+ static const bool result = [] () {
+ struct utsname buf;
+ YTALLOC_VERIFY(uname(&buf) == 0);
+ if (strverscmp(buf.release, "4.4.1-1") >= 0 &&
+ strverscmp(buf.release, "4.4.96-44") < 0)
+ {
+ return true;
+ }
+ if (strverscmp(buf.release, "4.14.1-1") >= 0 &&
+ strverscmp(buf.release, "4.14.79-33") < 0)
+ {
+ return true;
+ }
+ return false;
+ }();
+ return result;
+#else
+ return false;
+#endif
+ }
+};
+
+TExplicitlyConstructableSingleton<TMappedMemoryManager> MappedMemoryManager;
+
+////////////////////////////////////////////////////////////////////////////////
+// System allocator
+
+// Each system allocation is prepended with such a header.
+struct TSystemBlobHeader
+{
+ explicit TSystemBlobHeader(size_t size)
+ : Size(size)
+ { }
+
+ size_t Size;
+ char Padding[8];
+};
+
+CHECK_HEADER_ALIGNMENT(TSystemBlobHeader)
+
+// Used for some internal allocations.
+// Delegates directly to TMappedMemoryManager.
+class TSystemAllocator
+{
+public:
+ void* Allocate(size_t size);
+ void Free(void* ptr);
+
+private:
+ std::atomic<uintptr_t> CurrentPtr_ = SystemZoneStart;
+};
+
+TExplicitlyConstructableSingleton<TSystemAllocator> SystemAllocator;
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Deriving from this class makes instances bound to TSystemAllocator.
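+// Usage sketch (illustrative; TBookkeeping is a hypothetical type):
+//
+//   struct TBookkeeping : public TSystemAllocatable { ... };
+//   auto* ptr = new TBookkeeping(); // allocated via TSystemAllocator, not YTAlloc proper
+//   delete ptr;                     // returned to TSystemAllocator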
+struct TSystemAllocatable
+{
+ void* operator new(size_t size) noexcept
+ {
+ return SystemAllocator->Allocate(size);
+ }
+
+ void* operator new[](size_t size) noexcept
+ {
+ return SystemAllocator->Allocate(size);
+ }
+
+ void operator delete(void* ptr) noexcept
+ {
+ SystemAllocator->Free(ptr);
+ }
+
+ void operator delete[](void* ptr) noexcept
+ {
+ SystemAllocator->Free(ptr);
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Maintains a pool of objects.
+// Objects are allocated in groups each containing BatchSize instances.
+// The actual allocation is carried out by TSystemAllocator.
+// Memory is never actually reclaimed; freed instances are put into TFreeList.
+template <class T, size_t BatchSize>
+class TSystemPool
+{
+public:
+ T* Allocate()
+ {
+ while (true) {
+ auto* obj = FreeList_.Extract();
+ if (Y_LIKELY(obj)) {
+ new (obj) T();
+ return obj;
+ }
+ AllocateMore();
+ }
+ }
+
+ void Free(T* obj)
+ {
+ obj->T::~T();
+ PoisonFreedRange(obj, sizeof(T));
+ FreeList_.Put(obj);
+ }
+
+private:
+ TFreeList<T> FreeList_;
+
+private:
+ void AllocateMore()
+ {
+ auto* objs = static_cast<T*>(SystemAllocator->Allocate(sizeof(T) * BatchSize));
+ for (size_t index = 0; index < BatchSize; ++index) {
+ auto* obj = objs + index;
+ FreeList_.Put(obj);
+ }
+ }
+};
+
+// A sharded analogue of TSystemPool.
+template <class T, size_t BatchSize>
+class TShardedSystemPool
+{
+public:
+ template <class TState>
+ T* Allocate(TState* state)
+ {
+ if (auto* obj = FreeLists_[state->GetInitialShardIndex()].Extract()) {
+ new (obj) T();
+ return obj;
+ }
+
+ while (true) {
+ for (size_t index = 0; index < ShardCount; ++index) {
+ if (auto* obj = FreeLists_[state->GetNextShardIndex()].Extract()) {
+ new (obj) T();
+ return obj;
+ }
+ }
+ AllocateMore();
+ }
+ }
+
+ template <class TState>
+ void Free(TState* state, T* obj)
+ {
+ obj->T::~T();
+ PoisonFreedRange(obj, sizeof(T));
+ FreeLists_[state->GetInitialShardIndex()].Put(obj);
+ }
+
+private:
+ std::array<TFreeList<T>, ShardCount> FreeLists_;
+
+private:
+ void AllocateMore()
+ {
+ auto* objs = static_cast<T*>(SystemAllocator->Allocate(sizeof(T) * BatchSize));
+ for (size_t index = 0; index < BatchSize; ++index) {
+ auto* obj = objs + index;
+ FreeLists_[index % ShardCount].Put(obj);
+ }
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Handles allocations inside a zone of memory given by its start and end pointers.
+// Each allocation is a separate mapped region of memory.
+// Special care is taken to guarantee that all allocated regions fall inside the zone.
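+//
+// Usage sketch (illustrative): a bump-pointer allocator over a reserved address range, e.g.
+//
+//   TZoneAllocator hugeZone(HugeZoneStart, HugeZoneEnd);
+//   void* ptr = hugeZone.Allocate(AlignUp(size, PageSize), 0); // mmaps inside the zone
+//   hugeZone.Free(ptr, AlignUp(size, PageSize));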
+class TZoneAllocator
+{
+public:
+ TZoneAllocator(uintptr_t zoneStart, uintptr_t zoneEnd)
+ : ZoneStart_(zoneStart)
+ , ZoneEnd_(zoneEnd)
+ , Current_(zoneStart)
+ {
+ YTALLOC_VERIFY(ZoneStart_ % PageSize == 0);
+ }
+
+ void* Allocate(size_t size, int flags)
+ {
+ YTALLOC_VERIFY(size % PageSize == 0);
+ bool restarted = false;
+ while (true) {
+ auto hint = (Current_ += size) - size;
+ if (reinterpret_cast<uintptr_t>(hint) + size > ZoneEnd_) {
+ if (restarted) {
+ ::fprintf(stderr, "*** YTAlloc was unable to mmap %zu bytes in zone %" PRIx64 "--%" PRIx64 "\n",
+ size,
+ ZoneStart_,
+ ZoneEnd_);
+ OomTrap();
+ }
+ restarted = true;
+ Current_ = ZoneStart_;
+ } else {
+ char* ptr = static_cast<char*>(MappedMemoryManager->Map(
+ hint,
+ size,
+ MAP_FIXED_NOREPLACE | flags));
+ if (reinterpret_cast<uintptr_t>(ptr) == hint) {
+ return ptr;
+ }
+ if (ptr != MAP_FAILED) {
+ MappedMemoryManager->Unmap(ptr, size);
+ }
+ }
+ }
+ }
+
+ void Free(void* ptr, size_t size)
+ {
+ MappedMemoryManager->Unmap(ptr, size);
+ }
+
+private:
+ const uintptr_t ZoneStart_;
+ const uintptr_t ZoneEnd_;
+
+ std::atomic<uintptr_t> Current_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// YTAlloc supports tagged allocations.
+// Since the total number of tags can be huge, a two-level scheme is employed.
+// Possible tags are arranged into sets each containing TaggedCounterSetSize tags.
+// There are up to MaxTaggedCounterSets in total.
+// Upper 4 sets are reserved for profiled allocations.
+constexpr size_t TaggedCounterSetSize = 16384;
+constexpr size_t AllocationProfilingTaggedCounterSets = 4;
+constexpr size_t MaxTaggedCounterSets = 256 + AllocationProfilingTaggedCounterSets;
+
+constexpr size_t MaxCapturedAllocationBacktraces = 65000;
+static_assert(
+ MaxCapturedAllocationBacktraces < AllocationProfilingTaggedCounterSets * TaggedCounterSetSize,
+ "MaxCapturedAllocationBacktraces is too big");
+
+constexpr TMemoryTag AllocationProfilingMemoryTagBase = TaggedCounterSetSize * (MaxTaggedCounterSets - AllocationProfilingTaggedCounterSets);
+constexpr TMemoryTag AllocationProfilingUnknownMemoryTag = AllocationProfilingMemoryTagBase + MaxCapturedAllocationBacktraces;
+
+static_assert(
+ MaxMemoryTag == TaggedCounterSetSize * (MaxTaggedCounterSets - AllocationProfilingTaggedCounterSets) - 1,
+ "Wrong MaxMemoryTag");
+
+template <class TCounter>
+using TUntaggedTotalCounters = TEnumIndexedVector<EBasicCounter, TCounter>;
+
+template <class TCounter>
+struct TTaggedTotalCounterSet
+ : public TSystemAllocatable
+{
+ std::array<TEnumIndexedVector<EBasicCounter, TCounter>, TaggedCounterSetSize> Counters;
+};
+
+using TLocalTaggedBasicCounterSet = TTaggedTotalCounterSet<ssize_t>;
+using TGlobalTaggedBasicCounterSet = TTaggedTotalCounterSet<std::atomic<ssize_t>>;
+
+template <class TCounter>
+struct TTotalCounters
+{
+ // The sum of counters across all tags.
+ TUntaggedTotalCounters<TCounter> CumulativeTaggedCounters;
+
+ // Counters for untagged allocations.
+ TUntaggedTotalCounters<TCounter> UntaggedCounters;
+
+ // Access to tagged counters may involve creation of a new tag set.
+ // For simplicity, we separate the read-side (TaggedCounterSets) and the write-side (TaggedCounterSetHolders).
+ // These arrays contain virtually identical data (up to std::unique_ptr and std::atomic semantic differences).
+ std::array<std::atomic<TTaggedTotalCounterSet<TCounter>*>, MaxTaggedCounterSets> TaggedCounterSets{};
+ std::array<std::unique_ptr<TTaggedTotalCounterSet<TCounter>>, MaxTaggedCounterSets> TaggedCounterSetHolders;
+
+ // Protects TaggedCounterSetHolders from concurrent updates.
+ NThreading::TForkAwareSpinLock TaggedCounterSetsLock;
+
+ // Returns null if the set is not yet constructed.
+ Y_FORCE_INLINE TTaggedTotalCounterSet<TCounter>* FindTaggedCounterSet(size_t index) const
+ {
+ return TaggedCounterSets[index].load();
+ }
+
+ // Constructs the set on first access.
+ TTaggedTotalCounterSet<TCounter>* GetOrCreateTaggedCounterSet(size_t index)
+ {
+ auto* set = TaggedCounterSets[index].load();
+ if (Y_LIKELY(set)) {
+ return set;
+ }
+
+ auto guard = GuardWithTiming(TaggedCounterSetsLock);
+ auto& setHolder = TaggedCounterSetHolders[index];
+ if (!setHolder) {
+ setHolder = std::make_unique<TTaggedTotalCounterSet<TCounter>>();
+ TaggedCounterSets[index] = setHolder.get();
+ }
+ return setHolder.get();
+ }
+};
+
+using TLocalSystemCounters = TEnumIndexedVector<ESystemCounter, ssize_t>;
+using TGlobalSystemCounters = TEnumIndexedVector<ESystemCounter, std::atomic<ssize_t>>;
+
+using TLocalSmallCounters = TEnumIndexedVector<ESmallArenaCounter, ssize_t>;
+using TGlobalSmallCounters = TEnumIndexedVector<ESmallArenaCounter, std::atomic<ssize_t>>;
+
+using TLocalLargeCounters = TEnumIndexedVector<ELargeArenaCounter, ssize_t>;
+using TGlobalLargeCounters = TEnumIndexedVector<ELargeArenaCounter, std::atomic<ssize_t>>;
+
+using TLocalHugeCounters = TEnumIndexedVector<EHugeCounter, ssize_t>;
+using TGlobalHugeCounters = TEnumIndexedVector<EHugeCounter, std::atomic<ssize_t>>;
+
+using TLocalUndumpableCounters = TEnumIndexedVector<EUndumpableCounter, ssize_t>;
+using TGlobalUndumpableCounters = TEnumIndexedVector<EUndumpableCounter, std::atomic<ssize_t>>;
+
+Y_FORCE_INLINE ssize_t LoadCounter(ssize_t counter)
+{
+ return counter;
+}
+
+Y_FORCE_INLINE ssize_t LoadCounter(const std::atomic<ssize_t>& counter)
+{
+ return counter.load();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TMmapObservationEvent
+{
+ size_t Size;
+ std::array<void*, MaxAllocationProfilingBacktraceDepth> Frames;
+ int FrameCount;
+ TInstant Timestamp;
+ TFiberId FiberId;
+
+ TMmapObservationEvent() = default;
+
+ TMmapObservationEvent(
+ size_t size,
+ std::array<void*, MaxAllocationProfilingBacktraceDepth> frames,
+ int frameCount)
+ : Size(size)
+ , Frames(frames)
+ , FrameCount(frameCount)
+ { }
+};
+
+class TMmapObservationManager
+ : public TEventLogManagerBase<TMmapObservationEvent, TMmapObservationManager>
+{
+public:
+ void SetBacktraceFormatter(TBacktraceFormatter formatter)
+ {
+ BacktraceFormatter_.store(formatter);
+ }
+
+private:
+ std::atomic<TBacktraceFormatter> BacktraceFormatter_ = nullptr;
+
+ Y_POD_STATIC_THREAD(bool) DisabledForCurrentThread_;
+
+ friend class TEventLogManagerBase<TMmapObservationEvent, TMmapObservationManager>;
+
+ virtual void OnEvent(const TMmapObservationEvent& /*event*/) override
+ { }
+
+ virtual void ProcessEvent(const TMmapObservationEvent& event) override
+ {
+ YTALLOC_LOG_DEBUG("Large arena mmap observed (Size: %zu, Timestamp: %s, FiberId: %" PRIx64 ")",
+ event.Size,
+ ToString(event.Timestamp).c_str(),
+ event.FiberId);
+
+ if (auto backtraceFormatter = BacktraceFormatter_.load()) {
+ auto backtrace = backtraceFormatter(const_cast<void**>(event.Frames.data()), event.FrameCount);
+ YTALLOC_LOG_DEBUG("YTAlloc stack backtrace (Stack: %s)",
+ backtrace.c_str());
+ }
+ }
+};
+
+Y_POD_THREAD(bool) TMmapObservationManager::DisabledForCurrentThread_;
+
+TExplicitlyConstructableSingleton<TMmapObservationManager> MmapObservationManager;
+
+////////////////////////////////////////////////////////////////////////////////
+
+// A per-thread structure containing counters, chunk caches etc.
+struct TThreadState
+ : public TFreeListItem<TThreadState>
+ , public TLocalShardedState
+{
+ // TThreadState instances of all alive threads are put into a double-linked intrusive list.
+ // This is a pair of next/prev pointers connecting an instance of TThreadState to its neighbors.
+ TIntrusiveLinkedListNode<TThreadState> RegistryNode;
+
+ // Pointers to the respective parts of TThreadManager::ThreadControlWord_.
+ // If null then the thread is already destroyed (but TThreadState may still live for a while
+ // due to ref-counting).
+ ui8* AllocationProfilingEnabled;
+ ui8* BackgroundThreadStarted;
+
+ // TThreadStates are ref-counted.
+ // TThreadManager::EnumerateThreadStates enumerates the registered states and acquires
+    // a temporary reference preventing these states from being destructed. This allows
+    // the global lock to be held for shorter periods of time.
+ int RefCounter = 1;
+
+ // Per-thread counters.
+ TTotalCounters<ssize_t> TotalCounters;
+ std::array<TLocalLargeCounters, LargeRankCount> LargeArenaCounters;
+ TLocalUndumpableCounters UndumpableCounters;
+
+ // Each thread maintains caches of small chunks.
+ // One cache is for tagged chunks; the other is for untagged ones.
+    // Each cache contains up to MaxCachedChunksPerRank chunks for each rank.
+    // Special sentinels are placed to mark the boundaries of the region containing
+    // pointers of a specific rank. This enables slightly faster in-place boundary checks.
+
+ static constexpr uintptr_t LeftSentinel = 1;
+ static constexpr uintptr_t RightSentinel = 2;
+
+ struct TSmallBlobCache
+ {
+ TSmallBlobCache()
+ {
+ void** chunkPtrs = CachedChunks.data();
+ for (size_t rank = 0; rank < SmallRankCount; ++rank) {
+ RankToCachedChunkPtrHead[rank] = chunkPtrs;
+ chunkPtrs[0] = reinterpret_cast<void*>(LeftSentinel);
+ chunkPtrs[MaxCachedChunksPerRank + 1] = reinterpret_cast<void*>(RightSentinel);
+
+#ifdef YTALLOC_PARANOID
+ RankToCachedChunkPtrTail[rank] = chunkPtrs;
+ CachedChunkFull[rank] = false;
+
+ RankToCachedChunkLeftBorder[rank] = chunkPtrs;
+ RankToCachedChunkRightBorder[rank] = chunkPtrs + MaxCachedChunksPerRank + 1;
+#endif
+ chunkPtrs += MaxCachedChunksPerRank + 2;
+ }
+ }
+
+ // For each rank we have a segment of pointers in CachedChunks with the following layout:
+ // LCC[C]........R
+ // Legend:
+ // . = garbage
+ // L = left sentinel
+ // R = right sentinel
+ // C = cached pointer
+ // [C] = current cached pointer
+ //
+ // Under YTALLOC_PARANOID the following layout is used:
+ // L.[T]CCC[H]...R
+ // Legend:
+ // [H] = head cached pointer, put chunks here
+ // [T] = tail cached pointer, take chunks from here
+
+ // +2 is for two sentinels
+ std::array<void*, SmallRankCount * (MaxCachedChunksPerRank + 2)> CachedChunks{};
+
+    // Pointer to [C] (the current cached pointer; [H] under YTALLOC_PARANOID) for each rank.
+ std::array<void**, SmallRankCount> RankToCachedChunkPtrHead{};
+
+#ifdef YTALLOC_PARANOID
+ // Pointers to [L] and [R] for each rank.
+ std::array<void**, SmallRankCount> RankToCachedChunkLeftBorder{};
+ std::array<void**, SmallRankCount> RankToCachedChunkRightBorder{};
+
+ std::array<void**, SmallRankCount> RankToCachedChunkPtrTail{};
+ std::array<bool, SmallRankCount> CachedChunkFull{};
+#endif
+ };
+ TEnumIndexedVector<EAllocationKind, TSmallBlobCache> SmallBlobCache;
+};
+
+struct TThreadStateToRegistryNode
+{
+ auto operator() (TThreadState* state) const
+ {
+ return &state->RegistryNode;
+ }
+};
+
+// Manages all registered threads and controls access to TThreadState.
+class TThreadManager
+{
+public:
+ TThreadManager()
+ {
+ pthread_key_create(&ThreadDtorKey_, DestroyThread);
+
+ NThreading::TForkAwareSpinLock::AtFork(
+ this,
+ nullptr,
+ nullptr,
+ &AfterFork);
+ }
+
+ // Returns TThreadState for the current thread; the caller guarantees that this
+ // state is initialized and is not destroyed yet.
+ static TThreadState* GetThreadStateUnchecked();
+
+ // Returns TThreadState for the current thread; may return null.
+ static TThreadState* FindThreadState();
+
+ // Returns TThreadState for the current thread; may not return null
+ // (but may crash if TThreadState is already destroyed).
+ static TThreadState* GetThreadStateChecked()
+ {
+ auto* state = FindThreadState();
+ YTALLOC_VERIFY(state);
+ return state;
+ }
+
+    // Enumerates all threads and invokes handler passing TThreadState instances.
+    // handler must not throw but can take arbitrary time; no locks are held while it executes.
+ template <class THandler>
+ void EnumerateThreadStatesAsync(const THandler& handler) noexcept
+ {
+ TMemoryTagGuard guard(NullMemoryTag);
+
+ std::vector<TThreadState*> states;
+ states.reserve(1024); // must be enough in most cases
+
+ auto unrefStates = [&] {
+            // Releasing references also requires the global lock to be held to avoid leaving zombie states in the registry.
+ auto guard = GuardWithTiming(ThreadRegistryLock_);
+ for (auto* state : states) {
+ UnrefThreadState(state);
+ }
+ };
+
+ auto tryRefStates = [&] {
+ // Only hold this guard for a small period of time to reference all the states.
+ auto guard = GuardWithTiming(ThreadRegistryLock_);
+ auto* current = ThreadRegistry_.GetFront();
+ while (current) {
+ if (states.size() == states.capacity()) {
+ // Cannot allocate while holding ThreadRegistryLock_ due to a possible deadlock as follows:
+ // EnumerateThreadStatesAsync -> StartBackgroundThread -> EnumerateThreadStatesSync
+ // (many other scenarios are also possible).
+ guard.Release();
+ unrefStates();
+ states.clear();
+ states.reserve(states.capacity() * 2);
+ return false;
+ }
+ RefThreadState(current);
+ states.push_back(current);
+ current = current->RegistryNode.Next;
+ }
+ return true;
+ };
+
+ while (!tryRefStates()) ;
+
+ for (auto* state : states) {
+ handler(state);
+ }
+
+ unrefStates();
+ }
+
+ // Similar to EnumerateThreadStatesAsync but holds the global lock while enumerating the threads.
+ // Also invokes a given prologue functor while holding the thread registry lock.
+ // Handler and prologue calls must be fast and must not allocate.
+ template <class TPrologue, class THandler>
+ void EnumerateThreadStatesSync(const TPrologue& prologue, const THandler& handler) noexcept
+ {
+ auto guard = GuardWithTiming(ThreadRegistryLock_);
+ prologue();
+ auto* current = ThreadRegistry_.GetFront();
+ while (current) {
+ handler(current);
+ current = current->RegistryNode.Next;
+ }
+ }
+
+
+ // We store a special 64-bit "thread control word" in TLS encapsulating the following
+ // crucial per-thread parameters:
+ // * the current memory tag
+    //   * a flag indicating that a valid TThreadState is known to exist
+ // (and can be obtained via GetThreadStateUnchecked)
+ // * a flag indicating that allocation profiling is enabled
+ // * a flag indicating that background thread is started
+ // Thread control word is fetched via GetThreadControlWord and is compared
+ // against FastPathControlWord to see if the fast path can be taken.
+ // The latter happens when no memory tagging is configured, TThreadState is
+ // valid, allocation profiling is disabled, and background thread is started.
+
+ // The mask for extracting memory tag from thread control word.
+ static constexpr ui64 MemoryTagControlWordMask = 0xffffffff;
+ // ThreadStateValid is on.
+ static constexpr ui64 ThreadStateValidControlWordMask = (1ULL << 32);
+ // AllocationProfiling is on.
+ static constexpr ui64 AllocationProfilingEnabledControlWordMask = (1ULL << 40);
+    // All background threads are properly started.
+ static constexpr ui64 BackgroundThreadStartedControlWorkMask = (1ULL << 48);
+ // Memory tag is NullMemoryTag; thread state is valid.
+ static constexpr ui64 FastPathControlWord =
+ BackgroundThreadStartedControlWorkMask |
+ ThreadStateValidControlWordMask |
+ NullMemoryTag;
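+    // Illustrative layout (low to high bits): bits 0..31 hold the memory tag, bit 32 is
+    // ThreadStateValid, bit 40 is AllocationProfilingEnabled, and bit 48 is
+    // BackgroundThreadStarted; this mirrors TThreadControlWord::TParts below. E.g., the
+    // fast path word equals (1ULL << 48) | (1ULL << 32) | NullMemoryTag, i.e.
+    // 0x0001'0001'0000'0000 when NullMemoryTag is zero.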
+
+ Y_FORCE_INLINE static ui64 GetThreadControlWord()
+ {
+ return (&ThreadControlWord_)->Value;
+ }
+
+
+ static TMemoryTag GetCurrentMemoryTag()
+ {
+ return (&ThreadControlWord_)->Parts.MemoryTag;
+ }
+
+ static void SetCurrentMemoryTag(TMemoryTag tag)
+ {
+ Y_VERIFY(tag <= MaxMemoryTag);
+ (&ThreadControlWord_)->Parts.MemoryTag = tag;
+ }
+
+
+ static EMemoryZone GetCurrentMemoryZone()
+ {
+ return CurrentMemoryZone_;
+ }
+
+ static void SetCurrentMemoryZone(EMemoryZone zone)
+ {
+ CurrentMemoryZone_ = zone;
+ }
+
+
+ static void SetCurrentFiberId(TFiberId id)
+ {
+ CurrentFiberId_ = id;
+ }
+
+ static TFiberId GetCurrentFiberId()
+ {
+ return CurrentFiberId_;
+ }
+
+private:
+ static void DestroyThread(void*);
+
+ TThreadState* AllocateThreadState();
+
+ void RefThreadState(TThreadState* state)
+ {
+ auto result = ++state->RefCounter;
+ Y_VERIFY(result > 1);
+ }
+
+ void UnrefThreadState(TThreadState* state)
+ {
+ auto result = --state->RefCounter;
+ Y_VERIFY(result >= 0);
+ if (result == 0) {
+ DestroyThreadState(state);
+ }
+ }
+
+ void DestroyThreadState(TThreadState* state);
+
+ static void AfterFork(void* cookie);
+ void DoAfterFork();
+
+private:
+ // TThreadState instance for the current thread.
+ // Initially null, then initialized when first needed.
+ // TThreadState is destroyed upon thread termination (which is detected with
+ // the help of pthread_key_create machinery), so this pointer can become null again.
+ Y_POD_STATIC_THREAD(TThreadState*) ThreadState_;
+
+    // Initially false, then set to true when TThreadState is destroyed.
+    // If the thread requests its state afterwards, null is returned and no new state is (re-)created.
+ // The caller must be able to deal with it.
+ Y_POD_STATIC_THREAD(bool) ThreadStateDestroyed_;
+
+ union TThreadControlWord
+ {
+ ui64 __attribute__((__may_alias__)) Value;
+ struct TParts
+ {
+ // The current memory tag used in all allocations by this thread.
+ ui32 __attribute__((__may_alias__)) MemoryTag;
+ // Indicates if a valid TThreadState exists and can be obtained via GetThreadStateUnchecked.
+ ui8 __attribute__((__may_alias__)) ThreadStateValid;
+ // Indicates if allocation profiling is on.
+ ui8 __attribute__((__may_alias__)) AllocationProfilingEnabled;
+ // Indicates if all background threads are properly started.
+ ui8 __attribute__((__may_alias__)) BackgroundThreadStarted;
+ ui8 Padding[2];
+ } Parts;
+ };
+ Y_POD_STATIC_THREAD(TThreadControlWord) ThreadControlWord_;
+
+ // See memory zone API.
+ Y_POD_STATIC_THREAD(EMemoryZone) CurrentMemoryZone_;
+
+ // See fiber id API.
+ Y_POD_STATIC_THREAD(TFiberId) CurrentFiberId_;
+
+ pthread_key_t ThreadDtorKey_;
+
+ static constexpr size_t ThreadStatesBatchSize = 1;
+ TSystemPool<TThreadState, ThreadStatesBatchSize> ThreadStatePool_;
+
+ NThreading::TForkAwareSpinLock ThreadRegistryLock_;
+ TIntrusiveLinkedList<TThreadState, TThreadStateToRegistryNode> ThreadRegistry_;
+};
+
+Y_POD_THREAD(TThreadState*) TThreadManager::ThreadState_;
+Y_POD_THREAD(bool) TThreadManager::ThreadStateDestroyed_;
+Y_POD_THREAD(TThreadManager::TThreadControlWord) TThreadManager::ThreadControlWord_;
+Y_POD_THREAD(EMemoryZone) TThreadManager::CurrentMemoryZone_;
+Y_POD_THREAD(TFiberId) TThreadManager::CurrentFiberId_;
+
+TExplicitlyConstructableSingleton<TThreadManager> ThreadManager;
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TConfigurationManager::SetAllocationProfilingEnabled(bool value)
+{
+ // Update threads' TLS.
+ ThreadManager->EnumerateThreadStatesSync(
+ [&] {
+ AllocationProfilingEnabled_.store(value);
+ },
+ [&] (auto* state) {
+ if (state->AllocationProfilingEnabled) {
+ *state->AllocationProfilingEnabled = value;
+ }
+ });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Backtrace Manager
+//
+// Captures backtraces observed during allocations and assigns memory tags to them.
+// Memory tags are chosen sequentially starting from AllocationProfilingMemoryTagBase.
+//
+// For each backtrace we compute a 64-bit hash and use it as a key in a certain concurrent hashmap.
+// This hashmap is organized into BucketCount buckets, each consisting of BucketSize slots.
+//
+// Backtrace hash is translated into bucket index by taking the appropriate number of
+// its lower bits. For each slot, we remember a 32-bit fingerprint, which is
+// just the next 32 bits of the backtrace's hash, and the (previously assigned) memory tag.
+//
+// Upon access to the hashtable, the bucket is first scanned optimistically, without taking
+// any locks. In case of a miss, a per-bucket spinlock is acquired and the bucket is rescanned.
+//
+// The above scheme may involve collisions but we neglect their probability.
+//
+// If the whole hash table overflows (i.e. a total of MaxCapturedAllocationBacktraces
+// backtraces are captured) or the bucket overflows (i.e. all of its slots become occupied),
+// the allocation is annotated with AllocationProfilingUnknownMemoryTag. Such allocations
+// appear as having no backtrace whatsoever in the profiling reports.
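+//
+// An illustrative decomposition of a backtrace hash (a sketch; the constants and
+// helpers referenced here are the ones defined in TBacktraceManager below):
+//
+//   size_t hash = GetBacktraceHash(frames, frameCount);
+//   size_t bucketIndex = hash % BucketCount;                     // lower Log2BucketCount bits
+//   ui32 fingerprint = (hash >> Log2BucketCount) & 0xffffffff;   // next 32 bits (0 is remapped to 1)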
+
+class TBacktraceManager
+{
+public:
+ // Sets the provider used for collecting backtraces when allocation profiling
+ // is turned ON.
+ void SetBacktraceProvider(TBacktraceProvider provider)
+ {
+ BacktraceProvider_.store(provider);
+ }
+
+ // Captures the backtrace and inserts it into the hashtable.
+ TMemoryTag GetMemoryTagFromBacktrace(int framesToSkip)
+ {
+ std::array<void*, MaxAllocationProfilingBacktraceDepth> frames;
+ auto backtraceProvider = BacktraceProvider_.load();
+ if (!backtraceProvider) {
+ return NullMemoryTag;
+ }
+ auto frameCount = backtraceProvider(frames.data(), ConfigurationManager->GetProfilingBacktraceDepth(), framesToSkip);
+ auto hash = GetBacktraceHash(frames.data(), frameCount);
+ return CaptureBacktrace(hash, frames.data(), frameCount);
+ }
+
+ // Returns the backtrace corresponding to the given tag, if any.
+ std::optional<TBacktrace> FindBacktrace(TMemoryTag tag)
+ {
+ if (tag < AllocationProfilingMemoryTagBase ||
+ tag >= AllocationProfilingMemoryTagBase + MaxCapturedAllocationBacktraces)
+ {
+ return std::nullopt;
+ }
+ const auto& entry = Backtraces_[tag - AllocationProfilingMemoryTagBase];
+ if (!entry.Captured.load()) {
+ return std::nullopt;
+ }
+ return entry.Backtrace;
+ }
+
+private:
+ static constexpr int Log2BucketCount = 16;
+ static constexpr int BucketCount = 1 << Log2BucketCount;
+ static constexpr int BucketSize = 8;
+
+ std::atomic<TBacktraceProvider> BacktraceProvider_ = nullptr;
+
+    std::array<std::array<std::atomic<ui32>, BucketSize>, BucketCount> Fingerprints_ = {};
+ std::array<std::array<std::atomic<TMemoryTag>, BucketSize>, BucketCount> MemoryTags_ = {};
+ std::array<NThreading::TForkAwareSpinLock, BucketCount> BucketLocks_;
+ std::atomic<TMemoryTag> CurrentMemoryTag_ = AllocationProfilingMemoryTagBase;
+
+ struct TBacktraceEntry
+ {
+ TBacktrace Backtrace;
+ std::atomic<bool> Captured = false;
+ };
+
+ std::array<TBacktraceEntry, MaxCapturedAllocationBacktraces> Backtraces_;
+
+private:
+ static size_t GetBacktraceHash(void** frames, int frameCount)
+ {
+ size_t hash = 0;
+ for (int index = 0; index < frameCount; ++index) {
+ hash = CombineHashes(hash, THash<void*>()(frames[index]));
+ }
+ return hash;
+ }
+
+ TMemoryTag CaptureBacktrace(size_t hash, void** frames, int frameCount)
+ {
+ size_t bucketIndex = hash % BucketCount;
+ ui32 fingerprint = (hash >> Log2BucketCount) & 0xffffffff;
+ // Zero fingerprint indicates the slot is free; check and adjust to ensure
+ // that regular fingerprints are non-zero.
+ if (fingerprint == 0) {
+ fingerprint = 1;
+ }
+
+ for (int slotIndex = 0; slotIndex < BucketSize; ++slotIndex) {
+ auto currentFingerprint = Fingerprints_[bucketIndex][slotIndex].load(std::memory_order_relaxed);
+ if (currentFingerprint == fingerprint) {
+ return MemoryTags_[bucketIndex][slotIndex].load();
+ }
+ }
+
+ auto guard = Guard(BucketLocks_[bucketIndex]);
+
+ int spareSlotIndex = -1;
+ for (int slotIndex = 0; slotIndex < BucketSize; ++slotIndex) {
+ auto currentFingerprint = Fingerprints_[bucketIndex][slotIndex].load(std::memory_order_relaxed);
+ if (currentFingerprint == fingerprint) {
+                return MemoryTags_[bucketIndex][slotIndex].load();
+ }
+ if (currentFingerprint == 0) {
+ spareSlotIndex = slotIndex;
+ break;
+ }
+ }
+
+ if (spareSlotIndex < 0) {
+ return AllocationProfilingUnknownMemoryTag;
+ }
+
+ auto memoryTag = CurrentMemoryTag_++;
+ if (memoryTag >= AllocationProfilingMemoryTagBase + MaxCapturedAllocationBacktraces) {
+ return AllocationProfilingUnknownMemoryTag;
+ }
+
+ MemoryTags_[bucketIndex][spareSlotIndex].store(memoryTag);
+ Fingerprints_[bucketIndex][spareSlotIndex].store(fingerprint);
+
+ auto& entry = Backtraces_[memoryTag - AllocationProfilingMemoryTagBase];
+ entry.Backtrace.FrameCount = frameCount;
+ ::memcpy(entry.Backtrace.Frames.data(), frames, sizeof (void*) * frameCount);
+ entry.Captured.store(true);
+
+ return memoryTag;
+ }
+};
+
+TExplicitlyConstructableSingleton<TBacktraceManager> BacktraceManager;
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Mimics the counters of TThreadState but uses std::atomic to survive concurrent access.
+struct TGlobalState
+ : public TGlobalShardedState
+{
+ TTotalCounters<std::atomic<ssize_t>> TotalCounters;
+ std::array<TGlobalLargeCounters, LargeRankCount> LargeArenaCounters;
+ TGlobalUndumpableCounters UndumpableCounters;
+};
+
+TExplicitlyConstructableSingleton<TGlobalState> GlobalState;
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Accumulates various allocation statistics.
+class TStatisticsManager
+{
+public:
+ template <EAllocationKind Kind = EAllocationKind::Tagged, class TState>
+ static Y_FORCE_INLINE void IncrementTotalCounter(TState* state, TMemoryTag tag, EBasicCounter counter, ssize_t delta)
+ {
+ // This branch is typically resolved at compile time.
+ if (Kind == EAllocationKind::Tagged && tag != NullMemoryTag) {
+ IncrementTaggedTotalCounter(&state->TotalCounters, tag, counter, delta);
+ } else {
+ IncrementUntaggedTotalCounter(&state->TotalCounters, counter, delta);
+ }
+ }
+
+ static Y_FORCE_INLINE void IncrementTotalCounter(TMemoryTag tag, EBasicCounter counter, ssize_t delta)
+ {
+ IncrementTotalCounter(GlobalState.Get(), tag, counter, delta);
+ }
+
+ void IncrementSmallArenaCounter(ESmallArenaCounter counter, size_t rank, ssize_t delta)
+ {
+ SmallArenaCounters_[rank][counter] += delta;
+ }
+
+ template <class TState>
+ static Y_FORCE_INLINE void IncrementLargeArenaCounter(TState* state, size_t rank, ELargeArenaCounter counter, ssize_t delta)
+ {
+ state->LargeArenaCounters[rank][counter] += delta;
+ }
+
+ template <class TState>
+ static Y_FORCE_INLINE void IncrementUndumpableCounter(TState* state, EUndumpableCounter counter, ssize_t delta)
+ {
+ state->UndumpableCounters[counter] += delta;
+ }
+
+ void IncrementHugeCounter(EHugeCounter counter, ssize_t delta)
+ {
+ HugeCounters_[counter] += delta;
+ }
+
+ void IncrementHugeUndumpableCounter(EUndumpableCounter counter, ssize_t delta)
+ {
+ HugeUndumpableCounters_[counter] += delta;
+ }
+
+ void IncrementSystemCounter(ESystemCounter counter, ssize_t delta)
+ {
+ SystemCounters_[counter] += delta;
+ }
+
+ // Computes memory usage for a list of tags by aggregating counters across threads.
+ void GetTaggedMemoryCounters(const TMemoryTag* tags, size_t count, TEnumIndexedVector<EBasicCounter, ssize_t>* counters)
+ {
+ TMemoryTagGuard guard(NullMemoryTag);
+
+ for (size_t index = 0; index < count; ++index) {
+ counters[index][EBasicCounter::BytesAllocated] = 0;
+ counters[index][EBasicCounter::BytesFreed] = 0;
+ }
+
+ for (size_t index = 0; index < count; ++index) {
+ auto tag = tags[index];
+ counters[index][EBasicCounter::BytesAllocated] += LoadTaggedTotalCounter(GlobalState->TotalCounters, tag, EBasicCounter::BytesAllocated);
+ counters[index][EBasicCounter::BytesFreed] += LoadTaggedTotalCounter(GlobalState->TotalCounters, tag, EBasicCounter::BytesFreed);
+ }
+
+ ThreadManager->EnumerateThreadStatesAsync(
+ [&] (const auto* state) {
+ for (size_t index = 0; index < count; ++index) {
+ auto tag = tags[index];
+ counters[index][EBasicCounter::BytesAllocated] += LoadTaggedTotalCounter(state->TotalCounters, tag, EBasicCounter::BytesAllocated);
+ counters[index][EBasicCounter::BytesFreed] += LoadTaggedTotalCounter(state->TotalCounters, tag, EBasicCounter::BytesFreed);
+ }
+ });
+
+ for (size_t index = 0; index < count; ++index) {
+ counters[index][EBasicCounter::BytesUsed] = GetUsed(counters[index][EBasicCounter::BytesAllocated], counters[index][EBasicCounter::BytesFreed]);
+ }
+ }
+
+ void GetTaggedMemoryUsage(const TMemoryTag* tags, size_t count, size_t* results)
+ {
+ TMemoryTagGuard guard(NullMemoryTag);
+
+ std::vector<TEnumIndexedVector<EBasicCounter, ssize_t>> counters;
+ counters.resize(count);
+ GetTaggedMemoryCounters(tags, count, counters.data());
+
+ for (size_t index = 0; index < count; ++index) {
+ results[index] = counters[index][EBasicCounter::BytesUsed];
+ }
+ }
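+
+    // Illustrative caller-side usage of GetTaggedMemoryUsage (a sketch; the tag
+    // values are arbitrary and the caller owns both arrays):
+    //
+    //   TMemoryTag tags[] = {1, 2, 3};
+    //   size_t usage[3];
+    //   StatisticsManager->GetTaggedMemoryUsage(tags, 3, usage);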
+
+ TEnumIndexedVector<ETotalCounter, ssize_t> GetTotalAllocationCounters()
+ {
+ TEnumIndexedVector<ETotalCounter, ssize_t> result;
+
+ auto accumulate = [&] (const auto& counters) {
+ result[ETotalCounter::BytesAllocated] += LoadCounter(counters[EBasicCounter::BytesAllocated]);
+ result[ETotalCounter::BytesFreed] += LoadCounter(counters[EBasicCounter::BytesFreed]);
+ };
+
+ accumulate(GlobalState->TotalCounters.UntaggedCounters);
+ accumulate(GlobalState->TotalCounters.CumulativeTaggedCounters);
+
+ ThreadManager->EnumerateThreadStatesAsync(
+ [&] (const auto* state) {
+ accumulate(state->TotalCounters.UntaggedCounters);
+ accumulate(state->TotalCounters.CumulativeTaggedCounters);
+ });
+
+ result[ETotalCounter::BytesUsed] = GetUsed(
+ result[ETotalCounter::BytesAllocated],
+ result[ETotalCounter::BytesFreed]);
+
+ auto systemCounters = GetSystemAllocationCounters();
+ result[ETotalCounter::BytesCommitted] += systemCounters[EBasicCounter::BytesUsed];
+
+ auto hugeCounters = GetHugeAllocationCounters();
+ result[ETotalCounter::BytesCommitted] += hugeCounters[EHugeCounter::BytesUsed];
+
+ auto smallArenaCounters = GetSmallArenaAllocationCounters();
+ for (size_t rank = 0; rank < SmallRankCount; ++rank) {
+ result[ETotalCounter::BytesCommitted] += smallArenaCounters[rank][ESmallArenaCounter::BytesCommitted];
+ }
+
+ auto largeArenaCounters = GetLargeArenaAllocationCounters();
+ for (size_t rank = 0; rank < LargeRankCount; ++rank) {
+ result[ETotalCounter::BytesCommitted] += largeArenaCounters[rank][ELargeArenaCounter::BytesCommitted];
+ }
+
+ result[ETotalCounter::BytesUnaccounted] = std::max<ssize_t>(GetProcessRss() - result[ETotalCounter::BytesCommitted], 0);
+
+ return result;
+ }
+
+ TEnumIndexedVector<ESmallCounter, ssize_t> GetSmallAllocationCounters()
+ {
+ TEnumIndexedVector<ESmallCounter, ssize_t> result;
+
+ auto totalCounters = GetTotalAllocationCounters();
+ result[ESmallCounter::BytesAllocated] = totalCounters[ETotalCounter::BytesAllocated];
+ result[ESmallCounter::BytesFreed] = totalCounters[ETotalCounter::BytesFreed];
+ result[ESmallCounter::BytesUsed] = totalCounters[ETotalCounter::BytesUsed];
+
+ auto largeArenaCounters = GetLargeArenaAllocationCounters();
+ for (size_t rank = 0; rank < LargeRankCount; ++rank) {
+ result[ESmallCounter::BytesAllocated] -= largeArenaCounters[rank][ELargeArenaCounter::BytesAllocated];
+ result[ESmallCounter::BytesFreed] -= largeArenaCounters[rank][ELargeArenaCounter::BytesFreed];
+ result[ESmallCounter::BytesUsed] -= largeArenaCounters[rank][ELargeArenaCounter::BytesUsed];
+ }
+
+ auto hugeCounters = GetHugeAllocationCounters();
+ result[ESmallCounter::BytesAllocated] -= hugeCounters[EHugeCounter::BytesAllocated];
+ result[ESmallCounter::BytesFreed] -= hugeCounters[EHugeCounter::BytesFreed];
+ result[ESmallCounter::BytesUsed] -= hugeCounters[EHugeCounter::BytesUsed];
+
+ return result;
+ }
+
+ std::array<TLocalSmallCounters, SmallRankCount> GetSmallArenaAllocationCounters()
+ {
+ std::array<TLocalSmallCounters, SmallRankCount> result;
+ for (size_t rank = 0; rank < SmallRankCount; ++rank) {
+ for (auto counter : TEnumTraits<ESmallArenaCounter>::GetDomainValues()) {
+ result[rank][counter] = SmallArenaCounters_[rank][counter].load();
+ }
+ }
+ return result;
+ }
+
+ TEnumIndexedVector<ELargeCounter, ssize_t> GetLargeAllocationCounters()
+ {
+ TEnumIndexedVector<ELargeCounter, ssize_t> result;
+ auto largeArenaCounters = GetLargeArenaAllocationCounters();
+ for (size_t rank = 0; rank < LargeRankCount; ++rank) {
+            result[ELargeCounter::BytesAllocated] += largeArenaCounters[rank][ELargeArenaCounter::BytesAllocated];
+            result[ELargeCounter::BytesFreed] += largeArenaCounters[rank][ELargeArenaCounter::BytesFreed];
+            result[ELargeCounter::BytesUsed] += largeArenaCounters[rank][ELargeArenaCounter::BytesUsed];
+ }
+ return result;
+ }
+
+ std::array<TLocalLargeCounters, LargeRankCount> GetLargeArenaAllocationCounters()
+ {
+ std::array<TLocalLargeCounters, LargeRankCount> result{};
+
+ for (size_t rank = 0; rank < LargeRankCount; ++rank) {
+ for (auto counter : TEnumTraits<ELargeArenaCounter>::GetDomainValues()) {
+ result[rank][counter] = GlobalState->LargeArenaCounters[rank][counter].load();
+ }
+ }
+
+ ThreadManager->EnumerateThreadStatesAsync(
+ [&] (const auto* state) {
+ for (size_t rank = 0; rank < LargeRankCount; ++rank) {
+ for (auto counter : TEnumTraits<ELargeArenaCounter>::GetDomainValues()) {
+ result[rank][counter] += state->LargeArenaCounters[rank][counter];
+ }
+ }
+ });
+
+ for (size_t rank = 0; rank < LargeRankCount; ++rank) {
+ result[rank][ELargeArenaCounter::BytesUsed] = GetUsed(result[rank][ELargeArenaCounter::BytesAllocated], result[rank][ELargeArenaCounter::BytesFreed]);
+ result[rank][ELargeArenaCounter::BlobsUsed] = GetUsed(result[rank][ELargeArenaCounter::BlobsAllocated], result[rank][ELargeArenaCounter::BlobsFreed]);
+ }
+
+ return result;
+ }
+
+ TLocalSystemCounters GetSystemAllocationCounters()
+ {
+ TLocalSystemCounters result;
+ for (auto counter : TEnumTraits<ESystemCounter>::GetDomainValues()) {
+ result[counter] = SystemCounters_[counter].load();
+ }
+ result[ESystemCounter::BytesUsed] = GetUsed(result[ESystemCounter::BytesAllocated], result[ESystemCounter::BytesFreed]);
+ return result;
+ }
+
+ TLocalHugeCounters GetHugeAllocationCounters()
+ {
+ TLocalHugeCounters result;
+ for (auto counter : TEnumTraits<EHugeCounter>::GetDomainValues()) {
+ result[counter] = HugeCounters_[counter].load();
+ }
+ result[EHugeCounter::BytesUsed] = GetUsed(result[EHugeCounter::BytesAllocated], result[EHugeCounter::BytesFreed]);
+ result[EHugeCounter::BlobsUsed] = GetUsed(result[EHugeCounter::BlobsAllocated], result[EHugeCounter::BlobsFreed]);
+ return result;
+ }
+
+ TLocalUndumpableCounters GetUndumpableAllocationCounters()
+ {
+ TLocalUndumpableCounters result;
+ for (auto counter : TEnumTraits<EUndumpableCounter>::GetDomainValues()) {
+ result[counter] = HugeUndumpableCounters_[counter].load();
+ result[counter] += GlobalState->UndumpableCounters[counter].load();
+ }
+
+ ThreadManager->EnumerateThreadStatesAsync(
+ [&] (const auto* state) {
+ result[EUndumpableCounter::BytesAllocated] += LoadCounter(state->UndumpableCounters[EUndumpableCounter::BytesAllocated]);
+ result[EUndumpableCounter::BytesFreed] += LoadCounter(state->UndumpableCounters[EUndumpableCounter::BytesFreed]);
+ });
+
+ result[EUndumpableCounter::BytesUsed] = GetUsed(result[EUndumpableCounter::BytesAllocated], result[EUndumpableCounter::BytesFreed]);
+ return result;
+ }
+
+ // Called before TThreadState is destroyed.
+ // Adds the counter values from TThreadState to the global counters.
+ void AccumulateLocalCounters(TThreadState* state)
+ {
+ for (auto counter : TEnumTraits<EBasicCounter>::GetDomainValues()) {
+ GlobalState->TotalCounters.CumulativeTaggedCounters[counter] += state->TotalCounters.CumulativeTaggedCounters[counter];
+ GlobalState->TotalCounters.UntaggedCounters[counter] += state->TotalCounters.UntaggedCounters[counter];
+ }
+ for (size_t index = 0; index < MaxTaggedCounterSets; ++index) {
+ const auto* localSet = state->TotalCounters.FindTaggedCounterSet(index);
+ if (!localSet) {
+ continue;
+ }
+ auto* globalSet = GlobalState->TotalCounters.GetOrCreateTaggedCounterSet(index);
+ for (size_t jndex = 0; jndex < TaggedCounterSetSize; ++jndex) {
+ for (auto counter : TEnumTraits<EBasicCounter>::GetDomainValues()) {
+ globalSet->Counters[jndex][counter] += localSet->Counters[jndex][counter];
+ }
+ }
+ }
+ for (size_t rank = 0; rank < LargeRankCount; ++rank) {
+ for (auto counter : TEnumTraits<ELargeArenaCounter>::GetDomainValues()) {
+ GlobalState->LargeArenaCounters[rank][counter] += state->LargeArenaCounters[rank][counter];
+ }
+ }
+ for (auto counter : TEnumTraits<EUndumpableCounter>::GetDomainValues()) {
+ GlobalState->UndumpableCounters[counter] += state->UndumpableCounters[counter];
+ }
+ }
+
+private:
+ template <class TCounter>
+ static ssize_t LoadTaggedTotalCounter(const TTotalCounters<TCounter>& counters, TMemoryTag tag, EBasicCounter counter)
+ {
+ const auto* set = counters.FindTaggedCounterSet(tag / TaggedCounterSetSize);
+ if (Y_UNLIKELY(!set)) {
+ return 0;
+ }
+ return LoadCounter(set->Counters[tag % TaggedCounterSetSize][counter]);
+ }
+
+ template <class TCounter>
+ static Y_FORCE_INLINE void IncrementUntaggedTotalCounter(TTotalCounters<TCounter>* counters, EBasicCounter counter, ssize_t delta)
+ {
+ counters->UntaggedCounters[counter] += delta;
+ }
+
+ template <class TCounter>
+ static Y_FORCE_INLINE void IncrementTaggedTotalCounter(TTotalCounters<TCounter>* counters, TMemoryTag tag, EBasicCounter counter, ssize_t delta)
+ {
+ counters->CumulativeTaggedCounters[counter] += delta;
+ auto* set = counters->GetOrCreateTaggedCounterSet(tag / TaggedCounterSetSize);
+ set->Counters[tag % TaggedCounterSetSize][counter] += delta;
+ }
+
+
+ static ssize_t GetProcessRss()
+ {
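+        // /proc/self/statm reports values in pages; its first two fields are the
+        // total program size and the resident set size, in that order.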
+ auto* file = ::fopen("/proc/self/statm", "r");
+ if (!file) {
+ return 0;
+ }
+
+ ssize_t dummy;
+ ssize_t rssPages;
+ auto readResult = fscanf(file, "%zd %zd", &dummy, &rssPages);
+
+ ::fclose(file);
+
+ if (readResult != 2) {
+ return 0;
+ }
+
+ return rssPages * PageSize;
+ }
+
+private:
+ TGlobalSystemCounters SystemCounters_;
+ std::array<TGlobalSmallCounters, SmallRankCount> SmallArenaCounters_;
+ TGlobalHugeCounters HugeCounters_;
+ TGlobalUndumpableCounters HugeUndumpableCounters_;
+};
+
+TExplicitlyConstructableSingleton<TStatisticsManager> StatisticsManager;
+
+////////////////////////////////////////////////////////////////////////////////
+
+void* TSystemAllocator::Allocate(size_t size)
+{
+ auto rawSize = GetRawBlobSize<TSystemBlobHeader>(size);
+ void* mmappedPtr;
+ while (true) {
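+        // Claim the next slot of the system zone and try to map it at exactly that
+        // address. On kernels that ignore MAP_FIXED_NOREPLACE the mapping may land
+        // elsewhere; such a mapping is unmapped below and the next slot is tried.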
+ auto currentPtr = CurrentPtr_.fetch_add(rawSize);
+ Y_VERIFY(currentPtr + rawSize <= SystemZoneEnd);
+ mmappedPtr = MappedMemoryManager->Map(
+ currentPtr,
+ rawSize,
+ MAP_FIXED_NOREPLACE | MAP_POPULATE);
+ if (mmappedPtr == reinterpret_cast<void*>(currentPtr)) {
+ break;
+ }
+ if (mmappedPtr != MAP_FAILED) {
+ MappedMemoryManager->Unmap(mmappedPtr, rawSize);
+ }
+ }
+ auto* blob = static_cast<TSystemBlobHeader*>(mmappedPtr);
+ new (blob) TSystemBlobHeader(size);
+ auto* result = HeaderToPtr(blob);
+ PoisonUninitializedRange(result, size);
+ StatisticsManager->IncrementSystemCounter(ESystemCounter::BytesAllocated, rawSize);
+ return result;
+}
+
+void TSystemAllocator::Free(void* ptr)
+{
+ auto* blob = PtrToHeader<TSystemBlobHeader>(ptr);
+ auto rawSize = GetRawBlobSize<TSystemBlobHeader>(blob->Size);
+ MappedMemoryManager->Unmap(blob, rawSize);
+ StatisticsManager->IncrementSystemCounter(ESystemCounter::BytesFreed, rawSize);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Small allocator
+//
+// Allocations (called small chunks) are grouped by their sizes. The two most significant binary digits
+// are used to determine the rank of a chunk, which guarantees at most 25% overhead in the worst case.
+// A pair of helper arrays (SizeToSmallRank1 and SizeToSmallRank2) is used to compute ranks; we expect
+// them to be permanently cached.
+//
+// Chunks of the same rank are served by a (small) arena allocator.
+// In fact, there are two arenas for each rank: one is for tagged allocations and another is for untagged ones.
+//
+// We encode chunk's rank and whether it is tagged or not in the resulting pointer as follows:
+// 0- 3: must be zero due to alignment
+// 4-39: varies
+// 40-44: rank
+// 45: 0 for untagged allocations, 1 for tagged ones
+// 46-63: zeroes
+// This enables computing chunk's rank and also determining if it is tagged in constant time
+// without any additional lookups. Also, one pays no space overhead for untagged allocations
+// and pays 16 bytes for each tagged one.
+//
+// Each arena allocates extents of memory by calling mmap for each extent of SmallExtentSize bytes.
+// (Recall that this memory is never reclaimed.)
+// Each extent is then sliced into segments of SmallSegmentSize bytes.
+// Whenever a new segment is acquired, its memory is pre-faulted by madvise(MADV_POPULATE).
+// New segments are acquired in a lock-free manner.
+//
+// Each thread maintains a separate cache of chunks of each rank (two caches to be precise: one
+// for tagged allocations and the other for untagged). These caches are fully thread-local and
+// involve no atomic operations.
+//
+// There are also global caches (per rank, for tagged and untagged allocations).
+// Instead of keeping individual chunks these work with chunk groups (collections of up to ChunksPerGroup
+// arbitrary chunks).
+//
+// When the local cache becomes exhausted, a group of chunks is fetched from the global cache
+// (if the latter is empty then the arena allocator is consulted).
+// Vice versa, if the local cache overflows, a group of chunks is moved from it to the global cache.
+//
+// Global caches and arena allocators also take care of (rare) cases when Allocate/Free is called
+// without a valid thread state (which happens during thread shutdown when TThreadState is already destroyed).
+//
+// Each arena allocates memory in a certain "data" zone of SmallZoneSize.
+// In addition to that zone, up to two "shadow" zones are maintained.
+//
+// The first one contains memory tags of chunks residing in the primary zone.
+// The second one (which is present if YTALLOC_NERVOUS is defined) contains
+// states of chunks. These states enable some simple internal sanity checks
+// (e.g. detect attempts to double-free a chunk).
+//
+// Addresses in the data zone are directly mapped to offsets in shadow zones.
+// When a segment of a small arena zone is allocated, the relevant portions of shadow
+// zones get initialized (and also accounted for as a system allocation).
+//
+// Shadow zones are memory-mapped with MAP_NORESERVE flag and are quite sparse.
+// These zones are omitted from core dumps due to their huge size and sparsity.
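+//
+// An illustrative decoding of a small chunk pointer (a sketch based on the bit
+// layout described above; the real helpers are PtrToSmallRank and friends):
+//
+//   auto value = reinterpret_cast<uintptr_t>(ptr);
+//   size_t rank = (value >> 40) & 0x1f;  // bits 40-44
+//   bool tagged = (value >> 45) & 1;     // bit 45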
+
+// For each small rank i, gives the maximum k such that 2^k <= SmallRankToSize[i].
+// A chunk pointer is mapped to its shadow image via the GetShadowOffset helper.
+// Note that chunk sizes are not always powers of 2. To avoid costly integer division,
+// a chunk pointer is translated by means of a bitwise shift only (leaving some bytes
+// of the shadow zones unused). This array provides the needed shifts.
+constexpr int SmallRankToLogSize[SmallRankCount] = {
+ 0,
+ 4, 5, 5, 6, 6, 7,
+ 7, 8, 8, 9, 9, 10, 10, 11,
+ 11, 12, 12, 13, 13, 14, 14, 15
+};
+
+enum class ESmallChunkState : ui8
+{
+ Spare = 0,
+ Allocated = 0x61, // a
+ Freed = 0x66 // f
+};
+
+class TSmallArenaAllocator
+{
+public:
+ TSmallArenaAllocator(EAllocationKind kind, size_t rank, uintptr_t dataZoneStart)
+ : Kind_(kind)
+ , Rank_(rank)
+ , LogSize_(SmallRankToLogSize[Rank_])
+ , ChunkSize_(SmallRankToSize[Rank_])
+ , DataZoneStart_(dataZoneStart)
+ , DataZoneAllocator_(DataZoneStart_, DataZoneStart_ + SmallZoneSize)
+ { }
+
+ size_t PullMany(void** batch, size_t maxCount)
+ {
+ size_t count;
+ while (true) {
+ count = TryAllocateFromCurrentExtent(batch, maxCount);
+ if (Y_LIKELY(count != 0)) {
+ break;
+ }
+ PopulateAnotherExtent();
+ }
+ return count;
+ }
+
+ void* Allocate(size_t size)
+ {
+ void* ptr;
+ auto count = PullMany(&ptr, 1);
+ YTALLOC_PARANOID_ASSERT(count == 1);
+ YTALLOC_PARANOID_ASSERT(PtrToSmallRank(ptr) == Rank_);
+ PoisonUninitializedRange(ptr, size);
+ UpdateChunkState(ptr, ESmallChunkState::Freed, ESmallChunkState::Allocated);
+ return ptr;
+ }
+
+ TMemoryTag GetAndResetMemoryTag(const void* ptr)
+ {
+ auto& tag = MemoryTagZoneStart_[GetShadowOffset(ptr)];
+ auto currentTag = tag;
+ tag = NullMemoryTag;
+ return currentTag;
+ }
+
+ void SetMemoryTag(void* ptr, TMemoryTag tag)
+ {
+ MemoryTagZoneStart_[GetShadowOffset(ptr)] = tag;
+ }
+
+ void UpdateChunkState(const void* ptr, ESmallChunkState expectedState, ESmallChunkState newState)
+ {
+#ifdef YTALLOC_NERVOUS
+ auto& state = ChunkStateZoneStart_[GetShadowOffset(ptr)];
+ auto actualState = state;
+ if (Y_UNLIKELY(actualState != expectedState)) {
+ char message[256];
+ sprintf(message, "Invalid small chunk state at %p: expected %" PRIx8 ", actual %" PRIx8,
+ ptr,
+ static_cast<ui8>(expectedState),
+ static_cast<ui8>(actualState));
+ YTALLOC_TRAP(message);
+ }
+ state = newState;
+#else
+ Y_UNUSED(ptr);
+ Y_UNUSED(expectedState);
+ Y_UNUSED(newState);
+#endif
+ }
+
+private:
+ size_t TryAllocateFromCurrentExtent(void** batch, size_t maxCount)
+ {
+ auto* oldPtr = CurrentPtr_.load();
+ if (Y_UNLIKELY(!oldPtr)) {
+ return 0;
+ }
+
+ auto* currentExtent = CurrentExtent_.load(std::memory_order_relaxed);
+ if (Y_UNLIKELY(!currentExtent)) {
+ return 0;
+ }
+
+ char* newPtr;
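+        // Advance CurrentPtr_ by up to maxCount chunks with a CAS; if the advance
+        // crosses a small segment boundary, stop right at that boundary so the
+        // newly touched segments can be committed below.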
+ while (true) {
+ if (Y_UNLIKELY(oldPtr < currentExtent || oldPtr + ChunkSize_ + RightReadableAreaSize > currentExtent + SmallExtentSize)) {
+ return 0;
+ }
+
+ newPtr = std::min(
+ oldPtr + ChunkSize_ * maxCount,
+ currentExtent + SmallExtentSize);
+
+ auto* alignedNewPtr = AlignDownToSmallSegment(currentExtent, newPtr);
+ if (alignedNewPtr > oldPtr) {
+ newPtr = alignedNewPtr;
+ }
+
+ if (Y_LIKELY(CurrentPtr_.compare_exchange_weak(oldPtr, newPtr))) {
+ break;
+ }
+ }
+
+ auto* firstSegment = AlignUpToSmallSegment(currentExtent, oldPtr);
+ auto* nextSegment = AlignUpToSmallSegment(currentExtent, newPtr);
+ if (firstSegment != nextSegment) {
+ auto size = nextSegment - firstSegment;
+ MappedMemoryManager->PopulateReadOnly(firstSegment, size);
+
+ StatisticsManager->IncrementSmallArenaCounter(ESmallArenaCounter::BytesCommitted, Rank_, size);
+ StatisticsManager->IncrementSmallArenaCounter(ESmallArenaCounter::PagesCommitted, Rank_, size / PageSize);
+ if (Kind_ == EAllocationKind::Tagged) {
+ StatisticsManager->IncrementSystemCounter(ESystemCounter::BytesAllocated, size / ChunkSize_ * sizeof(TMemoryTag));
+ }
+#ifdef YTALLOC_NERVOUS
+ StatisticsManager->IncrementSystemCounter(ESystemCounter::BytesAllocated, size / ChunkSize_ * sizeof(ESmallChunkState));
+#endif
+ }
+
+ size_t count = 0;
+ while (oldPtr != newPtr) {
+ UpdateChunkState(oldPtr, ESmallChunkState::Spare, ESmallChunkState::Freed);
+
+ batch[count] = oldPtr;
+
+ oldPtr += ChunkSize_;
+ count++;
+ }
+ return count;
+ }
+
+ void PopulateAnotherExtent()
+ {
+ auto lockGuard = GuardWithTiming(ExtentLock_);
+
+ auto* currentPtr = CurrentPtr_.load();
+ auto* currentExtent = CurrentExtent_.load();
+
+ if (currentPtr && currentPtr + ChunkSize_ + RightReadableAreaSize <= currentExtent + SmallExtentSize) {
+ // No need for a new extent.
+ return;
+ }
+
+ auto* newExtent = static_cast<char*>(DataZoneAllocator_.Allocate(SmallExtentAllocSize, 0));
+
+ AllocateShadowZones();
+
+ YTALLOC_VERIFY(reinterpret_cast<uintptr_t>(newExtent) % SmallExtentAllocSize == 0);
+ CurrentPtr_ = CurrentExtent_ = newExtent;
+
+ StatisticsManager->IncrementSmallArenaCounter(ESmallArenaCounter::BytesMapped, Rank_, SmallExtentAllocSize);
+ StatisticsManager->IncrementSmallArenaCounter(ESmallArenaCounter::PagesMapped, Rank_, SmallExtentAllocSize / PageSize);
+ }
+
+private:
+ const EAllocationKind Kind_;
+ const size_t Rank_;
+ const size_t LogSize_;
+ const size_t ChunkSize_;
+ const uintptr_t DataZoneStart_;
+
+ TZoneAllocator DataZoneAllocator_;
+
+ bool ShadowZonesAllocated_ = false;
+ TMemoryTag* MemoryTagZoneStart_;
+#ifdef YTALLOC_NERVOUS
+ ESmallChunkState* ChunkStateZoneStart_;
+#endif
+
+ NThreading::TForkAwareSpinLock ExtentLock_;
+ std::atomic<char*> CurrentPtr_ = nullptr;
+ std::atomic<char*> CurrentExtent_ = nullptr;
+
+ size_t GetShadowOffset(const void* ptr)
+ {
+ return (reinterpret_cast<uintptr_t>(ptr) - DataZoneStart_) >> LogSize_;
+ }
+
+ void AllocateShadowZones()
+ {
+ if (ShadowZonesAllocated_) {
+ return;
+ }
+
+ if (Kind_ == EAllocationKind::Tagged) {
+ MemoryTagZoneStart_ = MapShadowZone<TMemoryTag>();
+ }
+#ifdef YTALLOC_NERVOUS
+ ChunkStateZoneStart_ = MapShadowZone<ESmallChunkState>();
+#endif
+
+ ShadowZonesAllocated_ = true;
+ }
+
+ template <class T>
+ T* MapShadowZone()
+ {
+ auto size = AlignUp((SmallZoneSize >> LogSize_) * sizeof (T), PageSize);
+ auto* ptr = static_cast<T*>(MappedMemoryManager->Map(SystemZoneStart, size, MAP_NORESERVE));
+ MappedMemoryManager->DontDump(ptr, size);
+ return ptr;
+ }
+};
+
+TExplicitlyConstructableSingleton<TEnumIndexedVector<EAllocationKind, std::array<TExplicitlyConstructableSingleton<TSmallArenaAllocator>, SmallRankCount>>> SmallArenaAllocators;
+
+////////////////////////////////////////////////////////////////////////////////
+
+constexpr size_t ChunksPerGroup = 128;
+constexpr size_t GroupsBatchSize = 1024;
+
+static_assert(ChunksPerGroup <= MaxCachedChunksPerRank, "ChunksPerGroup > MaxCachedChunksPerRank");
+
+class TChunkGroup
+ : public TFreeListItem<TChunkGroup>
+{
+public:
+ bool IsEmpty() const
+ {
+ return Size_ == 0;
+ }
+
+ size_t ExtractAll(void** ptrs)
+ {
+ auto count = Size_;
+ ::memcpy(ptrs, Ptrs_.data(), count * sizeof(void*));
+ Size_ = 0;
+ return count;
+ }
+
+ void PutOne(void* ptr)
+ {
+ PutMany(&ptr, 1);
+ }
+
+ void PutMany(void** ptrs, size_t count)
+ {
+ YTALLOC_PARANOID_ASSERT(Size_ == 0);
+ YTALLOC_PARANOID_ASSERT(count <= ChunksPerGroup);
+ ::memcpy(Ptrs_.data(), ptrs, count * sizeof(void*));
+ Size_ = count;
+ }
+
+private:
+ size_t Size_ = 0; // <= ChunksPerGroup
+ std::array<void*, ChunksPerGroup> Ptrs_;
+};
+
+class TGlobalSmallChunkCache
+{
+public:
+ explicit TGlobalSmallChunkCache(EAllocationKind kind)
+ : Kind_(kind)
+ { }
+
+#ifdef YTALLOC_PARANOID
+ void CanonizeChunkPtrs(TThreadState* state, size_t rank)
+ {
+ auto& chunkPtrPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrHead[rank];
+
+ auto leftBorder = state->SmallBlobCache[Kind_].RankToCachedChunkLeftBorder[rank];
+ auto rightBorder = state->SmallBlobCache[Kind_].RankToCachedChunkRightBorder[rank];
+
+ state->SmallBlobCache[Kind_].CachedChunkFull[rank] = false;
+ if (chunkPtrPtr + 1 == rightBorder) {
+ chunkPtrPtr = leftBorder;
+ state->SmallBlobCache[Kind_].CachedChunkFull[rank] = true;
+ }
+
+ state->SmallBlobCache[Kind_].RankToCachedChunkPtrTail[rank] = leftBorder;
+ }
+#endif
+
+ bool TryMoveGroupToLocal(TThreadState* state, size_t rank)
+ {
+ auto& groups = RankToChunkGroups_[rank];
+ auto* group = groups.Extract(state);
+ if (!Y_LIKELY(group)) {
+ return false;
+ }
+
+ YTALLOC_PARANOID_ASSERT(!group->IsEmpty());
+
+ auto& chunkPtrPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrHead[rank];
+#ifdef YTALLOC_PARANOID
+ chunkPtrPtr = state->SmallBlobCache[Kind_].RankToCachedChunkLeftBorder[rank];
+ state->SmallBlobCache[Kind_].RankToCachedChunkPtrTail[rank] = chunkPtrPtr;
+#endif
+ auto chunkCount = group->ExtractAll(chunkPtrPtr + 1);
+ chunkPtrPtr += chunkCount;
+
+#ifdef YTALLOC_PARANOID
+ CanonizeChunkPtrs(state, rank);
+#endif
+ GroupPool_.Free(state, group);
+ return true;
+ }
+
+ void MoveGroupToGlobal(TThreadState* state, size_t rank)
+ {
+ auto* group = GroupPool_.Allocate(state);
+
+ auto& chunkPtrPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrHead[rank];
+ YTALLOC_PARANOID_ASSERT(*(chunkPtrPtr + 1) == reinterpret_cast<void*>(TThreadState::RightSentinel));
+ group->PutMany(chunkPtrPtr - ChunksPerGroup + 1, ChunksPerGroup);
+ chunkPtrPtr -= ChunksPerGroup;
+#ifdef YTALLOC_PARANOID
+ ::memset(chunkPtrPtr + 1, 0, sizeof(void*) * ChunksPerGroup);
+ CanonizeChunkPtrs(state, rank);
+#endif
+
+ auto& groups = RankToChunkGroups_[rank];
+ YTALLOC_PARANOID_ASSERT(!group->IsEmpty());
+ groups.Put(state, group);
+ }
+
+ void MoveOneToGlobal(void* ptr, size_t rank)
+ {
+ auto* group = GroupPool_.Allocate(&GlobalShardedState_);
+ group->PutOne(ptr);
+
+ auto& groups = RankToChunkGroups_[rank];
+ YTALLOC_PARANOID_ASSERT(!group->IsEmpty());
+ groups.Put(&GlobalShardedState_, group);
+ }
+
+#ifdef YTALLOC_PARANOID
+ void MoveAllToGlobal(TThreadState* state, size_t rank)
+ {
+ auto leftSentinelBorder = state->SmallBlobCache[Kind_].RankToCachedChunkLeftBorder[rank];
+ auto rightSentinelBorder = state->SmallBlobCache[Kind_].RankToCachedChunkRightBorder[rank];
+
+ auto& headPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrHead[rank];
+ auto& tailPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrTail[rank];
+
+ if (tailPtr == headPtr && !state->SmallBlobCache[Kind_].CachedChunkFull[rank]) {
+ headPtr = leftSentinelBorder;
+ return;
+ }
+
+ // (leftBorder, rightBorder]
+ auto moveIntervalToGlobal = [=] (void** leftBorder, void** rightBorder) {
+ while (true) {
+ size_t count = 0;
+ while (count < ChunksPerGroup && rightBorder != leftBorder) {
+ --rightBorder;
+ ++count;
+ }
+
+ if (count == 0) {
+ break;
+ }
+
+ auto* group = GroupPool_.Allocate(state);
+ group->PutMany(rightBorder + 1, count);
+ ::memset(rightBorder + 1, 0, sizeof(void*) * count);
+ auto& groups = RankToChunkGroups_[rank];
+ groups.Put(state, group);
+ }
+ };
+
+ if (tailPtr >= headPtr) {
+ moveIntervalToGlobal(tailPtr, rightSentinelBorder - 1);
+ moveIntervalToGlobal(leftSentinelBorder, headPtr);
+ } else {
+ moveIntervalToGlobal(tailPtr, headPtr);
+ }
+
+ headPtr = leftSentinelBorder;
+ }
+#else
+ void MoveAllToGlobal(TThreadState* state, size_t rank)
+ {
+ auto& chunkPtrPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrHead[rank];
+ while (true) {
+ size_t count = 0;
+ while (count < ChunksPerGroup && *chunkPtrPtr != reinterpret_cast<void*>(TThreadState::LeftSentinel)) {
+ --chunkPtrPtr;
+ ++count;
+ }
+
+ if (count == 0) {
+ break;
+ }
+
+ auto* group = GroupPool_.Allocate(state);
+ group->PutMany(chunkPtrPtr + 1, count);
+ auto& groups = RankToChunkGroups_[rank];
+ groups.Put(state, group);
+ }
+ }
+#endif
+
+private:
+ const EAllocationKind Kind_;
+
+ TGlobalShardedState GlobalShardedState_;
+ TShardedSystemPool<TChunkGroup, GroupsBatchSize> GroupPool_;
+ std::array<TShardedFreeList<TChunkGroup>, SmallRankCount> RankToChunkGroups_;
+};
+
+TExplicitlyConstructableSingleton<TEnumIndexedVector<EAllocationKind, TExplicitlyConstructableSingleton<TGlobalSmallChunkCache>>> GlobalSmallChunkCaches;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TSmallAllocator
+{
+public:
+ template <EAllocationKind Kind>
+ static Y_FORCE_INLINE void* Allocate(TMemoryTag tag, size_t rank)
+ {
+ auto* state = TThreadManager::FindThreadState();
+ if (Y_LIKELY(state)) {
+ return Allocate<Kind>(tag, rank, state);
+ }
+ auto size = SmallRankToSize[rank];
+ return AllocateGlobal<Kind>(tag, rank, size);
+ }
+
+#ifdef YTALLOC_PARANOID
+ template <EAllocationKind Kind>
+ static Y_FORCE_INLINE void* Allocate(TMemoryTag tag, size_t rank, TThreadState* state)
+ {
+ auto& localCache = state->SmallBlobCache[Kind];
+ auto& allocator = *(*SmallArenaAllocators)[Kind][rank];
+
+ size_t size = SmallRankToSize[rank];
+ StatisticsManager->IncrementTotalCounter<Kind>(state, tag, EBasicCounter::BytesAllocated, size);
+
+ auto leftBorder = localCache.RankToCachedChunkLeftBorder[rank];
+ auto rightBorder = localCache.RankToCachedChunkRightBorder[rank];
+
+ void* result;
+ while (true) {
+ auto& chunkHeadPtr = localCache.RankToCachedChunkPtrHead[rank];
+ auto& cachedHeadPtr = *(chunkHeadPtr + 1);
+ auto* headPtr = cachedHeadPtr;
+
+ auto& chunkTailPtr = localCache.RankToCachedChunkPtrTail[rank];
+ auto& cachedTailPtr = *(chunkTailPtr + 1);
+ auto* tailPtr = cachedTailPtr;
+
+ auto& chunkFull = localCache.CachedChunkFull[rank];
+
+ if (Y_LIKELY(chunkFull || headPtr != tailPtr)) {
+ YTALLOC_PARANOID_ASSERT(tailPtr);
+ cachedTailPtr = nullptr;
+ ++chunkTailPtr;
+ if (Y_LIKELY(chunkTailPtr + 1 == rightBorder)) {
+ chunkTailPtr = leftBorder;
+ }
+
+ chunkFull = false;
+ result = tailPtr;
+ PoisonUninitializedRange(result, size);
+ allocator.UpdateChunkState(result, ESmallChunkState::Freed, ESmallChunkState::Allocated);
+ break;
+ }
+
+ auto& globalCache = *(*GlobalSmallChunkCaches)[Kind];
+ if (!globalCache.TryMoveGroupToLocal(state, rank)) {
+ result = allocator.Allocate(size);
+ break;
+ }
+ }
+
+ if constexpr(Kind == EAllocationKind::Tagged) {
+ allocator.SetMemoryTag(result, tag);
+ }
+
+ return result;
+ }
+
+ template <EAllocationKind Kind>
+ static Y_FORCE_INLINE void Free(void* ptr)
+ {
+ auto rank = PtrToSmallRank(ptr);
+ auto size = SmallRankToSize[rank];
+
+ auto& allocator = *(*SmallArenaAllocators)[Kind][rank];
+
+ auto tag = NullMemoryTag;
+ if constexpr(Kind == EAllocationKind::Tagged) {
+ tag = allocator.GetAndResetMemoryTag(ptr);
+ YTALLOC_PARANOID_ASSERT(tag != NullMemoryTag);
+ }
+
+ allocator.UpdateChunkState(ptr, ESmallChunkState::Allocated, ESmallChunkState::Freed);
+ PoisonFreedRange(ptr, size);
+
+ auto* state = TThreadManager::FindThreadState();
+ if (Y_UNLIKELY(!state)) {
+ FreeGlobal<Kind>(tag, ptr, rank, size);
+ return;
+ }
+
+ StatisticsManager->IncrementTotalCounter<Kind>(state, tag, EBasicCounter::BytesFreed, size);
+
+ auto& localCache = state->SmallBlobCache[Kind];
+
+ auto leftBorder = localCache.RankToCachedChunkLeftBorder[rank];
+ auto rightBorder = localCache.RankToCachedChunkRightBorder[rank];
+
+ while (true) {
+ auto& chunkHeadPtr = localCache.RankToCachedChunkPtrHead[rank];
+ auto& headPtr = *(chunkHeadPtr + 1);
+
+ auto& chunkTailPtr = localCache.RankToCachedChunkPtrTail[rank];
+ auto& chunkFull = localCache.CachedChunkFull[rank];
+
+ if (Y_LIKELY(!chunkFull)) {
+ headPtr = ptr;
+ ++chunkHeadPtr;
+ if (Y_LIKELY(chunkHeadPtr + 1 == rightBorder)) {
+ chunkHeadPtr = leftBorder;
+ }
+ chunkFull = (chunkHeadPtr == chunkTailPtr);
+ break;
+ }
+
+ chunkHeadPtr = rightBorder - 1;
+ chunkTailPtr = leftBorder;
+
+ auto& globalCache = *(*GlobalSmallChunkCaches)[Kind];
+ globalCache.MoveGroupToGlobal(state, rank);
+ }
+ }
+
+#else
+
+ template <EAllocationKind Kind>
+ static Y_FORCE_INLINE void* Allocate(TMemoryTag tag, size_t rank, TThreadState* state)
+ {
+ size_t size = SmallRankToSize[rank];
+ StatisticsManager->IncrementTotalCounter<Kind>(state, tag, EBasicCounter::BytesAllocated, size);
+
+ auto& localCache = state->SmallBlobCache[Kind];
+ auto& allocator = *(*SmallArenaAllocators)[Kind][rank];
+
+ void* result;
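+        // The per-thread cache for this rank is a bounded stack of chunk pointers
+        // delimited by sentinels: pop from it when non-empty, otherwise refill it
+        // from the global cache or pull a fresh batch straight from the arena.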
+ while (true) {
+ auto& chunkPtr = localCache.RankToCachedChunkPtrHead[rank];
+ auto& cachedPtr = *chunkPtr;
+ auto* ptr = cachedPtr;
+ if (Y_LIKELY(ptr != reinterpret_cast<void*>(TThreadState::LeftSentinel))) {
+ --chunkPtr;
+ result = ptr;
+ allocator.UpdateChunkState(result, ESmallChunkState::Freed, ESmallChunkState::Allocated);
+ PoisonUninitializedRange(result, size);
+ break;
+ }
+
+ auto& globalCache = *(*GlobalSmallChunkCaches)[Kind];
+ if (globalCache.TryMoveGroupToLocal(state, rank)) {
+ continue;
+ }
+
+ auto count = allocator.PullMany(
+ chunkPtr + 1,
+ SmallRankBatchSize[rank]);
+ chunkPtr += count;
+ }
+
+ if constexpr(Kind == EAllocationKind::Tagged) {
+ allocator.SetMemoryTag(result, tag);
+ }
+
+ return result;
+ }
+
+ template <EAllocationKind Kind>
+ static Y_FORCE_INLINE void Free(void* ptr)
+ {
+ auto rank = PtrToSmallRank(ptr);
+ auto size = SmallRankToSize[rank];
+
+ auto& allocator = *(*SmallArenaAllocators)[Kind][rank];
+
+ auto tag = NullMemoryTag;
+ if constexpr(Kind == EAllocationKind::Tagged) {
+ tag = allocator.GetAndResetMemoryTag(ptr);
+ YTALLOC_PARANOID_ASSERT(tag != NullMemoryTag);
+ }
+
+ allocator.UpdateChunkState(ptr, ESmallChunkState::Allocated, ESmallChunkState::Freed);
+ PoisonFreedRange(ptr, size);
+
+ auto* state = TThreadManager::FindThreadState();
+ if (Y_UNLIKELY(!state)) {
+ FreeGlobal<Kind>(tag, ptr, rank, size);
+ return;
+ }
+
+ StatisticsManager->IncrementTotalCounter<Kind>(state, tag, EBasicCounter::BytesFreed, size);
+
+ auto& localCache = state->SmallBlobCache[Kind];
+
+ while (true) {
+ auto& chunkPtrPtr = localCache.RankToCachedChunkPtrHead[rank];
+ auto& chunkPtr = *(chunkPtrPtr + 1);
+ if (Y_LIKELY(chunkPtr != reinterpret_cast<void*>(TThreadState::RightSentinel))) {
+ chunkPtr = ptr;
+ ++chunkPtrPtr;
+ break;
+ }
+
+ auto& globalCache = *(*GlobalSmallChunkCaches)[Kind];
+ globalCache.MoveGroupToGlobal(state, rank);
+ }
+ }
+#endif
+
+ static size_t GetAllocationSize(const void* ptr)
+ {
+ return SmallRankToSize[PtrToSmallRank(ptr)];
+ }
+
+ static size_t GetAllocationSize(size_t size)
+ {
+ return SmallRankToSize[SizeToSmallRank(size)];
+ }
+
+ static void PurgeCaches()
+ {
+ DoPurgeCaches<EAllocationKind::Untagged>();
+ DoPurgeCaches<EAllocationKind::Tagged>();
+ }
+
+private:
+ template <EAllocationKind Kind>
+ static void DoPurgeCaches()
+ {
+ auto* state = TThreadManager::GetThreadStateChecked();
+ for (size_t rank = 0; rank < SmallRankCount; ++rank) {
+ (*GlobalSmallChunkCaches)[Kind]->MoveAllToGlobal(state, rank);
+ }
+ }
+
+ template <EAllocationKind Kind>
+ static void* AllocateGlobal(TMemoryTag tag, size_t rank, size_t size)
+ {
+ StatisticsManager->IncrementTotalCounter(tag, EBasicCounter::BytesAllocated, size);
+
+ auto& allocator = *(*SmallArenaAllocators)[Kind][rank];
+ auto* result = allocator.Allocate(size);
+
+ if constexpr(Kind == EAllocationKind::Tagged) {
+ allocator.SetMemoryTag(result, tag);
+ }
+
+ return result;
+ }
+
+ template <EAllocationKind Kind>
+ static void FreeGlobal(TMemoryTag tag, void* ptr, size_t rank, size_t size)
+ {
+ StatisticsManager->IncrementTotalCounter(tag, EBasicCounter::BytesFreed, size);
+
+ auto& globalCache = *(*GlobalSmallChunkCaches)[Kind];
+ globalCache.MoveOneToGlobal(ptr, rank);
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+// Large blob allocator
+//
+// Like for small chunks, large blobs are grouped into arenas, where arena K handles
+// blobs of size (2^{K-1},2^K]. Memory is mapped in extents of LargeExtentSize bytes.
+// Each extent is split into segments of size 2^K (here segment is just a memory region, which may fully consist of
+// unmapped pages). When a segment is actually allocated, it becomes a blob and a TLargeBlobHeader
+// structure is placed at its start.
+//
+// When an extent is allocated, it is sliced into segments (not blobs, since no headers are placed and
+// no memory is touched). These segments are put into disposed segments list.
+//
+// For each blob two separate sizes are maintained: BytesAcquired indicates the number of bytes
+// acquired via madvise(MADV_POPULATE) from the system; BytesAllocated (<= BytesAcquired) corresponds
+// to the number of bytes claimed by the user (including the header and page size alignment).
+// If BytesAllocated == 0 then this blob is spare, i.e.
+// was freed and remains cached for further possible reuse.
+//
+// When a new blob is being allocated, the allocator first tries to extract a spare blob. On success,
+// its acquired size is extended (if needed); the acquired size never shrinks on allocation.
+// If no spare blobs exist, a disposed segment is extracted and is turned into a blob (i.e.
+// its header is initialized) and the needed number of bytes is acquired. If no disposed segments
+// exist, then a new extent is allocated and sliced into segments.
+//
+// The above algorithm only claims memory from the system (by means of madvise(MADV_POPULATE));
+// the reclaim is handled by a separate background mechanism. Two types of reclaimable memory
+// regions are possible:
+// * spare: these correspond to spare blobs; upon reclaiming this region becomes a disposed segment
+// * overhead: these correspond to trailing parts of allocated blobs in [BytesAllocated, BytesAcquired) byte range
+//
+// Reclaiming spare blobs is easy as these are explicitly tracked by spare blob lists. To reclaim,
+// we atomically extract a blob from a spare list, call madvise(MADV_FREE), and put the pointer to
+// the disposed segment list.
+//
+// Reclaiming overheads is more complicated since (a) allocated blobs are never tracked directly and
+// (b) reclaiming them may interfere with Allocate and Free.
+//
+// To overcome (a), for each extent we maintain a bitmap marking segments that are actually blobs
+// (i.e. contain a header). (For simplicity and efficiency this bitmap is just a vector of bytes.)
+// These flags are updated in Allocate/Free with appropriate memory ordering. Note that
+// blobs are only disposed (and are turned into segments) by the background thread; if this
+// thread discovers a segment that is marked as a blob, then it is safe to assume that this segment
+// remains a blob unless the thread disposes it.
+//
+// To overcome (b), each large blob header maintains a spin lock. When blob B is extracted
+// from a spare list in Allocate, an acquisition is tried. If successful, B is returned to the
+// user. Otherwise it is assumed that B is currently being examined by the background
+// reclaimer thread. Allocate then skips this blob and retries extraction; the problem is that
+// since the spare list is basically a stack one cannot just push B back into the spare list.
+// Instead, B is pushed into a special locked spare list. This list is purged by the background
+// thread on each tick and its items are pushed back into the usual spare list.
+//
+// A similar trick is used by Free: when invoked for blob B its spin lock acquisition is first
+// tried. Upon success, B is moved to the spare list. On failure, Free has to postpone this deallocation
+// by moving B into the freed locked list. This list, similarly, is being purged by the background thread.
+//
+// It remains to explain how the background thread computes the number of bytes to be reclaimed from
+// each arena. To this end, we first compute the total number of reclaimable bytes.
+// This is the sum of spare and overhead bytes in all arenas minus the number of unreclaimable bytes.
+// The latter grows linearly in the number of used bytes and is clamped from below by MinLargeUnreclaimableBytes
+// and from above by MaxLargeUnreclaimableBytes. SetLargeUnreclaimableCoeff and Set(Min|Max)LargeUnreclaimableBytes
+// enable tuning these control knobs. Reclaimable bytes are taken from arenas starting from those
+// with the largest spare and overhead volumes.
+//
+// The above implies that each large blob contains a fixed-size header preceding it.
+// Hence ptr % PageSize == sizeof (TLargeBlobHeader) for each ptr returned by Allocate
+// (since large blob sizes are larger than PageSize and are divisible by PageSize).
+// For AllocatePageAligned, however, ptr must be divisible by PageSize. To handle such an allocation, we
+// artificially increase its size and align the result of Allocate up to the next page boundary.
+// When handling a deallocation, ptr is moved back by UnalignPtr (which is capable of dealing
+// with both the results of Allocate and AllocatePageAligned).
+// This technique applies to both large and huge blobs.
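+//
+// An illustrative form of the reclaim threshold computation (a sketch mirroring
+// GetBytesToReclaim below):
+//
+//   threshold = ClampVal(coeff * bytesUsed, MinLargeUnreclaimableBytes, MaxLargeUnreclaimableBytes);
+//   bytesToReclaim = AlignUp(max(0, bytesSpare + bytesOverhead - threshold), PageSize);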
+
+enum ELargeBlobState : ui64
+{
+ Allocated = 0x6c6c61656772616cULL, // largeall
+ Spare = 0x727073656772616cULL, // largespr
+ LockedSpare = 0x70736c656772616cULL, // largelsp
+ LockedFreed = 0x72666c656772616cULL // largelfr
+};
+
+// Every large blob (either tagged or not) is prepended with this header.
+struct TLargeBlobHeader
+ : public TFreeListItem<TLargeBlobHeader>
+{
+ TLargeBlobHeader(
+ TLargeBlobExtent* extent,
+ size_t bytesAcquired,
+ size_t bytesAllocated,
+ TMemoryTag tag)
+ : Extent(extent)
+ , BytesAcquired(bytesAcquired)
+ , Tag(tag)
+ , BytesAllocated(bytesAllocated)
+ , State(ELargeBlobState::Allocated)
+ { }
+
+ TLargeBlobExtent* Extent;
+ // Number of bytes in all acquired pages.
+ size_t BytesAcquired;
+ std::atomic<bool> Locked = false;
+ TMemoryTag Tag = NullMemoryTag;
+ // For spare blobs this is zero.
+    // For allocated blobs this is the number of bytes requested by the user (not including the header or any alignment).
+ size_t BytesAllocated;
+ ELargeBlobState State;
+ char Padding[12];
+};
+
+CHECK_HEADER_ALIGNMENT(TLargeBlobHeader)
+
+struct TLargeBlobExtent
+{
+ TLargeBlobExtent(size_t segmentCount, char* ptr)
+ : SegmentCount(segmentCount)
+ , Ptr(ptr)
+ { }
+
+ size_t SegmentCount;
+ char* Ptr;
+ TLargeBlobExtent* NextExtent = nullptr;
+
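+    // One flag per segment, stored as a trailing variable-size array.
+    // A set flag means the segment is currently disposed, i.e. holds no blob header.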
+ std::atomic<bool> DisposedFlags[0];
+};
+
+// A helper node that enables storing a number of an extent's segments
+// in a free list. Recall that segments themselves do not possess any headers.
+struct TDisposedSegment
+ : public TFreeListItem<TDisposedSegment>
+{
+ size_t Index;
+ TLargeBlobExtent* Extent;
+};
+
+struct TLargeArena
+{
+ size_t Rank = 0;
+ size_t SegmentSize = 0;
+
+ TShardedFreeList<TLargeBlobHeader> SpareBlobs;
+ TFreeList<TLargeBlobHeader> LockedSpareBlobs;
+ TFreeList<TLargeBlobHeader> LockedFreedBlobs;
+ TFreeList<TDisposedSegment> DisposedSegments;
+ std::atomic<TLargeBlobExtent*> FirstExtent = nullptr;
+
+ TLargeBlobExtent* CurrentOverheadScanExtent = nullptr;
+ size_t CurrentOverheadScanSegment = 0;
+};
+
+template <bool Dumpable>
+class TLargeBlobAllocator
+{
+public:
+ TLargeBlobAllocator()
+ : ZoneAllocator_(LargeZoneStart(Dumpable), LargeZoneEnd(Dumpable))
+ {
+ for (size_t rank = 0; rank < Arenas_.size(); ++rank) {
+ auto& arena = Arenas_[rank];
+ arena.Rank = rank;
+ arena.SegmentSize = (1ULL << rank);
+ }
+ }
+
+ void* Allocate(size_t size)
+ {
+ auto* state = TThreadManager::FindThreadState();
+ return Y_LIKELY(state)
+ ? DoAllocate(state, size)
+ : DoAllocate(GlobalState.Get(), size);
+ }
+
+ void Free(void* ptr)
+ {
+ auto* state = TThreadManager::FindThreadState();
+ if (Y_LIKELY(state)) {
+ DoFree(state, ptr);
+ } else {
+ DoFree(GlobalState.Get(), ptr);
+ }
+ }
+
+ static size_t GetAllocationSize(const void* ptr)
+ {
+ UnalignPtr<TLargeBlobHeader>(ptr);
+ const auto* blob = PtrToHeader<TLargeBlobHeader>(ptr);
+ return blob->BytesAllocated;
+ }
+
+ static size_t GetAllocationSize(size_t size)
+ {
+ return GetBlobAllocationSize<TLargeBlobHeader>(size);
+ }
+
+ void RunBackgroundTasks()
+ {
+ ReinstallLockedBlobs();
+ ReclaimMemory();
+ }
+
+ void SetBacktraceProvider(TBacktraceProvider provider)
+ {
+ BacktraceProvider_.store(provider);
+ }
+
+private:
+ template <class TState>
+ void PopulateArenaPages(TState* state, TLargeArena* arena, void* ptr, size_t size)
+ {
+ MappedMemoryManager->Populate(ptr, size);
+ StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::BytesPopulated, size);
+ StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::PagesPopulated, size / PageSize);
+ StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::BytesCommitted, size);
+ StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::PagesCommitted, size / PageSize);
+ }
+
+ template <class TState>
+ void ReleaseArenaPages(TState* state, TLargeArena* arena, void* ptr, size_t size)
+ {
+ MappedMemoryManager->Release(ptr, size);
+ StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::BytesReleased, size);
+ StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::PagesReleased, size / PageSize);
+ StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::BytesCommitted, -size);
+ StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::PagesCommitted, -size / PageSize);
+ }
+
+ bool TryLockBlob(TLargeBlobHeader* blob)
+ {
+ bool expected = false;
+ return blob->Locked.compare_exchange_strong(expected, true);
+ }
+
+ void UnlockBlob(TLargeBlobHeader* blob)
+ {
+ blob->Locked.store(false);
+ }
+
+ template <class TState>
+ void MoveBlobToSpare(TState* state, TLargeArena* arena, TLargeBlobHeader* blob, bool unlock)
+ {
+ auto rank = arena->Rank;
+ auto size = blob->BytesAllocated;
+ auto rawSize = GetRawBlobSize<TLargeBlobHeader>(size);
+ StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesSpare, blob->BytesAcquired);
+ StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesOverhead, -(blob->BytesAcquired - rawSize));
+ blob->BytesAllocated = 0;
+ if (unlock) {
+ UnlockBlob(blob);
+ } else {
+ YTALLOC_VERIFY(!blob->Locked.load());
+ }
+ blob->State = ELargeBlobState::Spare;
+ arena->SpareBlobs.Put(state, blob);
+ }
+
+ size_t GetBytesToReclaim(const std::array<TLocalLargeCounters, LargeRankCount>& arenaCounters)
+ {
+ size_t totalBytesAllocated = 0;
+ size_t totalBytesFreed = 0;
+ size_t totalBytesSpare = 0;
+ size_t totalBytesOverhead = 0;
+ for (size_t rank = 0; rank < Arenas_.size(); ++rank) {
+ const auto& counters = arenaCounters[rank];
+ totalBytesAllocated += counters[ELargeArenaCounter::BytesAllocated];
+ totalBytesFreed += counters[ELargeArenaCounter::BytesFreed];
+ totalBytesSpare += counters[ELargeArenaCounter::BytesSpare];
+ totalBytesOverhead += counters[ELargeArenaCounter::BytesOverhead];
+ }
+
+ auto totalBytesUsed = totalBytesAllocated - totalBytesFreed;
+ auto totalBytesReclaimable = totalBytesSpare + totalBytesOverhead;
+
+ auto threshold = ClampVal(
+ static_cast<size_t>(ConfigurationManager->GetLargeUnreclaimableCoeff() * totalBytesUsed),
+ ConfigurationManager->GetMinLargeUnreclaimableBytes(),
+ ConfigurationManager->GetMaxLargeUnreclaimableBytes());
+ if (totalBytesReclaimable < threshold) {
+ return 0;
+ }
+
+ auto bytesToReclaim = totalBytesReclaimable - threshold;
+ return AlignUp(bytesToReclaim, PageSize);
+ }
+
+ void ReinstallLockedSpareBlobs(TLargeArena* arena)
+ {
+ auto* blob = arena->LockedSpareBlobs.ExtractAll();
+ auto* state = TThreadManager::GetThreadStateChecked();
+
+ size_t count = 0;
+ while (blob) {
+ auto* nextBlob = blob->Next;
+ YTALLOC_VERIFY(!blob->Locked.load());
+ AssertBlobState(blob, ELargeBlobState::LockedSpare);
+ blob->State = ELargeBlobState::Spare;
+ arena->SpareBlobs.Put(state, blob);
+ blob = nextBlob;
+ ++count;
+ }
+
+ if (count > 0) {
+ YTALLOC_LOG_DEBUG("Locked spare blobs reinstalled (Rank: %d, Blobs: %zu)",
+ arena->Rank,
+ count);
+ }
+ }
+
+ void ReinstallLockedFreedBlobs(TLargeArena* arena)
+ {
+ auto* state = TThreadManager::GetThreadStateChecked();
+ auto* blob = arena->LockedFreedBlobs.ExtractAll();
+
+ size_t count = 0;
+ while (blob) {
+ auto* nextBlob = blob->Next;
+ AssertBlobState(blob, ELargeBlobState::LockedFreed);
+ MoveBlobToSpare(state, arena, blob, false);
+ ++count;
+ blob = nextBlob;
+ }
+
+ if (count > 0) {
+ YTALLOC_LOG_DEBUG("Locked freed blobs reinstalled (Rank: %d, Blobs: %zu)",
+ arena->Rank,
+ count);
+ }
+ }
+
+ void ReclaimSpareMemory(TLargeArena* arena, ssize_t bytesToReclaim)
+ {
+ if (bytesToReclaim <= 0) {
+ return;
+ }
+
+ auto rank = arena->Rank;
+ auto* state = TThreadManager::GetThreadStateChecked();
+
+ YTALLOC_LOG_DEBUG("Started processing spare memory in arena (BytesToReclaim: %zdM, Rank: %d)",
+ bytesToReclaim / 1_MB,
+ rank);
+
+ size_t bytesReclaimed = 0;
+ size_t blobsReclaimed = 0;
+ while (bytesToReclaim > 0) {
+ auto* blob = arena->SpareBlobs.ExtractRoundRobin(state);
+ if (!blob) {
+ break;
+ }
+
+ AssertBlobState(blob, ELargeBlobState::Spare);
+ YTALLOC_VERIFY(blob->BytesAllocated == 0);
+
+ auto bytesAcquired = blob->BytesAcquired;
+ StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesSpare, -bytesAcquired);
+ bytesToReclaim -= bytesAcquired;
+ bytesReclaimed += bytesAcquired;
+ blobsReclaimed += 1;
+
+ auto* extent = blob->Extent;
+ auto* ptr = reinterpret_cast<char*>(blob);
+ ReleaseArenaPages(
+ state,
+ arena,
+ ptr,
+ bytesAcquired);
+
+ size_t segmentIndex = (ptr - extent->Ptr) / arena->SegmentSize;
+ extent->DisposedFlags[segmentIndex].store(true, std::memory_order_relaxed);
+
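+ // Hand the whole segment back to the arena's disposed-segment pool so it can back future large blobs.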
+ auto* disposedSegment = DisposedSegmentPool_.Allocate();
+ disposedSegment->Index = segmentIndex;
+ disposedSegment->Extent = extent;
+ arena->DisposedSegments.Put(disposedSegment);
+ }
+
+ StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::SpareBytesReclaimed, bytesReclaimed);
+
+ YTALLOC_LOG_DEBUG("Finished processing spare memory in arena (Rank: %d, BytesReclaimed: %zdM, BlobsReclaimed: %zu)",
+ arena->Rank,
+ bytesReclaimed / 1_MB,
+ blobsReclaimed);
+ }
+
+ void ReclaimOverheadMemory(TLargeArena* arena, ssize_t bytesToReclaim)
+ {
+ if (bytesToReclaim == 0) {
+ return;
+ }
+
+ auto* state = TThreadManager::GetThreadStateChecked();
+ auto rank = arena->Rank;
+
+ YTALLOC_LOG_DEBUG("Started processing overhead memory in arena (BytesToReclaim: %zdM, Rank: %d)",
+ bytesToReclaim / 1_MB,
+ rank);
+
+ size_t extentsTraversed = 0;
+ size_t segmentsTraversed = 0;
+ size_t bytesReclaimed = 0;
+
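+ // The scan resumes from the per-arena cursor saved on the previous pass and restarts
+ // from the first extent at most once per call.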
+ bool restartedFromFirstExtent = false;
+ auto& currentExtent = arena->CurrentOverheadScanExtent;
+ auto& currentSegment = arena->CurrentOverheadScanSegment;
+ while (bytesToReclaim > 0) {
+ if (!currentExtent) {
+ if (restartedFromFirstExtent) {
+ break;
+ }
+ currentExtent = arena->FirstExtent.load();
+ if (!currentExtent) {
+ break;
+ }
+ restartedFromFirstExtent = true;
+ }
+
+ while (currentSegment < currentExtent->SegmentCount && bytesToReclaim > 0) {
+ ++segmentsTraversed;
+ if (!currentExtent->DisposedFlags[currentSegment].load(std::memory_order_acquire)) {
+ auto* ptr = currentExtent->Ptr + currentSegment * arena->SegmentSize;
+ auto* blob = reinterpret_cast<TLargeBlobHeader*>(ptr);
+ YTALLOC_PARANOID_ASSERT(blob->Extent == currentExtent);
+ if (TryLockBlob(blob)) {
+ if (blob->BytesAllocated > 0) {
+ size_t rawSize = GetRawBlobSize<TLargeBlobHeader>(blob->BytesAllocated);
+ size_t bytesToRelease = blob->BytesAcquired - rawSize;
+ if (bytesToRelease > 0) {
+ ReleaseArenaPages(
+ state,
+ arena,
+ ptr + blob->BytesAcquired - bytesToRelease,
+ bytesToRelease);
+ StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesOverhead, -bytesToRelease);
+ blob->BytesAcquired = rawSize;
+ bytesToReclaim -= bytesToRelease;
+ bytesReclaimed += bytesToRelease;
+ }
+ }
+ UnlockBlob(blob);
+ }
+ }
+ ++currentSegment;
+ }
+
+ ++extentsTraversed;
+ currentSegment = 0;
+ currentExtent = currentExtent->NextExtent;
+ }
+
+ StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::OverheadBytesReclaimed, bytesReclaimed);
+
+ YTALLOC_LOG_DEBUG("Finished processing overhead memory in arena (Rank: %d, Extents: %zu, Segments: %zu, BytesReclaimed: %zuM)",
+ arena->Rank,
+ extentsTraversed,
+ segmentsTraversed,
+ bytesReclaimed / 1_MB);
+ }
+
+ void ReinstallLockedBlobs()
+ {
+ for (auto& arena : Arenas_) {
+ ReinstallLockedSpareBlobs(&arena);
+ ReinstallLockedFreedBlobs(&arena);
+ }
+ }
+
+ void ReclaimMemory()
+ {
+ auto arenaCounters = StatisticsManager->GetLargeArenaAllocationCounters();
+ ssize_t bytesToReclaim = GetBytesToReclaim(arenaCounters);
+ if (bytesToReclaim == 0) {
+ return;
+ }
+
+ YTALLOC_LOG_DEBUG("Memory reclaim started (BytesToReclaim: %zdM)",
+ bytesToReclaim / 1_MB);
+
+ std::array<ssize_t, LargeRankCount * 2> bytesReclaimablePerArena;
+ for (size_t rank = 0; rank < LargeRankCount; ++rank) {
+ bytesReclaimablePerArena[rank * 2] = arenaCounters[rank][ELargeArenaCounter::BytesOverhead];
+ bytesReclaimablePerArena[rank * 2 + 1] = arenaCounters[rank][ELargeArenaCounter::BytesSpare];
+ }
+
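+ // Greedily distribute the reclaim budget: repeatedly pick the (rank, overhead/spare) bucket
+ // with the most reclaimable bytes and take at most 4 MB from it per step.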
+ std::array<ssize_t, LargeRankCount * 2> bytesToReclaimPerArena{};
+ while (bytesToReclaim > 0) {
+ ssize_t maxBytes = std::numeric_limits<ssize_t>::min();
+ int maxIndex = -1;
+ for (int index = 0; index < LargeRankCount * 2; ++index) {
+ if (bytesReclaimablePerArena[index] > maxBytes) {
+ maxBytes = bytesReclaimablePerArena[index];
+ maxIndex = index;
+ }
+ }
+
+ if (maxIndex < 0) {
+ break;
+ }
+
+ auto bytesToReclaimPerStep = std::min<ssize_t>({bytesToReclaim, maxBytes, 4_MB});
+ if (bytesToReclaimPerStep <= 0) {
+ break;
+ }
+
+ bytesToReclaimPerArena[maxIndex] += bytesToReclaimPerStep;
+ bytesReclaimablePerArena[maxIndex] -= bytesToReclaimPerStep;
+ bytesToReclaim -= bytesToReclaimPerStep;
+ }
+
+ for (auto& arena : Arenas_) {
+ auto rank = arena.Rank;
+ ReclaimOverheadMemory(&arena, bytesToReclaimPerArena[rank * 2]);
+ ReclaimSpareMemory(&arena, bytesToReclaimPerArena[rank * 2 + 1]);
+ }
+
+ YTALLOC_LOG_DEBUG("Memory reclaim finished");
+ }
+
+ template <class TState>
+ void AllocateArenaExtent(TState* state, TLargeArena* arena)
+ {
+ auto rank = arena->Rank;
+ StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::ExtentsAllocated, 1);
+
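+ // An extent consists of a page-aligned header (carrying the per-segment disposed flags)
+ // followed by LargeExtentSize bytes of segment space.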
+ size_t segmentCount = LargeExtentSize / arena->SegmentSize;
+ size_t extentHeaderSize = AlignUp(sizeof (TLargeBlobExtent) + sizeof (TLargeBlobExtent::DisposedFlags[0]) * segmentCount, PageSize);
+ size_t allocationSize = extentHeaderSize + LargeExtentSize;
+
+ auto* ptr = ZoneAllocator_.Allocate(allocationSize, MAP_NORESERVE);
+ if (!Dumpable) {
+ MappedMemoryManager->DontDump(ptr, allocationSize);
+ }
+
+ if (auto backtraceProvider = BacktraceProvider_.load()) {
+ std::array<void*, MaxAllocationProfilingBacktraceDepth> frames;
+ auto frameCount = backtraceProvider(
+ frames.data(),
+ MaxAllocationProfilingBacktraceDepth,
+ 3);
+ MmapObservationManager->EnqueueEvent(allocationSize, frames, frameCount);
+ }
+
+ StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesMapped, allocationSize);
+ StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::PagesMapped, allocationSize / PageSize);
+
+ auto* extent = static_cast<TLargeBlobExtent*>(ptr);
+ MappedMemoryManager->Populate(ptr, extentHeaderSize);
+ StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesPopulated, extentHeaderSize);
+ StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::PagesPopulated, extentHeaderSize / PageSize);
+ StatisticsManager->IncrementSystemCounter(ESystemCounter::BytesAllocated, extentHeaderSize);
+
+ new (extent) TLargeBlobExtent(segmentCount, static_cast<char*>(ptr) + extentHeaderSize);
+
+ for (size_t index = 0; index < segmentCount; ++index) {
+ auto* disposedSegment = DisposedSegmentPool_.Allocate();
+ disposedSegment->Index = index;
+ disposedSegment->Extent = extent;
+ arena->DisposedSegments.Put(disposedSegment);
+ extent->DisposedFlags[index].store(true);
+ }
+
+ auto* expectedFirstExtent = arena->FirstExtent.load();
+ do {
+ extent->NextExtent = expectedFirstExtent;
+ } while (Y_UNLIKELY(!arena->FirstExtent.compare_exchange_weak(expectedFirstExtent, extent)));
+ }
+
+ template <class TState>
+ void* DoAllocate(TState* state, size_t size)
+ {
+ auto rawSize = GetRawBlobSize<TLargeBlobHeader>(size);
+ auto rank = GetLargeRank(rawSize);
+ auto tag = ConfigurationManager->IsLargeArenaAllocationProfiled(rank)
+ ? BacktraceManager->GetMemoryTagFromBacktrace(3)
+ : TThreadManager::GetCurrentMemoryTag();
+ auto& arena = Arenas_[rank];
+ YTALLOC_PARANOID_ASSERT(rawSize <= arena.SegmentSize);
+
+ TLargeBlobHeader* blob;
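+ // Allocation strategy: try to reuse a spare blob first, then claim a disposed segment,
+ // and only allocate a fresh extent when both fail; the loop then retries.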
+ while (true) {
+ blob = arena.SpareBlobs.Extract(state);
+ if (blob) {
+ AssertBlobState(blob, ELargeBlobState::Spare);
+ if (TryLockBlob(blob)) {
+ StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesSpare, -blob->BytesAcquired);
+ if (blob->BytesAcquired < rawSize) {
+ PopulateArenaPages(
+ state,
+ &arena,
+ reinterpret_cast<char*>(blob) + blob->BytesAcquired,
+ rawSize - blob->BytesAcquired);
+ blob->BytesAcquired = rawSize;
+ } else {
+ StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesOverhead, blob->BytesAcquired - rawSize);
+ }
+ YTALLOC_PARANOID_ASSERT(blob->BytesAllocated == 0);
+ blob->BytesAllocated = size;
+ blob->Tag = tag;
+ blob->State = ELargeBlobState::Allocated;
+ UnlockBlob(blob);
+ break;
+ } else {
+ blob->State = ELargeBlobState::LockedSpare;
+ arena.LockedSpareBlobs.Put(blob);
+ }
+ }
+
+ auto* disposedSegment = arena.DisposedSegments.Extract();
+ if (disposedSegment) {
+ auto index = disposedSegment->Index;
+ auto* extent = disposedSegment->Extent;
+ DisposedSegmentPool_.Free(disposedSegment);
+
+ auto* ptr = extent->Ptr + index * arena.SegmentSize;
+ PopulateArenaPages(
+ state,
+ &arena,
+ ptr,
+ rawSize);
+
+ blob = reinterpret_cast<TLargeBlobHeader*>(ptr);
+ new (blob) TLargeBlobHeader(extent, rawSize, size, tag);
+
+ extent->DisposedFlags[index].store(false, std::memory_order_release);
+
+ break;
+ }
+
+ AllocateArenaExtent(state, &arena);
+ }
+
+ StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BlobsAllocated, 1);
+ StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesAllocated, size);
+ StatisticsManager->IncrementTotalCounter(state, tag, EBasicCounter::BytesAllocated, size);
+ if (!Dumpable) {
+ StatisticsManager->IncrementUndumpableCounter(state, EUndumpableCounter::BytesAllocated, size);
+ }
+
+ auto* result = HeaderToPtr(blob);
+ YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(result) >= LargeZoneStart(Dumpable) && reinterpret_cast<uintptr_t>(result) < LargeZoneEnd(Dumpable));
+ PoisonUninitializedRange(result, size);
+ return result;
+ }
+
+ template <class TState>
+ void DoFree(TState* state, void* ptr)
+ {
+ YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= LargeZoneStart(Dumpable) && reinterpret_cast<uintptr_t>(ptr) < LargeZoneEnd(Dumpable));
+
+ auto* blob = PtrToHeader<TLargeBlobHeader>(ptr);
+ AssertBlobState(blob, ELargeBlobState::Allocated);
+
+ auto size = blob->BytesAllocated;
+ PoisonFreedRange(ptr, size);
+
+ auto rawSize = GetRawBlobSize<TLargeBlobHeader>(size);
+ auto rank = GetLargeRank(rawSize);
+ auto& arena = Arenas_[rank];
+ YTALLOC_PARANOID_ASSERT(blob->BytesAcquired <= arena.SegmentSize);
+
+ auto tag = blob->Tag;
+
+ StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BlobsFreed, 1);
+ StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesFreed, size);
+ StatisticsManager->IncrementTotalCounter(state, tag, EBasicCounter::BytesFreed, size);
+ if (!Dumpable) {
+ StatisticsManager->IncrementUndumpableCounter(state, EUndumpableCounter::BytesFreed, size);
+ }
+
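+ // If the blob is currently locked (e.g. by the overhead scanner), defer the transition;
+ // it is reinstalled as spare later (see ReinstallLockedFreedBlobs).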
+ if (TryLockBlob(blob)) {
+ MoveBlobToSpare(state, &arena, blob, true);
+ } else {
+ blob->State = ELargeBlobState::LockedFreed;
+ arena.LockedFreedBlobs.Put(blob);
+ }
+ }
+
+private:
+ TZoneAllocator ZoneAllocator_;
+ std::array<TLargeArena, LargeRankCount> Arenas_;
+
+ static constexpr size_t DisposedSegmentsBatchSize = 1024;
+ TSystemPool<TDisposedSegment, DisposedSegmentsBatchSize> DisposedSegmentPool_;
+
+ std::atomic<TBacktraceProvider> BacktraceProvider_ = nullptr;
+};
+
+TExplicitlyConstructableSingleton<TLargeBlobAllocator<true>> DumpableLargeBlobAllocator;
+TExplicitlyConstructableSingleton<TLargeBlobAllocator<false>> UndumpableLargeBlobAllocator;
+
+////////////////////////////////////////////////////////////////////////////////
+// Huge blob allocator
+//
+// Basically a wrapper for TZoneAllocator.
+
+// Acts as a signature to detect broken headers.
+enum class EHugeBlobState : ui64
+{
+ Allocated = 0x636c6c6165677568ULL // hugeallc
+};
+
+// Every huge blob (tagged or untagged) is prepended with this header.
+struct THugeBlobHeader
+{
+ THugeBlobHeader(TMemoryTag tag, size_t size, bool dumpable)
+ : Tag(tag)
+ , Size(size)
+ , State(EHugeBlobState::Allocated)
+ , Dumpable(dumpable)
+ { }
+
+ TMemoryTag Tag;
+ size_t Size;
+ EHugeBlobState State;
+ bool Dumpable;
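+ // Explicit padding keeps the header size a multiple of 16 bytes (see CHECK_HEADER_ALIGNMENT below).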
+ char Padding[7];
+};
+
+CHECK_HEADER_ALIGNMENT(THugeBlobHeader)
+
+class THugeBlobAllocator
+{
+public:
+ THugeBlobAllocator()
+ : ZoneAllocator_(HugeZoneStart, HugeZoneEnd)
+ { }
+
+ void* Allocate(size_t size, bool dumpable)
+ {
+ YTALLOC_VERIFY(size <= MaxAllocationSize);
+ auto tag = TThreadManager::GetCurrentMemoryTag();
+ auto rawSize = GetRawBlobSize<THugeBlobHeader>(size);
+ auto* blob = static_cast<THugeBlobHeader*>(ZoneAllocator_.Allocate(rawSize, MAP_POPULATE));
+ if (!dumpable) {
+ MappedMemoryManager->DontDump(blob, rawSize);
+ }
+ new (blob) THugeBlobHeader(tag, size, dumpable);
+
+ StatisticsManager->IncrementTotalCounter(tag, EBasicCounter::BytesAllocated, size);
+ StatisticsManager->IncrementHugeCounter(EHugeCounter::BlobsAllocated, 1);
+ StatisticsManager->IncrementHugeCounter(EHugeCounter::BytesAllocated, size);
+ if (!dumpable) {
+ StatisticsManager->IncrementHugeUndumpableCounter(EUndumpableCounter::BytesAllocated, size);
+ }
+
+ auto* result = HeaderToPtr(blob);
+ PoisonUninitializedRange(result, size);
+ return result;
+ }
+
+ void Free(void* ptr)
+ {
+ auto* blob = PtrToHeader<THugeBlobHeader>(ptr);
+ AssertBlobState(blob, EHugeBlobState::Allocated);
+ auto tag = blob->Tag;
+ auto size = blob->Size;
+ auto dumpable = blob->Dumpable;
+ PoisonFreedRange(ptr, size);
+
+ auto rawSize = GetRawBlobSize<THugeBlobHeader>(size);
+ ZoneAllocator_.Free(blob, rawSize);
+
+ StatisticsManager->IncrementTotalCounter(tag, EBasicCounter::BytesFreed, size);
+ StatisticsManager->IncrementHugeCounter(EHugeCounter::BlobsFreed, 1);
+ StatisticsManager->IncrementHugeCounter(EHugeCounter::BytesFreed, size);
+ if (!dumpable) {
+ StatisticsManager->IncrementHugeUndumpableCounter(EUndumpableCounter::BytesFreed, size);
+ }
+ }
+
+ static size_t GetAllocationSize(const void* ptr)
+ {
+ UnalignPtr<THugeBlobHeader>(ptr);
+ const auto* blob = PtrToHeader<THugeBlobHeader>(ptr);
+ return blob->Size;
+ }
+
+ static size_t GetAllocationSize(size_t size)
+ {
+ return GetBlobAllocationSize<THugeBlobHeader>(size);
+ }
+
+private:
+ TZoneAllocator ZoneAllocator_;
+};
+
+TExplicitlyConstructableSingleton<THugeBlobAllocator> HugeBlobAllocator;
+
+////////////////////////////////////////////////////////////////////////////////
+// A thunk dispatching to the large and huge blob allocators.
+
+class TBlobAllocator
+{
+public:
+ static void* Allocate(size_t size)
+ {
+ InitializeGlobals();
+ bool dumpable = GetCurrentMemoryZone() != EMemoryZone::Undumpable;
+ // NB: Account for the header. Also note that we may safely ignore the alignment since
+ // HugeAllocationSizeThreshold is already page-aligned.
+ if (Y_LIKELY(size < HugeAllocationSizeThreshold - sizeof(TLargeBlobHeader) - RightReadableAreaSize)) {
+ void* result = dumpable
+ ? DumpableLargeBlobAllocator->Allocate(size)
+ : UndumpableLargeBlobAllocator->Allocate(size);
+ YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(result) >= LargeZoneStart(dumpable) && reinterpret_cast<uintptr_t>(result) < LargeZoneEnd(dumpable));
+ return result;
+ } else {
+ auto* result = HugeBlobAllocator->Allocate(size, dumpable);
+ YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(result) >= HugeZoneStart && reinterpret_cast<uintptr_t>(result) < HugeZoneEnd);
+ return result;
+ }
+ }
+
+ static void Free(void* ptr)
+ {
+ InitializeGlobals();
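+ // Dispatch on the address zone: dumpable large zone, undumpable large zone, then huge zone;
+ // any other address traps.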
+ if (reinterpret_cast<uintptr_t>(ptr) < LargeZoneEnd(true)) {
+ YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= LargeZoneStart(true) && reinterpret_cast<uintptr_t>(ptr) < LargeZoneEnd(true));
+ UnalignPtr<TLargeBlobHeader>(ptr);
+ DumpableLargeBlobAllocator->Free(ptr);
+ } else if (reinterpret_cast<uintptr_t>(ptr) < LargeZoneEnd(false)) {
+ YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= LargeZoneStart(false) && reinterpret_cast<uintptr_t>(ptr) < LargeZoneEnd(false));
+ UnalignPtr<TLargeBlobHeader>(ptr);
+ UndumpableLargeBlobAllocator->Free(ptr);
+ } else if (reinterpret_cast<uintptr_t>(ptr) < HugeZoneEnd) {
+ YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= HugeZoneStart && reinterpret_cast<uintptr_t>(ptr) < HugeZoneEnd);
+ UnalignPtr<THugeBlobHeader>(ptr);
+ HugeBlobAllocator->Free(ptr);
+ } else {
+ YTALLOC_TRAP("Wrong ptr passed to Free");
+ }
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+Y_POD_THREAD(bool) CurrentThreadIsBackground;
+
+// Base class for all background threads.
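+// Handles start/stop bookkeeping and fork awareness: the thread is stopped before fork()
+// and restarted afterwards in the parent (and, if it was running, in the child).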
+template <class T>
+class TBackgroundThreadBase
+{
+public:
+ TBackgroundThreadBase()
+ : State_(new TState())
+ {
+ NThreading::TForkAwareSpinLock::AtFork(
+ static_cast<T*>(this),
+ &BeforeFork,
+ &AfterForkParent,
+ &AfterForkChild);
+ }
+
+ virtual ~TBackgroundThreadBase()
+ {
+ Stop();
+ }
+
+private:
+ struct TState
+ : public TSystemAllocatable
+ {
+ std::mutex StartStopMutex;
+ std::optional<std::thread> Thread;
+
+ std::mutex StopFlagMutex;
+ std::condition_variable StopFlagVariable;
+ std::chrono::system_clock::time_point LastInvocationTime;
+ bool StopFlag = false;
+ bool Paused = false;
+
+ std::atomic<int> ForkDepth = 0;
+ bool RestartAfterFork = false;
+ };
+
+ TState* State_;
+
+private:
+ static void BeforeFork(void* cookie)
+ {
+ static_cast<T*>(cookie)->DoBeforeFork();
+ }
+
+ void DoBeforeFork()
+ {
+ bool stopped = Stop();
+ if (State_->ForkDepth++ == 0) {
+ State_->RestartAfterFork = stopped;
+ }
+ }
+
+ static void AfterForkParent(void* cookie)
+ {
+ static_cast<T*>(cookie)->DoAfterForkParent();
+ }
+
+ void DoAfterForkParent()
+ {
+ if (--State_->ForkDepth == 0) {
+ if (State_->RestartAfterFork) {
+ Start(false);
+ }
+ }
+ }
+
+ static void AfterForkChild(void* cookie)
+ {
+ static_cast<T*>(cookie)->DoAfterForkChild();
+ }
+
+ void DoAfterForkChild()
+ {
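+ // Deliberately abandon the pre-fork state: the background thread does not exist in the child,
+ // so its thread handle and synchronization primitives must not be reused.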
+ bool restart = State_->RestartAfterFork;
+ State_ = new TState();
+ if (restart) {
+ Start(false);
+ }
+ }
+
+ virtual void ThreadMain() = 0;
+
+protected:
+ void Start(bool fromAlloc)
+ {
+ std::unique_lock<std::mutex> guard(State_->StartStopMutex, std::defer_lock);
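+ // When called from the allocation path, never block on the mutex and never restart a paused thread.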
+ if (fromAlloc) {
+ if (!guard.try_lock()) {
+ return;
+ }
+
+ if (State_->Paused) {
+ return;
+ }
+ } else {
+ guard.lock();
+ }
+
+ State_->Paused = false;
+ if (State_->Thread) {
+ return;
+ }
+
+ State_->StopFlag = false;
+
+ State_->Thread.emplace([=] {
+ CurrentThreadIsBackground = true;
+ ThreadMain();
+ });
+
+ OnStart();
+ }
+
+ bool Stop()
+ {
+ std::unique_lock<std::mutex> guard(State_->StartStopMutex);
+
+ State_->Paused = true;
+ if (!State_->Thread) {
+ return false;
+ }
+
+ std::unique_lock<std::mutex> flagGuard(State_->StopFlagMutex);
+ State_->StopFlag = true;
+ flagGuard.unlock();
+ State_->StopFlagVariable.notify_one();
+
+ State_->Thread->join();
+ State_->Thread.reset();
+
+ OnStop();
+
+ return true;
+ }
+
+ bool IsDone(TDuration interval)
+ {
+ std::unique_lock<std::mutex> flagGuard(State_->StopFlagMutex);
+ auto result = State_->StopFlagVariable.wait_until(
+ flagGuard,
+ State_->LastInvocationTime + std::chrono::microseconds(interval.MicroSeconds()),
+ [&] { return State_->StopFlag; });
+ State_->LastInvocationTime = std::chrono::system_clock::now();
+ return result;
+ }
+
+ virtual void OnStart()
+ { }
+
+ virtual void OnStop()
+ { }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Invokes madvise(MADV_STOCKPILE) periodically.
+class TStockpileThread
+ : public TBackgroundThreadBase<TStockpileThread>
+{
+public:
+ explicit TStockpileThread(int index)
+ : Index_(index)
+ {
+ Start(false);
+ }
+
+private:
+ const int Index_;
+
+ virtual void ThreadMain() override
+ {
+ TThread::SetCurrentThreadName(Sprintf("%s:%d", StockpileThreadName, Index_).c_str());
+
+ while (!IsDone(ConfigurationManager->GetStockpileInterval())) {
+ if (!MappedMemoryManager->Stockpile(ConfigurationManager->GetStockpileSize())) {
+ // No point in proceeding.
+ YTALLOC_LOG_INFO("Stockpile call failed; terminating stockpile thread");
+ break;
+ }
+ }
+ }
+};
+
+// Manages a bunch of TStockpileThreads.
+class TStockpileManager
+{
+public:
+ void SpawnIfNeeded()
+ {
+ if (!ConfigurationManager->IsStockpileEnabled()) {
+ return;
+ }
+
+ int threadCount = ConfigurationManager->GetStockpileThreadCount();
+ while (static_cast<int>(Threads_.size()) > threadCount) {
+ Threads_.pop_back();
+ }
+ while (static_cast<int>(Threads_.size()) < threadCount) {
+ Threads_.push_back(std::make_unique<TStockpileThread>(static_cast<int>(Threads_.size())));
+ }
+ }
+
+private:
+ std::vector<std::unique_ptr<TStockpileThread>> Threads_;
+};
+
+TExplicitlyConstructableSingleton<TStockpileManager> StockpileManager;
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Time to wait before re-spawning the thread after a fork.
+static constexpr auto BackgroundThreadRespawnDelay = TDuration::Seconds(3);
+
+// Runs basic background activities: reclaim, logging, profiling, etc.
+class TBackgroundThread
+ : public TBackgroundThreadBase<TBackgroundThread>
+{
+public:
+ bool IsStarted()
+ {
+ return Started_.load();
+ }
+
+ void SpawnIfNeeded()
+ {
+ if (CurrentThreadIsBackground) {
+ return;
+ }
+ Start(true);
+ }
+
+private:
+ std::atomic<bool> Started_ = false;
+
+private:
+ virtual void ThreadMain() override
+ {
+ TThread::SetCurrentThreadName(BackgroundThreadName);
+ TimingManager->DisableForCurrentThread();
+ MmapObservationManager->DisableForCurrentThread();
+
+ while (!IsDone(BackgroundInterval)) {
+ DumpableLargeBlobAllocator->RunBackgroundTasks();
+ UndumpableLargeBlobAllocator->RunBackgroundTasks();
+ MappedMemoryManager->RunBackgroundTasks();
+ TimingManager->RunBackgroundTasks();
+ MmapObservationManager->RunBackgroundTasks();
+ StockpileManager->SpawnIfNeeded();
+ }
+ }
+
+ virtual void OnStart() override
+ {
+ DoUpdateAllThreadsControlWord(true);
+ }
+
+ virtual void OnStop() override
+ {
+ DoUpdateAllThreadsControlWord(false);
+ }
+
+ void DoUpdateAllThreadsControlWord(bool started)
+ {
+ // Update threads' TLS.
+ ThreadManager->EnumerateThreadStatesSync(
+ [&] {
+ Started_.store(started);
+ },
+ [&] (auto* state) {
+ if (state->BackgroundThreadStarted) {
+ *state->BackgroundThreadStarted = started;
+ }
+ });
+ }
+};
+
+TExplicitlyConstructableSingleton<TBackgroundThread> BackgroundThread;
+
+////////////////////////////////////////////////////////////////////////////////
+
+Y_FORCE_INLINE TThreadState* TThreadManager::GetThreadStateUnchecked()
+{
+ YTALLOC_PARANOID_ASSERT(ThreadState_);
+ return ThreadState_;
+}
+
+Y_FORCE_INLINE TThreadState* TThreadManager::FindThreadState()
+{
+ if (Y_LIKELY(ThreadState_)) {
+ return ThreadState_;
+ }
+
+ if (ThreadStateDestroyed_) {
+ return nullptr;
+ }
+
+ InitializeGlobals();
+
+ // InitializeGlobals must not allocate.
+ Y_VERIFY(!ThreadState_);
+ ThreadState_ = ThreadManager->AllocateThreadState();
+ (&ThreadControlWord_)->Parts.ThreadStateValid = true;
+
+ return ThreadState_;
+}
+
+void TThreadManager::DestroyThread(void*)
+{
+ TSmallAllocator::PurgeCaches();
+
+ TThreadState* state = ThreadState_;
+ ThreadState_ = nullptr;
+ ThreadStateDestroyed_ = true;
+ (&ThreadControlWord_)->Parts.ThreadStateValid = false;
+
+ {
+ auto guard = GuardWithTiming(ThreadManager->ThreadRegistryLock_);
+ state->AllocationProfilingEnabled = nullptr;
+ state->BackgroundThreadStarted = nullptr;
+ ThreadManager->UnrefThreadState(state);
+ }
+}
+
+void TThreadManager::DestroyThreadState(TThreadState* state)
+{
+ StatisticsManager->AccumulateLocalCounters(state);
+ ThreadRegistry_.Remove(state);
+ ThreadStatePool_.Free(state);
+}
+
+void TThreadManager::AfterFork(void* cookie)
+{
+ static_cast<TThreadManager*>(cookie)->DoAfterFork();
+}
+
+void TThreadManager::DoAfterFork()
+{
+ auto guard = GuardWithTiming(ThreadRegistryLock_);
+ ThreadRegistry_.Clear();
+ TThreadState* state = ThreadState_;
+ if (state) {
+ ThreadRegistry_.PushBack(state);
+ }
+}
+
+TThreadState* TThreadManager::AllocateThreadState()
+{
+ auto* state = ThreadStatePool_.Allocate();
+ state->AllocationProfilingEnabled = &(*&ThreadControlWord_).Parts.AllocationProfilingEnabled;
+ state->BackgroundThreadStarted = &(*&ThreadControlWord_).Parts.BackgroundThreadStarted;
+
+ {
+ auto guard = GuardWithTiming(ThreadRegistryLock_);
+ // NB: These flags must be initialized under ThreadRegistryLock_; see EnumerateThreadStatesSync.
+ *state->AllocationProfilingEnabled = ConfigurationManager->IsAllocationProfilingEnabled();
+ *state->BackgroundThreadStarted = BackgroundThread->IsStarted();
+ ThreadRegistry_.PushBack(state);
+ }
+
+ // Need to pass some non-null value for DestroyThread to be called.
+ pthread_setspecific(ThreadDtorKey_, (void*)-1);
+
+ return state;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void InitializeGlobals()
+{
+ static std::once_flag Initialized;
+ std::call_once(Initialized, [] () {
+ LogManager.Construct();
+ BacktraceManager.Construct();
+ StatisticsManager.Construct();
+ MappedMemoryManager.Construct();
+ ThreadManager.Construct();
+ GlobalState.Construct();
+ DumpableLargeBlobAllocator.Construct();
+ UndumpableLargeBlobAllocator.Construct();
+ HugeBlobAllocator.Construct();
+ ConfigurationManager.Construct();
+ SystemAllocator.Construct();
+ TimingManager.Construct();
+ MmapObservationManager.Construct();
+ StockpileManager.Construct();
+ BackgroundThread.Construct();
+
+ SmallArenaAllocators.Construct();
+ auto constructSmallArenaAllocators = [&] (EAllocationKind kind, uintptr_t zonesStart) {
+ for (size_t rank = 1; rank < SmallRankCount; ++rank) {
+ (*SmallArenaAllocators)[kind][rank].Construct(kind, rank, zonesStart + rank * SmallZoneSize);
+ }
+ };
+ constructSmallArenaAllocators(EAllocationKind::Untagged, UntaggedSmallZonesStart);
+ constructSmallArenaAllocators(EAllocationKind::Tagged, TaggedSmallZonesStart);
+
+ GlobalSmallChunkCaches.Construct();
+ (*GlobalSmallChunkCaches)[EAllocationKind::Tagged].Construct(EAllocationKind::Tagged);
+ (*GlobalSmallChunkCaches)[EAllocationKind::Untagged].Construct(EAllocationKind::Untagged);
+ });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void StartBackgroundThread()
+{
+ InitializeGlobals();
+ BackgroundThread->SpawnIfNeeded();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class... Ts>
+Y_FORCE_INLINE void* AllocateSmallUntagged(size_t rank, Ts... args)
+{
+ auto* result = TSmallAllocator::Allocate<EAllocationKind::Untagged>(NullMemoryTag, rank, std::forward<Ts>(args)...);
+ YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(result) >= MinUntaggedSmallPtr && reinterpret_cast<uintptr_t>(result) < MaxUntaggedSmallPtr);
+ return result;
+}
+
+template <class... Ts>
+Y_FORCE_INLINE void* AllocateSmallTagged(ui64 controlWord, size_t rank, Ts... args)
+{
+ auto tag = Y_UNLIKELY((controlWord & TThreadManager::AllocationProfilingEnabledControlWordMask) && ConfigurationManager->IsSmallArenaAllocationProfiled(rank))
+ ? BacktraceManager->GetMemoryTagFromBacktrace(2)
+ : static_cast<TMemoryTag>(controlWord & TThreadManager::MemoryTagControlWordMask);
+ auto* result = TSmallAllocator::Allocate<EAllocationKind::Tagged>(tag, rank, std::forward<Ts>(args)...);
+ YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(result) >= MinTaggedSmallPtr && reinterpret_cast<uintptr_t>(result) < MaxTaggedSmallPtr);
+ return result;
+}
+
+Y_FORCE_INLINE void* AllocateInline(size_t size)
+{
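+ // Map the size to a small rank via two lookup tables (8-byte steps up to 512 bytes,
+ // 256-byte steps above); larger requests go to the blob allocator.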
+ size_t rank;
+ if (Y_LIKELY(size <= 512)) {
+ rank = SizeToSmallRank1[(size + 7) >> 3];
+ } else if (Y_LIKELY(size < LargeAllocationSizeThreshold)) {
+ rank = SizeToSmallRank2[(size - 1) >> 8];
+ } else {
+ StartBackgroundThread();
+ return TBlobAllocator::Allocate(size);
+ }
+
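+ // Fast path: the packed thread control word indicates no memory tag, no allocation profiling,
+ // a started background thread, and an initialized thread state.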
+ auto controlWord = TThreadManager::GetThreadControlWord();
+ if (Y_LIKELY(controlWord == TThreadManager::FastPathControlWord)) {
+ return AllocateSmallUntagged(rank, TThreadManager::GetThreadStateUnchecked());
+ }
+
+ if (Y_UNLIKELY(!(controlWord & TThreadManager::BackgroundThreadStartedControlWorkMask))) {
+ StartBackgroundThread();
+ }
+
+ if (!(controlWord & (TThreadManager::MemoryTagControlWordMask | TThreadManager::AllocationProfilingEnabledControlWordMask))) {
+ return AllocateSmallUntagged(rank);
+ } else {
+ return AllocateSmallTagged(controlWord, rank);
+ }
+}
+
+Y_FORCE_INLINE void* AllocateSmallInline(size_t rank)
+{
+ auto controlWord = TThreadManager::GetThreadControlWord();
+ if (Y_LIKELY(controlWord == TThreadManager::FastPathControlWord)) {
+ return AllocateSmallUntagged(rank, TThreadManager::GetThreadStateUnchecked());
+ }
+
+ if (!(controlWord & (TThreadManager::MemoryTagControlWordMask | TThreadManager::AllocationProfilingEnabledControlWordMask))) {
+ return AllocateSmallUntagged(rank);
+ } else {
+ return AllocateSmallTagged(controlWord, rank);
+ }
+}
+
+Y_FORCE_INLINE void* AllocatePageAlignedInline(size_t size)
+{
+ size = std::max(AlignUp(size, PageSize), PageSize);
+ void* result = size >= LargeAllocationSizeThreshold
+ ? AlignUp(TBlobAllocator::Allocate(size + PageSize), PageSize)
+ : Allocate(size);
+ YTALLOC_ASSERT(reinterpret_cast<uintptr_t>(result) % PageSize == 0);
+ return result;
+}
+
+Y_FORCE_INLINE void FreeNonNullInline(void* ptr)
+{
+ YTALLOC_ASSERT(ptr);
+ if (Y_LIKELY(reinterpret_cast<uintptr_t>(ptr) < UntaggedSmallZonesEnd)) {
+ YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= MinUntaggedSmallPtr && reinterpret_cast<uintptr_t>(ptr) < MaxUntaggedSmallPtr);
+ TSmallAllocator::Free<EAllocationKind::Untagged>(ptr);
+ } else if (Y_LIKELY(reinterpret_cast<uintptr_t>(ptr) < TaggedSmallZonesEnd)) {
+ YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= MinTaggedSmallPtr && reinterpret_cast<uintptr_t>(ptr) < MaxTaggedSmallPtr);
+ TSmallAllocator::Free<EAllocationKind::Tagged>(ptr);
+ } else {
+ TBlobAllocator::Free(ptr);
+ }
+}
+
+Y_FORCE_INLINE void FreeInline(void* ptr)
+{
+ if (Y_LIKELY(ptr)) {
+ FreeNonNullInline(ptr);
+ }
+}
+
+Y_FORCE_INLINE size_t GetAllocationSizeInline(const void* ptr)
+{
+ if (Y_UNLIKELY(!ptr)) {
+ return 0;
+ }
+
+ auto uintptr = reinterpret_cast<uintptr_t>(ptr);
+ if (uintptr < UntaggedSmallZonesEnd) {
+ YTALLOC_PARANOID_ASSERT(uintptr >= MinUntaggedSmallPtr && uintptr < MaxUntaggedSmallPtr);
+ return TSmallAllocator::GetAllocationSize(ptr);
+ } else if (uintptr < TaggedSmallZonesEnd) {
+ YTALLOC_PARANOID_ASSERT(uintptr >= MinTaggedSmallPtr && uintptr < MaxTaggedSmallPtr);
+ return TSmallAllocator::GetAllocationSize(ptr);
+ } else if (uintptr < LargeZoneEnd(true)) {
+ YTALLOC_PARANOID_ASSERT(uintptr >= LargeZoneStart(true) && uintptr < LargeZoneEnd(true));
+ return TLargeBlobAllocator<true>::GetAllocationSize(ptr);
+ } else if (uintptr < LargeZoneEnd(false)) {
+ YTALLOC_PARANOID_ASSERT(uintptr >= LargeZoneStart(false) && uintptr < LargeZoneEnd(false));
+ return TLargeBlobAllocator<false>::GetAllocationSize(ptr);
+ } else if (uintptr < HugeZoneEnd) {
+ YTALLOC_PARANOID_ASSERT(uintptr >= HugeZoneStart && uintptr < HugeZoneEnd);
+ return THugeBlobAllocator::GetAllocationSize(ptr);
+ } else {
+ YTALLOC_TRAP("Wrong ptr passed to GetAllocationSizeInline");
+ }
+}
+
+Y_FORCE_INLINE size_t GetAllocationSizeInline(size_t size)
+{
+ if (size < LargeAllocationSizeThreshold) {
+ return TSmallAllocator::GetAllocationSize(size);
+ } else if (size <= HugeAllocationSizeThreshold) {
+ return TLargeBlobAllocator<true>::GetAllocationSize(size);
+ } else {
+ return THugeBlobAllocator::GetAllocationSize(size);
+ }
+}
+
+void EnableLogging(TLogHandler logHandler)
+{
+ InitializeGlobals();
+ LogManager->EnableLogging(logHandler);
+}
+
+void SetBacktraceProvider(TBacktraceProvider provider)
+{
+ InitializeGlobals();
+ BacktraceManager->SetBacktraceProvider(provider);
+ DumpableLargeBlobAllocator->SetBacktraceProvider(provider);
+ UndumpableLargeBlobAllocator->SetBacktraceProvider(provider);
+}
+
+void SetBacktraceFormatter(TBacktraceFormatter provider)
+{
+ InitializeGlobals();
+ MmapObservationManager->SetBacktraceFormatter(provider);
+}
+
+void EnableStockpile()
+{
+ InitializeGlobals();
+ ConfigurationManager->EnableStockpile();
+}
+
+void SetStockpileInterval(TDuration value)
+{
+ InitializeGlobals();
+ ConfigurationManager->SetStockpileInterval(value);
+}
+
+void SetStockpileThreadCount(int value)
+{
+ InitializeGlobals();
+ ConfigurationManager->SetStockpileThreadCount(value);
+}
+
+void SetStockpileSize(size_t value)
+{
+ InitializeGlobals();
+ ConfigurationManager->SetStockpileSize(value);
+}
+
+void SetLargeUnreclaimableCoeff(double value)
+{
+ InitializeGlobals();
+ ConfigurationManager->SetLargeUnreclaimableCoeff(value);
+}
+
+void SetTimingEventThreshold(TDuration value)
+{
+ InitializeGlobals();
+ ConfigurationManager->SetTimingEventThreshold(value);
+}
+
+void SetMinLargeUnreclaimableBytes(size_t value)
+{
+ InitializeGlobals();
+ ConfigurationManager->SetMinLargeUnreclaimableBytes(value);
+}
+
+void SetMaxLargeUnreclaimableBytes(size_t value)
+{
+ InitializeGlobals();
+ ConfigurationManager->SetMaxLargeUnreclaimableBytes(value);
+}
+
+void SetAllocationProfilingEnabled(bool value)
+{
+ ConfigurationManager->SetAllocationProfilingEnabled(value);
+}
+
+void SetAllocationProfilingSamplingRate(double rate)
+{
+ ConfigurationManager->SetAllocationProfilingSamplingRate(rate);
+}
+
+void SetSmallArenaAllocationProfilingEnabled(size_t rank, bool value)
+{
+ ConfigurationManager->SetSmallArenaAllocationProfilingEnabled(rank, value);
+}
+
+void SetLargeArenaAllocationProfilingEnabled(size_t rank, bool value)
+{
+ ConfigurationManager->SetLargeArenaAllocationProfilingEnabled(rank, value);
+}
+
+void SetProfilingBacktraceDepth(int depth)
+{
+ ConfigurationManager->SetProfilingBacktraceDepth(depth);
+}
+
+void SetMinProfilingBytesUsedToReport(size_t size)
+{
+ ConfigurationManager->SetMinProfilingBytesUsedToReport(size);
+}
+
+void SetEnableEagerMemoryRelease(bool value)
+{
+ ConfigurationManager->SetEnableEagerMemoryRelease(value);
+}
+
+void SetEnableMadvisePopulate(bool value)
+{
+ ConfigurationManager->SetEnableMadvisePopulate(value);
+}
+
+TEnumIndexedVector<ETotalCounter, ssize_t> GetTotalAllocationCounters()
+{
+ InitializeGlobals();
+ return StatisticsManager->GetTotalAllocationCounters();
+}
+
+TEnumIndexedVector<ESystemCounter, ssize_t> GetSystemAllocationCounters()
+{
+ InitializeGlobals();
+ return StatisticsManager->GetSystemAllocationCounters();
+}
+
+TEnumIndexedVector<ESystemCounter, ssize_t> GetUndumpableAllocationCounters()
+{
+ InitializeGlobals();
+ return StatisticsManager->GetUndumpableAllocationCounters();
+}
+
+TEnumIndexedVector<ESmallCounter, ssize_t> GetSmallAllocationCounters()
+{
+ InitializeGlobals();
+ return StatisticsManager->GetSmallAllocationCounters();
+}
+
+TEnumIndexedVector<ESmallCounter, ssize_t> GetLargeAllocationCounters()
+{
+ InitializeGlobals();
+ return StatisticsManager->GetLargeAllocationCounters();
+}
+
+std::array<TEnumIndexedVector<ESmallArenaCounter, ssize_t>, SmallRankCount> GetSmallArenaAllocationCounters()
+{
+ InitializeGlobals();
+ return StatisticsManager->GetSmallArenaAllocationCounters();
+}
+
+std::array<TEnumIndexedVector<ELargeArenaCounter, ssize_t>, LargeRankCount> GetLargeArenaAllocationCounters()
+{
+ InitializeGlobals();
+ return StatisticsManager->GetLargeArenaAllocationCounters();
+}
+
+TEnumIndexedVector<EHugeCounter, ssize_t> GetHugeAllocationCounters()
+{
+ InitializeGlobals();
+ return StatisticsManager->GetHugeAllocationCounters();
+}
+
+std::vector<TProfiledAllocation> GetProfiledAllocationStatistics()
+{
+ InitializeGlobals();
+
+ if (!ConfigurationManager->IsAllocationProfilingEnabled()) {
+ return {};
+ }
+
+ std::vector<TMemoryTag> tags;
+ tags.reserve(MaxCapturedAllocationBacktraces + 1);
+ for (TMemoryTag tag = AllocationProfilingMemoryTagBase;
+ tag < AllocationProfilingMemoryTagBase + MaxCapturedAllocationBacktraces;
+ ++tag)
+ {
+ tags.push_back(tag);
+ }
+ tags.push_back(AllocationProfilingUnknownMemoryTag);
+
+ std::vector<TEnumIndexedVector<EBasicCounter, ssize_t>> counters;
+ counters.resize(tags.size());
+ StatisticsManager->GetTaggedMemoryCounters(tags.data(), tags.size(), counters.data());
+
+ std::vector<TProfiledAllocation> statistics;
+ for (size_t index = 0; index < tags.size(); ++index) {
+ if (counters[index][EBasicCounter::BytesUsed] < static_cast<ssize_t>(ConfigurationManager->GetMinProfilingBytesUsedToReport())) {
+ continue;
+ }
+ auto tag = tags[index];
+ auto optionalBacktrace = BacktraceManager->FindBacktrace(tag);
+ if (!optionalBacktrace && tag != AllocationProfilingUnknownMemoryTag) {
+ continue;
+ }
+ statistics.push_back(TProfiledAllocation{
+ optionalBacktrace.value_or(TBacktrace()),
+ counters[index]
+ });
+ }
+ return statistics;
+}
+
+TEnumIndexedVector<ETimingEventType, TTimingEventCounters> GetTimingEventCounters()
+{
+ InitializeGlobals();
+ return TimingManager->GetTimingEventCounters();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NYTAlloc
+