#pragma once
#include <variant>
#include <Client/ConnectionPool.h>
#include <Client/IConnections.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <Storages/IStorage_fwd.h>
#include <Interpreters/Context.h>
#include <Interpreters/StorageID.h>
#include <Common/TimerDescriptor.h>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
#include <sys/types.h>
namespace DB
{
class Context;
class Throttler;
using ThrottlerPtr = std::shared_ptr<Throttler>;
struct Progress;
using ProgressCallback = std::function<void(const Progress & progress)>;
struct ProfileInfo;
using ProfileInfoCallback = std::function<void(const ProfileInfo & info)>;
class RemoteQueryExecutorReadContext;
/// This is the same type as StorageS3Source::IteratorWrapper
using TaskIterator = std::function<String()>;
/// This class allows one to launch queries on remote replicas of one shard and get results
class RemoteQueryExecutor
{
public:
using ReadContext = RemoteQueryExecutorReadContext;
/// We can provide additional logic for RemoteQueryExecutor.
/// For example, for the s3Cluster table function we provide an iterator over tasks to do:
/// nodes involved in the query send a request for a new task and we answer them using this object.
/// In case of parallel reading from replicas we provide a Coordinator object:
/// every replica will tell us about the parts and mark ranges it wants to read, and the coordinator
/// will decide whether to accept or deny that request.
struct Extension
{
std::shared_ptr<TaskIterator> task_iterator = nullptr;
std::shared_ptr<ParallelReplicasReadingCoordinator> parallel_reading_coordinator = nullptr;
std::optional<IConnections::ReplicaInfo> replica_info = {};
};
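/// A minimal usage sketch (not part of the original header): filling an Extension with a
/// task iterator for an s3Cluster-like query. The task list and its contents are hypothetical;
/// each call hands out the next task string, and an empty string means "no more tasks".
///
///     std::deque<String> tasks {"task-1", "task-2"};   /// hypothetical task list
///     RemoteQueryExecutor::Extension extension;
///     extension.task_iterator = std::make_shared<TaskIterator>([tasks]() mutable -> String
///     {
///         if (tasks.empty())
///             return {};
///         String task = std::move(tasks.front());
///         tasks.pop_front();
///         return task;
///     });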
/// Takes an already established connection.
RemoteQueryExecutor(
Connection & connection,
const String & query_, const Block & header_, ContextPtr context_,
ThrottlerPtr throttler_ = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional<Extension> extension_ = std::nullopt);
/// Takes an already established connection.
RemoteQueryExecutor(
std::shared_ptr<Connection> connection,
const String & query_, const Block & header_, ContextPtr context_,
ThrottlerPtr throttler_ = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional<Extension> extension_ = std::nullopt);
/// Accepts several connections already taken from a pool.
RemoteQueryExecutor(
std::vector<IConnectionPool::Entry> && connections_,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional<Extension> extension_ = std::nullopt);
/// Takes a pool and gets one or several connections from it.
RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional<Extension> extension_ = std::nullopt);
~RemoteQueryExecutor();
/// Create a connection and send the query, external tables and scalars.
///
/// @param query_kind - kind of query; usually it is SECONDARY_QUERY,
/// since these are queries between servers
/// (which is what this code was primarily written for).
/// But clickhouse-benchmark uses the same code,
/// and it should pass INITIAL_QUERY.
void sendQuery(ClientInfo::QueryKind query_kind = ClientInfo::QueryKind::SECONDARY_QUERY, AsyncCallback async_callback = {});
void sendQueryUnlocked(ClientInfo::QueryKind query_kind = ClientInfo::QueryKind::SECONDARY_QUERY, AsyncCallback async_callback = {});
int sendQueryAsync();
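/// A hedged end-to-end sketch (not from the original header): `connection`, `query`,
/// `header` and `context` are assumed to be prepared by the caller, and `processBlock`
/// is a hypothetical consumer. An empty block from readBlock() means the query has finished.
///
///     RemoteQueryExecutor executor(connection, query, header, context);
///     executor.sendQuery();                      /// SECONDARY_QUERY by default
///     while (Block block = executor.readBlock())
///         processBlock(block);
///     executor.finish();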
/// Query is resent to a replica; the query itself can be modified.
bool resent_query { false };
bool recreate_read_context { false };
struct ReadResult
{
enum class Type : uint8_t
{
Data,
ParallelReplicasToken,
FileDescriptor,
Finished,
Nothing
};
explicit ReadResult(Block block_)
: type(Type::Data)
, block(std::move(block_))
{}
explicit ReadResult(int fd_)
: type(Type::FileDescriptor)
, fd(fd_)
{}
explicit ReadResult(Type type_)
: type(type_)
{
assert(type != Type::Data && type != Type::FileDescriptor);
}
Type getType() const { return type; }
Block getBlock()
{
chassert(type == Type::Data);
return std::move(block);
}
int getFileDescriptor() const
{
chassert(type == Type::FileDescriptor);
return fd;
}
Type type;
Block block;
int fd{-1};
};
/// Read the next block of data. Returns an empty block if the query is finished.
Block readBlock();
ReadResult read();
/// Async variant of read(). Returns a ready block or a file descriptor which may be used for polling.
ReadResult readAsync();
/// Receive all remaining packets and finish the query.
/// It should be called after read() has returned an empty block.
void finish();
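/// A sketch of driving the async variant (an assumption about typical usage, not a
/// definitive recipe): readAsync() either returns data or a file descriptor to poll
/// before retrying. `waitForReadiness` is a hypothetical helper around poll()/epoll.
///
///     for (bool done = false; !done;)
///     {
///         auto result = executor.readAsync();
///         switch (result.getType())
///         {
///             case RemoteQueryExecutor::ReadResult::Type::Data:
///                 processBlock(result.getBlock());
///                 break;
///             case RemoteQueryExecutor::ReadResult::Type::FileDescriptor:
///                 waitForReadiness(result.getFileDescriptor());
///                 break;
///             case RemoteQueryExecutor::ReadResult::Type::Finished:
///                 executor.finish();
///                 done = true;
///                 break;
///             default:   /// ParallelReplicasToken, Nothing: nothing to consume yet, retry
///                 break;
///         }
///     }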
/// Cancel query execution. Sends a Cancel packet and ignores all other packets.
/// This method may be called from a separate thread.
void cancel();
/// Get totals and extremes if any.
Block getTotals() { return std::move(totals); }
Block getExtremes() { return std::move(extremes); }
/// Set callback for progress. It will be called on Progress packet.
void setProgressCallback(ProgressCallback callback) { progress_callback = std::move(callback); }
/// Set callback for profile info. It will be called on ProfileInfo packet.
void setProfileInfoCallback(ProfileInfoCallback callback) { profile_info_callback = std::move(callback); }
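/// A small sketch (an assumption, not from the original header) of wiring a progress
/// callback before the query is sent; it only accumulates the row counter reported
/// by remote replicas into a caller-owned variable.
///
///     size_t total_rows = 0;
///     executor.setProgressCallback([&total_rows](const Progress & progress)
///     {
///         total_rows += progress.read_rows;
///     });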
/// Set the query_id. For now, used by performance tests to later find the query
/// in the server query_log. Must be called before sending the query to the server.
void setQueryId(const std::string & query_id_) { assert(!sent_query); query_id = query_id_; }
/// Specify how we allocate connections on a shard.
void setPoolMode(PoolMode pool_mode_) { pool_mode = pool_mode_; }
void setMainTable(StorageID main_table_) { main_table = std::move(main_table_); }
void setLogger(Poco::Logger * logger) { log = logger; }
const Block & getHeader() const { return header; }
IConnections & getConnections() { return *connections; }
bool needToSkipUnavailableShard() const { return context->getSettingsRef().skip_unavailable_shards && (0 == connections->size()); }
bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; }
private:
RemoteQueryExecutor(
const String & query_, const Block & header_, ContextPtr context_,
const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_);
Block header;
Block totals;
Block extremes;
std::function<std::unique_ptr<IConnections>(AsyncCallback)> create_connections;
std::unique_ptr<IConnections> connections;
std::unique_ptr<ReadContext> read_context;
const String query;
String query_id;
ContextPtr context;
ProgressCallback progress_callback;
ProfileInfoCallback profile_info_callback;
/// Scalars that need to be sent to remote servers
Scalars scalars;
/// Temporary tables that need to be sent to remote servers
Tables external_tables;
QueryProcessingStage::Enum stage;
std::optional<Extension> extension;
/// Initiator identifier for distributed task processing
std::shared_ptr<TaskIterator> task_iterator;
/// This is needed only for parallel reading from replicas, because
/// we create a RemoteQueryExecutor per replica and have to store additional info
/// about the number of the current replica and the total count of replicas.
IConnections::ReplicaInfo replica_info;
/// Streams for reading from temporary tables, and for the subsequent sending of data
/// to remote servers for GLOBAL subqueries
std::vector<ExternalTablesData> external_tables_data;
std::mutex external_tables_mutex;
/// Connections to replicas are established, but no queries are sent yet
bool established = false;
/// Query is sent (used before getting first block)
bool sent_query { false };
/** All data from all replicas has been received, up to the EndOfStream packet.
* To prevent desynchronization, if not all data has been read before object
* destruction, a cancel request must be sent to the replicas and all packets
* must be read up to EndOfStream.
*/
bool finished = false;
/** A cancel request was sent to all replicas because the data is not needed anymore.
* This may occur when:
* - the received data size is already sufficient (when using LIMIT, for example)
* - an exception was thrown on the client side
*/
bool was_cancelled = false;
std::mutex was_cancelled_mutex;
/** An exception from a replica was received. There is no need to receive more packets or
* to request cancellation of query execution.
*/
bool got_exception_from_replica = false;
/** An unknown packet was received from a replica. There is no need to receive more packets or
* to request cancellation of query execution.
*/
bool got_unknown_packet_from_replica = false;
/** Got duplicated part UUIDs from a replica.
*/
bool got_duplicated_part_uuids = false;
/// Part UUIDs collected from remote replicas
std::vector<UUID> duplicated_part_uuids;
PoolMode pool_mode = PoolMode::GET_MANY;
StorageID main_table = StorageID::createEmpty();
Poco::Logger * log = nullptr;
/// Send all scalars to remote servers
void sendScalars();
/// Send all temporary tables to remote servers
void sendExternalTables();
/// Set part UUIDs, collected from remote replicas, in the query context.
/// Returns true if duplicates were found.
bool setPartUUIDs(const std::vector<UUID> & uuids);
void processReadTaskRequest();
void processMergeTreeReadTaskRequest(ParallelReadRequest request);
void processMergeTreeInitialReadAnnounecement(InitialAllRangesAnnouncement announcement);
/// Cancel the query and restart it with info about duplicated UUIDs;
/// used only for `allow_experimental_query_deduplication`.
ReadResult restartQueryWithoutDuplicatedUUIDs();
/// If it wasn't sent yet, send a cancel request over all connections to replicas
void cancelUnlocked();
void tryCancel(const char * reason);
/// Returns true if query was sent
bool isQueryPending() const;
/// Returns true if exception was thrown
bool hasThrownException() const;
/// Process packet for read and return data block if possible.
ReadResult processPacket(Packet packet);
/// Reads packet by packet
Block readPackets();
};
}