aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/LimitSeekableReadBuffer.cpp
blob: 587138cb2cfbbaacd0a87cf2d2c6675cc39e9fac (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#include <IO/LimitSeekableReadBuffer.h>


namespace DB
{

/// Error codes used by this translation unit.
/// They are only declared here (`extern`); the definitions live elsewhere in the project.
namespace ErrorCodes
{
    /// Thrown by LimitSeekableReadBuffer::seek() for an unsupported `whence` or an out-of-range target.
    extern const int ARGUMENT_OUT_OF_BOUND;
}

/// Convenience constructor for a nested buffer passed by reference.
/// Delegates to the constructor taking `std::unique_ptr<SeekableReadBuffer>` after passing `in_`
/// through wrapSeekableReadBufferReference(); `start_offset_` and `limit_size_` are forwarded unchanged.
/// NOTE(review): presumably the wrapper does not take ownership of `in_`, so `in_` has to outlive
/// this object - confirm against wrapSeekableReadBufferReference().
LimitSeekableReadBuffer::LimitSeekableReadBuffer(SeekableReadBuffer & in_, UInt64 start_offset_, UInt64 limit_size_)
    : LimitSeekableReadBuffer(wrapSeekableReadBufferReference(in_), start_offset_, limit_size_)
{
}

/// Creates a read buffer that exposes only the bytes [start_offset_, start_offset_ + limit_size_)
/// of the nested buffer `in_` and takes ownership of `in_`.
///
/// The object starts with an empty working buffer located at the nested buffer's current position,
/// and no I/O is done here: `need_seek` is armed with `min_offset`, so the first nextImpl()
/// seeks the nested buffer to the start of the window.
///
/// The base class is initialized before the members, so `in_->position()` is evaluated
/// before `in_` is moved into `in`.
/// NOTE(review): `in_` is dereferenced unconditionally, so it must not be null, and
/// `start_offset_ + limit_size_` is not checked for overflow - callers are assumed to pass sane values.
LimitSeekableReadBuffer::LimitSeekableReadBuffer(std::unique_ptr<SeekableReadBuffer> in_, UInt64 start_offset_, UInt64 limit_size_)
    : SeekableReadBuffer(in_->position(), 0)
    , in(std::move(in_))
    , min_offset(start_offset_)
    , max_offset(start_offset_ + limit_size_)
    , need_seek(min_offset) /// We always start reading from `min_offset`.
{
}

bool LimitSeekableReadBuffer::nextImpl()
{
    /// First let the nested buffer know the current position in the buffer (otherwise `in->eof()` or `in->seek()` below can work incorrectly).
    in->position() = position();

    if (need_seek)
    {
        /// Do actual seek.
        if (in->seek(*need_seek, SEEK_SET) != static_cast<off_t>(*need_seek))
        {
            /// Failed to seek, maybe because the new seek position is located after EOF.
            set(in->position(), 0);
            return false;
        }
        need_seek.reset();
    }

    off_t seek_pos = in->getPosition();
    off_t offset_after_min = seek_pos - min_offset;
    off_t available_before_max = max_offset - seek_pos;

    if (offset_after_min < 0 || available_before_max <= 0)
    {
        /// Limit reached.
        set(in->position(), 0);
        return false;
    }

    if (in->eof()) /// `in->eof()` can call `in->next()`
    {
        /// EOF reached.
        set(in->position(), 0);
        return false;
    }

    /// in->eof() shouldn't change the seek position.
    chassert(seek_pos == in->getPosition());

    /// Adjust the beginning and the end of the working buffer.
    /// Because we don't want to read before `min_offset` or after `max_offset`.
    auto * ptr = in->position();
    auto * begin = in->buffer().begin();
    auto * end = in->buffer().end();

    if (ptr - begin > offset_after_min)
        begin = ptr - offset_after_min;
    if (end - ptr > available_before_max)
        end = ptr + available_before_max;

    BufferBase::set(begin, end - begin, ptr - begin);
    chassert(position() == ptr && available());

    return true;
}

/// Moves the read position inside the limited window and returns the new position.
///
/// Positions are relative to the start of the window: `0` corresponds to `min_offset` in the nested buffer.
/// `whence` must be SEEK_SET (`off` is the new position) or SEEK_CUR (`off` is relative to the current position).
///
/// Throws ARGUMENT_OUT_OF_BOUND if `whence` is anything else, or if the target is negative
/// or lies beyond the end of the window (seeking exactly to the end is allowed).
///
/// If the target is inside the current working buffer only `pos` is moved. Otherwise the nested buffer
/// is not touched here: the target is stored in `need_seek` and the actual seek is done by the next nextImpl().
off_t LimitSeekableReadBuffer::seek(off_t off, int whence)
{
    off_t new_position;
    const off_t current_position = getPosition();
    if (whence == SEEK_SET)
        new_position = off;
    else if (whence == SEEK_CUR)
        new_position = current_position + off;
    else
        throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Seek expects SEEK_SET or SEEK_CUR as whence");

    /// Compare with the window length instead of computing `new_position + min_offset`,
    /// because that sum can overflow when `off` is huge.
    const off_t window_size = static_cast<off_t>(max_offset - min_offset);
    if (new_position < 0 || new_position > window_size)
        throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Seek shift out of bounds");

    const off_t position_change = new_position - current_position;

    /// Compare distances, not pointers: `pos + position_change` can point arbitrarily far outside
    /// of the working buffer, and even forming such a pointer is undefined behaviour.
    const off_t distance_to_begin = buffer().begin() - pos; /// <= 0
    const off_t distance_to_end = buffer().end() - pos;     /// >= 0
    if (distance_to_begin <= position_change && position_change <= distance_to_end)
    {
        /// Position is still inside the buffer.
        pos += position_change;
        chassert(pos >= working_buffer.begin());
        chassert(pos <= working_buffer.end());
        return new_position;
    }

    /// Actual seek in the nested buffer will be performed in nextImpl().
    need_seek = new_position + min_offset;

    /// Set the size of the working buffer to zero so next call next() would call nextImpl().
    set(in->position(), 0);

    return new_position;
}

/// Returns the current position relative to the start of the window,
/// i.e. the position in the nested buffer minus `min_offset`.
off_t LimitSeekableReadBuffer::getPosition()
{
    if (!need_seek)
    {
        /// No seek is pending, so ask the nested buffer. It most likely needs to know
        /// the current position in the buffer to answer, so hand it over first.
        in->position() = position();
        return in->getPosition() - min_offset;
    }

    /// A seek is pending: the position is where that seek is going to land.
    return *need_seek - min_offset;
}

}