#pragma once

// This file contains the core parts of YTAlloc but no malloc/free-bridge.
// The latter bridge is placed into alloc.cpp, which includes (sic!) core-inl.h.
// This ensures that AllocateInline/FreeInline calls are properly inlined into malloc/free.
// Also core-inl.h can be directly included in, e.g., benchmarks.

#include <library/cpp/yt/containers/intrusive_linked_list.h>

#include <library/cpp/yt/memory/memory_tag.h>

#include <library/cpp/yt/threading/at_fork.h>
#include <library/cpp/yt/threading/fork_aware_spin_lock.h>

#include <library/cpp/yt/memory/free_list.h>

#include <util/system/tls.h>
#include <util/system/align.h>
#include <util/system/thread.h>

#include <util/string/printf.h>

#include <util/generic/singleton.h>
#include <util/generic/size_literals.h>
#include <util/generic/utility.h>

#include <util/digest/numeric.h>

#include <library/cpp/ytalloc/api/ytalloc.h>

#include <atomic>
#include <array>
#include <vector>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <cstdio>
#include <optional>

#include <sys/mman.h>

#ifdef _linux_
    #include <sys/utsname.h>
#endif

#include <errno.h>
#include <pthread.h>
#include <time.h>

#ifndef MAP_POPULATE
    #define MAP_POPULATE 0x08000
#endif

// Like MAP_FIXED but doesn't unmap the underlying mapping.
// Linux kernels older than 4.17 silently ignore this flag.
#ifndef MAP_FIXED_NOREPLACE
    #ifdef _linux_
        #define MAP_FIXED_NOREPLACE 0x100000
    #else
        #define MAP_FIXED_NOREPLACE 0
    #endif
#endif

#ifndef MADV_POPULATE
    #define MADV_POPULATE 0x59410003
#endif

#ifndef MADV_STOCKPILE
    #define MADV_STOCKPILE 0x59410004
#endif

#ifndef MADV_FREE
    #define MADV_FREE 8
#endif

#ifndef MADV_DONTDUMP
    #define MADV_DONTDUMP 16
#endif

#ifndef NDEBUG
    #define YTALLOC_PARANOID
#endif

#ifdef YTALLOC_PARANOID
    #define YTALLOC_NERVOUS
#endif

#define YTALLOC_VERIFY(condition)                                                             \
    do {                                                                                      \
        if (Y_UNLIKELY(!(condition))) {                                                       \
            ::NYT::NYTAlloc::AssertTrap("Assertion failed: " #condition, __FILE__, __LINE__); \
        }                                                                                     \
    } while (false)

#ifdef NDEBUG
    #define YTALLOC_ASSERT(condition) (void)(0)
#else
    #define YTALLOC_ASSERT(condition) YTALLOC_VERIFY(condition)
#endif

#ifdef YTALLOC_PARANOID
    #define YTALLOC_PARANOID_ASSERT(condition) YTALLOC_VERIFY(condition)
#else
    #define YTALLOC_PARANOID_ASSERT(condition) (true || (condition))
#endif

#define YTALLOC_TRAP(message) ::NYT::NYTAlloc::AssertTrap(message, __FILE__, __LINE__)

namespace NYT::NYTAlloc {

////////////////////////////////////////////////////////////////////////////////
// Allocations are classified into three types:
//
// a) Small chunks (less than LargeAllocationSizeThreshold)
// These are the fastest and are extensively cached (both per-thread and globally).
// Memory claimed for these allocations is never reclaimed.
// Code dealing with such allocations is heavily optimized, with all hot paths
// as streamlined as possible. The implementation is mostly inspired by LFAlloc.
//
// b) Large blobs (from LargeAllocationSizeThreshold to HugeAllocationSizeThreshold)
// These are cached as well. We expect such allocations to be less frequent
// than small ones but still do our best to provide good scalability.
// In particular, thread-sharded concurrent data structures are used to provide access to
// cached blobs. Memory is claimed via madvise(MADV_POPULATE) and reclaimed back
// via madvise(MADV_FREE).
//
// c) Huge blobs (from HugeAllocationSizeThreshold)
// These should be rare; we delegate directly to mmap and munmap for each allocation.
//
// We also provide a separate allocator for all system allocations (those needed by YTAlloc itself).
// These are rare and also delegate directly to mmap/munmap.

// Period between background activities.
constexpr auto BackgroundInterval = TDuration::Seconds(1);

static_assert(LargeRankCount - MinLargeRank <= 16, "Too many large ranks");
static_assert(SmallRankCount <= 32, "Too many small ranks");

constexpr size_t SmallZoneSize = 1_TB;
constexpr size_t LargeZoneSize = 16_TB;
constexpr size_t HugeZoneSize = 1_TB;
constexpr size_t SystemZoneSize = 1_TB;

constexpr size_t MaxCachedChunksPerRank = 256;

constexpr uintptr_t UntaggedSmallZonesStart = 0;
constexpr uintptr_t UntaggedSmallZonesEnd = UntaggedSmallZonesStart + 32 * SmallZoneSize;
constexpr uintptr_t MinUntaggedSmallPtr = UntaggedSmallZonesStart + SmallZoneSize * 1;
constexpr uintptr_t MaxUntaggedSmallPtr = UntaggedSmallZonesStart + SmallZoneSize * SmallRankCount;

constexpr uintptr_t TaggedSmallZonesStart = UntaggedSmallZonesEnd;
constexpr uintptr_t TaggedSmallZonesEnd = TaggedSmallZonesStart + 32 * SmallZoneSize;
constexpr uintptr_t MinTaggedSmallPtr = TaggedSmallZonesStart + SmallZoneSize * 1;
constexpr uintptr_t MaxTaggedSmallPtr = TaggedSmallZonesStart + SmallZoneSize * SmallRankCount;

constexpr uintptr_t DumpableLargeZoneStart = TaggedSmallZonesEnd;
constexpr uintptr_t DumpableLargeZoneEnd = DumpableLargeZoneStart + LargeZoneSize;

constexpr uintptr_t UndumpableLargeZoneStart = DumpableLargeZoneEnd;
constexpr uintptr_t UndumpableLargeZoneEnd = UndumpableLargeZoneStart + LargeZoneSize;

constexpr uintptr_t LargeZoneStart(bool dumpable)
{
    return dumpable ? DumpableLargeZoneStart : UndumpableLargeZoneStart;
}
constexpr uintptr_t LargeZoneEnd(bool dumpable)
{
    return dumpable ? DumpableLargeZoneEnd : UndumpableLargeZoneEnd;
}

constexpr uintptr_t HugeZoneStart = UndumpableLargeZoneEnd;
constexpr uintptr_t HugeZoneEnd = HugeZoneStart + HugeZoneSize;

constexpr uintptr_t SystemZoneStart = HugeZoneEnd;
constexpr uintptr_t SystemZoneEnd = SystemZoneStart + SystemZoneSize;
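
// For reference, the virtual address space layout implied by the constants above
// (derived arithmetic, in TB; zone 0 of each small flavor is skipped, as reflected
// by Min{Untagged,Tagged}SmallPtr starting at rank 1):
//
//   [ 0,  32)  untagged small zones, one 1_TB zone per small rank
//   [32,  64)  tagged small zones, one 1_TB zone per small rank
//   [64,  80)  dumpable large zone
//   [80,  96)  undumpable large zone
//   [96,  97)  huge zone
//   [97,  98)  system zone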

// We leave 64_KB at the end of each 256_MB extent and never use it.
// That serves two purposes:
//   1. SmallExtentSize % SmallSegmentSize == 0
//   2. Every small object satisfies RightReadableArea requirement.
constexpr size_t SmallExtentAllocSize = 256_MB;
constexpr size_t SmallExtentSize = SmallExtentAllocSize - 64_KB;
constexpr size_t SmallSegmentSize = 96_KB; // LCM(SmallRankToSize)

constexpr ui16 SmallRankBatchSize[SmallRankCount] = {
    0, 256, 256, 256, 256, 256, 256, 256, 256, 256, 192, 128, 96, 64, 48, 32, 24, 16, 12, 8, 6, 4, 3
};

constexpr bool CheckSmallSizes()
{
    for (size_t rank = 0; rank < SmallRankCount; rank++) {
        auto size = SmallRankToSize[rank];
        if (size == 0) {
            continue;
        }

        if (SmallSegmentSize % size != 0) {
            return false;
        }

        if (SmallRankBatchSize[rank] > MaxCachedChunksPerRank) {
            return false;
        }
    }

    return true;
}

static_assert(CheckSmallSizes());
static_assert(SmallExtentSize % SmallSegmentSize == 0);
static_assert(SmallSegmentSize % PageSize == 0);

constexpr size_t LargeExtentSize = 1_GB;
static_assert(LargeExtentSize >= LargeAllocationSizeThreshold, "LargeExtentSize < LargeAllocationSizeThreshold");

constexpr const char* BackgroundThreadName = "YTAllocBack";
constexpr const char* StockpileThreadName = "YTAllocStock";

DEFINE_ENUM(EAllocationKind,
    (Untagged)
    (Tagged)
);

// Forward declarations.
struct TThreadState;
struct TLargeArena;
struct TLargeBlobExtent;

////////////////////////////////////////////////////////////////////////////////
// Traps and assertions

[[noreturn]]
void OomTrap()
{
    _exit(9);
}

[[noreturn]]
void AssertTrap(const char* message, const char* file, int line)
{
    ::fprintf(stderr, "*** YTAlloc has detected an internal trap at %s:%d\n*** %s\n",
        file,
        line,
        message);
    __builtin_trap();
}

template <class T, class E>
void AssertBlobState(T* header, E expectedState)
{
    auto actualState = header->State;
    if (Y_UNLIKELY(actualState != expectedState)) {
        char message[256];
        ::snprintf(message, sizeof(message), "Invalid blob header state at %p: expected %" PRIx64 ", actual %" PRIx64,
            header,
            static_cast<ui64>(expectedState),
            static_cast<ui64>(actualState));
        YTALLOC_TRAP(message);
    }
}

////////////////////////////////////////////////////////////////////////////////

// Provides a never-dying singleton with explicit construction.
template <class T>
class TExplicitlyConstructableSingleton
{
public:
    TExplicitlyConstructableSingleton()
    { }

    ~TExplicitlyConstructableSingleton()
    { }

    template <class... Ts>
    void Construct(Ts&&... args)
    {
        new (&Storage_) T(std::forward<Ts>(args)...);
#ifndef NDEBUG
        Constructed_ = true;
#endif
    }

    Y_FORCE_INLINE T* Get()
    {
#ifndef NDEBUG
        YTALLOC_PARANOID_ASSERT(Constructed_);
#endif
        return &Storage_;
    }

    Y_FORCE_INLINE const T* Get() const
    {
#ifndef NDEBUG
        YTALLOC_PARANOID_ASSERT(Constructed_);
#endif
        return &Storage_;
    }

    Y_FORCE_INLINE T* operator->()
    {
        return Get();
    }

    Y_FORCE_INLINE const T* operator->() const
    {
        return Get();
    }

    Y_FORCE_INLINE T& operator*()
    {
        return *Get();
    }

    Y_FORCE_INLINE const T& operator*() const
    {
        return *Get();
    }

private:
    union {
        T Storage_;
    };

#ifndef NDEBUG
    bool Constructed_;
#endif
};
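
// A minimal usage sketch (illustrative; mirrors how LogManager, ConfigurationManager,
// etc. are declared below and presumably constructed from InitializeGlobals, which
// initializes all singletons):
//
//   TExplicitlyConstructableSingleton<TLogManager> LogManager;
//   ...
//   LogManager.Construct();  // exactly once, before the first dereference
//   LogManager->LogMessage(ELogEventSeverity::Info, "ready");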

////////////////////////////////////////////////////////////////////////////////

// Initializes all singletons.
// Safe to call multiple times.
// Guaranteed to not allocate.
void InitializeGlobals();

// Spawns the background thread, if it's time.
// Safe to call multiple times.
// Must be called on allocation slow path.
void StartBackgroundThread();

////////////////////////////////////////////////////////////////////////////////

class TLogManager
{
public:
    // Sets the handler to be invoked for each log event produced by YTAlloc.
    void EnableLogging(TLogHandler logHandler)
    {
        LogHandler_.store(logHandler);
    }

    // Checks (in a racy way) whether logging is enabled.
    bool IsLoggingEnabled()
    {
        return LogHandler_.load() != nullptr;
    }

    // Logs the message via log handler (if any).
    template <class... Ts>
    void LogMessage(ELogEventSeverity severity, const char* format, Ts&&... args)
    {
        auto logHandler = LogHandler_.load();
        if (!logHandler) {
            return;
        }

        std::array<char, 16_KB> buffer;
        auto len = ::snprintf(buffer.data(), buffer.size(), format, std::forward<Ts>(args)...);

        TLogEvent event;
        event.Severity = severity;
        event.Message = TStringBuf(buffer.data(), len);
        logHandler(event);
    }

    // A special case of zero args.
    void LogMessage(ELogEventSeverity severity, const char* message)
    {
        LogMessage(severity, "%s", message);
    }

private:
    std::atomic<TLogHandler> LogHandler_ = nullptr;

};

TExplicitlyConstructableSingleton<TLogManager> LogManager;

#define YTALLOC_LOG_EVENT(...)   LogManager->LogMessage(__VA_ARGS__)
#define YTALLOC_LOG_DEBUG(...)   YTALLOC_LOG_EVENT(ELogEventSeverity::Debug, __VA_ARGS__)
#define YTALLOC_LOG_INFO(...)    YTALLOC_LOG_EVENT(ELogEventSeverity::Info, __VA_ARGS__)
#define YTALLOC_LOG_WARNING(...) YTALLOC_LOG_EVENT(ELogEventSeverity::Warning, __VA_ARGS__)
#define YTALLOC_LOG_ERROR(...)   YTALLOC_LOG_EVENT(ELogEventSeverity::Error, __VA_ARGS__)

////////////////////////////////////////////////////////////////////////////////

Y_FORCE_INLINE size_t GetUsed(ssize_t allocated, ssize_t freed)
{
    return allocated >= freed ? static_cast<size_t>(allocated - freed) : 0;
}

template <class T>
Y_FORCE_INLINE void* HeaderToPtr(T* header)
{
    return header + 1;
}

template <class T>
Y_FORCE_INLINE T* PtrToHeader(void* ptr)
{
    return static_cast<T*>(ptr) - 1;
}

template <class T>
Y_FORCE_INLINE const T* PtrToHeader(const void* ptr)
{
    return static_cast<const T*>(ptr) - 1;
}

Y_FORCE_INLINE size_t PtrToSmallRank(const void* ptr)
{
    return (reinterpret_cast<uintptr_t>(ptr) >> 40) & 0x1f;
}
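
// Worked example (derived from the zone constants above): each small zone spans
// exactly 1_TB == 2^40 bytes, so bits 40..44 of a small pointer encode its zone
// index modulo 32, i.e. its rank. An untagged rank-3 pointer lies in [3_TB, 4_TB),
// hence (ptr >> 40) & 0x1f == 3; a tagged rank-3 pointer lies in [35_TB, 36_TB),
// and 35 & 0x1f == 3 as well.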

Y_FORCE_INLINE char* AlignDownToSmallSegment(char* extent, char* ptr)
{
    auto offset = static_cast<uintptr_t>(ptr - extent);
    // NB: Since SmallSegmentSize is a compile-time constant, the compiler lowers this
    // modulo to a multiplication; no division instruction is emitted.
    offset -= (offset % SmallSegmentSize);
    return extent + offset;
}

Y_FORCE_INLINE char* AlignUpToSmallSegment(char* extent, char* ptr)
{
    return AlignDownToSmallSegment(extent, ptr + SmallSegmentSize - 1);
}

template <class T>
static Y_FORCE_INLINE void UnalignPtr(void*& ptr)
{
    if (reinterpret_cast<uintptr_t>(ptr) % PageSize == 0) {
        reinterpret_cast<char*&>(ptr) -= PageSize - sizeof (T);
    }
    YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) % PageSize == sizeof (T));
}

template <class T>
static Y_FORCE_INLINE void UnalignPtr(const void*& ptr)
{
    if (reinterpret_cast<uintptr_t>(ptr) % PageSize == 0) {
        reinterpret_cast<const char*&>(ptr) -= PageSize - sizeof (T);
    }
    YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) % PageSize == sizeof (T));
}
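
// NB (derived): regular blob pointers are handed out at offset sizeof (T) within a page
// (the header occupies the beginning of that page), whereas page-aligned allocations bump
// the user pointer up to the next page boundary. UnalignPtr maps both flavors back to the
// canonical offset-sizeof (T) form so that PtrToHeader can locate the header.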

template <class T>
Y_FORCE_INLINE size_t GetRawBlobSize(size_t size)
{
    return AlignUp(size + sizeof (T) + RightReadableAreaSize, PageSize);
}

template <class T>
Y_FORCE_INLINE size_t GetBlobAllocationSize(size_t size)
{
    size += sizeof(T);
    size += RightReadableAreaSize;
    size = AlignUp(size, PageSize);
    size -= sizeof(T);
    size -= RightReadableAreaSize;
    return size;
}
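
// NB (derived): GetBlobAllocationSize(size) is the largest request that still fits into
// GetRawBlobSize(size) raw bytes, i.e. the page-aligned footprint minus the header and the
// right readable area; in particular GetRawBlobSize(GetBlobAllocationSize(size)) == GetRawBlobSize(size).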

Y_FORCE_INLINE size_t GetLargeRank(size_t size)
{
    size_t rank = 64 - __builtin_clzl(size);
    if (size == (1ULL << (rank - 1))) {
        --rank;
    }
    return rank;
}
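
// Worked example: GetLargeRank(size) computes ceil(log2(size)), e.g.
//   GetLargeRank(2_MB)     == 21  (exact power of two, so the rank is decremented)
//   GetLargeRank(2_MB + 1) == 22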

Y_FORCE_INLINE void PoisonRange(void* ptr, size_t size, ui32 magic)
{
#ifdef YTALLOC_PARANOID
    size = ::AlignUp<size_t>(size, 4);
    std::fill(static_cast<ui32*>(ptr), static_cast<ui32*>(ptr) + size / 4, magic);
#else
    Y_UNUSED(ptr);
    Y_UNUSED(size);
    Y_UNUSED(magic);
#endif
}

Y_FORCE_INLINE void PoisonFreedRange(void* ptr, size_t size)
{
    PoisonRange(ptr, size, 0xdeadbeef);
}

Y_FORCE_INLINE void PoisonUninitializedRange(void* ptr, size_t size)
{
    PoisonRange(ptr, size, 0xcafebabe);
}

// Checks that the header size is divisible by 16 (as needed due to alignment restrictions).
#define CHECK_HEADER_ALIGNMENT(T) static_assert(sizeof(T) % 16 == 0, "sizeof(" #T ") % 16 != 0");

////////////////////////////////////////////////////////////////////////////////

static_assert(sizeof(TFreeList<void>) == CacheLineSize, "sizeof(TFreeList) != CacheLineSize");

////////////////////////////////////////////////////////////////////////////////

constexpr size_t ShardCount = 16;
std::atomic<size_t> GlobalCurrentShardIndex;

// Provides a context for working with sharded data structures.
// Captures the initial shard index upon construction (indicating the shard
// where all insertions go). Maintains the current shard index (round-robin,
// indicating the shard currently used for extraction).
// May or may not be thread-safe depending on TCounter.
template <class TCounter>
class TShardedState
{
public:
    TShardedState()
        : InitialShardIndex_(GlobalCurrentShardIndex++ % ShardCount)
        , CurrentShardIndex_(InitialShardIndex_)
    { }

    Y_FORCE_INLINE size_t GetInitialShardIndex() const
    {
        return InitialShardIndex_;
    }

    Y_FORCE_INLINE size_t GetNextShardIndex()
    {
        return ++CurrentShardIndex_ % ShardCount;
    }

private:
    const size_t InitialShardIndex_;
    TCounter CurrentShardIndex_;
};

using TLocalShardedState = TShardedState<size_t>;
using TGlobalShardedState = TShardedState<std::atomic<size_t>>;

// Implemented as a collection of free lists (each called a shard).
// A TShardedState instance is needed to access the sharded data structure.
template <class T>
class TShardedFreeList
{
public:
    // First tries to extract an item from the initial shard;
    // if failed then proceeds to all shards in round-robin fashion.
    template <class TState>
    T* Extract(TState* state)
    {
        if (auto* item = Shards_[state->GetInitialShardIndex()].Extract()) {
            return item;
        }
        return ExtractRoundRobin(state);
    }

    // Attempts to extract an item from all shards in round-robin fashion.
    template <class TState>
    T* ExtractRoundRobin(TState* state)
    {
        for (size_t index = 0; index < ShardCount; ++index) {
            if (auto* item = Shards_[state->GetNextShardIndex()].Extract()) {
                return item;
            }
        }
        return nullptr;
    }

    // Extracts items from all shards linking them together.
    T* ExtractAll()
    {
        T* head = nullptr;
        T* tail = nullptr;
        for (auto& shard : Shards_) {
            auto* item = shard.ExtractAll();
            if (!head) {
                head = item;
            }
            if (tail) {
                YTALLOC_PARANOID_ASSERT(!tail->Next);
                tail->Next = item;
            } else {
                tail = item;
            }
            while (tail && tail->Next) {
                tail = tail->Next;
            }
        }
        return head;
    }

    template <class TState>
    void Put(TState* state, T* item)
    {
        Shards_[state->GetInitialShardIndex()].Put(item);
    }

private:
    std::array<TFreeList<T>, ShardCount> Shards_;
};
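
// A minimal usage sketch (illustrative only; TItem is a hypothetical type deriving from
// TFreeListItemBase<TItem>):
//
//   TGlobalShardedState state;               // captures an initial shard index
//   TShardedFreeList<TItem> freeList;
//   TItem* item = ...;
//
//   freeList.Put(&state, item);              // insertions go to the initial shard
//   auto* taken = freeList.Extract(&state);  // initial shard first, then round-robin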

////////////////////////////////////////////////////////////////////////////////

// Holds YTAlloc control knobs.
// Thread safe.
class TConfigurationManager
{
public:
    void SetLargeUnreclaimableCoeff(double value)
    {
        LargeUnreclaimableCoeff_.store(value);
    }

    double GetLargeUnreclaimableCoeff() const
    {
        return LargeUnreclaimableCoeff_.load(std::memory_order_relaxed);
    }


    void SetMinLargeUnreclaimableBytes(size_t value)
    {
        MinLargeUnreclaimableBytes_.store(value);
    }

    void SetMaxLargeUnreclaimableBytes(size_t value)
    {
        MaxLargeUnreclaimableBytes_.store(value);
    }

    size_t GetMinLargeUnreclaimableBytes() const
    {
        return MinLargeUnreclaimableBytes_.load(std::memory_order_relaxed);
    }

    size_t GetMaxLargeUnreclaimableBytes() const
    {
        return MaxLargeUnreclaimableBytes_.load(std::memory_order_relaxed);
    }


    void SetTimingEventThreshold(TDuration value)
    {
        TimingEventThresholdNs_.store(value.MicroSeconds() * 1000);
    }

    i64 GetTimingEventThresholdNs() const
    {
        return TimingEventThresholdNs_.load(std::memory_order_relaxed);
    }


    void SetAllocationProfilingEnabled(bool value);

    bool IsAllocationProfilingEnabled() const
    {
        return AllocationProfilingEnabled_.load();
    }


    Y_FORCE_INLINE double GetAllocationProfilingSamplingRate()
    {
        return AllocationProfilingSamplingRate_.load();
    }

    void SetAllocationProfilingSamplingRate(double rate)
    {
        if (rate < 0) {
            rate = 0;
        }
        if (rate > 1) {
            rate = 1;
        }
        i64 rateX64K = static_cast<i64>(rate * (1ULL << 16));
        AllocationProfilingSamplingRateX64K_.store(ClampVal<ui32>(rateX64K, 0, std::numeric_limits<ui16>::max() + 1));
        AllocationProfilingSamplingRate_.store(rate);
    }


    Y_FORCE_INLINE bool IsSmallArenaAllocationProfilingEnabled(size_t rank)
    {
        return SmallArenaAllocationProfilingEnabled_[rank].load(std::memory_order_relaxed);
    }

    Y_FORCE_INLINE bool IsSmallArenaAllocationProfiled(size_t rank)
    {
        return IsSmallArenaAllocationProfilingEnabled(rank) && IsAllocationSampled();
    }

    void SetSmallArenaAllocationProfilingEnabled(size_t rank, bool value)
    {
        if (rank >= SmallRankCount) {
            return;
        }
        SmallArenaAllocationProfilingEnabled_[rank].store(value);
    }


    Y_FORCE_INLINE bool IsLargeArenaAllocationProfilingEnabled(size_t rank)
    {
        return LargeArenaAllocationProfilingEnabled_[rank].load(std::memory_order_relaxed);
    }

    Y_FORCE_INLINE bool IsLargeArenaAllocationProfiled(size_t rank)
    {
        return IsLargeArenaAllocationProfilingEnabled(rank) && IsAllocationSampled();
    }

    void SetLargeArenaAllocationProfilingEnabled(size_t rank, bool value)
    {
        if (rank >= LargeRankCount) {
            return;
        }
        LargeArenaAllocationProfilingEnabled_[rank].store(value);
    }


    Y_FORCE_INLINE int GetProfilingBacktraceDepth()
    {
        return ProfilingBacktraceDepth_.load();
    }

    void SetProfilingBacktraceDepth(int depth)
    {
        if (depth < 1) {
            return;
        }
        if (depth > MaxAllocationProfilingBacktraceDepth) {
            depth = MaxAllocationProfilingBacktraceDepth;
        }
        ProfilingBacktraceDepth_.store(depth);
    }


    Y_FORCE_INLINE size_t GetMinProfilingBytesUsedToReport()
    {
        return MinProfilingBytesUsedToReport_.load();
    }

    void SetMinProfilingBytesUsedToReport(size_t size)
    {
        MinProfilingBytesUsedToReport_.store(size);
    }

    void SetEnableEagerMemoryRelease(bool value)
    {
        EnableEagerMemoryRelease_.store(value);
    }

    bool GetEnableEagerMemoryRelease()
    {
        return EnableEagerMemoryRelease_.load(std::memory_order_relaxed);
    }

    void SetEnableMadvisePopulate(bool value)
    {
        EnableMadvisePopulate_.store(value);
    }

    bool GetEnableMadvisePopulate()
    {
        return EnableMadvisePopulate_.load(std::memory_order_relaxed);
    }

    void EnableStockpile()
    {
        StockpileEnabled_.store(true);
    }

    bool IsStockpileEnabled()
    {
        return StockpileEnabled_.load();
    }

    void SetStockpileInterval(TDuration value)
    {
        StockpileInterval_.store(value);
    }

    TDuration GetStockpileInterval()
    {
        return StockpileInterval_.load();
    }

    void SetStockpileThreadCount(int count)
    {
        StockpileThreadCount_.store(count);
    }

    int GetStockpileThreadCount()
    {
        return ClampVal(StockpileThreadCount_.load(), 0, MaxStockpileThreadCount);
    }

    void SetStockpileSize(size_t value)
    {
        StockpileSize_.store(value);
    }

    size_t GetStockpileSize()
    {
        return StockpileSize_.load();
    }

private:
    std::atomic<double> LargeUnreclaimableCoeff_ = 0.05;
    std::atomic<size_t> MinLargeUnreclaimableBytes_ = 128_MB;
    std::atomic<size_t> MaxLargeUnreclaimableBytes_ = 10_GB;
    std::atomic<i64> TimingEventThresholdNs_ = 10000000; // in ns, 10 ms by default

    std::atomic<bool> AllocationProfilingEnabled_ = false;
    std::atomic<double> AllocationProfilingSamplingRate_ = 1.0;
    std::atomic<ui32> AllocationProfilingSamplingRateX64K_ = std::numeric_limits<ui32>::max();
    std::array<std::atomic<bool>, SmallRankCount> SmallArenaAllocationProfilingEnabled_ = {};
    std::array<std::atomic<bool>, LargeRankCount> LargeArenaAllocationProfilingEnabled_ = {};
    std::atomic<int> ProfilingBacktraceDepth_ = 10;
    std::atomic<size_t> MinProfilingBytesUsedToReport_ = 1_MB;

    std::atomic<bool> EnableEagerMemoryRelease_ = true;
    std::atomic<bool> EnableMadvisePopulate_ = false;

    std::atomic<bool> StockpileEnabled_ = false;
    std::atomic<TDuration> StockpileInterval_ = TDuration::MilliSeconds(10);
    static constexpr int MaxStockpileThreadCount = 8;
    std::atomic<int> StockpileThreadCount_ = 4;
    std::atomic<size_t> StockpileSize_ = 1_GB;

private:
    bool IsAllocationSampled()
    {
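        // NB (derived): the ui16 counter wraps every 2^16 calls, so roughly
        // SamplingRate * 2^16 out of every 2^16 allocations are sampled.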
        Y_POD_STATIC_THREAD(ui16) Counter;
        return Counter++ < AllocationProfilingSamplingRateX64K_.load();
    }
};

TExplicitlyConstructableSingleton<TConfigurationManager> ConfigurationManager;
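
// Illustrative only: these knobs are expected to be driven by the public NYTAlloc
// configuration API (see ytalloc.h), which presumably forwards to this singleton, e.g.
//
//   ConfigurationManager->SetLargeUnreclaimableCoeff(0.1);
//   ConfigurationManager->EnableStockpile();
//   ConfigurationManager->SetStockpileThreadCount(2);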

////////////////////////////////////////////////////////////////////////////////

template <class TEvent, class TManager>
class TEventLogManagerBase
{
public:
    void DisableForCurrentThread()
    {
        TManager::DisabledForCurrentThread_ = true;
    }

    template <class... TArgs>
    void EnqueueEvent(TArgs&&... args)
    {
        if (TManager::DisabledForCurrentThread_) {
            return;
        }

        auto timestamp = TInstant::Now();
        auto fiberId = NYTAlloc::GetCurrentFiberId();
        auto guard = Guard(EventLock_);

        auto event = TEvent(args...);
        OnEvent(event);

        if (EventCount_ >= EventBufferSize) {
            return;
        }

        auto& enqueuedEvent = Events_[EventCount_++];
        enqueuedEvent = std::move(event);
        enqueuedEvent.Timestamp = timestamp;
        enqueuedEvent.FiberId = fiberId;
    }

    void RunBackgroundTasks()
    {
        if (LogManager->IsLoggingEnabled()) {
            for (const auto& event : PullEvents()) {
                ProcessEvent(event);
            }
        }
    }

protected:
    NThreading::TForkAwareSpinLock EventLock_;

    virtual void OnEvent(const TEvent& event) = 0;

    virtual void ProcessEvent(const TEvent& event) = 0;

private:
    static constexpr size_t EventBufferSize = 1000;
    size_t EventCount_ = 0;
    std::array<TEvent, EventBufferSize> Events_;

    std::vector<TEvent> PullEvents()
    {
        std::vector<TEvent> events;
        events.reserve(EventBufferSize);

        auto guard = Guard(EventLock_);
        for (size_t index = 0; index < EventCount_; ++index) {
            events.push_back(Events_[index]);
        }
        EventCount_ = 0;
        return events;
    }
};

////////////////////////////////////////////////////////////////////////////////

struct TTimingEvent
{
    ETimingEventType Type;
    TDuration Duration;
    size_t Size;
    TInstant Timestamp;
    TFiberId FiberId;

    TTimingEvent()
    { }

    TTimingEvent(
        ETimingEventType type,
        TDuration duration,
        size_t size)
        : Type(type)
        , Duration(duration)
        , Size(size)
    { }
};

class TTimingManager
    : public TEventLogManagerBase<TTimingEvent, TTimingManager>
{
public:
    TEnumIndexedVector<ETimingEventType, TTimingEventCounters> GetTimingEventCounters()
    {
        auto guard = Guard(EventLock_);
        return EventCounters_;
    }

private:
    TEnumIndexedVector<ETimingEventType, TTimingEventCounters> EventCounters_;

    Y_POD_STATIC_THREAD(bool) DisabledForCurrentThread_;

    friend class TEventLogManagerBase<TTimingEvent, TTimingManager>;

    virtual void OnEvent(const TTimingEvent& event) override
    {
        auto& counters = EventCounters_[event.Type];
        counters.Count += 1;
        counters.Size += event.Size;
    }

    virtual void ProcessEvent(const TTimingEvent& event) override
    {
        YTALLOC_LOG_DEBUG("Timing event logged (Type: %s, Duration: %s, Size: %zu, Timestamp: %s, FiberId: %" PRIu64 ")",
            ToString(event.Type).c_str(),
            ToString(event.Duration).c_str(),
            event.Size,
            ToString(event.Timestamp).c_str(),
            event.FiberId);
    }
};

Y_POD_THREAD(bool) TTimingManager::DisabledForCurrentThread_;

TExplicitlyConstructableSingleton<TTimingManager> TimingManager;

////////////////////////////////////////////////////////////////////////////////

i64 GetElapsedNs(const struct timespec& startTime, const struct timespec& endTime)
{
    if (Y_LIKELY(startTime.tv_sec == endTime.tv_sec)) {
        return static_cast<i64>(endTime.tv_nsec) - static_cast<i64>(startTime.tv_nsec);
    }

    return
        static_cast<i64>(endTime.tv_nsec) - static_cast<i64>(startTime.tv_nsec) +
        (static_cast<i64>(endTime.tv_sec) - static_cast<i64>(startTime.tv_sec)) * 1000000000;
}

// Used to log statistics about long-running syscalls and lock acquisitions.
class TTimingGuard
    : public TNonCopyable
{
public:
    explicit TTimingGuard(ETimingEventType eventType, size_t size = 0)
        : EventType_(eventType)
        , Size_(size)
    {
        ::clock_gettime(CLOCK_MONOTONIC, &StartTime_);
    }

    ~TTimingGuard()
    {
        auto elapsedNs = GetElapsedNs();
        if (elapsedNs > ConfigurationManager->GetTimingEventThresholdNs()) {
            TimingManager->EnqueueEvent(EventType_, TDuration::MicroSeconds(elapsedNs / 1000), Size_);
        }
    }

private:
    const ETimingEventType EventType_;
    const size_t Size_;
    struct timespec StartTime_;

    i64 GetElapsedNs() const
    {
        struct timespec endTime;
        ::clock_gettime(CLOCK_MONOTONIC, &endTime);
        return NYTAlloc::GetElapsedNs(StartTime_, endTime);
    }
};

template <class T>
Y_FORCE_INLINE TGuard<T> GuardWithTiming(const T& lock)
{
    TTimingGuard timingGuard(ETimingEventType::Locking);
    TGuard<T> lockingGuard(lock);
    return lockingGuard;
}

////////////////////////////////////////////////////////////////////////////////

// A wrapper for mmap, munmap, and madvise calls.
// The latter are invoked with MADV_POPULATE (if enabled) and MADV_FREE flags
// and may fail if the OS support is missing. These failures are logged (once) and
// handled as follows:
// * if MADV_POPULATE fails then we fall back to manual per-page prefault
// for all subsequent attempts;
// * if MADV_FREE fails then it (and all subsequent attempts) is replaced with MADV_DONTNEED
// (which is non-lazy and less efficient but still does the job).
// Also this class mlocks all VMAs on startup to prevent pagefaults in our heavy binaries
// from disturbing latency tails.
class TMappedMemoryManager
{
public:
    void* Map(uintptr_t hint, size_t size, int flags)
    {
        TTimingGuard timingGuard(ETimingEventType::Mmap, size);
        auto* result = ::mmap(
            reinterpret_cast<void*>(hint),
            size,
            PROT_READ | PROT_WRITE,
            MAP_PRIVATE | MAP_ANONYMOUS | flags,
            -1,
            0);
        if (result == MAP_FAILED) {
            auto error = errno;
            if (error == EEXIST && (flags & MAP_FIXED_NOREPLACE)) {
                // Caller must retry with different hint address.
                return result;
            }
            YTALLOC_VERIFY(error == ENOMEM);
            ::fprintf(stderr, "*** YTAlloc has received ENOMEM error while trying to mmap %zu bytes\n",
                size);
            OomTrap();
        }
        return result;
    }

    void Unmap(void* ptr, size_t size)
    {
        TTimingGuard timingGuard(ETimingEventType::Munmap, size);
        auto result = ::munmap(ptr, size);
        YTALLOC_VERIFY(result == 0);
    }

    void DontDump(void* ptr, size_t size)
    {
        auto result = ::madvise(ptr, size, MADV_DONTDUMP);
        // Must not fail.
        YTALLOC_VERIFY(result == 0);
    }

    void PopulateFile(void* ptr, size_t size)
    {
        TTimingGuard timingGuard(ETimingEventType::FilePrefault, size);

        auto* begin = static_cast<volatile char*>(ptr);
        for (auto* current = begin; current < begin + size; current += PageSize) {
            *current;
        }
    }

    void PopulateReadOnly(void* ptr, size_t size)
    {
        if (!MadvisePopulateUnavailable_.load(std::memory_order_relaxed) &&
            ConfigurationManager->GetEnableMadvisePopulate())
        {
            if (!TryMadvisePopulate(ptr, size)) {
                MadvisePopulateUnavailable_.store(true);
            }
        }
    }

    void Populate(void* ptr, size_t size)
    {
        if (MadvisePopulateUnavailable_.load(std::memory_order_relaxed) ||
            !ConfigurationManager->GetEnableMadvisePopulate())
        {
            DoPrefault(ptr, size);
        } else if (!TryMadvisePopulate(ptr, size)) {
            MadvisePopulateUnavailable_.store(true);
            DoPrefault(ptr, size);
        }
    }

    void Release(void* ptr, size_t size)
    {
        if (CanUseMadviseFree() && !ConfigurationManager->GetEnableEagerMemoryRelease()) {
            DoMadviseFree(ptr, size);
        } else {
            DoMadviseDontNeed(ptr, size);
        }
    }

    bool Stockpile(size_t size)
    {
        if (MadviseStockpileUnavailable_.load(std::memory_order_relaxed)) {
            return false;
        }
        if (!TryMadviseStockpile(size)) {
            MadviseStockpileUnavailable_.store(true);
            return false;
        }
        return true;
    }

    void RunBackgroundTasks()
    {
        if (!LogManager->IsLoggingEnabled()) {
            return;
        }
        if (IsBuggyKernel() && !BuggyKernelLogged_) {
            YTALLOC_LOG_WARNING("Kernel is buggy; see KERNEL-118");
            BuggyKernelLogged_ = true;
        }
        if (MadviseFreeSupported_ && !MadviseFreeSupportedLogged_) {
            YTALLOC_LOG_INFO("MADV_FREE is supported");
            MadviseFreeSupportedLogged_ = true;
        }
        if (MadviseFreeNotSupported_ && !MadviseFreeNotSupportedLogged_) {
            YTALLOC_LOG_WARNING("MADV_FREE is not supported");
            MadviseFreeNotSupportedLogged_ = true;
        }
        if (MadvisePopulateUnavailable_.load() && !MadvisePopulateUnavailableLogged_) {
            YTALLOC_LOG_WARNING("MADV_POPULATE is not supported");
            MadvisePopulateUnavailableLogged_ = true;
        }
        if (MadviseStockpileUnavailable_.load() && !MadviseStockpileUnavailableLogged_) {
            YTALLOC_LOG_WARNING("MADV_STOCKPILE is not supported");
            MadviseStockpileUnavailableLogged_ = true;
        }
    }

private:
    bool BuggyKernelLogged_ = false;

    std::atomic<bool> MadviseFreeSupported_ = false;
    bool MadviseFreeSupportedLogged_ = false;

    std::atomic<bool> MadviseFreeNotSupported_ = false;
    bool MadviseFreeNotSupportedLogged_ = false;

    std::atomic<bool> MadvisePopulateUnavailable_ = false;
    bool MadvisePopulateUnavailableLogged_ = false;

    std::atomic<bool> MadviseStockpileUnavailable_ = false;
    bool MadviseStockpileUnavailableLogged_ = false;

private:
    bool TryMadvisePopulate(void* ptr, size_t size)
    {
        TTimingGuard timingGuard(ETimingEventType::MadvisePopulate, size);
        auto result = ::madvise(ptr, size, MADV_POPULATE);
        if (result != 0) {
            auto error = errno;
            YTALLOC_VERIFY(error == EINVAL || error == ENOMEM);
            if (error == ENOMEM) {
                ::fprintf(stderr, "*** YTAlloc has received ENOMEM error while trying to madvise(MADV_POPULATE) %zu bytes\n",
                    size);
                OomTrap();
            }
            return false;
        }
        return true;
    }

    void DoPrefault(void* ptr, size_t size)
    {
        TTimingGuard timingGuard(ETimingEventType::Prefault, size);
        auto* begin = static_cast<char*>(ptr);
        for (auto* current = begin; current < begin + size; current += PageSize) {
            *current = 0;
        }
    }

    bool CanUseMadviseFree()
    {
        if (MadviseFreeSupported_.load()) {
            return true;
        }
        if (MadviseFreeNotSupported_.load()) {
            return false;
        }

        if (IsBuggyKernel()) {
            MadviseFreeNotSupported_.store(true);
        } else {
            auto* ptr = Map(0, PageSize, 0);
            if (::madvise(ptr, PageSize, MADV_FREE) == 0) {
                MadviseFreeSupported_.store(true);
            } else {
                MadviseFreeNotSupported_.store(true);
            }
            Unmap(ptr, PageSize);
        }

        // Will not recurse.
        return CanUseMadviseFree();
    }

    void DoMadviseDontNeed(void* ptr, size_t size)
    {
        TTimingGuard timingGuard(ETimingEventType::MadviseDontNeed, size);
        auto result = ::madvise(ptr, size, MADV_DONTNEED);
        if (result != 0) {
            auto error = errno;
            // Failure is possible for locked pages.
            Y_VERIFY(error == EINVAL);
        }
    }

    void DoMadviseFree(void* ptr, size_t size)
    {
        TTimingGuard timingGuard(ETimingEventType::MadviseFree, size);
        auto result = ::madvise(ptr, size, MADV_FREE);
        if (result != 0) {
            auto error = errno;
            // Failure is possible for locked pages.
            YTALLOC_VERIFY(error == EINVAL);
        }
    }

    bool TryMadviseStockpile(size_t size)
    {
        auto result = ::madvise(nullptr, size, MADV_STOCKPILE);
        if (result != 0) {
            auto error = errno;
            if (error == ENOMEM || error == EAGAIN || error == EINTR) {
                // The call is advisory, ignore ENOMEM, EAGAIN, and EINTR.
                return true;
            }
            YTALLOC_VERIFY(error == EINVAL);
            return false;
        }
        return true;
    }

    // Some kernels are known to contain bugs in MADV_FREE; see https://st.yandex-team.ru/KERNEL-118.
    bool IsBuggyKernel()
    {
#ifdef _linux_
        static const bool result = [] () {
            struct utsname buf;
            YTALLOC_VERIFY(uname(&buf) == 0);
            if (strverscmp(buf.release, "4.4.1-1") >= 0 &&
                strverscmp(buf.release, "4.4.96-44") < 0)
            {
                return true;
            }
            if (strverscmp(buf.release, "4.14.1-1") >= 0 &&
                strverscmp(buf.release, "4.14.79-33") < 0)
            {
                return true;
            }
            return false;
        }();
        return result;
#else
        return false;
#endif
    }
};

TExplicitlyConstructableSingleton<TMappedMemoryManager> MappedMemoryManager;

////////////////////////////////////////////////////////////////////////////////
// System allocator

// Each system allocation is prepended with such a header.
struct TSystemBlobHeader
{
    explicit TSystemBlobHeader(size_t size)
        : Size(size)
    { }

    size_t Size;
    char Padding[8];
};

CHECK_HEADER_ALIGNMENT(TSystemBlobHeader)

// Used for some internal allocations.
// Delegates directly to TMappedMemoryManager.
class TSystemAllocator
{
public:
    void* Allocate(size_t size);
    void Free(void* ptr);

private:
    std::atomic<uintptr_t> CurrentPtr_ = SystemZoneStart;
};

TExplicitlyConstructableSingleton<TSystemAllocator> SystemAllocator;

////////////////////////////////////////////////////////////////////////////////

// Deriving from this class makes instances allocated and freed via TSystemAllocator.
struct TSystemAllocatable
{
    void* operator new(size_t size) noexcept
    {
        return SystemAllocator->Allocate(size);
    }

    void* operator new[](size_t size) noexcept
    {
        return SystemAllocator->Allocate(size);
    }

    void operator delete(void* ptr) noexcept
    {
        SystemAllocator->Free(ptr);
    }

    void operator delete[](void* ptr) noexcept
    {
        SystemAllocator->Free(ptr);
    }
};

////////////////////////////////////////////////////////////////////////////////

// Maintains a pool of objects.
// Objects are allocated in groups each containing BatchSize instances.
// The actual allocation is carried out by TSystemAllocator.
// Memory is never actually reclaimed; freed instances are put into TFreeList.
template <class T, size_t BatchSize>
class TSystemPool
{
public:
    T* Allocate()
    {
        while (true) {
            auto* obj = FreeList_.Extract();
            if (Y_LIKELY(obj)) {
                new (obj) T();
                return obj;
            }
            AllocateMore();
        }
    }

    void Free(T* obj)
    {
        obj->T::~T();
        PoisonFreedRange(obj, sizeof(T));
        FreeList_.Put(obj);
    }

private:
    TFreeList<T> FreeList_;

private:
    void AllocateMore()
    {
        auto* objs = static_cast<T*>(SystemAllocator->Allocate(sizeof(T) * BatchSize));
        for (size_t index = 0; index < BatchSize; ++index) {
            auto* obj = objs + index;
            FreeList_.Put(obj);
        }
    }
};

// A sharded analogue of TSystemPool.
template <class T, size_t BatchSize>
class TShardedSystemPool
{
public:
    template <class TState>
    T* Allocate(TState* state)
    {
        if (auto* obj = FreeLists_[state->GetInitialShardIndex()].Extract()) {
            new (obj) T();
            return obj;
        }

        while (true) {
            for (size_t index = 0; index < ShardCount; ++index) {
                if (auto* obj = FreeLists_[state->GetNextShardIndex()].Extract()) {
                    new (obj) T();
                    return obj;
                }
            }
            AllocateMore();
        }
    }

    template <class TState>
    void Free(TState* state, T* obj)
    {
        obj->T::~T();
        PoisonFreedRange(obj, sizeof(T));
        FreeLists_[state->GetInitialShardIndex()].Put(obj);
    }

private:
    std::array<TFreeList<T>, ShardCount> FreeLists_;

private:
    void AllocateMore()
    {
        auto* objs = static_cast<T*>(SystemAllocator->Allocate(sizeof(T) * BatchSize));
        for (size_t index = 0; index < BatchSize; ++index) {
            auto* obj = objs + index;
            FreeLists_[index % ShardCount].Put(obj);
        }
    }
};

////////////////////////////////////////////////////////////////////////////////

// Handles allocations inside a zone of memory given by its start and end pointers.
// Each allocation is a separate mapped region of memory.
// Special care is taken to guarantee that all allocated regions fall inside the zone.
class TZoneAllocator
{
public:
    TZoneAllocator(uintptr_t zoneStart, uintptr_t zoneEnd)
        : ZoneStart_(zoneStart)
        , ZoneEnd_(zoneEnd)
        , Current_(zoneStart)
    {
        YTALLOC_VERIFY(ZoneStart_ % PageSize == 0);
    }

    void* Allocate(size_t size, int flags)
    {
        YTALLOC_VERIFY(size % PageSize == 0);
        bool restarted = false;
        while (true) {
            auto hint = (Current_ += size) - size;
            if (reinterpret_cast<uintptr_t>(hint) + size > ZoneEnd_) {
                if (restarted) {
                    ::fprintf(stderr, "*** YTAlloc was unable to mmap %zu bytes in zone %" PRIx64 "--%" PRIx64 "\n",
                        size,
                        ZoneStart_,
                        ZoneEnd_);
                    OomTrap();
                }
                restarted = true;
                Current_ = ZoneStart_;
            } else {
                char* ptr = static_cast<char*>(MappedMemoryManager->Map(
                    hint,
                    size,
                    MAP_FIXED_NOREPLACE | flags));
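                // Kernels that silently ignore MAP_FIXED_NOREPLACE (see the note near its
                // definition above) may place the mapping at an arbitrary address; in that
                // case the mapping is undone and the next hint is tried.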
                if (reinterpret_cast<uintptr_t>(ptr) == hint) {
                    return ptr;
                }
                if (ptr != MAP_FAILED) {
                    MappedMemoryManager->Unmap(ptr, size);
                }
            }
        }
    }

    void Free(void* ptr, size_t size)
    {
        MappedMemoryManager->Unmap(ptr, size);
    }

private:
    const uintptr_t ZoneStart_;
    const uintptr_t ZoneEnd_;

    std::atomic<uintptr_t> Current_;
};

////////////////////////////////////////////////////////////////////////////////

// YTAlloc supports tagged allocations.
// Since the total number of tags can be huge, a two-level scheme is employed.
// Possible tags are arranged into sets each containing TaggedCounterSetSize tags.
// There are up to MaxTaggedCounterSets in total.
// Upper 4 sets are reserved for profiled allocations.
constexpr size_t TaggedCounterSetSize = 16384;
constexpr size_t AllocationProfilingTaggedCounterSets = 4;
constexpr size_t MaxTaggedCounterSets = 256 + AllocationProfilingTaggedCounterSets;

constexpr size_t MaxCapturedAllocationBacktraces = 65000;
static_assert(
    MaxCapturedAllocationBacktraces < AllocationProfilingTaggedCounterSets * TaggedCounterSetSize,
    "MaxCapturedAllocationBacktraces is too big");

constexpr TMemoryTag AllocationProfilingMemoryTagBase = TaggedCounterSetSize * (MaxTaggedCounterSets - AllocationProfilingTaggedCounterSets);
constexpr TMemoryTag AllocationProfilingUnknownMemoryTag = AllocationProfilingMemoryTagBase + MaxCapturedAllocationBacktraces;
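
// NB (derived): a memory tag presumably maps to set index tag / TaggedCounterSetSize and
// in-set offset tag % TaggedCounterSetSize; e.g. tag 20000 lands in set 1 at offset 3616.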

static_assert(
    MaxMemoryTag == TaggedCounterSetSize * (MaxTaggedCounterSets - AllocationProfilingTaggedCounterSets) - 1,
    "Wrong MaxMemoryTag");

template <class TCounter>
using TUntaggedTotalCounters = TEnumIndexedVector<EBasicCounter, TCounter>;

template <class TCounter>
struct TTaggedTotalCounterSet
    : public TSystemAllocatable
{
    std::array<TEnumIndexedVector<EBasicCounter, TCounter>, TaggedCounterSetSize> Counters;
};

using TLocalTaggedBasicCounterSet = TTaggedTotalCounterSet<ssize_t>;
using TGlobalTaggedBasicCounterSet = TTaggedTotalCounterSet<std::atomic<ssize_t>>;

template <class TCounter>
struct TTotalCounters
{
    // The sum of counters across all tags.
    TUntaggedTotalCounters<TCounter> CumulativeTaggedCounters;

    // Counters for untagged allocations.
    TUntaggedTotalCounters<TCounter> UntaggedCounters;

    // Access to tagged counters may involve creation of a new tag set.
    // For simplicity, we separate the read-side (TaggedCounterSets) and the write-side (TaggedCounterSetHolders).
    // These arrays contain virtually identical data (up to std::unique_ptr and std::atomic semantic differences).
    std::array<std::atomic<TTaggedTotalCounterSet<TCounter>*>, MaxTaggedCounterSets> TaggedCounterSets{};
    std::array<std::unique_ptr<TTaggedTotalCounterSet<TCounter>>, MaxTaggedCounterSets> TaggedCounterSetHolders;

    // Protects TaggedCounterSetHolders from concurrent updates.
    NThreading::TForkAwareSpinLock TaggedCounterSetsLock;

    // Returns null if the set is not yet constructed.
    Y_FORCE_INLINE TTaggedTotalCounterSet<TCounter>* FindTaggedCounterSet(size_t index) const
    {
        return TaggedCounterSets[index].load();
    }

    // Constructs the set on first access.
    TTaggedTotalCounterSet<TCounter>* GetOrCreateTaggedCounterSet(size_t index)
    {
        auto* set = TaggedCounterSets[index].load();
        if (Y_LIKELY(set)) {
            return set;
        }

        auto guard = GuardWithTiming(TaggedCounterSetsLock);
        auto& setHolder = TaggedCounterSetHolders[index];
        if (!setHolder) {
            setHolder = std::make_unique<TTaggedTotalCounterSet<TCounter>>();
            TaggedCounterSets[index] = setHolder.get();
        }
        return setHolder.get();
    }
};

using TLocalSystemCounters = TEnumIndexedVector<ESystemCounter, ssize_t>;
using TGlobalSystemCounters = TEnumIndexedVector<ESystemCounter, std::atomic<ssize_t>>;

using TLocalSmallCounters = TEnumIndexedVector<ESmallArenaCounter, ssize_t>;
using TGlobalSmallCounters = TEnumIndexedVector<ESmallArenaCounter, std::atomic<ssize_t>>;

using TLocalLargeCounters = TEnumIndexedVector<ELargeArenaCounter, ssize_t>;
using TGlobalLargeCounters = TEnumIndexedVector<ELargeArenaCounter, std::atomic<ssize_t>>;

using TLocalHugeCounters = TEnumIndexedVector<EHugeCounter, ssize_t>;
using TGlobalHugeCounters = TEnumIndexedVector<EHugeCounter, std::atomic<ssize_t>>;

using TLocalUndumpableCounters = TEnumIndexedVector<EUndumpableCounter, ssize_t>;
using TGlobalUndumpableCounters = TEnumIndexedVector<EUndumpableCounter, std::atomic<ssize_t>>;

Y_FORCE_INLINE ssize_t LoadCounter(ssize_t counter)
{
    return counter;
}

Y_FORCE_INLINE ssize_t LoadCounter(const std::atomic<ssize_t>& counter)
{
    return counter.load();
}

////////////////////////////////////////////////////////////////////////////////

struct TMmapObservationEvent
{
    size_t Size;
    std::array<void*, MaxAllocationProfilingBacktraceDepth> Frames;
    int FrameCount;
    TInstant Timestamp;
    TFiberId FiberId;

    TMmapObservationEvent() = default;

    TMmapObservationEvent(
        size_t size,
        std::array<void*, MaxAllocationProfilingBacktraceDepth> frames,
        int frameCount)
        : Size(size)
        , Frames(frames)
        , FrameCount(frameCount)
    { }
};

class TMmapObservationManager
    : public TEventLogManagerBase<TMmapObservationEvent, TMmapObservationManager>
{
public:
    void SetBacktraceFormatter(TBacktraceFormatter formatter)
    {
        BacktraceFormatter_.store(formatter);
    }

private:
    std::atomic<TBacktraceFormatter> BacktraceFormatter_ = nullptr;

    Y_POD_STATIC_THREAD(bool) DisabledForCurrentThread_;

    friend class TEventLogManagerBase<TMmapObservationEvent, TMmapObservationManager>;

    virtual void OnEvent(const TMmapObservationEvent& /*event*/) override
    { }

    virtual void ProcessEvent(const TMmapObservationEvent& event) override
    {
        YTALLOC_LOG_DEBUG("Large arena mmap observed (Size: %zu, Timestamp: %s, FiberId: %" PRIx64 ")",
            event.Size,
            ToString(event.Timestamp).c_str(),
            event.FiberId);

        if (auto backtraceFormatter = BacktraceFormatter_.load()) {
            auto backtrace = backtraceFormatter(const_cast<void**>(event.Frames.data()), event.FrameCount);
            YTALLOC_LOG_DEBUG("YTAlloc stack backtrace (Stack: %s)",
                backtrace.c_str());
        }
    }
};

Y_POD_THREAD(bool) TMmapObservationManager::DisabledForCurrentThread_;

TExplicitlyConstructableSingleton<TMmapObservationManager> MmapObservationManager;

////////////////////////////////////////////////////////////////////////////////

// A per-thread structure containing counters, chunk caches etc.
struct TThreadState
    : public TFreeListItemBase<TThreadState>
    , public TLocalShardedState
{
    // TThreadState instances of all alive threads are put into a doubly-linked intrusive list.
    // This is a pair of next/prev pointers connecting an instance of TThreadState to its neighbors.
    TIntrusiveLinkedListNode<TThreadState> RegistryNode;

    // Pointers to the respective parts of TThreadManager::ThreadControlWord_.
    // If null then the thread is already destroyed (but TThreadState may still live for a while
    // due to ref-counting).
    ui8* AllocationProfilingEnabled;
    ui8* BackgroundThreadStarted;

    // TThreadStates are ref-counted.
    // TThreadManager::EnumerateThreadStates enumerates the registered states and acquires
    // a temporary reference preventing these states from being destructed. This allows
    // the global lock to be held for shorter periods of time.
    int RefCounter = 1;

    // Per-thread counters.
    TTotalCounters<ssize_t> TotalCounters;
    std::array<TLocalLargeCounters, LargeRankCount> LargeArenaCounters;
    TLocalUndumpableCounters UndumpableCounters;

    // Each thread maintains caches of small chunks.
    // One cache is for tagged chunks; the other is for untagged ones.
    // Each cache contains up to MaxCachedChunksPerRank chunks of each rank.
    // Special sentinels are placed to mark the boundaries of the region containing
    // pointers of a specific rank. This enables slightly faster in-place boundary checks.

    static constexpr uintptr_t LeftSentinel = 1;
    static constexpr uintptr_t RightSentinel = 2;

    struct TSmallBlobCache
    {
        TSmallBlobCache()
        {
            void** chunkPtrs = CachedChunks.data();
            for (size_t rank = 0; rank < SmallRankCount; ++rank) {
                RankToCachedChunkPtrHead[rank] = chunkPtrs;
                chunkPtrs[0] = reinterpret_cast<void*>(LeftSentinel);
                chunkPtrs[MaxCachedChunksPerRank + 1] = reinterpret_cast<void*>(RightSentinel);

#ifdef YTALLOC_PARANOID
                RankToCachedChunkPtrTail[rank] = chunkPtrs;
                CachedChunkFull[rank] = false;

                RankToCachedChunkLeftBorder[rank] = chunkPtrs;
                RankToCachedChunkRightBorder[rank] = chunkPtrs + MaxCachedChunksPerRank + 1;
#endif
                chunkPtrs += MaxCachedChunksPerRank + 2;
            }
        }

        // For each rank we have a segment of pointers in CachedChunks with the following layout:
        //   LCC[C]........R
        // Legend:
        //   .  = garbage
        //   L  = left sentinel
        //   R  = right sentinel
        //   C  = cached pointer
        //  [C] = current cached pointer
        //
        // Under YTALLOC_PARANOID the following layout is used:
        //   L.[T]CCC[H]...R
        // Legend:
        //   [H] = head cached pointer, put chunks here
        //   [T] = tail cached pointer, take chunks from here

        //  +2 is for two sentinels
        std::array<void*, SmallRankCount * (MaxCachedChunksPerRank + 2)> CachedChunks{};

        // Pointer to [C] (the current cached pointer; [H] under YTALLOC_PARANOID) for each rank.
        std::array<void**, SmallRankCount> RankToCachedChunkPtrHead{};

#ifdef YTALLOC_PARANOID
        // Pointers to [L] and [R] for each rank.
        std::array<void**, SmallRankCount> RankToCachedChunkLeftBorder{};
        std::array<void**, SmallRankCount> RankToCachedChunkRightBorder{};

        std::array<void**, SmallRankCount> RankToCachedChunkPtrTail{};
        std::array<bool, SmallRankCount> CachedChunkFull{};
#endif
    };
    TEnumIndexedVector<EAllocationKind, TSmallBlobCache> SmallBlobCache;
};

struct TThreadStateToRegistryNode
{
    auto operator() (TThreadState* state) const
    {
        return &state->RegistryNode;
    }
};

// Manages all registered threads and controls access to TThreadState.
class TThreadManager
{
public:
    TThreadManager()
    {
        pthread_key_create(&ThreadDtorKey_, DestroyThread);

        NThreading::RegisterAtForkHandlers(
            nullptr,
            nullptr,
            [=] { AfterFork(); });
    }

    // Returns TThreadState for the current thread; the caller guarantees that this
    // state is initialized and is not destroyed yet.
    static TThreadState* GetThreadStateUnchecked();

    // Returns TThreadState for the current thread; may return null.
    static TThreadState* FindThreadState();

    // Returns TThreadState for the current thread; may not return null
    // (but may crash if TThreadState is already destroyed).
    static TThreadState* GetThreadStateChecked()
    {
        auto* state = FindThreadState();
        YTALLOC_VERIFY(state);
        return state;
    }

    // Enumerates all threads and invokes func passing TThreadState instances.
    // func must not throw but can take arbitrary time; no locks are being held while it executes.
    template <class THandler>
    void EnumerateThreadStatesAsync(const THandler& handler) noexcept
    {
        TMemoryTagGuard guard(NullMemoryTag);

        std::vector<TThreadState*> states;
        states.reserve(1024); // must be enough in most cases

        auto unrefStates = [&] {
            // Releasing references also requires global lock to be held to avoid getting zombies above.
            auto guard = GuardWithTiming(ThreadRegistryLock_);
            for (auto* state : states) {
                UnrefThreadState(state);
            }
        };

        auto tryRefStates = [&] {
            // Only hold this guard for a small period of time to reference all the states.
            auto guard = GuardWithTiming(ThreadRegistryLock_);
            auto* current = ThreadRegistry_.GetFront();
            while (current) {
                if (states.size() == states.capacity()) {
                    // Cannot allocate while holding ThreadRegistryLock_ due to a possible deadlock as follows:
                    // EnumerateThreadStatesAsync -> StartBackgroundThread -> EnumerateThreadStatesSync
                    // (many other scenarios are also possible).
                    guard.Release();
                    unrefStates();
                    states.clear();
                    states.reserve(states.capacity() * 2);
                    return false;
                }
                RefThreadState(current);
                states.push_back(current);
                current = current->RegistryNode.Next;
            }
            return true;
        };

        while (!tryRefStates()) ;

        for (auto* state : states) {
            handler(state);
        }

        unrefStates();
    }

    // Similar to EnumerateThreadStatesAsync but holds the global lock while enumerating the threads.
    // Also invokes a given prologue functor while holding the thread registry lock.
    // Handler and prologue calls must be fast and must not allocate.
    template <class TPrologue, class THandler>
    void EnumerateThreadStatesSync(const TPrologue& prologue, const THandler& handler) noexcept
    {
        auto guard = GuardWithTiming(ThreadRegistryLock_);
        prologue();
        auto* current = ThreadRegistry_.GetFront();
        while (current) {
            handler(current);
            current = current->RegistryNode.Next;
        }
    }


    // We store a special 64-bit "thread control word" in TLS encapsulating the following
    // crucial per-thread parameters:
    // * the current memory tag
    // * a flag indicating that a valid TThreadState is known to exist
    // (and can be obtained via GetThreadStateUnchecked)
    // * a flag indicating that allocation profiling is enabled
    // * a flag indicating that the background thread is started
    // Thread control word is fetched via GetThreadControlWord and is compared
    // against FastPathControlWord to see if the fast path can be taken.
    // The latter happens when no memory tagging is configured, TThreadState is
    // valid, allocation profiling is disabled, and background thread is started.

    // The mask for extracting memory tag from thread control word.
    static constexpr ui64 MemoryTagControlWordMask = 0xffffffff;
    // ThreadStateValid is on.
    static constexpr ui64 ThreadStateValidControlWordMask = (1ULL << 32);
    // AllocationProfiling is on.
    static constexpr ui64 AllocationProfilingEnabledControlWordMask = (1ULL << 40);
    // All background threads are properly started.
    static constexpr ui64 BackgroundThreadStartedControlWorkMask = (1ULL << 48);
    // Memory tag is NullMemoryTag; thread state is valid; background threads are started; allocation profiling is off.
    static constexpr ui64 FastPathControlWord =
        BackgroundThreadStartedControlWorkMask |
        ThreadStateValidControlWordMask |
        NullMemoryTag;

    Y_FORCE_INLINE static ui64 GetThreadControlWord()
    {
        return (&ThreadControlWord_)->Value;
    }
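
    // Illustrative sketch (not part of the allocator itself): callers typically gate
    // the streamlined allocation path on a single comparison of the control word,
    // as described above. Assuming the caller only needs to know whether the fast
    // path applies:
    //
    //   if (Y_LIKELY(GetThreadControlWord() == FastPathControlWord)) {
    //       // Null memory tag, valid TThreadState, profiling off, background threads
    //       // started: GetThreadStateUnchecked() may be used safely here.
    //   } else {
    //       // Slow path: fall back to FindThreadState() and per-flag handling.
    //   }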


    static TMemoryTag GetCurrentMemoryTag()
    {
        return (&ThreadControlWord_)->Parts.MemoryTag;
    }

    static void SetCurrentMemoryTag(TMemoryTag tag)
    {
        Y_VERIFY(tag <= MaxMemoryTag);
        (&ThreadControlWord_)->Parts.MemoryTag = tag;
    }


    static EMemoryZone GetCurrentMemoryZone()
    {
        return CurrentMemoryZone_;
    }

    static void SetCurrentMemoryZone(EMemoryZone zone)
    {
        CurrentMemoryZone_ = zone;
    }


    static void SetCurrentFiberId(TFiberId id)
    {
        CurrentFiberId_ = id;
    }

    static TFiberId GetCurrentFiberId()
    {
        return CurrentFiberId_;
    }

private:
    static void DestroyThread(void*);

    TThreadState* AllocateThreadState();

    void RefThreadState(TThreadState* state)
    {
        auto result = ++state->RefCounter;
        Y_VERIFY(result > 1);
    }

    void UnrefThreadState(TThreadState* state)
    {
        auto result = --state->RefCounter;
        Y_VERIFY(result >= 0);
        if (result == 0) {
            DestroyThreadState(state);
        }
    }

    void DestroyThreadState(TThreadState* state);

    void AfterFork();

private:
    // TThreadState instance for the current thread.
    // Initially null, then initialized when first needed.
    // TThreadState is destroyed upon thread termination (which is detected with
    // the help of pthread_key_create machinery), so this pointer can become null again.
    Y_POD_STATIC_THREAD(TThreadState*) ThreadState_;

    // Initially false, then set to true when TThreadState is destroyed.
    // If the thread requests its state afterwards, null is returned and no new state is (re-)created.
    // The caller must be able to deal with it.
    Y_POD_STATIC_THREAD(bool) ThreadStateDestroyed_;

    union TThreadControlWord
    {
        ui64 __attribute__((__may_alias__)) Value;
        struct TParts
        {
            // The current memory tag used in all allocations by this thread.
            ui32 __attribute__((__may_alias__)) MemoryTag;
            // Indicates if a valid TThreadState exists and can be obtained via GetThreadStateUnchecked.
            ui8 __attribute__((__may_alias__)) ThreadStateValid;
            // Indicates if allocation profiling is on.
            ui8 __attribute__((__may_alias__)) AllocationProfilingEnabled;
            // Indicates if all background threads are properly started.
            ui8 __attribute__((__may_alias__)) BackgroundThreadStarted;
            ui8 Padding[2];
        } Parts;
    };
    Y_POD_STATIC_THREAD(TThreadControlWord) ThreadControlWord_;

    // See memory zone API.
    Y_POD_STATIC_THREAD(EMemoryZone) CurrentMemoryZone_;

    // See fiber id API.
    Y_POD_STATIC_THREAD(TFiberId) CurrentFiberId_;

    pthread_key_t ThreadDtorKey_;

    static constexpr size_t ThreadStatesBatchSize = 1;
    TSystemPool<TThreadState, ThreadStatesBatchSize> ThreadStatePool_;

    NThreading::TForkAwareSpinLock ThreadRegistryLock_;
    TIntrusiveLinkedList<TThreadState, TThreadStateToRegistryNode> ThreadRegistry_;
};

Y_POD_THREAD(TThreadState*) TThreadManager::ThreadState_;
Y_POD_THREAD(bool) TThreadManager::ThreadStateDestroyed_;
Y_POD_THREAD(TThreadManager::TThreadControlWord) TThreadManager::ThreadControlWord_;
Y_POD_THREAD(EMemoryZone) TThreadManager::CurrentMemoryZone_;
Y_POD_THREAD(TFiberId) TThreadManager::CurrentFiberId_;

TExplicitlyConstructableSingleton<TThreadManager> ThreadManager;

////////////////////////////////////////////////////////////////////////////////

void TConfigurationManager::SetAllocationProfilingEnabled(bool value)
{
    // Update threads' TLS.
    ThreadManager->EnumerateThreadStatesSync(
        [&] {
            AllocationProfilingEnabled_.store(value);
        },
        [&] (auto* state) {
            if (state->AllocationProfilingEnabled) {
                *state->AllocationProfilingEnabled = value;
            }
        });
}

////////////////////////////////////////////////////////////////////////////////
// Backtrace Manager
//
// Captures backtraces observed during allocations and assigns memory tags to them.
// Memory tags are chosen sequentially starting from AllocationProfilingMemoryTagBase.
//
// For each backtrace we compute a 64-bit hash and use it as a key in a concurrent hashmap.
// This hashmap is organized into BucketCount buckets, each consisting of BucketSize slots.
//
// Backtrace hash is translated into bucket index by taking the appropriate number of
// its lower bits. For each slot, we remember a 32-bit fingerprint, which is
// just the next 32 bits of the backtrace's hash, and the (previously assigned) memory tag.
//
// Upon access to the hashtable, the bucket is first scanned optimistically, without taking
// any locks. In case of a miss, a per-bucket spinlock is acquired and the bucket is rescanned.
//
// The above scheme may involve collisions but we neglect their probability.
//
// If the whole hash table overflows (i.e. a total of MaxCapturedAllocationBacktraces
// backtraces are captured) or the bucket overflows (i.e. all of its slots become occupied),
// the allocation is annotated with AllocationProfilingUnknownMemoryTag. Such allocations
// appear as having no backtrace whatsoever in the profiling reports.
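//
// Illustrative sketch of the hash splitting (assuming Log2BucketCount == 16, as below):
// for a backtrace hash H = 0x123456789abcdef0,
//
//   bucketIndex = H % BucketCount;                      // 0xdef0 (lower 16 bits)
//   fingerprint = (H >> Log2BucketCount) & 0xffffffff;  // 0x56789abc (next 32 bits)
//
// A zero fingerprint marks a free slot, so a computed zero fingerprint is bumped to 1.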

class TBacktraceManager
{
public:
    // Sets the provider used for collecting backtraces when allocation profiling
    // is turned ON.
    void SetBacktraceProvider(TBacktraceProvider provider)
    {
        BacktraceProvider_.store(provider);
    }

    // Captures the backtrace and inserts it into the hashtable.
    TMemoryTag GetMemoryTagFromBacktrace(int framesToSkip)
    {
        std::array<void*, MaxAllocationProfilingBacktraceDepth> frames;
        auto backtraceProvider = BacktraceProvider_.load();
        if (!backtraceProvider) {
            return NullMemoryTag;
        }
        auto frameCount = backtraceProvider(frames.data(), ConfigurationManager->GetProfilingBacktraceDepth(), framesToSkip);
        auto hash = GetBacktraceHash(frames.data(), frameCount);
        return CaptureBacktrace(hash, frames.data(), frameCount);
    }

    // Returns the backtrace corresponding to the given tag, if any.
    std::optional<TBacktrace> FindBacktrace(TMemoryTag tag)
    {
        if (tag < AllocationProfilingMemoryTagBase ||
            tag >= AllocationProfilingMemoryTagBase + MaxCapturedAllocationBacktraces)
        {
            return std::nullopt;
        }
        const auto& entry = Backtraces_[tag - AllocationProfilingMemoryTagBase];
        if (!entry.Captured.load()) {
            return std::nullopt;
        }
        return entry.Backtrace;
    }

private:
    static constexpr int Log2BucketCount = 16;
    static constexpr int BucketCount = 1 << Log2BucketCount;
    static constexpr int BucketSize = 8;

    std::atomic<TBacktraceProvider> BacktraceProvider_ = nullptr;

    std::array<std::array<std::atomic<ui32>, BucketSize>, BucketCount> Fingerprints_ = {};
    std::array<std::array<std::atomic<TMemoryTag>, BucketSize>, BucketCount> MemoryTags_ = {};
    std::array<NThreading::TForkAwareSpinLock, BucketCount> BucketLocks_;
    std::atomic<TMemoryTag> CurrentMemoryTag_ = AllocationProfilingMemoryTagBase;

    struct TBacktraceEntry
    {
        TBacktrace Backtrace;
        std::atomic<bool> Captured = false;
    };

    std::array<TBacktraceEntry, MaxCapturedAllocationBacktraces> Backtraces_;

private:
    static size_t GetBacktraceHash(void** frames, int frameCount)
    {
        size_t hash = 0;
        for (int index = 0; index < frameCount; ++index) {
            hash = CombineHashes(hash, THash<void*>()(frames[index]));
        }
        return hash;
    }

    TMemoryTag CaptureBacktrace(size_t hash, void** frames, int frameCount)
    {
        size_t bucketIndex = hash % BucketCount;
        ui32 fingerprint = (hash >> Log2BucketCount) & 0xffffffff;
        // Zero fingerprint indicates the slot is free; check and adjust to ensure
        // that regular fingerprints are non-zero.
        if (fingerprint == 0) {
            fingerprint = 1;
        }

        for (int slotIndex = 0; slotIndex < BucketSize; ++slotIndex) {
            auto currentFingerprint = Fingerprints_[bucketIndex][slotIndex].load(std::memory_order_relaxed);
            if (currentFingerprint == fingerprint) {
                return MemoryTags_[bucketIndex][slotIndex].load();
            }
        }

        auto guard = Guard(BucketLocks_[bucketIndex]);

        int spareSlotIndex = -1;
        for (int slotIndex = 0; slotIndex < BucketSize; ++slotIndex) {
            auto currentFingerprint = Fingerprints_[bucketIndex][slotIndex].load(std::memory_order_relaxed);
            if (currentFingerprint == fingerprint) {
                return MemoryTags_[bucketIndex][slotIndex].load();
            }
            if (currentFingerprint == 0) {
                spareSlotIndex = slotIndex;
                break;
            }
        }

        if (spareSlotIndex < 0) {
            return AllocationProfilingUnknownMemoryTag;
        }

        auto memoryTag = CurrentMemoryTag_++;
        if (memoryTag >= AllocationProfilingMemoryTagBase + MaxCapturedAllocationBacktraces) {
            return AllocationProfilingUnknownMemoryTag;
        }

        MemoryTags_[bucketIndex][spareSlotIndex].store(memoryTag);
        Fingerprints_[bucketIndex][spareSlotIndex].store(fingerprint);

        auto& entry = Backtraces_[memoryTag - AllocationProfilingMemoryTagBase];
        entry.Backtrace.FrameCount = frameCount;
        ::memcpy(entry.Backtrace.Frames.data(), frames, sizeof (void*) * frameCount);
        entry.Captured.store(true);

        return memoryTag;
    }
};

TExplicitlyConstructableSingleton<TBacktraceManager> BacktraceManager;

////////////////////////////////////////////////////////////////////////////////

// Mimics the counters of TThreadState but uses std::atomic to survive concurrent access.
struct TGlobalState
    : public TGlobalShardedState
{
    TTotalCounters<std::atomic<ssize_t>> TotalCounters;
    std::array<TGlobalLargeCounters, LargeRankCount> LargeArenaCounters;
    TGlobalUndumpableCounters UndumpableCounters;
};

TExplicitlyConstructableSingleton<TGlobalState> GlobalState;

////////////////////////////////////////////////////////////////////////////////

// Accumulates various allocation statistics.
class TStatisticsManager
{
public:
    template <EAllocationKind Kind = EAllocationKind::Tagged, class TState>
    static Y_FORCE_INLINE void IncrementTotalCounter(TState* state, TMemoryTag tag, EBasicCounter counter, ssize_t delta)
    {
        // This branch is typically resolved at compile time.
        if (Kind == EAllocationKind::Tagged && tag != NullMemoryTag) {
            IncrementTaggedTotalCounter(&state->TotalCounters, tag, counter, delta);
        } else {
            IncrementUntaggedTotalCounter(&state->TotalCounters, counter, delta);
        }
    }

    static Y_FORCE_INLINE void IncrementTotalCounter(TMemoryTag tag, EBasicCounter counter, ssize_t delta)
    {
        IncrementTotalCounter(GlobalState.Get(), tag, counter, delta);
    }

    void IncrementSmallArenaCounter(ESmallArenaCounter counter, size_t rank, ssize_t delta)
    {
        SmallArenaCounters_[rank][counter] += delta;
    }

    template <class TState>
    static Y_FORCE_INLINE void IncrementLargeArenaCounter(TState* state, size_t rank, ELargeArenaCounter counter, ssize_t delta)
    {
        state->LargeArenaCounters[rank][counter] += delta;
    }

    template <class TState>
    static Y_FORCE_INLINE void IncrementUndumpableCounter(TState* state, EUndumpableCounter counter, ssize_t delta)
    {
        state->UndumpableCounters[counter] += delta;
    }

    void IncrementHugeCounter(EHugeCounter counter, ssize_t delta)
    {
        HugeCounters_[counter] += delta;
    }

    void IncrementHugeUndumpableCounter(EUndumpableCounter counter, ssize_t delta)
    {
        HugeUndumpableCounters_[counter] += delta;
    }

    void IncrementSystemCounter(ESystemCounter counter, ssize_t delta)
    {
        SystemCounters_[counter] += delta;
    }

    // Computes memory usage for a list of tags by aggregating counters across threads.
    void GetTaggedMemoryCounters(const TMemoryTag* tags, size_t count, TEnumIndexedVector<EBasicCounter, ssize_t>* counters)
    {
        TMemoryTagGuard guard(NullMemoryTag);

        for (size_t index = 0; index < count; ++index) {
            counters[index][EBasicCounter::BytesAllocated] = 0;
            counters[index][EBasicCounter::BytesFreed] = 0;
        }

        for (size_t index = 0; index < count; ++index) {
            auto tag = tags[index];
            counters[index][EBasicCounter::BytesAllocated] += LoadTaggedTotalCounter(GlobalState->TotalCounters, tag, EBasicCounter::BytesAllocated);
            counters[index][EBasicCounter::BytesFreed] += LoadTaggedTotalCounter(GlobalState->TotalCounters, tag, EBasicCounter::BytesFreed);
        }

        ThreadManager->EnumerateThreadStatesAsync(
            [&] (const auto* state) {
                for (size_t index = 0; index < count; ++index) {
                    auto tag = tags[index];
                    counters[index][EBasicCounter::BytesAllocated] += LoadTaggedTotalCounter(state->TotalCounters, tag, EBasicCounter::BytesAllocated);
                    counters[index][EBasicCounter::BytesFreed] += LoadTaggedTotalCounter(state->TotalCounters, tag, EBasicCounter::BytesFreed);
                }
            });

        for (size_t index = 0; index < count; ++index) {
            counters[index][EBasicCounter::BytesUsed] = GetUsed(counters[index][EBasicCounter::BytesAllocated], counters[index][EBasicCounter::BytesFreed]);
        }
    }

    void GetTaggedMemoryUsage(const TMemoryTag* tags, size_t count, size_t* results)
    {
        TMemoryTagGuard guard(NullMemoryTag);

        std::vector<TEnumIndexedVector<EBasicCounter, ssize_t>> counters;
        counters.resize(count);
        GetTaggedMemoryCounters(tags, count, counters.data());

        for (size_t index = 0; index < count; ++index) {
            results[index] = counters[index][EBasicCounter::BytesUsed];
        }
    }

    TEnumIndexedVector<ETotalCounter, ssize_t> GetTotalAllocationCounters()
    {
        TEnumIndexedVector<ETotalCounter, ssize_t> result;

        auto accumulate = [&] (const auto& counters) {
            result[ETotalCounter::BytesAllocated] += LoadCounter(counters[EBasicCounter::BytesAllocated]);
            result[ETotalCounter::BytesFreed] += LoadCounter(counters[EBasicCounter::BytesFreed]);
        };

        accumulate(GlobalState->TotalCounters.UntaggedCounters);
        accumulate(GlobalState->TotalCounters.CumulativeTaggedCounters);

        ThreadManager->EnumerateThreadStatesAsync(
            [&] (const auto* state) {
                accumulate(state->TotalCounters.UntaggedCounters);
                accumulate(state->TotalCounters.CumulativeTaggedCounters);
            });

        result[ETotalCounter::BytesUsed] = GetUsed(
            result[ETotalCounter::BytesAllocated],
            result[ETotalCounter::BytesFreed]);

        auto systemCounters = GetSystemAllocationCounters();
        result[ETotalCounter::BytesCommitted] += systemCounters[EBasicCounter::BytesUsed];

        auto hugeCounters = GetHugeAllocationCounters();
        result[ETotalCounter::BytesCommitted] += hugeCounters[EHugeCounter::BytesUsed];

        auto smallArenaCounters = GetSmallArenaAllocationCounters();
        for (size_t rank = 0; rank < SmallRankCount; ++rank) {
            result[ETotalCounter::BytesCommitted] += smallArenaCounters[rank][ESmallArenaCounter::BytesCommitted];
        }

        auto largeArenaCounters = GetLargeArenaAllocationCounters();
        for (size_t rank = 0; rank < LargeRankCount; ++rank) {
            result[ETotalCounter::BytesCommitted] += largeArenaCounters[rank][ELargeArenaCounter::BytesCommitted];
        }

        result[ETotalCounter::BytesUnaccounted] = std::max<ssize_t>(GetProcessRss() - result[ETotalCounter::BytesCommitted], 0);

        return result;
    }

    TEnumIndexedVector<ESmallCounter, ssize_t> GetSmallAllocationCounters()
    {
        TEnumIndexedVector<ESmallCounter, ssize_t> result;

        auto totalCounters = GetTotalAllocationCounters();
        result[ESmallCounter::BytesAllocated] = totalCounters[ETotalCounter::BytesAllocated];
        result[ESmallCounter::BytesFreed] = totalCounters[ETotalCounter::BytesFreed];
        result[ESmallCounter::BytesUsed] = totalCounters[ETotalCounter::BytesUsed];

        auto largeArenaCounters = GetLargeArenaAllocationCounters();
        for (size_t rank = 0; rank < LargeRankCount; ++rank) {
            result[ESmallCounter::BytesAllocated] -= largeArenaCounters[rank][ELargeArenaCounter::BytesAllocated];
            result[ESmallCounter::BytesFreed] -= largeArenaCounters[rank][ELargeArenaCounter::BytesFreed];
            result[ESmallCounter::BytesUsed] -= largeArenaCounters[rank][ELargeArenaCounter::BytesUsed];
        }

        auto hugeCounters = GetHugeAllocationCounters();
        result[ESmallCounter::BytesAllocated] -= hugeCounters[EHugeCounter::BytesAllocated];
        result[ESmallCounter::BytesFreed] -= hugeCounters[EHugeCounter::BytesFreed];
        result[ESmallCounter::BytesUsed] -= hugeCounters[EHugeCounter::BytesUsed];

        return result;
    }

    std::array<TLocalSmallCounters, SmallRankCount> GetSmallArenaAllocationCounters()
    {
        std::array<TLocalSmallCounters, SmallRankCount> result;
        for (size_t rank = 0; rank < SmallRankCount; ++rank) {
            for (auto counter : TEnumTraits<ESmallArenaCounter>::GetDomainValues()) {
                result[rank][counter] = SmallArenaCounters_[rank][counter].load();
            }
        }
        return result;
    }

    TEnumIndexedVector<ELargeCounter, ssize_t> GetLargeAllocationCounters()
    {
        TEnumIndexedVector<ELargeCounter, ssize_t> result;
        auto largeArenaCounters = GetLargeArenaAllocationCounters();
        for (size_t rank = 0; rank < LargeRankCount; ++rank) {
            result[ELargeCounter::BytesAllocated] += largeArenaCounters[rank][ELargeArenaCounter::BytesAllocated];
            result[ELargeCounter::BytesFreed] += largeArenaCounters[rank][ELargeArenaCounter::BytesFreed];
            result[ELargeCounter::BytesUsed] += largeArenaCounters[rank][ELargeArenaCounter::BytesUsed];
        }
        return result;
    }

    std::array<TLocalLargeCounters, LargeRankCount> GetLargeArenaAllocationCounters()
    {
        std::array<TLocalLargeCounters, LargeRankCount> result{};

        for (size_t rank = 0; rank < LargeRankCount; ++rank) {
            for (auto counter : TEnumTraits<ELargeArenaCounter>::GetDomainValues()) {
                result[rank][counter] = GlobalState->LargeArenaCounters[rank][counter].load();
            }
        }

        ThreadManager->EnumerateThreadStatesAsync(
            [&] (const auto* state) {
                for (size_t rank = 0; rank < LargeRankCount; ++rank) {
                    for (auto counter : TEnumTraits<ELargeArenaCounter>::GetDomainValues()) {
                        result[rank][counter] += state->LargeArenaCounters[rank][counter];
                    }
                }
            });

        for (size_t rank = 0; rank < LargeRankCount; ++rank) {
            result[rank][ELargeArenaCounter::BytesUsed] = GetUsed(result[rank][ELargeArenaCounter::BytesAllocated], result[rank][ELargeArenaCounter::BytesFreed]);
            result[rank][ELargeArenaCounter::BlobsUsed] = GetUsed(result[rank][ELargeArenaCounter::BlobsAllocated], result[rank][ELargeArenaCounter::BlobsFreed]);
        }

        return result;
    }

    TLocalSystemCounters GetSystemAllocationCounters()
    {
        TLocalSystemCounters result;
        for (auto counter : TEnumTraits<ESystemCounter>::GetDomainValues()) {
            result[counter] = SystemCounters_[counter].load();
        }
        result[ESystemCounter::BytesUsed] = GetUsed(result[ESystemCounter::BytesAllocated], result[ESystemCounter::BytesFreed]);
        return result;
    }

    TLocalHugeCounters GetHugeAllocationCounters()
    {
        TLocalHugeCounters result;
        for (auto counter : TEnumTraits<EHugeCounter>::GetDomainValues()) {
            result[counter] = HugeCounters_[counter].load();
        }
        result[EHugeCounter::BytesUsed] = GetUsed(result[EHugeCounter::BytesAllocated], result[EHugeCounter::BytesFreed]);
        result[EHugeCounter::BlobsUsed] = GetUsed(result[EHugeCounter::BlobsAllocated], result[EHugeCounter::BlobsFreed]);
        return result;
    }

    TLocalUndumpableCounters GetUndumpableAllocationCounters()
    {
        TLocalUndumpableCounters result;
        for (auto counter : TEnumTraits<EUndumpableCounter>::GetDomainValues()) {
            result[counter] = HugeUndumpableCounters_[counter].load();
            result[counter] += GlobalState->UndumpableCounters[counter].load();
        }

        ThreadManager->EnumerateThreadStatesAsync(
            [&] (const auto* state) {
                result[EUndumpableCounter::BytesAllocated] += LoadCounter(state->UndumpableCounters[EUndumpableCounter::BytesAllocated]);
                result[EUndumpableCounter::BytesFreed] += LoadCounter(state->UndumpableCounters[EUndumpableCounter::BytesFreed]);
            });

        result[EUndumpableCounter::BytesUsed] = GetUsed(result[EUndumpableCounter::BytesAllocated], result[EUndumpableCounter::BytesFreed]);
        return result;
    }

    // Called before TThreadState is destroyed.
    // Adds the counter values from TThreadState to the global counters.
    void AccumulateLocalCounters(TThreadState* state)
    {
        for (auto counter : TEnumTraits<EBasicCounter>::GetDomainValues()) {
            GlobalState->TotalCounters.CumulativeTaggedCounters[counter] += state->TotalCounters.CumulativeTaggedCounters[counter];
            GlobalState->TotalCounters.UntaggedCounters[counter] += state->TotalCounters.UntaggedCounters[counter];
        }
        for (size_t index = 0; index < MaxTaggedCounterSets; ++index) {
            const auto* localSet = state->TotalCounters.FindTaggedCounterSet(index);
            if (!localSet) {
                continue;
            }
            auto* globalSet = GlobalState->TotalCounters.GetOrCreateTaggedCounterSet(index);
            for (size_t jndex = 0; jndex < TaggedCounterSetSize; ++jndex) {
                for (auto counter : TEnumTraits<EBasicCounter>::GetDomainValues()) {
                    globalSet->Counters[jndex][counter] += localSet->Counters[jndex][counter];
                }
            }
        }
        for (size_t rank = 0; rank < LargeRankCount; ++rank) {
            for (auto counter : TEnumTraits<ELargeArenaCounter>::GetDomainValues()) {
                GlobalState->LargeArenaCounters[rank][counter] += state->LargeArenaCounters[rank][counter];
            }
        }
        for (auto counter : TEnumTraits<EUndumpableCounter>::GetDomainValues()) {
            GlobalState->UndumpableCounters[counter] += state->UndumpableCounters[counter];
        }
    }

private:
    template <class TCounter>
    static ssize_t LoadTaggedTotalCounter(const TTotalCounters<TCounter>& counters, TMemoryTag tag, EBasicCounter counter)
    {
        const auto* set = counters.FindTaggedCounterSet(tag / TaggedCounterSetSize);
        if (Y_UNLIKELY(!set)) {
            return 0;
        }
        return LoadCounter(set->Counters[tag % TaggedCounterSetSize][counter]);
    }

    template <class TCounter>
    static Y_FORCE_INLINE void IncrementUntaggedTotalCounter(TTotalCounters<TCounter>* counters, EBasicCounter counter, ssize_t delta)
    {
        counters->UntaggedCounters[counter] += delta;
    }

    template <class TCounter>
    static Y_FORCE_INLINE void IncrementTaggedTotalCounter(TTotalCounters<TCounter>* counters, TMemoryTag tag, EBasicCounter counter, ssize_t delta)
    {
        counters->CumulativeTaggedCounters[counter] += delta;
        auto* set = counters->GetOrCreateTaggedCounterSet(tag / TaggedCounterSetSize);
        set->Counters[tag % TaggedCounterSetSize][counter] += delta;
    }


    static ssize_t GetProcessRss()
    {
        auto* file = ::fopen("/proc/self/statm", "r");
        if (!file) {
            return 0;
        }

        ssize_t dummy;
        ssize_t rssPages;
        auto readResult = fscanf(file, "%zd %zd", &dummy, &rssPages);

        ::fclose(file);

        if (readResult != 2) {
            return 0;
        }

        return rssPages * PageSize;
    }

private:
    TGlobalSystemCounters SystemCounters_;
    std::array<TGlobalSmallCounters, SmallRankCount> SmallArenaCounters_;
    TGlobalHugeCounters HugeCounters_;
    TGlobalUndumpableCounters HugeUndumpableCounters_;
};

TExplicitlyConstructableSingleton<TStatisticsManager> StatisticsManager;

////////////////////////////////////////////////////////////////////////////////

void* TSystemAllocator::Allocate(size_t size)
{
    auto rawSize = GetRawBlobSize<TSystemBlobHeader>(size);
    void* mmappedPtr;
    while (true) {
        auto currentPtr = CurrentPtr_.fetch_add(rawSize);
        Y_VERIFY(currentPtr + rawSize <= SystemZoneEnd);
        mmappedPtr = MappedMemoryManager->Map(
            currentPtr,
            rawSize,
            MAP_FIXED_NOREPLACE | MAP_POPULATE);
        if (mmappedPtr == reinterpret_cast<void*>(currentPtr)) {
            break;
        }
        if (mmappedPtr != MAP_FAILED) {
            MappedMemoryManager->Unmap(mmappedPtr, rawSize);
        }
    }
    auto* blob = static_cast<TSystemBlobHeader*>(mmappedPtr);
    new (blob) TSystemBlobHeader(size);
    auto* result = HeaderToPtr(blob);
    PoisonUninitializedRange(result, size);
    StatisticsManager->IncrementSystemCounter(ESystemCounter::BytesAllocated, rawSize);
    return result;
}

void TSystemAllocator::Free(void* ptr)
{
    auto* blob = PtrToHeader<TSystemBlobHeader>(ptr);
    auto rawSize = GetRawBlobSize<TSystemBlobHeader>(blob->Size);
    MappedMemoryManager->Unmap(blob, rawSize);
    StatisticsManager->IncrementSystemCounter(ESystemCounter::BytesFreed, rawSize);
}

////////////////////////////////////////////////////////////////////////////////
// Small allocator
//
// Allocations (called small chunks) are grouped by their sizes. The two most significant binary digits are
// used to determine the rank of a chunk, which limits the worst-case overhead to 25%.
// A pair of helper arrays (SizeToSmallRank1 and SizeToSmallRank2) is used to compute ranks; we expect
// them to be permanently cached.
//
// Chunks of the same rank are served by a (small) arena allocator.
// In fact, there are two arenas for each rank: one is for tagged allocations and another is for untagged ones.
//
// We encode chunk's rank and whether it is tagged or not in the resulting pointer as follows:
//   0- 3:  must be zero due to alignment
//   4-39:  varies
//  40-44:  rank
//     45:  0 for untagged allocations, 1 for tagged ones
//  46-63:  zeroes
// This enables computing chunk's rank and also determining if it is tagged in constant time
// without any additional lookups. Also, one pays no space overhead for untagged allocations
// and pays 16 bytes for each tagged one.
//
// Each arena allocates extents of memory by calling mmap for each extent of SmallExtentSize bytes.
// (Recall that this memory is never reclaimed.)
// Each extent is then sliced into segments of SmallSegmentSize bytes.
// Whenever a new segment is acquired, its memory is pre-faulted by madvise(MADV_POPULATE).
// New segments are acquired in a lock-free manner.
//
// Each thread maintains a separate cache of chunks of each rank (two caches to be precise: one
// for tagged allocations and the other for untagged). These caches are fully thread-local and
// involve no atomic operations.
//
// There are also global caches (per rank, for tagged and untagged allocations).
// Instead of keeping individual chunks these work with chunk groups (collections of up to ChunksPerGroup
// arbitrary chunks).
//
// When the local cache becomes exhausted, a group of chunks is fetched from the global cache
// (if the latter is empty then the arena allocator is consulted).
// Vice versa, if the local cache overflows, a group of chunks is moved from it to the global cache.
//
// Global caches and arena allocators also take care of (rare) cases when Allocate/Free is called
// without a valid thread state (which happens during thread shutdown when TThreadState is already destroyed).
//
// Each arena allocates memory in a certain "data" zone of SmallZoneSize.
// In addition to that zone, up to two "shadow" zones are maintained.
//
// The first one contains memory tags of chunks residing in the data zone.
// The second one (which is present if YTALLOC_NERVOUS is defined) contains
// states of chunks. These states enable some simple internal sanity checks
// (e.g. detect attempts to double-free a chunk).
//
// Addresses in the data zone are directly mapped to offsets in shadow zones.
// When a segment of a small arena zone is allocated, the relevant portions of shadow
// zones get initialized (and also accounted for as a system allocation).
//
// Shadow zones are memory-mapped with MAP_NORESERVE flag and are quite sparse.
// These zones are omitted from core dumps due to their huge size and sparsity.
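//
// Illustrative sketch (a decoding example, not the actual helpers): given the bit layout
// above, rank and taggedness can be recovered from a chunk pointer with plain shifts:
//
//   auto rank   = (reinterpret_cast<uintptr_t>(ptr) >> 40) & 0x1f; // bits 40-44
//   bool tagged = (reinterpret_cast<uintptr_t>(ptr) >> 45) & 1;    // bit 45
//
// PtrToSmallRank, used throughout the code below, relies on this kind of encoding.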

// For each small rank i, gives the maximum k such that 2^k <= SmallRankToSize[i].
// A chunk pointer is mapped to its shadow image via the GetShadowOffset helper.
// Note that the chunk size is not always a power of 2. To avoid costly integer division,
// the chunk pointer is translated by means of a bitwise shift only (leaving some bytes
// of the shadow zones unused). This array provides the needed shifts.
constexpr int SmallRankToLogSize[SmallRankCount] = {
    0,
    4, 5, 5, 6, 6, 7,
    7, 8, 8, 9, 9, 10, 10, 11,
    11, 12, 12, 13, 13, 14, 14, 15
};
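
// Illustrative sketch (with a hypothetical rank): suppose some rank serves 48-byte chunks;
// its log size is then 5 (2^5 = 32 <= 48 < 64). Chunks start at offsets 0, 48, 96, ...
// from the data zone start, so their shadow offsets are 0 >> 5 = 0, 48 >> 5 = 1,
// 96 >> 5 = 3, ...; shadow slot 2 simply stays unused, which is the price paid for
// replacing integer division with a shift.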

enum class ESmallChunkState : ui8
{
    Spare         = 0,
    Allocated     = 0x61, // a
    Freed         = 0x66  // f
};

class TSmallArenaAllocator
{
public:
    TSmallArenaAllocator(EAllocationKind kind, size_t rank, uintptr_t dataZoneStart)
        : Kind_(kind)
        , Rank_(rank)
        , LogSize_(SmallRankToLogSize[Rank_])
        , ChunkSize_(SmallRankToSize[Rank_])
        , DataZoneStart_(dataZoneStart)
        , DataZoneAllocator_(DataZoneStart_, DataZoneStart_ + SmallZoneSize)
    { }

    size_t PullMany(void** batch, size_t maxCount)
    {
        size_t count;
        while (true) {
            count = TryAllocateFromCurrentExtent(batch, maxCount);
            if (Y_LIKELY(count != 0)) {
                break;
            }
            PopulateAnotherExtent();
        }
        return count;
    }

    void* Allocate(size_t size)
    {
        void* ptr;
        auto count = PullMany(&ptr, 1);
        YTALLOC_PARANOID_ASSERT(count == 1);
        YTALLOC_PARANOID_ASSERT(PtrToSmallRank(ptr) == Rank_);
        PoisonUninitializedRange(ptr, size);
        UpdateChunkState(ptr, ESmallChunkState::Freed, ESmallChunkState::Allocated);
        return ptr;
    }

    TMemoryTag GetAndResetMemoryTag(const void* ptr)
    {
        auto& tag = MemoryTagZoneStart_[GetShadowOffset(ptr)];
        auto currentTag = tag;
        tag = NullMemoryTag;
        return currentTag;
    }

    void SetMemoryTag(void* ptr, TMemoryTag tag)
    {
        MemoryTagZoneStart_[GetShadowOffset(ptr)] = tag;
    }

    void UpdateChunkState(const void* ptr, ESmallChunkState expectedState, ESmallChunkState newState)
    {
#ifdef YTALLOC_NERVOUS
        auto& state = ChunkStateZoneStart_[GetShadowOffset(ptr)];
        auto actualState = state;
        if (Y_UNLIKELY(actualState != expectedState)) {
            char message[256];
            sprintf(message, "Invalid small chunk state at %p: expected %" PRIx8 ", actual %" PRIx8,
                ptr,
                static_cast<ui8>(expectedState),
                static_cast<ui8>(actualState));
            YTALLOC_TRAP(message);
        }
        state = newState;
#else
        Y_UNUSED(ptr);
        Y_UNUSED(expectedState);
        Y_UNUSED(newState);
#endif
    }

private:
    size_t TryAllocateFromCurrentExtent(void** batch, size_t maxCount)
    {
        auto* oldPtr = CurrentPtr_.load();
        if (Y_UNLIKELY(!oldPtr)) {
            return 0;
        }

        auto* currentExtent = CurrentExtent_.load(std::memory_order_relaxed);
        if (Y_UNLIKELY(!currentExtent)) {
            return 0;
        }

        char* newPtr;
        while (true) {
            if (Y_UNLIKELY(oldPtr < currentExtent || oldPtr + ChunkSize_ + RightReadableAreaSize > currentExtent + SmallExtentSize)) {
                return 0;
            }

            newPtr = std::min(
                oldPtr + ChunkSize_ * maxCount,
                currentExtent + SmallExtentSize);

            auto* alignedNewPtr = AlignDownToSmallSegment(currentExtent, newPtr);
            if (alignedNewPtr > oldPtr) {
                newPtr = alignedNewPtr;
            }

            if (Y_LIKELY(CurrentPtr_.compare_exchange_weak(oldPtr, newPtr))) {
                break;
            }
        }

        auto* firstSegment = AlignUpToSmallSegment(currentExtent, oldPtr);
        auto* nextSegment = AlignUpToSmallSegment(currentExtent, newPtr);
        if (firstSegment != nextSegment) {
            auto size = nextSegment - firstSegment;
            MappedMemoryManager->PopulateReadOnly(firstSegment, size);

            StatisticsManager->IncrementSmallArenaCounter(ESmallArenaCounter::BytesCommitted, Rank_, size);
            StatisticsManager->IncrementSmallArenaCounter(ESmallArenaCounter::PagesCommitted, Rank_, size / PageSize);
            if (Kind_ == EAllocationKind::Tagged) {
                StatisticsManager->IncrementSystemCounter(ESystemCounter::BytesAllocated, size / ChunkSize_ * sizeof(TMemoryTag));
            }
#ifdef YTALLOC_NERVOUS
            StatisticsManager->IncrementSystemCounter(ESystemCounter::BytesAllocated, size / ChunkSize_ * sizeof(ESmallChunkState));
#endif
        }

        size_t count = 0;
        while (oldPtr != newPtr) {
            UpdateChunkState(oldPtr, ESmallChunkState::Spare, ESmallChunkState::Freed);

            batch[count] = oldPtr;

            oldPtr += ChunkSize_;
            count++;
        }
        return count;
    }

    void PopulateAnotherExtent()
    {
        auto lockGuard = GuardWithTiming(ExtentLock_);

        auto* currentPtr = CurrentPtr_.load();
        auto* currentExtent = CurrentExtent_.load();

        if (currentPtr && currentPtr + ChunkSize_ + RightReadableAreaSize <= currentExtent + SmallExtentSize) {
            // No need for a new extent.
            return;
        }

        auto* newExtent = static_cast<char*>(DataZoneAllocator_.Allocate(SmallExtentAllocSize, 0));

        AllocateShadowZones();

        YTALLOC_VERIFY(reinterpret_cast<uintptr_t>(newExtent) % SmallExtentAllocSize == 0);
        CurrentPtr_ = CurrentExtent_ = newExtent;

        StatisticsManager->IncrementSmallArenaCounter(ESmallArenaCounter::BytesMapped, Rank_, SmallExtentAllocSize);
        StatisticsManager->IncrementSmallArenaCounter(ESmallArenaCounter::PagesMapped, Rank_, SmallExtentAllocSize / PageSize);
    }

private:
    const EAllocationKind Kind_;
    const size_t Rank_;
    const size_t LogSize_;
    const size_t ChunkSize_;
    const uintptr_t DataZoneStart_;

    TZoneAllocator DataZoneAllocator_;

    bool ShadowZonesAllocated_ = false;
    TMemoryTag* MemoryTagZoneStart_;
#ifdef YTALLOC_NERVOUS
    ESmallChunkState* ChunkStateZoneStart_;
#endif

    NThreading::TForkAwareSpinLock ExtentLock_;
    std::atomic<char*> CurrentPtr_ = nullptr;
    std::atomic<char*> CurrentExtent_ = nullptr;

    size_t GetShadowOffset(const void* ptr)
    {
        return (reinterpret_cast<uintptr_t>(ptr) - DataZoneStart_) >> LogSize_;
    }

    void AllocateShadowZones()
    {
        if (ShadowZonesAllocated_) {
            return;
        }

        if (Kind_ == EAllocationKind::Tagged) {
            MemoryTagZoneStart_ = MapShadowZone<TMemoryTag>();
        }
#ifdef YTALLOC_NERVOUS
        ChunkStateZoneStart_ = MapShadowZone<ESmallChunkState>();
#endif

        ShadowZonesAllocated_ = true;
    }

    template <class T>
    T* MapShadowZone()
    {
        auto size = AlignUp((SmallZoneSize >> LogSize_) * sizeof (T), PageSize);
        auto* ptr = static_cast<T*>(MappedMemoryManager->Map(SystemZoneStart, size, MAP_NORESERVE));
        MappedMemoryManager->DontDump(ptr, size);
        return ptr;
    }
};

TExplicitlyConstructableSingleton<TEnumIndexedVector<EAllocationKind, std::array<TExplicitlyConstructableSingleton<TSmallArenaAllocator>, SmallRankCount>>> SmallArenaAllocators;

////////////////////////////////////////////////////////////////////////////////

constexpr size_t ChunksPerGroup = 128;
constexpr size_t GroupsBatchSize = 1024;

static_assert(ChunksPerGroup <= MaxCachedChunksPerRank, "ChunksPerGroup > MaxCachedChunksPerRank");

class TChunkGroup
    : public TFreeListItemBase<TChunkGroup>
{
public:
    bool IsEmpty() const
    {
        return Size_ == 0;
    }

    size_t ExtractAll(void** ptrs)
    {
        auto count = Size_;
        ::memcpy(ptrs, Ptrs_.data(), count * sizeof(void*));
        Size_ = 0;
        return count;
    }

    void PutOne(void* ptr)
    {
        PutMany(&ptr, 1);
    }

    void PutMany(void** ptrs, size_t count)
    {
        YTALLOC_PARANOID_ASSERT(Size_ == 0);
        YTALLOC_PARANOID_ASSERT(count <= ChunksPerGroup);
        ::memcpy(Ptrs_.data(), ptrs, count * sizeof(void*));
        Size_ = count;
    }

private:
    size_t Size_ = 0; // <= ChunksPerGroup
    std::array<void*, ChunksPerGroup> Ptrs_;
};

class TGlobalSmallChunkCache
{
public:
    explicit TGlobalSmallChunkCache(EAllocationKind kind)
        : Kind_(kind)
    { }

#ifdef YTALLOC_PARANOID
    void CanonizeChunkPtrs(TThreadState* state, size_t rank)
    {
        auto& chunkPtrPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrHead[rank];

        auto leftBorder = state->SmallBlobCache[Kind_].RankToCachedChunkLeftBorder[rank];
        auto rightBorder = state->SmallBlobCache[Kind_].RankToCachedChunkRightBorder[rank];

        state->SmallBlobCache[Kind_].CachedChunkFull[rank] = false;
        if (chunkPtrPtr + 1 == rightBorder) {
            chunkPtrPtr = leftBorder;
            state->SmallBlobCache[Kind_].CachedChunkFull[rank] = true;
        }

        state->SmallBlobCache[Kind_].RankToCachedChunkPtrTail[rank] = leftBorder;
    }
#endif

    bool TryMoveGroupToLocal(TThreadState* state, size_t rank)
    {
        auto& groups = RankToChunkGroups_[rank];
        auto* group = groups.Extract(state);
        if (!Y_LIKELY(group)) {
            return false;
        }

        YTALLOC_PARANOID_ASSERT(!group->IsEmpty());

        auto& chunkPtrPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrHead[rank];
#ifdef YTALLOC_PARANOID
        chunkPtrPtr = state->SmallBlobCache[Kind_].RankToCachedChunkLeftBorder[rank];
        state->SmallBlobCache[Kind_].RankToCachedChunkPtrTail[rank] = chunkPtrPtr;
#endif
        auto chunkCount = group->ExtractAll(chunkPtrPtr + 1);
        chunkPtrPtr += chunkCount;

#ifdef YTALLOC_PARANOID
        CanonizeChunkPtrs(state, rank);
#endif
        GroupPool_.Free(state, group);
        return true;
    }

    void MoveGroupToGlobal(TThreadState* state, size_t rank)
    {
        auto* group = GroupPool_.Allocate(state);

        auto& chunkPtrPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrHead[rank];
        YTALLOC_PARANOID_ASSERT(*(chunkPtrPtr + 1) == reinterpret_cast<void*>(TThreadState::RightSentinel));
        group->PutMany(chunkPtrPtr - ChunksPerGroup + 1, ChunksPerGroup);
        chunkPtrPtr -= ChunksPerGroup;
#ifdef YTALLOC_PARANOID
        ::memset(chunkPtrPtr + 1, 0, sizeof(void*) * ChunksPerGroup);
        CanonizeChunkPtrs(state, rank);
#endif

        auto& groups = RankToChunkGroups_[rank];
        YTALLOC_PARANOID_ASSERT(!group->IsEmpty());
        groups.Put(state, group);
    }

    void MoveOneToGlobal(void* ptr, size_t rank)
    {
        auto* group = GroupPool_.Allocate(&GlobalShardedState_);
        group->PutOne(ptr);

        auto& groups = RankToChunkGroups_[rank];
        YTALLOC_PARANOID_ASSERT(!group->IsEmpty());
        groups.Put(&GlobalShardedState_, group);
    }

#ifdef YTALLOC_PARANOID
    void MoveAllToGlobal(TThreadState* state, size_t rank)
    {
        auto leftSentinelBorder = state->SmallBlobCache[Kind_].RankToCachedChunkLeftBorder[rank];
        auto rightSentinelBorder = state->SmallBlobCache[Kind_].RankToCachedChunkRightBorder[rank];

        auto& headPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrHead[rank];
        auto& tailPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrTail[rank];

        if (tailPtr == headPtr && !state->SmallBlobCache[Kind_].CachedChunkFull[rank]) {
            headPtr = leftSentinelBorder;
            return;
        }

        // (leftBorder, rightBorder]
        auto moveIntervalToGlobal = [=] (void** leftBorder, void** rightBorder) {
            while (true) {
                size_t count = 0;
                while (count < ChunksPerGroup && rightBorder != leftBorder) {
                    --rightBorder;
                    ++count;
                }

                if (count == 0) {
                    break;
                }

                auto* group = GroupPool_.Allocate(state);
                group->PutMany(rightBorder + 1, count);
                ::memset(rightBorder + 1, 0, sizeof(void*) * count);
                auto& groups = RankToChunkGroups_[rank];
                groups.Put(state, group);
            }
        };

        if (tailPtr >= headPtr) {
            moveIntervalToGlobal(tailPtr, rightSentinelBorder - 1);
            moveIntervalToGlobal(leftSentinelBorder, headPtr);
        } else {
            moveIntervalToGlobal(tailPtr, headPtr);
        }

        headPtr = leftSentinelBorder;
    }
#else
    void MoveAllToGlobal(TThreadState* state, size_t rank)
    {
        auto& chunkPtrPtr = state->SmallBlobCache[Kind_].RankToCachedChunkPtrHead[rank];
        while (true) {
            size_t count = 0;
            while (count < ChunksPerGroup && *chunkPtrPtr != reinterpret_cast<void*>(TThreadState::LeftSentinel)) {
                --chunkPtrPtr;
                ++count;
            }

            if (count == 0) {
                break;
            }

            auto* group = GroupPool_.Allocate(state);
            group->PutMany(chunkPtrPtr + 1, count);
            auto& groups = RankToChunkGroups_[rank];
            groups.Put(state, group);
        }
    }
#endif

private:
    const EAllocationKind Kind_;

    TGlobalShardedState GlobalShardedState_;
    TShardedSystemPool<TChunkGroup, GroupsBatchSize> GroupPool_;
    std::array<TShardedFreeList<TChunkGroup>, SmallRankCount> RankToChunkGroups_;
};

TExplicitlyConstructableSingleton<TEnumIndexedVector<EAllocationKind, TExplicitlyConstructableSingleton<TGlobalSmallChunkCache>>> GlobalSmallChunkCaches;

////////////////////////////////////////////////////////////////////////////////

class TSmallAllocator
{
public:
    template <EAllocationKind Kind>
    static Y_FORCE_INLINE void* Allocate(TMemoryTag tag, size_t rank)
    {
        auto* state = TThreadManager::FindThreadState();
        if (Y_LIKELY(state)) {
            return Allocate<Kind>(tag, rank, state);
        }
        auto size = SmallRankToSize[rank];
        return AllocateGlobal<Kind>(tag, rank, size);
    }

#ifdef YTALLOC_PARANOID
    template <EAllocationKind Kind>
    static Y_FORCE_INLINE void* Allocate(TMemoryTag tag, size_t rank, TThreadState* state)
    {
        auto& localCache = state->SmallBlobCache[Kind];
        auto& allocator = *(*SmallArenaAllocators)[Kind][rank];

        size_t size = SmallRankToSize[rank];
        StatisticsManager->IncrementTotalCounter<Kind>(state, tag, EBasicCounter::BytesAllocated, size);

        auto leftBorder = localCache.RankToCachedChunkLeftBorder[rank];
        auto rightBorder = localCache.RankToCachedChunkRightBorder[rank];

        void* result;
        while (true) {
            auto& chunkHeadPtr = localCache.RankToCachedChunkPtrHead[rank];
            auto& cachedHeadPtr = *(chunkHeadPtr + 1);
            auto* headPtr = cachedHeadPtr;

            auto& chunkTailPtr = localCache.RankToCachedChunkPtrTail[rank];
            auto& cachedTailPtr = *(chunkTailPtr + 1);
            auto* tailPtr = cachedTailPtr;

            auto& chunkFull = localCache.CachedChunkFull[rank];

            if (Y_LIKELY(chunkFull || headPtr != tailPtr)) {
                YTALLOC_PARANOID_ASSERT(tailPtr);
                cachedTailPtr = nullptr;
                ++chunkTailPtr;
                if (Y_LIKELY(chunkTailPtr + 1 == rightBorder)) {
                    chunkTailPtr = leftBorder;
                }

                chunkFull = false;
                result = tailPtr;
                PoisonUninitializedRange(result, size);
                allocator.UpdateChunkState(result, ESmallChunkState::Freed, ESmallChunkState::Allocated);
                break;
            }

            auto& globalCache = *(*GlobalSmallChunkCaches)[Kind];
            if (!globalCache.TryMoveGroupToLocal(state, rank)) {
                result = allocator.Allocate(size);
                break;
            }
        }

        if constexpr(Kind == EAllocationKind::Tagged) {
            allocator.SetMemoryTag(result, tag);
        }

        return result;
    }

    template <EAllocationKind Kind>
    static Y_FORCE_INLINE void Free(void* ptr)
    {
        auto rank = PtrToSmallRank(ptr);
        auto size = SmallRankToSize[rank];

        auto& allocator = *(*SmallArenaAllocators)[Kind][rank];

        auto tag = NullMemoryTag;
        if constexpr(Kind == EAllocationKind::Tagged) {
            tag = allocator.GetAndResetMemoryTag(ptr);
            YTALLOC_PARANOID_ASSERT(tag != NullMemoryTag);
        }

        allocator.UpdateChunkState(ptr, ESmallChunkState::Allocated, ESmallChunkState::Freed);
        PoisonFreedRange(ptr, size);

        auto* state = TThreadManager::FindThreadState();
        if (Y_UNLIKELY(!state)) {
            FreeGlobal<Kind>(tag, ptr, rank, size);
            return;
        }

        StatisticsManager->IncrementTotalCounter<Kind>(state, tag, EBasicCounter::BytesFreed, size);

        auto& localCache = state->SmallBlobCache[Kind];

        auto leftBorder = localCache.RankToCachedChunkLeftBorder[rank];
        auto rightBorder = localCache.RankToCachedChunkRightBorder[rank];

        while (true) {
            auto& chunkHeadPtr = localCache.RankToCachedChunkPtrHead[rank];
            auto& headPtr = *(chunkHeadPtr + 1);

            auto& chunkTailPtr = localCache.RankToCachedChunkPtrTail[rank];
            auto& chunkFull = localCache.CachedChunkFull[rank];

            if (Y_LIKELY(!chunkFull)) {
                headPtr = ptr;
                ++chunkHeadPtr;
                if (Y_LIKELY(chunkHeadPtr + 1 == rightBorder)) {
                    chunkHeadPtr = leftBorder;
                }
                chunkFull = (chunkHeadPtr == chunkTailPtr);
                break;
            }

            chunkHeadPtr = rightBorder - 1;
            chunkTailPtr = leftBorder;

            auto& globalCache = *(*GlobalSmallChunkCaches)[Kind];
            globalCache.MoveGroupToGlobal(state, rank);
        }
    }

#else

    template <EAllocationKind Kind>
    static Y_FORCE_INLINE void* Allocate(TMemoryTag tag, size_t rank, TThreadState* state)
    {
        size_t size = SmallRankToSize[rank];
        StatisticsManager->IncrementTotalCounter<Kind>(state, tag, EBasicCounter::BytesAllocated, size);

        auto& localCache = state->SmallBlobCache[Kind];
        auto& allocator = *(*SmallArenaAllocators)[Kind][rank];

        void* result;
        while (true) {
            auto& chunkPtr = localCache.RankToCachedChunkPtrHead[rank];
            auto& cachedPtr = *chunkPtr;
            auto* ptr = cachedPtr;
            if (Y_LIKELY(ptr != reinterpret_cast<void*>(TThreadState::LeftSentinel))) {
                --chunkPtr;
                result = ptr;
                allocator.UpdateChunkState(result, ESmallChunkState::Freed, ESmallChunkState::Allocated);
                PoisonUninitializedRange(result, size);
                break;
            }

            auto& globalCache = *(*GlobalSmallChunkCaches)[Kind];
            if (globalCache.TryMoveGroupToLocal(state, rank)) {
                continue;
            }

            auto count = allocator.PullMany(
                chunkPtr + 1,
                SmallRankBatchSize[rank]);
            chunkPtr += count;
        }

        if constexpr(Kind == EAllocationKind::Tagged) {
            allocator.SetMemoryTag(result, tag);
        }

        return result;
    }

    template <EAllocationKind Kind>
    static Y_FORCE_INLINE void Free(void* ptr)
    {
        auto rank = PtrToSmallRank(ptr);
        auto size = SmallRankToSize[rank];

        auto& allocator = *(*SmallArenaAllocators)[Kind][rank];

        auto tag = NullMemoryTag;
        if constexpr(Kind == EAllocationKind::Tagged) {
            tag = allocator.GetAndResetMemoryTag(ptr);
            YTALLOC_PARANOID_ASSERT(tag != NullMemoryTag);
        }

        allocator.UpdateChunkState(ptr, ESmallChunkState::Allocated, ESmallChunkState::Freed);
        PoisonFreedRange(ptr, size);

        auto* state = TThreadManager::FindThreadState();
        if (Y_UNLIKELY(!state)) {
            FreeGlobal<Kind>(tag, ptr, rank, size);
            return;
        }

        StatisticsManager->IncrementTotalCounter<Kind>(state, tag, EBasicCounter::BytesFreed, size);

        auto& localCache = state->SmallBlobCache[Kind];

        while (true) {
            auto& chunkPtrPtr = localCache.RankToCachedChunkPtrHead[rank];
            auto& chunkPtr = *(chunkPtrPtr + 1);
            if (Y_LIKELY(chunkPtr != reinterpret_cast<void*>(TThreadState::RightSentinel))) {
                chunkPtr = ptr;
                ++chunkPtrPtr;
                break;
            }

            auto& globalCache = *(*GlobalSmallChunkCaches)[Kind];
            globalCache.MoveGroupToGlobal(state, rank);
        }
    }
#endif

    static size_t GetAllocationSize(const void* ptr)
    {
        return SmallRankToSize[PtrToSmallRank(ptr)];
    }

    static size_t GetAllocationSize(size_t size)
    {
        return SmallRankToSize[SizeToSmallRank(size)];
    }

    static void PurgeCaches()
    {
        DoPurgeCaches<EAllocationKind::Untagged>();
        DoPurgeCaches<EAllocationKind::Tagged>();
    }

private:
    template <EAllocationKind Kind>
    static void DoPurgeCaches()
    {
        auto* state = TThreadManager::GetThreadStateChecked();
        for (size_t rank = 0; rank < SmallRankCount; ++rank) {
            (*GlobalSmallChunkCaches)[Kind]->MoveAllToGlobal(state, rank);
        }
    }

    template <EAllocationKind Kind>
    static void* AllocateGlobal(TMemoryTag tag, size_t rank, size_t size)
    {
        StatisticsManager->IncrementTotalCounter(tag, EBasicCounter::BytesAllocated, size);

        auto& allocator = *(*SmallArenaAllocators)[Kind][rank];
        auto* result = allocator.Allocate(size);

        if constexpr(Kind == EAllocationKind::Tagged) {
            allocator.SetMemoryTag(result, tag);
        }

        return result;
    }

    template <EAllocationKind Kind>
    static void FreeGlobal(TMemoryTag tag, void* ptr, size_t rank, size_t size)
    {
        StatisticsManager->IncrementTotalCounter(tag, EBasicCounter::BytesFreed, size);

        auto& globalCache = *(*GlobalSmallChunkCaches)[Kind];
        globalCache.MoveOneToGlobal(ptr, rank);
    }
};

////////////////////////////////////////////////////////////////////////////////
// Large blob allocator
//
// Like for small chunks, large blobs are grouped into arenas, where arena K handles
// blobs of size (2^{K-1},2^K]. Memory is mapped in extents of LargeExtentSize bytes.
// Each extent is split into segments of size 2^K (here segment is just a memory region, which may fully consist of
// unmapped pages). When a segment is actually allocated, it becomes a blob and a TLargeBlobHeader
// structure is placed at its start.
//
// When an extent is allocated, it is sliced into segments (not blobs, since no headers are placed and
// no memory is touched). These segments are put into disposed segments list.
//
// For each blob two separate sizes are maintained: BytesAcquired indicates the number of bytes
// acquired via madvise(MADV_POPULATE) from the system; BytesAllocated (<= BytesAcquired) corresponds
// to the number of bytes claimed by the user (including the header and page size alignment).
// If BytesAllocated == 0 then this blob is spare, i.e.
// was freed and remains cached for further possible reuse.
//
// When a new blob is being allocated, the allocator first tries to extract a spare blob. On success,
// its acquired size is extended (if needed); the acquired size never shrinks on allocation.
// If no spare blobs exist, a disposed segment is extracted and is turned into a blob (i.e.
// its header is initialized) and the needed number of bytes is acquired. If no disposed segments
// exist, then a new extent is allocated and sliced into segments.
//
// The above algorithm only claims memory from the system (by means of madvise(MADV_POPULATE));
// the reclaim is handled by a separate background mechanism. Two types of reclaimable memory
// regions are possible:
// * spare: these correspond to spare blobs; upon reclaiming this region becomes a disposed segment
// * overhead: these correspond to trailing parts of allocated blobs in [BytesAllocated, BytesAcquired) byte range
//
// Reclaiming spare blobs is easy as these are explicitly tracked by spare blob lists. To reclaim,
// we atomically extract a blob from a spare list, call madvise(MADV_FREE), and put the pointer to
// the disposed segment list.
//
// Reclaiming overheads is more complicated since (a) allocated blobs are never tracked directly and
// (b) reclaiming them may interfere with Allocate and Free.
//
// To overcome (a), for each extent we maintain a bitmap marking segments that are actually blobs
// (i.e. contain a header). (For simplicity and efficiency this bitmap is just a vector of bytes.)
// These flags are updated in Allocate/Free with appropriate memory ordering. Note that
// blobs are only disposed (and are turned into segments) by the background thread; if this
// thread discovers a segment that is marked as a blob, then it is safe to assume that this segment
// remains a blob unless the thread disposes it.
//
// To overcome (b), each large blob header maintains a spin lock. When blob B is extracted
// from a spare list in Allocate, an acquisition is tried. If successful, B is returned to the
// user. Otherwise it is assumed that B is currently being examined by the background
// reclaimer thread. Allocate then skips this blob and retries extraction; the problem is that
// since the spare list is basically a stack, one cannot just push B back into the spare list.
// Instead, B is pushed into a special locked spare list. This list is purged by the background
// thread on each tick and its items are pushed back into the usual spare list.
//
// A similar trick is used by Free: when invoked for blob B its spin lock acquisition is first
// tried. Upon success, B is moved to the spare list. On failure, Free has to postpone this deallocation
// by moving B into the freed locked list. This list, similarly, is being purged by the background thread.
//
// It remains to explain how the background thread computes the number of bytes to be reclaimed from
// each arena. To this end, we first compute the total number of reclaimable bytes.
// This is the sum of spare and overhead bytes in all arenas minus the number of unreclaimable bytes.
// The latter grows linearly in the number of used bytes and is capped from below by MinLargeUnreclaimableBytes
// and from above by MaxLargeUnreclaimableBytes. SetLargeUnreclaimableCoeff and Set(Min|Max)LargeUnreclaimableBytes
// enable tuning these control knobs. The reclaimable bytes are taken from arenas starting with those
// having the largest spare and overhead volumes.
//
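// For illustration (all numbers here are made up): with 10 GB used, a coefficient of 0.05, and the
// min/max caps set to 128 MB and 10 GB, the unreclaimable threshold is ClampVal(0.05 * 10 GB, 128 MB, 10 GB) = 512 MB.
// If the arenas currently hold 700 MB of spare and 100 MB of overhead bytes, then
// 800 MB - 512 MB = 288 MB (rounded up to a page multiple) is scheduled for reclaiming.
//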
// The above implies that each large blob is prepended with a fixed-size header.
// Hence ptr % PageSize == sizeof (TLargeBlobHeader) for each ptr returned by Allocate
// (since large blob sizes are larger than PageSize and are divisible by PageSize).
// For AllocatePageAligned, however, ptr must be divisible by PageSize. To handle such an allocation, we
// artificially increase its size and align the result of Allocate up to the next page boundary.
// When handling a deallocation, ptr is moved back by UnalignPtr (which is capable of dealing
// with both the results of Allocate and AllocatePageAligned).
// This technique applies to both large and huge blobs.

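// Acts as a signature to detect broken headers: reinterpreted as eight little-endian bytes,
// each value spells the ASCII tag given in its comment (e.g. 0x6c6c61656772616c reads back as "largeall").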
enum class ELargeBlobState : ui64
{
    Allocated   = 0x6c6c61656772616cULL, // largeall
    Spare       = 0x727073656772616cULL, // largespr
    LockedSpare = 0x70736c656772616cULL, // largelsp
    LockedFreed = 0x72666c656772616cULL  // largelfr
};

// Every large blob (either tagged or not) is prepended with this header.
struct TLargeBlobHeader
    : public TFreeListItemBase<TLargeBlobHeader>
{
    TLargeBlobHeader(
        TLargeBlobExtent* extent,
        size_t bytesAcquired,
        size_t bytesAllocated,
        TMemoryTag tag)
        : Extent(extent)
        , BytesAcquired(bytesAcquired)
        , Tag(tag)
        , BytesAllocated(bytesAllocated)
        , State(ELargeBlobState::Allocated)
    { }

    TLargeBlobExtent* Extent;
    // Number of bytes in all acquired pages.
    size_t BytesAcquired;
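    // Acquired (via CAS) by Allocate, Free, and the background reclaimer; see TryLockBlob/UnlockBlob.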
    std::atomic<bool> Locked = false;
    TMemoryTag Tag = NullMemoryTag;
    // For spare blobs this is zero.
    // For allocated blobs this is the number of bytes requested by the user (not including the header or any alignment).
    size_t BytesAllocated;
    ELargeBlobState State;
    char Padding[12];
};

CHECK_HEADER_ALIGNMENT(TLargeBlobHeader)

struct TLargeBlobExtent
{
    TLargeBlobExtent(size_t segmentCount, char* ptr)
        : SegmentCount(segmentCount)
        , Ptr(ptr)
    { }

    size_t SegmentCount;
    char* Ptr;
    TLargeBlobExtent* NextExtent = nullptr;

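    // One atomic flag per segment: true means the segment is disposed (carries no header),
    // false means it is a live blob. This is the per-extent "bitmap" described in the overview above;
    // the variable-length array lives inline at the end of the extent header.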
    std::atomic<bool> DisposedFlags[0];
};

// A helper node that enables storing a number of an extent's segments
// in a free list. Recall that segments themselves do not possess any headers.
struct TDisposedSegment
    : public TFreeListItemBase<TDisposedSegment>
{
    size_t Index;
    TLargeBlobExtent* Extent;
};

struct TLargeArena
{
    size_t Rank = 0;
    size_t SegmentSize = 0;

    TShardedFreeList<TLargeBlobHeader> SpareBlobs;
    TFreeList<TLargeBlobHeader> LockedSpareBlobs;
    TFreeList<TLargeBlobHeader> LockedFreedBlobs;
    TFreeList<TDisposedSegment> DisposedSegments;
    std::atomic<TLargeBlobExtent*> FirstExtent = nullptr;

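    // Position at which the background overhead scan resumes on its next pass; see ReclaimOverheadMemory.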
    TLargeBlobExtent* CurrentOverheadScanExtent = nullptr;
    size_t CurrentOverheadScanSegment = 0;
};

template <bool Dumpable>
class TLargeBlobAllocator
{
public:
    TLargeBlobAllocator()
        : ZoneAllocator_(LargeZoneStart(Dumpable), LargeZoneEnd(Dumpable))
    {
        for (size_t rank = 0; rank < Arenas_.size(); ++rank) {
            auto& arena = Arenas_[rank];
            arena.Rank = rank;
            arena.SegmentSize = (1ULL << rank);
        }
    }

    void* Allocate(size_t size)
    {
        auto* state = TThreadManager::FindThreadState();
        return Y_LIKELY(state)
            ? DoAllocate(state, size)
            : DoAllocate(GlobalState.Get(), size);
    }

    void Free(void* ptr)
    {
        auto* state = TThreadManager::FindThreadState();
        if (Y_LIKELY(state)) {
            DoFree(state, ptr);
        } else {
            DoFree(GlobalState.Get(), ptr);
        }
    }

    static size_t GetAllocationSize(const void* ptr)
    {
        UnalignPtr<TLargeBlobHeader>(ptr);
        const auto* blob = PtrToHeader<TLargeBlobHeader>(ptr);
        return blob->BytesAllocated;
    }

    static size_t GetAllocationSize(size_t size)
    {
        return GetBlobAllocationSize<TLargeBlobHeader>(size);
    }

    void RunBackgroundTasks()
    {
        ReinstallLockedBlobs();
        ReclaimMemory();
    }

    void SetBacktraceProvider(TBacktraceProvider provider)
    {
        BacktraceProvider_.store(provider);
    }

private:
    template <class TState>
    void PopulateArenaPages(TState* state, TLargeArena* arena, void* ptr, size_t size)
    {
        MappedMemoryManager->Populate(ptr, size);
        StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::BytesPopulated, size);
        StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::PagesPopulated, size / PageSize);
        StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::BytesCommitted, size);
        StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::PagesCommitted, size / PageSize);
    }

    template <class TState>
    void ReleaseArenaPages(TState* state, TLargeArena* arena, void* ptr, size_t size)
    {
        MappedMemoryManager->Release(ptr, size);
        StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::BytesReleased, size);
        StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::PagesReleased, size / PageSize);
        StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::BytesCommitted, -size);
        StatisticsManager->IncrementLargeArenaCounter(state, arena->Rank, ELargeArenaCounter::PagesCommitted, -size / PageSize);
    }

    bool TryLockBlob(TLargeBlobHeader* blob)
    {
        bool expected = false;
        return blob->Locked.compare_exchange_strong(expected, true);
    }

    void UnlockBlob(TLargeBlobHeader* blob)
    {
        blob->Locked.store(false);
    }

    template <class TState>
    void MoveBlobToSpare(TState* state, TLargeArena* arena, TLargeBlobHeader* blob, bool unlock)
    {
        auto rank = arena->Rank;
        auto size = blob->BytesAllocated;
        auto rawSize = GetRawBlobSize<TLargeBlobHeader>(size);
        StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesSpare, blob->BytesAcquired);
        StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesOverhead, -(blob->BytesAcquired - rawSize));
        blob->BytesAllocated = 0;
        if (unlock) {
            UnlockBlob(blob);
        } else {
            YTALLOC_VERIFY(!blob->Locked.load());
        }
        blob->State = ELargeBlobState::Spare;
        arena->SpareBlobs.Put(state, blob);
    }

    size_t GetBytesToReclaim(const std::array<TLocalLargeCounters, LargeRankCount>& arenaCounters)
    {
        size_t totalBytesAllocated = 0;
        size_t totalBytesFreed = 0;
        size_t totalBytesSpare = 0;
        size_t totalBytesOverhead = 0;
        for (size_t rank = 0; rank < Arenas_.size(); ++rank) {
            const auto& counters = arenaCounters[rank];
            totalBytesAllocated += counters[ELargeArenaCounter::BytesAllocated];
            totalBytesFreed += counters[ELargeArenaCounter::BytesFreed];
            totalBytesSpare += counters[ELargeArenaCounter::BytesSpare];
            totalBytesOverhead += counters[ELargeArenaCounter::BytesOverhead];
        }

        auto totalBytesUsed = totalBytesAllocated - totalBytesFreed;
        auto totalBytesReclaimable = totalBytesSpare + totalBytesOverhead;

        auto threshold = ClampVal(
            static_cast<size_t>(ConfigurationManager->GetLargeUnreclaimableCoeff() * totalBytesUsed),
            ConfigurationManager->GetMinLargeUnreclaimableBytes(),
            ConfigurationManager->GetMaxLargeUnreclaimableBytes());
        if (totalBytesReclaimable < threshold) {
            return 0;
        }

        auto bytesToReclaim = totalBytesReclaimable - threshold;
        return AlignUp(bytesToReclaim, PageSize);
    }

    void ReinstallLockedSpareBlobs(TLargeArena* arena)
    {
        auto* blob = arena->LockedSpareBlobs.ExtractAll();
        auto* state = TThreadManager::GetThreadStateChecked();

        size_t count = 0;
        while (blob) {
            auto* nextBlob = blob->Next.load();
            YTALLOC_VERIFY(!blob->Locked.load());
            AssertBlobState(blob, ELargeBlobState::LockedSpare);
            blob->State = ELargeBlobState::Spare;
            arena->SpareBlobs.Put(state, blob);
            blob = nextBlob;
            ++count;
        }

        if (count > 0) {
            YTALLOC_LOG_DEBUG("Locked spare blobs reinstalled (Rank: %d, Blobs: %zu)",
                arena->Rank,
                count);
        }
    }

    void ReinstallLockedFreedBlobs(TLargeArena* arena)
    {
        auto* state = TThreadManager::GetThreadStateChecked();
        auto* blob = arena->LockedFreedBlobs.ExtractAll();

        size_t count = 0;
        while (blob) {
            auto* nextBlob = blob->Next.load();
            AssertBlobState(blob, ELargeBlobState::LockedFreed);
            MoveBlobToSpare(state, arena, blob, false);
            ++count;
            blob = nextBlob;
        }

        if (count > 0) {
            YTALLOC_LOG_DEBUG("Locked freed blobs reinstalled (Rank: %d, Blobs: %zu)",
                arena->Rank,
                count);
        }
    }

    void ReclaimSpareMemory(TLargeArena* arena, ssize_t bytesToReclaim)
    {
        if (bytesToReclaim <= 0) {
            return;
        }

        auto rank = arena->Rank;
        auto* state = TThreadManager::GetThreadStateChecked();

        YTALLOC_LOG_DEBUG("Started processing spare memory in arena (BytesToReclaim: %zdM, Rank: %d)",
            bytesToReclaim / 1_MB,
            rank);

        size_t bytesReclaimed = 0;
        size_t blobsReclaimed = 0;
        while (bytesToReclaim > 0) {
            auto* blob = arena->SpareBlobs.ExtractRoundRobin(state);
            if (!blob) {
                break;
            }

            AssertBlobState(blob, ELargeBlobState::Spare);
            YTALLOC_VERIFY(blob->BytesAllocated == 0);

            auto bytesAcquired = blob->BytesAcquired;
            StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesSpare, -bytesAcquired);
            bytesToReclaim -= bytesAcquired;
            bytesReclaimed += bytesAcquired;
            blobsReclaimed += 1;

            auto* extent = blob->Extent;
            auto* ptr = reinterpret_cast<char*>(blob);
            ReleaseArenaPages(
                state,
                arena,
                ptr,
                bytesAcquired);

            size_t segmentIndex = (ptr - extent->Ptr) / arena->SegmentSize;
            extent->DisposedFlags[segmentIndex].store(true, std::memory_order_relaxed);

            auto* disposedSegment = DisposedSegmentPool_.Allocate();
            disposedSegment->Index = segmentIndex;
            disposedSegment->Extent = extent;
            arena->DisposedSegments.Put(disposedSegment);
        }

        StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::SpareBytesReclaimed, bytesReclaimed);

        YTALLOC_LOG_DEBUG("Finished processing spare memory in arena (Rank: %d, BytesReclaimed: %zdM, BlobsReclaimed: %zu)",
            arena->Rank,
            bytesReclaimed / 1_MB,
            blobsReclaimed);
    }

    void ReclaimOverheadMemory(TLargeArena* arena, ssize_t bytesToReclaim)
    {
        if (bytesToReclaim == 0) {
            return;
        }

        auto* state = TThreadManager::GetThreadStateChecked();
        auto rank = arena->Rank;

        YTALLOC_LOG_DEBUG("Started processing overhead memory in arena (BytesToReclaim: %zdM, Rank: %d)",
            bytesToReclaim / 1_MB,
            rank);

        size_t extentsTraversed = 0;
        size_t segmentsTraversed = 0;
        size_t bytesReclaimed = 0;

        bool restartedFromFirstExtent = false;
        auto& currentExtent = arena->CurrentOverheadScanExtent;
        auto& currentSegment = arena->CurrentOverheadScanSegment;
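        // Resume the scan from the position saved in the arena; fall back to the head of the extent list at most once per call.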
        while (bytesToReclaim > 0) {
            if (!currentExtent) {
                if (restartedFromFirstExtent) {
                    break;
                }
                currentExtent = arena->FirstExtent.load();
                if (!currentExtent) {
                    break;
                }
                restartedFromFirstExtent = true;
            }

            while (currentSegment < currentExtent->SegmentCount && bytesToReclaim > 0) {
                ++segmentsTraversed;
                if (!currentExtent->DisposedFlags[currentSegment].load(std::memory_order_acquire)) {
                    auto* ptr = currentExtent->Ptr + currentSegment * arena->SegmentSize;
                    auto* blob = reinterpret_cast<TLargeBlobHeader*>(ptr);
                    YTALLOC_PARANOID_ASSERT(blob->Extent == currentExtent);
                    if (TryLockBlob(blob)) {
                        if (blob->BytesAllocated > 0) {
                            size_t rawSize = GetRawBlobSize<TLargeBlobHeader>(blob->BytesAllocated);
                            size_t bytesToRelease = blob->BytesAcquired - rawSize;
                            if (bytesToRelease > 0) {
                                ReleaseArenaPages(
                                    state,
                                    arena,
                                    ptr + blob->BytesAcquired - bytesToRelease,
                                    bytesToRelease);
                                StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesOverhead, -bytesToRelease);
                                blob->BytesAcquired = rawSize;
                                bytesToReclaim -= bytesToRelease;
                                bytesReclaimed += bytesToRelease;
                            }
                        }
                        UnlockBlob(blob);
                    }
                }
                ++currentSegment;
            }

            ++extentsTraversed;
            currentSegment = 0;
            currentExtent = currentExtent->NextExtent;
        }

        StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::OverheadBytesReclaimed, bytesReclaimed);

        YTALLOC_LOG_DEBUG("Finished processing overhead memory in arena (Rank: %d, Extents: %zu, Segments: %zu, BytesReclaimed: %zuM)",
            arena->Rank,
            extentsTraversed,
            segmentsTraversed,
            bytesReclaimed / 1_MB);
    }

    void ReinstallLockedBlobs()
    {
        for (auto& arena : Arenas_) {
            ReinstallLockedSpareBlobs(&arena);
            ReinstallLockedFreedBlobs(&arena);
        }
    }

    void ReclaimMemory()
    {
        auto arenaCounters = StatisticsManager->GetLargeArenaAllocationCounters();
        ssize_t bytesToReclaim = GetBytesToReclaim(arenaCounters);
        if (bytesToReclaim == 0) {
            return;
        }

        YTALLOC_LOG_DEBUG("Memory reclaim started (BytesToReclaim: %zdM)",
            bytesToReclaim / 1_MB);

        std::array<ssize_t, LargeRankCount * 2> bytesReclaimablePerArena;
        for (size_t rank = 0; rank < LargeRankCount; ++rank) {
            bytesReclaimablePerArena[rank * 2] = arenaCounters[rank][ELargeArenaCounter::BytesOverhead];
            bytesReclaimablePerArena[rank * 2 + 1] = arenaCounters[rank][ELargeArenaCounter::BytesSpare];
        }

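        // Greedily spread the reclaim target across (arena, kind) slots, taking at most 4 MB per step
        // from the slot currently holding the most reclaimable bytes (even slots track overhead, odd slots track spare bytes).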
        std::array<ssize_t, LargeRankCount * 2> bytesToReclaimPerArena{};
        while (bytesToReclaim > 0) {
            ssize_t maxBytes = std::numeric_limits<ssize_t>::min();
            int maxIndex = -1;
            for (int index = 0; index < LargeRankCount * 2; ++index) {
                if (bytesReclaimablePerArena[index] > maxBytes) {
                    maxBytes = bytesReclaimablePerArena[index];
                    maxIndex = index;
                }
            }

            if (maxIndex < 0) {
                break;
            }

            auto bytesToReclaimPerStep = std::min<ssize_t>({bytesToReclaim, maxBytes, 4_MB});
            if (bytesToReclaimPerStep <= 0) {
                // No further progress can be made; bail out instead of spinning.
                break;
            }

            bytesToReclaimPerArena[maxIndex] += bytesToReclaimPerStep;
            bytesReclaimablePerArena[maxIndex] -= bytesToReclaimPerStep;
            bytesToReclaim -= bytesToReclaimPerStep;
        }

        for (auto& arena : Arenas_) {
            auto rank = arena.Rank;
            ReclaimOverheadMemory(&arena, bytesToReclaimPerArena[rank * 2]);
            ReclaimSpareMemory(&arena, bytesToReclaimPerArena[rank * 2 + 1]);
        }

        YTALLOC_LOG_DEBUG("Memory reclaim finished");
    }

    template <class TState>
    void AllocateArenaExtent(TState* state, TLargeArena* arena)
    {
        auto rank = arena->Rank;
        StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::ExtentsAllocated, 1);

        size_t segmentCount = LargeExtentSize / arena->SegmentSize;
        size_t extentHeaderSize = AlignUp(sizeof (TLargeBlobExtent) + sizeof (TLargeBlobExtent::DisposedFlags[0]) * segmentCount, PageSize);
        size_t allocationSize = extentHeaderSize + LargeExtentSize;

        auto* ptr = ZoneAllocator_.Allocate(allocationSize, MAP_NORESERVE);
        if (!Dumpable) {
            MappedMemoryManager->DontDump(ptr, allocationSize);
        }

        if (auto backtraceProvider = BacktraceProvider_.load()) {
            std::array<void*, MaxAllocationProfilingBacktraceDepth> frames;
            auto frameCount = backtraceProvider(
                frames.data(),
                MaxAllocationProfilingBacktraceDepth,
                3);
            MmapObservationManager->EnqueueEvent(allocationSize, frames, frameCount);
        }

        StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesMapped, allocationSize);
        StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::PagesMapped, allocationSize / PageSize);

        auto* extent = static_cast<TLargeBlobExtent*>(ptr);
        MappedMemoryManager->Populate(ptr, extentHeaderSize);
        StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesPopulated, extentHeaderSize);
        StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::PagesPopulated, extentHeaderSize / PageSize);
        StatisticsManager->IncrementSystemCounter(ESystemCounter::BytesAllocated, extentHeaderSize);

        new (extent) TLargeBlobExtent(segmentCount, static_cast<char*>(ptr) + extentHeaderSize);

        for (size_t index = 0; index < segmentCount; ++index) {
            auto* disposedSegment = DisposedSegmentPool_.Allocate();
            disposedSegment->Index = index;
            disposedSegment->Extent = extent;
            arena->DisposedSegments.Put(disposedSegment);
            extent->DisposedFlags[index].store(true);
        }

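        // Lock-free push of the new extent to the head of the arena's extent list.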
        auto* expectedFirstExtent = arena->FirstExtent.load();
        do {
            extent->NextExtent = expectedFirstExtent;
        } while (Y_UNLIKELY(!arena->FirstExtent.compare_exchange_weak(expectedFirstExtent, extent)));
    }

    template <class TState>
    void* DoAllocate(TState* state, size_t size)
    {
        auto rawSize = GetRawBlobSize<TLargeBlobHeader>(size);
        auto rank = GetLargeRank(rawSize);
        auto tag = ConfigurationManager->IsLargeArenaAllocationProfiled(rank)
            ? BacktraceManager->GetMemoryTagFromBacktrace(3)
            : TThreadManager::GetCurrentMemoryTag();
        auto& arena = Arenas_[rank];
        YTALLOC_PARANOID_ASSERT(rawSize <= arena.SegmentSize);

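        // Allocation strategy (see the overview above): prefer a spare blob; otherwise revive a disposed
        // segment; if both lists are empty, map a fresh extent and retry.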
        TLargeBlobHeader* blob;
        while (true) {
            blob = arena.SpareBlobs.Extract(state);
            if (blob) {
                AssertBlobState(blob, ELargeBlobState::Spare);
                if (TryLockBlob(blob)) {
                    StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesSpare, -blob->BytesAcquired);
                    if (blob->BytesAcquired < rawSize) {
                        PopulateArenaPages(
                            state,
                            &arena,
                            reinterpret_cast<char*>(blob) + blob->BytesAcquired,
                            rawSize - blob->BytesAcquired);
                        blob->BytesAcquired = rawSize;
                    } else {
                        StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesOverhead, blob->BytesAcquired - rawSize);
                    }
                    YTALLOC_PARANOID_ASSERT(blob->BytesAllocated == 0);
                    blob->BytesAllocated = size;
                    blob->Tag = tag;
                    blob->State = ELargeBlobState::Allocated;
                    UnlockBlob(blob);
                    break;
                } else {
                    blob->State = ELargeBlobState::LockedSpare;
                    arena.LockedSpareBlobs.Put(blob);
                }
            }

            auto* disposedSegment = arena.DisposedSegments.Extract();
            if (disposedSegment) {
                auto index = disposedSegment->Index;
                auto* extent = disposedSegment->Extent;
                DisposedSegmentPool_.Free(disposedSegment);

                auto* ptr = extent->Ptr + index * arena.SegmentSize;
                PopulateArenaPages(
                    state,
                    &arena,
                    ptr,
                    rawSize);

                blob = reinterpret_cast<TLargeBlobHeader*>(ptr);
                new (blob) TLargeBlobHeader(extent, rawSize, size, tag);

                extent->DisposedFlags[index].store(false, std::memory_order_release);

                break;
            }

            AllocateArenaExtent(state, &arena);
        }

        StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BlobsAllocated, 1);
        StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesAllocated, size);
        StatisticsManager->IncrementTotalCounter(state, tag, EBasicCounter::BytesAllocated, size);
        if (!Dumpable) {
            StatisticsManager->IncrementUndumpableCounter(state, EUndumpableCounter::BytesAllocated, size);
        }

        auto* result = HeaderToPtr(blob);
        YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(result) >= LargeZoneStart(Dumpable) && reinterpret_cast<uintptr_t>(result) < LargeZoneEnd(Dumpable));
        PoisonUninitializedRange(result, size);
        return result;
    }

    template <class TState>
    void DoFree(TState* state, void* ptr)
    {
        YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= LargeZoneStart(Dumpable) && reinterpret_cast<uintptr_t>(ptr) < LargeZoneEnd(Dumpable));

        auto* blob = PtrToHeader<TLargeBlobHeader>(ptr);
        AssertBlobState(blob, ELargeBlobState::Allocated);

        auto size = blob->BytesAllocated;
        PoisonFreedRange(ptr, size);

        auto rawSize = GetRawBlobSize<TLargeBlobHeader>(size);
        auto rank = GetLargeRank(rawSize);
        auto& arena = Arenas_[rank];
        YTALLOC_PARANOID_ASSERT(blob->BytesAcquired <= arena.SegmentSize);

        auto tag = blob->Tag;

        StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BlobsFreed, 1);
        StatisticsManager->IncrementLargeArenaCounter(state, rank, ELargeArenaCounter::BytesFreed, size);
        StatisticsManager->IncrementTotalCounter(state, tag, EBasicCounter::BytesFreed, size);
        if (!Dumpable) {
            StatisticsManager->IncrementUndumpableCounter(state, EUndumpableCounter::BytesFreed, size);
        }

        if (TryLockBlob(blob)) {
            MoveBlobToSpare(state, &arena, blob, true);
        } else {
            blob->State = ELargeBlobState::LockedFreed;
            arena.LockedFreedBlobs.Put(blob);
        }
    }

private:
    TZoneAllocator ZoneAllocator_;
    std::array<TLargeArena, LargeRankCount> Arenas_;

    static constexpr size_t DisposedSegmentsBatchSize = 1024;
    TSystemPool<TDisposedSegment, DisposedSegmentsBatchSize> DisposedSegmentPool_;

    std::atomic<TBacktraceProvider> BacktraceProvider_ = nullptr;
};

TExplicitlyConstructableSingleton<TLargeBlobAllocator<true>> DumpableLargeBlobAllocator;
TExplicitlyConstructableSingleton<TLargeBlobAllocator<false>> UndumpableLargeBlobAllocator;

////////////////////////////////////////////////////////////////////////////////
// Huge blob allocator
//
// Basically a wrapper for TZoneAllocator.

// Acts as a signature to detect broken headers.
enum class EHugeBlobState : ui64
{
    Allocated = 0x636c6c6165677568ULL // hugeallc
};

// Every huge blob (either tagged or not) is prepended with this header.
struct THugeBlobHeader
{
    THugeBlobHeader(TMemoryTag tag, size_t size, bool dumpable)
        : Tag(tag)
        , Size(size)
        , State(EHugeBlobState::Allocated)
        , Dumpable(dumpable)
    { }

    TMemoryTag Tag;
    size_t Size;
    EHugeBlobState State;
    bool Dumpable;
    char Padding[7];
};

CHECK_HEADER_ALIGNMENT(THugeBlobHeader)

class THugeBlobAllocator
{
public:
    THugeBlobAllocator()
        : ZoneAllocator_(HugeZoneStart, HugeZoneEnd)
    { }

    void* Allocate(size_t size, bool dumpable)
    {
        YTALLOC_VERIFY(size <= MaxAllocationSize);
        auto tag = TThreadManager::GetCurrentMemoryTag();
        auto rawSize = GetRawBlobSize<THugeBlobHeader>(size);
        auto* blob = static_cast<THugeBlobHeader*>(ZoneAllocator_.Allocate(rawSize, MAP_POPULATE));
        if (!dumpable) {
            MappedMemoryManager->DontDump(blob, rawSize);
        }
        new (blob) THugeBlobHeader(tag, size, dumpable);

        StatisticsManager->IncrementTotalCounter(tag, EBasicCounter::BytesAllocated, size);
        StatisticsManager->IncrementHugeCounter(EHugeCounter::BlobsAllocated, 1);
        StatisticsManager->IncrementHugeCounter(EHugeCounter::BytesAllocated, size);
        if (!dumpable) {
            StatisticsManager->IncrementHugeUndumpableCounter(EUndumpableCounter::BytesAllocated, size);
        }

        auto* result = HeaderToPtr(blob);
        PoisonUninitializedRange(result, size);
        return result;
    }

    void Free(void* ptr)
    {
        auto* blob = PtrToHeader<THugeBlobHeader>(ptr);
        AssertBlobState(blob, EHugeBlobState::Allocated);
        auto tag = blob->Tag;
        auto size = blob->Size;
        auto dumpable = blob->Dumpable;
        PoisonFreedRange(ptr, size);

        auto rawSize = GetRawBlobSize<THugeBlobHeader>(size);
        ZoneAllocator_.Free(blob, rawSize);

        StatisticsManager->IncrementTotalCounter(tag, EBasicCounter::BytesFreed, size);
        StatisticsManager->IncrementHugeCounter(EHugeCounter::BlobsFreed, 1);
        StatisticsManager->IncrementHugeCounter(EHugeCounter::BytesFreed, size);
        if (!dumpable) {
            StatisticsManager->IncrementHugeUndumpableCounter(EUndumpableCounter::BytesFreed, size);
        }
    }

    static size_t GetAllocationSize(const void* ptr)
    {
        UnalignPtr<THugeBlobHeader>(ptr);
        const auto* blob = PtrToHeader<THugeBlobHeader>(ptr);
        return blob->Size;
    }

    static size_t GetAllocationSize(size_t size)
    {
        return GetBlobAllocationSize<THugeBlobHeader>(size);
    }

private:
    TZoneAllocator ZoneAllocator_;
};

TExplicitlyConstructableSingleton<THugeBlobAllocator> HugeBlobAllocator;

////////////////////////////////////////////////////////////////////////////////
// A thunk to large and huge blob allocators

class TBlobAllocator
{
public:
    static void* Allocate(size_t size)
    {
        InitializeGlobals();
        bool dumpable = GetCurrentMemoryZone() != EMemoryZone::Undumpable;
        // NB: Account for the header. Also note that we may safely ignore the alignment since
        // HugeAllocationSizeThreshold is already page-aligned.
        if (Y_LIKELY(size < HugeAllocationSizeThreshold - sizeof(TLargeBlobHeader) - RightReadableAreaSize)) {
            void* result = dumpable
                ? DumpableLargeBlobAllocator->Allocate(size)
                : UndumpableLargeBlobAllocator->Allocate(size);
            YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(result) >= LargeZoneStart(dumpable) && reinterpret_cast<uintptr_t>(result) < LargeZoneEnd(dumpable));
            return result;
        } else {
            auto* result = HugeBlobAllocator->Allocate(size, dumpable);
            YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(result) >= HugeZoneStart && reinterpret_cast<uintptr_t>(result) < HugeZoneEnd);
            return result;
        }
    }

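    // Dispatch on the address: the dumpable large, undumpable large, and huge zones occupy disjoint,
    // increasing address ranges, so comparing ptr against successive zone ends identifies the owning allocator.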
    static void Free(void* ptr)
    {
        InitializeGlobals();
        if (reinterpret_cast<uintptr_t>(ptr) < LargeZoneEnd(true)) {
            YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= LargeZoneStart(true) && reinterpret_cast<uintptr_t>(ptr) < LargeZoneEnd(true));
            UnalignPtr<TLargeBlobHeader>(ptr);
            DumpableLargeBlobAllocator->Free(ptr);
        } else if (reinterpret_cast<uintptr_t>(ptr) < LargeZoneEnd(false)) {
            YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= LargeZoneStart(false) && reinterpret_cast<uintptr_t>(ptr) < LargeZoneEnd(false));
            UnalignPtr<TLargeBlobHeader>(ptr);
            UndumpableLargeBlobAllocator->Free(ptr);
        } else if (reinterpret_cast<uintptr_t>(ptr) < HugeZoneEnd) {
            YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= HugeZoneStart && reinterpret_cast<uintptr_t>(ptr) < HugeZoneEnd);
            UnalignPtr<THugeBlobHeader>(ptr);
            HugeBlobAllocator->Free(ptr);
        } else {
            YTALLOC_TRAP("Wrong ptr passed to Free");
        }
    }
};

////////////////////////////////////////////////////////////////////////////////

Y_POD_THREAD(bool) CurrentThreadIsBackground;

// Base class for all background threads.
template <class T>
class TBackgroundThreadBase
{
public:
    TBackgroundThreadBase()
        : State_(new TState())
    {
        NThreading::RegisterAtForkHandlers(
            [=] { BeforeFork(); },
            [=] { AfterForkParent(); },
            [=] { AfterForkChild(); });
    }

    virtual ~TBackgroundThreadBase()
    {
        Stop();
    }

private:
    struct TState
        : public TSystemAllocatable
    {
        std::mutex StartStopMutex;
        std::optional<std::thread> Thread;

        std::mutex StopFlagMutex;
        std::condition_variable StopFlagVariable;
        std::chrono::system_clock::time_point LastInvocationTime;
        bool StopFlag = false;
        bool Paused = false;

        std::atomic<int> ForkDepth = 0;
        bool RestartAfterFork = false;
    };

    TState* State_;

private:
    void BeforeFork()
    {
        bool stopped = Stop();
        if (State_->ForkDepth++ == 0) {
            State_->RestartAfterFork = stopped;
        }
    }

    void AfterForkParent()
    {
        if (--State_->ForkDepth == 0) {
            if (State_->RestartAfterFork) {
                Start(false);
            }
        }
    }

    void AfterForkChild()
    {
        bool restart = State_->RestartAfterFork;
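        // The child starts with a fresh state; the old one is deliberately abandoned (leaked).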
        State_ = new TState();
        if (restart) {
            Start(false);
        }
    }

    virtual void ThreadMain() = 0;

protected:
    void Start(bool fromAlloc)
    {
        std::unique_lock<std::mutex> guard(State_->StartStopMutex, std::defer_lock);
        if (fromAlloc) {
            if (!guard.try_lock()) {
                return;
            }

            if (State_->Paused) {
                return;
            }
        } else {
            guard.lock();
        }

        State_->Paused = false;
        if (State_->Thread) {
            return;
        }

        State_->StopFlag = false;

        State_->Thread.emplace([=] {
            CurrentThreadIsBackground = true;
            ThreadMain();
        });

        OnStart();
    }

    bool Stop()
    {
        std::unique_lock<std::mutex> guard(State_->StartStopMutex);

        State_->Paused = true;
        if (!State_->Thread) {
            return false;
        }

        std::unique_lock<std::mutex> flagGuard(State_->StopFlagMutex);
        State_->StopFlag = true;
        flagGuard.unlock();
        State_->StopFlagVariable.notify_one();

        State_->Thread->join();
        State_->Thread.reset();

        OnStop();

        return true;
    }

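    // Sleeps until either the stop flag is raised or the given interval has elapsed since the last
    // invocation; returns true iff the thread is being stopped.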
    bool IsDone(TDuration interval)
    {
        std::unique_lock<std::mutex> flagGuard(State_->StopFlagMutex);
        auto result = State_->StopFlagVariable.wait_until(
            flagGuard,
            State_->LastInvocationTime + std::chrono::microseconds(interval.MicroSeconds()),
            [&] { return State_->StopFlag; });
        State_->LastInvocationTime = std::chrono::system_clock::now();
        return result;
    }

    virtual void OnStart()
    { }

    virtual void OnStop()
    { }
};

////////////////////////////////////////////////////////////////////////////////

// Invokes madvise(MADV_STOCKPILE) periodically.
class TStockpileThread
    : public TBackgroundThreadBase<TStockpileThread>
{
public:
    explicit TStockpileThread(int index)
        : Index_(index)
    {
        Start(false);
    }

private:
    const int Index_;

    virtual void ThreadMain() override
    {
        TThread::SetCurrentThreadName(Sprintf("%s:%d", StockpileThreadName, Index_).c_str());

        while (!IsDone(ConfigurationManager->GetStockpileInterval())) {
            if (!MappedMemoryManager->Stockpile(ConfigurationManager->GetStockpileSize())) {
                // No point in proceeding.
                YTALLOC_LOG_INFO("Stockpile call failed; terminating stockpile thread");
                break;
            }
        }
    }
};

// Manages a bunch of TStockpileThreads.
class TStockpileManager
{
public:
    void SpawnIfNeeded()
    {
        if (!ConfigurationManager->IsStockpileEnabled()) {
            return;
        }

        int threadCount = ConfigurationManager->GetStockpileThreadCount();
        while (static_cast<int>(Threads_.size()) > threadCount) {
            Threads_.pop_back();
        }
        while (static_cast<int>(Threads_.size()) < threadCount) {
            Threads_.push_back(std::make_unique<TStockpileThread>(static_cast<int>(Threads_.size())));
        }
    }

private:
    std::vector<std::unique_ptr<TStockpileThread>> Threads_;
};

TExplicitlyConstructableSingleton<TStockpileManager> StockpileManager;

////////////////////////////////////////////////////////////////////////////////

// Time to wait before re-spawning the thread after a fork.
static constexpr auto BackgroundThreadRespawnDelay = TDuration::Seconds(3);

// Runs basic background activities: reclaim, logging, profiling etc.
class TBackgroundThread
    : public TBackgroundThreadBase<TBackgroundThread>
{
public:
    bool IsStarted()
    {
        return Started_.load();
    }

    void SpawnIfNeeded()
    {
        if (CurrentThreadIsBackground) {
            return;
        }
        Start(true);
    }

private:
    std::atomic<bool> Started_ = false;

private:
    virtual void ThreadMain() override
    {
        TThread::SetCurrentThreadName(BackgroundThreadName);
        TimingManager->DisableForCurrentThread();
        MmapObservationManager->DisableForCurrentThread();

        while (!IsDone(BackgroundInterval)) {
            DumpableLargeBlobAllocator->RunBackgroundTasks();
            UndumpableLargeBlobAllocator->RunBackgroundTasks();
            MappedMemoryManager->RunBackgroundTasks();
            TimingManager->RunBackgroundTasks();
            MmapObservationManager->RunBackgroundTasks();
            StockpileManager->SpawnIfNeeded();
        }
    }

    virtual void OnStart() override
    {
        DoUpdateAllThreadsControlWord(true);
    }

    virtual void OnStop() override
    {
        DoUpdateAllThreadsControlWord(false);
    }

    void DoUpdateAllThreadsControlWord(bool started)
    {
        // Update threads' TLS.
        ThreadManager->EnumerateThreadStatesSync(
            [&] {
                Started_.store(started);
            },
            [&] (auto* state) {
                if (state->BackgroundThreadStarted) {
                    *state->BackgroundThreadStarted = started;
                }
            });
    }
};

TExplicitlyConstructableSingleton<TBackgroundThread> BackgroundThread;

////////////////////////////////////////////////////////////////////////////////

Y_FORCE_INLINE TThreadState* TThreadManager::GetThreadStateUnchecked()
{
    YTALLOC_PARANOID_ASSERT(ThreadState_);
    return ThreadState_;
}

Y_FORCE_INLINE TThreadState* TThreadManager::FindThreadState()
{
    if (Y_LIKELY(ThreadState_)) {
        return ThreadState_;
    }

    if (ThreadStateDestroyed_) {
        return nullptr;
    }

    InitializeGlobals();

    // InitializeGlobals must not allocate.
    Y_VERIFY(!ThreadState_);
    ThreadState_ = ThreadManager->AllocateThreadState();
    (&ThreadControlWord_)->Parts.ThreadStateValid = true;

    return ThreadState_;
}

void TThreadManager::DestroyThread(void*)
{
    TSmallAllocator::PurgeCaches();

    TThreadState* state = ThreadState_;
    ThreadState_ = nullptr;
    ThreadStateDestroyed_ = true;
    (&ThreadControlWord_)->Parts.ThreadStateValid = false;

    {
        auto guard = GuardWithTiming(ThreadManager->ThreadRegistryLock_);
        state->AllocationProfilingEnabled = nullptr;
        state->BackgroundThreadStarted = nullptr;
        ThreadManager->UnrefThreadState(state);
    }
}

void TThreadManager::DestroyThreadState(TThreadState* state)
{
    StatisticsManager->AccumulateLocalCounters(state);
    ThreadRegistry_.Remove(state);
    ThreadStatePool_.Free(state);
}

void TThreadManager::AfterFork()
{
    auto guard = GuardWithTiming(ThreadRegistryLock_);
    ThreadRegistry_.Clear();
    TThreadState* state = ThreadState_;
    if (state) {
        ThreadRegistry_.PushBack(state);
    }
}

TThreadState* TThreadManager::AllocateThreadState()
{
    auto* state = ThreadStatePool_.Allocate();
    state->AllocationProfilingEnabled = &(*&ThreadControlWord_).Parts.AllocationProfilingEnabled;
    state->BackgroundThreadStarted = &(*&ThreadControlWord_).Parts.BackgroundThreadStarted;

    {
        auto guard = GuardWithTiming(ThreadRegistryLock_);
        // NB: These flags must be initialized under ThreadRegistryLock_; see EnumerateThreadStatesSync.
        *state->AllocationProfilingEnabled = ConfigurationManager->IsAllocationProfilingEnabled();
        *state->BackgroundThreadStarted = BackgroundThread->IsStarted();
        ThreadRegistry_.PushBack(state);
    }

    // Need to pass some non-null value for DestroyThread to be called.
    pthread_setspecific(ThreadDtorKey_, (void*)-1);

    return state;
}

////////////////////////////////////////////////////////////////////////////////

void InitializeGlobals()
{
    static std::once_flag Initialized;
    std::call_once(Initialized, [] () {
        LogManager.Construct();
        BacktraceManager.Construct();
        StatisticsManager.Construct();
        MappedMemoryManager.Construct();
        ThreadManager.Construct();
        GlobalState.Construct();
        DumpableLargeBlobAllocator.Construct();
        UndumpableLargeBlobAllocator.Construct();
        HugeBlobAllocator.Construct();
        ConfigurationManager.Construct();
        SystemAllocator.Construct();
        TimingManager.Construct();
        MmapObservationManager.Construct();
        StockpileManager.Construct();
        BackgroundThread.Construct();

        SmallArenaAllocators.Construct();
        auto constructSmallArenaAllocators = [&] (EAllocationKind kind, uintptr_t zonesStart) {
            for (size_t rank = 1; rank < SmallRankCount; ++rank) {
                (*SmallArenaAllocators)[kind][rank].Construct(kind, rank, zonesStart + rank * SmallZoneSize);
            }
        };
        constructSmallArenaAllocators(EAllocationKind::Untagged, UntaggedSmallZonesStart);
        constructSmallArenaAllocators(EAllocationKind::Tagged, TaggedSmallZonesStart);

        GlobalSmallChunkCaches.Construct();
        (*GlobalSmallChunkCaches)[EAllocationKind::Tagged].Construct(EAllocationKind::Tagged);
        (*GlobalSmallChunkCaches)[EAllocationKind::Untagged].Construct(EAllocationKind::Untagged);
    });
}

////////////////////////////////////////////////////////////////////////////////

void StartBackgroundThread()
{
    InitializeGlobals();
    BackgroundThread->SpawnIfNeeded();
}

////////////////////////////////////////////////////////////////////////////////

template <class... Ts>
Y_FORCE_INLINE void* AllocateSmallUntagged(size_t rank, Ts... args)
{
    auto* result = TSmallAllocator::Allocate<EAllocationKind::Untagged>(NullMemoryTag, rank, std::forward<Ts>(args)...);
    YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(result) >= MinUntaggedSmallPtr && reinterpret_cast<uintptr_t>(result) < MaxUntaggedSmallPtr);
    return result;
}

template <class... Ts>
Y_FORCE_INLINE void* AllocateSmallTagged(ui64 controlWord, size_t rank, Ts... args)
{
    auto tag = Y_UNLIKELY((controlWord & TThreadManager::AllocationProfilingEnabledControlWordMask) && ConfigurationManager->IsSmallArenaAllocationProfiled(rank))
        ? BacktraceManager->GetMemoryTagFromBacktrace(2)
        : static_cast<TMemoryTag>(controlWord & TThreadManager::MemoryTagControlWordMask);
    auto* result = TSmallAllocator::Allocate<EAllocationKind::Tagged>(tag, rank, std::forward<Ts>(args)...);
    YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(result) >= MinTaggedSmallPtr && reinterpret_cast<uintptr_t>(result) < MaxTaggedSmallPtr);
    return result;
}

Y_FORCE_INLINE void* AllocateInline(size_t size)
{
    size_t rank;
    if (Y_LIKELY(size <= 512)) {
        rank = SizeToSmallRank1[(size + 7) >> 3];
    } else if (Y_LIKELY(size < LargeAllocationSizeThreshold)) {
        rank = SizeToSmallRank2[(size - 1) >> 8];
    } else {
        StartBackgroundThread();
        return TBlobAllocator::Allocate(size);
    }

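    // The thread control word packs the thread-state-valid, background-thread-started, memory-tag,
    // and allocation-profiling bits into a single TLS word; matching FastPathControlWord means
    // none of the slow-path branches below are needed.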
    auto controlWord = TThreadManager::GetThreadControlWord();
    if (Y_LIKELY(controlWord == TThreadManager::FastPathControlWord)) {
        return AllocateSmallUntagged(rank, TThreadManager::GetThreadStateUnchecked());
    }

    if (Y_UNLIKELY(!(controlWord & TThreadManager::BackgroundThreadStartedControlWorkMask))) {
        StartBackgroundThread();
    }

    if (!(controlWord & (TThreadManager::MemoryTagControlWordMask | TThreadManager::AllocationProfilingEnabledControlWordMask))) {
        return AllocateSmallUntagged(rank);
    } else {
        return AllocateSmallTagged(controlWord, rank);
    }
}

Y_FORCE_INLINE void* AllocateSmallInline(size_t rank)
{
    auto controlWord = TThreadManager::GetThreadControlWord();
    if (Y_LIKELY(controlWord == TThreadManager::FastPathControlWord)) {
        return AllocateSmallUntagged(rank, TThreadManager::GetThreadStateUnchecked());
    }

    if (!(controlWord & (TThreadManager::MemoryTagControlWordMask | TThreadManager::AllocationProfilingEnabledControlWordMask))) {
        return AllocateSmallUntagged(rank);
    } else {
        return AllocateSmallTagged(controlWord, rank);
    }
}

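// For a large request, the size is bumped by an extra page and the blob allocator's result (which is
// offset by the blob header) is rounded up to the next page boundary; on Free, UnalignPtr walks the
// pointer back to the header. See the large blob overview above.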
Y_FORCE_INLINE void* AllocatePageAlignedInline(size_t size)
{
    size = std::max(AlignUp(size, PageSize), PageSize);
    void* result = size >= LargeAllocationSizeThreshold
        ? AlignUp(TBlobAllocator::Allocate(size + PageSize), PageSize)
        : Allocate(size);
    YTALLOC_ASSERT(reinterpret_cast<uintptr_t>(result) % PageSize == 0);
    return result;
}

Y_FORCE_INLINE void FreeNonNullInline(void* ptr)
{
    YTALLOC_ASSERT(ptr);
    if (Y_LIKELY(reinterpret_cast<uintptr_t>(ptr) < UntaggedSmallZonesEnd)) {
        YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= MinUntaggedSmallPtr && reinterpret_cast<uintptr_t>(ptr) < MaxUntaggedSmallPtr);
        TSmallAllocator::Free<EAllocationKind::Untagged>(ptr);
    } else if (Y_LIKELY(reinterpret_cast<uintptr_t>(ptr) < TaggedSmallZonesEnd)) {
        YTALLOC_PARANOID_ASSERT(reinterpret_cast<uintptr_t>(ptr) >= MinTaggedSmallPtr && reinterpret_cast<uintptr_t>(ptr) < MaxTaggedSmallPtr);
        TSmallAllocator::Free<EAllocationKind::Tagged>(ptr);
    } else {
        TBlobAllocator::Free(ptr);
    }
}

Y_FORCE_INLINE void FreeInline(void* ptr)
{
    if (Y_LIKELY(ptr)) {
        FreeNonNullInline(ptr);
    }
}

Y_FORCE_INLINE size_t GetAllocationSizeInline(const void* ptr)
{
    if (Y_UNLIKELY(!ptr)) {
        return 0;
    }

    auto uintptr = reinterpret_cast<uintptr_t>(ptr);
    if (uintptr < UntaggedSmallZonesEnd) {
        YTALLOC_PARANOID_ASSERT(uintptr >= MinUntaggedSmallPtr && uintptr < MaxUntaggedSmallPtr);
        return TSmallAllocator::GetAllocationSize(ptr);
    } else if (uintptr < TaggedSmallZonesEnd) {
        YTALLOC_PARANOID_ASSERT(uintptr >= MinTaggedSmallPtr && uintptr < MaxTaggedSmallPtr);
        return TSmallAllocator::GetAllocationSize(ptr);
    } else if (uintptr < LargeZoneEnd(true)) {
        YTALLOC_PARANOID_ASSERT(uintptr >= LargeZoneStart(true) && uintptr < LargeZoneEnd(true));
        return TLargeBlobAllocator<true>::GetAllocationSize(ptr);
    } else if (uintptr < LargeZoneEnd(false)) {
        YTALLOC_PARANOID_ASSERT(uintptr >= LargeZoneStart(false) && uintptr < LargeZoneEnd(false));
        return TLargeBlobAllocator<false>::GetAllocationSize(ptr);
    } else if (uintptr < HugeZoneEnd) {
        YTALLOC_PARANOID_ASSERT(uintptr >= HugeZoneStart && uintptr < HugeZoneEnd);
        return THugeBlobAllocator::GetAllocationSize(ptr);
    } else {
        YTALLOC_TRAP("Wrong ptr passed to GetAllocationSizeInline");
    }
}

Y_FORCE_INLINE size_t GetAllocationSizeInline(size_t size)
{
    if (size <= LargeAllocationSizeThreshold) {
        return TSmallAllocator::GetAllocationSize(size);
    } else if (size <= HugeAllocationSizeThreshold) {
        return TLargeBlobAllocator<true>::GetAllocationSize(size);
    } else {
        return THugeBlobAllocator::GetAllocationSize(size);
    }
}

void EnableLogging(TLogHandler logHandler)
{
    InitializeGlobals();
    LogManager->EnableLogging(logHandler);
}

void SetBacktraceProvider(TBacktraceProvider provider)
{
    InitializeGlobals();
    BacktraceManager->SetBacktraceProvider(provider);
    DumpableLargeBlobAllocator->SetBacktraceProvider(provider);
    UndumpableLargeBlobAllocator->SetBacktraceProvider(provider);
}

void SetBacktraceFormatter(TBacktraceFormatter provider)
{
    InitializeGlobals();
    MmapObservationManager->SetBacktraceFormatter(provider);
}

void EnableStockpile()
{
    InitializeGlobals();
    ConfigurationManager->EnableStockpile();
}

void SetStockpileInterval(TDuration value)
{
    InitializeGlobals();
    ConfigurationManager->SetStockpileInterval(value);
}

void SetStockpileThreadCount(int value)
{
    InitializeGlobals();
    ConfigurationManager->SetStockpileThreadCount(value);
}

void SetStockpileSize(size_t value)
{
    InitializeGlobals();
    ConfigurationManager->SetStockpileSize(value);
}

void SetLargeUnreclaimableCoeff(double value)
{
    InitializeGlobals();
    ConfigurationManager->SetLargeUnreclaimableCoeff(value);
}

void SetTimingEventThreshold(TDuration value)
{
    InitializeGlobals();
    ConfigurationManager->SetTimingEventThreshold(value);
}

void SetMinLargeUnreclaimableBytes(size_t value)
{
    InitializeGlobals();
    ConfigurationManager->SetMinLargeUnreclaimableBytes(value);
}

void SetMaxLargeUnreclaimableBytes(size_t value)
{
    InitializeGlobals();
    ConfigurationManager->SetMaxLargeUnreclaimableBytes(value);
}

void SetAllocationProfilingEnabled(bool value)
{
    ConfigurationManager->SetAllocationProfilingEnabled(value);
}

void SetAllocationProfilingSamplingRate(double rate)
{
    ConfigurationManager->SetAllocationProfilingSamplingRate(rate);
}

void SetSmallArenaAllocationProfilingEnabled(size_t rank, bool value)
{
    ConfigurationManager->SetSmallArenaAllocationProfilingEnabled(rank, value);
}

void SetLargeArenaAllocationProfilingEnabled(size_t rank, bool value)
{
    ConfigurationManager->SetLargeArenaAllocationProfilingEnabled(rank, value);
}

void SetProfilingBacktraceDepth(int depth)
{
    ConfigurationManager->SetProfilingBacktraceDepth(depth);
}

void SetMinProfilingBytesUsedToReport(size_t size)
{
    ConfigurationManager->SetMinProfilingBytesUsedToReport(size);
}

void SetEnableEagerMemoryRelease(bool value)
{
    ConfigurationManager->SetEnableEagerMemoryRelease(value);
}

void SetEnableMadvisePopulate(bool value)
{
    ConfigurationManager->SetEnableMadvisePopulate(value);
}

TEnumIndexedVector<ETotalCounter, ssize_t> GetTotalAllocationCounters()
{
    InitializeGlobals();
    return StatisticsManager->GetTotalAllocationCounters();
}

TEnumIndexedVector<ESystemCounter, ssize_t> GetSystemAllocationCounters()
{
    InitializeGlobals();
    return StatisticsManager->GetSystemAllocationCounters();
}

TEnumIndexedVector<ESystemCounter, ssize_t> GetUndumpableAllocationCounters()
{
    InitializeGlobals();
    return StatisticsManager->GetUndumpableAllocationCounters();
}

TEnumIndexedVector<ESmallCounter, ssize_t> GetSmallAllocationCounters()
{
    InitializeGlobals();
    return StatisticsManager->GetSmallAllocationCounters();
}

TEnumIndexedVector<ESmallCounter, ssize_t> GetLargeAllocationCounters()
{
    InitializeGlobals();
    return StatisticsManager->GetLargeAllocationCounters();
}

std::array<TEnumIndexedVector<ESmallArenaCounter, ssize_t>, SmallRankCount> GetSmallArenaAllocationCounters()
{
    InitializeGlobals();
    return StatisticsManager->GetSmallArenaAllocationCounters();
}

std::array<TEnumIndexedVector<ELargeArenaCounter, ssize_t>, LargeRankCount> GetLargeArenaAllocationCounters()
{
    InitializeGlobals();
    return StatisticsManager->GetLargeArenaAllocationCounters();
}

TEnumIndexedVector<EHugeCounter, ssize_t> GetHugeAllocationCounters()
{
    InitializeGlobals();
    return StatisticsManager->GetHugeAllocationCounters();
}

std::vector<TProfiledAllocation> GetProfiledAllocationStatistics()
{
    InitializeGlobals();

    if (!ConfigurationManager->IsAllocationProfilingEnabled()) {
        return {};
    }

    std::vector<TMemoryTag> tags;
    tags.reserve(MaxCapturedAllocationBacktraces + 1);
    for (TMemoryTag tag = AllocationProfilingMemoryTagBase;
        tag < AllocationProfilingMemoryTagBase + MaxCapturedAllocationBacktraces;
        ++tag)
    {
        tags.push_back(tag);
    }
    tags.push_back(AllocationProfilingUnknownMemoryTag);

    std::vector<TEnumIndexedVector<EBasicCounter, ssize_t>> counters;
    counters.resize(tags.size());
    StatisticsManager->GetTaggedMemoryCounters(tags.data(), tags.size(), counters.data());

    std::vector<TProfiledAllocation> statistics;
    for (size_t index = 0; index < tags.size(); ++index) {
        if (counters[index][EBasicCounter::BytesUsed] < static_cast<ssize_t>(ConfigurationManager->GetMinProfilingBytesUsedToReport())) {
            continue;
        }
        auto tag = tags[index];
        auto optionalBacktrace = BacktraceManager->FindBacktrace(tag);
        if (!optionalBacktrace && tag != AllocationProfilingUnknownMemoryTag) {
            continue;
        }
        statistics.push_back(TProfiledAllocation{
            optionalBacktrace.value_or(TBacktrace()),
            counters[index]
        });
    }
    return statistics;
}

TEnumIndexedVector<ETimingEventType, TTimingEventCounters> GetTimingEventCounters()
{
    InitializeGlobals();
    return TimingManager->GetTimingEventCounters();
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT::NYTAlloc