aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/MySQLBinlogEventReadBuffer.cpp
blob: 9f05c5b5e0904c8c1266b3ed581dc322f697e724 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#include <IO/MySQLBinlogEventReadBuffer.h>


namespace DB
{

/// Error codes referenced in this file. They are only declared here (extern); the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

/// Wraps the source buffer `in_` and remembers how many bytes are treated as the checksum signature.
///
/// The base ReadBuffer is created without any memory of its own (nullptr, 0, 0); the working buffer
/// is assigned later by nextImpl().
///
/// Throws an Exception with ErrorCodes::LOGICAL_ERROR when `checksum_signature_length_` is greater
/// than MAX_CHECKSUM_SIGNATURE_LENGTH (presumably the capacity of checksum_buf - confirm in the header).
MySQLBinlogEventReadBuffer::MySQLBinlogEventReadBuffer(ReadBuffer & in_, size_t checksum_signature_length_)
    : ReadBuffer(nullptr, 0, 0), in(in_), checksum_signature_length(checksum_signature_length_)
{
    /// Only values strictly above the maximum are rejected; a length equal to the maximum is valid,
    /// and the message states exactly that.
    if (checksum_signature_length > MAX_CHECKSUM_SIGNATURE_LENGTH)
        throw Exception(ErrorCodes::LOGICAL_ERROR,
                        "LOGICAL ERROR: checksum_signature_length must be less than or equal to MAX_CHECKSUM_SIGNATURE_LENGTH. "
                        "It is a bug.");

    /// Prepare the first portion of data right away (presumably dispatches to nextImpl() through
    /// the ReadBuffer base - confirm).
    nextIfAtEnd();
}

/// Provides the next portion of data through working_buffer while always holding back the
/// trailing checksum_signature_length bytes of `in`, so that they are never exposed to the reader.
///
/// Data is exposed either directly from the buffer of `in` (when it holds more than
/// checksum_signature_length bytes) or through the small staging array checksum_buf.
/// In checksum_buf, [0, checksum_buff_limit) is the part handed out to the reader and
/// [checksum_buff_limit, checksum_buff_size) is the part still held back.
///
/// Returns true when working_buffer has data to read, false when `in` reports eof.
bool MySQLBinlogEventReadBuffer::nextImpl()
{
    if (hasPendingData())
        return true;

    if (in.eof())
        return false;

    if (checksum_buff_size != checksum_buff_limit)
    {
        /// Some bytes of checksum_buf were held back on the previous call:
        /// shift them to the front and top the staging array up from `in`.
        const size_t carried = checksum_buff_size - checksum_buff_limit;
        for (size_t pos = 0; pos < carried; ++pos)
            checksum_buf[pos] = checksum_buf[checksum_buff_limit + pos];

        checksum_buff_size = carried;
        in.readStrict(checksum_buf + carried, checksum_signature_length - carried);
    }
    else
    {
        if (likely(in.available() > checksum_signature_length))
        {
            /// Fast path: expose the buffer of `in` directly, except for its last
            /// checksum_signature_length bytes, and mark the exposed part as consumed in `in`.
            auto * const exposed_begin = in.position();
            auto * const exposed_end = in.buffer().end() - checksum_signature_length;
            working_buffer = ReadBuffer::Buffer(exposed_begin, exposed_end);
            in.ignore(working_buffer.size());
            return true;
        }

        /// Too little is left in the current buffer of `in`: stage exactly
        /// checksum_signature_length bytes in checksum_buf.
        in.readStrict(checksum_buf, checksum_signature_length);
    }

    /// In both cases the staging array now holds checksum_signature_length bytes.
    checksum_buff_size = checksum_buff_limit = checksum_signature_length;

    /// Nothing follows the staged bytes, so they are the held-back tail and are not exposed.
    if (in.eof())
        return false;

    /// If fewer than checksum_signature_length bytes are currently available in `in`,
    /// the end of the staged bytes may still belong to the tail: keep that part back.
    const size_t upstream_available = in.available();
    if (upstream_available < checksum_signature_length)
        checksum_buff_limit = checksum_buff_size - (checksum_signature_length - upstream_available);

    working_buffer = ReadBuffer::Buffer(checksum_buf, checksum_buf + checksum_buff_limit);
    return true;
}

/// Makes one more attempt to advance the buffer on destruction, so that the trailing
/// checksum_signature_length bytes get consumed from `in` (presumably nextIfAtEnd() only
/// advances when the current data has been read to the end - confirm against ReadBuffer).
/// Never lets an exception escape: any failure is logged and swallowed.
MySQLBinlogEventReadBuffer::~MySQLBinlogEventReadBuffer()
{
    try
    {
        /// Skip the last checksum_signature_length bytes of `in`.
        nextIfAtEnd();
    }
    catch (...)
    {
        /// A destructor must not propagate exceptions; log the current one under this function's name.
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}

}