author     ilezhankin <ilezhankin@yandex-team.com>  2025-03-03 18:46:45 +0300
committer  ilezhankin <ilezhankin@yandex-team.com>  2025-03-03 19:11:56 +0300
commit     2bc1762eb4cc1110a3dfe6195ec4ee7423a9e30e
tree       0a541f3fe92ea9be796025c0354d5efeef387bb4
parent     46a0810e4806b6d9769174f0d9ce3c342b273960
Implement and use a page arena for small Arrow allocations
commit_hash:2bcb57a12fbb750db7b33872e2cfbec66bdf6405
 yql/essentials/minikql/aligned_page_pool.cpp                | 47
 yql/essentials/minikql/aligned_page_pool.h                  |  6
 yql/essentials/minikql/computation/mkql_block_transport.cpp |  2
 yql/essentials/minikql/mkql_alloc.cpp                       | 97
 yql/essentials/minikql/mkql_alloc.h                         |  9
5 files changed, 156 insertions, 5 deletions
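
The new parameterless GetAlignedPage() overload in aligned_page_pool.cpp (first hunk of the diff below) takes a page from the global free list when one is available, and otherwise maps twice POOL_PAGE_SIZE and trims the mapping down to a single aligned page. The following is a minimal standalone sketch of that over-allocate-and-trim idea using raw mmap/munmap (trimming both ends here regardless of where the mapping landed); the function name and error handling are illustrative assumptions, not code from the patch, and pageSize is assumed to be a multiple of the system page size.

// Standalone sketch of the over-allocate-and-trim trick used by the new
// GetAlignedPage() overload when the global free list is empty. Names and
// error handling are illustrative, not from the patch.
#include <sys/mman.h>
#include <cstddef>
#include <cstdint>
#include <stdexcept>

void* MapAlignedPage(size_t pageSize) {
    // Map twice the requested size so one pageSize-aligned block is guaranteed
    // to fit somewhere inside the mapping.
    const size_t allocSize = pageSize * 2;
    void* raw = mmap(nullptr, allocSize, PROT_READ | PROT_WRITE,
                     MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
    if (raw == MAP_FAILED) {
        throw std::runtime_error("mmap failed");
    }

    // Round the mapping start up to the next pageSize boundary.
    const uintptr_t base = reinterpret_cast<uintptr_t>(raw);
    const uintptr_t aligned = (base + pageSize - 1) & ~(uintptr_t(pageSize) - 1);
    const size_t offset = aligned - base;

    // Give the unaligned prefix and the unused tail back to the OS, keeping
    // exactly one pageSize-sized, pageSize-aligned block mapped.
    if (offset != 0) {
        munmap(raw, offset);
    }
    munmap(reinterpret_cast<void*>(aligned + pageSize), allocSize - pageSize - offset);

    return reinterpret_cast<void*>(aligned);
}

In the patch itself the mapping and unmapping go through TGlobalPools, so a page released via the new ReleaseAlignedPage(void*) overload is pushed back to the pool and can later be served straight from the free list instead of a fresh mmap.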
diff --git a/yql/essentials/minikql/aligned_page_pool.cpp b/yql/essentials/minikql/aligned_page_pool.cpp
index 04aab1bd8a..bcbb3edd11 100644
--- a/yql/essentials/minikql/aligned_page_pool.cpp
+++ b/yql/essentials/minikql/aligned_page_pool.cpp
@@ -783,6 +783,40 @@ void* GetAlignedPage(ui64 size) {
 }
 
 template<typename TMmap>
+void* GetAlignedPage() {
+    const auto size = TAlignedPagePool::POOL_PAGE_SIZE;
+    auto& globalPool = TGlobalPools<TMmap, false>::Instance();
+
+    if (auto* page = globalPool.Get(0).GetPage()) {
+        return page;
+    }
+
+    auto allocSize = size * 2;
+    void* unalignedPtr = globalPool.DoMmap(allocSize);
+    if (Y_UNLIKELY(MAP_FAILED == unalignedPtr)) {
+        TStringStream mmaps;
+        const auto lastError = LastSystemError();
+        if (lastError == ENOMEM) {
+            mmaps << GetMemoryMapsString();
+        }
+
+        ythrow yexception() << "Mmap failed to allocate " << allocSize << " bytes: "
+            << LastSystemErrorText(lastError) << mmaps.Str();
+    }
+
+    void* page = AlignUp(unalignedPtr, size);
+
+    // Unmap unaligned prefix before offset and tail after aligned page
+    const size_t offset = (intptr_t)page - (intptr_t)unalignedPtr;
+    if (Y_UNLIKELY(offset)) {
+        globalPool.DoMunmap(unalignedPtr, offset);
+        globalPool.DoMunmap((ui8*)page + size, size - offset);
+    }
+
+    return page;
+}
+
+template<typename TMmap>
 void ReleaseAlignedPage(void* mem, ui64 size) {
     size = AlignUp(size, SYS_PAGE_SIZE);
     if (size < TAlignedPagePool::POOL_PAGE_SIZE) {
@@ -801,6 +835,11 @@ void ReleaseAlignedPage(void* mem, ui64 size) {
 }
 
 template<typename TMmap>
+void ReleaseAlignedPage(void* ptr) {
+    TGlobalPools<TMmap, false>::Instance().PushPage(0, ptr);
+}
+
+template<typename TMmap>
 i64 GetTotalMmapedBytes() {
     return TGlobalPools<TMmap, true>::Instance().GetTotalMmappedBytes() + TGlobalPools<TMmap, false>::Instance().GetTotalMmappedBytes();
 }
@@ -822,10 +861,18 @@ template void* GetAlignedPage<>(ui64);
 template void* GetAlignedPage<TFakeAlignedMmap>(ui64);
 template void* GetAlignedPage<TFakeUnalignedMmap>(ui64);
 
+template void* GetAlignedPage<>();
+template void* GetAlignedPage<TFakeAlignedMmap>();
+template void* GetAlignedPage<TFakeUnalignedMmap>();
+
 template void ReleaseAlignedPage<>(void*,ui64);
 template void ReleaseAlignedPage<TFakeAlignedMmap>(void*,ui64);
 template void ReleaseAlignedPage<TFakeUnalignedMmap>(void*,ui64);
 
+template void ReleaseAlignedPage<>(void*);
+template void ReleaseAlignedPage<TFakeAlignedMmap>(void*);
+template void ReleaseAlignedPage<TFakeUnalignedMmap>(void*);
+
 size_t GetMemoryMapsCount() {
     size_t lineCount = 0;
     TString line;
diff --git a/yql/essentials/minikql/aligned_page_pool.h b/yql/essentials/minikql/aligned_page_pool.h
index 4a5b1d2e55..511b99b4d7 100644
--- a/yql/essentials/minikql/aligned_page_pool.h
+++ b/yql/essentials/minikql/aligned_page_pool.h
@@ -309,9 +309,15 @@ template<typename TMmap = TSystemMmap>
 void* GetAlignedPage(ui64 size);
 
 template<typename TMmap = TSystemMmap>
+void* GetAlignedPage();
+
+template<typename TMmap = TSystemMmap>
 void ReleaseAlignedPage(void* mem, ui64 size);
 
 template<typename TMmap = TSystemMmap>
+void ReleaseAlignedPage(void* mem);
+
+template<typename TMmap = TSystemMmap>
 i64 GetTotalMmapedBytes();
 template<typename TMmap = TSystemMmap>
 i64 GetTotalFreeListBytes();
diff --git a/yql/essentials/minikql/computation/mkql_block_transport.cpp b/yql/essentials/minikql/computation/mkql_block_transport.cpp
index 2a37245f9d..766df54889 100644
--- a/yql/essentials/minikql/computation/mkql_block_transport.cpp
+++ b/yql/essentials/minikql/computation/mkql_block_transport.cpp
@@ -13,7 +13,7 @@ namespace {
 using NYql::TChunkedBuffer;
 
 TChunkedBuffer MakeChunkedBufferAndUntrack(const std::shared_ptr<const arrow::Buffer>& owner, const char* data, size_t size) {
-    MKQLArrowUntrack(owner->data());
+    MKQLArrowUntrack(owner->data(), owner->capacity());
     return TChunkedBuffer(TStringBuf{data, size}, owner);
 }
 
diff --git a/yql/essentials/minikql/mkql_alloc.cpp b/yql/essentials/minikql/mkql_alloc.cpp
index 963f46a67e..8446522eda 100644
--- a/yql/essentials/minikql/mkql_alloc.cpp
+++ b/yql/essentials/minikql/mkql_alloc.cpp
@@ -7,6 +7,8 @@ namespace NKikimr {
 namespace NMiniKQL {
 
+constexpr ui64 ArrowSizeForArena = (TAllocState::POOL_PAGE_SIZE >> 2);
+
 Y_POD_THREAD(TAllocState*) TlsAllocState;
 
 TAllocPageHeader TAllocState::EmptyPageHeader = { 0, 0, 0, 0, nullptr, nullptr };
@@ -94,6 +96,10 @@ void TAllocState::KillAllBoxed() {
         OffloadedBlocksRoot.InitLinks();
     }
 
+    if (CurrentArrowPages) {
+        MKQLArrowFree(CurrentArrowPages, 0);
+        CurrentArrowPages = nullptr;
+    }
     CleanupArrowList(&ArrowBlocksRoot);
 
 #ifndef NDEBUG
@@ -253,7 +259,49 @@ void TPagedArena::Clear() noexcept {
     }
 }
 
+void* MKQLArrowAllocateOnArena(ui64 size) {
+    TAllocState* state = TlsAllocState;
+    Y_ENSURE(state);
+
+    auto alignedSize = AlignUp(size, ArrowAlignment);
+    auto& page = state->CurrentArrowPages;
+
+    if (Y_UNLIKELY(!page || page->Offset + alignedSize > page->Size)) {
+        const auto pageSize = TAllocState::POOL_PAGE_SIZE;
+
+        if (state->EnableArrowTracking) {
+            state->OffloadAlloc(pageSize);
+        }
+
+        if (page) {
+            MKQLArrowFree(page, 0);
+        }
+
+        page = (TMkqlArrowHeader*)GetAlignedPage();
+        page->Offset = sizeof(TMkqlArrowHeader);
+        page->Size = pageSize;
+        page->UseCount = 1;
+
+        if (state->EnableArrowTracking) {
+            page->Entry.Link(&state->ArrowBlocksRoot);
+            Y_ENSURE(state->ArrowBuffers.insert(page).second);
+        } else {
+            page->Entry.Clear();
+        }
+    }
+
+    void* ptr = (ui8*)page + page->Offset;
+    page->Offset += alignedSize;
+    ++page->UseCount;
+
+    return ptr;
+}
+
 void* MKQLArrowAllocate(ui64 size) {
+    if (size <= ArrowSizeForArena) {
+        return MKQLArrowAllocateOnArena(size);
+    }
+
     TAllocState* state = TlsAllocState;
     Y_ENSURE(state);
     auto fullSize = size + sizeof(TMkqlArrowHeader);
@@ -276,6 +324,9 @@ void* MKQLArrowAllocate(ui64 size) {
 #endif
 
     auto* header = (TMkqlArrowHeader*)ptr;
+    header->Offset = 0;
+    header->UseCount = 0;
+
     if (state->EnableArrowTracking) {
         header->Entry.Link(&state->ArrowBlocksRoot);
         Y_ENSURE(state->ArrowBuffers.insert(header + 1).second);
@@ -294,7 +345,31 @@ void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size) {
     return res;
 }
 
+void MKQLArrowFreeOnArena(const void* ptr) {
+    auto* page = (TMkqlArrowHeader*)TAllocState::GetPageStart(ptr);
+    if (page->UseCount.fetch_sub(1) == 1) {
+        if (!page->Entry.IsUnlinked()) {
+            TAllocState* state = TlsAllocState;
+            Y_ENSURE(state);
+            state->OffloadFree(page->Size);
+            page->Entry.Unlink();
+
+            auto it = state->ArrowBuffers.find(page);
+            Y_ENSURE(it != state->ArrowBuffers.end());
+            state->ArrowBuffers.erase(it);
+        }
+
+        ReleaseAlignedPage(page);
+    }
+
+    return;
+}
+
 void MKQLArrowFree(const void* mem, ui64 size) {
+    if (size <= ArrowSizeForArena) {
+        return MKQLArrowFreeOnArena(mem);
+    }
+
     auto fullSize = size + sizeof(TMkqlArrowHeader);
     auto header = ((TMkqlArrowHeader*)mem) - 1;
     if (!header->Entry.IsUnlinked()) {
@@ -318,19 +393,37 @@ void MKQLArrowFree(const void* mem, ui64 size) {
     ReleaseAlignedPage(header, fullSize);
 }
 
-void MKQLArrowUntrack(const void* mem) {
+void MKQLArrowUntrack(const void* mem, ui64 size) {
     TAllocState* state = TlsAllocState;
     Y_ENSURE(state);
     if (!state->EnableArrowTracking) {
         return;
     }
 
+    if (size <= ArrowSizeForArena) {
+        auto* page = (TMkqlArrowHeader*)TAllocState::GetPageStart(mem);
+
+        auto it = state->ArrowBuffers.find(page);
+        if (it == state->ArrowBuffers.end()) {
+            return;
+        }
+
+        if (!page->Entry.IsUnlinked()) {
+            page->Entry.Unlink(); // unlink page immediately so we don't accidentally free untracked memory within `TAllocState`
+            state->ArrowBuffers.erase(it);
+            state->OffloadFree(page->Size);
+        }
+
+        return;
+    }
+
     auto it = state->ArrowBuffers.find(mem);
     if (it == state->ArrowBuffers.end()) {
         return;
     }
 
-    auto header = ((TMkqlArrowHeader*)mem) - 1;
+    auto* header = ((TMkqlArrowHeader*)mem) - 1;
+    Y_ENSURE(header->UseCount == 0);
     if (!header->Entry.IsUnlinked()) {
         header->Entry.Unlink();
         auto fullSize = header->Size + sizeof(TMkqlArrowHeader);
diff --git a/yql/essentials/minikql/mkql_alloc.h b/yql/essentials/minikql/mkql_alloc.h
index abcb6cc73d..24bbbb8e9e 100644
--- a/yql/essentials/minikql/mkql_alloc.h
+++ b/yql/essentials/minikql/mkql_alloc.h
@@ -41,6 +41,8 @@ constexpr ui32 MaxPageUserData = TAlignedPagePool::POOL_PAGE_SIZE - sizeof(TAllo
 
 static_assert(sizeof(TAllocPageHeader) % MKQL_ALIGNMENT == 0, "Incorrect size of header");
 
+struct TMkqlArrowHeader;
+
 struct TAllocState : public TAlignedPagePool
 {
     struct TListEntry {
@@ -90,6 +92,7 @@ struct TAllocState : public TAlignedPagePool
     TListEntry GlobalPAllocList;
     TListEntry* CurrentPAllocList;
     TListEntry ArrowBlocksRoot;
+    TMkqlArrowHeader* CurrentArrowPages = nullptr; // page arena for small arrow allocations
     std::unordered_set<const void*> ArrowBuffers;
     bool EnableArrowTracking = true;
 
@@ -186,7 +189,9 @@ constexpr size_t ArrowAlignment = 64;
 struct TMkqlArrowHeader {
     TAllocState::TListEntry Entry;
     ui64 Size;
-    char Padding[ArrowAlignment - sizeof(TAllocState::TListEntry) - sizeof(ui64)];
+    ui64 Offset;
+    std::atomic<ui64> UseCount;
+    char Padding[ArrowAlignment - sizeof(TAllocState::TListEntry) - sizeof(ui64) - sizeof(ui64) - sizeof(std::atomic<ui64>)];
 };
 
 static_assert(sizeof(TMkqlArrowHeader) == ArrowAlignment);
@@ -441,7 +446,7 @@ inline void MKQLUnregisterObject(NUdf::TBoxedValue* value) noexcept {
 void* MKQLArrowAllocate(ui64 size);
 void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size);
 void MKQLArrowFree(const void* mem, ui64 size);
-void MKQLArrowUntrack(const void* mem);
+void MKQLArrowUntrack(const void* mem, ui64 size);
 
 template <const EMemorySubPool MemoryPoolExt = EMemorySubPool::Default>
 struct TWithMiniKQLAlloc {
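
Taken together, the mkql_alloc.cpp changes implement a bump-pointer arena with per-page reference counting: requests up to a quarter of POOL_PAGE_SIZE are carved out of the current arena page by advancing Offset, every allocation bumps the page's atomic UseCount, the arena holds one reference to the page it is currently filling, and the page is handed back only when the last reference is dropped in MKQLArrowFreeOnArena. Below is a simplified, single-threaded sketch of that scheme; the names, the aligned_alloc-based page source, and the assert are illustrative assumptions, and the tracking list and offloaded-bytes accounting kept by the real TAllocState are left out.

// Simplified, single-threaded sketch of the arena scheme in mkql_alloc.cpp:
// a bump-pointer page with an atomic use count. Names and the aligned_alloc
// page source are illustrative; the real code keeps the current page in
// thread-local TAllocState and recycles pages via the global pool.
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <new>

constexpr size_t PageSize = 64 * 1024;   // assumed arena page size
constexpr size_t Alignment = 64;         // allocation granularity inside a page

struct TArenaPage {
    std::atomic<uint64_t> UseCount;      // 1 for the arena itself + 1 per live allocation
    size_t Offset;                       // bump pointer; starts just past the header
    size_t Size;                         // whole page size, header included
};

static TArenaPage* CurrentPage = nullptr;

static void Unref(TArenaPage* page) {
    // Last reference gone: release the whole page (the patch pushes it to a pool).
    if (page->UseCount.fetch_sub(1) == 1) {
        std::free(page);
    }
}

void* ArenaAllocate(size_t size) {
    const size_t aligned = (size + Alignment - 1) & ~(Alignment - 1);
    assert(aligned <= PageSize - sizeof(TArenaPage)); // callers route big requests elsewhere

    if (!CurrentPage || CurrentPage->Offset + aligned > CurrentPage->Size) {
        if (CurrentPage) {
            Unref(CurrentPage);          // retire the full page; live allocations keep it alive
        }
        // PageSize-aligned page so any interior pointer can recover the header by masking.
        void* mem = std::aligned_alloc(PageSize, PageSize);
        CurrentPage = new (mem) TArenaPage{};
        CurrentPage->UseCount.store(1);  // the arena's own reference
        CurrentPage->Offset = sizeof(TArenaPage);
        CurrentPage->Size = PageSize;
    }

    void* ptr = reinterpret_cast<uint8_t*>(CurrentPage) + CurrentPage->Offset;
    CurrentPage->Offset += aligned;
    CurrentPage->UseCount.fetch_add(1);  // one reference per allocation
    return ptr;
}

void ArenaFree(const void* ptr) {
    // Recover the page header from any interior pointer
    // (the patch uses TAllocState::GetPageStart for this).
    auto* page = reinterpret_cast<TArenaPage*>(
        reinterpret_cast<uintptr_t>(ptr) & ~(uintptr_t(PageSize) - 1));
    Unref(page);
}

Counting the arena's own reference means a page stays alive as long as either the arena can still bump-allocate from it or any allocation on it is live. MKQLArrowUntrack only drops the bookkeeping for an arena page (list entry, ArrowBuffers entry, offloaded bytes); the page itself is still released through the same UseCount once the consumer eventually calls MKQLArrowFree, which is why the mkql_block_transport.cpp caller now passes the buffer capacity so the size-based dispatch picks the arena path.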