diff options
author | vvvv <vvvv@ydb.tech> | 2022-11-18 15:15:49 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2022-11-18 15:15:49 +0300 |
commit | 7506642977946e2bd9538cf66daa3bf0d07f00b0 (patch) | |
tree | 854580ef3d3bb4227741a91bac080297c299402f | |
parent | dc9458ae717eaa3e15730870ee30cb999a7a6d2f (diff) | |
download | ydb-7506642977946e2bd9538cf66daa3bf0d07f00b0.tar.gz |
thread safe arrow MemoryPool
-rw-r--r-- | ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp | 30 | ||||
-rw-r--r-- | ydb/library/yql/minikql/mkql_alloc.h | 3 |
2 files changed, 26 insertions, 7 deletions
diff --git a/ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp b/ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp index 7bc6ad02b03..f0f099faad2 100644 --- a/ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp +++ b/ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp @@ -6,28 +6,44 @@ namespace { class TArrowMemoryPool : public arrow::MemoryPool { public: explicit TArrowMemoryPool(TAllocState& allocState) - : AllocState(allocState) { + : ArrowMemoryUsage(allocState.ArrowMemoryUsage) { } arrow::Status Allocate(int64_t size, uint8_t** out) override { - *out = static_cast<uint8_t*>(MKQLAllocWithSize(size)); + *out = static_cast<uint8_t*>(malloc(size)); + if (auto p = ArrowMemoryUsage.lock()) { + *p += size; + } + return arrow::Status(); } arrow::Status Reallocate(int64_t oldSize, int64_t newSize, uint8_t** ptr) override { - void* newPtr = MKQLAllocWithSize(newSize); + void* newPtr = malloc(newSize); ::memcpy(newPtr, *ptr, std::min(oldSize, newSize)); - MKQLFreeWithSize(*ptr, oldSize); + free(*ptr); *ptr = static_cast<uint8_t*>(newPtr); + if (auto p = ArrowMemoryUsage.lock()) { + *p -= oldSize; + *p += newSize; + } + return arrow::Status(); } void Free(uint8_t* buffer, int64_t size) override { - MKQLFreeWithSize(buffer, size); + free(buffer); + if (auto p = ArrowMemoryUsage.lock()) { + *p -= size; + } } int64_t bytes_allocated() const override { - return AllocState.GetUsed(); + if (auto p = ArrowMemoryUsage.lock()) { + return *p; + } + + return 0; } int64_t max_memory() const override { @@ -39,7 +55,7 @@ public: } private: - TAllocState& AllocState; + std::weak_ptr<std::atomic<size_t>> ArrowMemoryUsage; }; } diff --git a/ydb/library/yql/minikql/mkql_alloc.h b/ydb/library/yql/minikql/mkql_alloc.h index 3c40f4c4c11..0e73c984a64 100644 --- a/ydb/library/yql/minikql/mkql_alloc.h +++ b/ydb/library/yql/minikql/mkql_alloc.h @@ -10,6 +10,8 @@ #include <util/system/tls.h> #include <new> #include <unordered_map> +#include <atomic> +#include <memory> namespace NKikimr { @@ -59,6 +61,7 @@ struct TAllocState : public TAlignedPagePool TListEntry OffloadedBlocksRoot; TListEntry GlobalPAllocList; TListEntry* CurrentPAllocList; + std::shared_ptr<std::atomic<size_t>> ArrowMemoryUsage = std::make_shared<std::atomic<size_t>>(); void* MainContext = nullptr; void* CurrentContext = nullptr; |