aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2022-11-18 15:15:49 +0300
committervvvv <vvvv@ydb.tech>2022-11-18 15:15:49 +0300
commit7506642977946e2bd9538cf66daa3bf0d07f00b0 (patch)
tree854580ef3d3bb4227741a91bac080297c299402f
parentdc9458ae717eaa3e15730870ee30cb999a7a6d2f (diff)
downloadydb-7506642977946e2bd9538cf66daa3bf0d07f00b0.tar.gz
thread safe arrow MemoryPool
-rw-r--r--ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp30
-rw-r--r--ydb/library/yql/minikql/mkql_alloc.h3
2 files changed, 26 insertions, 7 deletions
diff --git a/ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp b/ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp
index 7bc6ad02b03..f0f099faad2 100644
--- a/ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp
+++ b/ydb/library/yql/minikql/arrow/mkql_memory_pool.cpp
@@ -6,28 +6,44 @@ namespace {
class TArrowMemoryPool : public arrow::MemoryPool {
public:
explicit TArrowMemoryPool(TAllocState& allocState)
- : AllocState(allocState) {
+ : ArrowMemoryUsage(allocState.ArrowMemoryUsage) {
}
arrow::Status Allocate(int64_t size, uint8_t** out) override {
- *out = static_cast<uint8_t*>(MKQLAllocWithSize(size));
+ *out = static_cast<uint8_t*>(malloc(size));
+ if (auto p = ArrowMemoryUsage.lock()) {
+ *p += size;
+ }
+
return arrow::Status();
}
arrow::Status Reallocate(int64_t oldSize, int64_t newSize, uint8_t** ptr) override {
- void* newPtr = MKQLAllocWithSize(newSize);
+ void* newPtr = malloc(newSize);
::memcpy(newPtr, *ptr, std::min(oldSize, newSize));
- MKQLFreeWithSize(*ptr, oldSize);
+ free(*ptr);
*ptr = static_cast<uint8_t*>(newPtr);
+ if (auto p = ArrowMemoryUsage.lock()) {
+ *p -= oldSize;
+ *p += newSize;
+ }
+
return arrow::Status();
}
void Free(uint8_t* buffer, int64_t size) override {
- MKQLFreeWithSize(buffer, size);
+ free(buffer);
+ if (auto p = ArrowMemoryUsage.lock()) {
+ *p -= size;
+ }
}
int64_t bytes_allocated() const override {
- return AllocState.GetUsed();
+ if (auto p = ArrowMemoryUsage.lock()) {
+ return *p;
+ }
+
+ return 0;
}
int64_t max_memory() const override {
@@ -39,7 +55,7 @@ public:
}
private:
- TAllocState& AllocState;
+ std::weak_ptr<std::atomic<size_t>> ArrowMemoryUsage;
};
}
diff --git a/ydb/library/yql/minikql/mkql_alloc.h b/ydb/library/yql/minikql/mkql_alloc.h
index 3c40f4c4c11..0e73c984a64 100644
--- a/ydb/library/yql/minikql/mkql_alloc.h
+++ b/ydb/library/yql/minikql/mkql_alloc.h
@@ -10,6 +10,8 @@
#include <util/system/tls.h>
#include <new>
#include <unordered_map>
+#include <atomic>
+#include <memory>
namespace NKikimr {
@@ -59,6 +61,7 @@ struct TAllocState : public TAlignedPagePool
TListEntry OffloadedBlocksRoot;
TListEntry GlobalPAllocList;
TListEntry* CurrentPAllocList;
+ std::shared_ptr<std::atomic<size_t>> ArrowMemoryUsage = std::make_shared<std::atomic<size_t>>();
void* MainContext = nullptr;
void* CurrentContext = nullptr;