1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
|
#pragma once
#include <Poco/Net/StreamSocket.h>
#include "clickhouse_config.h"
#include <Client/IServerConnection.h>
#include <Core/Defines.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <Interpreters/TablesStatus.h>
#include <Interpreters/Context_fwd.h>
#include <Compression/ICompressionCodec.h>
#include <Storages/MergeTree/RequestResponse.h>
#include <atomic>
#include <optional>
namespace DB
{
/// Forward declarations. In this header these types are only referred to through
/// pointers, references or smart pointers, so their full definitions are not needed here.
struct Settings;
/// Declared ahead of its definition so that the aliases below can name it.
class Connection;
struct ConnectionParameters;
/// Shared-ownership handle to a Connection, and a list of such handles.
using ConnectionPtr = std::shared_ptr<Connection>;
using Connections = std::vector<ConnectionPtr>;
/// Forward-declared; Connection holds them through std::unique_ptr members.
class NativeReader;
class NativeWriter;
/** Connection with a database server, to be used by a client.
* How to use - see Core/Protocol.h
* (Implementation of the server end - see Server/TCPHandler.h)
*
* An empty string can be passed as 'default_database'
* - in that case, the server will use its own default database.
*/
class Connection : public IServerConnection
{
/// MultiplexedConnections is granted access to the private members of this class.
friend class MultiplexedConnections;
public:
/// Creates a connection object for the given endpoint, credentials and protocol options.
/// NOTE(review): defined outside this header; presumably it does not connect yet
/// (the logger below is created lazily to avoid a DNS request in the constructor) - confirm.
Connection(const String & host_, UInt16 port_,
const String & default_database_,
const String & user_, const String & password_,
const String & quota_key_,
const String & cluster_,
const String & cluster_secret_,
const String & client_name_,
Protocol::Compression compression_,
Protocol::Secure secure_);
~Connection() override;
/// This implementation always reports the SERVER connection type.
IServerConnection::Type getConnectionType() const override { return IServerConnection::Type::SERVER; }
/// Factory that builds a connection from ConnectionParameters and a context (defined outside this header).
static ServerConnectionPtr createConnection(const ConnectionParameters & parameters, ContextPtr context);
/// Set throttler of network traffic. One throttler could be used for multiple connections to limit total traffic.
void setThrottler(const ThrottlerPtr & throttler_) override
{
throttler = throttler_;
}
/// Change default database. Changes will take effect on next reconnect.
void setDefaultDatabase(const String & database) override;
/// Server identification getters. The results are returned through the output reference
/// parameters or as the return value; all of them take connection timeouts.
void getServerVersion(const ConnectionTimeouts & timeouts,
String & name,
UInt64 & version_major,
UInt64 & version_minor,
UInt64 & version_patch,
UInt64 & revision) override;
UInt64 getServerRevision(const ConnectionTimeouts & timeouts) override;
const String & getServerTimezone(const ConnectionTimeouts & timeouts) override;
const String & getServerDisplayName(const ConnectionTimeouts & timeouts) override;
/// For log and exception messages.
const String & getDescription() const override;
const String & getHost() const;
UInt16 getPort() const;
const String & getDefaultDatabase() const;
/// Returns the compression mode this connection was configured with.
Protocol::Compression getCompression() const { return compression; }
/// Returns a copy of the stored password complexity rules (pairs of strings).
std::vector<std::pair<String, String>> getPasswordComplexityRules() const override { return password_complexity_rules; }
/// NOTE(review): the commented-out `= ...` values below look like the default arguments
/// declared on the overridden interface methods - confirm against IServerConnection.
void sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const NameToNameMap& query_parameters,
const String & query_id_/* = "" */,
UInt64 stage/* = QueryProcessingStage::Complete */,
const Settings * settings/* = nullptr */,
const ClientInfo * client_info/* = nullptr */,
bool with_pending_data/* = false */,
std::function<void(const Progress &)> process_progress_callback) override;
void sendCancel() override;
void sendData(const Block & block, const String & name/* = "" */, bool scalar/* = false */) override;
void sendMergeTreeReadTaskResponse(const ParallelReadResponse & response) override;
void sendExternalTablesData(ExternalTablesData & data) override;
bool poll(size_t timeout_microseconds/* = 0 */) override;
bool hasReadPendingData() const override;
std::optional<UInt64> checkPacket(size_t timeout_microseconds/* = 0*/) override;
Packet receivePacket() override;
void forceConnected(const ConnectionTimeouts & timeouts) override;
/// Returns the `connected` flag as is, without touching the network.
bool isConnected() const override { return connected; }
/// True only if the `connected` flag is set and ping() succeeds.
/// ping() is not attempted when the flag is unset (short-circuit evaluation).
bool checkConnected(const ConnectionTimeouts & timeouts) override { return connected && ping(timeouts); }
void disconnect() override;
/// Send prepared block of data (serialized and, if needed, compressed), that will be read from 'input'.
/// You could pass size of serialized/compressed block.
void sendPreparedData(ReadBuffer & input, size_t size, const String & name = "");
void sendReadTaskResponse(const String &);
/// Send all scalars.
void sendScalarsData(Scalars & data);
/// Send parts' uuids to exclude them from query processing
void sendIgnoredPartUUIDs(const std::vector<UUID> & uuids);
TablesStatusResponse getTablesStatus(const ConnectionTimeouts & timeouts,
const TablesStatusRequest & request);
/// Value of count() of the output / input socket buffer, or 0 while that buffer does not exist.
size_t outBytesCount() const { return out ? out->count() : 0; }
size_t inBytesCount() const { return in ? in->count() : 0; }
/// Non-owning pointer to the underlying socket; nullptr if no socket object is held.
Poco::Net::Socket * getSocket() { return socket.get(); }
/// Each time read from socket blocks and async_callback is set, it will be called. You can poll socket inside it.
/// The callback is stored in this object and also handed to the `in` / `out` buffers that already exist.
void setAsyncCallback(AsyncCallback async_callback_)
{
async_callback = std::move(async_callback_);
if (in)
in->setAsyncCallback(async_callback);
if (out)
out->setAsyncCallback(async_callback);
}
/// Returns the `have_more_addresses_to_connect` flag.
bool haveMoreAddressesToConnect() const { return have_more_addresses_to_connect; }
private:
/// NOTE: the declaration order of the data members below determines their initialization order.
String host;
UInt16 port;
String default_database;
String user;
String password;
String quota_key;
/// For inter-server authorization
String cluster;
String cluster_secret;
/// For DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET
String salt;
/// For DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2
std::optional<UInt64> nonce;
/// Address is resolved during the first connection (or the following reconnects)
/// Use it only for logging purposes
std::optional<Poco::Net::SocketAddress> current_resolved_address;
/// For messages in log and in exceptions.
String description;
void setDescription();
/// Returns resolved address if it was resolved.
std::optional<Poco::Net::SocketAddress> getResolvedAddress() const;
String client_name;
bool connected = false;
/// Server identification; the numeric fields start at 0.
String server_name;
UInt64 server_version_major = 0;
UInt64 server_version_minor = 0;
UInt64 server_version_patch = 0;
UInt64 server_revision = 0;
String server_timezone;
String server_display_name;
/// The socket and the buffers reading from / writing to it.
std::unique_ptr<Poco::Net::StreamSocket> socket;
std::shared_ptr<ReadBufferFromPocoSocket> in;
std::shared_ptr<WriteBufferFromPocoSocket> out;
std::optional<UInt64> last_input_packet_type;
String query_id;
Protocol::Compression compression; /// Enable data compression for communication.
Protocol::Secure secure; /// Enable data encryption for communication.
/// What compression settings to use while sending data for INSERT queries and external tables.
CompressionCodecPtr compression_codec;
/** If not nullptr, used to limit network traffic.
* Only traffic for transferring blocks is accounted. Other packets don't.
*/
ThrottlerPtr throttler;
std::vector<std::pair<String, String>> password_complexity_rules;
/// From where to read query execution result.
std::shared_ptr<ReadBuffer> maybe_compressed_in;
std::unique_ptr<NativeReader> block_in;
std::unique_ptr<NativeReader> block_logs_in;
std::unique_ptr<NativeReader> block_profile_events_in;
/// Where to write data for INSERT.
std::shared_ptr<WriteBuffer> maybe_compressed_out;
std::unique_ptr<NativeWriter> block_out;
bool have_more_addresses_to_connect = false;
/// Logger is created lazily, to avoid running a DNS request in the constructor.
class LoggerWrapper
{
public:
/// Only remembers the owning connection; no logger is looked up here.
explicit LoggerWrapper(Connection & parent_)
: log(nullptr), parent(parent_)
{
}
/// On first use, obtains the Poco logger named "Connection (<description>)" and caches the pointer;
/// later calls return the cached pointer.
Poco::Logger * get()
{
if (!log)
log = &Poco::Logger::get("Connection (" + parent.getDescription() + ")");
return log;
}
private:
std::atomic<Poco::Logger *> log;
Connection & parent;
};
LoggerWrapper log_wrapper;
/// Callback installed via setAsyncCallback(); empty by default.
AsyncCallback async_callback = {};
void connect(const ConnectionTimeouts & timeouts);
void sendHello();
void sendAddendum();
void receiveHello(const Poco::Timespan & handshake_timeout);
/// Only declared when the build defines USE_SSL to a non-zero value.
#if USE_SSL
void sendClusterNameAndSalt();
#endif
bool ping(const ConnectionTimeouts & timeouts);
Block receiveData();
Block receiveLogData();
Block receiveDataImpl(NativeReader & reader);
Block receiveProfileEvents();
std::vector<String> receiveMultistringMessage(UInt64 msg_type) const;
std::unique_ptr<Exception> receiveException() const;
Progress receiveProgress() const;
ParallelReadRequest receiveParallelReadRequest() const;
/// NOTE(review): the name contains a typo ("Announecement"); left unchanged because the
/// definition and any callers are outside this header.
InitialAllRangesAnnouncement receiveInitialParallelReadAnnounecement() const;
ProfileInfo receiveProfileInfo() const;
void initInputBuffers();
void initBlockInput();
void initBlockLogsInput();
void initBlockProfileEventsInput();
/// Never returns normally (marked [[noreturn]]).
[[noreturn]] void throwUnexpectedPacket(UInt64 packet_type, const char * expected) const;
};
/** Scope guard that installs an async callback on a connection for its own lifetime.
  *
  * The constructor passes `async_callback` to `connection_->setAsyncCallback(...)`.
  * The destructor calls `setAsyncCallback({})` on the same connection, i.e. it replaces the
  * callback with a default-constructed one (it does NOT restore a previously set callback).
  *
  * Requirements:
  *  - `Conn` must provide a `setAsyncCallback` member callable with an `AsyncCallback` and with `{}`;
  *  - `connection_` must be non-null and must outlive this object: it is dereferenced
  *    unconditionally in both the constructor and the destructor.
  *
  * The guard is neither copyable nor movable. With the implicitly generated copy operations,
  * a copy would clear the connection's callback as soon as the first of the two objects is
  * destroyed, while the other one is still supposed to keep it installed.
  */
template <typename Conn>
class AsyncCallbackSetter
{
public:
    AsyncCallbackSetter(Conn * connection_, AsyncCallback async_callback) : connection(connection_)
    {
        connection->setAsyncCallback(std::move(async_callback));
    }

    /// Deleting the copy operations also suppresses the implicit move operations.
    AsyncCallbackSetter(const AsyncCallbackSetter &) = delete;
    AsyncCallbackSetter & operator=(const AsyncCallbackSetter &) = delete;

    ~AsyncCallbackSetter()
    {
        /// Reset to an empty callback when the guard goes out of scope.
        connection->setAsyncCallback({});
    }

private:
    /// Non-owning; the pointee is managed by the caller.
    Conn * connection;
};
}
|