1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
#include <Processors/Formats/Impl/MySQLOutputFormat.h>
#include <Core/MySQL/PacketsGeneric.h>
#include <Core/MySQL/PacketsProtocolText.h>
#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
namespace DB
{
using namespace MySQLProtocol;
using namespace MySQLProtocol::Generic;
using namespace MySQLProtocol::ProtocolText;
/// Creates the format on top of `out_` for blocks shaped like `header_`.
/// The client capability flags are copied from `settings_.mysql_wire`.
MySQLOutputFormat::MySQLOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & settings_)
    : IOutputFormat(header_, out_)
    , client_capabilities(settings_.mysql_wire.client_capabilities)
{
    /// MySQLWire is normally the output format of a MySQL protocol connection; in that case
    /// `settings_.mysql_wire` carries the `sequence_id` that has to be used.
    /// The format may also be requested from clickhouse-client or clickhouse-local, where
    /// `settings_.mysql_wire` holds no `sequence_id`, so `dummy_sequence_id` is used instead.
    if (settings_.mysql_wire.sequence_id)
        sequence_id = settings_.mysql_wire.sequence_id;
    else
        sequence_id = &dummy_sequence_id;

    /// Prepare one default serialization per column of the main port header, in column order.
    data_types = getPort(PortKind::Main).getHeader().getDataTypes();
    serializations.reserve(data_types.size());
    for (const auto & data_type : data_types)
        serializations.emplace_back(data_type->getDefaultSerialization());

    /// All packets of this format are written through this endpoint.
    packet_endpoint = std::make_shared<MySQLProtocol::PacketEndpoint>(out, *sequence_id);
}
/// Stores `context_` in the `context` member.
/// NOTE(review): finalizeImpl() reads the context through getContext(); presumably that accessor
/// returns what is stored here — confirm against the class declaration.
void MySQLOutputFormat::setContext(ContextPtr context_)
{
    this->context = context_;
}
/// Sends the part of the response that precedes the rows: the column count, one column
/// definition packet per header column and, unless the client set CLIENT_DEPRECATE_EOF,
/// a terminating EOF packet. Nothing is sent when the header has no columns.
void MySQLOutputFormat::writePrefix()
{
    const auto & header = getPort(PortKind::Main).getHeader();
    const auto column_count = header.columns();

    /// A header without columns produces no prefix at all.
    if (column_count == 0)
        return;

    packet_endpoint->sendPacket(LengthEncodedNumber(column_count));

    /// Column names come from the header; the type ids come from `data_types`,
    /// which the constructor filled from the same header.
    const auto & columns = header.getColumnsWithTypeAndName();
    for (size_t pos = 0; pos < column_count; ++pos)
        packet_endpoint->sendPacket(getColumnDefinition(columns[pos].name, data_types[pos]->getTypeId()));

    /// The EOF marker after the definitions is only sent to clients without CLIENT_DEPRECATE_EOF.
    const bool client_expects_eof = !(client_capabilities & Capability::CLIENT_DEPRECATE_EOF);
    if (client_expects_eof)
        packet_endpoint->sendPacket(EOFPacket(0, 0));
}
/// Sends every row of `chunk` as a separate text-protocol result set row packet.
void MySQLOutputFormat::consume(Chunk chunk)
{
    const auto & columns = chunk.getColumns();
    const auto row_count = chunk.getNumRows();

    for (size_t row = 0; row < row_count; ++row)
    {
        /// The row packet constructor takes the row number as `int`, hence the cast.
        ProtocolText::ResultSetRow packet(serializations, columns, static_cast<int>(row));
        packet_endpoint->sendPacket(packet);
    }
}
void MySQLOutputFormat::finalizeImpl()
{
size_t affected_rows = 0;
std::string human_readable_info;
if (QueryStatusPtr process_list_elem = getContext()->getProcessListElement())
{
CurrentThread::finalizePerformanceCounters();
QueryStatusInfo info = process_list_elem->getInfo();
affected_rows = info.written_rows;
double elapsed_seconds = static_cast<double>(info.elapsed_microseconds) / 1000000.0;
human_readable_info = fmt::format(
"Read {} rows, {} in {} sec., {} rows/sec., {}/sec.",
info.read_rows,
ReadableSize(info.read_bytes),
elapsed_seconds,
static_cast<size_t>(info.read_rows / elapsed_seconds),
ReadableSize(info.read_bytes / elapsed_seconds));
}
const auto & header = getPort(PortKind::Main).getHeader();
if (header.columns() == 0)
packet_endpoint->sendPacket(OKPacket(0x0, client_capabilities, affected_rows, 0, 0, "", human_readable_info), true);
else if (client_capabilities & CLIENT_DEPRECATE_EOF)
packet_endpoint->sendPacket(OKPacket(0xfe, client_capabilities, affected_rows, 0, 0, "", human_readable_info), true);
else
packet_endpoint->sendPacket(EOFPacket(0, 0), true);
}
/// Calls next() on the write buffer held by the packet endpoint.
void MySQLOutputFormat::flush()
{
    auto & endpoint = *packet_endpoint;
    endpoint.out->next();
}
/// Registers the "MySQLWire" output format in `factory`.
void registerOutputFormatMySQLWire(FormatFactory & factory)
{
    /// Creator invoked by the factory: builds a MySQLOutputFormat over the given buffer,
    /// sample block and format settings.
    auto create_mysql_wire = [](WriteBuffer & buf, const Block & sample, const FormatSettings & settings)
    {
        return std::make_shared<MySQLOutputFormat>(buf, sample, settings);
    };

    factory.registerOutputFormat("MySQLWire", create_mysql_wire);
}
}
|