1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
|
#pragma once
#include <Storages/IStorage.h>
#include <Storages/IStorageCluster.h>
#include <Storages/Distributed/DistributedAsyncInsertDirectoryQueue.h>
#include <Storages/Distributed/DistributedSettings.h>
#include <Storages/getStructureOfRemoteTable.h>
#include <Common/SimpleIncrement.h>
#include <Client/ConnectionPool.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <Common/ActionBlocker.h>
#include <Interpreters/Cluster.h>
#include <pcg_random.hpp>
namespace DB
{
/// Types that are only forward-declared here; their definitions live elsewhere.
struct Settings;
class Context;
class ExpressionActions;
class IDisk;
class IVolume;

/// Shared-ownership pointer aliases for the forward-declared types above.
using DiskPtr = std::shared_ptr<IDisk>;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
using VolumePtr = std::shared_ptr<IVolume>;
/** A distributed table that resides on multiple servers.
  * Uses data from the specified database and tables on each server.
  *
  * You can pass one address, not several.
  * In this case, the table can be considered remote, rather than distributed.
  */
class StorageDistributed final : public IStorage, private WithContext /// private base (the default for `class`), spelled out explicitly
{
    /// These classes are granted access to the private members declared below.
    friend class DistributedSink;
    friend class DistributedAsyncInsertBatch;
    friend class DistributedAsyncInsertDirectoryQueue;
    friend class StorageSystemDistributionQueue;

public:
    /// Overload that identifies the remote side by database and table name
    /// (`remote_database_`, `remote_table_`).
    /// `owned_cluster_` and `remote_table_function_ptr_` are optional and default to empty.
    StorageDistributed(
        const StorageID & id_,
        const ColumnsDescription & columns_,
        const ConstraintsDescription & constraints_,
        const String & comment,
        const String & remote_database_,
        const String & remote_table_,
        const String & cluster_name_,
        ContextPtr context_,
        const ASTPtr & sharding_key_,
        const String & storage_policy_name_,
        const String & relative_data_path_,
        const DistributedSettings & distributed_settings_,
        bool attach_,
        ClusterPtr owned_cluster_ = {},
        ASTPtr remote_table_function_ptr_ = {});

    /// Overload that takes a remote table function AST (`remote_table_function_ptr_`)
    /// instead of a remote database/table name pair.
    StorageDistributed(
        const StorageID & id_,
        const ColumnsDescription & columns_,
        const ConstraintsDescription & constraints_,
        ASTPtr remote_table_function_ptr_,
        const String & cluster_name_,
        ContextPtr context_,
        const ASTPtr & sharding_key_,
        const String & storage_policy_name_,
        const String & relative_data_path_,
        const DistributedSettings & distributed_settings_,
        bool attach,
        ClusterPtr owned_cluster_ = {});

    ~StorageDistributed() override;

    /// Engine name reported by this storage.
    std::string getName() const override { return "Distributed"; }

    /// Feature flags: all of the following are unconditionally reported as supported.
    bool supportsSampling() const override { return true; }
    bool supportsFinal() const override { return true; }
    bool supportsPrewhere() const override { return true; }
    bool supportsSubcolumns() const override { return true; }
    bool supportsDynamicSubcolumns() const override { return true; }

    StoragePolicyPtr getStoragePolicy() const override;

    /// Do not apply moving to PREWHERE optimization for distributed tables,
    /// because we can't be sure that underlying table supports PREWHERE.
    bool canMoveConditionsToPrewhere() const override { return false; }

    /// Always true for this storage.
    bool isRemote() const override { return true; }

    /// Snapshot for StorageDistributed contains descriptions
    /// of columns of type Object for each shard at the moment
    /// of the start of query.
    struct SnapshotData : public StorageSnapshot::Data
    {
        ColumnsDescriptionByShardNum objects_by_shard;
    };

    StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override;
    StorageSnapshotPtr getStorageSnapshotForQuery(
        const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query, ContextPtr query_context) const override;

    QueryProcessingStage::Enum
    getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;

    /// The last two parameters (max_block_size, num_streams) are left unnamed in this declaration.
    void read(
        QueryPlan & query_plan,
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        SelectQueryInfo & query_info,
        ContextPtr context,
        QueryProcessingStage::Enum processed_stage,
        size_t /*max_block_size*/,
        size_t /*num_streams*/) override;

    bool supportsParallelInsert() const override { return true; }

    std::optional<UInt64> totalBytes(const Settings &) const override;

    SinkToStoragePtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context, bool /*async_insert*/) override;

    std::optional<QueryPipeline> distributedWrite(const ASTInsertQuery & query, ContextPtr context) override;

    /// Removes temporary data in local filesystem.
    void truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &) override;

    void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;

    void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override;

    /// In the sub-tables, you need to manually add and delete columns;
    /// the structure of the sub-table is not checked.
    void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & table_lock_holder) override;

    void initializeFromDisk();
    void shutdown() override;
    void flushAndPrepareForShutdown() override;
    void drop() override;

    /// True when a data volume has been set (i.e. `data_volume` is not null).
    bool storesDataOnDisk() const override { return data_volume != nullptr; }
    Strings getDataPaths() const override;

    ActionLock getActionLock(StorageActionBlockType type) override;

    NamesAndTypesList getVirtuals() const override;

    /// Used by InterpreterInsertQuery
    /// Both getters return a copy of the stored name.
    std::string getRemoteDatabaseName() const { return remote_database; }
    std::string getRemoteTableName() const { return remote_table; }
    ClusterPtr getCluster() const;

    /// Used by InterpreterSystemQuery
    void flushClusterNodesAllData(ContextPtr context);

    /// Used by ClusterCopier
    size_t getShardCount() const;

