aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/IO/MySQLPacketPayloadReadBuffer.cpp
blob: 2c5167ed038f712ee04c5f247fe9c45450aa2adb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include <IO/MySQLPacketPayloadReadBuffer.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int UNKNOWN_PACKET_FROM_CLIENT;
}

const size_t MAX_PACKET_LENGTH = (1 << 24) - 1; // 16 mb

MySQLPacketPayloadReadBuffer::MySQLPacketPayloadReadBuffer(ReadBuffer & in_, uint8_t & sequence_id_)
    : ReadBuffer(in_.position(), 0), in(in_), sequence_id(sequence_id_) // not in.buffer().begin(), because working buffer may include previous packet
{
}

bool MySQLPacketPayloadReadBuffer::nextImpl()
{
    if (!has_read_header || (payload_length == MAX_PACKET_LENGTH && offset == payload_length))
    {
        has_read_header = true;
        working_buffer.resize(0);
        offset = 0;
        payload_length = 0;
        in.readStrict(reinterpret_cast<char *>(&payload_length), 3);

        if (payload_length > MAX_PACKET_LENGTH)
            throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT,
                "Received packet with payload larger than max_packet_size: {}", payload_length);

        size_t packet_sequence_id = 0;
        in.readStrict(reinterpret_cast<char &>(packet_sequence_id));
        if (packet_sequence_id != sequence_id)
            throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT,
                "Received packet with wrong sequence-id: {}. Expected: {}.", packet_sequence_id, static_cast<unsigned int>(sequence_id));
        sequence_id++;

        if (payload_length == 0)
            return false;
    }
    else if (offset == payload_length)
    {
        return false;
    }

    in.nextIfAtEnd();
    /// Don't return a buffer when no bytes available
    if (!in.hasPendingData())
        return false;
    working_buffer = ReadBuffer::Buffer(in.position(), in.buffer().end());
    size_t count = std::min(in.available(), payload_length - offset);
    working_buffer.resize(count);
    in.ignore(count);

    offset += count;

    return true;
}

}