aboutsummaryrefslogblamecommitdiffstats
path: root/yql/essentials/minikql/mkql_alloc.cpp
blob: 839b011679345708392dd541dadd857bc738d0cd (plain) (tree)

















































                                                                                                                                          

                                 

                                                        
      
                    




































































































































































































                                                                                                                           





                                              
                                        
      































                                                                      

                                 
                                         
      
























                                                                
#include "mkql_alloc.h"
#include <util/system/align.h>
#include <yql/essentials/public/udf/udf_value.h>
#include <tuple>

namespace NKikimr {

namespace NMiniKQL {

Y_POD_THREAD(TAllocState*) TlsAllocState;

TAllocPageHeader TAllocState::EmptyPageHeader = { 0, 0, 0, 0, nullptr, nullptr };
TAllocState::TCurrentPages TAllocState::EmptyCurrentPages = { &TAllocState::EmptyPageHeader, &TAllocState::EmptyPageHeader };

void TAllocState::TListEntry::Link(TAllocState::TListEntry* root) noexcept {
    Left = root;
    Right = root->Right;
    Right->Left = Left->Right = this;
}

void TAllocState::TListEntry::Unlink() noexcept {
    std::tie(Right->Left, Left->Right) = std::make_pair(Left, Right);
    Clear();
}

TAllocState::TAllocState(const TSourceLocation& location, const NKikimr::TAlignedPagePoolCounters &counters, bool supportsSizedAllocators)
    : TAlignedPagePool(location, counters)
    , SupportsSizedAllocators(supportsSizedAllocators)
    , CurrentPAllocList(&GlobalPAllocList)
{
    GetRoot()->InitLinks();
    OffloadedBlocksRoot.InitLinks();
    GlobalPAllocList.InitLinks();
    ArrowBlocksRoot.InitLinks();
}

void TAllocState::CleanupPAllocList(TListEntry* root) {
    for (auto curr = root->Right; curr != root; ) {
        auto next = curr->Right;
        auto size = ((TMkqlPAllocHeader*)curr)->Size;
        auto fullSize = size + sizeof(TMkqlPAllocHeader);
        MKQLFreeWithSize(curr, fullSize, EMemorySubPool::Default); // may free items from OffloadedBlocksRoot
        curr = next;
    }

    root->InitLinks();
}

void TAllocState::CleanupArrowList(TListEntry* root) {
    for (auto curr = root->Right; curr != root; ) {
        auto next = curr->Right;
#ifdef PROFILE_MEMORY_ALLOCATIONS
        free(curr);
#else
        auto size = ((TMkqlArrowHeader*)curr)->Size;
        auto fullSize = size + sizeof(TMkqlArrowHeader);
        ReleaseAlignedPage(curr, fullSize);
#endif
        curr = next;
    }

    root->InitLinks();
}

void TAllocState::KillAllBoxed() {
    {
        const auto root = GetRoot();
        for (auto next = root->GetRight(); next != root; next = root->GetLeft()) {
            next->Ref();
            next->~TBoxedValueLink();
        }

        GetRoot()->InitLinks();
    }

    {
        Y_ABORT_UNLESS(CurrentPAllocList == &GlobalPAllocList);
        CleanupPAllocList(&GlobalPAllocList);
    }

    {
        const auto root = &OffloadedBlocksRoot;
        for (auto curr = root->Right; curr != root; ) {
            auto next = curr->Right;
            free(curr);
            curr = next;
        }

        OffloadedBlocksRoot.InitLinks();
    }

    CleanupArrowList(&ArrowBlocksRoot);

#ifndef NDEBUG
    ActiveMemInfo.clear();
#endif
}

void TAllocState::InvalidateMemInfo() {
#ifndef NDEBUG
    for (auto& pair : ActiveMemInfo) {
        pair.first->CheckOnExit(false);
    }
#endif
}

size_t TAllocState::GetDeallocatedInPages() const {
    size_t deallocated = 0;
    for (auto x : AllPages) {
        auto currPage = (TAllocPageHeader*)x;
        if (currPage->UseCount) {
            deallocated += currPage->Deallocated;
        }
    }

    return deallocated;
}

void TAllocState::LockObject(::NKikimr::NUdf::TUnboxedValuePod value) {
    if (!UseRefLocking) {
        return;
    }

    void* obj;
    if (value.IsString()) {
       obj = value.AsStringRef().Data();
    } else if (value.IsBoxed()) {
       obj = value.AsBoxed().Get();
    } else {
       return;
    }

    auto [it, isNew] = LockedObjectsRefs.emplace(obj, TLockInfo{ 0, 0 });
    if (isNew) {
        it->second.OriginalRefs = value.LockRef();
    }

    ++it->second.Locks;
}

void TAllocState::UnlockObject(::NKikimr::NUdf::TUnboxedValuePod value) {
    if (!UseRefLocking) {
        return;
    }

    void* obj;
    if (value.IsString()) {
       obj = value.AsStringRef().Data();
    } else if (value.IsBoxed()) {
       obj = value.AsBoxed().Get();
    } else {
       return;
    }

    auto it = LockedObjectsRefs.find(obj);
    Y_ABORT_UNLESS(it != LockedObjectsRefs.end());
    if (--it->second.Locks == 0) {
       value.UnlockRef(it->second.OriginalRefs);
       LockedObjectsRefs.erase(it);
    }
}

void TScopedAlloc::Acquire() {
    if (!AttachedCount_) {
        if (PrevState_) {
            PgReleaseThreadContext(PrevState_->MainContext);
        }
        PrevState_ = TlsAllocState;
        TlsAllocState = &MyState_;
        PgAcquireThreadContext(MyState_.MainContext);
    } else {
        Y_ABORT_UNLESS(TlsAllocState == &MyState_, "Mismatch allocator in thread");

    }
    ++AttachedCount_;
}

void TScopedAlloc::Release() {
    if (AttachedCount_ && --AttachedCount_ == 0) {
        Y_ABORT_UNLESS(TlsAllocState == &MyState_, "Mismatch allocator in thread");
        PgReleaseThreadContext(MyState_.MainContext);
        TlsAllocState = PrevState_;
        if (PrevState_) {
            PgAcquireThreadContext(PrevState_->MainContext);
        }
        PrevState_ = nullptr;
    }
}

void* MKQLAllocSlow(size_t sz, TAllocState* state, const EMemorySubPool mPool) {
    auto roundedSize = AlignUp(sz + sizeof(TAllocPageHeader), MKQL_ALIGNMENT);
    auto capacity = Max(ui64(TAlignedPagePool::POOL_PAGE_SIZE), roundedSize);
    auto currPage = (TAllocPageHeader*)state->GetBlock(capacity);
    currPage->Deallocated = 0;
    currPage->Capacity = capacity;
    currPage->Offset = roundedSize;

    auto& mPage = state->CurrentPages[(TMemorySubPoolIdx)mPool];
    auto newPageAvailable = capacity - roundedSize;
    auto curPageAvailable = mPage->Capacity - mPage->Offset;

    if (newPageAvailable > curPageAvailable) {
        mPage = currPage;
    }

    void* ret = (char*)currPage + sizeof(TAllocPageHeader);
    currPage->UseCount = 1;
    currPage->MyAlloc = state;
    currPage->Link = nullptr;
    return ret;
}

void MKQLFreeSlow(TAllocPageHeader* header, TAllocState *state, const EMemorySubPool mPool) noexcept {
    Y_DEBUG_ABORT_UNLESS(state);
    Y_DEBUG_ABORT_UNLESS(header->MyAlloc == state, "%s", (TStringBuilder() << "wrong allocator was used; "
        "allocated with: " << header->MyAlloc->GetDebugInfo() << " freed with: " << TlsAllocState->GetDebugInfo()).data());
    state->ReturnBlock(header, header->Capacity);
    if (header == state->CurrentPages[(TMemorySubPoolIdx)mPool]) {
        state->CurrentPages[(TMemorySubPoolIdx)mPool] = &TAllocState::EmptyPageHeader;
    }
}

void* TPagedArena::AllocSlow(const size_t sz, const EMemorySubPool mPool) {
    auto& currentPage = CurrentPages_[(TMemorySubPoolIdx)mPool];
    auto prevLink = currentPage;
    auto roundedSize = AlignUp(sz + sizeof(TAllocPageHeader), MKQL_ALIGNMENT);
    auto capacity = Max(ui64(TAlignedPagePool::POOL_PAGE_SIZE), roundedSize);
    currentPage = (TAllocPageHeader*)PagePool_->GetBlock(capacity);
    currentPage->Capacity = capacity;
    void* ret = (char*)currentPage + sizeof(TAllocPageHeader);
    currentPage->Offset = roundedSize;
    currentPage->UseCount = 0;
    currentPage->MyAlloc = PagePool_;
    currentPage->Link = prevLink;
    return ret;
}

void TPagedArena::Clear() noexcept {
    for (auto&& i : CurrentPages_) {
        auto current = i;
        while (current != &TAllocState::EmptyPageHeader) {
            auto next = current->Link;
            PagePool_->ReturnBlock(current, current->Capacity);
            current = next;
        }

        i = &TAllocState::EmptyPageHeader;
    }
}

void* MKQLArrowAllocate(ui64 size) {
    TAllocState* state = TlsAllocState;
    Y_ENSURE(state);
    auto fullSize = size + sizeof(TMkqlArrowHeader);
    if (state->EnableArrowTracking) {
        state->OffloadAlloc(fullSize);
    }

#ifdef PROFILE_MEMORY_ALLOCATIONS
    auto ptr = malloc(fullSize);
    if (!ptr) {
        throw TMemoryLimitExceededException();
    }
#else
    auto ptr = GetAlignedPage(fullSize);
#endif
    auto header = (TMkqlArrowHeader*)ptr;
    if (state->EnableArrowTracking) {
        header->Entry.Link(&state->ArrowBlocksRoot);
        Y_ENSURE(state->ArrowBuffers.insert(header + 1).second);
    } else {
        header->Entry.Clear();
    }

    header->Size = size;
    return header + 1;
}

void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size) {
    auto res = MKQLArrowAllocate(size);
    memcpy(res, mem, Min(prevSize, size));
    MKQLArrowFree(mem, prevSize);
    return res;
}

void MKQLArrowFree(const void* mem, ui64 size) {
    auto fullSize = size + sizeof(TMkqlArrowHeader);
    auto header = ((TMkqlArrowHeader*)mem) - 1;
    if (!header->Entry.IsUnlinked()) {
        TAllocState* state = TlsAllocState;
        Y_ENSURE(state);
        state->OffloadFree(fullSize);
        header->Entry.Unlink();
        auto it = state->ArrowBuffers.find(mem);
        Y_ENSURE(it != state->ArrowBuffers.end());
        state->ArrowBuffers.erase(it);
    }

    Y_ENSURE(size == header->Size);
#ifdef PROFILE_MEMORY_ALLOCATIONS
    free(header);
#else
    ReleaseAlignedPage(header, fullSize);
#endif
}

void MKQLArrowUntrack(const void* mem) {
    TAllocState* state = TlsAllocState;
    Y_ENSURE(state);
    if (!state->EnableArrowTracking) {
        return;
    }

    auto it = state->ArrowBuffers.find(mem);
    if (it == state->ArrowBuffers.end()) {
        return;
    }

    auto header = ((TMkqlArrowHeader*)mem) - 1;
    if (!header->Entry.IsUnlinked()) {
        header->Entry.Unlink();
        auto fullSize = header->Size + sizeof(TMkqlArrowHeader);
        state->OffloadFree(fullSize);
        state->ArrowBuffers.erase(it);
    }
}

} // NMiniKQL

} // NKikimr