aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/yt/memory/chunked_output_stream.cpp
blob: 90ec36fe23f3919e08168bbe523e7df33bbf7849 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#include "chunked_output_stream.h"

#include <util/system/sanitizers.h>

namespace NYT {

////////////////////////////////////////////////////////////////////////////////

TChunkedOutputStream::TChunkedOutputStream(
    TRefCountedTypeCookie tagCookie,
    size_t initialReserveSize,
    size_t maxReserveSize)
    : MaxReserveSize_(RoundUpToPage(maxReserveSize))
    , CurrentReserveSize_(RoundUpToPage(initialReserveSize))
    , CurrentChunk_(tagCookie, /*size*/ 0)
{
    YT_VERIFY(MaxReserveSize_ > 0);

    if (CurrentReserveSize_ > MaxReserveSize_) {
        CurrentReserveSize_ = MaxReserveSize_;
    }
}

std::vector<TSharedRef> TChunkedOutputStream::Finish()
{
    FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));

    YT_ASSERT(CurrentChunk_.IsEmpty());
    FinishedSize_ = 0;

    for (const auto& chunk : FinishedChunks_) {
        NSan::CheckMemIsInitialized(chunk.Begin(), chunk.Size());
    }

    return std::move(FinishedChunks_);
}

size_t TChunkedOutputStream::GetSize() const
{
    return FinishedSize_ + CurrentChunk_.Size();
}

size_t TChunkedOutputStream::GetCapacity() const
{
    return FinishedSize_ + CurrentChunk_.Capacity();
}

void TChunkedOutputStream::ReserveNewChunk(size_t spaceRequired)
{
    YT_ASSERT(CurrentChunk_.Size() == CurrentChunk_.Capacity());
    FinishedSize_ += CurrentChunk_.Size();
    FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));
    CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_);
    CurrentChunk_.Reserve(std::max(RoundUpToPage(spaceRequired), CurrentReserveSize_));
}

void TChunkedOutputStream::DoWrite(const void* buffer, size_t length)
{
    if (CurrentChunk_.Capacity() == 0) {
        CurrentChunk_.Reserve(CurrentReserveSize_);
    }

    auto spaceAvailable = std::min(length, CurrentChunk_.Capacity() - CurrentChunk_.Size());
    CurrentChunk_.Append(buffer, spaceAvailable);

    auto spaceRequired = length - spaceAvailable;
    if (spaceRequired > 0) {
        ReserveNewChunk(spaceRequired);
        CurrentChunk_.Append(static_cast<const char*>(buffer) + spaceAvailable, spaceRequired);
    }
}

size_t TChunkedOutputStream::DoNext(void** ptr)
{
    if (CurrentChunk_.Size() == CurrentChunk_.Capacity()) {
        if (CurrentChunk_.Capacity() == 0) {
            CurrentChunk_.Reserve(CurrentReserveSize_);
        } else {
            ReserveNewChunk(0);
        }
    }

    auto spaceAvailable = CurrentChunk_.Capacity() - CurrentChunk_.Size();
    YT_ASSERT(spaceAvailable > 0);
    *ptr = CurrentChunk_.End();
    CurrentChunk_.Resize(CurrentChunk_.Capacity(), /*initializeStorage*/ false);
    return spaceAvailable;
}

void TChunkedOutputStream::DoUndo(size_t len)
{
    YT_VERIFY(CurrentChunk_.Size() >= len);
    CurrentChunk_.Resize(CurrentChunk_.Size() - len);
}

char* TChunkedOutputStream::Preallocate(size_t size)
{
    size_t available = CurrentChunk_.Capacity() - CurrentChunk_.Size();
    if (available < size) {
        FinishedSize_ += CurrentChunk_.Size();
        FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));

        CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_);

        CurrentChunk_.Reserve(std::max(RoundUpToPage(size), CurrentReserveSize_));
    }
    return CurrentChunk_.End();
}

void TChunkedOutputStream::Advance(size_t size)
{
    YT_ASSERT(CurrentChunk_.Size() + size <= CurrentChunk_.Capacity());
    CurrentChunk_.Resize(CurrentChunk_.Size() + size, false);
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT