1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
#include "chunked_output_stream.h"
#include <util/system/sanitizers.h>
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
TChunkedOutputStream::TChunkedOutputStream(
TRefCountedTypeCookie tagCookie,
size_t initialReserveSize,
size_t maxReserveSize)
: MaxReserveSize_(RoundUpToPage(maxReserveSize))
, CurrentReserveSize_(RoundUpToPage(initialReserveSize))
, CurrentChunk_(tagCookie, /*size*/ 0)
{
YT_VERIFY(MaxReserveSize_ > 0);
if (CurrentReserveSize_ > MaxReserveSize_) {
CurrentReserveSize_ = MaxReserveSize_;
}
}
std::vector<TSharedRef> TChunkedOutputStream::Finish()
{
FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));
YT_ASSERT(CurrentChunk_.IsEmpty());
FinishedSize_ = 0;
for (const auto& chunk : FinishedChunks_) {
NSan::CheckMemIsInitialized(chunk.Begin(), chunk.Size());
}
return std::move(FinishedChunks_);
}
size_t TChunkedOutputStream::GetSize() const
{
return FinishedSize_ + CurrentChunk_.Size();
}
size_t TChunkedOutputStream::GetCapacity() const
{
return FinishedSize_ + CurrentChunk_.Capacity();
}
void TChunkedOutputStream::ReserveNewChunk(size_t spaceRequired)
{
YT_ASSERT(CurrentChunk_.Size() == CurrentChunk_.Capacity());
FinishedSize_ += CurrentChunk_.Size();
FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));
CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_);
CurrentChunk_.Reserve(std::max(RoundUpToPage(spaceRequired), CurrentReserveSize_));
}
void TChunkedOutputStream::DoWrite(const void* buffer, size_t length)
{
if (CurrentChunk_.Capacity() == 0) {
CurrentChunk_.Reserve(CurrentReserveSize_);
}
auto spaceAvailable = std::min(length, CurrentChunk_.Capacity() - CurrentChunk_.Size());
CurrentChunk_.Append(buffer, spaceAvailable);
auto spaceRequired = length - spaceAvailable;
if (spaceRequired > 0) {
ReserveNewChunk(spaceRequired);
CurrentChunk_.Append(static_cast<const char*>(buffer) + spaceAvailable, spaceRequired);
}
}
size_t TChunkedOutputStream::DoNext(void** ptr)
{
if (CurrentChunk_.Size() == CurrentChunk_.Capacity()) {
if (CurrentChunk_.Capacity() == 0) {
CurrentChunk_.Reserve(CurrentReserveSize_);
} else {
ReserveNewChunk(0);
}
}
auto spaceAvailable = CurrentChunk_.Capacity() - CurrentChunk_.Size();
YT_ASSERT(spaceAvailable > 0);
*ptr = CurrentChunk_.End();
CurrentChunk_.Resize(CurrentChunk_.Capacity(), /*initializeStorage*/ false);
return spaceAvailable;
}
void TChunkedOutputStream::DoUndo(size_t len)
{
YT_VERIFY(CurrentChunk_.Size() >= len);
CurrentChunk_.Resize(CurrentChunk_.Size() - len);
}
char* TChunkedOutputStream::Preallocate(size_t size)
{
size_t available = CurrentChunk_.Capacity() - CurrentChunk_.Size();
if (available < size) {
FinishedSize_ += CurrentChunk_.Size();
FinishedChunks_.push_back(TSharedRef::FromBlob(std::move(CurrentChunk_)));
CurrentReserveSize_ = std::min(2 * CurrentReserveSize_, MaxReserveSize_);
CurrentChunk_.Reserve(std::max(RoundUpToPage(size), CurrentReserveSize_));
}
return CurrentChunk_.End();
}
void TChunkedOutputStream::Advance(size_t size)
{
YT_ASSERT(CurrentChunk_.Size() + size <= CurrentChunk_.Capacity());
CurrentChunk_.Resize(CurrentChunk_.Size() + size, false);
}
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
|