private:
    void renameOnDisk(const String & new_path_to_table_data);

    /// Read-only accessors; each returns a reference to the corresponding member.
    const ExpressionActionsPtr & getShardingKeyExpr() const { return sharding_key_expr; }
    const String & getShardingKeyColumnName() const { return sharding_key_column_name; }
    const String & getRelativeDataPath() const { return relative_data_path; }

    /// Create directory monitors (DistributedAsyncInsertDirectoryQueue) for each existing subdirectory.
    void initializeDirectoryQueuesForDisk(const DiskPtr & disk);

    /// Get directory queue thread and connection pool created by disk and subdirectory name
    ///
    /// Used for the INSERT into Distributed in case of insert_distributed_sync==1, from DistributedSink.
    DistributedAsyncInsertDirectoryQueue & getDirectoryQueue(const DiskPtr & disk, const std::string & name);

    /// Return list of metrics for all created monitors
    /// (note that monitors are created lazily, i.e. until at least one INSERT executed)
    ///
    /// Used by StorageSystemDistributionQueue
    std::vector<DistributedAsyncInsertDirectoryQueue::Status> getDirectoryQueueStatuses() const;

    static IColumn::Selector createSelector(ClusterPtr cluster, const ColumnWithTypeAndName & result);

    /// Apply the following settings:
    /// - optimize_skip_unused_shards
    /// - force_optimize_skip_unused_shards
    ClusterPtr getOptimizedCluster(ContextPtr, const StorageSnapshotPtr & storage_snapshot, const SelectQueryInfo & query_info) const;

    ClusterPtr skipUnusedShards(
        ClusterPtr cluster, const SelectQueryInfo & query_info, const StorageSnapshotPtr & storage_snapshot, ContextPtr context) const;

    /// This method returns optimal query processing stage.
    ///
    /// Here is the list of stages (from the less optimal to more optimal):
    /// - WithMergeableState
    /// - WithMergeableStateAfterAggregation
    /// - WithMergeableStateAfterAggregationAndLimit
    /// - Complete
    ///
    /// Some simple queries without GROUP BY/DISTINCT can use more optimal stage.
    ///
    /// Also in case of optimize_distributed_group_by_sharding_key=1 the queries
    /// with GROUP BY/DISTINCT sharding_key can also use more optimal stage.
    /// (see also optimize_skip_unused_shards/allow_nondeterministic_optimize_skip_unused_shards)
    ///
    /// @return QueryProcessingStage or empty std::optional
    /// (in this case regular WithMergeableState should be used)
    std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const;

    size_t getRandomShardIndex(const Cluster::ShardsInfo & shards);

    /// Returns the placeholder "<remote>" when no cluster name is set, otherwise the cluster name.
    std::string getClusterName() const { return cluster_name.empty() ? "<remote>" : cluster_name; }

    const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; }

    void delayInsertOrThrowIfNeeded() const;

    std::optional<QueryPipeline> distributedWriteFromClusterStorage(const IStorageCluster & src_storage_cluster, const ASTInsertQuery & query, ContextPtr context) const;
    std::optional<QueryPipeline> distributedWriteBetweenDistributedTables(const StorageDistributed & src_distributed, const ASTInsertQuery & query, ContextPtr context) const;

    String remote_database;
    String remote_table;
    ASTPtr remote_table_function_ptr;

    /// Defaulted to nullptr so the pointer is never indeterminate;
    /// a constructor mem-initializer still takes precedence over this default.
    Poco::Logger * log = nullptr;

    /// Used to implement TableFunctionRemote.
    std::shared_ptr<Cluster> owned_cluster;

    /// Is empty if this storage implements TableFunctionRemote.
    const String cluster_name;

    /// Defaulted for consistency with `sharding_key_is_deterministic`;
    /// a constructor mem-initializer still takes precedence over this default.
    bool has_sharding_key = false;
    bool sharding_key_is_deterministic = false;
    ExpressionActionsPtr sharding_key_expr;
    String sharding_key_column_name;

    /// Used for global monotonic ordering of files to send.
    SimpleIncrement file_names_increment;

    ActionBlocker monitors_blocker;

    String relative_data_path;

    /// Can be empty if relative_data_path is empty. In this case, a directory for the data to be sent is not created.
    StoragePolicyPtr storage_policy;

    /// The main volume to store data.
    /// Storage policy may have several configured volumes, but second and other volumes are used for parts movement in MergeTree engine.
    /// For Distributed engine such configuration doesn't make sense and only the first (main) volume will be used to store data.
    /// Other volumes will be ignored. It's needed to allow using the same multi-volume policy both for Distributed and other engines.
    VolumePtr data_volume;

    DistributedSettings distributed_settings;

    /// Per-node pair of a directory monitor and a connection pool.
    struct ClusterNodeData
    {
        std::shared_ptr<DistributedAsyncInsertDirectoryQueue> directory_monitor;
        ConnectionPoolPtr connection_pool;
    };

    /// Keyed by string. NOTE(review): presumably the key is the node's directory name
    /// and `cluster_nodes_mutex` guards this map — confirm against the implementation file.
    std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;
    mutable std::mutex cluster_nodes_mutex;

    // For random shard index generation
    mutable std::mutex rng_mutex;
    pcg64 rng;
};
}
|