#pragma once
#include "aligned_page_pool.h"
#include "mkql_mem_info.h"
#include <yql/essentials/core/pg_settings/guc_settings.h>
#include <yql/essentials/parser/pg_wrapper/interface/context.h>
#include <yql/essentials/public/udf/udf_allocator.h>
#include <yql/essentials/public/udf/udf_value.h>
#include <util/string/builder.h>
#include <util/system/align.h>
#include <util/system/defaults.h>
#include <util/system/tls.h>
#include <new>
#include <unordered_map>
#include <unordered_set>
#include <atomic>
#include <memory>

namespace NKikimr {

namespace NMiniKQL {

constexpr ui64 MKQL_ALIGNMENT = 16;

// Header placed at the start of every pool page used by the MKQL bump allocator.
struct TAllocPageHeader {
    ui64 Capacity;             // usable bytes in the page, counted from the page start
    ui64 Offset;               // bump offset of the next free byte, kept aligned to MKQL_ALIGNMENT
    ui64 UseCount;             // number of live allocations on this page
    ui64 Deallocated;          // bytes freed while the page still has live allocations
    TAlignedPagePool* MyAlloc; // owning pool; verified when the memory is freed
    TAllocPageHeader* Link;    // next page in the owning arena's chain
};

using TMemorySubPoolIdx = ui32;

// Each sub-pool maintains its own chain of current pages; Temporary is intended
// for short-lived allocations that should not intermix with long-lived ones.
enum class EMemorySubPool: TMemorySubPoolIdx {
    Default = 0,
    Temporary = 1,

    Count
};

constexpr ui32 MaxPageUserData = TAlignedPagePool::POOL_PAGE_SIZE - sizeof(TAllocPageHeader);

static_assert(sizeof(TAllocPageHeader) % MKQL_ALIGNMENT == 0, "Incorrect size of header");

// Per-allocator state: the page pool plus registries for boxed values, offloaded
// blocks, palloc lists and tracked Arrow buffers.
struct TAllocState : public TAlignedPagePool
{
    struct TListEntry {
        TListEntry *Left = nullptr;
        TListEntry *Right = nullptr;

        void Link(TListEntry* root) noexcept;
        void Unlink() noexcept;
        void InitLinks() noexcept { Left = Right = this; }
        void Clear() noexcept { Left = Right = nullptr; }
        bool IsUnlinked() const noexcept { return !Left && !Right; }
    };

#ifndef NDEBUG
    std::unordered_map<TMemoryUsageInfo*, TIntrusivePtr<TMemoryUsageInfo>> ActiveMemInfo;
#endif
    bool SupportsSizedAllocators = false;

    void* LargeAlloc(size_t size) {
#if defined(ALLOW_DEFAULT_ALLOCATOR)
        if (Y_UNLIKELY(IsDefaultAllocatorUsed())) {
            return malloc(size);
        }
#endif

        return Alloc(size);
    }

    void LargeFree(void* ptr, size_t size) noexcept {
#if defined(ALLOW_DEFAULT_ALLOCATOR)
        if (Y_UNLIKELY(IsDefaultAllocatorUsed())) {
            free(ptr);
            return;
        }
#endif

        Free(ptr, size);
    }

    using TCurrentPages = std::array<TAllocPageHeader*, (TMemorySubPoolIdx)EMemorySubPool::Count>;

    // Sentinel page header with zero capacity: the fast allocation path needs no
    // null check and simply falls through to the slow path on first use.
    static TAllocPageHeader EmptyPageHeader;
    static TCurrentPages EmptyCurrentPages;

    TCurrentPages CurrentPages = EmptyCurrentPages;
    TListEntry OffloadedBlocksRoot;
    TListEntry GlobalPAllocList;
    TListEntry* CurrentPAllocList;
    TListEntry ArrowBlocksRoot;
    std::unordered_set<const void*> ArrowBuffers;
    bool EnableArrowTracking = true;

    void* MainContext = nullptr;
    void* CurrentContext = nullptr;

    struct TLockInfo {
        i32 OriginalRefs;
        i32 Locks;
    };

    bool UseRefLocking = false;
    std::unordered_map<void*, TLockInfo> LockedObjectsRefs;

    ::NKikimr::NUdf::TBoxedValueLink Root;

    NKikimr::NUdf::TBoxedValueLink* GetRoot() noexcept {
        return &Root;
    }

    explicit TAllocState(const TSourceLocation& location, const TAlignedPagePoolCounters& counters, bool supportsSizedAllocators);
    void KillAllBoxed();
    void InvalidateMemInfo();
    size_t GetDeallocatedInPages() const;
    static void CleanupPAllocList(TListEntry* root);
    static void CleanupArrowList(TListEntry* root);

    void LockObject(::NKikimr::NUdf::TUnboxedValuePod value);
    void UnlockObject(::NKikimr::NUdf::TUnboxedValuePod value);
};

// Allocator state of the current thread; set by TScopedAlloc::Acquire() and
// restored by TScopedAlloc::Release().
extern Y_POD_THREAD(TAllocState*) TlsAllocState;

class TPAllocScope {
public:
    TPAllocScope() {
        PAllocList.InitLinks();
        Attach();
    }

    ~TPAllocScope() {
        Cleanup();
        Detach();
    }

    void Attach() {
        Y_ABORT_UNLESS(!Prev);
        Prev = TlsAllocState->CurrentPAllocList;
        Y_ABORT_UNLESS(Prev);
        TlsAllocState->CurrentPAllocList = &PAllocList;
    }

    void Detach() {
        if (Prev) {
            Y_ABORT_UNLESS(TlsAllocState->CurrentPAllocList == &PAllocList);
            TlsAllocState->CurrentPAllocList = Prev;
            Prev = nullptr;
        }
    }

    void Cleanup() {
        TAllocState::CleanupPAllocList(&PAllocList);
    }

private:
    TAllocState::TListEntry PAllocList;
    TAllocState::TListEntry* Prev = nullptr;
};
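
// A minimal usage sketch (illustrative only): TPAllocScope redirects pallocs made
// through the PG wrapper into a local list and cleans them up when it dies.
//
//   {
//       TPAllocScope scope;        // attaches to the current TAllocState
//       // ... code that pallocs ...
//   }                              // list cleaned, previous list restored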

// TListEntry and IBoxedValue occupy the same storage, so one can be laid over the other
static_assert(sizeof(NUdf::IBoxedValue) == sizeof(TAllocState::TListEntry));

// Boxed value whose storage is the leading union of a TMkqlPAllocHeader; its
// custom operator delete frees the whole palloc chunk (see the definition below).
class TBoxedValueWithFree : public NUdf::TBoxedValueBase {
public:
    void operator delete(void *mem) noexcept;
};

struct TMkqlPAllocHeader {
    union {
        TAllocState::TListEntry Entry;
        TBoxedValueWithFree Boxed;
    } U;

    size_t Size;
    ui64 Self; // must be placed right before the pointer to the allocated area; see GetMemoryChunkContext
};

static_assert(sizeof(TMkqlPAllocHeader) ==
    sizeof(size_t) +
    sizeof(TAllocState::TListEntry) +
    sizeof(void*), "Padding is not allowed");

// Arrow expects 64-byte aligned buffers; the header below is padded to exactly that size.
constexpr size_t ArrowAlignment = 64;
struct TMkqlArrowHeader {
    TAllocState::TListEntry Entry;
    ui64 Size;
    char Padding[ArrowAlignment - sizeof(TAllocState::TListEntry) - sizeof(ui64)];
};

static_assert(sizeof(TMkqlArrowHeader) == ArrowAlignment);

class TScopedAlloc {
public:
    explicit TScopedAlloc(const TSourceLocation& location,
            const TAlignedPagePoolCounters& counters = TAlignedPagePoolCounters(), bool supportsSizedAllocators = false, bool initiallyAcquired = true)
        : InitiallyAcquired_(initiallyAcquired)
        , MyState_(location, counters, supportsSizedAllocators)
    {
        MyState_.MainContext = PgInitializeMainContext();
        if (InitiallyAcquired_) {
            Acquire();
        }
    }

    ~TScopedAlloc()
    {
        if (!InitiallyAcquired_) {
            Acquire();
        }
        MyState_.KillAllBoxed();
        Release();
        PgDestroyMainContext(MyState_.MainContext);
    }

    TAllocState& Ref() {
        return MyState_;
    }

    void Acquire();
    void Release();

    size_t GetUsed() const { return MyState_.GetUsed(); }
    size_t GetPeakUsed() const { return MyState_.GetPeakUsed(); }
    size_t GetAllocated() const { return MyState_.GetAllocated(); }
    size_t GetPeakAllocated() const { return MyState_.GetPeakAllocated(); }

    size_t GetLimit() const { return MyState_.GetLimit(); }
    void SetLimit(size_t limit) { MyState_.SetLimit(limit); }
    void DisableStrictAllocationCheck() { MyState_.DisableStrictAllocationCheck(); }

    void ReleaseFreePages() { MyState_.ReleaseFreePages(); }
    void InvalidateMemInfo() { MyState_.InvalidateMemInfo(); }

    bool IsAttached() const { return AttachedCount_ > 0; }

    void SetGUCSettings(const TGUCSettings::TPtr& gucSettings) {
        Acquire();
        PgSetGUCSettings(MyState_.MainContext, gucSettings);
        Release();
    }

    void SetMaximumLimitValueReached(bool isReached) {
        MyState_.SetMaximumLimitValueReached(isReached);
    }

private:
    const bool InitiallyAcquired_;
    TAllocState MyState_;
    size_t AttachedCount_ = 0;
    TAllocState* PrevState_ = nullptr;
};
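
// A minimal usage sketch (illustrative; __LOCATION__ is the util source-location macro):
//
//   TScopedAlloc alloc(__LOCATION__);   // acquired on construction by default
//   // ... MKQL allocations on this thread are served by `alloc` ...
//   alloc.Release();                    // detach from the thread
//   alloc.Acquire();                    // re-attach before allocating again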

class TPagedArena {
public:
    TPagedArena(TAlignedPagePool* pagePool) noexcept
        : PagePool_(pagePool)
        , CurrentPages_(TAllocState::EmptyCurrentPages)
    {}

    TPagedArena(const TPagedArena&) = delete;
    TPagedArena(TPagedArena&& other) noexcept
        : PagePool_(other.PagePool_)
        , CurrentPages_(other.CurrentPages_)
    {
        other.CurrentPages_ = TAllocState::EmptyCurrentPages;
    }

    TPagedArena& operator=(const TPagedArena&) = delete;
    TPagedArena& operator=(TPagedArena&& other) noexcept {
        Clear();
        PagePool_ = other.PagePool_;
        CurrentPages_ = other.CurrentPages_;
        other.CurrentPages_ = TAllocState::EmptyCurrentPages;
        return *this;
    }

    ~TPagedArena() noexcept {
        Clear();
    }

    void* Alloc(size_t sz, const EMemorySubPool mPool = EMemorySubPool::Default) {
        auto& currentPage = CurrentPages_[(TMemorySubPoolIdx)mPool];
        if (Y_LIKELY(currentPage->Offset + sz <= currentPage->Capacity)) {
            void* ret = (char*)currentPage + currentPage->Offset;
            currentPage->Offset = AlignUp(currentPage->Offset + sz, MKQL_ALIGNMENT);
            return ret;
        }

        return AllocSlow(sz, mPool);
    }

    void Clear() noexcept;

private:
    void* AllocSlow(const size_t sz, const EMemorySubPool mPool);

private:
    TAlignedPagePool* PagePool_;
    TAllocState::TCurrentPages CurrentPages_ = TAllocState::EmptyCurrentPages;
};
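
// A minimal usage sketch (illustrative; assumes an acquired TScopedAlloc named `alloc`):
// the arena bump-allocates from pool pages and releases everything at once; there is
// no per-allocation free.
//
//   TPagedArena arena(&alloc.Ref());
//   void* a = arena.Alloc(24);                              // 16-byte aligned
//   void* b = arena.Alloc(100, EMemorySubPool::Temporary);  // separate page chain
//   arena.Clear();                                          // returns all pages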

void* MKQLAllocSlow(size_t sz, TAllocState* state, const EMemorySubPool mPool);

// Deprecated fast path: ignores SupportsSizedAllocators and performs no offload
// accounting; prefer MKQLAllocFastWithSize.
inline void* MKQLAllocFastDeprecated(size_t sz, TAllocState* state, const EMemorySubPool mPool) {
    Y_DEBUG_ABORT_UNLESS(state);

#if defined(ALLOW_DEFAULT_ALLOCATOR)
    if (Y_UNLIKELY(TAllocState::IsDefaultAllocatorUsed())) {
        auto ret = (TAllocState::TListEntry*)malloc(sizeof(TAllocState::TListEntry) + sz);
        if (!ret) {
            throw TMemoryLimitExceededException();
        }

        ret->Link(&state->OffloadedBlocksRoot);
        return ret + 1;
    }
#endif

    auto currPage = state->CurrentPages[(TMemorySubPoolIdx)mPool];
    if (Y_LIKELY(currPage->Offset + sz <= currPage->Capacity)) {
        void* ret = (char*)currPage + currPage->Offset;
        currPage->Offset = AlignUp(currPage->Offset + sz, MKQL_ALIGNMENT);
        ++currPage->UseCount;
        return ret;
    }

    return MKQLAllocSlow(sz, state, mPool);
}

// Fast-path allocation: when sized allocators are supported, blocks larger than
// MaxPageUserData are offloaded to malloc and accounted via OffloadAlloc; everything
// else is bump-allocated from the current page of the chosen sub-pool.
inline void* MKQLAllocFastWithSize(size_t sz, TAllocState* state, const EMemorySubPool mPool) {
    Y_DEBUG_ABORT_UNLESS(state);

    bool useMalloc = state->SupportsSizedAllocators && sz > MaxPageUserData;
#if defined(ALLOW_DEFAULT_ALLOCATOR)
    useMalloc = useMalloc || TAllocState::IsDefaultAllocatorUsed();
#endif

    if (Y_UNLIKELY(useMalloc)) {
        state->OffloadAlloc(sizeof(TAllocState::TListEntry) + sz);
        auto ret = (TAllocState::TListEntry*)malloc(sizeof(TAllocState::TListEntry) + sz);
        if (!ret) {
            throw TMemoryLimitExceededException();
        }

        ret->Link(&state->OffloadedBlocksRoot);
        return ret + 1;
    }

    auto currPage = state->CurrentPages[(TMemorySubPoolIdx)mPool];
    if (Y_LIKELY(currPage->Offset + sz <= currPage->Capacity)) {
        void* ret = (char*)currPage + currPage->Offset;
        currPage->Offset = AlignUp(currPage->Offset + sz, MKQL_ALIGNMENT);
        ++currPage->UseCount;
        return ret;
    }

    return MKQLAllocSlow(sz, state, mPool);
}

void MKQLFreeSlow(TAllocPageHeader* header, TAllocState *state, const EMemorySubPool mPool) noexcept;

// Deprecated free: size-unaware counterpart of MKQLAllocFastDeprecated.
inline void MKQLFreeDeprecated(const void* mem, const EMemorySubPool mPool) noexcept {
    if (!mem) {
        return;
    }

#if defined(ALLOW_DEFAULT_ALLOCATOR)
    if (Y_UNLIKELY(TAllocState::IsDefaultAllocatorUsed())) {
        TAllocState *state = TlsAllocState;
        Y_DEBUG_ABORT_UNLESS(state);

        auto entry = (TAllocState::TListEntry*)(mem) - 1;
        entry->Unlink();
        free(entry);
        return;
    }
#endif

    TAllocPageHeader* header = (TAllocPageHeader*)TAllocState::GetPageStart(mem);
    Y_DEBUG_ABORT_UNLESS(header->MyAlloc == TlsAllocState, "%s", (TStringBuilder() << "wrong allocator was used; "
        "allocated with: " << header->MyAlloc->GetDebugInfo() << " freed with: " << TlsAllocState->GetDebugInfo()).data());
    if (Y_LIKELY(--header->UseCount != 0)) {
        return;
    }

    MKQLFreeSlow(header, TlsAllocState, mPool);
}

// Fast-path free: mirrors MKQLAllocFastWithSize; a page is handed to MKQLFreeSlow
// once its last allocation is released.
inline void MKQLFreeFastWithSize(const void* mem, size_t sz, TAllocState* state, const EMemorySubPool mPool) noexcept {
    if (!mem) {
        return;
    }

    Y_DEBUG_ABORT_UNLESS(state);

    bool useFree = state->SupportsSizedAllocators && sz > MaxPageUserData;
#if defined(ALLOW_DEFAULT_ALLOCATOR)
    useFree = useFree || TAllocState::IsDefaultAllocatorUsed();
#endif

    if (Y_UNLIKELY(useFree)) {
        auto entry = (TAllocState::TListEntry*)(mem) - 1;
        entry->Unlink();
        free(entry);
        state->OffloadFree(sizeof(TAllocState::TListEntry) + sz);
        return;
    }

    TAllocPageHeader* header = (TAllocPageHeader*)TAllocState::GetPageStart(mem);
    Y_DEBUG_ABORT_UNLESS(header->MyAlloc == state, "%s", (TStringBuilder() << "wrong allocator was used; "
        "allocated with: " << header->MyAlloc->GetDebugInfo() << " freed with: " << state->GetDebugInfo()).data());
    if (Y_LIKELY(--header->UseCount != 0)) {
        header->Deallocated += sz;
        return;
    }

    MKQLFreeSlow(header, state, mPool);
}

inline void* MKQLAllocDeprecated(size_t sz, const EMemorySubPool mPool) {
    return MKQLAllocFastDeprecated(sz, TlsAllocState, mPool);
}

inline void* MKQLAllocWithSize(size_t sz, const EMemorySubPool mPool) {
    return MKQLAllocFastWithSize(sz, TlsAllocState, mPool);
}

inline void MKQLFreeWithSize(const void* mem, size_t sz, const EMemorySubPool mPool) noexcept {
    MKQLFreeFastWithSize(mem, sz, TlsAllocState, mPool);
}
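
// Note: MKQLAllocWithSize/MKQLFreeWithSize must be paired with the same byte count;
// for blocks above MaxPageUserData there is no page header, so the passed size is
// the only record used for offload accounting on free.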

inline void MKQLRegisterObject(NUdf::TBoxedValue* value) noexcept {
    value->Link(TlsAllocState->GetRoot());
}

inline void MKQLUnregisterObject(NUdf::TBoxedValue* value) noexcept {
    value->Unlink();
}

// Allocation entry points for Arrow buffers: blocks are tracked in the allocator
// state (ArrowBlocksRoot/ArrowBuffers) unless explicitly untracked, so they can be
// released together with the state.
void* MKQLArrowAllocate(ui64 size);
void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size);
void MKQLArrowFree(const void* mem, ui64 size);
void MKQLArrowUntrack(const void* mem);

// Mixin that routes operator new/delete and AllocWithSize/FreeWithSize through the
// current thread's MKQL allocator for the given sub-pool.
template <const EMemorySubPool MemoryPoolExt = EMemorySubPool::Default>
struct TWithMiniKQLAlloc {
    static constexpr EMemorySubPool MemoryPool = MemoryPoolExt;

    static void FreeWithSize(const void* mem, const size_t sz) {
        NMiniKQL::MKQLFreeWithSize(mem, sz, MemoryPool);
    }

    static void* AllocWithSize(const size_t sz) {
        return NMiniKQL::MKQLAllocWithSize(sz, MemoryPool);
    }

    void* operator new(size_t sz) {
        return NMiniKQL::MKQLAllocWithSize(sz, MemoryPool);
    }

    void* operator new[](size_t sz) {
        return NMiniKQL::MKQLAllocWithSize(sz, MemoryPool);
    }

    void operator delete(void *mem, std::size_t sz) noexcept {
        NMiniKQL::MKQLFreeWithSize(mem, sz, MemoryPool);
    }

    void operator delete[](void *mem, std::size_t sz) noexcept {
        NMiniKQL::MKQLFreeWithSize(mem, sz, MemoryPool);
    }
};
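
// A minimal usage sketch (illustrative; `TMyNode` is a hypothetical type and requires
// an acquired TScopedAlloc on the current thread):
//
//   struct TMyNode: public TWithDefaultMiniKQLAlloc {
//       ui64 Payload = 0;
//   };
//   auto* node = new TMyNode();   // routed to MKQLAllocWithSize
//   delete node;                  // sized delete -> MKQLFreeWithSize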

template <typename T, typename... Args>
T* AllocateOn(TAllocState* state, Args&&... args)
{
    static_assert(std::is_base_of<TWithMiniKQLAlloc<T::MemoryPool>, T>::value, "Class must inherit TWithMiniKQLAlloc.");
    void* addr = MKQLAllocFastWithSize(sizeof(T), state, T::MemoryPool);
    return ::new(addr) T(std::forward<Args>(args)...);
}

// STL-compatible allocator backed by MKQLAllocWithSize/MKQLFreeWithSize.
template <typename Type, EMemorySubPool MemoryPool = EMemorySubPool::Default>
struct TMKQLAllocator
{
    typedef Type value_type;
    typedef Type* pointer;
    typedef const Type* const_pointer;
    typedef Type& reference;
    typedef const Type& const_reference;
    typedef size_t size_type;
    typedef ptrdiff_t difference_type;

    TMKQLAllocator() noexcept = default;
    ~TMKQLAllocator() noexcept = default;

    template<typename U> TMKQLAllocator(const TMKQLAllocator<U, MemoryPool>&) noexcept {}
    template<typename U> struct rebind { typedef TMKQLAllocator<U, MemoryPool> other; };
    template<typename U> bool operator==(const TMKQLAllocator<U, MemoryPool>&) const { return true; }
    template<typename U> bool operator!=(const TMKQLAllocator<U, MemoryPool>&) const { return false; }

    static pointer allocate(size_type n, const void* = nullptr)
    {
        return static_cast<pointer>(MKQLAllocWithSize(n * sizeof(value_type), MemoryPool));
    }

    static void deallocate(const_pointer p, size_type n) noexcept
    {
        MKQLFreeWithSize(p, n * sizeof(value_type), MemoryPool);
    }
};
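
// A minimal usage sketch (illustrative; requires an acquired TScopedAlloc):
//
//   std::vector<ui64, TMKQLAllocator<ui64>> items;
//   items.resize(1000);           // storage is taken from the thread's pool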

using TWithDefaultMiniKQLAlloc = TWithMiniKQLAlloc<EMemorySubPool::Default>;
using TWithTemporaryMiniKQLAlloc = TWithMiniKQLAlloc<EMemorySubPool::Temporary>;

// STL-compatible allocator that works in whole pool blocks via GetBlock/ReturnBlock.
template <typename Type>
struct TMKQLHugeAllocator
{
    typedef Type value_type;
    typedef Type* pointer;
    typedef const Type* const_pointer;
    typedef Type& reference;
    typedef const Type& const_reference;
    typedef size_t size_type;
    typedef ptrdiff_t difference_type;

    TMKQLHugeAllocator() noexcept = default;
    ~TMKQLHugeAllocator() noexcept = default;

    template<typename U> TMKQLHugeAllocator(const TMKQLHugeAllocator<U>&) noexcept {}
    template<typename U> struct rebind { typedef TMKQLHugeAllocator<U> other; };
    template<typename U> bool operator==(const TMKQLHugeAllocator<U>&) const { return true; }
    template<typename U> bool operator!=(const TMKQLHugeAllocator<U>&) const { return false; }

    static pointer allocate(size_type n, const void* = nullptr)
    {
        size_t size = Max(n * sizeof(value_type), TAllocState::POOL_PAGE_SIZE);
        return static_cast<pointer>(TlsAllocState->GetBlock(size));
    }

    static void deallocate(const_pointer p, size_type n) noexcept
    {
        size_t size = Max(n * sizeof(value_type), TAllocState::POOL_PAGE_SIZE);
        TlsAllocState->ReturnBlock(const_cast<pointer>(p), size);
    }
};
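
// Unlike TMKQLAllocator, every request is rounded up to at least a full pool page,
// so this allocator suits containers holding one large buffer rather than many nodes.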

// Append-only list that stores objects densely in pool pages; elements are
// destroyed and pages returned to the pool in Clear().
template <typename T>
class TPagedList
{
public:
    static_assert(sizeof(T) <= TAlignedPagePool::POOL_PAGE_SIZE, "Too big object");
    static constexpr size_t OBJECTS_PER_PAGE = TAlignedPagePool::POOL_PAGE_SIZE / sizeof(T);

    class TIterator;
    class TConstIterator;

    TPagedList(TAlignedPagePool& pool)
        : Pool(pool)
        , IndexInLastPage(OBJECTS_PER_PAGE)
    {}

    TPagedList(const TPagedList&) = delete;
    TPagedList(TPagedList&&) = delete;

    ~TPagedList() {
        Clear();
    }

    void Add(T&& value) {
        if (IndexInLastPage < OBJECTS_PER_PAGE) {
            auto ptr = ObjectAt(Pages.back(), IndexInLastPage);
            new(ptr) T(std::move(value));
            ++IndexInLastPage;
            return;
        }

        auto ptr = Pool.GetPage();
        IndexInLastPage = 1;
        Pages.push_back(ptr);
        new(ptr) T(std::move(value));
    }

    void Clear() {
        for (ui32 i = 0; i + 1 < Pages.size(); ++i) {
            for (ui32 objIndex = 0; objIndex < OBJECTS_PER_PAGE; ++objIndex) {
                ObjectAt(Pages[i], objIndex)->~T();
            }

            Pool.ReturnPage(Pages[i]);
        }

        if (!Pages.empty()) {
            for (ui32 objIndex = 0; objIndex < IndexInLastPage; ++objIndex) {
                ObjectAt(Pages.back(), objIndex)->~T();
            }

            Pool.ReturnPage(Pages.back());
        }

        TPages().swap(Pages);
        IndexInLastPage = OBJECTS_PER_PAGE;
    }

    const T& operator[](size_t i) const {
        const auto table = i / OBJECTS_PER_PAGE;
        const auto index = i % OBJECTS_PER_PAGE;
        return *ObjectAt(Pages[table], index);
    }

    size_t Size() const {
        return Pages.empty() ? 0 : ((Pages.size() - 1) * OBJECTS_PER_PAGE + IndexInLastPage);
    }

    TConstIterator Begin() const {
        return TConstIterator(this, 0, 0);
    }

    TConstIterator begin() const {
        return Begin();
    }

    TConstIterator End() const {
        if (IndexInLastPage == OBJECTS_PER_PAGE) {
            return TConstIterator(this, Pages.size(), 0);
        }

        return TConstIterator(this, Pages.size() - 1, IndexInLastPage);
    }

    TConstIterator end() const {
        return End();
    }

    TIterator Begin() {
        return TIterator(this, 0, 0);
    }

    TIterator begin() {
        return Begin();
    }

    TIterator End() {
        if (IndexInLastPage == OBJECTS_PER_PAGE) {
            return TIterator(this, Pages.size(), 0);
        }

        return TIterator(this, Pages.size() - 1, IndexInLastPage);
    }

    TIterator end() {
        return End();
    }

    class TIterator
    {
    public:
        using TOwner = TPagedList<T>;

        TIterator()
            : Owner(nullptr)
            , PageNo(0)
            , PageIndex(0)
        {}

        TIterator(const TIterator&) = default;
        TIterator& operator=(const TIterator&) = default;

        TIterator(TOwner* owner, size_t pageNo, size_t pageIndex)
            : Owner(owner)
            , PageNo(pageNo)
            , PageIndex(pageIndex)
        {}

        T& operator*() {
            Y_DEBUG_ABORT_UNLESS(PageIndex < OBJECTS_PER_PAGE);
            Y_DEBUG_ABORT_UNLESS(PageNo < Owner->Pages.size());
            Y_DEBUG_ABORT_UNLESS(PageNo + 1 < Owner->Pages.size() || PageIndex < Owner->IndexInLastPage);
            return *Owner->ObjectAt(Owner->Pages[PageNo], PageIndex);
        }

        TIterator& operator++() {
            if (++PageIndex == OBJECTS_PER_PAGE) {
                ++PageNo;
                PageIndex = 0;
            }

            return *this;
        }

        bool operator==(const TIterator& other) const {
            return PageNo == other.PageNo && PageIndex == other.PageIndex;
        }

        bool operator!=(const TIterator& other) const {
            return !operator==(other);
        }

    private:
        TOwner* Owner;
        size_t PageNo;
        size_t PageIndex;
    };

    class TConstIterator
    {
    public:
        using TOwner = TPagedList<T>;

        TConstIterator()
            : Owner(nullptr)
            , PageNo(0)
            , PageIndex(0)
        {}

        TConstIterator(const TConstIterator&) = default;
        TConstIterator& operator=(const TConstIterator&) = default;

        TConstIterator(const TOwner* owner, size_t pageNo, size_t pageIndex)
            : Owner(owner)
            , PageNo(pageNo)
            , PageIndex(pageIndex)
        {}

        const T& operator*() const {
            Y_DEBUG_ABORT_UNLESS(PageIndex < OBJECTS_PER_PAGE);
            Y_DEBUG_ABORT_UNLESS(PageNo < Owner->Pages.size());
            Y_DEBUG_ABORT_UNLESS(PageNo + 1 < Owner->Pages.size() || PageIndex < Owner->IndexInLastPage);
            return *Owner->ObjectAt(Owner->Pages[PageNo], PageIndex);
        }

        TConstIterator& operator++() {
            if (++PageIndex == OBJECTS_PER_PAGE) {
                ++PageNo;
                PageIndex = 0;
            }

            return *this;
        }

        bool operator==(const TConstIterator& other) const {
            return PageNo == other.PageNo && PageIndex == other.PageIndex;
        }

        bool operator!=(const TConstIterator& other) const {
            return !operator==(other);
        }

    private:
        const TOwner* Owner;
        size_t PageNo;
        size_t PageIndex;
    };

private:
    static const T* ObjectAt(const void* page, size_t objectIndex) {
        return reinterpret_cast<const T*>(static_cast<const char*>(page) + objectIndex * sizeof(T));
    }

    static T* ObjectAt(void* page, size_t objectIndex) {
        return reinterpret_cast<T*>(static_cast<char*>(page) + objectIndex * sizeof(T));
    }

    TAlignedPagePool& Pool;
    using TPages = std::vector<void*, TMKQLAllocator<void*>>;
    TPages Pages;
    size_t IndexInLastPage;
};
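
// A minimal usage sketch (illustrative; assumes an acquired TScopedAlloc named `alloc`):
//
//   TPagedList<ui64> list(alloc.Ref());
//   list.Add(42);
//   for (const auto& value : list) {
//       // values are stored page by page, OBJECTS_PER_PAGE per page
//   }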

// `mem` points at the TMkqlPAllocHeader (the boxed value lives in its leading union),
// so the size of the whole chunk is the recorded Size plus the header itself.
inline void TBoxedValueWithFree::operator delete(void *mem) noexcept {
    auto size = ((TMkqlPAllocHeader*)mem)->Size + sizeof(TMkqlPAllocHeader);
    MKQLFreeWithSize(mem, size, EMemorySubPool::Default);
}

} // NMiniKQL

} // NKikimr