aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/LimitReadBuffer.cpp
blob: e14112f8d19529de946baf245458083f340d58d5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#include <IO/LimitReadBuffer.h>

#include <Common/Exception.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int LIMIT_EXCEEDED;
    extern const int CANNOT_READ_ALL_DATA;
}


bool LimitReadBuffer::nextImpl()
{
    assert(position() >= in->position());

    /// Let underlying buffer calculate read bytes in `next()` call.
    in->position() = position();

    if (bytes >= limit)
    {
        if (exact_limit && bytes == *exact_limit)
            return false;

        if (exact_limit && bytes != *exact_limit)
            throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected data, got {} bytes, expected {}", bytes, *exact_limit);

        if (throw_exception)
            throw Exception(ErrorCodes::LIMIT_EXCEEDED, "Limit for LimitReadBuffer exceeded: {}", exception_message);

        return false;
    }

    if (!in->next())
    {
        if (exact_limit && bytes != *exact_limit)
            throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Unexpected EOF, got {} of {} bytes", bytes, *exact_limit);
        /// Clearing the buffer with existing data.
        set(in->position(), 0);
        return false;
    }

    working_buffer = in->buffer();

    if (limit - bytes < working_buffer.size())
        working_buffer.resize(limit - bytes);

    return true;
}


LimitReadBuffer::LimitReadBuffer(ReadBuffer * in_, bool owns, UInt64 limit_, bool throw_exception_,
                                 std::optional<size_t> exact_limit_, std::string exception_message_)
    : ReadBuffer(in_ ? in_->position() : nullptr, 0)
    , in(in_)
    , owns_in(owns)
    , limit(limit_)
    , throw_exception(throw_exception_)
    , exact_limit(exact_limit_)
    , exception_message(std::move(exception_message_))
{
    assert(in);

    size_t remaining_bytes_in_buffer = in->buffer().end() - in->position();
    if (remaining_bytes_in_buffer > limit)
        remaining_bytes_in_buffer = limit;

    working_buffer = Buffer(in->position(), in->position() + remaining_bytes_in_buffer);
}


LimitReadBuffer::LimitReadBuffer(ReadBuffer & in_, UInt64 limit_, bool throw_exception_,
                                 std::optional<size_t> exact_limit_, std::string exception_message_)
    : LimitReadBuffer(&in_, false, limit_, throw_exception_, exact_limit_, exception_message_)
{
}


LimitReadBuffer::LimitReadBuffer(std::unique_ptr<ReadBuffer> in_, UInt64 limit_, bool throw_exception_,
                                 std::optional<size_t> exact_limit_, std::string exception_message_)
    : LimitReadBuffer(in_.release(), true, limit_, throw_exception_, exact_limit_, exception_message_)
{
}


LimitReadBuffer::~LimitReadBuffer()
{
    /// Update underlying buffer's position in case when limit wasn't reached.
    if (!working_buffer.empty())
        in->position() = position();

    if (owns_in)
        delete in;
}

}