aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilezhankin <ilezhankin@yandex-team.com>2025-03-03 18:46:45 +0300
committerilezhankin <ilezhankin@yandex-team.com>2025-03-03 19:11:56 +0300
commit2bc1762eb4cc1110a3dfe6195ec4ee7423a9e30e (patch)
tree0a541f3fe92ea9be796025c0354d5efeef387bb4
parent46a0810e4806b6d9769174f0d9ce3c342b273960 (diff)
downloadydb-2bc1762eb4cc1110a3dfe6195ec4ee7423a9e30e.tar.gz
Implement and use a page arena for small Arrow allocations
commit_hash:2bcb57a12fbb750db7b33872e2cfbec66bdf6405
-rw-r--r--yql/essentials/minikql/aligned_page_pool.cpp47
-rw-r--r--yql/essentials/minikql/aligned_page_pool.h6
-rw-r--r--yql/essentials/minikql/computation/mkql_block_transport.cpp2
-rw-r--r--yql/essentials/minikql/mkql_alloc.cpp97
-rw-r--r--yql/essentials/minikql/mkql_alloc.h9
5 files changed, 156 insertions, 5 deletions
diff --git a/yql/essentials/minikql/aligned_page_pool.cpp b/yql/essentials/minikql/aligned_page_pool.cpp
index 04aab1bd8a..bcbb3edd11 100644
--- a/yql/essentials/minikql/aligned_page_pool.cpp
+++ b/yql/essentials/minikql/aligned_page_pool.cpp
@@ -783,6 +783,40 @@ void* GetAlignedPage(ui64 size) {
}
template<typename TMmap>
+void* GetAlignedPage() {
+ const auto size = TAlignedPagePool::POOL_PAGE_SIZE;
+ auto& globalPool = TGlobalPools<TMmap, false>::Instance();
+
+ if (auto* page = globalPool.Get(0).GetPage()) {
+ return page;
+ }
+
+ auto allocSize = size * 2;
+ void* unalignedPtr = globalPool.DoMmap(allocSize);
+ if (Y_UNLIKELY(MAP_FAILED == unalignedPtr)) {
+ TStringStream mmaps;
+ const auto lastError = LastSystemError();
+ if (lastError == ENOMEM) {
+ mmaps << GetMemoryMapsString();
+ }
+
+ ythrow yexception() << "Mmap failed to allocate " << allocSize << " bytes: "
+ << LastSystemErrorText(lastError) << mmaps.Str();
+ }
+
+ void* page = AlignUp(unalignedPtr, size);
+
+ // Unmap unaligned prefix before offset and tail after aligned page
+ const size_t offset = (intptr_t)page - (intptr_t)unalignedPtr;
+ if (Y_UNLIKELY(offset)) {
+ globalPool.DoMunmap(unalignedPtr, offset);
+ globalPool.DoMunmap((ui8*)page + size, size - offset);
+ }
+
+ return page;
+}
+
+template<typename TMmap>
void ReleaseAlignedPage(void* mem, ui64 size) {
size = AlignUp(size, SYS_PAGE_SIZE);
if (size < TAlignedPagePool::POOL_PAGE_SIZE) {
@@ -801,6 +835,11 @@ void ReleaseAlignedPage(void* mem, ui64 size) {
}
template<typename TMmap>
+void ReleaseAlignedPage(void* ptr) {
+ TGlobalPools<TMmap, false>::Instance().PushPage(0, ptr);
+}
+
+template<typename TMmap>
i64 GetTotalMmapedBytes() {
return TGlobalPools<TMmap, true>::Instance().GetTotalMmappedBytes() + TGlobalPools<TMmap, false>::Instance().GetTotalMmappedBytes();
}
@@ -822,10 +861,18 @@ template void* GetAlignedPage<>(ui64);
template void* GetAlignedPage<TFakeAlignedMmap>(ui64);
template void* GetAlignedPage<TFakeUnalignedMmap>(ui64);
+template void* GetAlignedPage<>();
+template void* GetAlignedPage<TFakeAlignedMmap>();
+template void* GetAlignedPage<TFakeUnalignedMmap>();
+
template void ReleaseAlignedPage<>(void*,ui64);
template void ReleaseAlignedPage<TFakeAlignedMmap>(void*,ui64);
template void ReleaseAlignedPage<TFakeUnalignedMmap>(void*,ui64);
+template void ReleaseAlignedPage<>(void*);
+template void ReleaseAlignedPage<TFakeAlignedMmap>(void*);
+template void ReleaseAlignedPage<TFakeUnalignedMmap>(void*);
+
size_t GetMemoryMapsCount() {
size_t lineCount = 0;
TString line;
diff --git a/yql/essentials/minikql/aligned_page_pool.h b/yql/essentials/minikql/aligned_page_pool.h
index 4a5b1d2e55..511b99b4d7 100644
--- a/yql/essentials/minikql/aligned_page_pool.h
+++ b/yql/essentials/minikql/aligned_page_pool.h
@@ -309,9 +309,15 @@ template<typename TMmap = TSystemMmap>
void* GetAlignedPage(ui64 size);
template<typename TMmap = TSystemMmap>
+void* GetAlignedPage();
+
+template<typename TMmap = TSystemMmap>
void ReleaseAlignedPage(void* mem, ui64 size);
template<typename TMmap = TSystemMmap>
+void ReleaseAlignedPage(void* mem);
+
+template<typename TMmap = TSystemMmap>
i64 GetTotalMmapedBytes();
template<typename TMmap = TSystemMmap>
i64 GetTotalFreeListBytes();
diff --git a/yql/essentials/minikql/computation/mkql_block_transport.cpp b/yql/essentials/minikql/computation/mkql_block_transport.cpp
index 2a37245f9d..766df54889 100644
--- a/yql/essentials/minikql/computation/mkql_block_transport.cpp
+++ b/yql/essentials/minikql/computation/mkql_block_transport.cpp
@@ -13,7 +13,7 @@ namespace {
using NYql::TChunkedBuffer;
TChunkedBuffer MakeChunkedBufferAndUntrack(const std::shared_ptr<const arrow::Buffer>& owner, const char* data, size_t size) {
- MKQLArrowUntrack(owner->data());
+ MKQLArrowUntrack(owner->data(), owner->capacity());
return TChunkedBuffer(TStringBuf{data, size}, owner);
}
diff --git a/yql/essentials/minikql/mkql_alloc.cpp b/yql/essentials/minikql/mkql_alloc.cpp
index 963f46a67e..8446522eda 100644
--- a/yql/essentials/minikql/mkql_alloc.cpp
+++ b/yql/essentials/minikql/mkql_alloc.cpp
@@ -7,6 +7,8 @@ namespace NKikimr {
namespace NMiniKQL {
+constexpr ui64 ArrowSizeForArena = (TAllocState::POOL_PAGE_SIZE >> 2);
+
Y_POD_THREAD(TAllocState*) TlsAllocState;
TAllocPageHeader TAllocState::EmptyPageHeader = { 0, 0, 0, 0, nullptr, nullptr };
@@ -94,6 +96,10 @@ void TAllocState::KillAllBoxed() {
OffloadedBlocksRoot.InitLinks();
}
+ if (CurrentArrowPages) {
+ MKQLArrowFree(CurrentArrowPages, 0);
+ CurrentArrowPages = nullptr;
+ }
CleanupArrowList(&ArrowBlocksRoot);
#ifndef NDEBUG
@@ -253,7 +259,49 @@ void TPagedArena::Clear() noexcept {
}
}
+void* MKQLArrowAllocateOnArena(ui64 size) {
+ TAllocState* state = TlsAllocState;
+ Y_ENSURE(state);
+
+ auto alignedSize = AlignUp(size, ArrowAlignment);
+ auto& page = state->CurrentArrowPages;
+
+ if (Y_UNLIKELY(!page || page->Offset + alignedSize > page->Size)) {
+ const auto pageSize = TAllocState::POOL_PAGE_SIZE;
+
+ if (state->EnableArrowTracking) {
+ state->OffloadAlloc(pageSize);
+ }
+
+ if (page) {
+ MKQLArrowFree(page, 0);
+ }
+
+ page = (TMkqlArrowHeader*)GetAlignedPage();
+ page->Offset = sizeof(TMkqlArrowHeader);
+ page->Size = pageSize;
+ page->UseCount = 1;
+
+ if (state->EnableArrowTracking) {
+ page->Entry.Link(&state->ArrowBlocksRoot);
+ Y_ENSURE(state->ArrowBuffers.insert(page).second);
+ } else {
+ page->Entry.Clear();
+ }
+ }
+
+ void* ptr = (ui8*)page + page->Offset;
+ page->Offset += alignedSize;
+ ++page->UseCount;
+
+ return ptr;
+}
+
void* MKQLArrowAllocate(ui64 size) {
+ if (size <= ArrowSizeForArena) {
+ return MKQLArrowAllocateOnArena(size);
+ }
+
TAllocState* state = TlsAllocState;
Y_ENSURE(state);
auto fullSize = size + sizeof(TMkqlArrowHeader);
@@ -276,6 +324,9 @@ void* MKQLArrowAllocate(ui64 size) {
#endif
auto* header = (TMkqlArrowHeader*)ptr;
+ header->Offset = 0;
+ header->UseCount = 0;
+
if (state->EnableArrowTracking) {
header->Entry.Link(&state->ArrowBlocksRoot);
Y_ENSURE(state->ArrowBuffers.insert(header + 1).second);
@@ -294,7 +345,31 @@ void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size) {
return res;
}
+void MKQLArrowFreeOnArena(const void* ptr) {
+ auto* page = (TMkqlArrowHeader*)TAllocState::GetPageStart(ptr);
+ if (page->UseCount.fetch_sub(1) == 1) {
+ if (!page->Entry.IsUnlinked()) {
+ TAllocState* state = TlsAllocState;
+ Y_ENSURE(state);
+ state->OffloadFree(page->Size);
+ page->Entry.Unlink();
+
+ auto it = state->ArrowBuffers.find(page);
+ Y_ENSURE(it != state->ArrowBuffers.end());
+ state->ArrowBuffers.erase(it);
+ }
+
+ ReleaseAlignedPage(page);
+ }
+
+ return;
+}
+
void MKQLArrowFree(const void* mem, ui64 size) {
+ if (size <= ArrowSizeForArena) {
+ return MKQLArrowFreeOnArena(mem);
+ }
+
auto fullSize = size + sizeof(TMkqlArrowHeader);
auto header = ((TMkqlArrowHeader*)mem) - 1;
if (!header->Entry.IsUnlinked()) {
@@ -318,19 +393,37 @@ void MKQLArrowFree(const void* mem, ui64 size) {
ReleaseAlignedPage(header, fullSize);
}
-void MKQLArrowUntrack(const void* mem) {
+void MKQLArrowUntrack(const void* mem, ui64 size) {
TAllocState* state = TlsAllocState;
Y_ENSURE(state);
if (!state->EnableArrowTracking) {
return;
}
+ if (size <= ArrowSizeForArena) {
+ auto* page = (TMkqlArrowHeader*)TAllocState::GetPageStart(mem);
+
+ auto it = state->ArrowBuffers.find(page);
+ if (it == state->ArrowBuffers.end()) {
+ return;
+ }
+
+ if (!page->Entry.IsUnlinked()) {
+ page->Entry.Unlink(); // unlink page immediately so we don't accidentally free untracked memory within `TAllocState`
+ state->ArrowBuffers.erase(it);
+ state->OffloadFree(page->Size);
+ }
+
+ return;
+ }
+
auto it = state->ArrowBuffers.find(mem);
if (it == state->ArrowBuffers.end()) {
return;
}
- auto header = ((TMkqlArrowHeader*)mem) - 1;
+ auto* header = ((TMkqlArrowHeader*)mem) - 1;
+ Y_ENSURE(header->UseCount == 0);
if (!header->Entry.IsUnlinked()) {
header->Entry.Unlink();
auto fullSize = header->Size + sizeof(TMkqlArrowHeader);
diff --git a/yql/essentials/minikql/mkql_alloc.h b/yql/essentials/minikql/mkql_alloc.h
index abcb6cc73d..24bbbb8e9e 100644
--- a/yql/essentials/minikql/mkql_alloc.h
+++ b/yql/essentials/minikql/mkql_alloc.h
@@ -41,6 +41,8 @@ constexpr ui32 MaxPageUserData = TAlignedPagePool::POOL_PAGE_SIZE - sizeof(TAllo
static_assert(sizeof(TAllocPageHeader) % MKQL_ALIGNMENT == 0, "Incorrect size of header");
+struct TMkqlArrowHeader;
+
struct TAllocState : public TAlignedPagePool
{
struct TListEntry {
@@ -90,6 +92,7 @@ struct TAllocState : public TAlignedPagePool
TListEntry GlobalPAllocList;
TListEntry* CurrentPAllocList;
TListEntry ArrowBlocksRoot;
+ TMkqlArrowHeader* CurrentArrowPages = nullptr; // page arena for small arrow allocations
std::unordered_set<const void*> ArrowBuffers;
bool EnableArrowTracking = true;
@@ -186,7 +189,9 @@ constexpr size_t ArrowAlignment = 64;
struct TMkqlArrowHeader {
TAllocState::TListEntry Entry;
ui64 Size;
- char Padding[ArrowAlignment - sizeof(TAllocState::TListEntry) - sizeof(ui64)];
+ ui64 Offset;
+ std::atomic<ui64> UseCount;
+ char Padding[ArrowAlignment - sizeof(TAllocState::TListEntry) - sizeof(ui64) - sizeof(ui64) - sizeof(std::atomic<ui64>)];
};
static_assert(sizeof(TMkqlArrowHeader) == ArrowAlignment);
@@ -441,7 +446,7 @@ inline void MKQLUnregisterObject(NUdf::TBoxedValue* value) noexcept {
void* MKQLArrowAllocate(ui64 size);
void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size);
void MKQLArrowFree(const void* mem, ui64 size);
-void MKQLArrowUntrack(const void* mem);
+void MKQLArrowUntrack(const void* mem, ui64 size);
template <const EMemorySubPool MemoryPoolExt = EMemorySubPool::Default>
struct TWithMiniKQLAlloc {