path: root/contrib/clickhouse/src/IO/CascadeWriteBuffer.cpp
#include <IO/CascadeWriteBuffer.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <Common/Exception.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int CURRENT_WRITE_BUFFER_IS_EXHAUSTED;
    extern const int CANNOT_WRITE_AFTER_END_OF_BUFFER;
    extern const int CANNOT_CREATE_IO_BUFFER;
}

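/// Combine already created sources with placeholders for lazily created ones
/// and start writing into the first buffer of the cascade.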
CascadeWriteBuffer::CascadeWriteBuffer(WriteBufferPtrs && prepared_sources_, WriteBufferConstructors && lazy_sources_)
    : WriteBuffer(nullptr, 0), prepared_sources(std::move(prepared_sources_)), lazy_sources(std::move(lazy_sources_))
{
    first_lazy_source_num = prepared_sources.size();
    num_sources = first_lazy_source_num + lazy_sources.size();

    /// Reserve slots for lazily created sources; they remain nullptr until constructed on demand.
    prepared_sources.resize(num_sources);

    curr_buffer_num = 0;
    curr_buffer = setNextBuffer();
    set(curr_buffer->buffer().begin(), curr_buffer->buffer().size());
}


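/// Flush the current buffer. If it reports that its capacity is exhausted,
/// switch to the next source in the cascade and continue writing there.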
void CascadeWriteBuffer::nextImpl()
{
    if (!curr_buffer)
        return;
    try
    {
        curr_buffer->position() = position();
        curr_buffer->next();
    }
    catch (const MemoryWriteBuffer::CurrentBufferExhausted &)
    {
        if (curr_buffer_num < num_sources)
        {
            /// TODO: protocol should require set(position(), 0) before Exception

            /// Expected case: switch to the next WriteBuffer in the cascade.
            ++curr_buffer_num;
            curr_buffer = setNextBuffer();
        }
        else
            throw Exception(ErrorCodes::CURRENT_WRITE_BUFFER_IS_EXHAUSTED, "MemoryWriteBuffer limit is exhausted");
    }

    set(curr_buffer->position(), curr_buffer->buffer().end() - curr_buffer->position());
}


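/// Finalize all sources and hand ownership of them to the caller.
/// The CascadeWriteBuffer must not be written to afterwards.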
void CascadeWriteBuffer::getResultBuffers(WriteBufferPtrs & res)
{
    finalize();

    /// Sync position with underlying buffer before invalidating
    curr_buffer->position() = position();

    res = std::move(prepared_sources);

    curr_buffer = nullptr;
    curr_buffer_num = num_sources = 0;
    prepared_sources.clear();
    lazy_sources.clear();
}

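/// Propagate the current write position and finalize every source that was created.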
void CascadeWriteBuffer::finalizeImpl()
{
    if (curr_buffer)
        curr_buffer->position() = position();

    for (auto & buf : prepared_sources)
    {
        if (buf)
        {
            buf->finalize();
        }
    }
}

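/// Return the buffer for curr_buffer_num, constructing it on demand via the
/// corresponding lazy source constructor when necessary.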
WriteBuffer * CascadeWriteBuffer::setNextBuffer()
{
    if (first_lazy_source_num <= curr_buffer_num && curr_buffer_num < num_sources)
    {
        if (!prepared_sources[curr_buffer_num])
        {
            WriteBufferPtr prev_buf = (curr_buffer_num > 0) ? prepared_sources[curr_buffer_num - 1] : nullptr;
            prepared_sources[curr_buffer_num] = lazy_sources[curr_buffer_num - first_lazy_source_num](prev_buf);
        }
    }
    else if (curr_buffer_num >= num_sources)
        throw Exception(ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER, "There are no WriteBuffers to write result");

    WriteBuffer * res = prepared_sources[curr_buffer_num].get();
    if (!res)
        throw Exception(ErrorCodes::CANNOT_CREATE_IO_BUFFER, "Required WriteBuffer is not created");

    /// Make sure the returned buffer has a non-empty working buffer to write into.
    if (!res->hasPendingData())
        res->next();

    return res;
}


CascadeWriteBuffer::~CascadeWriteBuffer()
{
    /// Sync position with underlying buffer before exit
    if (curr_buffer)
        curr_buffer->position() = position();
}


}
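

/// Illustrative usage sketch, not part of the original file: cascade a
/// size-limited in-memory buffer with a lazily created fallback buffer.
/// It assumes that WriteBufferPtrs and WriteBufferConstructors are the
/// aliases declared in CascadeWriteBuffer.h, and that MemoryWriteBuffer's
/// first constructor argument is a total size limit (0 meaning unlimited);
/// adjust to the actual declarations if they differ.
namespace DB
{

void exampleCascadeWrite()
{
    /// First stage: an in-memory buffer capped at (assumed) 1 MiB.
    CascadeWriteBuffer::WriteBufferPtrs prepared;
    prepared.emplace_back(std::make_shared<MemoryWriteBuffer>(1 << 20));

    /// Second stage: constructed lazily only after the first stage throws
    /// MemoryWriteBuffer::CurrentBufferExhausted; `prev_buf` is the exhausted buffer.
    CascadeWriteBuffer::WriteBufferConstructors lazy;
    lazy.emplace_back([](const WriteBufferPtr & /* prev_buf */) -> WriteBufferPtr
    {
        return std::make_shared<MemoryWriteBuffer>(/* max_total_size_ = */ 0);  /// assumed: no size limit
    });

    CascadeWriteBuffer cascade(std::move(prepared), std::move(lazy));

    const char payload[] = "some payload";
    cascade.write(payload, sizeof(payload) - 1);

    /// Finalizes every created source and hands ownership back to the caller;
    /// only buffers that actually received data are of interest afterwards.
    CascadeWriteBuffer::WriteBufferPtrs result;
    cascade.getResultBuffers(result);
}

}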