aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/ConcatSeekableReadBuffer.cpp
blob: ec2793898fe9ddf7fdfcbf75dea1f76d806e8dd5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#include <IO/ConcatSeekableReadBuffer.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int ARGUMENT_OUT_OF_BOUND;
}

ConcatSeekableReadBuffer::BufferInfo::BufferInfo(BufferInfo && src) noexcept
    : in(std::exchange(src.in, nullptr)), own_in(std::exchange(src.own_in, false)), size(std::exchange(src.size, 0))
{
}

ConcatSeekableReadBuffer::BufferInfo::~BufferInfo()
{
    if (own_in)
        delete in;
}

ConcatSeekableReadBuffer::ConcatSeekableReadBuffer(std::unique_ptr<SeekableReadBuffer> buf1, size_t size1, std::unique_ptr<SeekableReadBuffer> buf2, size_t size2) : ConcatSeekableReadBuffer()
{
    appendBuffer(std::move(buf1), size1);
    appendBuffer(std::move(buf2), size2);
}

ConcatSeekableReadBuffer::ConcatSeekableReadBuffer(SeekableReadBuffer & buf1, size_t size1, SeekableReadBuffer & buf2, size_t size2) : ConcatSeekableReadBuffer()
{
    appendBuffer(buf1, size1);
    appendBuffer(buf2, size2);
}

void ConcatSeekableReadBuffer::appendBuffer(std::unique_ptr<SeekableReadBuffer> buffer, size_t size)
{
    appendBuffer(buffer.release(), true, size);
}

void ConcatSeekableReadBuffer::appendBuffer(SeekableReadBuffer & buffer, size_t size)
{
    appendBuffer(&buffer, false, size);
}

void ConcatSeekableReadBuffer::appendBuffer(SeekableReadBuffer * buffer, bool own, size_t size)
{
    BufferInfo info;
    info.in = buffer;
    info.own_in = own;
    info.size = size;

    if (!size)
        return;

    buffers.emplace_back(std::move(info));
    total_size += size;

    if (current == buffers.size() - 1)
    {
        working_buffer = buffers[current].in->buffer();
        pos = buffers[current].in->position();
    }
}

bool ConcatSeekableReadBuffer::nextImpl()
{
    if (current < buffers.size())
    {
        buffers[current].in->position() = pos;
        while ((current < buffers.size()) && buffers[current].in->eof())
        {
            current_start_pos += buffers[current++].size;
            if (current < buffers.size())
                buffers[current].in->seek(0, SEEK_SET);
        }
    }

    if (current >= buffers.size())
    {
        current_start_pos = total_size;
        set(nullptr, 0);
        return false;
    }

    working_buffer = buffers[current].in->buffer();
    pos = buffers[current].in->position();
    return true;
}

off_t ConcatSeekableReadBuffer::getPosition()
{
    size_t current_pos = current_start_pos;
    if (current < buffers.size())
        current_pos += buffers[current].in->getPosition() + offset();
    return current_pos;
}

off_t ConcatSeekableReadBuffer::seek(off_t off, int whence)
{
    off_t new_position;
    off_t current_position = getPosition();
    if (whence == SEEK_SET)
        new_position = off;
    else if (whence == SEEK_CUR)
        new_position = current_position + off;
    else
        throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "ConcatSeekableReadBuffer::seek expects SEEK_SET or SEEK_CUR as whence");

    if (new_position < 0)
        throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "SEEK_SET underflow: off = {}", off);
    if (static_cast<UInt64>(new_position) > total_size)
        throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "SEEK_CUR shift out of bounds");

    if (static_cast<UInt64>(new_position) == total_size)
    {
        current = buffers.size();
        current_start_pos = total_size;
        set(nullptr, 0);
        return new_position;
    }

    off_t change_position = new_position - current_position;
    if ((working_buffer.begin() <= pos + change_position) && (pos + change_position <= working_buffer.end()))
    {
        /// Position is still inside the same working buffer.
        pos += change_position;
        assert(pos >= working_buffer.begin());
        assert(pos <= working_buffer.end());
        return new_position;
    }

    while (new_position < static_cast<off_t>(current_start_pos))
        current_start_pos -= buffers[--current].size;

    while (new_position >= static_cast<off_t>(current_start_pos + buffers[current].size))
        current_start_pos += buffers[current++].size;

    buffers[current].in->seek(new_position - current_start_pos, SEEK_SET);
    working_buffer = buffers[current].in->buffer();
    pos = buffers[current].in->position();
    return new_position;
}

}