1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
#pragma once
#include <Client/ConnectionPool.h>
#include <Core/QueryProcessingStage.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/StorageID.h>
#include <Parsers/IAST.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
#include <Storages/StorageSnapshot.h>
namespace DB
{
struct Settings;
class Cluster;
class Throttler;
struct SelectQueryInfo;
class Pipe;
using Pipes = std::vector<Pipe>;
class QueryPlan;
using QueryPlanPtr = std::unique_ptr<QueryPlan>;
struct StorageID;
class PreparedSets;
using PreparedSetsPtr = std::shared_ptr<PreparedSets>;
namespace ClusterProxy
{
/// select query has database, table and table function names as AST pointers
/// Creates a copy of query, changes database, table and table function names.
ASTPtr rewriteSelectQuery(
ContextPtr context,
const ASTPtr & query,
const std::string & remote_database,
const std::string & remote_table,
ASTPtr table_function_ptr = nullptr);
using ColumnsDescriptionByShardNum = std::unordered_map<UInt32, ColumnsDescription>;
class SelectStreamFactory
{
public:
struct Shard
{
/// Query and header may be changed depending on shard.
ASTPtr query;
/// Used to check the table existence on remote node
StorageID main_table;
Block header;
Cluster::ShardInfo shard_info;
/// If we connect to replicas lazily.
/// (When there is a local replica with big delay).
bool lazy = false;
time_t local_delay = 0;
};
using Shards = std::vector<Shard>;
SelectStreamFactory(
const Block & header_,
const ColumnsDescriptionByShardNum & objects_by_shard_,
const StorageSnapshotPtr & storage_snapshot_,
QueryProcessingStage::Enum processed_stage_);
void createForShard(
const Cluster::ShardInfo & shard_info,
const ASTPtr & query_ast,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count,
bool parallel_replicas_enabled);
struct ShardPlans
{
/// If a shard has local replicas this won't be nullptr
std::unique_ptr<QueryPlan> local_plan;
/// Contains several steps to read from all remote replicas
std::unique_ptr<QueryPlan> remote_plan;
};
const Block header;
const ColumnsDescriptionByShardNum objects_by_shard;
const StorageSnapshotPtr storage_snapshot;
QueryProcessingStage::Enum processed_stage;
};
}
}
|