// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "contrib/libs/apache/arrow_next/cpp/src/arrow/memory_pool_internal.h" #include // IWYU pragma: keep #include #include // IWYU pragma: keep #include // IWYU pragma: keep #include // IWYU pragma: keep #include #include #include #include #if defined(sun) || defined(__sun) # include #endif #include "contrib/libs/apache/arrow_next/cpp/src/arrow/buffer.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/io/util_internal.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/result.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/status.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/bit_util.h" #include "contrib/libs/apache/arrow_next/src/arrow/util/config.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/debug.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/int_util_overflow.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/io_util.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/logging.h" // IWYU pragma: keep #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/string.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/thread_pool.h" #include "contrib/libs/apache/arrow_next/cpp/src/arrow/util/ubsan.h" #ifdef __GLIBC__ # include #endif #ifdef ARROW_MIMALLOC # error #include #endif namespace arrow20 { namespace memory_pool { namespace internal { alignas(kDefaultBufferAlignment) int64_t zero_size_area[1] = {kDebugXorSuffix}; } // namespace internal } // namespace memory_pool namespace { constexpr char kDefaultBackendEnvVar[] = "ARROW_DEFAULT_MEMORY_POOL"; constexpr char kDebugMemoryEnvVar[] = "ARROW_DEBUG_MEMORY_POOL"; enum class MemoryPoolBackend : uint8_t { System, Jemalloc, Mimalloc }; struct SupportedBackend { const char* name; MemoryPoolBackend backend; }; // See ARROW-12248 for why we use static in-function singletons rather than // global constants below (in SupportedBackends() and UserSelectedBackend()). // In some contexts (especially R bindings) `default_memory_pool()` may be // called before all globals are initialized, and then the ARROW_DEFAULT_MEMORY_POOL // environment variable would be ignored. const std::vector& SupportedBackends() { static std::vector backends = { // mimalloc is our preferred allocator for several reasons: // 1) it has good performance // 2) it is well-supported on all our main platforms (Linux, macOS, Windows) // 3) it is easy to configure and has a consistent API. #ifdef ARROW_MIMALLOC {"mimalloc", MemoryPoolBackend::Mimalloc}, #endif #ifdef ARROW_JEMALLOC {"jemalloc", MemoryPoolBackend::Jemalloc}, #endif {"system", MemoryPoolBackend::System}}; return backends; } // Return the MemoryPoolBackend selected by the user through the // ARROW_DEFAULT_MEMORY_POOL environment variable, if any. std::optional UserSelectedBackend() { static auto user_selected_backend = []() -> std::optional { auto unsupported_backend = [](const std::string& name) { std::vector supported; for (const auto backend : SupportedBackends()) { supported.push_back(std::string("'") + backend.name + "'"); } ARROW_LOG(WARNING) << "Unsupported backend '" << name << "' specified in " << kDefaultBackendEnvVar << " (supported backends are " << internal::JoinStrings(supported, ", ") << ")"; }; auto maybe_name = internal::GetEnvVar(kDefaultBackendEnvVar); if (!maybe_name.ok()) { return {}; } const auto name = *std::move(maybe_name); if (name.empty()) { // An empty environment variable is considered missing return {}; } const auto found = std::find_if( SupportedBackends().begin(), SupportedBackends().end(), [&](const SupportedBackend& backend) { return name == backend.name; }); if (found != SupportedBackends().end()) { return found->backend; } unsupported_backend(name); return {}; }(); return user_selected_backend; } MemoryPoolBackend DefaultBackend() { auto backend = UserSelectedBackend(); if (backend.has_value()) { return backend.value(); } struct SupportedBackend default_backend = SupportedBackends().front(); return default_backend.backend; } using MemoryDebugHandler = std::function; struct DebugState { void Invoke(uint8_t* ptr, int64_t size, const Status& st) { std::lock_guard lock(mutex_); if (handler_) { handler_(ptr, size, st); } } void SetHandler(MemoryDebugHandler handler) { std::lock_guard lock(mutex_); handler_ = std::move(handler); } static DebugState* Instance() { // Instance is constructed on-demand. If it was a global static variable, // it could be constructed after being used. static DebugState instance; return &instance; } private: DebugState() = default; ARROW_DISALLOW_COPY_AND_ASSIGN(DebugState); std::mutex mutex_; MemoryDebugHandler handler_; }; void DebugAbort(uint8_t* ptr, int64_t size, const Status& st) { st.Abort(); } void DebugTrap(uint8_t* ptr, int64_t size, const Status& st) { ARROW_LOG(ERROR) << st.ToString(); arrow20::internal::DebugTrap(); } void DebugWarn(uint8_t* ptr, int64_t size, const Status& st) { ARROW_LOG(WARNING) << st.ToString(); } bool IsDebugEnabled() { static const bool is_enabled = []() { auto maybe_env_value = internal::GetEnvVar(kDebugMemoryEnvVar); if (!maybe_env_value.ok()) { return false; } auto env_value = *std::move(maybe_env_value); if (env_value.empty() || env_value == "none") { return false; } auto debug_state = DebugState::Instance(); if (env_value == "abort") { debug_state->SetHandler(DebugAbort); return true; } if (env_value == "trap") { debug_state->SetHandler(DebugTrap); return true; } if (env_value == "warn") { debug_state->SetHandler(DebugWarn); return true; } ARROW_LOG(WARNING) << "Invalid value for " << kDebugMemoryEnvVar << ": '" << env_value << "'. Valid values are 'abort', 'trap', 'warn', 'none'."; return false; }(); return is_enabled; } // An allocator wrapper that adds a suffix at the end of allocation to check // for writes beyond the allocated area. template class DebugAllocator { public: static Status AllocateAligned(int64_t size, int64_t alignment, uint8_t** out) { if (size == 0) { *out = memory_pool::internal::kZeroSizeArea; } else { ARROW_ASSIGN_OR_RAISE(int64_t raw_size, RawSize(size)); DCHECK(raw_size > size) << "bug in raw size computation: " << raw_size << " for size " << size; RETURN_NOT_OK(WrappedAllocator::AllocateAligned(raw_size, alignment, out)); InitAllocatedArea(*out, size); } return Status::OK(); } static void ReleaseUnused() { WrappedAllocator::ReleaseUnused(); } static Status ReallocateAligned(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) { CheckAllocatedArea(*ptr, old_size, "reallocation"); if (*ptr == memory_pool::internal::kZeroSizeArea) { return AllocateAligned(new_size, alignment, ptr); } if (new_size == 0) { // Note that an overflow check isn't needed as `old_size` is supposed to have // been successfully passed to AllocateAligned() before. WrappedAllocator::DeallocateAligned(*ptr, old_size + kOverhead, alignment); *ptr = memory_pool::internal::kZeroSizeArea; return Status::OK(); } ARROW_ASSIGN_OR_RAISE(int64_t raw_new_size, RawSize(new_size)); DCHECK(raw_new_size > new_size) << "bug in raw size computation: " << raw_new_size << " for size " << new_size; RETURN_NOT_OK(WrappedAllocator::ReallocateAligned(old_size + kOverhead, raw_new_size, alignment, ptr)); InitAllocatedArea(*ptr, new_size); return Status::OK(); } static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t alignment) { CheckAllocatedArea(ptr, size, "deallocation"); if (ptr != memory_pool::internal::kZeroSizeArea) { WrappedAllocator::DeallocateAligned(ptr, size + kOverhead, alignment); } } static void PrintStats() { WrappedAllocator::PrintStats(); } private: static Result RawSize(int64_t size) { if (ARROW_PREDICT_FALSE(internal::AddWithOverflow(size, kOverhead, &size))) { return Status::OutOfMemory("Memory allocation size too large"); } return size; } static void InitAllocatedArea(uint8_t* ptr, int64_t size) { DCHECK_NE(size, 0); util::SafeStore(ptr + size, size ^ memory_pool::internal::kDebugXorSuffix); } static void CheckAllocatedArea(uint8_t* ptr, int64_t size, const char* context) { // Check that memory wasn't clobbered at the end of the allocated area. int64_t stored_size = memory_pool::internal::kDebugXorSuffix ^ util::SafeLoadAs(ptr + size); if (ARROW_PREDICT_FALSE(stored_size != size)) { auto st = Status::Invalid("Wrong size on ", context, ": given size = ", size, ", actual size = ", stored_size); DebugState::Instance()->Invoke(ptr, size, st); } } static constexpr int64_t kOverhead = sizeof(int64_t); }; // Helper class directing allocations to the standard system allocator. class SystemAllocator { public: // Allocate memory according to the alignment requirements for Arrow // (as of May 2016 64 bytes) static Status AllocateAligned(int64_t size, int64_t alignment, uint8_t** out) { if (size == 0) { *out = memory_pool::internal::kZeroSizeArea; return Status::OK(); } #ifdef _WIN32 // Special code path for Windows *out = reinterpret_cast( _aligned_malloc(static_cast(size), static_cast(alignment))); if (!*out) { return Status::OutOfMemory("malloc of size ", size, " failed"); } #elif defined(sun) || defined(__sun) *out = reinterpret_cast( memalign(static_cast(alignment), static_cast(size))); if (!*out) { return Status::OutOfMemory("malloc of size ", size, " failed"); } #else const int result = posix_memalign(reinterpret_cast(out), static_cast(alignment), static_cast(size)); if (result == ENOMEM) { return Status::OutOfMemory("malloc of size ", size, " failed"); } if (result == EINVAL) { return Status::Invalid("invalid alignment parameter: ", static_cast(alignment)); } #endif return Status::OK(); } static Status ReallocateAligned(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) { uint8_t* previous_ptr = *ptr; if (previous_ptr == memory_pool::internal::kZeroSizeArea) { DCHECK_EQ(old_size, 0); return AllocateAligned(new_size, alignment, ptr); } if (new_size == 0) { DeallocateAligned(previous_ptr, old_size, alignment); *ptr = memory_pool::internal::kZeroSizeArea; return Status::OK(); } // Note: We cannot use realloc() here as it doesn't guarantee alignment. // Allocate new chunk uint8_t* out = nullptr; RETURN_NOT_OK(AllocateAligned(new_size, alignment, &out)); DCHECK(out); // Copy contents and release old memory chunk memcpy(out, *ptr, static_cast(std::min(new_size, old_size))); #ifdef _WIN32 _aligned_free(*ptr); #else free(*ptr); #endif // defined(_WIN32) *ptr = out; return Status::OK(); } static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t /*alignment*/) { if (ptr == memory_pool::internal::kZeroSizeArea) { DCHECK_EQ(size, 0); } else { #ifdef _WIN32 _aligned_free(ptr); #else free(ptr); #endif } } static void ReleaseUnused() { #ifdef __GLIBC__ // The return value of malloc_trim is not an error but to inform // you if memory was actually released or not, which we do not care about here ARROW_UNUSED(malloc_trim(0)); #endif } static void PrintStats() { #ifdef __GLIBC__ malloc_stats(); #endif } }; #ifdef ARROW_MIMALLOC // Helper class directing allocations to the mimalloc allocator. class MimallocAllocator { public: static Status AllocateAligned(int64_t size, int64_t alignment, uint8_t** out) { if (size == 0) { *out = memory_pool::internal::kZeroSizeArea; return Status::OK(); } *out = reinterpret_cast( mi_malloc_aligned(static_cast(size), static_cast(alignment))); if (*out == NULL) { return Status::OutOfMemory("malloc of size ", size, " failed"); } return Status::OK(); } static void ReleaseUnused() { mi_collect(true); } static Status ReallocateAligned(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) { uint8_t* previous_ptr = *ptr; if (previous_ptr == memory_pool::internal::kZeroSizeArea) { DCHECK_EQ(old_size, 0); return AllocateAligned(new_size, alignment, ptr); } if (new_size == 0) { DeallocateAligned(previous_ptr, old_size, alignment); *ptr = memory_pool::internal::kZeroSizeArea; return Status::OK(); } *ptr = reinterpret_cast( mi_realloc_aligned(previous_ptr, static_cast(new_size), alignment)); if (*ptr == NULL) { *ptr = previous_ptr; return Status::OutOfMemory("realloc of size ", new_size, " failed"); } return Status::OK(); } static void DeallocateAligned(uint8_t* ptr, int64_t size, int64_t /*alignment*/) { if (ptr == memory_pool::internal::kZeroSizeArea) { DCHECK_EQ(size, 0); } else { mi_free(ptr); } } static void PrintStats() { mi_stats_print_out(nullptr, nullptr); } }; #endif // defined(ARROW_MIMALLOC) } // namespace int64_t MemoryPool::max_memory() const { return -1; } /////////////////////////////////////////////////////////////////////// // MemoryPool implementation that delegates its core duty // to an Allocator class. #ifndef NDEBUG static constexpr uint8_t kAllocPoison = 0xBC; static constexpr uint8_t kReallocPoison = 0xBD; static constexpr uint8_t kDeallocPoison = 0xBE; #endif template class BaseMemoryPoolImpl : public MemoryPool { public: ~BaseMemoryPoolImpl() override {} Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override { if (size < 0) { return Status::Invalid("negative malloc size"); } if (static_cast(size) >= std::numeric_limits::max()) { return Status::OutOfMemory("malloc size overflows size_t"); } RETURN_NOT_OK(Allocator::AllocateAligned(size, alignment, out)); #ifndef NDEBUG // Poison data if (size > 0) { DCHECK_NE(*out, nullptr); (*out)[0] = kAllocPoison; (*out)[size - 1] = kAllocPoison; } #endif stats_.DidAllocateBytes(size); return Status::OK(); } Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) override { if (new_size < 0) { return Status::Invalid("negative realloc size"); } if (static_cast(new_size) >= std::numeric_limits::max()) { return Status::OutOfMemory("realloc overflows size_t"); } RETURN_NOT_OK(Allocator::ReallocateAligned(old_size, new_size, alignment, ptr)); #ifndef NDEBUG // Poison data if (new_size > old_size) { DCHECK_NE(*ptr, nullptr); (*ptr)[old_size] = kReallocPoison; (*ptr)[new_size - 1] = kReallocPoison; } #endif stats_.DidReallocateBytes(old_size, new_size); return Status::OK(); } void Free(uint8_t* buffer, int64_t size, int64_t alignment) override { #ifndef NDEBUG // Poison data if (size > 0) { DCHECK_NE(buffer, nullptr); buffer[0] = kDeallocPoison; buffer[size - 1] = kDeallocPoison; } #endif Allocator::DeallocateAligned(buffer, size, alignment); stats_.DidFreeBytes(size); } void ReleaseUnused() override { Allocator::ReleaseUnused(); } void PrintStats() override { Allocator::PrintStats(); } int64_t bytes_allocated() const override { return stats_.bytes_allocated(); } int64_t max_memory() const override { return stats_.max_memory(); } int64_t total_bytes_allocated() const override { return stats_.total_bytes_allocated(); } int64_t num_allocations() const override { return stats_.num_allocations(); } protected: internal::MemoryPoolStats stats_; }; class SystemMemoryPool : public BaseMemoryPoolImpl { public: std::string backend_name() const override { return "system"; } }; class SystemDebugMemoryPool : public BaseMemoryPoolImpl> { public: std::string backend_name() const override { return "system"; } }; #ifdef ARROW_JEMALLOC class JemallocMemoryPool : public BaseMemoryPoolImpl { public: std::string backend_name() const override { return "jemalloc"; } }; class JemallocDebugMemoryPool : public BaseMemoryPoolImpl< DebugAllocator> { public: std::string backend_name() const override { return "jemalloc"; } }; #endif #ifdef ARROW_MIMALLOC class MimallocMemoryPool : public BaseMemoryPoolImpl { public: std::string backend_name() const override { return "mimalloc"; } }; class MimallocDebugMemoryPool : public BaseMemoryPoolImpl> { public: std::string backend_name() const override { return "mimalloc"; } }; #endif std::unique_ptr MemoryPool::CreateDefault() { auto backend = DefaultBackend(); switch (backend) { case MemoryPoolBackend::System: return IsDebugEnabled() ? std::unique_ptr(new SystemDebugMemoryPool) : std::unique_ptr(new SystemMemoryPool); #ifdef ARROW_JEMALLOC case MemoryPoolBackend::Jemalloc: return IsDebugEnabled() ? std::unique_ptr(new JemallocDebugMemoryPool) : std::unique_ptr(new JemallocMemoryPool); #endif #ifdef ARROW_MIMALLOC case MemoryPoolBackend::Mimalloc: return IsDebugEnabled() ? std::unique_ptr(new MimallocDebugMemoryPool) : std::unique_ptr(new MimallocMemoryPool); #endif default: ARROW_LOG(FATAL) << "Internal error: cannot create default memory pool"; return nullptr; } } static struct GlobalState { ~GlobalState() { finalizing_.store(true, std::memory_order_relaxed); } bool is_finalizing() const { return finalizing_.load(std::memory_order_relaxed); } MemoryPool* system_memory_pool() { if (IsDebugEnabled()) { return &system_debug_pool_; } else { return &system_pool_; } } #ifdef ARROW_JEMALLOC MemoryPool* jemalloc_memory_pool() { if (IsDebugEnabled()) { return &jemalloc_debug_pool_; } else { return &jemalloc_pool_; } } #endif #ifdef ARROW_MIMALLOC MemoryPool* mimalloc_memory_pool() { if (IsDebugEnabled()) { return &mimalloc_debug_pool_; } else { return &mimalloc_pool_; } } #endif private: std::atomic finalizing_{false}; // constructed first, destroyed last SystemMemoryPool system_pool_; SystemDebugMemoryPool system_debug_pool_; #ifdef ARROW_JEMALLOC JemallocMemoryPool jemalloc_pool_; JemallocDebugMemoryPool jemalloc_debug_pool_; #endif #ifdef ARROW_MIMALLOC MimallocMemoryPool mimalloc_pool_; MimallocDebugMemoryPool mimalloc_debug_pool_; #endif } global_state; MemoryPool* system_memory_pool() { return global_state.system_memory_pool(); } Status jemalloc_memory_pool(MemoryPool** out) { #ifdef ARROW_JEMALLOC *out = global_state.jemalloc_memory_pool(); return Status::OK(); #else return Status::NotImplemented("This Arrow build does not enable jemalloc"); #endif } Status mimalloc_memory_pool(MemoryPool** out) { #ifdef ARROW_MIMALLOC *out = global_state.mimalloc_memory_pool(); return Status::OK(); #else return Status::NotImplemented("This Arrow build does not enable mimalloc"); #endif } MemoryPool* default_memory_pool() { auto backend = DefaultBackend(); switch (backend) { case MemoryPoolBackend::System: return global_state.system_memory_pool(); #ifdef ARROW_JEMALLOC case MemoryPoolBackend::Jemalloc: return global_state.jemalloc_memory_pool(); #endif #ifdef ARROW_MIMALLOC case MemoryPoolBackend::Mimalloc: return global_state.mimalloc_memory_pool(); #endif default: ARROW_LOG(FATAL) << "Internal error: cannot create default memory pool"; return nullptr; } } #ifndef ARROW_JEMALLOC Status jemalloc_set_decay_ms(int ms) { return Status::NotImplemented("jemalloc support is not built"); } Result jemalloc_get_stat(const char* name) { return Status::NotImplemented("jemalloc support is not built"); } Status jemalloc_peak_reset() { return Status::NotImplemented("jemalloc support is not built"); } Status jemalloc_stats_print(const char* opts) { return Status::NotImplemented("jemalloc support is not built"); } Status jemalloc_stats_print(std::function write_cb, const char* opts) { return Status::NotImplemented("jemalloc support is not built"); } Result jemalloc_stats_string(const char* opts) { return Status::NotImplemented("jemalloc support is not built"); } #endif /////////////////////////////////////////////////////////////////////// // LoggingMemoryPool implementation LoggingMemoryPool::LoggingMemoryPool(MemoryPool* pool) : pool_(pool) {} Status LoggingMemoryPool::Allocate(int64_t size, int64_t alignment, uint8_t** out) { Status s = pool_->Allocate(size, alignment, out); std::cout << "Allocate: size = " << size << ", alignment = " << alignment << std::endl; return s; } Status LoggingMemoryPool::Reallocate(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) { Status s = pool_->Reallocate(old_size, new_size, ptr); std::cout << "Reallocate: old_size = " << old_size << ", new_size = " << new_size << ", alignment = " << alignment << std::endl; return s; } void LoggingMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment) { pool_->Free(buffer, size, alignment); std::cout << "Free: size = " << size << ", alignment = " << alignment << std::endl; } void LoggingMemoryPool::ReleaseUnused() { pool_->ReleaseUnused(); } void LoggingMemoryPool::PrintStats() { pool_->PrintStats(); } int64_t LoggingMemoryPool::bytes_allocated() const { int64_t nb_bytes = pool_->bytes_allocated(); std::cout << "bytes_allocated: " << nb_bytes << std::endl; return nb_bytes; } int64_t LoggingMemoryPool::max_memory() const { int64_t mem = pool_->max_memory(); std::cout << "max_memory: " << mem << std::endl; return mem; } int64_t LoggingMemoryPool::total_bytes_allocated() const { int64_t mem = pool_->total_bytes_allocated(); std::cout << "total_bytes_allocated: " << mem << std::endl; return mem; } int64_t LoggingMemoryPool::num_allocations() const { int64_t mem = pool_->num_allocations(); std::cout << "num_allocations: " << mem << std::endl; return mem; } std::string LoggingMemoryPool::backend_name() const { return pool_->backend_name(); } /////////////////////////////////////////////////////////////////////// // ProxyMemoryPool implementation class ProxyMemoryPool::ProxyMemoryPoolImpl { public: explicit ProxyMemoryPoolImpl(MemoryPool* pool) : pool_(pool) {} Status Allocate(int64_t size, int64_t alignment, uint8_t** out) { RETURN_NOT_OK(pool_->Allocate(size, alignment, out)); stats_.DidAllocateBytes(size); return Status::OK(); } Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) { RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, alignment, ptr)); stats_.DidReallocateBytes(old_size, new_size); return Status::OK(); } void Free(uint8_t* buffer, int64_t size, int64_t alignment) { pool_->Free(buffer, size, alignment); stats_.DidFreeBytes(size); } void ReleaseUnused() { pool_->ReleaseUnused(); } void PrintStats() { // XXX these are the allocation stats for the underlying allocator, not // the subset allocated through the ProxyMemoryPool pool_->PrintStats(); } int64_t bytes_allocated() const { return stats_.bytes_allocated(); } int64_t max_memory() const { return stats_.max_memory(); } int64_t total_bytes_allocated() const { return stats_.total_bytes_allocated(); } int64_t num_allocations() const { return stats_.num_allocations(); } std::string backend_name() const { return pool_->backend_name(); } private: MemoryPool* pool_; internal::MemoryPoolStats stats_; }; ProxyMemoryPool::ProxyMemoryPool(MemoryPool* pool) { impl_.reset(new ProxyMemoryPoolImpl(pool)); } ProxyMemoryPool::~ProxyMemoryPool() {} Status ProxyMemoryPool::Allocate(int64_t size, int64_t alignment, uint8_t** out) { return impl_->Allocate(size, alignment, out); } Status ProxyMemoryPool::Reallocate(int64_t old_size, int64_t new_size, int64_t alignment, uint8_t** ptr) { return impl_->Reallocate(old_size, new_size, alignment, ptr); } void ProxyMemoryPool::Free(uint8_t* buffer, int64_t size, int64_t alignment) { return impl_->Free(buffer, size, alignment); } void ProxyMemoryPool::ReleaseUnused() { impl_->ReleaseUnused(); } void ProxyMemoryPool::PrintStats() { impl_->PrintStats(); } int64_t ProxyMemoryPool::bytes_allocated() const { return impl_->bytes_allocated(); } int64_t ProxyMemoryPool::max_memory() const { return impl_->max_memory(); } int64_t ProxyMemoryPool::total_bytes_allocated() const { return impl_->total_bytes_allocated(); } int64_t ProxyMemoryPool::num_allocations() const { return impl_->num_allocations(); } std::string ProxyMemoryPool::backend_name() const { return impl_->backend_name(); } std::vector SupportedMemoryBackendNames() { std::vector supported; for (const auto backend : SupportedBackends()) { supported.push_back(backend.name); } return supported; } // ----------------------------------------------------------------------- // Pool buffer and allocation /// A Buffer whose lifetime is tied to a particular MemoryPool class PoolBuffer final : public ResizableBuffer { public: explicit PoolBuffer(std::shared_ptr mm, MemoryPool* pool, int64_t alignment) : ResizableBuffer(nullptr, 0, std::move(mm)), pool_(pool), alignment_(alignment) {} ~PoolBuffer() override { // Avoid calling pool_->Free if the global pools are destroyed // (XXX this will not work with user-defined pools) // This can happen if a Future is destructing on one thread while or // after memory pools are destructed on the main thread (as there is // no guarantee of destructor order between thread/memory pools) uint8_t* ptr = mutable_data(); if (ptr && !global_state.is_finalizing()) { pool_->Free(ptr, capacity_, alignment_); } } Status Reserve(const int64_t capacity) override { if (capacity < 0) { return Status::Invalid("Negative buffer capacity: ", capacity); } uint8_t* ptr = mutable_data(); if (!ptr || capacity > capacity_) { ARROW_ASSIGN_OR_RAISE(int64_t new_capacity, RoundCapacity(capacity)); if (ptr) { RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, alignment_, &ptr)); } else { RETURN_NOT_OK(pool_->Allocate(new_capacity, alignment_, &ptr)); } data_ = ptr; capacity_ = new_capacity; } return Status::OK(); } Status Resize(const int64_t new_size, bool shrink_to_fit = true) override { if (ARROW_PREDICT_FALSE(new_size < 0)) { return Status::Invalid("Negative buffer resize: ", new_size); } uint8_t* ptr = mutable_data(); if (ptr && shrink_to_fit && new_size <= size_) { // Buffer is non-null and is not growing, so shrink to the requested size without // excess space. ARROW_ASSIGN_OR_RAISE(int64_t new_capacity, RoundCapacity(new_size)); if (capacity_ != new_capacity) { // Buffer hasn't got yet the requested size. RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, alignment_, &ptr)); data_ = ptr; capacity_ = new_capacity; } } else { RETURN_NOT_OK(Reserve(new_size)); } size_ = new_size; return Status::OK(); } static std::shared_ptr MakeShared(MemoryPool* pool, int64_t alignment) { std::shared_ptr mm; if (pool == nullptr) { pool = default_memory_pool(); mm = default_cpu_memory_manager(); } else { mm = CPUDevice::memory_manager(pool); } return std::make_shared(std::move(mm), pool, alignment); } static std::unique_ptr MakeUnique(MemoryPool* pool, int64_t alignment) { std::shared_ptr mm; if (pool == nullptr) { pool = default_memory_pool(); mm = default_cpu_memory_manager(); } else { mm = CPUDevice::memory_manager(pool); } return std::make_unique(std::move(mm), pool, alignment); } private: static Result RoundCapacity(int64_t capacity) { if (capacity > std::numeric_limits::max() - 63) { return Status::OutOfMemory("capacity too large"); } return bit_util::RoundUpToMultipleOf64(capacity); } MemoryPool* pool_; int64_t alignment_; }; namespace { // A utility that does most of the work of the `AllocateBuffer` and // `AllocateResizableBuffer` methods. The argument `buffer` should be a smart pointer to // a PoolBuffer. template inline Result ResizePoolBuffer(PoolBufferPtr&& buffer, const int64_t size) { RETURN_NOT_OK(buffer->Resize(size)); buffer->ZeroPadding(); return std::move(buffer); } } // namespace Result> AllocateBuffer(const int64_t size, MemoryPool* pool) { return AllocateBuffer(size, kDefaultBufferAlignment, pool); } Result> AllocateBuffer(const int64_t size, const int64_t alignment, MemoryPool* pool) { return ResizePoolBuffer>( PoolBuffer::MakeUnique(pool, alignment), size); } Result> AllocateResizableBuffer(const int64_t size, MemoryPool* pool) { return AllocateResizableBuffer(size, kDefaultBufferAlignment, pool); } Result> AllocateResizableBuffer(const int64_t size, const int64_t alignment, MemoryPool* pool) { return ResizePoolBuffer>( PoolBuffer::MakeUnique(pool, alignment), size); } } // namespace arrow20