aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Processors/Formats/Impl/MySQLOutputFormat.cpp
blob: f2157f63c2523d530c759d9cd8a8bf289b227815 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#include <Processors/Formats/Impl/MySQLOutputFormat.h>
#include <Core/MySQL/PacketsGeneric.h>
#include <Core/MySQL/PacketsProtocolText.h>
#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>


namespace DB
{

using namespace MySQLProtocol;
using namespace MySQLProtocol::Generic;
using namespace MySQLProtocol::ProtocolText;


/// Builds the output format on top of `out_` for the given header block.
/// Caches the header's data types and their default serializations, and creates
/// the packet endpoint through which all MySQL packets are written.
MySQLOutputFormat::MySQLOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & settings_)
    : IOutputFormat(header_, out_)
    , client_capabilities(settings_.mysql_wire.client_capabilities)
{
    /// MySQLWire is normally the output format of a MySQL protocol connection, and then
    /// `settings_.mysql_wire` carries the connection's real `sequence_id`.
    /// The format can also be requested from clickhouse-client or clickhouse-local, where
    /// no `sequence_id` is provided, so we fall back to a dummy counter owned by this object.
    if (settings_.mysql_wire.sequence_id)
        sequence_id = settings_.mysql_wire.sequence_id;
    else
        sequence_id = &dummy_sequence_id;

    data_types = getPort(PortKind::Main).getHeader().getDataTypes();

    /// One default serialization per column, in header order.
    serializations.reserve(data_types.size());
    for (const auto & data_type : data_types)
        serializations.emplace_back(data_type->getDefaultSerialization());

    packet_endpoint = std::make_shared<MySQLProtocol::PacketEndpoint>(out, *sequence_id);
}

/// Remembers the query context handed in by the caller.
/// NOTE(review): presumably this is what getContext() in finalizeImpl() later returns - confirm against the header.
void MySQLOutputFormat::setContext(ContextPtr context_)
{
    this->context = context_;
}

/// Writes the result set preamble: the column count, one column definition packet
/// per header column, and a trailing EOF packet unless the client announced
/// CLIENT_DEPRECATE_EOF. Nothing is written when the header has no columns.
void MySQLOutputFormat::writePrefix()
{
    const auto & header = getPort(PortKind::Main).getHeader();
    const size_t columns_count = header.columns();

    if (columns_count == 0)
        return;

    packet_endpoint->sendPacket(LengthEncodedNumber(columns_count));

    const auto & columns = header.getColumnsWithTypeAndName();
    for (size_t pos = 0; pos < columns_count; ++pos)
        packet_endpoint->sendPacket(getColumnDefinition(columns[pos].name, data_types[pos]->getTypeId()));

    /// The EOF marker after column definitions is only sent to clients that did not set CLIENT_DEPRECATE_EOF.
    const bool client_expects_eof = !(client_capabilities & Capability::CLIENT_DEPRECATE_EOF);
    if (client_expects_eof)
        packet_endpoint->sendPacket(EOFPacket(0, 0));
}

/// Sends every row of `chunk` as a separate text-protocol ResultSetRow packet,
/// in row order, using the serializations prepared in the constructor.
void MySQLOutputFormat::consume(Chunk chunk)
{
    const auto & columns = chunk.getColumns();
    const size_t rows_count = chunk.getNumRows();

    for (size_t row = 0; row != rows_count; ++row)
    {
        /// ResultSetRow is given the row index as int, hence the narrowing cast.
        ProtocolText::ResultSetRow row_packet(serializations, columns, static_cast<int>(row));
        packet_endpoint->sendPacket(row_packet);
    }
}

void MySQLOutputFormat::finalizeImpl()
{
    size_t affected_rows = 0;
    std::string human_readable_info;
    if (QueryStatusPtr process_list_elem = getContext()->getProcessListElement())
    {
        CurrentThread::finalizePerformanceCounters();
        QueryStatusInfo info = process_list_elem->getInfo();
        affected_rows = info.written_rows;
        double elapsed_seconds = static_cast<double>(info.elapsed_microseconds) / 1000000.0;
        human_readable_info = fmt::format(
            "Read {} rows, {} in {} sec., {} rows/sec., {}/sec.",
            info.read_rows,
            ReadableSize(info.read_bytes),
            elapsed_seconds,
            static_cast<size_t>(info.read_rows / elapsed_seconds),
            ReadableSize(info.read_bytes / elapsed_seconds));
    }

    const auto & header = getPort(PortKind::Main).getHeader();
    if (header.columns() == 0)
        packet_endpoint->sendPacket(OKPacket(0x0, client_capabilities, affected_rows, 0, 0, "", human_readable_info), true);
    else if (client_capabilities & CLIENT_DEPRECATE_EOF)
        packet_endpoint->sendPacket(OKPacket(0xfe, client_capabilities, affected_rows, 0, 0, "", human_readable_info), true);
    else
        packet_endpoint->sendPacket(EOFPacket(0, 0), true);
}

/// Calls next() on the packet endpoint's output buffer.
/// NOTE(review): presumably this pushes the buffered packets to the underlying sink - confirm in WriteBuffer.
void MySQLOutputFormat::flush()
{
    auto & endpoint = *packet_endpoint;
    endpoint.out->next();
}

/// Registers the "MySQLWire" output format in `factory`.
/// The registered creator builds a MySQLOutputFormat from the write buffer,
/// the sample block and the format settings it is given.
void registerOutputFormatMySQLWire(FormatFactory & factory)
{
    auto create_mysql_wire = [](WriteBuffer & buf, const Block & sample, const FormatSettings & settings)
    {
        return std::make_shared<MySQLOutputFormat>(buf, sample, settings);
    };

    factory.registerOutputFormat("MySQLWire", create_mysql_wire);
}

}