| author | babenko <[email protected]> | 2023-01-03 13:23:49 +0300 | 
|---|---|---|
| committer | babenko <[email protected]> | 2023-01-03 13:23:49 +0300 | 
| commit | 85dbae30a801094e01a9aa5e4ecb1be070420ed4 (patch) | |
| tree | 1381aac9994baae96382a5231e8b1596d876320f /library/cpp | |
| parent | 8ee4eaa91898ce3adcdf302ba33311fc9e627282 (diff) | |
More TChunkedMemoryPool, TChunkedMemoryAllocator, TChunkedMemoryPoolOutput to library
wip
Diffstat (limited to 'library/cpp')
| -rw-r--r-- | library/cpp/yt/memory/CMakeLists.darwin.txt | 4 |
| -rw-r--r-- | library/cpp/yt/memory/CMakeLists.linux-aarch64.txt | 4 |
| -rw-r--r-- | library/cpp/yt/memory/CMakeLists.linux.txt | 4 |
| -rw-r--r-- | library/cpp/yt/memory/chunked_input_stream.cpp | 34 |
| -rw-r--r-- | library/cpp/yt/memory/chunked_input_stream.h | 29 |
| -rw-r--r-- | library/cpp/yt/memory/chunked_memory_allocator-inl.h | 44 |
| -rw-r--r-- | library/cpp/yt/memory/chunked_memory_allocator.cpp | 57 |
| -rw-r--r-- | library/cpp/yt/memory/chunked_memory_allocator.h | 71 |
| -rw-r--r-- | library/cpp/yt/memory/chunked_memory_pool-inl.h | 142 |
| -rw-r--r-- | library/cpp/yt/memory/chunked_memory_pool.cpp | 200 |
| -rw-r--r-- | library/cpp/yt/memory/chunked_memory_pool.h | 159 |
| -rw-r--r-- | library/cpp/yt/memory/chunked_memory_pool_output.cpp | 65 |
| -rw-r--r-- | library/cpp/yt/memory/chunked_memory_pool_output.h | 43 |
| -rw-r--r-- | library/cpp/yt/memory/public.h | 15 |
| -rw-r--r-- | library/cpp/yt/memory/unittests/chunked_memory_pool_output_ut.cpp | 39 |
| -rw-r--r-- | library/cpp/yt/memory/unittests/chunked_memory_pool_ut.cpp | 74 |
16 files changed, 984 insertions, 0 deletions
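Before the diff itself, a minimal usage sketch of the central new class, TChunkedMemoryPool, based on the chunked_memory_pool.h interface added in this commit. The tag struct, the function name, and the sizes are illustrative only and are not part of the commit.

```cpp
#include <library/cpp/yt/memory/chunked_memory_pool.h>

#include <cstring>

using namespace NYT;

// Hypothetical tag; only used to label allocations for ref-counted tracking.
struct TExampleTag
{ };

bool ChunkedMemoryPoolSketch()
{
    TChunkedMemoryPool pool(TExampleTag{});

    // Raw bytes with no particular alignment. Memory is reclaimed by
    // Clear()/Purge(), not per allocation; Free() can only give back
    // a suffix of the most recent allocation.
    char* buf = pool.AllocateUnaligned(6);
    ::memcpy(buf, "Short.", 6);

    // Typed, aligned allocation of 128 uninitialized ints.
    int* values = pool.AllocateUninitialized<int>(128);
    for (int i = 0; i < 128; ++i) {
        values[i] = i;
    }

    // Keep the regular chunks around but make them reusable.
    pool.Clear();

    // GetSize() counts bytes handed out; GetCapacity() counts bytes reserved in chunks.
    return pool.GetCapacity() >= pool.GetSize();
}
```

Clear() keeps the regular chunks for reuse while dropping large blocks, Purge() releases everything, and Absorb() lets one pool take over another pool's chunks, which is what the new unit test exercises.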
| diff --git a/library/cpp/yt/memory/CMakeLists.darwin.txt b/library/cpp/yt/memory/CMakeLists.darwin.txt index 5bcfddec029..e3c3cae0f09 100644 --- a/library/cpp/yt/memory/CMakeLists.darwin.txt +++ b/library/cpp/yt/memory/CMakeLists.darwin.txt @@ -17,6 +17,10 @@ target_link_libraries(cpp-yt-memory PUBLIC  )  target_sources(cpp-yt-memory PRIVATE    ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/blob.cpp +  ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_input_stream.cpp +  ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_allocator.cpp +  ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool.cpp +  ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool_output.cpp    ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_output_stream.cpp    ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref.cpp    ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref_tracked.cpp diff --git a/library/cpp/yt/memory/CMakeLists.linux-aarch64.txt b/library/cpp/yt/memory/CMakeLists.linux-aarch64.txt index 0e7c58d94f1..2950017992b 100644 --- a/library/cpp/yt/memory/CMakeLists.linux-aarch64.txt +++ b/library/cpp/yt/memory/CMakeLists.linux-aarch64.txt @@ -18,6 +18,10 @@ target_link_libraries(cpp-yt-memory PUBLIC  )  target_sources(cpp-yt-memory PRIVATE    ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/blob.cpp +  ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_input_stream.cpp +  ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_allocator.cpp +  ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool.cpp +  ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool_output.cpp    ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_output_stream.cpp    ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref.cpp    ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref_tracked.cpp diff --git a/library/cpp/yt/memory/CMakeLists.linux.txt b/library/cpp/yt/memory/CMakeLists.linux.txt index 0e7c58d94f1..2950017992b 100644 --- a/library/cpp/yt/memory/CMakeLists.linux.txt +++ b/library/cpp/yt/memory/CMakeLists.linux.txt @@ -18,6 +18,10 @@ target_link_libraries(cpp-yt-memory PUBLIC  )  target_sources(cpp-yt-memory PRIVATE    ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/blob.cpp +  ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_input_stream.cpp +  ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_allocator.cpp +  ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool.cpp +  ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_memory_pool_output.cpp    ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/chunked_output_stream.cpp    ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref.cpp    ${CMAKE_SOURCE_DIR}/library/cpp/yt/memory/ref_tracked.cpp diff --git a/library/cpp/yt/memory/chunked_input_stream.cpp b/library/cpp/yt/memory/chunked_input_stream.cpp new file mode 100644 index 00000000000..ea579a46ab4 --- /dev/null +++ b/library/cpp/yt/memory/chunked_input_stream.cpp @@ -0,0 +1,34 @@ +#include "chunked_input_stream.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TChunkedInputStream::TChunkedInputStream(std::vector<TSharedRef> blocks) +    : Blocks_(std::move(blocks)) +{ } + +size_t TChunkedInputStream::DoNext(const void** ptr, size_t len) +{ +    SkipCompletedBlocks(); +    if (Index_ == Blocks_.size()) { +        *ptr = nullptr; +        return 0; +    } +    *ptr = Blocks_[Index_].Begin() + Position_; +    size_t toSkip = std::min(Blocks_[Index_].Size() - Position_, len); +    Position_ += toSkip; +    return toSkip; +} + +void TChunkedInputStream::SkipCompletedBlocks() +{ +   
 while (Index_ < Blocks_.size() && Position_ == Blocks_[Index_].Size()) { +        Index_ += 1; +        Position_ = 0; +    } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/chunked_input_stream.h b/library/cpp/yt/memory/chunked_input_stream.h new file mode 100644 index 00000000000..baef2a4c66f --- /dev/null +++ b/library/cpp/yt/memory/chunked_input_stream.h @@ -0,0 +1,29 @@ +#pragma once + +#include "ref.h" + +#include <util/stream/zerocopy.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class TChunkedInputStream +    : public IZeroCopyInput +{ +public: +    explicit TChunkedInputStream(std::vector<TSharedRef> blocks); + +    size_t DoNext(const void** ptr, size_t len) override; + +private: +    const std::vector<TSharedRef> Blocks_; +    size_t Index_ = 0; +    size_t Position_ = 0; + +    void SkipCompletedBlocks(); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/chunked_memory_allocator-inl.h b/library/cpp/yt/memory/chunked_memory_allocator-inl.h new file mode 100644 index 00000000000..fe66060b2d6 --- /dev/null +++ b/library/cpp/yt/memory/chunked_memory_allocator-inl.h @@ -0,0 +1,44 @@ +#ifndef CHUNKED_MEMORY_ALLOCATOR_INL_H_ +#error "Direct inclusion of this file is not allowed, include chunked_memory_allocator.h" +// For the sake of sane code completion. +#include "chunked_memory_allocator.h" +#endif + +#include "serialize.h" + +#include <util/system/align.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +inline TSharedMutableRef TChunkedMemoryAllocator::AllocateUnaligned(i64 size) +{ +    // Fast path. +    if (FreeZoneEnd_ >= FreeZoneBegin_ + size) { +        FreeZoneEnd_ -= size; +        return Chunk_.Slice(FreeZoneEnd_, FreeZoneEnd_ + size); +    } + +    // Slow path. +    return AllocateUnalignedSlow(size); +} + +inline TSharedMutableRef TChunkedMemoryAllocator::AllocateAligned(i64 size, int align) +{ +    // NB: This can lead to FreeZoneBegin_ >= FreeZoneEnd_ in which case the chunk is full. +    FreeZoneBegin_ = AlignUp(FreeZoneBegin_, align); + +    // Fast path. +    if (FreeZoneBegin_ + size <= FreeZoneEnd_) { +        FreeZoneBegin_ += size; +        return Chunk_.Slice(FreeZoneBegin_ - size, FreeZoneBegin_); +    } + +    // Slow path. 
+    return AllocateAlignedSlow(size, align); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/chunked_memory_allocator.cpp b/library/cpp/yt/memory/chunked_memory_allocator.cpp new file mode 100644 index 00000000000..3560f8096d9 --- /dev/null +++ b/library/cpp/yt/memory/chunked_memory_allocator.cpp @@ -0,0 +1,57 @@ +#include "chunked_memory_allocator.h" +#include "serialize.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +const i64 TChunkedMemoryAllocator::DefaultChunkSize = 4096; +const double TChunkedMemoryAllocator::DefaultMaxSmallBlockSizeRatio = 0.25; + +//////////////////////////////////////////////////////////////////////////////// + +TChunkedMemoryAllocator::TChunkedMemoryAllocator( +    i64 chunkSize, +    double maxSmallBlockSizeRatio, +    TRefCountedTypeCookie tagCookie) +    : ChunkSize_(chunkSize) +    , MaxSmallBlockSize_(static_cast<i64>(ChunkSize_ * maxSmallBlockSizeRatio)) +    , TagCookie_(tagCookie) +{  } + +TSharedMutableRef TChunkedMemoryAllocator::AllocateUnalignedSlow(i64 size) +{ +    auto large = AllocateSlowCore(size); +    if (large) { +        return large; +    } +    return AllocateUnaligned(size); +} + +TSharedMutableRef TChunkedMemoryAllocator::AllocateAlignedSlow(i64 size, int align) +{ +    // NB: Do not rely on any particular alignment of chunks. +    auto large = AllocateSlowCore(size + align); +    if (large) { +        auto* alignedBegin = AlignUp(large.Begin(), align); +        return large.Slice(alignedBegin, alignedBegin + size); +    } +    return AllocateAligned(size, align); +} + +TSharedMutableRef TChunkedMemoryAllocator::AllocateSlowCore(i64 size) +{ +    if (size > MaxSmallBlockSize_) { +        return TSharedMutableRef::Allocate(size, {.InitializeStorage = false}, TagCookie_); +    } + +    Chunk_ = TSharedMutableRef::Allocate(ChunkSize_, {.InitializeStorage = false}, TagCookie_); +    FreeZoneBegin_ = Chunk_.Begin(); +    FreeZoneEnd_ = Chunk_.End(); + +    return TSharedMutableRef(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/chunked_memory_allocator.h b/library/cpp/yt/memory/chunked_memory_allocator.h new file mode 100644 index 00000000000..33559d8aa7e --- /dev/null +++ b/library/cpp/yt/memory/chunked_memory_allocator.h @@ -0,0 +1,71 @@ +#pragma once + +#include "ref.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +struct TDefaultChunkedMemoryAllocatorTag +{ }; + +//! Mimics TChunkedMemoryPool but acts as an allocator returning shared refs. 
+class TChunkedMemoryAllocator +    : private TNonCopyable +{ +public: +    static const i64 DefaultChunkSize; +    static const double DefaultMaxSmallBlockSizeRatio; + +    explicit TChunkedMemoryAllocator( +        i64 chunkSize = DefaultChunkSize, +        double maxSmallBlockSizeRatio = DefaultMaxSmallBlockSizeRatio, +        TRefCountedTypeCookie tagCookie = GetRefCountedTypeCookie<TDefaultChunkedMemoryAllocatorTag>()); + +    template <class TTag> +    explicit TChunkedMemoryAllocator( +        TTag /*tag*/ = TTag(), +        i64 chunkSize = DefaultChunkSize, +        double maxSmallBlockSizeRatio = DefaultMaxSmallBlockSizeRatio) +        : TChunkedMemoryAllocator( +            chunkSize, +            maxSmallBlockSizeRatio, +            GetRefCountedTypeCookie<TTag>()) +    { } + +    //! Allocates #sizes bytes without any alignment. +    TSharedMutableRef AllocateUnaligned(i64 size); + +    //! Allocates #size bytes aligned with 8-byte granularity. +    TSharedMutableRef AllocateAligned(i64 size, int align = 8); + +private: +    const i64 ChunkSize_; +    const i64 MaxSmallBlockSize_; +    const TRefCountedTypeCookie TagCookie_; + +    char EmptyBuf_[0]; + +    // Chunk memory layout: +    //   |AAAA|....|UUUU| +    // Legend: +    //   A aligned allocations +    //   U unaligned allocations +    //   . free zone +    char* FreeZoneBegin_ = EmptyBuf_; +    char* FreeZoneEnd_ = EmptyBuf_; +    TSharedMutableRef Chunk_; + +    TSharedMutableRef AllocateUnalignedSlow(i64 size); +    TSharedMutableRef AllocateAlignedSlow(i64 size, int align); +    TSharedMutableRef AllocateSlowCore(i64 size); + +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + +#define CHUNKED_MEMORY_ALLOCATOR_INL_H_ +#include "chunked_memory_allocator-inl.h" +#undef CHUNKED_MEMORY_ALLOCATOR_INL_H_ diff --git a/library/cpp/yt/memory/chunked_memory_pool-inl.h b/library/cpp/yt/memory/chunked_memory_pool-inl.h new file mode 100644 index 00000000000..267c3e92217 --- /dev/null +++ b/library/cpp/yt/memory/chunked_memory_pool-inl.h @@ -0,0 +1,142 @@ +#ifndef CHUNKED_MEMORY_POOL_INL_H_ +#error "Direct inclusion of this file is not allowed, include chunked_memory_pool.h" +// For the sake of sane code completion. +#include "chunked_memory_pool.h" +#endif + +#include "serialize.h" + +#include <library/cpp/yt/malloc/malloc.h> + +#include <util/system/align.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +inline void TAllocationHolder::operator delete(void* ptr) noexcept +{ +    ::free(ptr); +} + +inline TMutableRef TAllocationHolder::GetRef() const +{ +    return Ref_; +} + +template <class TDerived> +TDerived* TAllocationHolder::Allocate(size_t size, TRefCountedTypeCookie cookie) +{ +    auto requestedSize = sizeof(TDerived) + size; +    auto* ptr = ::malloc(requestedSize); +    auto allocatedSize = ::malloc_usable_size(ptr); +    if (allocatedSize) { +        size += allocatedSize - requestedSize; +    } + +    auto* instance = static_cast<TDerived*>(ptr); + +    try { +        new (instance) TDerived(TMutableRef(instance + 1, size), cookie); +    } catch (const std::exception& ex) { +        // Do not forget to free the memory. 
+        ::free(ptr); +        throw; +    } + +    return instance; +} + +//////////////////////////////////////////////////////////////////////////////// + +inline TChunkedMemoryPool::TChunkedMemoryPool() +    : TChunkedMemoryPool( +        GetRefCountedTypeCookie<TDefaultChunkedMemoryPoolTag>()) +{ } + +template <class TTag> +inline TChunkedMemoryPool::TChunkedMemoryPool( +    TTag, +    size_t startChunkSize) +    : TChunkedMemoryPool( +        GetRefCountedTypeCookie<TTag>(), +        startChunkSize) +{ } + +inline char* TChunkedMemoryPool::AllocateUnaligned(size_t size) +{ +    // Fast path. +    if (FreeZoneEnd_ >= FreeZoneBegin_ + size) { +        FreeZoneEnd_ -= size; +        Size_ += size; +        return FreeZoneEnd_; +    } + +    // Slow path. +    return AllocateUnalignedSlow(size); +} + +inline char* TChunkedMemoryPool::AllocateAligned(size_t size, int align) +{ +    // NB: This can lead to FreeZoneBegin_ >= FreeZoneEnd_ in which case the chunk is full. +    FreeZoneBegin_ = AlignUp(FreeZoneBegin_, align); + +    // Fast path. +    if (FreeZoneBegin_ + size <= FreeZoneEnd_) { +        char* result = FreeZoneBegin_; +        Size_ += size; +        FreeZoneBegin_ += size; +        return result; +    } + +    // Slow path. +    return AllocateAlignedSlow(size, align); +} + +template <class T> +inline T* TChunkedMemoryPool::AllocateUninitialized(int n, int align) +{ +    return reinterpret_cast<T*>(AllocateAligned(sizeof(T) * n, align)); +} + +template <class T> +inline TMutableRange<T> TChunkedMemoryPool::Capture(TRange<T> src, int align) +{ +    auto* dst = AllocateUninitialized<T>(src.Size(), align); +    ::memcpy(dst, src.Begin(), sizeof(T) * src.Size()); +    return TMutableRange<T>(dst, src.Size()); +} + +inline void TChunkedMemoryPool::Free(char* from, char* to) +{ +    if (FreeZoneBegin_ == to) { +        FreeZoneBegin_ = from; +    } +    if (FreeZoneEnd_ == from) { +        FreeZoneEnd_ = to; +    } +} + +inline void TChunkedMemoryPool::Clear() +{ +    Size_ = 0; + +    if (Chunks_.empty()) { +        FreeZoneBegin_ = nullptr; +        FreeZoneEnd_ = nullptr; +        NextChunkIndex_ = 0; +    } else { +        FreeZoneBegin_ = Chunks_.front()->GetRef().Begin(); +        FreeZoneEnd_ = Chunks_.front()->GetRef().End(); +        NextChunkIndex_ = 1; +    } + +    for (const auto& block : OtherBlocks_) { +        Capacity_ -= block->GetRef().Size(); +    } +    OtherBlocks_.clear(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/chunked_memory_pool.cpp b/library/cpp/yt/memory/chunked_memory_pool.cpp new file mode 100644 index 00000000000..a10a6fe724f --- /dev/null +++ b/library/cpp/yt/memory/chunked_memory_pool.cpp @@ -0,0 +1,200 @@ +#include "chunked_memory_pool.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TAllocationHolder::TAllocationHolder(TMutableRef ref, TRefCountedTypeCookie cookie) +    : Ref_(ref) +#ifdef YT_ENABLE_REF_COUNTED_TRACKING +    , Cookie_(cookie) +#endif +{ +#ifdef YT_ENABLE_REF_COUNTED_TRACKING +    if (Cookie_ != NullRefCountedTypeCookie) { +        TRefCountedTrackerFacade::AllocateTagInstance(Cookie_); +        TRefCountedTrackerFacade::AllocateSpace(Cookie_, Ref_.Size()); +    } +#endif +} + +TAllocationHolder::~TAllocationHolder() +{ +#ifdef YT_ENABLE_REF_COUNTED_TRACKING +    if (Cookie_ != NullRefCountedTypeCookie) { +        TRefCountedTrackerFacade::FreeTagInstance(Cookie_); +     
   TRefCountedTrackerFacade::FreeSpace(Cookie_, Ref_.Size()); +    } +#endif +} + +//////////////////////////////////////////////////////////////////////////////// + +class TDefaultMemoryChunkProvider +    : public IMemoryChunkProvider +{ +public: +    std::unique_ptr<TAllocationHolder> Allocate(size_t size, TRefCountedTypeCookie cookie) override +    { +        return std::unique_ptr<TAllocationHolder>(TAllocationHolder::Allocate<TAllocationHolder>(size, cookie)); +    } +}; + +const IMemoryChunkProviderPtr& GetDefaultMemoryChunkProvider() +{ +    static const IMemoryChunkProviderPtr Result = New<TDefaultMemoryChunkProvider>(); +    return Result; +} + +//////////////////////////////////////////////////////////////////////////////// + +TChunkedMemoryPool::TChunkedMemoryPool( +    TRefCountedTypeCookie tagCookie, +    IMemoryChunkProviderPtr chunkProvider, +    size_t startChunkSize) +    : TagCookie_(tagCookie) +    , ChunkProviderHolder_(std::move(chunkProvider)) +    , ChunkProvider_(ChunkProviderHolder_.Get()) +{ +    Initialize(startChunkSize); +} + +TChunkedMemoryPool::TChunkedMemoryPool( +    TRefCountedTypeCookie tagCookie, +    size_t startChunkSize) +    : TagCookie_(tagCookie) +    , ChunkProvider_(GetDefaultMemoryChunkProvider().Get()) +{ +    Initialize(startChunkSize); +} + +void TChunkedMemoryPool::Initialize(size_t startChunkSize) +{ +    NextSmallSize_ = startChunkSize; +    FreeZoneBegin_ = nullptr; +    FreeZoneEnd_ = nullptr; +} + +void TChunkedMemoryPool::Purge() +{ +    Chunks_.clear(); +    OtherBlocks_.clear(); +    Size_ = 0; +    Capacity_ = 0; +    NextChunkIndex_ = 0; +    FreeZoneBegin_ = nullptr; +    FreeZoneEnd_ = nullptr; +} + +char* TChunkedMemoryPool::AllocateUnalignedSlow(size_t size) +{ +    auto* large = AllocateSlowCore(size); +    if (large) { +        return large; +    } +    return AllocateUnaligned(size); +} + +char* TChunkedMemoryPool::AllocateAlignedSlow(size_t size, int align) +{ +    // NB: Do not rely on any particular alignment of chunks. 
+    auto* large = AllocateSlowCore(size + align); +    if (large) { +        return AlignUp(large, align); +    } +    return AllocateAligned(size, align); +} + +char* TChunkedMemoryPool::AllocateSlowCore(size_t size) +{ +    TMutableRef ref; +    if (size > RegularChunkSize) { +        auto block = ChunkProvider_->Allocate(size, TagCookie_); +        ref = block->GetRef(); +        Size_ += size; +        Capacity_ += ref.Size(); +        OtherBlocks_.push_back(std::move(block)); +        return ref.Begin(); +    } + +    YT_VERIFY(NextChunkIndex_ <= std::ssize(Chunks_)); + +    if (NextSmallSize_ < RegularChunkSize) { +        auto block = ChunkProvider_->Allocate(std::max(NextSmallSize_, size), TagCookie_); +        ref = block->GetRef(); +        Capacity_ += ref.Size(); +        OtherBlocks_.push_back(std::move(block)); +        NextSmallSize_ = 2 * ref.Size(); +    } else if (NextChunkIndex_ == std::ssize(Chunks_)) { +        auto chunk = ChunkProvider_->Allocate(RegularChunkSize, TagCookie_); +        ref = chunk->GetRef(); +        Capacity_ += ref.Size(); +        Chunks_.push_back(std::move(chunk)); +        ++NextChunkIndex_; +    } else { +        ref = Chunks_[NextChunkIndex_++]->GetRef(); +    } + +    FreeZoneBegin_ = ref.Begin(); +    FreeZoneEnd_ = ref.End(); + +    return nullptr; +} + +void TChunkedMemoryPool::Absorb(TChunkedMemoryPool&& other) +{ +    YT_VERIFY(ChunkProvider_ == other.ChunkProvider_); + +    OtherBlocks_.reserve(OtherBlocks_.size() + other.OtherBlocks_.size()); +    for (auto& block : other.OtherBlocks_) { +        OtherBlocks_.push_back(std::move(block)); +    } +    other.OtherBlocks_.clear(); + +    // Suppose that +    // - "A" is filled blocks of the current pool; +    // - "a" is free blocks of the current pool; +    // - "B" is filled blocks of the other pool; +    // - "b" is free blocks of the other pool. +    // Then, from the initial layouts "AA...Aaa...a" and "BB...Bbb...b" we obtain "BB..BAA..Aaa...abb...b". 
+    Chunks_.reserve(Chunks_.size() + other.Chunks_.size()); +    size_t oldSize = Chunks_.size(); +    for (auto& chunk : other.Chunks_) { +        Chunks_.push_back(std::move(chunk)); +    } +    // Transform "AA...Aaa...aBB...B" => "BB...BAA...Aaa...a" +    std::rotate(Chunks_.begin(), Chunks_.begin() + oldSize, Chunks_.begin() + oldSize + other.NextChunkIndex_); +    if (NextChunkIndex_ == 0) { +        FreeZoneBegin_ = other.FreeZoneBegin_; +        FreeZoneEnd_ = other.FreeZoneEnd_; +    } +    NextChunkIndex_ += other.NextChunkIndex_; +    other.Chunks_.clear(); +    other.FreeZoneBegin_ = nullptr; +    other.FreeZoneEnd_ = nullptr; +    other.NextChunkIndex_ = 0; + +    Size_ += other.Size_; +    Capacity_ += other.Capacity_; +    other.Size_ = 0; +    other.Capacity_ = 0; +} + +size_t TChunkedMemoryPool::GetSize() const +{ +    return Size_; +} + +size_t TChunkedMemoryPool::GetCapacity() const +{ +    return Capacity_; +} + +size_t TChunkedMemoryPool::GetCurrentChunkSpareSize() const +{ +    return FreeZoneEnd_ - FreeZoneBegin_; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/chunked_memory_pool.h b/library/cpp/yt/memory/chunked_memory_pool.h new file mode 100644 index 00000000000..a22e2ec27c4 --- /dev/null +++ b/library/cpp/yt/memory/chunked_memory_pool.h @@ -0,0 +1,159 @@ +#pragma once + +#include "public.h" +#include "ref.h" + +#include <util/generic/size_literals.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +struct TDefaultChunkedMemoryPoolTag { }; + +// TAllocationHolder is polymorphic. So we cannot use TWithExtraSpace mixin +// because it needs the most derived type as a template argument and +// it would require GetExtraSpacePtr/GetRef methods to be virtual. 
+ +class TAllocationHolder +{ +public: +    TAllocationHolder(TMutableRef ref, TRefCountedTypeCookie cookie); +    TAllocationHolder(const TAllocationHolder&) = delete; +    TAllocationHolder(TAllocationHolder&&) = default; +    virtual ~TAllocationHolder(); + +    void operator delete(void* ptr) noexcept; + +    TMutableRef GetRef() const; + +    template <class TDerived> +    static TDerived* Allocate(size_t size, TRefCountedTypeCookie cookie); + +private: +    const TMutableRef Ref_; +#ifdef YT_ENABLE_REF_COUNTED_TRACKING +    const TRefCountedTypeCookie Cookie_; +#endif +}; + +//////////////////////////////////////////////////////////////////////////////// + +struct IMemoryChunkProvider +    : public TRefCounted +{ +    virtual std::unique_ptr<TAllocationHolder> Allocate(size_t size, TRefCountedTypeCookie cookie) = 0; +}; + +DEFINE_REFCOUNTED_TYPE(IMemoryChunkProvider) + +const IMemoryChunkProviderPtr& GetDefaultMemoryChunkProvider(); + +//////////////////////////////////////////////////////////////////////////////// + +class TChunkedMemoryPool +    : private TNonCopyable +{ +public: +    static constexpr size_t DefaultStartChunkSize = 4_KB; +    static constexpr size_t RegularChunkSize = 36_KB - 512; + +    TChunkedMemoryPool( +        TRefCountedTypeCookie tagCookie, +        IMemoryChunkProviderPtr chunkProvider, +        size_t startChunkSize = DefaultStartChunkSize); + +    explicit TChunkedMemoryPool( +        TRefCountedTypeCookie tagCookie, +        size_t startChunkSize = DefaultStartChunkSize); + +    TChunkedMemoryPool(); + +    template <class TTag> +    explicit TChunkedMemoryPool( +        TTag, +        size_t startChunkSize = DefaultStartChunkSize); + +    //! Allocates #sizes bytes without any alignment. +    char* AllocateUnaligned(size_t size); + +    //! Allocates #size bytes aligned with 8-byte granularity. +    char* AllocateAligned(size_t size, int align = 8); + +    //! Allocates #n uninitialized instances of #T. +    template <class T> +    T* AllocateUninitialized(int n, int align = alignof(T)); + +    //! Allocates space and copies #src inside it. +    template <class T> +    TMutableRange<T> Capture(TRange<T> src, int align = alignof(T)); + +    //! Frees memory range if possible: namely, if the free region is a suffix of last allocated region. +    void Free(char* from, char* to); + +    //! Marks all previously allocated small chunks as free for subsequent allocations but +    //! does not deallocate them. +    //! Purges all large blocks. +    void Clear(); + +    //! Purges all allocated memory, including small chunks. +    void Purge(); + +    //! Returns the number of allocated bytes. +    size_t GetSize() const; + +    //! Returns the number of reserved bytes. +    size_t GetCapacity() const; + +    //! Returns the number of bytes that can be acquired in the current chunk +    //! without additional allocations. +    size_t GetCurrentChunkSpareSize() const; + +    //! Moves all the allocated memory from other memory pool to the current one. +    //! The other pool becomes empty, like after Purge() call. +    void Absorb(TChunkedMemoryPool&& other); + +private: +    const TRefCountedTypeCookie TagCookie_; +    // A common usecase is to construct TChunkedMemoryPool with the default +    // memory chunk provider. The latter is ref-counted and is shared between +    // a multitude of TChunkedMemoryPool instances. This could potentially +    // lead to a contention over IMemoryChunkProvider's ref-counter. 
+    // To circumvent this, we keep both an owning (#ChunkProviderHolder_) and +    // a non-owning (#ChunkProvider_) reference to the underlying provider. +    // In case of the default chunk provider, the owning reference is not used. +    const IMemoryChunkProviderPtr ChunkProviderHolder_; +    IMemoryChunkProvider* const ChunkProvider_; + +    int NextChunkIndex_ = 0; +    size_t NextSmallSize_; + +    size_t Size_ = 0; +    size_t Capacity_ = 0; + +    // Chunk memory layout: +    //   |AAAA|....|UUUU| +    // Legend: +    //   A aligned allocations +    //   U unaligned allocations +    //   . free zone +    char* FreeZoneBegin_; +    char* FreeZoneEnd_; + +    std::vector<std::unique_ptr<TAllocationHolder>> Chunks_; +    std::vector<std::unique_ptr<TAllocationHolder>> OtherBlocks_; + +    void Initialize(size_t startChunkSize); + +    char* AllocateUnalignedSlow(size_t size); +    char* AllocateAlignedSlow(size_t size, int align); +    char* AllocateSlowCore(size_t size); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + +#define CHUNKED_MEMORY_POOL_INL_H_ +#include "chunked_memory_pool-inl.h" +#undef CHUNKED_MEMORY_POOL_INL_H_ diff --git a/library/cpp/yt/memory/chunked_memory_pool_output.cpp b/library/cpp/yt/memory/chunked_memory_pool_output.cpp new file mode 100644 index 00000000000..332096445ab --- /dev/null +++ b/library/cpp/yt/memory/chunked_memory_pool_output.cpp @@ -0,0 +1,65 @@ +#include "chunked_memory_pool_output.h" + +#include "chunked_memory_pool.h" + +#include <library/cpp/yt/memory/ref.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TChunkedMemoryPoolOutput::TChunkedMemoryPoolOutput(TChunkedMemoryPool* pool, size_t chunkSize) +    : Pool_(pool) +    , ChunkSize_(chunkSize) +{ } + +size_t TChunkedMemoryPoolOutput::DoNext(void** ptr) +{ +    // Check if the current chunk is exhausted. +    if (Current_ == End_) { +        // Emplace the (whole) last chunk, if any. +        if (Begin_) { +            Refs_.emplace_back(Begin_, Current_); +        } +        // Allocate a new chunk. +        // Use |AllocateAligned| to get a chance to free some memory afterwards. +        // Tune the number of bytes requested from the pool to try avoid allocations. +        auto spareSize = Pool_->GetCurrentChunkSpareSize(); +        auto allocationSize = (spareSize == 0 ? ChunkSize_ : std::min(ChunkSize_, spareSize)); +        Begin_ = Pool_->AllocateAligned(allocationSize, /* align */ 1); +        Current_ = Begin_; +        End_ = Begin_ + allocationSize; +    } + +    // Return the unused part of the current chunk. +    // This could be the whole chunk allocated above. +    *ptr = Current_; +    auto size = End_ - Current_; +    Current_ = End_; +    return size; +} + +void TChunkedMemoryPoolOutput::DoUndo(size_t size) +{ +    // Just rewind the current pointer. +    Current_ -= size; +    YT_VERIFY(Current_ >= Begin_); +} + +std::vector<TMutableRef> TChunkedMemoryPoolOutput::Finish() +{ +    // Emplace the used part of the last chunk, if any. +    if (Begin_) { +        Refs_.emplace_back(Begin_, Current_); +    } +    // Try to free the unused part of the last chunk, if possible. 
+    if (Current_ < End_) { +        Pool_->Free(Current_, End_); +    } +    return std::move(Refs_); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + diff --git a/library/cpp/yt/memory/chunked_memory_pool_output.h b/library/cpp/yt/memory/chunked_memory_pool_output.h new file mode 100644 index 00000000000..774b21788e7 --- /dev/null +++ b/library/cpp/yt/memory/chunked_memory_pool_output.h @@ -0,0 +1,43 @@ +#pragma once + +#include "public.h" +#include "ref.h" + +#include <util/stream/zerocopy_output.h> + +#include <util/generic/size_literals.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class TChunkedMemoryPoolOutput +    : public IZeroCopyOutput +{ +public: +    static constexpr size_t DefaultChunkSize = 4_KB; + +    explicit TChunkedMemoryPoolOutput( +        TChunkedMemoryPool* pool, +        size_t chunkSize = DefaultChunkSize); + +    std::vector<TMutableRef> Finish(); + +private: +    size_t DoNext(void** ptr) override; +    void DoUndo(size_t size) override; + +private: +    TChunkedMemoryPool* const Pool_; +    const size_t ChunkSize_; + +    char* Begin_ = nullptr; +    char* Current_ = nullptr; +    char* End_ = nullptr; +    std::vector<TMutableRef> Refs_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + diff --git a/library/cpp/yt/memory/public.h b/library/cpp/yt/memory/public.h new file mode 100644 index 00000000000..86965457f87 --- /dev/null +++ b/library/cpp/yt/memory/public.h @@ -0,0 +1,15 @@ +#pragma once + +#include "ref_counted.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class TChunkedMemoryPool; + +DECLARE_REFCOUNTED_STRUCT(IMemoryChunkProvider) + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/unittests/chunked_memory_pool_output_ut.cpp b/library/cpp/yt/memory/unittests/chunked_memory_pool_output_ut.cpp new file mode 100644 index 00000000000..839fe06ce6d --- /dev/null +++ b/library/cpp/yt/memory/unittests/chunked_memory_pool_output_ut.cpp @@ -0,0 +1,39 @@ +#include <yt/yt/core/test_framework/framework.h> + +#include <library/cpp/yt/memory/chunked_memory_pool.h> +#include <library/cpp/yt/memory/chunked_memory_pool_output.h> + +namespace NYT { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TChunkedMemoryPoolOutputTest, Basic) +{ +    constexpr size_t PoolChunkSize = 10; +    constexpr size_t PoolOutputChunkSize = 7; +    TChunkedMemoryPool pool(NullRefCountedTypeCookie, PoolChunkSize); +    TChunkedMemoryPoolOutput output(&pool, PoolOutputChunkSize); + +    TString s1("Short."); +    output.Write(s1); + +    TString s2("Quite a long string."); +    output.Write(s2); + +    char* buf; +    auto len = output.Next(&buf); +    output.Undo(len); + +    auto chunks = output.Finish(); +    TString s; +    for (auto chunk : chunks) { +        s += TString(chunk.Begin(), chunk.End()); +    } +    ASSERT_EQ(s1 + s2, s); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT diff --git a/library/cpp/yt/memory/unittests/chunked_memory_pool_ut.cpp b/library/cpp/yt/memory/unittests/chunked_memory_pool_ut.cpp new file mode 100644 index 00000000000..63562cfb967 --- /dev/null +++ 
b/library/cpp/yt/memory/unittests/chunked_memory_pool_ut.cpp @@ -0,0 +1,74 @@ +#include <yt/yt/core/test_framework/framework.h> + +#include <library/cpp/yt/memory/chunked_memory_pool.h> + +namespace NYT { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TChunkedMemoryPoolTest, Absorb) +{ +    TChunkedMemoryPool first; +    TChunkedMemoryPool second; +    TChunkedMemoryPool third; + +    std::vector<std::pair<TStringBuf, TString>> tests; +    size_t totalSize = 0; + +    auto fillPool = [&] (TChunkedMemoryPool& pool, TString prefix, int count) { +        for (int i = 0; i < count; i++) { +            TString expected = prefix + ToString(count); +            char* buf = pool.AllocateUnaligned(expected.Size()); +            ::memcpy(buf, expected.c_str(), expected.size()); +            TStringBuf ref(buf, buf + expected.size()); +            totalSize += expected.size(); +            tests.emplace_back(std::move(ref), std::move(expected)); +        } +    }; + +    auto checkAll = [&] { +        ASSERT_GE(first.GetCapacity(), first.GetSize()); +        ASSERT_GE(second.GetCapacity(), second.GetSize()); +        ASSERT_GE(third.GetCapacity(), third.GetSize()); +        ASSERT_EQ(totalSize, first.GetSize() + second.GetSize() + third.GetSize()); + +        for (const auto& [ref, expected] : tests) { +            ASSERT_EQ(ref, expected); +        } +    }; + +    auto warmup = [&] (TChunkedMemoryPool& pool, int blockSize, int count) { +        ASSERT_EQ(pool.GetSize(), 0U); +        for (int i = 0; i < count; i++) { +            pool.AllocateUnaligned(blockSize); +        } +        pool.Clear(); +        ASSERT_EQ(pool.GetSize(), 0U); +    }; + +    warmup(first, 20, 20'000); +    warmup(second, 20, 20'000); +    fillPool(first, "firstPool", 10'000); +    fillPool(second, "secondPool", 10'000); +    fillPool(third, "thirdPool", 10'000); +    checkAll(); +    second.Absorb(std::move(third)); +    checkAll(); +    first.Absorb(std::move(second)); +    checkAll(); +    fillPool(first, "poolFirst_2", 10'000); +    fillPool(second, "poolSecond_2", 10'000); +    fillPool(third, "poolThird_2", 10'000); +    checkAll(); +    third.Absorb(std::move(second)); +    checkAll(); +    fillPool(second, "someStr2", 10'000); +    fillPool(third, "someStr3", 10'000); +    checkAll(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT | 
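For reference, a sketch of TChunkedMemoryPoolOutput modeled on the Basic unit test above; the strings and chunk size are arbitrary.

```cpp
#include <library/cpp/yt/memory/chunked_memory_pool.h>
#include <library/cpp/yt/memory/chunked_memory_pool_output.h>

#include <util/generic/string.h>

using namespace NYT;

TString ChunkedMemoryPoolOutputSketch()
{
    TChunkedMemoryPool pool;
    TChunkedMemoryPoolOutput output(&pool, /*chunkSize*/ 1024);

    TString first("Hello, ");
    TString second("chunked world!");

    // IZeroCopyOutput interface: bytes are written directly into pool chunks.
    output.Write(first);
    output.Write(second);

    // Finish() returns the written ranges; the bytes remain owned by the pool,
    // so the pool must outlive any use of these refs.
    TString result;
    for (const auto& ref : output.Finish()) {
        result += TString(ref.Begin(), ref.End());
    }
    return result; // "Hello, chunked world!"
}
```

As in the test, the zero-copy Next()/Undo() pair also works here: Next() hands out the unused tail of the current chunk, Undo() rewinds it, and Finish() returns the trimmed tail to the pool via Free().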

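Finally, a sketch combining TChunkedMemoryAllocator with TChunkedInputStream. Unlike the pool, the allocator returns TSharedMutableRefs that keep their backing chunk alive on their own. The tag struct, the sizes, and the assumed TSharedMutableRef-to-TSharedRef conversion (provided elsewhere in ref.h) are mine, not guaranteed by this commit.

```cpp
#include <library/cpp/yt/memory/chunked_input_stream.h>
#include <library/cpp/yt/memory/chunked_memory_allocator.h>

#include <util/generic/string.h>

#include <cstring>
#include <vector>

using namespace NYT;

// Hypothetical tag; only used to label allocations for tracking.
struct TExampleAllocatorTag
{ };

TString ChunkedAllocatorSketch()
{
    TChunkedMemoryAllocator allocator(
        /*chunkSize*/ 64 * 1024,
        /*maxSmallBlockSizeRatio*/ 0.25,
        GetRefCountedTypeCookie<TExampleAllocatorTag>());

    // Each call returns a TSharedMutableRef; small blocks share a chunk, while
    // blocks above chunkSize * maxSmallBlockSizeRatio get a dedicated allocation.
    auto block1 = allocator.AllocateUnaligned(7);
    ::memcpy(block1.Begin(), "chunked", 7);

    auto block2 = allocator.AllocateAligned(7, /*align*/ 8);
    ::memcpy(block2.Begin(), " memory", 7);

    // Assumption: TSharedMutableRef converts to TSharedRef (see ref.h).
    std::vector<TSharedRef> blocks{block1, block2};

    // TChunkedInputStream presents the blocks as one zero-copy input stream.
    TChunkedInputStream input(std::move(blocks));
    return input.ReadAll(); // "chunked memory"
}
```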