blob: 32a8091e65e02d6e27ca79137364003d40caf58a (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
  | 
#pragma once
#include "blob.h"
#include "memory_usage_tracker.h"
#include <library/cpp/yt/memory/ref.h>
#include <util/stream/zerocopy_output.h>
#include <util/generic/size_literals.h>
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
struct TDefaultChunkedOutputStreamTag
{ };
class TChunkedOutputStream
    : public IZeroCopyOutput
{
public:
    explicit TChunkedOutputStream(
        TRefCountedTypeCookie tagCookie = GetRefCountedTypeCookie<TDefaultChunkedOutputStreamTag>(),
        IMemoryUsageTrackerPtr memoryUsageTracker = GetNullMemoryUsageTracker(),
        size_t initialReserveSize = 4_KB,
        size_t maxReserveSize = 64_KB);
    TChunkedOutputStream(TChunkedOutputStream&&) = default;
    TChunkedOutputStream& operator=(TChunkedOutputStream&&) = default;
    //! Returns a sequence of written chunks.
    //! The stream is no longer usable after this call.
    std::vector<TSharedRef> Finish();
    //! Returns the number of bytes actually written.
    size_t GetSize() const;
    //! Returns the number of bytes actually written plus unused capacity in the
    //! last chunk.
    size_t GetCapacity() const;
    //! Returns a pointer to a contiguous memory block of a given #size.
    //! Do not forget to call #Advance after use.
    char* Preallocate(size_t size);
    //! Marks #size bytes (which were previously preallocated) as used.
    void Advance(size_t size);
private:
    const IMemoryUsageTrackerPtr MemoryUsageTracker_;
    TMemoryUsageTrackerGuard CurrentChunkMemoryUsageGuard_;
    size_t MaxReserveSize_;
    size_t CurrentReserveSize_;
    size_t FinishedSize_ = 0;
    TBlob CurrentChunk_;
    std::vector<TSharedRef> FinishedChunks_;
    void ReserveNewChunk(size_t spaceRequired);
    void UpdateCurrentChunkMemoryUsage();
    void DoWrite(const void* buf, size_t len) override;
    size_t DoNext(void** ptr) override;
    void DoUndo(size_t len) override;
};
////////////////////////////////////////////////////////////////////////////////
} // namespace NYT
  |