author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/ytalloc/impl/core-inl.h
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/ytalloc/impl/core-inl.h')
-rw-r--r-- | library/cpp/ytalloc/impl/core-inl.h | 4951
1 file changed, 4951 insertions, 0 deletions
diff --git a/library/cpp/ytalloc/impl/core-inl.h b/library/cpp/ytalloc/impl/core-inl.h new file mode 100644 index 0000000000..c66e8478b5 --- /dev/null +++ b/library/cpp/ytalloc/impl/core-inl.h @@ -0,0 +1,4951 @@ +#pragma once + +// This file contains the core parts of YTAlloc but no malloc/free-bridge. +// The latter bridge is placed into alloc.cpp, which includes (sic!) core-inl.h. +// This ensures that AllocateInline/FreeInline calls are properly inlined into malloc/free. +// Also core-inl.h can be directly included in, e.g., benchmarks. + +#include <library/cpp/yt/containers/intrusive_linked_list.h> + +#include <library/cpp/yt/threading/fork_aware_spin_lock.h> + +#include <util/system/tls.h> +#include <util/system/align.h> +#include <util/system/thread.h> + +#include <util/string/printf.h> + +#include <util/generic/singleton.h> +#include <util/generic/size_literals.h> +#include <util/generic/utility.h> + +#include <util/digest/numeric.h> + +#include <library/cpp/ytalloc/api/ytalloc.h> + +#include <atomic> +#include <array> +#include <vector> +#include <mutex> +#include <thread> +#include <condition_variable> +#include <cstdio> +#include <optional> + +#include <sys/mman.h> + +#ifdef _linux_ + #include <sys/utsname.h> +#endif + +#include <errno.h> +#include <pthread.h> +#include <time.h> + +#ifndef MAP_POPULATE + #define MAP_POPULATE 0x08000 +#endif + +// MAP_FIXED which doesn't unmap underlying mapping. +// Linux kernels older than 4.17 silently ignore this flag. +#ifndef MAP_FIXED_NOREPLACE + #ifdef _linux_ + #define MAP_FIXED_NOREPLACE 0x100000 + #else + #define MAP_FIXED_NOREPLACE 0 + #endif +#endif + +#ifndef MADV_POPULATE + #define MADV_POPULATE 0x59410003 +#endif + +#ifndef MADV_STOCKPILE + #define MADV_STOCKPILE 0x59410004 +#endif + +#ifndef MADV_FREE + #define MADV_FREE 8 +#endif + +#ifndef MADV_DONTDUMP + #define MADV_DONTDUMP 16 +#endif + +#ifndef NDEBUG + #define YTALLOC_PARANOID +#endif + +#ifdef YTALLOC_PARANOID + #define YTALLOC_NERVOUS +#endif + +#define YTALLOC_VERIFY(condition) \ + do { \ + if (Y_UNLIKELY(!(condition))) { \ + ::NYT::NYTAlloc::AssertTrap("Assertion failed: " #condition, __FILE__, __LINE__); \ + } \ + } while (false) + +#ifdef NDEBUG + #define YTALLOC_ASSERT(condition) YTALLOC_VERIFY(condition) +#else + #define YTALLOC_ASSERT(condition) (void)(0) +#endif + +#ifdef YTALLOC_PARANOID + #define YTALLOC_PARANOID_ASSERT(condition) YTALLOC_VERIFY(condition) +#else + #define YTALLOC_PARANOID_ASSERT(condition) (true || (condition)) +#endif + +#define YTALLOC_TRAP(message) ::NYT::NYTAlloc::AssertTrap(message, __FILE__, __LINE__) + +namespace NYT::NYTAlloc { + +//////////////////////////////////////////////////////////////////////////////// +// Allocations are classified into three types: +// +// a) Small chunks (less than LargeAllocationSizeThreshold) +// These are the fastest and are extensively cached (both per-thread and globally). +// Memory claimed for these allocations is never reclaimed back. +// Code dealing with such allocations is heavy optimized with all hot paths +// as streamlined as possible. The implementation is mostly inspired by LFAlloc. +// +// b) Large blobs (from LargeAllocationSizeThreshold to HugeAllocationSizeThreshold) +// These are cached as well. We expect such allocations to be less frequent +// than small ones but still do our best to provide good scalability. +// In particular, thread-sharded concurrent data structures as used to provide access to +// cached blobs. 
Memory is claimed via madvise(MADV_POPULATE) and reclaimed back +// via madvise(MADV_FREE). +// +// c) Huge blobs (from HugeAllocationSizeThreshold) +// These should be rare; we delegate directly to mmap and munmap for each allocation. +// +// We also provide a separate allocator for all system allocations (that are needed by YTAlloc itself). +// These are rare and also delegate to mmap/unmap. + +// Periods between background activities. +constexpr auto BackgroundInterval = TDuration::Seconds(1); + +static_assert(LargeRankCount - MinLargeRank <= 16, "Too many large ranks"); +static_assert(SmallRankCount <= 32, "Too many small ranks"); + +constexpr size_t SmallZoneSize = 1_TB; +constexpr size_t LargeZoneSize = 16_TB; +constexpr size_t HugeZoneSize = 1_TB; +constexpr size_t SystemZoneSize = 1_TB; + +constexpr size_t MaxCachedChunksPerRank = 256; + +constexpr uintptr_t UntaggedSmallZonesStart = 0; +constexpr uintptr_t UntaggedSmallZonesEnd = UntaggedSmallZonesStart + 32 * SmallZoneSize; +constexpr uintptr_t MinUntaggedSmallPtr = UntaggedSmallZonesStart + SmallZoneSize * 1; +constexpr uintptr_t MaxUntaggedSmallPtr = UntaggedSmallZonesStart + SmallZoneSize * SmallRankCount; + +constexpr uintptr_t TaggedSmallZonesStart = UntaggedSmallZonesEnd; +constexpr uintptr_t TaggedSmallZonesEnd = TaggedSmallZonesStart + 32 * SmallZoneSize; +constexpr uintptr_t MinTaggedSmallPtr = TaggedSmallZonesStart + SmallZoneSize * 1; +constexpr uintptr_t MaxTaggedSmallPtr = TaggedSmallZonesStart + SmallZoneSize * SmallRankCount; + +constexpr uintptr_t DumpableLargeZoneStart = TaggedSmallZonesEnd; +constexpr uintptr_t DumpableLargeZoneEnd = DumpableLargeZoneStart + LargeZoneSize; + +constexpr uintptr_t UndumpableLargeZoneStart = DumpableLargeZoneEnd; +constexpr uintptr_t UndumpableLargeZoneEnd = UndumpableLargeZoneStart + LargeZoneSize; + +constexpr uintptr_t LargeZoneStart(bool dumpable) +{ + return dumpable ? DumpableLargeZoneStart : UndumpableLargeZoneStart; +} +constexpr uintptr_t LargeZoneEnd(bool dumpable) +{ + return dumpable ? DumpableLargeZoneEnd : UndumpableLargeZoneEnd; +} + +constexpr uintptr_t HugeZoneStart = UndumpableLargeZoneEnd; +constexpr uintptr_t HugeZoneEnd = HugeZoneStart + HugeZoneSize; + +constexpr uintptr_t SystemZoneStart = HugeZoneEnd; +constexpr uintptr_t SystemZoneEnd = SystemZoneStart + SystemZoneSize; + +// We leave 64_KB at the end of 256_MB block and never use it. +// That serves two purposes: +// 1. SmallExtentSize % SmallSegmentSize == 0 +// 2. Every small object satisfies RightReadableArea requirement. 
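The zone constants above fully determine how a pointer is classified on the free path: the address alone tells whether a block is small (and of which rank), large (dumpable or undumpable), huge, or system. The standalone sketch below re-derives the same layout under the assumption that 1_TB equals 1 << 40 bytes; the helper names (Classify, SmallPtrToRank) are illustrative and not part of this file.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstdio>

constexpr uintptr_t TB = uintptr_t(1) << 40;   // assumes 1_TB == 1 << 40

constexpr uintptr_t SmallZoneSize = 1 * TB;
constexpr uintptr_t LargeZoneSize = 16 * TB;
constexpr uintptr_t HugeZoneSize  = 1 * TB;

constexpr uintptr_t UntaggedSmallEnd   = 32 * SmallZoneSize;
constexpr uintptr_t TaggedSmallEnd     = UntaggedSmallEnd + 32 * SmallZoneSize;
constexpr uintptr_t DumpableLargeEnd   = TaggedSmallEnd + LargeZoneSize;
constexpr uintptr_t UndumpableLargeEnd = DumpableLargeEnd + LargeZoneSize;
constexpr uintptr_t HugeEnd            = UndumpableLargeEnd + HugeZoneSize;

// Bits 40..44 of a small pointer select one of the 32 per-rank zones; the same
// expression works for both the untagged and the tagged small region.
// Rank 0 never appears for a valid pointer, since Min*SmallPtr starts at rank 1.
constexpr size_t SmallPtrToRank(uintptr_t ptr)
{
    return (ptr >> 40) & 0x1f;
}

const char* Classify(uintptr_t ptr)
{
    if (ptr < UntaggedSmallEnd)   return "small (untagged)";
    if (ptr < TaggedSmallEnd)     return "small (tagged)";
    if (ptr < DumpableLargeEnd)   return "large (dumpable)";
    if (ptr < UndumpableLargeEnd) return "large (undumpable)";
    if (ptr < HugeEnd)            return "huge";
    return "system";
}

int main()
{
    uintptr_t ptr = 5 * SmallZoneSize + 0x1000; // inside the untagged rank-5 zone
    std::printf("%s, rank %zu\n", Classify(ptr), SmallPtrToRank(ptr));
}
```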
+constexpr size_t SmallExtentAllocSize = 256_MB; +constexpr size_t SmallExtentSize = SmallExtentAllocSize - 64_KB; +constexpr size_t SmallSegmentSize = 96_KB; // LCM(SmallRankToSize) + +constexpr ui16 SmallRankBatchSize[SmallRankCount] = { + 0, 256, 256, 256, 256, 256, 256, 256, 256, 256, 192, 128, 96, 64, 48, 32, 24, 16, 12, 8, 6, 4, 3 +}; + +constexpr bool CheckSmallSizes() +{ + for (size_t rank = 0; rank < SmallRankCount; rank++) { + auto size = SmallRankToSize[rank]; + if (size == 0) { + continue; + } + + if (SmallSegmentSize % size != 0) { + return false; + } + + if (SmallRankBatchSize[rank] > MaxCachedChunksPerRank) { + return false; + } + } + + return true; +} + +static_assert(CheckSmallSizes()); +static_assert(SmallExtentSize % SmallSegmentSize == 0); +static_assert(SmallSegmentSize % PageSize == 0); + +constexpr size_t LargeExtentSize = 1_GB; +static_assert(LargeExtentSize >= LargeAllocationSizeThreshold, "LargeExtentSize < LargeAllocationSizeThreshold"); + +constexpr const char* BackgroundThreadName = "YTAllocBack"; +constexpr const char* StockpileThreadName = "YTAllocStock"; + +DEFINE_ENUM(EAllocationKind, + (Untagged) + (Tagged) +); + +// Forward declarations. +struct TThreadState; +struct TLargeArena; +struct TLargeBlobExtent; + +//////////////////////////////////////////////////////////////////////////////// +// Traps and assertions + +[[noreturn]] +void OomTrap() +{ + _exit(9); +} + +[[noreturn]] +void AssertTrap(const char* message, const char* file, int line) +{ + ::fprintf(stderr, "*** YTAlloc has detected an internal trap at %s:%d\n*** %s\n", + file, + line, + message); + __builtin_trap(); +} + +template <class T, class E> +void AssertBlobState(T* header, E expectedState) +{ + auto actualState = header->State; + if (Y_UNLIKELY(actualState != expectedState)) { + char message[256]; + sprintf(message, "Invalid blob header state at %p: expected %" PRIx64 ", actual %" PRIx64, + header, + static_cast<ui64>(expectedState), + static_cast<ui64>(actualState)); + YTALLOC_TRAP(message); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +// Provides a never-dying singleton with explicit construction. +template <class T> +class TExplicitlyConstructableSingleton +{ +public: + TExplicitlyConstructableSingleton() + { } + + ~TExplicitlyConstructableSingleton() + { } + + template <class... Ts> + void Construct(Ts&&... args) + { + new (&Storage_) T(std::forward<Ts>(args)...); +#ifndef NDEBUG + Constructed_ = true; +#endif + } + + Y_FORCE_INLINE T* Get() + { +#ifndef NDEBUG + YTALLOC_PARANOID_ASSERT(Constructed_); +#endif + return &Storage_; + } + + Y_FORCE_INLINE const T* Get() const + { +#ifndef NDEBUG + YTALLOC_PARANOID_ASSERT(Constructed_); +#endif + return &Storage_; + } + + Y_FORCE_INLINE T* operator->() + { + return Get(); + } + + Y_FORCE_INLINE const T* operator->() const + { + return Get(); + } + + Y_FORCE_INLINE T& operator*() + { + return *Get(); + } + + Y_FORCE_INLINE const T& operator*() const + { + return *Get(); + } + +private: + union { + T Storage_; + }; + +#ifndef NDEBUG + bool Constructed_; +#endif +}; + +//////////////////////////////////////////////////////////////////////////////// + +// Initializes all singletons. +// Safe to call multiple times. +// Guaranteed to not allocate. +void InitializeGlobals(); + +// Spawns the background thread, if it's time. +// Safe to call multiple times. +// Must be called on allocation slow path. 
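TExplicitlyConstructableSingleton above is what keeps these globals usable at any point in the process lifetime: the storage sits in a union, so nothing is constructed until Construct() is called (InitializeGlobals does that for the real singletons), and nothing is ever destroyed, so the object stays valid during late shutdown. A condensed sketch of the pattern, with illustrative names, follows.

```cpp
#include <cstdio>
#include <new>
#include <utility>

template <class T>
class TExplicitSingleton
{
public:
    TExplicitSingleton() { }   // intentionally empty: the storage stays raw
    ~TExplicitSingleton() { }  // intentionally empty: T is never destroyed

    template <class... TArgs>
    void Construct(TArgs&&... args)
    {
        new (&Storage_) T(std::forward<TArgs>(args)...);
    }

    T* operator->() { return &Storage_; }

private:
    // A union member suppresses implicit construction and destruction of T.
    union { T Storage_; };
};

struct TCounters
{
    long long Allocations = 0;
};

TExplicitSingleton<TCounters> Counters; // global; inert until Construct() runs

int main()
{
    Counters.Construct();       // the moral equivalent of InitializeGlobals()
    Counters->Allocations += 3;
    std::printf("%lld\n", Counters->Allocations);
}
```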
+void StartBackgroundThread(); + +//////////////////////////////////////////////////////////////////////////////// + +class TLogManager +{ +public: + // Sets the handler to be invoked for each log event produced by YTAlloc. + void EnableLogging(TLogHandler logHandler) + { + LogHandler_.store(logHandler); + } + + // Checks (in a racy way) that logging is enabled. + bool IsLoggingEnabled() + { + return LogHandler_.load() != nullptr; + } + + // Logs the message via log handler (if any). + template <class... Ts> + void LogMessage(ELogEventSeverity severity, const char* format, Ts&&... args) + { + auto logHandler = LogHandler_.load(); + if (!logHandler) { + return; + } + + std::array<char, 16_KB> buffer; + auto len = ::snprintf(buffer.data(), buffer.size(), format, std::forward<Ts>(args)...); + + TLogEvent event; + event.Severity = severity; + event.Message = TStringBuf(buffer.data(), len); + logHandler(event); + } + + // A special case of zero args. + void LogMessage(ELogEventSeverity severity, const char* message) + { + LogMessage(severity, "%s", message); + } + +private: + std::atomic<TLogHandler> LogHandler_= nullptr; + +}; + +TExplicitlyConstructableSingleton<TLogManager> LogManager; + +#define YTALLOC_LOG_EVENT(...) LogManager->LogMessage(__VA_ARGS__) +#define YTALLOC_LOG_DEBUG(...) YTALLOC_LOG_EVENT(ELogEventSeverity::Debug, __VA_ARGS__) +#define YTALLOC_LOG_INFO(...) YTALLOC_LOG_EVENT(ELogEventSeverity::Info, __VA_ARGS__) +#define YTALLOC_LOG_WARNING(...) YTALLOC_LOG_EVENT(ELogEventSeverity::Warning, __VA_ARGS__) +#define YTALLOC_LOG_ERROR(...) YTALLOC_LOG_EVENT(ELogEventSeverity::Error, __VA_ARGS__) + +//////////////////////////////////////////////////////////////////////////////// + +Y_FORCE_INLINE size_t GetUsed(ssize_t allocated, ssize_t freed) +{ + return allocated >= freed ? static_cast<size_t>(allocated - freed) : 0; +} + +template <class T> +Y_FORCE_INLINE void* HeaderToPtr(T* header) +{ + return header + 1; +} + +template <class T> +Y_FORCE_INLINE T* PtrToHeader(void* ptr) +{ + return static_cast<T*>(ptr) - 1; +} + +template <class T> +Y_FORCE_INLINE const T* PtrToHeader(const void* ptr) +{ + return static_cast<const T*>(ptr) - 1; +} + +Y_FORCE_INLINE size_t PtrToSmallRank(const void* ptr) +{ + return (reinterpret_cast<uintptr_t>(ptr) >> 40) & 0x1f; +} + +Y_FORCE_INLINE char* AlignDownToSmallSegment(char* extent, char* ptr) +{ + auto offset = static_cast<uintptr_t>(ptr - extent); + // NB: This modulo operation is always performed using multiplication. 
+ offset -= (offset % SmallSegmentSize); + return extent + offset; +} + +Y_FORCE_INLINE char* AlignUpToSmallSegment(char* extent, char* ptr) +{ + return AlignDownToSmallSegment(extent, ptr + SmallSegmentSize - 1); +} + +template <class T> +static Y_FORCE_INLINE void UnalignPtr(void*& ptr) +{ + if (reinterpret_cast<uintptr_t>(ptr) % PageSize == 0) { + reinterpret_cast<char*&>(ptr) -= PageSize - sizeof (T); + } + YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) % PageSize == sizeof (T)); +} + +template <class T> +static Y_FORCE_INLINE void UnalignPtr(const void*& ptr) +{ + if (reinterpret_cast<uintptr_t>(ptr) % PageSize == 0) { + reinterpret_cast<const char*&>(ptr) -= PageSize - sizeof (T); + } + YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) % PageSize == sizeof (T)); +} + +template <class T> +Y_FORCE_INLINE size_t GetRawBlobSize(size_t size) +{ + return AlignUp(size + sizeof (T) + RightReadableAreaSize, PageSize); +} + +template <class T> +Y_FORCE_INLINE size_t GetBlobAllocationSize(size_t size) +{ + size += sizeof(T); + size += RightReadableAreaSize; + size = AlignUp(size, PageSize); + size -= sizeof(T); + size -= RightReadableAreaSize; + return size; +} + +Y_FORCE_INLINE size_t GetLargeRank(size_t size) +{ + size_t rank = 64 - __builtin_clzl(size); + if (size == (1ULL << (rank - 1))) { + --rank; + } + return rank; +} + +Y_FORCE_INLINE void PoisonRange(void* ptr, size_t size, ui32 magic) +{ +#ifdef YTALLOC_PARANOID + size = ::AlignUp<size_t>(size, 4); + std::fill(static_cast<ui32*>(ptr), static_cast<ui32*>(ptr) + size / 4, magic); +#else + Y_UNUSED(ptr); + Y_UNUSED(size); + Y_UNUSED(magic); +#endif +} + +Y_FORCE_INLINE void PoisonFreedRange(void* ptr, size_t size) +{ + PoisonRange(ptr, size, 0xdeadbeef); +} + +Y_FORCE_INLINE void PoisonUninitializedRange(void* ptr, size_t size) +{ + PoisonRange(ptr, size, 0xcafebabe); +} + +// Checks that the header size is divisible by 16 (as needed due to alignment restrictions). +#define CHECK_HEADER_ALIGNMENT(T) static_assert(sizeof(T) % 16 == 0, "sizeof(" #T ") % 16 != 0"); + +//////////////////////////////////////////////////////////////////////////////// + +template <class T> +struct TFreeListItem +{ + T* Next = nullptr; +}; + +constexpr size_t CacheLineSize = 64; + +// A lock-free stack of items (derived from TFreeListItem). +// Supports multiple producers and multiple consumers. +// Internally uses DCAS with tagged pointers to defeat ABA. 
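The comment above is the crux of TFreeList: every push and pop bumps a 64-bit tag stored next to the head pointer, and both words are swapped atomically with a 16-byte CAS, so a head that was popped and re-pushed in between can never be mistaken for an unchanged one. The sketch below shows the same tagged-pointer idea using std::atomic over a 16-byte struct instead of the hand-written cmpxchg16b; depending on the target and compiler flags this may not be lock-free, so treat it as an illustration of the technique rather than a drop-in replacement. All names are illustrative.

```cpp
#include <atomic>
#include <cstdint>
#include <cstdio>

struct TNode
{
    TNode* Next = nullptr;
};

class TTaggedStack
{
    struct THead
    {
        TNode* Ptr;
        std::uint64_t Tag;
    };

public:
    void Push(TNode* node)
    {
        THead curr = Head_.load(std::memory_order_relaxed);
        THead next;
        do {
            node->Next = curr.Ptr;
            next = {node, curr.Tag + 1}; // bump the tag on every update
        } while (!Head_.compare_exchange_weak(curr, next));
    }

    TNode* Pop()
    {
        THead curr = Head_.load(std::memory_order_relaxed);
        THead next;
        do {
            if (!curr.Ptr) {
                return nullptr;
            }
            // If another thread pops this node and pushes it back in between,
            // the tag will have advanced and the CAS below fails instead of
            // installing a stale Next pointer as the new head (ABA defeated).
            next = {curr.Ptr->Next, curr.Tag + 1};
        } while (!Head_.compare_exchange_weak(curr, next));
        return curr.Ptr;
    }

private:
    std::atomic<THead> Head_{THead{nullptr, 0}};
};

int main()
{
    TNode a, b;
    TTaggedStack stack;
    stack.Push(&a);
    stack.Push(&b);
    std::printf("%d %d %d\n", stack.Pop() == &b, stack.Pop() == &a, stack.Pop() == nullptr);
}
```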
+template <class T> +class TFreeList +{ +public: + void Put(T* item) + { + TTaggedPointer currentTaggedHead{}; + TTaggedPointer newTaggedHead; + do { + item->Next = currentTaggedHead.first; + newTaggedHead = std::make_pair(item, currentTaggedHead.second + 1); + } while (!CompareAndSet(&TaggedHead_, currentTaggedHead, newTaggedHead)); + } + + T* Extract() + { + T* item; + TTaggedPointer currentTaggedHead{}; + TTaggedPointer newTaggedHead{}; + CompareAndSet(&TaggedHead_, currentTaggedHead, newTaggedHead); + do { + item = currentTaggedHead.first; + if (!item) { + break; + } + newTaggedHead = std::make_pair(item->Next, currentTaggedHead.second + 1); + } while (!CompareAndSet(&TaggedHead_, currentTaggedHead, newTaggedHead)); + return item; + } + + T* ExtractAll() + { + T* item; + TTaggedPointer currentTaggedHead{}; + TTaggedPointer newTaggedHead; + do { + item = currentTaggedHead.first; + newTaggedHead = std::make_pair(nullptr, currentTaggedHead.second + 1); + } while (!CompareAndSet(&TaggedHead_, currentTaggedHead, newTaggedHead)); + return item; + } + +private: + using TAtomicUint128 = volatile unsigned __int128 __attribute__((aligned(16))); + using TTag = ui64; + using TTaggedPointer = std::pair<T*, TTag>; + + TAtomicUint128 TaggedHead_ = 0; + + // Avoid false sharing. + char Padding[CacheLineSize - sizeof(TAtomicUint128)]; + +private: + static Y_FORCE_INLINE bool CompareAndSet(TAtomicUint128* atomic, TTaggedPointer& expectedValue, TTaggedPointer newValue) + { + bool success; + __asm__ __volatile__ + ( + "lock cmpxchg16b %1\n" + "setz %0" + : "=q"(success) + , "+m"(*atomic) + , "+a"(expectedValue.first) + , "+d"(expectedValue.second) + : "b"(newValue.first) + , "c"(newValue.second) + : "cc" + ); + return success; + } +}; + +static_assert(sizeof(TFreeList<void>) == CacheLineSize, "sizeof(TFreeList) != CacheLineSize"); + +//////////////////////////////////////////////////////////////////////////////// + +constexpr size_t ShardCount = 16; +std::atomic<size_t> GlobalCurrentShardIndex; + +// Provides a context for working with sharded data structures. +// Captures the initial shard index upon construction (indicating the shard +// where all insertions go). Maintains the current shard index (round-robin, +// indicating the shard currently used for extraction). +// Can be or be not thread-safe depending on TCounter. +template <class TCounter> +class TShardedState +{ +public: + TShardedState() + : InitialShardIndex_(GlobalCurrentShardIndex++ % ShardCount) + , CurrentShardIndex_(InitialShardIndex_) + { } + + Y_FORCE_INLINE size_t GetInitialShardIndex() const + { + return InitialShardIndex_; + } + + Y_FORCE_INLINE size_t GetNextShardIndex() + { + return ++CurrentShardIndex_ % ShardCount; + } + +private: + const size_t InitialShardIndex_; + TCounter CurrentShardIndex_; +}; + +using TLocalShardedState = TShardedState<size_t>; +using TGlobalShardedState = TShardedState<std::atomic<size_t>>; + +// Implemented as a collection of free lists (each called a shard). +// One needs TShardedState to access the sharded data structure. +template <class T> +class TShardedFreeList +{ +public: + // First tries to extract an item from the initial shard; + // if failed then proceeds to all shards in round-robin fashion. + template <class TState> + T* Extract(TState* state) + { + if (auto* item = Shards_[state->GetInitialShardIndex()].Extract()) { + return item; + } + return ExtractRoundRobin(state); + } + + // Attempts to extract an item from all shards in round-robin fashion. 
+ template <class TState> + T* ExtractRoundRobin(TState* state) + { + for (size_t index = 0; index < ShardCount; ++index) { + if (auto* item = Shards_[state->GetNextShardIndex()].Extract()) { + return item; + } + } + return nullptr; + } + + // Extracts items from all shards linking them together. + T* ExtractAll() + { + T* head = nullptr; + T* tail = nullptr; + for (auto& shard : Shards_) { + auto* item = shard.ExtractAll(); + if (!head) { + head = item; + } + if (tail) { + YTALLOC_PARANOID_ASSERT(!tail->Next); + tail->Next = item; + } else { + tail = item; + } + while (tail && tail->Next) { + tail = tail->Next; + } + } + return head; + } + + template <class TState> + void Put(TState* state, T* item) + { + Shards_[state->GetInitialShardIndex()].Put(item); + } + +private: + std::array<TFreeList<T>, ShardCount> Shards_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +// Holds YTAlloc control knobs. +// Thread safe. +class TConfigurationManager +{ +public: + void SetLargeUnreclaimableCoeff(double value) + { + LargeUnreclaimableCoeff_.store(value); + } + + double GetLargeUnreclaimableCoeff() const + { + return LargeUnreclaimableCoeff_.load(std::memory_order_relaxed); + } + + + void SetMinLargeUnreclaimableBytes(size_t value) + { + MinLargeUnreclaimableBytes_.store(value); + } + + void SetMaxLargeUnreclaimableBytes(size_t value) + { + MaxLargeUnreclaimableBytes_.store(value); + } + + size_t GetMinLargeUnreclaimableBytes() const + { + return MinLargeUnreclaimableBytes_.load(std::memory_order_relaxed); + } + + size_t GetMaxLargeUnreclaimableBytes() const + { + return MaxLargeUnreclaimableBytes_.load(std::memory_order_relaxed); + } + + + void SetTimingEventThreshold(TDuration value) + { + TimingEventThresholdNs_.store(value.MicroSeconds() * 1000); + } + + i64 GetTimingEventThresholdNs() const + { + return TimingEventThresholdNs_.load(std::memory_order_relaxed); + } + + + void SetAllocationProfilingEnabled(bool value); + + bool IsAllocationProfilingEnabled() const + { + return AllocationProfilingEnabled_.load(); + } + + + Y_FORCE_INLINE bool GetAllocationProfilingSamplingRate() + { + return AllocationProfilingSamplingRate_.load(); + } + + void SetAllocationProfilingSamplingRate(double rate) + { + if (rate < 0) { + rate = 0; + } + if (rate > 1) { + rate = 1; + } + i64 rateX64K = static_cast<i64>(rate * (1ULL << 16)); + AllocationProfilingSamplingRateX64K_.store(ClampVal<ui32>(rateX64K, 0, std::numeric_limits<ui16>::max() + 1)); + AllocationProfilingSamplingRate_.store(rate); + } + + + Y_FORCE_INLINE bool IsSmallArenaAllocationProfilingEnabled(size_t rank) + { + return SmallArenaAllocationProfilingEnabled_[rank].load(std::memory_order_relaxed); + } + + Y_FORCE_INLINE bool IsSmallArenaAllocationProfiled(size_t rank) + { + return IsSmallArenaAllocationProfilingEnabled(rank) && IsAllocationSampled(); + } + + void SetSmallArenaAllocationProfilingEnabled(size_t rank, bool value) + { + if (rank >= SmallRankCount) { + return; + } + SmallArenaAllocationProfilingEnabled_[rank].store(value); + } + + + Y_FORCE_INLINE bool IsLargeArenaAllocationProfilingEnabled(size_t rank) + { + return LargeArenaAllocationProfilingEnabled_[rank].load(std::memory_order_relaxed); + } + + Y_FORCE_INLINE bool IsLargeArenaAllocationProfiled(size_t rank) + { + return IsLargeArenaAllocationProfilingEnabled(rank) && IsAllocationSampled(); + } + + void SetLargeArenaAllocationProfilingEnabled(size_t rank, bool value) + { + if (rank >= LargeRankCount) { + return; + } + 
LargeArenaAllocationProfilingEnabled_[rank].store(value); + } + + + Y_FORCE_INLINE int GetProfilingBacktraceDepth() + { + return ProfilingBacktraceDepth_.load(); + } + + void SetProfilingBacktraceDepth(int depth) + { + if (depth < 1) { + return; + } + if (depth > MaxAllocationProfilingBacktraceDepth) { + depth = MaxAllocationProfilingBacktraceDepth; + } + ProfilingBacktraceDepth_.store(depth); + } + + + Y_FORCE_INLINE size_t GetMinProfilingBytesUsedToReport() + { + return MinProfilingBytesUsedToReport_.load(); + } + + void SetMinProfilingBytesUsedToReport(size_t size) + { + MinProfilingBytesUsedToReport_.store(size); + } + + void SetEnableEagerMemoryRelease(bool value) + { + EnableEagerMemoryRelease_.store(value); + } + + bool GetEnableEagerMemoryRelease() + { + return EnableEagerMemoryRelease_.load(std::memory_order_relaxed); + } + + void SetEnableMadvisePopulate(bool value) + { + EnableMadvisePopulate_.store(value); + } + + bool GetEnableMadvisePopulate() + { + return EnableMadvisePopulate_.load(std::memory_order_relaxed); + } + + void EnableStockpile() + { + StockpileEnabled_.store(true); + } + + bool IsStockpileEnabled() + { + return StockpileEnabled_.load(); + } + + void SetStockpileInterval(TDuration value) + { + StockpileInterval_.store(value); + } + + TDuration GetStockpileInterval() + { + return StockpileInterval_.load(); + } + + void SetStockpileThreadCount(int count) + { + StockpileThreadCount_.store(count); + } + + int GetStockpileThreadCount() + { + return ClampVal(StockpileThreadCount_.load(), 0, MaxStockpileThreadCount); + } + + void SetStockpileSize(size_t value) + { + StockpileSize_.store(value); + } + + size_t GetStockpileSize() + { + return StockpileSize_.load(); + } + +private: + std::atomic<double> LargeUnreclaimableCoeff_ = 0.05; + std::atomic<size_t> MinLargeUnreclaimableBytes_ = 128_MB; + std::atomic<size_t> MaxLargeUnreclaimableBytes_ = 10_GB; + std::atomic<i64> TimingEventThresholdNs_ = 10000000; // in ns, 10 ms by default + + std::atomic<bool> AllocationProfilingEnabled_ = false; + std::atomic<double> AllocationProfilingSamplingRate_ = 1.0; + std::atomic<ui32> AllocationProfilingSamplingRateX64K_ = std::numeric_limits<ui32>::max(); + std::array<std::atomic<bool>, SmallRankCount> SmallArenaAllocationProfilingEnabled_ = {}; + std::array<std::atomic<bool>, LargeRankCount> LargeArenaAllocationProfilingEnabled_ = {}; + std::atomic<int> ProfilingBacktraceDepth_ = 10; + std::atomic<size_t> MinProfilingBytesUsedToReport_ = 1_MB; + + std::atomic<bool> EnableEagerMemoryRelease_ = true; + std::atomic<bool> EnableMadvisePopulate_ = false; + + std::atomic<bool> StockpileEnabled_ = false; + std::atomic<TDuration> StockpileInterval_ = TDuration::MilliSeconds(10); + static constexpr int MaxStockpileThreadCount = 8; + std::atomic<int> StockpileThreadCount_ = 4; + std::atomic<size_t> StockpileSize_ = 1_GB; + +private: + bool IsAllocationSampled() + { + Y_POD_STATIC_THREAD(ui16) Counter; + return Counter++ < AllocationProfilingSamplingRateX64K_.load(); + } +}; + +TExplicitlyConstructableSingleton<TConfigurationManager> ConfigurationManager; + +//////////////////////////////////////////////////////////////////////////////// + +template <class TEvent, class TManager> +class TEventLogManagerBase +{ +public: + void DisableForCurrentThread() + { + TManager::DisabledForCurrentThread_ = true; + } + + template <class... TArgs> + void EnqueueEvent(TArgs&&... 
args) + { + if (TManager::DisabledForCurrentThread_) { + return; + } + + auto timestamp = TInstant::Now(); + auto fiberId = NYTAlloc::GetCurrentFiberId(); + auto guard = Guard(EventLock_); + + auto event = TEvent(args...); + OnEvent(event); + + if (EventCount_ >= EventBufferSize) { + return; + } + + auto& enqueuedEvent = Events_[EventCount_++]; + enqueuedEvent = std::move(event); + enqueuedEvent.Timestamp = timestamp; + enqueuedEvent.FiberId = fiberId; + } + + void RunBackgroundTasks() + { + if (LogManager->IsLoggingEnabled()) { + for (const auto& event : PullEvents()) { + ProcessEvent(event); + } + } + } + +protected: + NThreading::TForkAwareSpinLock EventLock_; + + virtual void OnEvent(const TEvent& event) = 0; + + virtual void ProcessEvent(const TEvent& event) = 0; + +private: + static constexpr size_t EventBufferSize = 1000; + size_t EventCount_ = 0; + std::array<TEvent, EventBufferSize> Events_; + + std::vector<TEvent> PullEvents() + { + std::vector<TEvent> events; + events.reserve(EventBufferSize); + + auto guard = Guard(EventLock_); + for (size_t index = 0; index < EventCount_; ++index) { + events.push_back(Events_[index]); + } + EventCount_ = 0; + return events; + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct TTimingEvent +{ + ETimingEventType Type; + TDuration Duration; + size_t Size; + TInstant Timestamp; + TFiberId FiberId; + + TTimingEvent() + { } + + TTimingEvent( + ETimingEventType type, + TDuration duration, + size_t size) + : Type(type) + , Duration(duration) + , Size(size) + { } +}; + +class TTimingManager + : public TEventLogManagerBase<TTimingEvent, TTimingManager> +{ +public: + TEnumIndexedVector<ETimingEventType, TTimingEventCounters> GetTimingEventCounters() + { + auto guard = Guard(EventLock_); + return EventCounters_; + } + +private: + TEnumIndexedVector<ETimingEventType, TTimingEventCounters> EventCounters_; + + Y_POD_STATIC_THREAD(bool) DisabledForCurrentThread_; + + friend class TEventLogManagerBase<TTimingEvent, TTimingManager>; + + virtual void OnEvent(const TTimingEvent& event) override + { + auto& counters = EventCounters_[event.Type]; + counters.Count += 1; + counters.Size += event.Size; + } + + virtual void ProcessEvent(const TTimingEvent& event) override + { + YTALLOC_LOG_DEBUG("Timing event logged (Type: %s, Duration: %s, Size: %zu, Timestamp: %s, FiberId: %" PRIu64 ")", + ToString(event.Type).c_str(), + ToString(event.Duration).c_str(), + event.Size, + ToString(event.Timestamp).c_str(), + event.FiberId); + } +}; + +Y_POD_THREAD(bool) TTimingManager::DisabledForCurrentThread_; + +TExplicitlyConstructableSingleton<TTimingManager> TimingManager; + +//////////////////////////////////////////////////////////////////////////////// + +i64 GetElapsedNs(const struct timespec& startTime, const struct timespec& endTime) +{ + if (Y_LIKELY(startTime.tv_sec == endTime.tv_sec)) { + return static_cast<i64>(endTime.tv_nsec) - static_cast<i64>(startTime.tv_nsec); + } + + return + static_cast<i64>(endTime.tv_nsec) - static_cast<i64>(startTime.tv_nsec) + + (static_cast<i64>(endTime.tv_sec) - static_cast<i64>(startTime.tv_sec)) * 1000000000; +} + +// Used to log statistics about long-running syscalls and lock acquisitions. 
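The timing machinery above follows a simple pattern: sample a monotonic clock when a scope is entered and, on exit, report the elapsed time only if it exceeds a threshold (the default TimingEventThresholdNs above corresponds to 10 ms). TTimingGuard below does exactly this with clock_gettime(CLOCK_MONOTONIC) and routes slow events to TTimingManager; the self-contained sketch here uses std::chrono and stderr so it can be built in isolation, and its names are illustrative.

```cpp
#include <chrono>
#include <cstdio>
#include <thread>

class TScopeTimer
{
public:
    TScopeTimer(const char* name, std::chrono::nanoseconds threshold)
        : Name_(name)
        , Threshold_(threshold)
        , Start_(std::chrono::steady_clock::now())
    { }

    ~TScopeTimer()
    {
        auto elapsed = std::chrono::steady_clock::now() - Start_;
        if (elapsed > Threshold_) {
            auto us = std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
            std::fprintf(stderr, "%s took %lld us\n", Name_, static_cast<long long>(us));
        }
    }

private:
    const char* Name_;
    std::chrono::nanoseconds Threshold_;
    std::chrono::steady_clock::time_point Start_;
};

int main()
{
    {
        // Reported: sleeps well past the 10 ms threshold.
        TScopeTimer timer("slow section", std::chrono::milliseconds(10));
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
    {
        // Not reported: finishes under the threshold.
        TScopeTimer timer("fast section", std::chrono::milliseconds(10));
    }
}
```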
+class TTimingGuard + : public TNonCopyable +{ +public: + explicit TTimingGuard(ETimingEventType eventType, size_t size = 0) + : EventType_(eventType) + , Size_(size) + { + ::clock_gettime(CLOCK_MONOTONIC, &StartTime_); + } + + ~TTimingGuard() + { + auto elapsedNs = GetElapsedNs(); + if (elapsedNs > ConfigurationManager->GetTimingEventThresholdNs()) { + TimingManager->EnqueueEvent(EventType_, TDuration::MicroSeconds(elapsedNs / 1000), Size_); + } + } + +private: + const ETimingEventType EventType_; + const size_t Size_; + struct timespec StartTime_; + + i64 GetElapsedNs() const + { + struct timespec endTime; + ::clock_gettime(CLOCK_MONOTONIC, &endTime); + return NYTAlloc::GetElapsedNs(StartTime_, endTime); + } +}; + +template <class T> +Y_FORCE_INLINE TGuard<T> GuardWithTiming(const T& lock) +{ + TTimingGuard timingGuard(ETimingEventType::Locking); + TGuard<T> lockingGuard(lock); + return lockingGuard; +} + +//////////////////////////////////////////////////////////////////////////////// + +// A wrapper for mmap, mumap, and madvise calls. +// The latter are invoked with MADV_POPULATE (if enabled) and MADV_FREE flags +// and may fail if the OS support is missing. These failures are logged (once) and +// handled as follows: +// * if MADV_POPULATE fails then we fallback to manual per-page prefault +// for all subsequent attempts; +// * if MADV_FREE fails then it (and all subsequent attempts) is replaced with MADV_DONTNEED +// (which is non-lazy and is less efficient but will somehow do). +// Also this class mlocks all VMAs on startup to prevent pagefaults in our heavy binaries +// from disturbing latency tails. +class TMappedMemoryManager +{ +public: + void* Map(uintptr_t hint, size_t size, int flags) + { + TTimingGuard timingGuard(ETimingEventType::Mmap, size); + auto* result = ::mmap( + reinterpret_cast<void*>(hint), + size, + PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANONYMOUS | flags, + -1, + 0); + if (result == MAP_FAILED) { + auto error = errno; + if (error == EEXIST && (flags & MAP_FIXED_NOREPLACE)) { + // Caller must retry with different hint address. + return result; + } + YTALLOC_VERIFY(error == ENOMEM); + ::fprintf(stderr, "*** YTAlloc has received ENOMEM error while trying to mmap %zu bytes\n", + size); + OomTrap(); + } + return result; + } + + void Unmap(void* ptr, size_t size) + { + TTimingGuard timingGuard(ETimingEventType::Munmap, size); + auto result = ::munmap(ptr, size); + YTALLOC_VERIFY(result == 0); + } + + void DontDump(void* ptr, size_t size) + { + auto result = ::madvise(ptr, size, MADV_DONTDUMP); + // Must not fail. 
+ YTALLOC_VERIFY(result == 0); + } + + void PopulateFile(void* ptr, size_t size) + { + TTimingGuard timingGuard(ETimingEventType::FilePrefault, size); + + auto* begin = static_cast<volatile char*>(ptr); + for (auto* current = begin; current < begin + size; current += PageSize) { + *current; + } + } + + void PopulateReadOnly(void* ptr, size_t size) + { + if (!MadvisePopulateUnavailable_.load(std::memory_order_relaxed) && + ConfigurationManager->GetEnableMadvisePopulate()) + { + if (!TryMadvisePopulate(ptr, size)) { + MadvisePopulateUnavailable_.store(true); + } + } + } + + void Populate(void* ptr, size_t size) + { + if (MadvisePopulateUnavailable_.load(std::memory_order_relaxed) || + !ConfigurationManager->GetEnableMadvisePopulate()) + { + DoPrefault(ptr, size); + } else if (!TryMadvisePopulate(ptr, size)) { + MadvisePopulateUnavailable_.store(true); + DoPrefault(ptr, size); + } + } + + void Release(void* ptr, size_t size) + { + if (CanUseMadviseFree() && !ConfigurationManager->GetEnableEagerMemoryRelease()) { + DoMadviseFree(ptr, size); + } else { + DoMadviseDontNeed(ptr, size); + } + } + + bool Stockpile(size_t size) + { + if (MadviseStockpileUnavailable_.load(std::memory_order_relaxed)) { + return false; + } + if (!TryMadviseStockpile(size)) { + MadviseStockpileUnavailable_.store(true); + return false; + } + return true; + } + + void RunBackgroundTasks() + { + if (!LogManager->IsLoggingEnabled()) { + return; + } + if (IsBuggyKernel() && !BuggyKernelLogged_) { + YTALLOC_LOG_WARNING("Kernel is buggy; see KERNEL-118"); + BuggyKernelLogged_ = true; + } + if (MadviseFreeSupported_ && !MadviseFreeSupportedLogged_) { + YTALLOC_LOG_INFO("MADV_FREE is supported"); + MadviseFreeSupportedLogged_ = true; + } + if (MadviseFreeNotSupported_ && !MadviseFreeNotSupportedLogged_) { + YTALLOC_LOG_WARNING("MADV_FREE is not supported"); + MadviseFreeNotSupportedLogged_ = true; + } + if (MadvisePopulateUnavailable_.load() && !MadvisePopulateUnavailableLogged_) { + YTALLOC_LOG_WARNING("MADV_POPULATE is not supported"); + MadvisePopulateUnavailableLogged_ = true; + } + if (MadviseStockpileUnavailable_.load() && !MadviseStockpileUnavailableLogged_) { + YTALLOC_LOG_WARNING("MADV_STOCKPILE is not supported"); + MadviseStockpileUnavailableLogged_ = true; + } + } + +private: + bool BuggyKernelLogged_ = false; + + std::atomic<bool> MadviseFreeSupported_ = false; + bool MadviseFreeSupportedLogged_ = false; + + std::atomic<bool> MadviseFreeNotSupported_ = false; + bool MadviseFreeNotSupportedLogged_ = false; + + std::atomic<bool> MadvisePopulateUnavailable_ = false; + bool MadvisePopulateUnavailableLogged_ = false; + + std::atomic<bool> MadviseStockpileUnavailable_ = false; + bool MadviseStockpileUnavailableLogged_ = false; + +private: + bool TryMadvisePopulate(void* ptr, size_t size) + { + TTimingGuard timingGuard(ETimingEventType::MadvisePopulate, size); + auto result = ::madvise(ptr, size, MADV_POPULATE); + if (result != 0) { + auto error = errno; + YTALLOC_VERIFY(error == EINVAL || error == ENOMEM); + if (error == ENOMEM) { + ::fprintf(stderr, "*** YTAlloc has received ENOMEM error while trying to madvise(MADV_POPULATE) %zu bytes\n", + size); + OomTrap(); + } + return false; + } + return true; + } + + void DoPrefault(void* ptr, size_t size) + { + TTimingGuard timingGuard(ETimingEventType::Prefault, size); + auto* begin = static_cast<char*>(ptr); + for (auto* current = begin; current < begin + size; current += PageSize) { + *current = 0; + } + } + + bool CanUseMadviseFree() + { + if (MadviseFreeSupported_.load()) { 
+ return true; + } + if (MadviseFreeNotSupported_.load()) { + return false; + } + + if (IsBuggyKernel()) { + MadviseFreeNotSupported_.store(true); + } else { + auto* ptr = Map(0, PageSize, 0); + if (::madvise(ptr, PageSize, MADV_FREE) == 0) { + MadviseFreeSupported_.store(true); + } else { + MadviseFreeNotSupported_.store(true); + } + Unmap(ptr, PageSize); + } + + // Will not recurse. + return CanUseMadviseFree(); + } + + void DoMadviseDontNeed(void* ptr, size_t size) + { + TTimingGuard timingGuard(ETimingEventType::MadviseDontNeed, size); + auto result = ::madvise(ptr, size, MADV_DONTNEED); + if (result != 0) { + auto error = errno; + // Failure is possible for locked pages. + Y_VERIFY(error == EINVAL); + } + } + + void DoMadviseFree(void* ptr, size_t size) + { + TTimingGuard timingGuard(ETimingEventType::MadviseFree, size); + auto result = ::madvise(ptr, size, MADV_FREE); + if (result != 0) { + auto error = errno; + // Failure is possible for locked pages. + YTALLOC_VERIFY(error == EINVAL); + } + } + + bool TryMadviseStockpile(size_t size) + { + auto result = ::madvise(nullptr, size, MADV_STOCKPILE); + if (result != 0) { + auto error = errno; + if (error == ENOMEM || error == EAGAIN || error == EINTR) { + // The call is advisory, ignore ENOMEM, EAGAIN, and EINTR. + return true; + } + YTALLOC_VERIFY(error == EINVAL); + return false; + } + return true; + } + + // Some kernels are known to contain bugs in MADV_FREE; see https://st.yandex-team.ru/KERNEL-118. + bool IsBuggyKernel() + { +#ifdef _linux_ + static const bool result = [] () { + struct utsname buf; + YTALLOC_VERIFY(uname(&buf) == 0); + if (strverscmp(buf.release, "4.4.1-1") >= 0 && + strverscmp(buf.release, "4.4.96-44") < 0) + { + return true; + } + if (strverscmp(buf.release, "4.14.1-1") >= 0 && + strverscmp(buf.release, "4.14.79-33") < 0) + { + return true; + } + return false; + }(); + return result; +#else + return false; +#endif + } +}; + +TExplicitlyConstructableSingleton<TMappedMemoryManager> MappedMemoryManager; + +//////////////////////////////////////////////////////////////////////////////// +// System allocator + +// Each system allocation is prepended with such a header. +struct TSystemBlobHeader +{ + explicit TSystemBlobHeader(size_t size) + : Size(size) + { } + + size_t Size; + char Padding[8]; +}; + +CHECK_HEADER_ALIGNMENT(TSystemBlobHeader) + +// Used for some internal allocations. +// Delgates directly to TMappedMemoryManager. +class TSystemAllocator +{ +public: + void* Allocate(size_t size); + void Free(void* ptr); + +private: + std::atomic<uintptr_t> CurrentPtr_ = SystemZoneStart; +}; + +TExplicitlyConstructableSingleton<TSystemAllocator> SystemAllocator; + +//////////////////////////////////////////////////////////////////////////////// + +// Deriving from this class makes instances bound to TSystemAllocator. +struct TSystemAllocatable +{ + void* operator new(size_t size) noexcept + { + return SystemAllocator->Allocate(size); + } + + void* operator new[](size_t size) noexcept + { + return SystemAllocator->Allocate(size); + } + + void operator delete(void* ptr) noexcept + { + SystemAllocator->Free(ptr); + } + + void operator delete[](void* ptr) noexcept + { + SystemAllocator->Free(ptr); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +// Maintains a pool of objects. +// Objects are allocated in groups each containing BatchSize instances. +// The actual allocation is carried out by TSystemAllocator. 
+// Memory is never actually reclaimed; freed instances are put into TFreeList. +template <class T, size_t BatchSize> +class TSystemPool +{ +public: + T* Allocate() + { + while (true) { + auto* obj = FreeList_.Extract(); + if (Y_LIKELY(obj)) { + new (obj) T(); + return obj; + } + AllocateMore(); + } + } + + void Free(T* obj) + { + obj->T::~T(); + PoisonFreedRange(obj, sizeof(T)); + FreeList_.Put(obj); + } + +private: + TFreeList<T> FreeList_; + +private: + void AllocateMore() + { + auto* objs = static_cast<T*>(SystemAllocator->Allocate(sizeof(T) * BatchSize)); + for (size_t index = 0; index < BatchSize; ++index) { + auto* obj = objs + index; + FreeList_.Put(obj); + } + } +}; + +// A sharded analogue TSystemPool. +template <class T, size_t BatchSize> +class TShardedSystemPool +{ +public: + template <class TState> + T* Allocate(TState* state) + { + if (auto* obj = FreeLists_[state->GetInitialShardIndex()].Extract()) { + new (obj) T(); + return obj; + } + + while (true) { + for (size_t index = 0; index < ShardCount; ++index) { + if (auto* obj = FreeLists_[state->GetNextShardIndex()].Extract()) { + new (obj) T(); + return obj; + } + } + AllocateMore(); + } + } + + template <class TState> + void Free(TState* state, T* obj) + { + obj->T::~T(); + PoisonFreedRange(obj, sizeof(T)); + FreeLists_[state->GetInitialShardIndex()].Put(obj); + } + +private: + std::array<TFreeList<T>, ShardCount> FreeLists_; + +private: + void AllocateMore() + { + auto* objs = static_cast<T*>(SystemAllocator->Allocate(sizeof(T) * BatchSize)); + for (size_t index = 0; index < BatchSize; ++index) { + auto* obj = objs + index; + FreeLists_[index % ShardCount].Put(obj); + } + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +// Handles allocations inside a zone of memory given by its start and end pointers. +// Each allocation is a separate mapped region of memory. +// A special care is taken to guarantee that all allocated regions fall inside the zone. +class TZoneAllocator +{ +public: + TZoneAllocator(uintptr_t zoneStart, uintptr_t zoneEnd) + : ZoneStart_(zoneStart) + , ZoneEnd_(zoneEnd) + , Current_(zoneStart) + { + YTALLOC_VERIFY(ZoneStart_ % PageSize == 0); + } + + void* Allocate(size_t size, int flags) + { + YTALLOC_VERIFY(size % PageSize == 0); + bool restarted = false; + while (true) { + auto hint = (Current_ += size) - size; + if (reinterpret_cast<uintptr_t>(hint) + size > ZoneEnd_) { + if (restarted) { + ::fprintf(stderr, "*** YTAlloc was unable to mmap %zu bytes in zone %" PRIx64 "--%" PRIx64 "\n", + size, + ZoneStart_, + ZoneEnd_); + OomTrap(); + } + restarted = true; + Current_ = ZoneStart_; + } else { + char* ptr = static_cast<char*>(MappedMemoryManager->Map( + hint, + size, + MAP_FIXED_NOREPLACE | flags)); + if (reinterpret_cast<uintptr_t>(ptr) == hint) { + return ptr; + } + if (ptr != MAP_FAILED) { + MappedMemoryManager->Unmap(ptr, size); + } + } + } + } + + void Free(void* ptr, size_t size) + { + MappedMemoryManager->Unmap(ptr, size); + } + +private: + const uintptr_t ZoneStart_; + const uintptr_t ZoneEnd_; + + std::atomic<uintptr_t> Current_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +// YTAlloc supports tagged allocations. +// Since the total number of tags can be huge, a two-level scheme is employed. +// Possible tags are arranged into sets each containing TaggedCounterSetSize tags. +// There are up to MaxTaggedCounterSets in total. +// Upper 4 sets are reserved for profiled allocations. 
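The constants above imply a simple addressing scheme for tagged counters: a tag picks a set via tag / TaggedCounterSetSize and a slot within it via tag % TaggedCounterSetSize, and sets are materialized lazily the first time one of their tags is touched. The sketch below shows that two-level lookup with a lock-free read side and a locked, double-checked slow path, mirroring the GetOrCreateTaggedCounterSet logic that follows; it substitutes std::mutex for the fork-aware spinlock and plain heap allocation for TSystemAllocatable, and its names are illustrative.

```cpp
#include <array>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <memory>
#include <mutex>

constexpr size_t TaggedCounterSetSize = 16384;
constexpr size_t MaxTaggedCounterSets = 260; // 256 regular + 4 profiling sets

struct TCounterSet
{
    std::array<std::atomic<int64_t>, TaggedCounterSetSize> BytesUsed{};
};

class TTaggedCounters
{
public:
    void Increment(uint32_t tag, int64_t delta)
    {
        GetOrCreateSet(tag / TaggedCounterSetSize)
            ->BytesUsed[tag % TaggedCounterSetSize] += delta;
    }

    int64_t Get(uint32_t tag)
    {
        auto* set = Sets_[tag / TaggedCounterSetSize].load(); // lock-free read side
        return set ? set->BytesUsed[tag % TaggedCounterSetSize].load() : 0;
    }

private:
    TCounterSet* GetOrCreateSet(size_t setIndex)
    {
        if (auto* set = Sets_[setIndex].load()) { // fast path: already built
            return set;
        }
        std::lock_guard<std::mutex> guard(SetsLock_);
        auto& holder = SetHolders_[setIndex];     // re-check under the lock
        if (!holder) {
            holder = std::make_unique<TCounterSet>();
            Sets_[setIndex].store(holder.get());
        }
        return holder.get();
    }

    std::array<std::atomic<TCounterSet*>, MaxTaggedCounterSets> Sets_{};
    std::array<std::unique_ptr<TCounterSet>, MaxTaggedCounterSets> SetHolders_;
    std::mutex SetsLock_;
};

int main()
{
    TTaggedCounters counters;
    counters.Increment(42, 1024);
    counters.Increment(42, 512);
    std::printf("%lld\n", static_cast<long long>(counters.Get(42))); // 1536
}
```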
+constexpr size_t TaggedCounterSetSize = 16384; +constexpr size_t AllocationProfilingTaggedCounterSets = 4; +constexpr size_t MaxTaggedCounterSets = 256 + AllocationProfilingTaggedCounterSets; + +constexpr size_t MaxCapturedAllocationBacktraces = 65000; +static_assert( + MaxCapturedAllocationBacktraces < AllocationProfilingTaggedCounterSets * TaggedCounterSetSize, + "MaxCapturedAllocationBacktraces is too big"); + +constexpr TMemoryTag AllocationProfilingMemoryTagBase = TaggedCounterSetSize * (MaxTaggedCounterSets - AllocationProfilingTaggedCounterSets); +constexpr TMemoryTag AllocationProfilingUnknownMemoryTag = AllocationProfilingMemoryTagBase + MaxCapturedAllocationBacktraces; + +static_assert( + MaxMemoryTag == TaggedCounterSetSize * (MaxTaggedCounterSets - AllocationProfilingTaggedCounterSets) - 1, + "Wrong MaxMemoryTag"); + +template <class TCounter> +using TUntaggedTotalCounters = TEnumIndexedVector<EBasicCounter, TCounter>; + +template <class TCounter> +struct TTaggedTotalCounterSet + : public TSystemAllocatable +{ + std::array<TEnumIndexedVector<EBasicCounter, TCounter>, TaggedCounterSetSize> Counters; +}; + +using TLocalTaggedBasicCounterSet = TTaggedTotalCounterSet<ssize_t>; +using TGlobalTaggedBasicCounterSet = TTaggedTotalCounterSet<std::atomic<ssize_t>>; + +template <class TCounter> +struct TTotalCounters +{ + // The sum of counters across all tags. + TUntaggedTotalCounters<TCounter> CumulativeTaggedCounters; + + // Counters for untagged allocations. + TUntaggedTotalCounters<TCounter> UntaggedCounters; + + // Access to tagged counters may involve creation of a new tag set. + // For simplicity, we separate the read-side (TaggedCounterSets) and the write-side (TaggedCounterSetHolders). + // These arrays contain virtually identical data (up to std::unique_ptr and std::atomic semantic differences). + std::array<std::atomic<TTaggedTotalCounterSet<TCounter>*>, MaxTaggedCounterSets> TaggedCounterSets{}; + std::array<std::unique_ptr<TTaggedTotalCounterSet<TCounter>>, MaxTaggedCounterSets> TaggedCounterSetHolders; + + // Protects TaggedCounterSetHolders from concurrent updates. + NThreading::TForkAwareSpinLock TaggedCounterSetsLock; + + // Returns null if the set is not yet constructed. + Y_FORCE_INLINE TTaggedTotalCounterSet<TCounter>* FindTaggedCounterSet(size_t index) const + { + return TaggedCounterSets[index].load(); + } + + // Constructs the set on first access. 
+ TTaggedTotalCounterSet<TCounter>* GetOrCreateTaggedCounterSet(size_t index) + { + auto* set = TaggedCounterSets[index].load(); + if (Y_LIKELY(set)) { + return set; + } + + auto guard = GuardWithTiming(TaggedCounterSetsLock); + auto& setHolder = TaggedCounterSetHolders[index]; + if (!setHolder) { + setHolder = std::make_unique<TTaggedTotalCounterSet<TCounter>>(); + TaggedCounterSets[index] = setHolder.get(); + } + return setHolder.get(); + } +}; + +using TLocalSystemCounters = TEnumIndexedVector<ESystemCounter, ssize_t>; +using TGlobalSystemCounters = TEnumIndexedVector<ESystemCounter, std::atomic<ssize_t>>; + +using TLocalSmallCounters = TEnumIndexedVector<ESmallArenaCounter, ssize_t>; +using TGlobalSmallCounters = TEnumIndexedVector<ESmallArenaCounter, std::atomic<ssize_t>>; + +using TLocalLargeCounters = TEnumIndexedVector<ELargeArenaCounter, ssize_t>; +using TGlobalLargeCounters = TEnumIndexedVector<ELargeArenaCounter, std::atomic<ssize_t>>; + +using TLocalHugeCounters = TEnumIndexedVector<EHugeCounter, ssize_t>; +using TGlobalHugeCounters = TEnumIndexedVector<EHugeCounter, std::atomic<ssize_t>>; + +using TLocalUndumpableCounters = TEnumIndexedVector<EUndumpableCounter, ssize_t>; +using TGlobalUndumpableCounters = TEnumIndexedVector<EUndumpableCounter, std::atomic<ssize_t>>; + +Y_FORCE_INLINE ssize_t LoadCounter(ssize_t counter) +{ + return counter; +} + +Y_FORCE_INLINE ssize_t LoadCounter(const std::atomic<ssize_t>& counter) +{ + return counter.load(); +} + +//////////////////////////////////////////////////////////////////////////////// + +struct TMmapObservationEvent +{ + size_t Size; + std::array<void*, MaxAllocationProfilingBacktraceDepth> Frames; + int FrameCount; + TInstant Timestamp; + TFiberId FiberId; + + TMmapObservationEvent() = default; + + TMmapObservationEvent( + size_t size, + std::array<void*, MaxAllocationProfilingBacktraceDepth> frames, + int frameCount) + : Size(size) + , Frames(frames) + , FrameCount(frameCount) + { } +}; + +class TMmapObservationManager + : public TEventLogManagerBase<TMmapObservationEvent, TMmapObservationManager> +{ +public: + void SetBacktraceFormatter(TBacktraceFormatter formatter) + { + BacktraceFormatter_.store(formatter); + } + +private: + std::atomic<TBacktraceFormatter> BacktraceFormatter_ = nullptr; + + Y_POD_STATIC_THREAD(bool) DisabledForCurrentThread_; + + friend class TEventLogManagerBase<TMmapObservationEvent, TMmapObservationManager>; + + virtual void OnEvent(const TMmapObservationEvent& /*event*/) override + { } + + virtual void ProcessEvent(const TMmapObservationEvent& event) override + { + YTALLOC_LOG_DEBUG("Large arena mmap observed (Size: %zu, Timestamp: %s, FiberId: %" PRIx64 ")", + event.Size, + ToString(event.Timestamp).c_str(), + event.FiberId); + + if (auto backtraceFormatter = BacktraceFormatter_.load()) { + auto backtrace = backtraceFormatter(const_cast<void**>(event.Frames.data()), event.FrameCount); + YTALLOC_LOG_DEBUG("YTAlloc stack backtrace (Stack: %s)", + backtrace.c_str()); + } + } +}; + +Y_POD_THREAD(bool) TMmapObservationManager::DisabledForCurrentThread_; + +TExplicitlyConstructableSingleton<TMmapObservationManager> MmapObservationManager; + +//////////////////////////////////////////////////////////////////////////////// + +// A per-thread structure containing counters, chunk caches etc. +struct TThreadState + : public TFreeListItem<TThreadState> + , public TLocalShardedState +{ + // TThreadState instances of all alive threads are put into a double-linked intrusive list. 
+ // This is a pair of next/prev pointers connecting an instance of TThreadState to its neighbors. + TIntrusiveLinkedListNode<TThreadState> RegistryNode; + + // Pointers to the respective parts of TThreadManager::ThreadControlWord_. + // If null then the thread is already destroyed (but TThreadState may still live for a while + // due to ref-counting). + ui8* AllocationProfilingEnabled; + ui8* BackgroundThreadStarted; + + // TThreadStates are ref-counted. + // TThreadManager::EnumerateThreadStates enumerates the registered states and acquires + // a temporary reference preventing these states from being destructed. This provides + // for shorter periods of time the global lock needs to be held. + int RefCounter = 1; + + // Per-thread counters. + TTotalCounters<ssize_t> TotalCounters; + std::array<TLocalLargeCounters, LargeRankCount> LargeArenaCounters; + TLocalUndumpableCounters UndumpableCounters; + + // Each thread maintains caches of small chunks. + // One cache is for tagged chunks; the other is for untagged ones. + // Each cache contains up to MaxCachedChunksPerRank chunks per any rank. + // Special sentinels are placed to distinguish the boundaries of region containing + // pointers of a specific rank. This enables a tiny-bit faster inplace boundary checks. + + static constexpr uintptr_t LeftSentinel = 1; + static constexpr uintptr_t RightSentinel = 2; + + struct TSmallBlobCache + { + TSmallBlobCache() + { + void** chunkPtrs = CachedChunks.data(); + for (size_t rank = 0; rank < SmallRankCount; ++rank) { + RankToCachedChunkPtrHead[rank] = chunkPtrs; + chunkPtrs[0] = reinterpret_cast<void*>(LeftSentinel); + chunkPtrs[MaxCachedChunksPerRank + 1] = reinterpret_cast<void*>(RightSentinel); + +#ifdef YTALLOC_PARANOID + RankToCachedChunkPtrTail[rank] = chunkPtrs; + CachedChunkFull[rank] = false; + + RankToCachedChunkLeftBorder[rank] = chunkPtrs; + RankToCachedChunkRightBorder[rank] = chunkPtrs + MaxCachedChunksPerRank + 1; +#endif + chunkPtrs += MaxCachedChunksPerRank + 2; + } + } + + // For each rank we have a segment of pointers in CachedChunks with the following layout: + // LCC[C]........R + // Legend: + // . = garbage + // L = left sentinel + // R = right sentinel + // C = cached pointer + // [C] = current cached pointer + // + // Under YTALLOC_PARANOID the following layout is used: + // L.[T]CCC[H]...R + // Legend: + // [H] = head cached pointer, put chunks here + // [T] = tail cached pointer, take chunks from here + + // +2 is for two sentinels + std::array<void*, SmallRankCount * (MaxCachedChunksPerRank + 2)> CachedChunks{}; + + // Pointer to [P] for each rank. + std::array<void**, SmallRankCount> RankToCachedChunkPtrHead{}; + +#ifdef YTALLOC_PARANOID + // Pointers to [L] and [R] for each rank. + std::array<void**, SmallRankCount> RankToCachedChunkLeftBorder{}; + std::array<void**, SmallRankCount> RankToCachedChunkRightBorder{}; + + std::array<void**, SmallRankCount> RankToCachedChunkPtrTail{}; + std::array<bool, SmallRankCount> CachedChunkFull{}; +#endif + }; + TEnumIndexedVector<EAllocationKind, TSmallBlobCache> SmallBlobCache; +}; + +struct TThreadStateToRegistryNode +{ + auto operator() (TThreadState* state) const + { + return &state->RegistryNode; + } +}; + +// Manages all registered threads and controls access to TThreadState. 
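TThreadState objects have a non-trivial lifecycle: each thread creates its state lazily, registers it in a global intrusive list, and a destructor installed via pthread_key_create detaches it when the thread exits, while enumeration takes temporary references so the global lock is held only briefly. The sketch below keeps just the skeleton of that scheme (lazy per-thread creation, global registry, key destructor) using std containers, a plain mutex, and no ref-counting; all names are illustrative.

```cpp
#include <pthread.h>
#include <cstdio>
#include <list>
#include <mutex>

struct TMyThreadState
{
    long long BytesAllocated = 0;
};

class TMyThreadRegistry
{
public:
    TMyThreadRegistry()
    {
        // The destructor fires at thread exit for every thread that set the key.
        pthread_key_create(&Key_, &TMyThreadRegistry::OnThreadExit);
    }

    TMyThreadState* GetCurrent()
    {
        if (!CurrentState_) {
            CurrentState_ = Register();
            pthread_setspecific(Key_, CurrentState_);
        }
        return CurrentState_;
    }

    template <class F>
    void Enumerate(F&& func)
    {
        std::lock_guard<std::mutex> guard(Lock_);
        for (auto* state : States_) {
            func(state);
        }
    }

    static TMyThreadRegistry& Instance()
    {
        static TMyThreadRegistry registry;
        return registry;
    }

private:
    static void OnThreadExit(void* opaque)
    {
        Instance().Unregister(static_cast<TMyThreadState*>(opaque));
        CurrentState_ = nullptr;
    }

    TMyThreadState* Register()
    {
        std::lock_guard<std::mutex> guard(Lock_);
        States_.push_back(new TMyThreadState());
        return States_.back();
    }

    void Unregister(TMyThreadState* state)
    {
        std::lock_guard<std::mutex> guard(Lock_);
        States_.remove(state);
        delete state;
    }

    pthread_key_t Key_;
    std::mutex Lock_;
    std::list<TMyThreadState*> States_;
    static thread_local TMyThreadState* CurrentState_;
};

thread_local TMyThreadState* TMyThreadRegistry::CurrentState_ = nullptr;

int main()
{
    auto& registry = TMyThreadRegistry::Instance();
    registry.GetCurrent()->BytesAllocated += 100;
    registry.Enumerate([] (TMyThreadState* state) {
        std::printf("thread used %lld bytes\n", state->BytesAllocated);
    });
}
```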
+class TThreadManager +{ +public: + TThreadManager() + { + pthread_key_create(&ThreadDtorKey_, DestroyThread); + + NThreading::TForkAwareSpinLock::AtFork( + this, + nullptr, + nullptr, + &AfterFork); + } + + // Returns TThreadState for the current thread; the caller guarantees that this + // state is initialized and is not destroyed yet. + static TThreadState* GetThreadStateUnchecked(); + + // Returns TThreadState for the current thread; may return null. + static TThreadState* FindThreadState(); + + // Returns TThreadState for the current thread; may not return null + // (but may crash if TThreadState is already destroyed). + static TThreadState* GetThreadStateChecked() + { + auto* state = FindThreadState(); + YTALLOC_VERIFY(state); + return state; + } + + // Enumerates all threads and invokes func passing TThreadState instances. + // func must not throw but can take arbitrary time; no locks are being held while it executes. + template <class THandler> + void EnumerateThreadStatesAsync(const THandler& handler) noexcept + { + TMemoryTagGuard guard(NullMemoryTag); + + std::vector<TThreadState*> states; + states.reserve(1024); // must be enough in most cases + + auto unrefStates = [&] { + // Releasing references also requires global lock to be held to avoid getting zombies above. + auto guard = GuardWithTiming(ThreadRegistryLock_); + for (auto* state : states) { + UnrefThreadState(state); + } + }; + + auto tryRefStates = [&] { + // Only hold this guard for a small period of time to reference all the states. + auto guard = GuardWithTiming(ThreadRegistryLock_); + auto* current = ThreadRegistry_.GetFront(); + while (current) { + if (states.size() == states.capacity()) { + // Cannot allocate while holding ThreadRegistryLock_ due to a possible deadlock as follows: + // EnumerateThreadStatesAsync -> StartBackgroundThread -> EnumerateThreadStatesSync + // (many other scenarios are also possible). + guard.Release(); + unrefStates(); + states.clear(); + states.reserve(states.capacity() * 2); + return false; + } + RefThreadState(current); + states.push_back(current); + current = current->RegistryNode.Next; + } + return true; + }; + + while (!tryRefStates()) ; + + for (auto* state : states) { + handler(state); + } + + unrefStates(); + } + + // Similar to EnumerateThreadStatesAsync but holds the global lock while enumerating the threads. + // Also invokes a given prologue functor while holding the thread registry lock. + // Handler and prologue calls must be fast and must not allocate. + template <class TPrologue, class THandler> + void EnumerateThreadStatesSync(const TPrologue& prologue, const THandler& handler) noexcept + { + auto guard = GuardWithTiming(ThreadRegistryLock_); + prologue(); + auto* current = ThreadRegistry_.GetFront(); + while (current) { + handler(current); + current = current->RegistryNode.Next; + } + } + + + // We store a special 64-bit "thread control word" in TLS encapsulating the following + // crucial per-thread parameters: + // * the current memory tag + // * a flag indicating that a valid TThreadState is known to exists + // (and can be obtained via GetThreadStateUnchecked) + // * a flag indicating that allocation profiling is enabled + // * a flag indicating that background thread is started + // Thread control word is fetched via GetThreadControlWord and is compared + // against FastPathControlWord to see if the fast path can be taken. 
+ // The latter happens when no memory tagging is configured, TThreadState is + // valid, allocation profiling is disabled, and background thread is started. + + // The mask for extracting memory tag from thread control word. + static constexpr ui64 MemoryTagControlWordMask = 0xffffffff; + // ThreadStateValid is on. + static constexpr ui64 ThreadStateValidControlWordMask = (1ULL << 32); + // AllocationProfiling is on. + static constexpr ui64 AllocationProfilingEnabledControlWordMask = (1ULL << 40); + // All background thread are properly started. + static constexpr ui64 BackgroundThreadStartedControlWorkMask = (1ULL << 48); + // Memory tag is NullMemoryTag; thread state is valid. + static constexpr ui64 FastPathControlWord = + BackgroundThreadStartedControlWorkMask | + ThreadStateValidControlWordMask | + NullMemoryTag; + + Y_FORCE_INLINE static ui64 GetThreadControlWord() + { + return (&ThreadControlWord_)->Value; + } + + + static TMemoryTag GetCurrentMemoryTag() + { + return (&ThreadControlWord_)->Parts.MemoryTag; + } + + static void SetCurrentMemoryTag(TMemoryTag tag) + { + Y_VERIFY(tag <= MaxMemoryTag); + (&ThreadControlWord_)->Parts.MemoryTag = tag; + } + + + static EMemoryZone GetCurrentMemoryZone() + { + return CurrentMemoryZone_; + } + + static void SetCurrentMemoryZone(EMemoryZone zone) + { + CurrentMemoryZone_ = zone; + } + + + static void SetCurrentFiberId(TFiberId id) + { + CurrentFiberId_ = id; + } + + static TFiberId GetCurrentFiberId() + { + return CurrentFiberId_; + } + +private: + static void DestroyThread(void*); + + TThreadState* AllocateThreadState(); + + void RefThreadState(TThreadState* state) + { + auto result = ++state->RefCounter; + Y_VERIFY(result > 1); + } + + void UnrefThreadState(TThreadState* state) + { + auto result = --state->RefCounter; + Y_VERIFY(result >= 0); + if (result == 0) { + DestroyThreadState(state); + } + } + + void DestroyThreadState(TThreadState* state); + + static void AfterFork(void* cookie); + void DoAfterFork(); + +private: + // TThreadState instance for the current thread. + // Initially null, then initialized when first needed. + // TThreadState is destroyed upon thread termination (which is detected with + // the help of pthread_key_create machinery), so this pointer can become null again. + Y_POD_STATIC_THREAD(TThreadState*) ThreadState_; + + // Initially false, then set to true then TThreadState is destroyed. + // If the thread requests for its state afterwards, null is returned and no new state is (re-)created. + // The caller must be able to deal with it. + Y_POD_STATIC_THREAD(bool) ThreadStateDestroyed_; + + union TThreadControlWord + { + ui64 __attribute__((__may_alias__)) Value; + struct TParts + { + // The current memory tag used in all allocations by this thread. + ui32 __attribute__((__may_alias__)) MemoryTag; + // Indicates if a valid TThreadState exists and can be obtained via GetThreadStateUnchecked. + ui8 __attribute__((__may_alias__)) ThreadStateValid; + // Indicates if allocation profiling is on. + ui8 __attribute__((__may_alias__)) AllocationProfilingEnabled; + // Indicates if all background threads are properly started. + ui8 __attribute__((__may_alias__)) BackgroundThreadStarted; + ui8 Padding[2]; + } Parts; + }; + Y_POD_STATIC_THREAD(TThreadControlWord) ThreadControlWord_; + + // See memory zone API. + Y_POD_STATIC_THREAD(EMemoryZone) CurrentMemoryZone_; + + // See fiber id API. 
+ Y_POD_STATIC_THREAD(TFiberId) CurrentFiberId_; + + pthread_key_t ThreadDtorKey_; + + static constexpr size_t ThreadStatesBatchSize = 1; + TSystemPool<TThreadState, ThreadStatesBatchSize> ThreadStatePool_; + + NThreading::TForkAwareSpinLock ThreadRegistryLock_; + TIntrusiveLinkedList<TThreadState, TThreadStateToRegistryNode> ThreadRegistry_; +}; + +Y_POD_THREAD(TThreadState*) TThreadManager::ThreadState_; +Y_POD_THREAD(bool) TThreadManager::ThreadStateDestroyed_; +Y_POD_THREAD(TThreadManager::TThreadControlWord) TThreadManager::ThreadControlWord_; +Y_POD_THREAD(EMemoryZone) TThreadManager::CurrentMemoryZone_; +Y_POD_THREAD(TFiberId) TThreadManager::CurrentFiberId_; + +TExplicitlyConstructableSingleton<TThreadManager> ThreadManager; + +//////////////////////////////////////////////////////////////////////////////// + +void TConfigurationManager::SetAllocationProfilingEnabled(bool value) +{ + // Update threads' TLS. + ThreadManager->EnumerateThreadStatesSync( + [&] { + AllocationProfilingEnabled_.store(value); + }, + [&] (auto* state) { + if (state->AllocationProfilingEnabled) { + *state->AllocationProfilingEnabled = value; + } + }); +} + +//////////////////////////////////////////////////////////////////////////////// +// Backtrace Manager +// +// Captures backtraces observed during allocations and assigns memory tags to them. +// Memory tags are chosen sequentially starting from AllocationProfilingMemoryTagBase. +// +// For each backtrace we compute a 64-bit hash and use it as a key in a certain concurrent hashmap. +// This hashmap is organized into BucketCount buckets, each consisting of BucketSize slots. +// +// Backtrace hash is translated into bucket index by taking the appropriate number of +// its lower bits. For each slot, we remember a 32-bit fingerprint, which is +// just the next 32 bits of the backtrace's hash, and the (previously assigned) memory tag. +// +// Upon access to the hashtable, the bucket is first scanned optimistically, without taking +// any locks. In case of a miss, a per-bucket spinlock is acquired and the bucket is rescanned. +// +// The above scheme may involve collisions but we neglect their probability. +// +// If the whole hash table overflows (i.e. a total of MaxCapturedAllocationBacktraces +// backtraces are captured) or the bucket overflows (i.e. all of its slots become occupied), +// the allocation is annotated with AllocationProfilingUnknownMemoryTag. Such allocations +// appear as having no backtrace whatsoever in the profiling reports. + +class TBacktraceManager +{ +public: + // Sets the provider used for collecting backtraces when allocation profiling + // is turned ON. + void SetBacktraceProvider(TBacktraceProvider provider) + { + BacktraceProvider_.store(provider); + } + + // Captures the backtrace and inserts it into the hashtable. + TMemoryTag GetMemoryTagFromBacktrace(int framesToSkip) + { + std::array<void*, MaxAllocationProfilingBacktraceDepth> frames; + auto backtraceProvider = BacktraceProvider_.load(); + if (!backtraceProvider) { + return NullMemoryTag; + } + auto frameCount = backtraceProvider(frames.data(), ConfigurationManager->GetProfilingBacktraceDepth(), framesToSkip); + auto hash = GetBacktraceHash(frames.data(), frameCount); + return CaptureBacktrace(hash, frames.data(), frameCount); + } + + // Returns the backtrace corresponding to the given tag, if any. 
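+    //
+    // An illustrative round-trip through this class (a sketch; these calls do not appear
+    // in this file and assume allocation profiling and a backtrace provider are set up):
+    //
+    //   auto tag = BacktraceManager->GetMemoryTagFromBacktrace(/*framesToSkip*/ 1);
+    //   if (auto backtrace = BacktraceManager->FindBacktrace(tag)) {
+    //       // backtrace->Frames[0 .. backtrace->FrameCount) hold the captured return addresses.
+    //   }
+    //
+    // Tags outside [AllocationProfilingMemoryTagBase, AllocationProfilingMemoryTagBase +
+    // MaxCapturedAllocationBacktraces), e.g. NullMemoryTag, as well as tags whose backtrace
+    // was never captured, yield std::nullopt.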
+ std::optional<TBacktrace> FindBacktrace(TMemoryTag tag) + { + if (tag < AllocationProfilingMemoryTagBase || + tag >= AllocationProfilingMemoryTagBase + MaxCapturedAllocationBacktraces) + { + return std::nullopt; + } + const auto& entry = Backtraces_[tag - AllocationProfilingMemoryTagBase]; + if (!entry.Captured.load()) { + return std::nullopt; + } + return entry.Backtrace; + } + +private: + static constexpr int Log2BucketCount = 16; + static constexpr int BucketCount = 1 << Log2BucketCount; + static constexpr int BucketSize = 8; + + std::atomic<TBacktraceProvider> BacktraceProvider_ = nullptr; + + std::array<std::array<std::atomic<ui32>, BucketSize>, BucketCount> Fingerprints_= {}; + std::array<std::array<std::atomic<TMemoryTag>, BucketSize>, BucketCount> MemoryTags_ = {}; + std::array<NThreading::TForkAwareSpinLock, BucketCount> BucketLocks_; + std::atomic<TMemoryTag> CurrentMemoryTag_ = AllocationProfilingMemoryTagBase; + + struct TBacktraceEntry + { + TBacktrace Backtrace; + std::atomic<bool> Captured = false; + }; + + std::array<TBacktraceEntry, MaxCapturedAllocationBacktraces> Backtraces_; + +private: + static size_t GetBacktraceHash(void** frames, int frameCount) + { + size_t hash = 0; + for (int index = 0; index < frameCount; ++index) { + hash = CombineHashes(hash, THash<void*>()(frames[index])); + } + return hash; + } + + TMemoryTag CaptureBacktrace(size_t hash, void** frames, int frameCount) + { + size_t bucketIndex = hash % BucketCount; + ui32 fingerprint = (hash >> Log2BucketCount) & 0xffffffff; + // Zero fingerprint indicates the slot is free; check and adjust to ensure + // that regular fingerprints are non-zero. + if (fingerprint == 0) { + fingerprint = 1; + } + + for (int slotIndex = 0; slotIndex < BucketSize; ++slotIndex) { + auto currentFingerprint = Fingerprints_[bucketIndex][slotIndex].load(std::memory_order_relaxed); + if (currentFingerprint == fingerprint) { + return MemoryTags_[bucketIndex][slotIndex].load(); + } + } + + auto guard = Guard(BucketLocks_[bucketIndex]); + + int spareSlotIndex = -1; + for (int slotIndex = 0; slotIndex < BucketSize; ++slotIndex) { + auto currentFingerprint = Fingerprints_[bucketIndex][slotIndex].load(std::memory_order_relaxed); + if (currentFingerprint == fingerprint) { + return MemoryTags_[bucketIndex][slotIndex]; + } + if (currentFingerprint == 0) { + spareSlotIndex = slotIndex; + break; + } + } + + if (spareSlotIndex < 0) { + return AllocationProfilingUnknownMemoryTag; + } + + auto memoryTag = CurrentMemoryTag_++; + if (memoryTag >= AllocationProfilingMemoryTagBase + MaxCapturedAllocationBacktraces) { + return AllocationProfilingUnknownMemoryTag; + } + + MemoryTags_[bucketIndex][spareSlotIndex].store(memoryTag); + Fingerprints_[bucketIndex][spareSlotIndex].store(fingerprint); + + auto& entry = Backtraces_[memoryTag - AllocationProfilingMemoryTagBase]; + entry.Backtrace.FrameCount = frameCount; + ::memcpy(entry.Backtrace.Frames.data(), frames, sizeof (void*) * frameCount); + entry.Captured.store(true); + + return memoryTag; + } +}; + +TExplicitlyConstructableSingleton<TBacktraceManager> BacktraceManager; + +//////////////////////////////////////////////////////////////////////////////// + +// Mimics the counters of TThreadState but uses std::atomic to survive concurrent access. 
+struct TGlobalState + : public TGlobalShardedState +{ + TTotalCounters<std::atomic<ssize_t>> TotalCounters; + std::array<TGlobalLargeCounters, LargeRankCount> LargeArenaCounters; + TGlobalUndumpableCounters UndumpableCounters; +}; + +TExplicitlyConstructableSingleton<TGlobalState> GlobalState; + +//////////////////////////////////////////////////////////////////////////////// + +// Accumulates various allocation statistics. +class TStatisticsManager +{ +public: + template <EAllocationKind Kind = EAllocationKind::Tagged, class TState> + static Y_FORCE_INLINE void IncrementTotalCounter(TState* state, TMemoryTag tag, EBasicCounter counter, ssize_t delta) + { + // This branch is typically resolved at compile time. + if (Kind == EAllocationKind::Tagged && tag != NullMemoryTag) { + IncrementTaggedTotalCounter(&state->TotalCounters, tag, counter, delta); + } else { + IncrementUntaggedTotalCounter(&state->TotalCounters, counter, delta); + } + } + + static Y_FORCE_INLINE void IncrementTotalCounter(TMemoryTag tag, EBasicCounter counter, ssize_t delta) + { + IncrementTotalCounter(GlobalState.Get(), tag, counter, delta); + } + + void IncrementSmallArenaCounter(ESmallArenaCounter counter, size_t rank, ssize_t delta) + { + SmallArenaCounters_[rank][counter] += delta; + } + + template <class TState> + static Y_FORCE_INLINE void IncrementLargeArenaCounter(TState* state, size_t rank, ELargeArenaCounter counter, ssize_t delta) + { + state->LargeArenaCounters[rank][counter] += delta; + } + + template <class TState> + static Y_FORCE_INLINE void IncrementUndumpableCounter(TState* state, EUndumpableCounter counter, ssize_t delta) + { + state->UndumpableCounters[counter] += delta; + } + + void IncrementHugeCounter(EHugeCounter counter, ssize_t delta) + { + HugeCounters_[counter] += delta; + } + + void IncrementHugeUndumpableCounter(EUndumpableCounter counter, ssize_t delta) + { + HugeUndumpableCounters_[counter] += delta; + } + + void IncrementSystemCounter(ESystemCounter counter, ssize_t delta) + { + SystemCounters_[counter] += delta; + } + + // Computes memory usage for a list of tags by aggregating counters across threads. 
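+    // A minimal usage sketch (illustrative only; the tag values and array sizes are the
+    // caller's choice):
+    //
+    //   std::array<TMemoryTag, 2> tags{{1, 2}};
+    //   std::array<TEnumIndexedVector<EBasicCounter, ssize_t>, 2> counters;
+    //   StatisticsManager->GetTaggedMemoryCounters(tags.data(), tags.size(), counters.data());
+    //   auto used = counters[0][EBasicCounter::BytesUsed]; // derived from BytesAllocated and BytesFreed
+    //
+    // Note that the aggregation walks every live thread via EnumerateThreadStatesAsync,
+    // so this is a relatively heavy call rather than a fast-path one.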
+ void GetTaggedMemoryCounters(const TMemoryTag* tags, size_t count, TEnumIndexedVector<EBasicCounter, ssize_t>* counters) + { + TMemoryTagGuard guard(NullMemoryTag); + + for (size_t index = 0; index < count; ++index) { + counters[index][EBasicCounter::BytesAllocated] = 0; + counters[index][EBasicCounter::BytesFreed] = 0; + } + + for (size_t index = 0; index < count; ++index) { + auto tag = tags[index]; + counters[index][EBasicCounter::BytesAllocated] += LoadTaggedTotalCounter(GlobalState->TotalCounters, tag, EBasicCounter::BytesAllocated); + counters[index][EBasicCounter::BytesFreed] += LoadTaggedTotalCounter(GlobalState->TotalCounters, tag, EBasicCounter::BytesFreed); + } + + ThreadManager->EnumerateThreadStatesAsync( + [&] (const auto* state) { + for (size_t index = 0; index < count; ++index) { + auto tag = tags[index]; + counters[index][EBasicCounter::BytesAllocated] += LoadTaggedTotalCounter(state->TotalCounters, tag, EBasicCounter::BytesAllocated); + counters[index][EBasicCounter::BytesFreed] += LoadTaggedTotalCounter(state->TotalCounters, tag, EBasicCounter::BytesFreed); + } + }); + + for (size_t index = 0; index < count; ++index) { + counters[index][EBasicCounter::BytesUsed] = GetUsed(counters[index][EBasicCounter::BytesAllocated], counters[index][EBasicCounter::BytesFreed]); + } + } + + void GetTaggedMemoryUsage(const TMemoryTag* tags, size_t count, size_t* results) + { + TMemoryTagGuard guard(NullMemoryTag); + + std::vector<TEnumIndexedVector<EBasicCounter, ssize_t>> counters; + counters.resize(count); + GetTaggedMemoryCounters(tags, count, counters.data()); + + for (size_t index = 0; index < count; ++index) { + results[index] = counters[index][EBasicCounter::BytesUsed]; + } + } + + TEnumIndexedVector<ETotalCounter, ssize_t> GetTotalAllocationCounters() + { + TEnumIndexedVector<ETotalCounter, ssize_t> result; + + auto accumulate = [&] (const auto& counters) { + result[ETotalCounter::BytesAllocated] += LoadCounter(counters[EBasicCounter::BytesAllocated]); + result[ETotalCounter::BytesFreed] += LoadCounter(counters[EBasicCounter::BytesFreed]); + }; + + accumulate(GlobalState->TotalCounters.UntaggedCounters); + accumulate(GlobalState->TotalCounters.CumulativeTaggedCounters); + + ThreadManager->EnumerateThreadStatesAsync( + [&] (const auto* state) { + accumulate(state->TotalCounters.UntaggedCounters); + accumulate(state->TotalCounters.CumulativeTaggedCounters); + }); + + result[ETotalCounter::BytesUsed] = GetUsed( + result[ETotalCounter::BytesAllocated], + result[ETotalCounter::BytesFreed]); + + auto systemCounters = GetSystemAllocationCounters(); + result[ETotalCounter::BytesCommitted] += systemCounters[EBasicCounter::BytesUsed]; + + auto hugeCounters = GetHugeAllocationCounters(); + result[ETotalCounter::BytesCommitted] += hugeCounters[EHugeCounter::BytesUsed]; + + auto smallArenaCounters = GetSmallArenaAllocationCounters(); + for (size_t rank = 0; rank < SmallRankCount; ++rank) { + result[ETotalCounter::BytesCommitted] += smallArenaCounters[rank][ESmallArenaCounter::BytesCommitted]; + } + + auto largeArenaCounters = GetLargeArenaAllocationCounters(); + for (size_t rank = 0; rank < LargeRankCount; ++rank) { + result[ETotalCounter::BytesCommitted] += largeArenaCounters[rank][ELargeArenaCounter::BytesCommitted]; + } + + result[ETotalCounter::BytesUnaccounted] = std::max<ssize_t>(GetProcessRss() - result[ETotalCounter::BytesCommitted], 0); + + return result; + } + + TEnumIndexedVector<ESmallCounter, ssize_t> GetSmallAllocationCounters() + { + TEnumIndexedVector<ESmallCounter, ssize_t> 
result; + + auto totalCounters = GetTotalAllocationCounters(); + result[ESmallCounter::BytesAllocated] = totalCounters[ETotalCounter::BytesAllocated]; + result[ESmallCounter::BytesFreed] = totalCounters[ETotalCounter::BytesFreed]; + result[ESmallCounter::BytesUsed] = totalCounters[ETotalCounter::BytesUsed]; + + auto largeArenaCounters = GetLargeArenaAllocationCounters(); + for (size_t rank = 0; rank < LargeRankCount; ++rank) { + result[ESmallCounter::BytesAllocated] -= largeArenaCounters[rank][ELargeArenaCounter::BytesAllocated]; + result[ESmallCounter::BytesFreed] -= largeArenaCounters[rank][ELargeArenaCounter::BytesFreed]; + result[ESmallCounter::BytesUsed] -= largeArenaCounters[rank][ELargeArenaCounter::BytesUsed]; + } + + auto hugeCounters = GetHugeAllocationCounters(); + result[ESmallCounter::BytesAllocated] -= hugeCounters[EHugeCounter::BytesAllocated]; + result[ESmallCounter::BytesFreed] -= hugeCounters[EHugeCounter::BytesFreed]; + result[ESmallCounter::BytesUsed] -= hugeCounters[EHugeCounter::BytesUsed]; + + return result; + } + + std::array<TLocalSmallCounters, SmallRankCount> GetSmallArenaAllocationCounters() + { + std::array<TLocalSmallCounters, SmallRankCount> result; + for (size_t rank = 0; rank < SmallRankCount; ++rank) { + for (auto counter : TEnumTraits<ESmallArenaCounter>::GetDomainValues()) { + result[rank][counter] = SmallArenaCounters_[rank][counter].load(); + } + } + return result; + } + + TEnumIndexedVector<ELargeCounter, ssize_t> GetLargeAllocationCounters() + { + TEnumIndexedVector<ELargeCounter, ssize_t> result; + auto largeArenaCounters = GetLargeArenaAllocationCounters(); + for (size_t rank = 0; rank < LargeRankCount; ++rank) { + result[ESmallCounter::BytesAllocated] += largeArenaCounters[rank][ELargeArenaCounter::BytesAllocated]; + result[ESmallCounter::BytesFreed] += largeArenaCounters[rank][ELargeArenaCounter::BytesFreed]; + result[ESmallCounter::BytesUsed] += largeArenaCounters[rank][ELargeArenaCounter::BytesUsed]; + } + return result; + } + + std::array<TLocalLargeCounters, LargeRankCount> GetLargeArenaAllocationCounters() + { + std::array<TLocalLargeCounters, LargeRankCount> result{}; + + for (size_t rank = 0; rank < LargeRankCount; ++rank) { + for (auto counter : TEnumTraits<ELargeArenaCounter>::GetDomainValues()) { + result[rank][counter] = GlobalState->LargeArenaCounters[rank][counter].load(); + } + } + + ThreadManager->EnumerateThreadStatesAsync( + [&] (const auto* state) { + for (size_t rank = 0; rank < LargeRankCount; ++rank) { + for (auto counter : TEnumTraits<ELargeArenaCounter>::GetDomainValues()) { + result[rank][counter] += state->LargeArenaCounters[rank][counter]; + } + } + }); + + for (size_t rank = 0; rank < LargeRankCount; ++rank) { + result[rank][ELargeArenaCounter::BytesUsed] = GetUsed(result[rank][ELargeArenaCounter::BytesAllocated], result[rank][ELargeArenaCounter::BytesFreed]); + result[rank][ELargeArenaCounter::BlobsUsed] = GetUsed(result[rank][ELargeArenaCounter::BlobsAllocated], result[rank][ELargeArenaCounter::BlobsFreed]); + } + + return result; + } + + TLocalSystemCounters GetSystemAllocationCounters() + { + TLocalSystemCounters result; + for (auto counter : TEnumTraits<ESystemCounter>::GetDomainValues()) { + result[counter] = SystemCounters_[counter].load(); + } + result[ESystemCounter::BytesUsed] = GetUsed(result[ESystemCounter::BytesAllocated], result[ESystemCounter::BytesFreed]); + return result; + } + + TLocalHugeCounters GetHugeAllocationCounters() + { + TLocalHugeCounters result; + for (auto counter : 
TEnumTraits<EHugeCounter>::GetDomainValues()) { + result[counter] = HugeCounters_[counter].load(); + } + result[EHugeCounter::BytesUsed] = GetUsed(result[EHugeCounter::BytesAllocated], result[EHugeCounter::BytesFreed]); + result[EHugeCounter::BlobsUsed] = GetUsed(result[EHugeCounter::BlobsAllocated], result[EHugeCounter::BlobsFreed]); + return result; + } + + TLocalUndumpableCounters GetUndumpableAllocationCounters() + { + TLocalUndumpableCounters result; + for (auto counter : TEnumTraits<EUndumpableCounter>::GetDomainValues()) { + result[counter] = HugeUndumpableCounters_[counter].load(); + result[counter] += GlobalState->UndumpableCounters[counter].load(); + } + + ThreadManager->EnumerateThreadStatesAsync( + [&] (const auto* state) { + result[EUndumpableCounter::BytesAllocated] += LoadCounter(state->UndumpableCounters[EUndumpableCounter::BytesAllocated]); + result[EUndumpableCounter::BytesFreed] += LoadCounter(state->UndumpableCounters[EUndumpableCounter::BytesFreed]); + }); + + result[EUndumpableCounter::BytesUsed] = GetUsed(result[EUndumpableCounter::BytesAllocated], result[EUndumpableCounter::BytesFreed]); + return result; + } + + // Called before TThreadState is destroyed. + // Adds the counter values from TThreadState to the global counters. + void AccumulateLocalCounters(TThreadState* state) + { + for (auto counter : TEnumTraits<EBasicCounter>::GetDomainValues()) { + GlobalState->TotalCounters.CumulativeTaggedCounters[counter] += state->TotalCounters.CumulativeTaggedCounters[counter]; + GlobalState->TotalCounters.UntaggedCounters[counter] += state->TotalCounters.UntaggedCounters[counter]; + } + for (size_t index = 0; index < MaxTaggedCounterSets; ++index) { + const auto* localSet = state->TotalCounters.FindTaggedCounterSet(index); + if (!localSet) { + continue; + } + auto* globalSet = GlobalState->TotalCounters.GetOrCreateTaggedCounterSet(index); + for (size_t jndex = 0; jndex < TaggedCounterSetSize; ++jndex) { + for (auto counter : TEnumTraits<EBasicCounter>::GetDomainValues()) { + globalSet->Counters[jndex][counter] += localSet->Counters[jndex][counter]; + } + } + } + for (size_t rank = 0; rank < LargeRankCount; ++rank) { + for (auto counter : TEnumTraits<ELargeArenaCounter>::GetDomainValues()) { + GlobalState->LargeArenaCounters[rank][counter] += state->LargeArenaCounters[rank][counter]; + } + } + for (auto counter : TEnumTraits<EUndumpableCounter>::GetDomainValues()) { + GlobalState->UndumpableCounters[counter] += state->UndumpableCounters[counter]; + } + } + +private: + template <class TCounter> + static ssize_t LoadTaggedTotalCounter(const TTotalCounters<TCounter>& counters, TMemoryTag tag, EBasicCounter counter) + { + const auto* set = counters.FindTaggedCounterSet(tag / TaggedCounterSetSize); + if (Y_UNLIKELY(!set)) { + return 0; + } + return LoadCounter(set->Counters[tag % TaggedCounterSetSize][counter]); + } + + template <class TCounter> + static Y_FORCE_INLINE void IncrementUntaggedTotalCounter(TTotalCounters<TCounter>* counters, EBasicCounter counter, ssize_t delta) + { + counters->UntaggedCounters[counter] += delta; + } + + template <class TCounter> + static Y_FORCE_INLINE void IncrementTaggedTotalCounter(TTotalCounters<TCounter>* counters, TMemoryTag tag, EBasicCounter counter, ssize_t delta) + { + counters->CumulativeTaggedCounters[counter] += delta; + auto* set = counters->GetOrCreateTaggedCounterSet(tag / TaggedCounterSetSize); + set->Counters[tag % TaggedCounterSetSize][counter] += delta; + } + + + static ssize_t GetProcessRss() + { + auto* file = 
::fopen("/proc/self/statm", "r"); + if (!file) { + return 0; + } + + ssize_t dummy; + ssize_t rssPages; + auto readResult = fscanf(file, "%zd %zd", &dummy, &rssPages); + + ::fclose(file); + + if (readResult != 2) { + return 0; + } + + return rssPages * PageSize; + } + +private: + TGlobalSystemCounters SystemCounters_; + std::array<TGlobalSmallCounters, SmallRankCount> SmallArenaCounters_; + TGlobalHugeCounters HugeCounters_; + TGlobalUndumpableCounters HugeUndumpableCounters_; +}; + +TExplicitlyConstructableSingleton<TStatisticsManager> StatisticsManager; + +//////////////////////////////////////////////////////////////////////////////// + +void* TSystemAllocator::Allocate(size_t size) +{ + auto rawSize = GetRawBlobSize<TSystemBlobHeader>(size); + void* mmappedPtr; + while (true) { + auto currentPtr = CurrentPtr_.fetch_add(rawSize); + Y_VERIFY(currentPtr + rawSize <= SystemZoneEnd); + mmappedPtr = MappedMemoryManager->Map( + currentPtr, + rawSize, + MAP_FIXED_NOREPLACE | MAP_POPULATE); + if (mmappedPtr == reinterpret_cast<void*>(currentPtr)) { + break; + } + if (mmappedPtr != MAP_FAILED) { + MappedMemoryManager->Unmap(mmappedPtr, rawSize); + } + } + auto* blob = static_cast<TSystemBlobHeader*>(mmappedPtr); + new (blob) TSystemBlobHeader(size); + auto* result = HeaderToPtr(blob); + PoisonUninitializedRange(result, size); + StatisticsManager->IncrementSystemCounter(ESystemCounter::BytesAllocated, rawSize); + return result; +} + +void TSystemAllocator::Free(void* ptr) +{ + auto* blob = PtrToHeader<TSystemBlobHeader>(ptr); + auto rawSize = GetRawBlobSize<TSystemBlobHeader>(blob->Size); + MappedMemoryManager->Unmap(blob, rawSize); + StatisticsManager->IncrementSystemCounter(ESystemCounter::BytesFreed, rawSize); +} + +//////////////////////////////////////////////////////////////////////////////// +// Small allocator +// +// Allocations (called small chunks) are grouped by their sizes. Two most-significant binary digits are +// used to determine the rank of a chunk, which guarantees 25% overhead in the worst case. +// A pair of helper arrays (SizeToSmallRank1 and SizeToSmallRank2) are used to compute ranks; we expect +// them to be permanently cached. +// +// Chunks of the same rank are served by a (small) arena allocator. +// In fact, there are two arenas for each rank: one is for tagged allocations and another is for untagged ones. +// +// We encode chunk's rank and whether it is tagged or not in the resulting pointer as follows: +// 0- 3: must be zero due to alignment +// 4-39: varies +// 40-44: rank +// 45: 0 for untagged allocations, 1 for tagged ones +// 45-63: zeroes +// This enables computing chunk's rank and also determining if it is tagged in constant time +// without any additional lookups. Also, one pays no space overhead for untagged allocations +// and pays 16 bytes for each tagged one. +// +// Each arena allocates extents of memory by calling mmap for each extent of SmallExtentSize bytes. +// (Recall that this memory is never reclaimed.) +// Each extent is then sliced into segments of SmallSegmentSize bytes. +// Whenever a new segment is acquired, its memory is pre-faulted by madvise(MADV_POPULATE). +// New segments are acquired in a lock-free manner. +// +// Each thread maintains a separate cache of chunks of each rank (two caches to be precise: one +// for tagged allocations and the other for untagged). These caches are fully thread-local and +// involve no atomic operations. +// +// There are also global caches (per rank, for tagged and untagged allocations). 
+// Instead of keeping individual chunks, these work with chunk groups (collections of up to ChunksPerGroup
+// arbitrary chunks).
+//
+// When the local cache becomes exhausted, a group of chunks is fetched from the global cache
+// (if the latter is empty then the arena allocator is consulted).
+// Vice versa, if the local cache overflows, a group of chunks is moved from it to the global cache.
+//
+// Global caches and arena allocators also take care of the (rare) cases when Allocate/Free is called
+// without a valid thread state (which happens during thread shutdown when TThreadState is already destroyed).
+//
+// Each arena allocates memory in a certain "data" zone of SmallZoneSize.
+// In addition to that zone, up to two "shadow" zones are maintained.
+//
+// The first one contains memory tags of chunks residing in the primary zone.
+// The second one (which is present if YTALLOC_NERVOUS is defined) contains
+// states of chunks. These states enable some simple internal sanity checks
+// (e.g. detecting attempts to double-free a chunk).
+//
+// Addresses in the data zone are directly mapped to offsets in shadow zones.
+// When a segment of a small arena zone is allocated, the relevant portions of the shadow
+// zones get initialized (and are also accounted for as a system allocation).
+//
+// Shadow zones are memory-mapped with the MAP_NORESERVE flag and are quite sparse.
+// These zones are omitted from core dumps due to their huge size and sparsity.
+
+// For each small rank i, gives the maximal k such that 2^k <= SmallRankToSize[i].
+// A chunk pointer is mapped to its shadow image via the GetShadowOffset helper.
+// Note that the chunk size is not always a power of 2. To avoid costly integer division,
+// the chunk pointer is translated by means of a bitwise shift only (leaving some bytes
+// of the shadow zones unused). This array provides the needed shifts.
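+// For instance (illustrative arithmetic only): a rank with 96-byte chunks gets shift 6, since
+// 2^6 = 64 <= 96 < 128. Chunks at data-zone offsets 0, 96, 192, 288 then map to shadow slots
+// 0, 1, 3, 4 respectively; slot 2 simply stays unused, which is the modest waste mentioned above.
+// The mapping never collides because chunks are at least 2^shift bytes apart.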
+constexpr int SmallRankToLogSize[SmallRankCount] = { + 0, + 4, 5, 5, 6, 6, 7, + 7, 8, 8, 9, 9, 10, 10, 11, + 11, 12, 12, 13, 13, 14, 14, 15 +}; + +enum class ESmallChunkState : ui8 +{ + Spare = 0, + Allocated = 0x61, // a + Freed = 0x66 // f +}; + +class TSmallArenaAllocator +{ +public: + TSmallArenaAllocator(EAllocationKind kind, size_t rank, uintptr_t dataZoneStart) + : Kind_(kind) + , Rank_(rank) + , LogSize_(SmallRankToLogSize[Rank_]) + , ChunkSize_(SmallRankToSize[Rank_]) + , DataZoneStart_(dataZoneStart) + , DataZoneAllocator_(DataZoneStart_, DataZoneStart_ + SmallZoneSize) + { } + + size_t PullMany(void** batch, size_t maxCount) + { + size_t count; + while (true) { + count = TryAllocateFromCurrentExtent(batch, maxCount); + if (Y_LIKELY(count != 0)) { + break; + } + PopulateAnotherExtent(); + } + return count; + } + + void* Allocate(size_t size) + { + void* ptr; + auto count = PullMany(&ptr, 1); + YTALLOC_PARANOID_ASSERT(count == 1); + YTALLOC_PARANOID_ASSERT(PtrToSmallRank(ptr) == Rank_); + PoisonUninitializedRange(ptr, size); + UpdateChunkState(ptr, ESmallChunkState::Freed, ESmallChunkState::Allocated); + return ptr; + } + + TMemoryTag GetAndResetMemoryTag(const void* ptr) + { + auto& tag = MemoryTagZoneStart_[GetShadowOffset(ptr)]; + auto currentTag = tag; + tag = NullMemoryTag; + return currentTag; + } + + void SetMemoryTag(void* ptr, TMemoryTag tag) + { + MemoryTagZoneStart_[GetShadowOffset(ptr)] = tag; + } + + void UpdateChunkState(const void* ptr, ESmallChunkState expectedState, ESmallChunkState newState) + { +#ifdef YTALLOC_NERVOUS + auto& state = ChunkStateZoneStart_[GetShadowOffset(ptr)]; + auto actualState = state; + if (Y_UNLIKELY(actualState != expectedState)) { + char message[256]; + sprintf(message, "Invalid small chunk state at %p: expected %" PRIx8 ", actual %" PRIx8, + ptr, + static_cast<ui8>(expectedState), + static_cast<ui8>(actualState)); + YTALLOC_TRAP(message); + } + state = newState; +#else + Y_UNUSED(ptr); + Y_UNUSED(expectedState); + Y_UNUSED(newState); +#endif + } + +private: + size_t TryAllocateFromCurrentExtent(void** batch, size_t maxCount) + { + auto* oldPtr = CurrentPtr_.load(); + if (Y_UNLIKELY(!oldPtr)) { + return 0; + } + + auto* currentExtent = CurrentExtent_.load(std::memory_order_relaxed); + if (Y_UNLIKELY(!currentExtent)) { + return 0; + } + + char* newPtr; + while (true) { + if (Y_UNLIKELY(oldPtr < currentExtent || oldPtr + ChunkSize_ + RightReadableAreaSize > currentExtent + SmallExtentSize)) { + return 0; + } + + newPtr = std::min( + oldPtr + ChunkSize_ * maxCount, + currentExtent + SmallExtentSize); + + auto* alignedNewPtr = AlignDownToSmallSegment(currentExtent, newPtr); + if (alignedNewPtr > oldPtr) { + newPtr = alignedNewPtr; + } + + if (Y_LIKELY(CurrentPtr_.compare_exchange_weak(oldPtr, newPtr))) { + break; + } + } + + auto* firstSegment = AlignUpToSmallSegment(currentExtent, oldPtr); + auto* nextSegment = AlignUpToSmallSegment(currentExtent, newPtr); + if (firstSegment != nextSegment) { + auto size = nextSegment - firstSegment; + MappedMemoryManager->PopulateReadOnly(firstSegment, size); + + StatisticsManager->IncrementSmallArenaCounter(ESmallArenaCounter::BytesCommitted, Rank_, size); + StatisticsManager->IncrementSmallArenaCounter(ESmallArenaCounter::PagesCommitted, Rank_, size / PageSize); + if (Kind_ == EAllocationKind::Tagged) { + StatisticsManager->IncrementSystemCounter(ESystemCounter::BytesAllocated, size / ChunkSize_ * sizeof(TMemoryTag)); + } +#ifdef YTALLOC_NERVOUS + 
StatisticsManager->IncrementSystemCounter(ESystemCounter::BytesAllocated, size / ChunkSize_ * sizeof(ESmallChunkState)); +#endif + } + + size_t count = 0; + while (oldPtr != newPtr) { + UpdateChunkState(oldPtr, ESmallChunkState::Spare, ESmallChunkState::Freed); + + batch[count] = oldPtr; + + oldPtr += ChunkSize_; + count++; + } + return count; + } + + void PopulateAnotherExtent() + { + auto lockGuard = GuardWithTiming(ExtentLock_); + + auto* currentPtr = CurrentPtr_.load(); + auto* currentExtent = CurrentExtent_.load(); + + if (currentPtr && currentPtr + ChunkSize_ + RightReadableAreaSize <= currentExtent + SmallExtentSize) { + // No need for a new extent. + return; + } + + auto* newExtent = static_cast<char*>(DataZoneAllocator_.Allocate(SmallExtentAllocSize, 0)); + + AllocateShadowZones(); + + YTALLOC_VERIFY(reinterpret_cast<uintptr_t>(newExtent) % SmallExtentAllocSize == 0); + CurrentPtr_ = CurrentExtent_ = newExtent; + + StatisticsManager->IncrementSmallArenaCounter(ESmallArenaCounter::BytesMapped, Rank_, SmallExtentAllocSize); + StatisticsManager->IncrementSmallArenaCounter(ESmallArenaCounter::PagesMapped, Rank_, SmallExtentAllocSize / PageSize); + } + +private: + const EAllocationKind Kind_; + const size_t Rank_; + const size_t LogSize_; + const size_t ChunkSize_; + const uintptr_t DataZoneStart_; + + TZoneAllocator DataZoneAllocator_; + + bool ShadowZonesAllocated_ = false; + TMemoryTag* MemoryTagZoneStart_; +#ifdef YTALLOC_NERVOUS + ESmallChunkState* ChunkStateZoneStart_; +#endif + + NThreading::TForkAwareSpinLock ExtentLock_; + std::atomic<char*> CurrentPtr_ = nullptr; + std::atomic<char*> CurrentExtent_ = nullptr; + + size_t GetShadowOffset(const void* ptr) + { + return (reinterpret_cast<uintptr_t>(ptr) - DataZoneStart_) >> LogSize_; + } + + void AllocateShadowZones() + { + if (ShadowZonesAllocated_) { + return; + } + + if (Kind_ == EAllocationKind::Tagged) { + MemoryTagZoneStart_ = MapShadowZone<TMemoryTag>(); + } +#ifdef YTALLOC_NERVOUS + ChunkStateZoneStart_ = MapShadowZone<ESmallChunkState>(); +#endif + + ShadowZonesAllocated_ = true; + } + + template <class T> + T* MapShadowZone() + { + auto size = AlignUp((SmallZoneSize >> LogSize_) * sizeof (T), PageSize); + auto* ptr = static_cast<T*>(MappedMemoryManager->Map(SystemZoneStart, size, MAP_NORESERVE)); + MappedMemoryManager->DontDump(ptr, size); + return ptr; + } +}; + +TExplicitlyConstructableSingleton<TEnumIndexedVector<EAllocationKind, std::array<TExplicitlyConstructableSingleton<TSmallArenaAllocator>, SmallRankCount>>> SmallArenaAllocators; + +//////////////////////////////////////////////////////////////////////////////// + +constexpr size_t ChunksPerGroup = 128; +constexpr size_t GroupsBatchSize = 1024; + +static_assert(ChunksPerGroup <= MaxCachedChunksPerRank, "ChunksPerGroup > MaxCachedChunksPerRank"); + +class TChunkGroup + : public TFreeListItem<TChunkGroup> +{ +public: + bool IsEmpty() const + { + return Size_ == 0; + } + + size_t ExtractAll(void** ptrs) + { + auto count = Size_; + ::memcpy(ptrs, Ptrs_.data(), count * sizeof(void*)); + Size_ = 0; + return count; + } + + void PutOne(void* ptr) + { + PutMany(&ptr, 1); + } + + void PutMany(void** ptrs, size_t count) + { + YTALLOC_PARANOID_ASSERT(Size_ == 0); + YTALLOC_PARANOID_ASSERT(count <= ChunksPerGroup); + ::memcpy(Ptrs_.data(), ptrs, count * sizeof(void*)); + Size_ = count; + } + +private: + size_t Size_ = 0; // <= ChunksPerGroup + std::array<void*, ChunksPerGroup> Ptrs_; +}; + +class TGlobalSmallChunkCache +{ +public: + explicit 
TGlobalSmallChunkCache(EAllocationKind kind) + : Kind_(kind) + { } + +#ifdef YTALLOC_PARANOID + void CanonizeChunkPtrs(TThreadState* state, size_t rank) + { + auto& chunkPtrPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrHead[rank]; + + auto leftBorder = state->SmallBlobCache[Kind_].RankToCachedChunkLeftBorder[rank]; + auto rightBorder = state->SmallBlobCache[Kind_].RankToCachedChunkRightBorder[rank]; + + state->SmallBlobCache[Kind_].CachedChunkFull[rank] = false; + if (chunkPtrPtr + 1 == rightBorder) { + chunkPtrPtr = leftBorder; + state->SmallBlobCache[Kind_].CachedChunkFull[rank] = true; + } + + state->SmallBlobCache[Kind_].RankToCachedChunkPtrTail[rank] = leftBorder; + } +#endif + + bool TryMoveGroupToLocal(TThreadState* state, size_t rank) + { + auto& groups = RankToChunkGroups_[rank]; + auto* group = groups.Extract(state); + if (!Y_LIKELY(group)) { + return false; + } + + YTALLOC_PARANOID_ASSERT(!group->IsEmpty()); + + auto& chunkPtrPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrHead[rank]; +#ifdef YTALLOC_PARANOID + chunkPtrPtr = state->SmallBlobCache[Kind_].RankToCachedChunkLeftBorder[rank]; + state->SmallBlobCache[Kind_].RankToCachedChunkPtrTail[rank] = chunkPtrPtr; +#endif + auto chunkCount = group->ExtractAll(chunkPtrPtr + 1); + chunkPtrPtr += chunkCount; + +#ifdef YTALLOC_PARANOID + CanonizeChunkPtrs(state, rank); +#endif + GroupPool_.Free(state, group); + return true; + } + + void MoveGroupToGlobal(TThreadState* state, size_t rank) + { + auto* group = GroupPool_.Allocate(state); + + auto& chunkPtrPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrHead[rank]; + YTALLOC_PARANOID_ASSERT(*(chunkPtrPtr + 1) == reinterpret_cast<void*>(TThreadState::RightSentinel)); + group->PutMany(chunkPtrPtr - ChunksPerGroup + 1, ChunksPerGroup); + chunkPtrPtr -= ChunksPerGroup; +#ifdef YTALLOC_PARANOID + ::memset(chunkPtrPtr + 1, 0, sizeof(void*) * ChunksPerGroup); + CanonizeChunkPtrs(state, rank); +#endif + + auto& groups = RankToChunkGroups_[rank]; + YTALLOC_PARANOID_ASSERT(!group->IsEmpty()); + groups.Put(state, group); + } + + void MoveOneToGlobal(void* ptr, size_t rank) + { + auto* group = GroupPool_.Allocate(&GlobalShardedState_); + group->PutOne(ptr); + + auto& groups = RankToChunkGroups_[rank]; + YTALLOC_PARANOID_ASSERT(!group->IsEmpty()); + groups.Put(&GlobalShardedState_, group); + } + +#ifdef YTALLOC_PARANOID + void MoveAllToGlobal(TThreadState* state, size_t rank) + { + auto leftSentinelBorder = state->SmallBlobCache[Kind_].RankToCachedChunkLeftBorder[rank]; + auto rightSentinelBorder = state->SmallBlobCache[Kind_].RankToCachedChunkRightBorder[rank]; + + auto& headPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrHead[rank]; + auto& tailPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrTail[rank]; + + if (tailPtr == headPtr && !state->SmallBlobCache[Kind_].CachedChunkFull[rank]) { + headPtr = leftSentinelBorder; + return; + } + + // (leftBorder, rightBorder] + auto moveIntervalToGlobal = [=] (void** leftBorder, void** rightBorder) { + while (true) { + size_t count = 0; + while (count < ChunksPerGroup && rightBorder != leftBorder) { + --rightBorder; + ++count; + } + + if (count == 0) { + break; + } + + auto* group = GroupPool_.Allocate(state); + group->PutMany(rightBorder + 1, count); + ::memset(rightBorder + 1, 0, sizeof(void*) * count); + auto& groups = RankToChunkGroups_[rank]; + groups.Put(state, group); + } + }; + + if (tailPtr >= headPtr) { + moveIntervalToGlobal(tailPtr, rightSentinelBorder - 1); + moveIntervalToGlobal(leftSentinelBorder, headPtr); + 
} else { + moveIntervalToGlobal(tailPtr, headPtr); + } + + headPtr = leftSentinelBorder; + } +#else + void MoveAllToGlobal(TThreadState* state, size_t rank) + { + auto& chunkPtrPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrHead[rank]; + while (true) { + size_t count = 0; + while (count < ChunksPerGroup && *chunkPtrPtr != reinterpret_cast<void*>(TThreadState::LeftSentinel)) { + --chunkPtrPtr; + ++count; + } + + if (count == 0) { + break; + } + + auto* group = GroupPool_.Allocate(state); + group->PutMany(chunkPtrPtr + 1, count); + auto& groups = RankToChunkGroups_[rank]; + groups.Put(state, group); + } + } +#endif + +private: + const EAllocationKind Kind_; + + TGlobalShardedState GlobalShardedState_; + TShardedSystemPool<TChunkGroup, GroupsBatchSize> GroupPool_; + std::array<TShardedFreeList<TChunkGroup>, SmallRankCount> RankToChunkGroups_; +}; + +TExplicitlyConstructableSingleton<TEnumIndexedVector<EAllocationKind, TExplicitlyConstructableSingleton<TGlobalSmallChunkCache>>> GlobalSmallChunkCaches; + +//////////////////////////////////////////////////////////////////////////////// + +class TSmallAllocator +{ +public: + template <EAllocationKind Kind> + static Y_FORCE_INLINE void* Allocate(TMemoryTag tag, size_t rank) + { + auto* state = TThreadManager::FindThreadState(); + if (Y_LIKELY(state)) { + return Allocate<Kind>(tag, rank, state); + } + auto size = SmallRankToSize[rank]; + return AllocateGlobal<Kind>(tag, rank, size); + } + +#ifdef YTALLOC_PARANOID + template <EAllocationKind Kind> + static Y_FORCE_INLINE void* Allocate(TMemoryTag tag, size_t rank, TThreadState* state) + { + auto& localCache = state->SmallBlobCache[Kind]; + auto& allocator = *(*SmallArenaAllocators)[Kind][rank]; + + size_t size = SmallRankToSize[rank]; + StatisticsManager->IncrementTotalCounter<Kind>(state, tag, EBasicCounter::BytesAllocated, size); + + auto leftBorder = localCache.RankToCachedChunkLeftBorder[rank]; + auto rightBorder = localCache.RankToCachedChunkRightBorder[rank]; + + void* result; + while (true) { + auto& chunkHeadPtr = localCache.RankToCachedChunkPtrHead[rank]; + auto& cachedHeadPtr = *(chunkHeadPtr + 1); + auto* headPtr = cachedHeadPtr; + + auto& chunkTailPtr = localCache.RankToCachedChunkPtrTail[rank]; + auto& cachedTailPtr = *(chunkTailPtr + 1); + auto* tailPtr = cachedTailPtr; + + auto& chunkFull = localCache.CachedChunkFull[rank]; + + if (Y_LIKELY(chunkFull || headPtr != tailPtr)) { + YTALLOC_PARANOID_ASSERT(tailPtr); + cachedTailPtr = nullptr; + ++chunkTailPtr; + if (Y_LIKELY(chunkTailPtr + 1 == rightBorder)) { + chunkTailPtr = leftBorder; + } + + chunkFull = false; + result = tailPtr; + PoisonUninitializedRange(result, size); + allocator.UpdateChunkState(result, ESmallChunkState::Freed, ESmallChunkState::Allocated); + break; + } + + auto& globalCache = *(*GlobalSmallChunkCaches)[Kind]; + if (!globalCache.TryMoveGroupToLocal(state, rank)) { + result = allocator.Allocate(size); + break; + } + } + + if constexpr(Kind == EAllocationKind::Tagged) { + allocator.SetMemoryTag(result, tag); + } + + return result; + } + + template <EAllocationKind Kind> + static Y_FORCE_INLINE void Free(void* ptr) + { + auto rank = PtrToSmallRank(ptr); + auto size = SmallRankToSize[rank]; + + auto& allocator = *(*SmallArenaAllocators)[Kind][rank]; + + auto tag = NullMemoryTag; + if constexpr(Kind == EAllocationKind::Tagged) { + tag = allocator.GetAndResetMemoryTag(ptr); + YTALLOC_PARANOID_ASSERT(tag != NullMemoryTag); + } + + allocator.UpdateChunkState(ptr, ESmallChunkState::Allocated, 
ESmallChunkState::Freed); + PoisonFreedRange(ptr, size); + + auto* state = TThreadManager::FindThreadState(); + if (Y_UNLIKELY(!state)) { + FreeGlobal<Kind>(tag, ptr, rank, size); + return; + } + + StatisticsManager->IncrementTotalCounter<Kind>(state, tag, EBasicCounter::BytesFreed, size); + + auto& localCache = state->SmallBlobCache[Kind]; + + auto leftBorder = localCache.RankToCachedChunkLeftBorder[rank]; + auto rightBorder = localCache.RankToCachedChunkRightBorder[rank]; + + while (true) { + auto& chunkHeadPtr = localCache.RankToCachedChunkPtrHead[rank]; + auto& headPtr = *(chunkHeadPtr + 1); + + auto& chunkTailPtr = localCache.RankToCachedChunkPtrTail[rank]; + auto& chunkFull = localCache.CachedChunkFull[rank]; + + if (Y_LIKELY(!chunkFull)) { + headPtr = ptr; + ++chunkHeadPtr; + if (Y_LIKELY(chunkHeadPtr + 1 == rightBorder)) { + chunkHeadPtr = leftBorder; + } + chunkFull = (chunkHeadPtr == chunkTailPtr); + break; + } + + chunkHeadPtr = rightBorder - 1; + chunkTailPtr = leftBorder; + + auto& globalCache = *(*GlobalSmallChunkCaches)[Kind]; + globalCache.MoveGroupToGlobal(state, rank); + } + } + +#else + + template <EAllocationKind Kind> + static Y_FORCE_INLINE void* Allocate(TMemoryTag tag, size_t rank, TThreadState* state) + { + size_t size = SmallRankToSize[rank]; + StatisticsManager->IncrementTotalCounter<Kind>(state, tag, EBasicCounter::BytesAllocated, size); + + auto& localCache = state->SmallBlobCache[Kind]; + auto& allocator = *(*SmallArenaAllocators)[Kind][rank]; + + void* result; + while (true) { + auto& chunkPtr = localCache.RankToCachedChunkPtrHead[rank]; + auto& cachedPtr = *chunkPtr; + auto* ptr = cachedPtr; + if (Y_LIKELY(ptr != reinterpret_cast<void*>(TThreadState::LeftSentinel))) { + --chunkPtr; + result = ptr; + allocator.UpdateChunkState(result, ESmallChunkState::Freed, ESmallChunkState::Allocated); + PoisonUninitializedRange(result, size); + break; + } + + auto& globalCache = *(*GlobalSmallChunkCaches)[Kind]; + if (globalCache.TryMoveGroupToLocal(state, rank)) { + continue; + } + + auto count = allocator.PullMany( + chunkPtr + 1, + SmallRankBatchSize[rank]); + chunkPtr += count; + } + + if constexpr(Kind == EAllocationKind::Tagged) { + allocator.SetMemoryTag(result, tag); + } + + return result; + } + + template <EAllocationKind Kind> + static Y_FORCE_INLINE void Free(void* ptr) + { + auto rank = PtrToSmallRank(ptr); + auto size = SmallRankToSize[rank]; + + auto& allocator = *(*SmallArenaAllocators)[Kind][rank]; + + auto tag = NullMemoryTag; + if constexpr(Kind == EAllocationKind::Tagged) { + tag = allocator.GetAndResetMemoryTag(ptr); + YTALLOC_PARANOID_ASSERT(tag != NullMemoryTag); + } + + allocator.UpdateChunkState(ptr, ESmallChunkState::Allocated, ESmallChunkState::Freed); + PoisonFreedRange(ptr, size); + + auto* state = TThreadManager::FindThreadState(); + if (Y_UNLIKELY(!state)) { + FreeGlobal<Kind>(tag, ptr, rank, size); + return; + } + + StatisticsManager->IncrementTotalCounter<Kind>(state, tag, EBasicCounter::BytesFreed, size); + + auto& localCache = state->SmallBlobCache[Kind]; + + while (true) { + auto& chunkPtrPtr = localCache.RankToCachedChunkPtrHead[rank]; + auto& chunkPtr = *(chunkPtrPtr + 1); + if (Y_LIKELY(chunkPtr != reinterpret_cast<void*>(TThreadState::RightSentinel))) { + chunkPtr = ptr; + ++chunkPtrPtr; + break; + } + + auto& globalCache = *(*GlobalSmallChunkCaches)[Kind]; + globalCache.MoveGroupToGlobal(state, rank); + } + } +#endif + + static size_t GetAllocationSize(const void* ptr) + { + return SmallRankToSize[PtrToSmallRank(ptr)]; + } + + 
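+    // An end-to-end sketch of the fast path above (illustrative; real callers normally derive
+    // the rank from the requested size):
+    //
+    //   auto rank = SizeToSmallRank(40);
+    //   void* ptr = TSmallAllocator::Allocate<EAllocationKind::Untagged>(NullMemoryTag, rank);
+    //   YTALLOC_VERIFY(TSmallAllocator::GetAllocationSize(ptr) == SmallRankToSize[rank]);
+    //   TSmallAllocator::Free<EAllocationKind::Untagged>(ptr);
+    //
+    // With a valid TThreadState both calls touch only the thread-local cache; otherwise they
+    // fall back to AllocateGlobal/FreeGlobal below.
+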
static size_t GetAllocationSize(size_t size) + { + return SmallRankToSize[SizeToSmallRank(size)]; + } + + static void PurgeCaches() + { + DoPurgeCaches<EAllocationKind::Untagged>(); + DoPurgeCaches<EAllocationKind::Tagged>(); + } + +private: + template <EAllocationKind Kind> + static void DoPurgeCaches() + { + auto* state = TThreadManager::GetThreadStateChecked(); + for (size_t rank = 0; rank < SmallRankCount; ++rank) { + (*GlobalSmallChunkCaches)[Kind]->MoveAllToGlobal(state, rank); + } + } + + template <EAllocationKind Kind> + static void* AllocateGlobal(TMemoryTag tag, size_t rank, size_t size) + { + StatisticsManager->IncrementTotalCounter(tag, EBasicCounter::BytesAllocated, size); + + auto& allocator = *(*SmallArenaAllocators)[Kind][rank]; + auto* result = allocator.Allocate(size); + + if constexpr(Kind == EAllocationKind::Tagged) { + allocator.SetMemoryTag(result, tag); + } + + return result; + } + + template <EAllocationKind Kind> + static void FreeGlobal(TMemoryTag tag, void* ptr, size_t rank, size_t size) + { + StatisticsManager->IncrementTotalCounter(tag, EBasicCounter::BytesFreed, size); + + auto& globalCache = *(*GlobalSmallChunkCaches)[Kind]; + globalCache.MoveOneToGlobal(ptr, rank); + } +}; + +//////////////////////////////////////////////////////////////////////////////// +// Large blob allocator +// +// Like for small chunks, large blobs are grouped into arenas, where arena K handles +// blobs of size (2^{K-1},2^K]. Memory is mapped in extents of LargeExtentSize bytes. +// Each extent is split into segments of size 2^K (here segment is just a memory region, which may fully consist of +// unmapped pages). When a segment is actually allocated, it becomes a blob and a TLargeBlobHeader +// structure is placed at its start. +// +// When an extent is allocated, it is sliced into segments (not blobs, since no headers are placed and +// no memory is touched). These segments are put into disposed segments list. +// +// For each blob two separate sizes are maintained: BytesAcquired indicates the number of bytes +// acquired via madvise(MADV_POPULATE) from the system; BytesAllocated (<= BytesAcquired) corresponds +// to the number of bytes claimed by the user (including the header and page size alignment). +// If BytesAllocated == 0 then this blob is spare, i.e. +// was freed and remains cached for further possible reuse. +// +// When a new blob is being allocated, the allocator first tries to extract a spare blob. On success, +// its acquired size is extended (if needed); the acquired size never shrinks on allocation. +// If no spare blobs exist, a disposed segment is extracted and is turned into a blob (i.e. +// its header is initialized) and the needed number of bytes is acquired. If no disposed segments +// exist, then a new extent is allocated and sliced into segments. +// +// The above algorithm only claims memory from the system (by means of madvise(MADV_POPULATE)); +// the reclaim is handled by a separate background mechanism. Two types of reclaimable memory +// regions are possible: +// * spare: these correspond to spare blobs; upon reclaiming this region becomes a disposed segment +// * overhead: these correspond to trailing parts of allocated blobs in [BytesAllocated, BytesAcquired) byte range +// +// Reclaiming spare blobs is easy as these are explicitly tracked by spare blob lists. To reclaim, +// we atomically extract a blob from a spare list, call madvise(MADV_FREE), and put the pointer to +// the disposed segment list. 
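+//
+// A short worked example of the two reclaimable kinds (figures are illustrative and assume
+// 4 KB pages): a 2500 KB request lands in the arena with 4 MB segments; its raw size (header
+// plus page alignment) is about 2504 KB, so a freshly acquired blob has BytesAcquired of about
+// 2504 KB and no overhead. After Free it sits in the spare list and all of these bytes count
+// as spare. If the blob is then reused for a 2200 KB request of the same rank, BytesAcquired
+// stays at 2504 KB while the new raw size is about 2204 KB, so roughly 300 KB become overhead
+// that the background reclaimer may later release.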
+//
+// Reclaiming overheads is more complicated since (a) allocated blobs are never tracked directly and
+// (b) reclaiming them may interfere with Allocate and Free.
+//
+// To overcome (a), for each extent we maintain a bitmap marking segments that are actually blobs
+// (i.e. contain a header). (For simplicity and efficiency this bitmap is just a vector of bytes.)
+// These flags are updated in Allocate/Free with appropriate memory ordering. Note that
+// blobs are only disposed (and are turned into segments) by the background thread; if this
+// thread discovers a segment that is marked as a blob, then it is safe to assume that this segment
+// remains a blob unless the thread disposes it.
+//
+// To overcome (b), each large blob header maintains a spin lock. When blob B is extracted
+// from a spare list in Allocate, an acquisition is tried. If successful, B is returned to the
+// user. Otherwise it is assumed that B is currently being examined by the background
+// reclaimer thread. Allocate then skips this blob and retries extraction; the problem is that,
+// since the spare list is basically a stack, one cannot just push B back into the spare list.
+// Instead, B is pushed into a special locked spare list. This list is purged by the background
+// thread on each tick and its items are pushed back into the usual spare list.
+//
+// A similar trick is used by Free: when it is invoked for blob B, the spin lock acquisition is first
+// tried. Upon success, B is moved to the spare list. On failure, Free has to postpone the deallocation
+// by moving B into the locked freed list. This list, similarly, is purged by the background thread.
+//
+// It remains to explain how the background thread computes the number of bytes to be reclaimed from
+// each arena. To this end, we first compute the total number of reclaimable bytes.
+// This is the sum of spare and overhead bytes in all arenas minus the number of unreclaimable bytes.
+// The latter grows linearly in the number of used bytes and is capped from below by MinUnreclaimableLargeBytes
+// and from above by MaxUnreclaimableLargeBytes. SetLargeUnreclaimableCoeff and Set(Min|Max)LargeUnreclaimableBytes
+// enable tuning these control knobs. The reclaimable bytes are taken from arenas starting with those
+// that have the largest spare and overhead volumes.
+//
+// The above implies that each large blob carries a fixed-size header preceding the user pointer.
+// Hence ptr % PageSize == sizeof (TLargeBlobHeader) for each ptr returned by Allocate
+// (since large blob sizes are larger than PageSize and are divisible by PageSize).
+// For AllocatePageAligned, however, ptr must be divisible by PageSize. To handle such an allocation, we
+// artificially increase its size and align the result of Allocate up to the next page boundary.
+// When handling a deallocation, ptr is moved back by UnalignPtr (which is capable of dealing
+// with both the results of Allocate and AllocatePageAligned).
+// This technique applies to both large and huge blobs.
+
+enum ELargeBlobState : ui64
+{
+    Allocated = 0x6c6c61656772616cULL, // largeall
+    Spare = 0x727073656772616cULL, // largespr
+    LockedSpare = 0x70736c656772616cULL, // largelsp
+    LockedFreed = 0x72666c656772616cULL // largelfr
+};
+
+// Every large blob (either tagged or not) is prepended with this header.
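+// For orientation, the surrounding pointer arithmetic (a sketch; the helpers themselves are
+// defined elsewhere in YTAlloc) pairs every user pointer with the header sitting right before it:
+//
+//   HeaderToPtr(blob)                   // the user pointer: payload begins right after the header
+//   PtrToHeader<TLargeBlobHeader>(ptr)  // the inverse: recovers the header from the user pointer
+//
+// which matches the invariant ptr % PageSize == sizeof (TLargeBlobHeader) stated above; results
+// of AllocatePageAligned must first be shifted back via UnalignPtr before PtrToHeader is applied,
+// which is what GetAllocationSize below does.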
+struct TLargeBlobHeader + : public TFreeListItem<TLargeBlobHeader> +{ + TLargeBlobHeader( + TLargeBlobExtent* extent, + size_t bytesAcquired, + size_t bytesAllocated, + TMemoryTag tag) + : Extent(extent) + , BytesAcquired(bytesAcquired) + , Tag(tag) + , BytesAllocated(bytesAllocated) + , State(ELargeBlobState::Allocated) + { } + + TLargeBlobExtent* Extent; + // Number of bytes in all acquired pages. + size_t BytesAcquired; + std::atomic<bool> Locked = false; + TMemoryTag Tag = NullMemoryTag; + // For spare blobs this is zero. + // For allocated blobs this is the number of bytes requested by user (not including header of any alignment). + size_t BytesAllocated; + ELargeBlobState State; + char Padding[12]; +}; + +CHECK_HEADER_ALIGNMENT(TLargeBlobHeader) + +struct TLargeBlobExtent +{ + TLargeBlobExtent(size_t segmentCount, char* ptr) + : SegmentCount(segmentCount) + , Ptr(ptr) + { } + + size_t SegmentCount; + char* Ptr; + TLargeBlobExtent* NextExtent = nullptr; + + std::atomic<bool> DisposedFlags[0]; +}; + +// A helper node that enables storing a number of extent's segments +// in a free list. Recall that segments themselves do not posses any headers. +struct TDisposedSegment + : public TFreeListItem<TDisposedSegment> +{ + size_t Index; + TLargeBlobExtent* Extent; +}; + +struct TLargeArena +{ + size_t Rank = 0; + size_t SegmentSize = 0; + + TShardedFreeList<TLargeBlobHeader> SpareBlobs; + TFreeList<TLargeBlobHeader> LockedSpareBlobs; + TFreeList<TLargeBlobHeader> LockedFreedBlobs; + TFreeList<TDisposedSegment> DisposedSegments; + std::atomic<TLargeBlobExtent*> FirstExtent = nullptr; + + TLargeBlobExtent* CurrentOverheadScanExtent = nullptr; + size_t CurrentOverheadScanSegment = 0; +}; + +template <bool Dumpable> +class TLargeBlobAllocator +{ +public: + TLargeBlobAllocator() + : ZoneAllocator_(LargeZoneStart(Dumpable), LargeZoneEnd(Dumpable)) + { + for (size_t rank = 0; rank < Arenas_.size(); ++rank) { + auto& arena = Arenas_[rank]; + arena.Rank = rank; + arena.SegmentSize = (1ULL << rank); + } + } + + void* Allocate(size_t size) + { + auto* state = TThreadManager::FindThreadState(); + return Y_LIKELY(state) + ? 
DoAllocate(state, size) + : DoAllocate(GlobalState.Get(), size); + } + + void Free(void* ptr) + { + auto* state = TThreadManager::FindThreadState(); + if (Y_LIKELY(state)) { + DoFree(state, ptr); + } else { + DoFree(GlobalState.Get(), ptr); + } + } + + static size_t GetAllocationSize(const void* ptr) + { + UnalignPtr<TLargeBlobHeader>(ptr); + const auto* blob = PtrToHeader<TLargeBlobHeader>(ptr); + return blob->BytesAllocated; + } + + static size_t GetAllocationSize(size_t size) + { + return GetBlobAllocationSize<TLargeBlobHeader>(size); + } + + void RunBackgroundTasks() + { + ReinstallLockedBlobs(); + ReclaimMemory(); + } + + void SetBacktraceProvider(TBacktraceProvider provider) + { + BacktraceProvider_.store(provider); + } + +private: + template <class TState> + void PopulateArenaPages(TState* state, TLargeArena* arena, void* ptr, size_t size) + { + MappedMemoryManager->Populate(ptr, size); + StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::BytesPopulated, size); + StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::PagesPopulated, size / PageSize); + StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::BytesCommitted, size); + StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::PagesCommitted, size / PageSize); + } + + template <class TState> + void ReleaseArenaPages(TState* state, TLargeArena* arena, void* ptr, size_t size) + { + MappedMemoryManager->Release(ptr, size); + StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::BytesReleased, size); + StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::PagesReleased, size / PageSize); + StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::BytesCommitted, -size); + StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::PagesCommitted, -size / PageSize); + } + + bool TryLockBlob(TLargeBlobHeader* blob) + { + bool expected = false; + return blob->Locked.compare_exchange_strong(expected, true); + } + + void UnlockBlob(TLargeBlobHeader* blob) + { + blob->Locked.store(false); + } + + template <class TState> + void MoveBlobToSpare(TState* state, TLargeArena* arena, TLargeBlobHeader* blob, bool unlock) + { + auto rank = arena->Rank; + auto size = blob->BytesAllocated; + auto rawSize = GetRawBlobSize<TLargeBlobHeader>(size); + StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesSpare, blob->BytesAcquired); + StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesOverhead, -(blob->BytesAcquired - rawSize)); + blob->BytesAllocated = 0; + if (unlock) { + UnlockBlob(blob); + } else { + YTALLOC_VERIFY(!blob->Locked.load()); + } + blob->State = ELargeBlobState::Spare; + arena->SpareBlobs.Put(state, blob); + } + + size_t GetBytesToReclaim(const std::array<TLocalLargeCounters, LargeRankCount>& arenaCounters) + { + size_t totalBytesAllocated = 0; + size_t totalBytesFreed = 0; + size_t totalBytesSpare = 0; + size_t totalBytesOverhead = 0; + for (size_t rank = 0; rank < Arenas_.size(); ++rank) { + const auto& counters = arenaCounters[rank]; + totalBytesAllocated += counters[ELargeArenaCounter::BytesAllocated]; + totalBytesFreed += counters[ELargeArenaCounter::BytesFreed]; + totalBytesSpare += counters[ELargeArenaCounter::BytesSpare]; + totalBytesOverhead += counters[ELargeArenaCounter::BytesOverhead]; + } + + auto totalBytesUsed = 
totalBytesAllocated - totalBytesFreed; + auto totalBytesReclaimable = totalBytesSpare + totalBytesOverhead; + + auto threshold = ClampVal( + static_cast<size_t>(ConfigurationManager->GetLargeUnreclaimableCoeff() * totalBytesUsed), + ConfigurationManager->GetMinLargeUnreclaimableBytes(), + ConfigurationManager->GetMaxLargeUnreclaimableBytes()); + if (totalBytesReclaimable < threshold) { + return 0; + } + + auto bytesToReclaim = totalBytesReclaimable - threshold; + return AlignUp(bytesToReclaim, PageSize); + } + + void ReinstallLockedSpareBlobs(TLargeArena* arena) + { + auto* blob = arena->LockedSpareBlobs.ExtractAll(); + auto* state = TThreadManager::GetThreadStateChecked(); + + size_t count = 0; + while (blob) { + auto* nextBlob = blob->Next; + YTALLOC_VERIFY(!blob->Locked.load()); + AssertBlobState(blob, ELargeBlobState::LockedSpare); + blob->State = ELargeBlobState::Spare; + arena->SpareBlobs.Put(state, blob); + blob = nextBlob; + ++count; + } + + if (count > 0) { + YTALLOC_LOG_DEBUG("Locked spare blobs reinstalled (Rank: %d, Blobs: %zu)", + arena->Rank, + count); + } + } + + void ReinstallLockedFreedBlobs(TLargeArena* arena) + { + auto* state = TThreadManager::GetThreadStateChecked(); + auto* blob = arena->LockedFreedBlobs.ExtractAll(); + + size_t count = 0; + while (blob) { + auto* nextBlob = blob->Next; + AssertBlobState(blob, ELargeBlobState::LockedFreed); + MoveBlobToSpare(state, arena, blob, false); + ++count; + blob = nextBlob; + } + + if (count > 0) { + YTALLOC_LOG_DEBUG("Locked freed blobs reinstalled (Rank: %d, Blobs: %zu)", + arena->Rank, + count); + } + } + + void ReclaimSpareMemory(TLargeArena* arena, ssize_t bytesToReclaim) + { + if (bytesToReclaim <= 0) { + return; + } + + auto rank = arena->Rank; + auto* state = TThreadManager::GetThreadStateChecked(); + + YTALLOC_LOG_DEBUG("Started processing spare memory in arena (BytesToReclaim: %zdM, Rank: %d)", + bytesToReclaim / 1_MB, + rank); + + size_t bytesReclaimed = 0; + size_t blobsReclaimed = 0; + while (bytesToReclaim > 0) { + auto* blob = arena->SpareBlobs.ExtractRoundRobin(state); + if (!blob) { + break; + } + + AssertBlobState(blob, ELargeBlobState::Spare); + YTALLOC_VERIFY(blob->BytesAllocated == 0); + + auto bytesAcquired = blob->BytesAcquired; + StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesSpare, -bytesAcquired); + bytesToReclaim -= bytesAcquired; + bytesReclaimed += bytesAcquired; + blobsReclaimed += 1; + + auto* extent = blob->Extent; + auto* ptr = reinterpret_cast<char*>(blob); + ReleaseArenaPages( + state, + arena, + ptr, + bytesAcquired); + + size_t segmentIndex = (ptr - extent->Ptr) / arena->SegmentSize; + extent->DisposedFlags[segmentIndex].store(true, std::memory_order_relaxed); + + auto* disposedSegment = DisposedSegmentPool_.Allocate(); + disposedSegment->Index = segmentIndex; + disposedSegment->Extent = extent; + arena->DisposedSegments.Put(disposedSegment); + } + + StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::SpareBytesReclaimed, bytesReclaimed); + + YTALLOC_LOG_DEBUG("Finished processing spare memory in arena (Rank: %d, BytesReclaimed: %zdM, BlobsReclaimed: %zu)", + arena->Rank, + bytesReclaimed / 1_MB, + blobsReclaimed); + } + + void ReclaimOverheadMemory(TLargeArena* arena, ssize_t bytesToReclaim) + { + if (bytesToReclaim == 0) { + return; + } + + auto* state = TThreadManager::GetThreadStateChecked(); + auto rank = arena->Rank; + + YTALLOC_LOG_DEBUG("Started processing overhead memory in arena (BytesToReclaim: %zdM, Rank: %d)", + 
bytesToReclaim / 1_MB, + rank); + + size_t extentsTraversed = 0; + size_t segmentsTraversed = 0; + size_t bytesReclaimed = 0; + + bool restartedFromFirstExtent = false; + auto& currentExtent = arena->CurrentOverheadScanExtent; + auto& currentSegment = arena->CurrentOverheadScanSegment; + while (bytesToReclaim > 0) { + if (!currentExtent) { + if (restartedFromFirstExtent) { + break; + } + currentExtent = arena->FirstExtent.load(); + if (!currentExtent) { + break; + } + restartedFromFirstExtent = true; + } + + while (currentSegment < currentExtent->SegmentCount && bytesToReclaim > 0) { + ++segmentsTraversed; + if (!currentExtent->DisposedFlags[currentSegment].load(std::memory_order_acquire)) { + auto* ptr = currentExtent->Ptr + currentSegment * arena->SegmentSize; + auto* blob = reinterpret_cast<TLargeBlobHeader*>(ptr); + YTALLOC_PARANOID_ASSERT(blob->Extent == currentExtent); + if (TryLockBlob(blob)) { + if (blob->BytesAllocated > 0) { + size_t rawSize = GetRawBlobSize<TLargeBlobHeader>(blob->BytesAllocated); + size_t bytesToRelease = blob->BytesAcquired - rawSize; + if (bytesToRelease > 0) { + ReleaseArenaPages( + state, + arena, + ptr + blob->BytesAcquired - bytesToRelease, + bytesToRelease); + StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesOverhead, -bytesToRelease); + blob->BytesAcquired = rawSize; + bytesToReclaim -= bytesToRelease; + bytesReclaimed += bytesToRelease; + } + } + UnlockBlob(blob); + } + } + ++currentSegment; + } + + ++extentsTraversed; + currentSegment = 0; + currentExtent = currentExtent->NextExtent; + } + + StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::OverheadBytesReclaimed, bytesReclaimed); + + YTALLOC_LOG_DEBUG("Finished processing overhead memory in arena (Rank: %d, Extents: %zu, Segments: %zu, BytesReclaimed: %zuM)", + arena->Rank, + extentsTraversed, + segmentsTraversed, + bytesReclaimed / 1_MB); + } + + void ReinstallLockedBlobs() + { + for (auto& arena : Arenas_) { + ReinstallLockedSpareBlobs(&arena); + ReinstallLockedFreedBlobs(&arena); + } + } + + void ReclaimMemory() + { + auto arenaCounters = StatisticsManager->GetLargeArenaAllocationCounters(); + ssize_t bytesToReclaim = GetBytesToReclaim(arenaCounters); + if (bytesToReclaim == 0) { + return; + } + + YTALLOC_LOG_DEBUG("Memory reclaim started (BytesToReclaim: %zdM)", + bytesToReclaim / 1_MB); + + std::array<ssize_t, LargeRankCount * 2> bytesReclaimablePerArena; + for (size_t rank = 0; rank < LargeRankCount; ++rank) { + bytesReclaimablePerArena[rank * 2] = arenaCounters[rank][ELargeArenaCounter::BytesOverhead]; + bytesReclaimablePerArena[rank * 2 + 1] = arenaCounters[rank][ELargeArenaCounter::BytesSpare]; + } + + std::array<ssize_t, LargeRankCount * 2> bytesToReclaimPerArena{}; + while (bytesToReclaim > 0) { + ssize_t maxBytes = std::numeric_limits<ssize_t>::min(); + int maxIndex = -1; + for (int index = 0; index < LargeRankCount * 2; ++index) { + if (bytesReclaimablePerArena[index] > maxBytes) { + maxBytes = bytesReclaimablePerArena[index]; + maxIndex = index; + } + } + + if (maxIndex < 0) { + break; + } + + auto bytesToReclaimPerStep = std::min<ssize_t>({bytesToReclaim, maxBytes, 4_MB}); + if (bytesToReclaimPerStep < 0) { + break; + } + + bytesToReclaimPerArena[maxIndex] += bytesToReclaimPerStep; + bytesReclaimablePerArena[maxIndex] -= bytesToReclaimPerStep; + bytesToReclaim -= bytesToReclaimPerStep; + } + + for (auto& arena : Arenas_) { + auto rank = arena.Rank; + ReclaimOverheadMemory(&arena, bytesToReclaimPerArena[rank * 2]); + 
ReclaimSpareMemory(&arena, bytesToReclaimPerArena[rank * 2 + 1]); + } + + YTALLOC_LOG_DEBUG("Memory reclaim finished"); + } + + template <class TState> + void AllocateArenaExtent(TState* state, TLargeArena* arena) + { + auto rank = arena->Rank; + StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::ExtentsAllocated, 1); + + size_t segmentCount = LargeExtentSize / arena->SegmentSize; + size_t extentHeaderSize = AlignUp(sizeof (TLargeBlobExtent) + sizeof (TLargeBlobExtent::DisposedFlags[0]) * segmentCount, PageSize); + size_t allocationSize = extentHeaderSize + LargeExtentSize; + + auto* ptr = ZoneAllocator_.Allocate(allocationSize, MAP_NORESERVE); + if (!Dumpable) { + MappedMemoryManager->DontDump(ptr, allocationSize); + } + + if (auto backtraceProvider = BacktraceProvider_.load()) { + std::array<void*, MaxAllocationProfilingBacktraceDepth> frames; + auto frameCount = backtraceProvider( + frames.data(), + MaxAllocationProfilingBacktraceDepth, + 3); + MmapObservationManager->EnqueueEvent(allocationSize, frames, frameCount); + } + + StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesMapped, allocationSize); + StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::PagesMapped, allocationSize / PageSize); + + auto* extent = static_cast<TLargeBlobExtent*>(ptr); + MappedMemoryManager->Populate(ptr, extentHeaderSize); + StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesPopulated, extentHeaderSize); + StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::PagesPopulated, extentHeaderSize / PageSize); + StatisticsManager->IncrementSystemCounter(ESystemCounter::BytesAllocated, extentHeaderSize); + + new (extent) TLargeBlobExtent(segmentCount, static_cast<char*>(ptr) + extentHeaderSize); + + for (size_t index = 0; index < segmentCount; ++index) { + auto* disposedSegment = DisposedSegmentPool_.Allocate(); + disposedSegment->Index = index; + disposedSegment->Extent = extent; + arena->DisposedSegments.Put(disposedSegment); + extent->DisposedFlags[index].store(true); + } + + auto* expectedFirstExtent = arena->FirstExtent.load(); + do { + extent->NextExtent = expectedFirstExtent; + } while (Y_UNLIKELY(!arena->FirstExtent.compare_exchange_weak(expectedFirstExtent, extent))); + } + + template <class TState> + void* DoAllocate(TState* state, size_t size) + { + auto rawSize = GetRawBlobSize<TLargeBlobHeader>(size); + auto rank = GetLargeRank(rawSize); + auto tag = ConfigurationManager->IsLargeArenaAllocationProfiled(rank) + ? 
BacktraceManager->GetMemoryTagFromBacktrace(3) + : TThreadManager::GetCurrentMemoryTag(); + auto& arena = Arenas_[rank]; + YTALLOC_PARANOID_ASSERT(rawSize <= arena.SegmentSize); + + TLargeBlobHeader* blob; + while (true) { + blob = arena.SpareBlobs.Extract(state); + if (blob) { + AssertBlobState(blob, ELargeBlobState::Spare); + if (TryLockBlob(blob)) { + StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesSpare, -blob->BytesAcquired); + if (blob->BytesAcquired < rawSize) { + PopulateArenaPages( + state, + &arena, + reinterpret_cast<char*>(blob) + blob->BytesAcquired, + rawSize - blob->BytesAcquired); + blob->BytesAcquired = rawSize; + } else { + StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesOverhead, blob->BytesAcquired - rawSize); + } + YTALLOC_PARANOID_ASSERT(blob->BytesAllocated == 0); + blob->BytesAllocated = size; + blob->Tag = tag; + blob->State = ELargeBlobState::Allocated; + UnlockBlob(blob); + break; + } else { + blob->State = ELargeBlobState::LockedSpare; + arena.LockedSpareBlobs.Put(blob); + } + } + + auto* disposedSegment = arena.DisposedSegments.Extract(); + if (disposedSegment) { + auto index = disposedSegment->Index; + auto* extent = disposedSegment->Extent; + DisposedSegmentPool_.Free(disposedSegment); + + auto* ptr = extent->Ptr + index * arena.SegmentSize; + PopulateArenaPages( + state, + &arena, + ptr, + rawSize); + + blob = reinterpret_cast<TLargeBlobHeader*>(ptr); + new (blob) TLargeBlobHeader(extent, rawSize, size, tag); + + extent->DisposedFlags[index].store(false, std::memory_order_release); + + break; + } + + AllocateArenaExtent(state, &arena); + } + + StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BlobsAllocated, 1); + StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesAllocated, size); + StatisticsManager->IncrementTotalCounter(state, tag, EBasicCounter::BytesAllocated, size); + if (!Dumpable) { + StatisticsManager->IncrementUndumpableCounter(state, EUndumpableCounter::BytesAllocated, size); + } + + auto* result = HeaderToPtr(blob); + YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(result) >= LargeZoneStart(Dumpable) && reinterpret_cast<uintptr_t>(result) < LargeZoneEnd(Dumpable)); + PoisonUninitializedRange(result, size); + return result; + } + + template <class TState> + void DoFree(TState* state, void* ptr) + { + YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= LargeZoneStart(Dumpable) && reinterpret_cast<uintptr_t>(ptr) < LargeZoneEnd(Dumpable)); + + auto* blob = PtrToHeader<TLargeBlobHeader>(ptr); + AssertBlobState(blob, ELargeBlobState::Allocated); + + auto size = blob->BytesAllocated; + PoisonFreedRange(ptr, size); + + auto rawSize = GetRawBlobSize<TLargeBlobHeader>(size); + auto rank = GetLargeRank(rawSize); + auto& arena = Arenas_[rank]; + YTALLOC_PARANOID_ASSERT(blob->BytesAcquired <= arena.SegmentSize); + + auto tag = blob->Tag; + + StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BlobsFreed, 1); + StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesFreed, size); + StatisticsManager->IncrementTotalCounter(state, tag, EBasicCounter::BytesFreed, size); + if (!Dumpable) { + StatisticsManager->IncrementUndumpableCounter(state, EUndumpableCounter::BytesFreed, size); + } + + if (TryLockBlob(blob)) { + MoveBlobToSpare(state, &arena, blob, true); + } else { + blob->State = ELargeBlobState::LockedFreed; + arena.LockedFreedBlobs.Put(blob); + } + } + 
+private: + TZoneAllocator ZoneAllocator_; + std::array<TLargeArena, LargeRankCount> Arenas_; + + static constexpr size_t DisposedSegmentsBatchSize = 1024; + TSystemPool<TDisposedSegment, DisposedSegmentsBatchSize> DisposedSegmentPool_; + + std::atomic<TBacktraceProvider> BacktraceProvider_ = nullptr; +}; + +TExplicitlyConstructableSingleton<TLargeBlobAllocator<true>> DumpableLargeBlobAllocator; +TExplicitlyConstructableSingleton<TLargeBlobAllocator<false>> UndumpableLargeBlobAllocator; + +//////////////////////////////////////////////////////////////////////////////// +// Huge blob allocator +// +// Basically a wrapper for TZoneAllocator. + +// Acts as a signature to detect broken headers. +enum class EHugeBlobState : ui64 +{ + Allocated = 0x72666c656772616cULL // hugeallc +}; + +// Every huge blob (both tagged or not) is prepended with this header. +struct THugeBlobHeader +{ + THugeBlobHeader(TMemoryTag tag, size_t size, bool dumpable) + : Tag(tag) + , Size(size) + , State(EHugeBlobState::Allocated) + , Dumpable(dumpable) + { } + + TMemoryTag Tag; + size_t Size; + EHugeBlobState State; + bool Dumpable; + char Padding[7]; +}; + +CHECK_HEADER_ALIGNMENT(THugeBlobHeader) + +class THugeBlobAllocator +{ +public: + THugeBlobAllocator() + : ZoneAllocator_(HugeZoneStart, HugeZoneEnd) + { } + + void* Allocate(size_t size, bool dumpable) + { + YTALLOC_VERIFY(size <= MaxAllocationSize); + auto tag = TThreadManager::GetCurrentMemoryTag(); + auto rawSize = GetRawBlobSize<THugeBlobHeader>(size); + auto* blob = static_cast<THugeBlobHeader*>(ZoneAllocator_.Allocate(rawSize, MAP_POPULATE)); + if (!dumpable) { + MappedMemoryManager->DontDump(blob, rawSize); + } + new (blob) THugeBlobHeader(tag, size, dumpable); + + StatisticsManager->IncrementTotalCounter(tag, EBasicCounter::BytesAllocated, size); + StatisticsManager->IncrementHugeCounter(EHugeCounter::BlobsAllocated, 1); + StatisticsManager->IncrementHugeCounter(EHugeCounter::BytesAllocated, size); + if (!dumpable) { + StatisticsManager->IncrementHugeUndumpableCounter(EUndumpableCounter::BytesAllocated, size); + } + + auto* result = HeaderToPtr(blob); + PoisonUninitializedRange(result, size); + return result; + } + + void Free(void* ptr) + { + auto* blob = PtrToHeader<THugeBlobHeader>(ptr); + AssertBlobState(blob, EHugeBlobState::Allocated); + auto tag = blob->Tag; + auto size = blob->Size; + auto dumpable = blob->Dumpable; + PoisonFreedRange(ptr, size); + + auto rawSize = GetRawBlobSize<THugeBlobHeader>(size); + ZoneAllocator_.Free(blob, rawSize); + + StatisticsManager->IncrementTotalCounter(tag, EBasicCounter::BytesFreed, size); + StatisticsManager->IncrementHugeCounter(EHugeCounter::BlobsFreed, 1); + StatisticsManager->IncrementHugeCounter(EHugeCounter::BytesFreed, size); + if (!dumpable) { + StatisticsManager->IncrementHugeUndumpableCounter(EUndumpableCounter::BytesFreed, size); + } + } + + static size_t GetAllocationSize(const void* ptr) + { + UnalignPtr<THugeBlobHeader>(ptr); + const auto* blob = PtrToHeader<THugeBlobHeader>(ptr); + return blob->Size; + } + + static size_t GetAllocationSize(size_t size) + { + return GetBlobAllocationSize<THugeBlobHeader>(size); + } + +private: + TZoneAllocator ZoneAllocator_; +}; + +TExplicitlyConstructableSingleton<THugeBlobAllocator> HugeBlobAllocator; + +//////////////////////////////////////////////////////////////////////////////// +// A thunk to large and huge blob allocators + +class TBlobAllocator +{ +public: + static void* Allocate(size_t size) + { + InitializeGlobals(); + bool dumpable = 
GetCurrentMemoryZone() != EMemoryZone::Undumpable; + // NB: Account for the header. Also note that we may safely ignore the alignment since + // HugeAllocationSizeThreshold is already page-aligned. + if (Y_LIKELY(size < HugeAllocationSizeThreshold - sizeof(TLargeBlobHeader) - RightReadableAreaSize)) { + void* result = dumpable + ? DumpableLargeBlobAllocator->Allocate(size) + : UndumpableLargeBlobAllocator->Allocate(size); + YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(result) >= LargeZoneStart(dumpable) && reinterpret_cast<uintptr_t>(result) < LargeZoneEnd(dumpable)); + return result; + } else { + auto* result = HugeBlobAllocator->Allocate(size, dumpable); + YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(result) >= HugeZoneStart && reinterpret_cast<uintptr_t>(result) < HugeZoneEnd); + return result; + } + } + + static void Free(void* ptr) + { + InitializeGlobals(); + if (reinterpret_cast<uintptr_t>(ptr) < LargeZoneEnd(true)) { + YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= LargeZoneStart(true) && reinterpret_cast<uintptr_t>(ptr) < LargeZoneEnd(true)); + UnalignPtr<TLargeBlobHeader>(ptr); + DumpableLargeBlobAllocator->Free(ptr); + } else if (reinterpret_cast<uintptr_t>(ptr) < LargeZoneEnd(false)) { + YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= LargeZoneStart(false) && reinterpret_cast<uintptr_t>(ptr) < LargeZoneEnd(false)); + UnalignPtr<TLargeBlobHeader>(ptr); + UndumpableLargeBlobAllocator->Free(ptr); + } else if (reinterpret_cast<uintptr_t>(ptr) < HugeZoneEnd) { + YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= HugeZoneStart && reinterpret_cast<uintptr_t>(ptr) < HugeZoneEnd); + UnalignPtr<THugeBlobHeader>(ptr); + HugeBlobAllocator->Free(ptr); + } else { + YTALLOC_TRAP("Wrong ptr passed to Free"); + } + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +Y_POD_THREAD(bool) CurrentThreadIsBackground; + +// Base class for all background threads. 
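+// Fork handling (see BeforeFork/AfterForkParent/AfterForkChild below): the worker thread is
+// stopped before fork(); the parent restarts it only if it had been running, while the child
+// allocates a fresh TState (the thread does not survive the fork) and restarts under the same
+// condition. Nested fork notifications are tracked via ForkDepth.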
+template <class T> +class TBackgroundThreadBase +{ +public: + TBackgroundThreadBase() + : State_(new TState()) + { + NThreading::TForkAwareSpinLock::AtFork( + static_cast<T*>(this), + &BeforeFork, + &AfterForkParent, + &AfterForkChild); + } + + virtual ~TBackgroundThreadBase() + { + Stop(); + } + +private: + struct TState + : public TSystemAllocatable + { + std::mutex StartStopMutex; + std::optional<std::thread> Thread; + + std::mutex StopFlagMutex; + std::condition_variable StopFlagVariable; + std::chrono::system_clock::time_point LastInvocationTime; + bool StopFlag = false; + bool Paused = false; + + std::atomic<int> ForkDepth = 0; + bool RestartAfterFork = false; + }; + + TState* State_; + +private: + static void BeforeFork(void* cookie) + { + static_cast<T*>(cookie)->DoBeforeFork(); + } + + void DoBeforeFork() + { + bool stopped = Stop(); + if (State_->ForkDepth++ == 0) { + State_->RestartAfterFork = stopped; + } + } + + static void AfterForkParent(void* cookie) + { + static_cast<T*>(cookie)->DoAfterForkParent(); + } + + void DoAfterForkParent() + { + if (--State_->ForkDepth == 0) { + if (State_->RestartAfterFork) { + Start(false); + } + } + } + + static void AfterForkChild(void* cookie) + { + static_cast<T*>(cookie)->DoAfterForkChild(); + } + + void DoAfterForkChild() + { + bool restart = State_->RestartAfterFork; + State_ = new TState(); + if (restart) { + Start(false); + } + } + + virtual void ThreadMain() = 0; + +protected: + void Start(bool fromAlloc) + { + std::unique_lock<std::mutex> guard(State_->StartStopMutex, std::defer_lock); + if (fromAlloc) { + if (!guard.try_lock()) { + return; + } + + if (State_->Paused) { + return; + } + } else { + guard.lock(); + } + + State_->Paused = false; + if (State_->Thread) { + return; + } + + State_->StopFlag = false; + + State_->Thread.emplace([=] { + CurrentThreadIsBackground = true; + ThreadMain(); + }); + + OnStart(); + } + + bool Stop() + { + std::unique_lock<std::mutex> guard(State_->StartStopMutex); + + State_->Paused = true; + if (!State_->Thread) { + return false; + } + + std::unique_lock<std::mutex> flagGuard(State_->StopFlagMutex); + State_->StopFlag = true; + flagGuard.unlock(); + State_->StopFlagVariable.notify_one(); + + State_->Thread->join(); + State_->Thread.reset(); + + OnStop(); + + return true; + } + + bool IsDone(TDuration interval) + { + std::unique_lock<std::mutex> flagGuard(State_->StopFlagMutex); + auto result = State_->StopFlagVariable.wait_until( + flagGuard, + State_->LastInvocationTime + std::chrono::microseconds(interval.MicroSeconds()), + [&] { return State_->StopFlag; }); + State_->LastInvocationTime = std::chrono::system_clock::now(); + return result; + } + + virtual void OnStart() + { } + + virtual void OnStop() + { } +}; + +//////////////////////////////////////////////////////////////////////////////// + +// Invokes madvise(MADV_STOCKPILE) periodically. +class TStockpileThread + : public TBackgroundThreadBase<TStockpileThread> +{ +public: + explicit TStockpileThread(int index) + : Index_(index) + { + Start(false); + } + +private: + const int Index_; + + virtual void ThreadMain() override + { + TThread::SetCurrentThreadName(Sprintf("%s:%d", StockpileThreadName, Index_).c_str()); + + while (!IsDone(ConfigurationManager->GetStockpileInterval())) { + if (!MappedMemoryManager->Stockpile(ConfigurationManager->GetStockpileSize())) { + // No use to proceed. + YTALLOC_LOG_INFO("Stockpile call failed; terminating stockpile thread"); + break; + } + } + } +}; + +// Manages a bunch of TStockpileThreads. 
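+// SpawnIfNeeded is a no-op unless stockpiling is enabled; otherwise it grows or shrinks the
+// pool to the configured thread count (excess threads are joined and destroyed via
+// ~TBackgroundThreadBase).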
+class TStockpileManager +{ +public: + void SpawnIfNeeded() + { + if (!ConfigurationManager->IsStockpileEnabled()) { + return; + } + + int threadCount = ConfigurationManager->GetStockpileThreadCount(); + while (static_cast<int>(Threads_.size()) > threadCount) { + Threads_.pop_back(); + } + while (static_cast<int>(Threads_.size()) < threadCount) { + Threads_.push_back(std::make_unique<TStockpileThread>(static_cast<int>(Threads_.size()))); + } + } + +private: + std::vector<std::unique_ptr<TStockpileThread>> Threads_; +}; + +TExplicitlyConstructableSingleton<TStockpileManager> StockpileManager; + +//////////////////////////////////////////////////////////////////////////////// + +// Time to wait before re-spawning the thread after a fork. +static constexpr auto BackgroundThreadRespawnDelay = TDuration::Seconds(3); + +// Runs basic background activities: reclaim, logging, profiling etc. +class TBackgroundThread + : public TBackgroundThreadBase<TBackgroundThread> +{ +public: + bool IsStarted() + { + return Started_.load(); + } + + void SpawnIfNeeded() + { + if (CurrentThreadIsBackground) { + return; + } + Start(true); + } + +private: + std::atomic<bool> Started_ = false; + +private: + virtual void ThreadMain() override + { + TThread::SetCurrentThreadName(BackgroundThreadName); + TimingManager->DisableForCurrentThread(); + MmapObservationManager->DisableForCurrentThread(); + + while (!IsDone(BackgroundInterval)) { + DumpableLargeBlobAllocator->RunBackgroundTasks(); + UndumpableLargeBlobAllocator->RunBackgroundTasks(); + MappedMemoryManager->RunBackgroundTasks(); + TimingManager->RunBackgroundTasks(); + MmapObservationManager->RunBackgroundTasks(); + StockpileManager->SpawnIfNeeded(); + } + } + + virtual void OnStart() override + { + DoUpdateAllThreadsControlWord(true); + } + + virtual void OnStop() override + { + DoUpdateAllThreadsControlWord(false); + } + + void DoUpdateAllThreadsControlWord(bool started) + { + // Update threads' TLS. + ThreadManager->EnumerateThreadStatesSync( + [&] { + Started_.store(started); + }, + [&] (auto* state) { + if (state->BackgroundThreadStarted) { + *state->BackgroundThreadStarted = started; + } + }); + } +}; + +TExplicitlyConstructableSingleton<TBackgroundThread> BackgroundThread; + +//////////////////////////////////////////////////////////////////////////////// + +Y_FORCE_INLINE TThreadState* TThreadManager::GetThreadStateUnchecked() +{ + YTALLOC_PARANOID_ASSERT(ThreadState_); + return ThreadState_; +} + +Y_FORCE_INLINE TThreadState* TThreadManager::FindThreadState() +{ + if (Y_LIKELY(ThreadState_)) { + return ThreadState_; + } + + if (ThreadStateDestroyed_) { + return nullptr; + } + + InitializeGlobals(); + + // InitializeGlobals must not allocate. 
+ Y_VERIFY(!ThreadState_); + ThreadState_ = ThreadManager->AllocateThreadState(); + (&ThreadControlWord_)->Parts.ThreadStateValid = true; + + return ThreadState_; +} + +void TThreadManager::DestroyThread(void*) +{ + TSmallAllocator::PurgeCaches(); + + TThreadState* state = ThreadState_; + ThreadState_ = nullptr; + ThreadStateDestroyed_ = true; + (&ThreadControlWord_)->Parts.ThreadStateValid = false; + + { + auto guard = GuardWithTiming(ThreadManager->ThreadRegistryLock_); + state->AllocationProfilingEnabled = nullptr; + state->BackgroundThreadStarted = nullptr; + ThreadManager->UnrefThreadState(state); + } +} + +void TThreadManager::DestroyThreadState(TThreadState* state) +{ + StatisticsManager->AccumulateLocalCounters(state); + ThreadRegistry_.Remove(state); + ThreadStatePool_.Free(state); +} + +void TThreadManager::AfterFork(void* cookie) +{ + static_cast<TThreadManager*>(cookie)->DoAfterFork(); +} + +void TThreadManager::DoAfterFork() +{ + auto guard = GuardWithTiming(ThreadRegistryLock_); + ThreadRegistry_.Clear(); + TThreadState* state = ThreadState_; + if (state) { + ThreadRegistry_.PushBack(state); + } +} + +TThreadState* TThreadManager::AllocateThreadState() +{ + auto* state = ThreadStatePool_.Allocate(); + state->AllocationProfilingEnabled = &(*&ThreadControlWord_).Parts.AllocationProfilingEnabled; + state->BackgroundThreadStarted = &(*&ThreadControlWord_).Parts.BackgroundThreadStarted; + + { + auto guard = GuardWithTiming(ThreadRegistryLock_); + // NB: These flags must be initialized under ThreadRegistryLock_; see EnumerateThreadStatesSync. + *state->AllocationProfilingEnabled = ConfigurationManager->IsAllocationProfilingEnabled(); + *state->BackgroundThreadStarted = BackgroundThread->IsStarted(); + ThreadRegistry_.PushBack(state); + } + + // Need to pass some non-null value for DestroyThread to be called. 
+ pthread_setspecific(ThreadDtorKey_, (void*)-1); + + return state; +} + +//////////////////////////////////////////////////////////////////////////////// + +void InitializeGlobals() +{ + static std::once_flag Initialized; + std::call_once(Initialized, [] () { + LogManager.Construct(); + BacktraceManager.Construct(); + StatisticsManager.Construct(); + MappedMemoryManager.Construct(); + ThreadManager.Construct(); + GlobalState.Construct(); + DumpableLargeBlobAllocator.Construct(); + UndumpableLargeBlobAllocator.Construct(); + HugeBlobAllocator.Construct(); + ConfigurationManager.Construct(); + SystemAllocator.Construct(); + TimingManager.Construct(); + MmapObservationManager.Construct(); + StockpileManager.Construct(); + BackgroundThread.Construct(); + + SmallArenaAllocators.Construct(); + auto constructSmallArenaAllocators = [&] (EAllocationKind kind, uintptr_t zonesStart) { + for (size_t rank = 1; rank < SmallRankCount; ++rank) { + (*SmallArenaAllocators)[kind][rank].Construct(kind, rank, zonesStart + rank * SmallZoneSize); + } + }; + constructSmallArenaAllocators(EAllocationKind::Untagged, UntaggedSmallZonesStart); + constructSmallArenaAllocators(EAllocationKind::Tagged, TaggedSmallZonesStart); + + GlobalSmallChunkCaches.Construct(); + (*GlobalSmallChunkCaches)[EAllocationKind::Tagged].Construct(EAllocationKind::Tagged); + (*GlobalSmallChunkCaches)[EAllocationKind::Untagged].Construct(EAllocationKind::Untagged); + }); +} + +//////////////////////////////////////////////////////////////////////////////// + +void StartBackgroundThread() +{ + InitializeGlobals(); + BackgroundThread->SpawnIfNeeded(); +} + +//////////////////////////////////////////////////////////////////////////////// + +template <class... Ts> +Y_FORCE_INLINE void* AllocateSmallUntagged(size_t rank, Ts... args) +{ + auto* result = TSmallAllocator::Allocate<EAllocationKind::Untagged>(NullMemoryTag, rank, std::forward<Ts>(args)...); + YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(result) >= MinUntaggedSmallPtr && reinterpret_cast<uintptr_t>(result) < MaxUntaggedSmallPtr); + return result; +} + +template <class... Ts> +Y_FORCE_INLINE void* AllocateSmallTagged(ui64 controlWord, size_t rank, Ts... args) +{ + auto tag = Y_UNLIKELY((controlWord & TThreadManager::AllocationProfilingEnabledControlWordMask) && ConfigurationManager->IsSmallArenaAllocationProfiled(rank)) + ? 
BacktraceManager->GetMemoryTagFromBacktrace(2) + : static_cast<TMemoryTag>(controlWord & TThreadManager::MemoryTagControlWordMask); + auto* result = TSmallAllocator::Allocate<EAllocationKind::Tagged>(tag, rank, std::forward<Ts>(args)...); + YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(result) >= MinTaggedSmallPtr && reinterpret_cast<uintptr_t>(result) < MaxTaggedSmallPtr); + return result; +} + +Y_FORCE_INLINE void* AllocateInline(size_t size) +{ + size_t rank; + if (Y_LIKELY(size <= 512)) { + rank = SizeToSmallRank1[(size + 7) >> 3]; + } else if (Y_LIKELY(size < LargeAllocationSizeThreshold)) { + rank = SizeToSmallRank2[(size - 1) >> 8]; + } else { + StartBackgroundThread(); + return TBlobAllocator::Allocate(size); + } + + auto controlWord = TThreadManager::GetThreadControlWord(); + if (Y_LIKELY(controlWord == TThreadManager::FastPathControlWord)) { + return AllocateSmallUntagged(rank, TThreadManager::GetThreadStateUnchecked()); + } + + if (Y_UNLIKELY(!(controlWord & TThreadManager::BackgroundThreadStartedControlWorkMask))) { + StartBackgroundThread(); + } + + if (!(controlWord & (TThreadManager::MemoryTagControlWordMask | TThreadManager::AllocationProfilingEnabledControlWordMask))) { + return AllocateSmallUntagged(rank); + } else { + return AllocateSmallTagged(controlWord, rank); + } +} + +Y_FORCE_INLINE void* AllocateSmallInline(size_t rank) +{ + auto controlWord = TThreadManager::GetThreadControlWord(); + if (Y_LIKELY(controlWord == TThreadManager::FastPathControlWord)) { + return AllocateSmallUntagged(rank, TThreadManager::GetThreadStateUnchecked()); + } + + if (!(controlWord & (TThreadManager::MemoryTagControlWordMask | TThreadManager::AllocationProfilingEnabledControlWordMask))) { + return AllocateSmallUntagged(rank); + } else { + return AllocateSmallTagged(controlWord, rank); + } +} + +Y_FORCE_INLINE void* AllocatePageAlignedInline(size_t size) +{ + size = std::max(AlignUp(size, PageSize), PageSize); + void* result = size >= LargeAllocationSizeThreshold + ? 
AlignUp(TBlobAllocator::Allocate(size + PageSize), PageSize) + : Allocate(size); + YTALLOC_ASSERT(reinterpret_cast<uintptr_t>(result) % PageSize == 0); + return result; +} + +Y_FORCE_INLINE void FreeNonNullInline(void* ptr) +{ + YTALLOC_ASSERT(ptr); + if (Y_LIKELY(reinterpret_cast<uintptr_t>(ptr) < UntaggedSmallZonesEnd)) { + YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= MinUntaggedSmallPtr && reinterpret_cast<uintptr_t>(ptr) < MaxUntaggedSmallPtr); + TSmallAllocator::Free<EAllocationKind::Untagged>(ptr); + } else if (Y_LIKELY(reinterpret_cast<uintptr_t>(ptr) < TaggedSmallZonesEnd)) { + YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= MinTaggedSmallPtr && reinterpret_cast<uintptr_t>(ptr) < MaxTaggedSmallPtr); + TSmallAllocator::Free<EAllocationKind::Tagged>(ptr); + } else { + TBlobAllocator::Free(ptr); + } +} + +Y_FORCE_INLINE void FreeInline(void* ptr) +{ + if (Y_LIKELY(ptr)) { + FreeNonNullInline(ptr); + } +} + +Y_FORCE_INLINE size_t GetAllocationSizeInline(const void* ptr) +{ + if (Y_UNLIKELY(!ptr)) { + return 0; + } + + auto uintptr = reinterpret_cast<uintptr_t>(ptr); + if (uintptr < UntaggedSmallZonesEnd) { + YTALLOC_PARANOID_ASSERT(uintptr >= MinUntaggedSmallPtr && uintptr < MaxUntaggedSmallPtr); + return TSmallAllocator::GetAllocationSize(ptr); + } else if (uintptr < TaggedSmallZonesEnd) { + YTALLOC_PARANOID_ASSERT(uintptr >= MinTaggedSmallPtr && uintptr < MaxTaggedSmallPtr); + return TSmallAllocator::GetAllocationSize(ptr); + } else if (uintptr < LargeZoneEnd(true)) { + YTALLOC_PARANOID_ASSERT(uintptr >= LargeZoneStart(true) && uintptr < LargeZoneEnd(true)); + return TLargeBlobAllocator<true>::GetAllocationSize(ptr); + } else if (uintptr < LargeZoneEnd(false)) { + YTALLOC_PARANOID_ASSERT(uintptr >= LargeZoneStart(false) && uintptr < LargeZoneEnd(false)); + return TLargeBlobAllocator<false>::GetAllocationSize(ptr); + } else if (uintptr < HugeZoneEnd) { + YTALLOC_PARANOID_ASSERT(uintptr >= HugeZoneStart && uintptr < HugeZoneEnd); + return THugeBlobAllocator::GetAllocationSize(ptr); + } else { + YTALLOC_TRAP("Wrong ptr passed to GetAllocationSizeInline"); + } +} + +Y_FORCE_INLINE size_t GetAllocationSizeInline(size_t size) +{ + if (size <= LargeAllocationSizeThreshold) { + return TSmallAllocator::GetAllocationSize(size); + } else if (size <= HugeAllocationSizeThreshold) { + return TLargeBlobAllocator<true>::GetAllocationSize(size); + } else { + return THugeBlobAllocator::GetAllocationSize(size); + } +} + +void EnableLogging(TLogHandler logHandler) +{ + InitializeGlobals(); + LogManager->EnableLogging(logHandler); +} + +void SetBacktraceProvider(TBacktraceProvider provider) +{ + InitializeGlobals(); + BacktraceManager->SetBacktraceProvider(provider); + DumpableLargeBlobAllocator->SetBacktraceProvider(provider); + UndumpableLargeBlobAllocator->SetBacktraceProvider(provider); +} + +void SetBacktraceFormatter(TBacktraceFormatter provider) +{ + InitializeGlobals(); + MmapObservationManager->SetBacktraceFormatter(provider); +} + +void EnableStockpile() +{ + InitializeGlobals(); + ConfigurationManager->EnableStockpile(); +} + +void SetStockpileInterval(TDuration value) +{ + InitializeGlobals(); + ConfigurationManager->SetStockpileInterval(value); +} + +void SetStockpileThreadCount(int value) +{ + InitializeGlobals(); + ConfigurationManager->SetStockpileThreadCount(value); +} + +void SetStockpileSize(size_t value) +{ + InitializeGlobals(); + ConfigurationManager->SetStockpileSize(value); +} + +void SetLargeUnreclaimableCoeff(double value) +{ + InitializeGlobals(); + 
ConfigurationManager->SetLargeUnreclaimableCoeff(value); +} + +void SetTimingEventThreshold(TDuration value) +{ + InitializeGlobals(); + ConfigurationManager->SetTimingEventThreshold(value); +} + +void SetMinLargeUnreclaimableBytes(size_t value) +{ + InitializeGlobals(); + ConfigurationManager->SetMinLargeUnreclaimableBytes(value); +} + +void SetMaxLargeUnreclaimableBytes(size_t value) +{ + InitializeGlobals(); + ConfigurationManager->SetMaxLargeUnreclaimableBytes(value); +} + +void SetAllocationProfilingEnabled(bool value) +{ + ConfigurationManager->SetAllocationProfilingEnabled(value); +} + +void SetAllocationProfilingSamplingRate(double rate) +{ + ConfigurationManager->SetAllocationProfilingSamplingRate(rate); +} + +void SetSmallArenaAllocationProfilingEnabled(size_t rank, bool value) +{ + ConfigurationManager->SetSmallArenaAllocationProfilingEnabled(rank, value); +} + +void SetLargeArenaAllocationProfilingEnabled(size_t rank, bool value) +{ + ConfigurationManager->SetLargeArenaAllocationProfilingEnabled(rank, value); +} + +void SetProfilingBacktraceDepth(int depth) +{ + ConfigurationManager->SetProfilingBacktraceDepth(depth); +} + +void SetMinProfilingBytesUsedToReport(size_t size) +{ + ConfigurationManager->SetMinProfilingBytesUsedToReport(size); +} + +void SetEnableEagerMemoryRelease(bool value) +{ + ConfigurationManager->SetEnableEagerMemoryRelease(value); +} + +void SetEnableMadvisePopulate(bool value) +{ + ConfigurationManager->SetEnableMadvisePopulate(value); +} + +TEnumIndexedVector<ETotalCounter, ssize_t> GetTotalAllocationCounters() +{ + InitializeGlobals(); + return StatisticsManager->GetTotalAllocationCounters(); +} + +TEnumIndexedVector<ESystemCounter, ssize_t> GetSystemAllocationCounters() +{ + InitializeGlobals(); + return StatisticsManager->GetSystemAllocationCounters(); +} + +TEnumIndexedVector<ESystemCounter, ssize_t> GetUndumpableAllocationCounters() +{ + InitializeGlobals(); + return StatisticsManager->GetUndumpableAllocationCounters(); +} + +TEnumIndexedVector<ESmallCounter, ssize_t> GetSmallAllocationCounters() +{ + InitializeGlobals(); + return StatisticsManager->GetSmallAllocationCounters(); +} + +TEnumIndexedVector<ESmallCounter, ssize_t> GetLargeAllocationCounters() +{ + InitializeGlobals(); + return StatisticsManager->GetLargeAllocationCounters(); +} + +std::array<TEnumIndexedVector<ESmallArenaCounter, ssize_t>, SmallRankCount> GetSmallArenaAllocationCounters() +{ + InitializeGlobals(); + return StatisticsManager->GetSmallArenaAllocationCounters(); +} + +std::array<TEnumIndexedVector<ELargeArenaCounter, ssize_t>, LargeRankCount> GetLargeArenaAllocationCounters() +{ + InitializeGlobals(); + return StatisticsManager->GetLargeArenaAllocationCounters(); +} + +TEnumIndexedVector<EHugeCounter, ssize_t> GetHugeAllocationCounters() +{ + InitializeGlobals(); + return StatisticsManager->GetHugeAllocationCounters(); +} + +std::vector<TProfiledAllocation> GetProfiledAllocationStatistics() +{ + InitializeGlobals(); + + if (!ConfigurationManager->IsAllocationProfilingEnabled()) { + return {}; + } + + std::vector<TMemoryTag> tags; + tags.reserve(MaxCapturedAllocationBacktraces + 1); + for (TMemoryTag tag = AllocationProfilingMemoryTagBase; + tag < AllocationProfilingMemoryTagBase + MaxCapturedAllocationBacktraces; + ++tag) + { + tags.push_back(tag); + } + tags.push_back(AllocationProfilingUnknownMemoryTag); + + std::vector<TEnumIndexedVector<EBasicCounter, ssize_t>> counters; + counters.resize(tags.size()); + StatisticsManager->GetTaggedMemoryCounters(tags.data(), 
tags.size(), counters.data()); + + std::vector<TProfiledAllocation> statistics; + for (size_t index = 0; index < tags.size(); ++index) { + if (counters[index][EBasicCounter::BytesUsed] < static_cast<ssize_t>(ConfigurationManager->GetMinProfilingBytesUsedToReport())) { + continue; + } + auto tag = tags[index]; + auto optionalBacktrace = BacktraceManager->FindBacktrace(tag); + if (!optionalBacktrace && tag != AllocationProfilingUnknownMemoryTag) { + continue; + } + statistics.push_back(TProfiledAllocation{ + optionalBacktrace.value_or(TBacktrace()), + counters[index] + }); + } + return statistics; +} + +TEnumIndexedVector<ETimingEventType, TTimingEventCounters> GetTimingEventCounters() +{ + InitializeGlobals(); + return TimingManager->GetTimingEventCounters(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NYTAlloc + |
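The Set*/Get* functions above form YTAlloc's public tuning and introspection surface. The following is a minimal usage sketch, not part of the patch above: it only calls functions declared in this file, but the chosen values and the auxiliary util includes are illustrative assumptions, not recommended defaults.

#include <library/cpp/ytalloc/api/ytalloc.h>

#include <util/datetime/base.h>          // TDuration (assumed standard util header)
#include <util/generic/size_literals.h>  // _MB / _GB literals

// Sketch: configure background stockpiling and large-blob reclaim bounds at startup.
void ConfigureYTAlloc()
{
    using namespace NYT::NYTAlloc;

    // Periodic madvise(MADV_STOCKPILE) from background threads; values are assumptions.
    EnableStockpile();
    SetStockpileInterval(TDuration::MilliSeconds(10));
    SetStockpileThreadCount(1);
    SetStockpileSize(1_GB);

    // Bound how much spare/overhead large-blob memory may remain unreclaimed
    // (see GetBytesToReclaim above); values are assumptions.
    SetLargeUnreclaimableCoeff(0.05);
    SetMinLargeUnreclaimableBytes(128_MB);
    SetMaxLargeUnreclaimableBytes(1_GB);
}

// Sketch: pull counters for periodic reporting.
void ReportYTAllocCounters()
{
    const auto totals = NYT::NYTAlloc::GetTotalAllocationCounters();
    const auto huge = NYT::NYTAlloc::GetHugeAllocationCounters();
    // Index these with the ETotalCounter / EHugeCounter enumerators from ytalloc.h.
    (void)totals;
    (void)huge;
}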