aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/PeekableReadBuffer.h
blob: 78cb319327df4fcaed85bf88c248290dbb708646 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#pragma once
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <stack>

namespace DB
{

/// Also allows to set checkpoint at some position in stream and come back to this position later.
/// When next() is called, saves data between checkpoint and current position to own memory and loads next data to sub-buffer
/// Sub-buffer should not be accessed directly during the lifetime of peekable buffer (unless
/// you reset() the state of peekable buffer after each change of underlying buffer)
/// If position() of peekable buffer is explicitly set to some position before checkpoint
/// (e.g. by istr.position() = prev_pos), behavior is undefined.
class PeekableReadBuffer : public BufferWithOwnMemory<ReadBuffer>
{
    friend class PeekableReadBufferCheckpoint;
public:
    explicit PeekableReadBuffer(ReadBuffer & sub_buf_, size_t start_size_ = 0);

    ~PeekableReadBuffer() override;

    void prefetch(Priority priority) override { sub_buf->prefetch(priority); }

    /// Sets checkpoint at current position
    ALWAYS_INLINE inline void setCheckpoint()
    {
        if (checkpoint)
        {
            /// Recursive checkpoints. We just remember offset from the
            /// first checkpoint to the current position.
            recursive_checkpoints_offsets.push(offsetFromCheckpoint());
            return;
        }

        checkpoint_in_own_memory = currentlyReadFromOwnMemory();
        if (!checkpoint_in_own_memory)
        {
            /// Don't need to store unread data anymore
            peeked_size = 0;
        }
        checkpoint.emplace(pos);
    }

    /// Forget checkpoint and all data between checkpoint and position
    ALWAYS_INLINE inline void dropCheckpoint()
    {
        assert(checkpoint);

        if (!recursive_checkpoints_offsets.empty())
        {
            recursive_checkpoints_offsets.pop();
            return;
        }

        if (!currentlyReadFromOwnMemory())
        {
            /// Don't need to store unread data anymore
            peeked_size = 0;
        }
        checkpoint = std::nullopt;
        checkpoint_in_own_memory = false;
    }

    /// Sets position at checkpoint.
    /// All pointers (such as this->buffer().end()) may be invalidated
    void rollbackToCheckpoint(bool drop = false);

    /// If checkpoint and current position are in different buffers, appends data from sub-buffer to own memory,
    /// so data between checkpoint and position will be in continuous memory.
    void makeContinuousMemoryFromCheckpointToPos();

    /// Returns true if there unread data extracted from sub-buffer in own memory.
    /// This data will be lost after destruction of peekable buffer.
    bool hasUnreadData() const;

    // for streaming reading (like in Kafka) we need to restore initial state of the buffer
    // without recreating the buffer.
    void reset();

    void setSubBuffer(ReadBuffer & sub_buf_);

    const ReadBuffer & getSubBuffer() const { return *sub_buf; }

private:
    bool nextImpl() override;

    void resetImpl();

    bool peekNext();

    inline bool useSubbufferOnly() const { return !peeked_size; }
    inline bool currentlyReadFromOwnMemory() const { return working_buffer.begin() != sub_buf->buffer().begin(); }
    inline bool checkpointInOwnMemory() const { return checkpoint_in_own_memory; }

    void checkStateCorrect() const;

    /// Makes possible to append `bytes_to_append` bytes to data in own memory.
    /// Updates all invalidated pointers and sizes.
    void resizeOwnMemoryIfNecessary(size_t bytes_to_append);

    char * getMemoryData() { return use_stack_memory ? stack_memory : memory.data(); }
    const char * getMemoryData() const { return use_stack_memory ? stack_memory : memory.data(); }

    size_t offsetFromCheckpointInOwnMemory() const;
    size_t offsetFromCheckpoint() const;


    ReadBuffer * sub_buf;
    size_t peeked_size = 0;
    std::optional<Position> checkpoint = std::nullopt;
    bool checkpoint_in_own_memory = false;

    /// To prevent expensive and in some cases unnecessary memory allocations on PeekableReadBuffer
    /// creation (for example if PeekableReadBuffer is often created or if we need to remember small amount of
    /// data after checkpoint), at the beginning we will use small amount of memory on stack and allocate
    /// larger buffer only if reserved memory is not enough.
    char stack_memory[PADDING_FOR_SIMD];
    bool use_stack_memory = true;

    std::stack<size_t> recursive_checkpoints_offsets;
};


class PeekableReadBufferCheckpoint : boost::noncopyable
{
    PeekableReadBuffer & buf;
    bool auto_rollback;
public:
    explicit PeekableReadBufferCheckpoint(PeekableReadBuffer & buf_, bool auto_rollback_ = false)
                : buf(buf_), auto_rollback(auto_rollback_) { buf.setCheckpoint(); }
    ~PeekableReadBufferCheckpoint()
    {
        if (!buf.checkpoint)
            return;
        if (auto_rollback)
            buf.rollbackToCheckpoint();
        buf.dropCheckpoint();
    }

};

}