aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Interpreters/ClusterProxy/SelectStreamFactory.h
blob: a821730657d6897ddf9d069059e083a09deb1585 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#pragma once

#include <Client/ConnectionPool.h>
#include <Core/QueryProcessingStage.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/StorageID.h>
#include <Parsers/IAST.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
#include <Storages/StorageSnapshot.h>

namespace DB
{


/// Forward declarations and convenience aliases for types that do not need a complete
/// definition at this point of the header.
/// `Cluster` and `StorageID` are deliberately not forward-declared here: their headers are
/// included above, and `SelectStreamFactory::Shard` stores a `StorageID` and a
/// `Cluster::ShardInfo` by value, so the complete definitions are required anyway.
/// NOTE(review): `Settings`, `Throttler`, `SelectQueryInfo`, `Pipes` and `PreparedSetsPtr` are
/// not used by this header itself; presumably includers rely on them — verify before removing.
struct Settings;
class Throttler;
struct SelectQueryInfo;

class Pipe;
using Pipes = std::vector<Pipe>;

class QueryPlan;
using QueryPlanPtr = std::unique_ptr<QueryPlan>;

class PreparedSets;
using PreparedSetsPtr = std::shared_ptr<PreparedSets>;
namespace ClusterProxy
{

/// Produces a modified copy of a SELECT query.
/// In `query`, the database, the table and the table function are all referenced through
/// AST pointers; the returned copy has those replaced according to `remote_database`,
/// `remote_table` and the optional `table_function_ptr` (null by default).
/// @param context  query context forwarded to the rewrite.
/// @return the rewritten copy of `query`.
ASTPtr rewriteSelectQuery(
    ContextPtr context, const ASTPtr & query,
    const std::string & remote_database, const std::string & remote_table,
    ASTPtr table_function_ptr = nullptr);

using ColumnsDescriptionByShardNum = std::unordered_map<UInt32, ColumnsDescription>;

class SelectStreamFactory
{
public:

    struct Shard
    {
        /// Query and header may be changed depending on shard.
        ASTPtr query;
        /// Used to check the table existence on remote node
        StorageID main_table;
        Block header;

        Cluster::ShardInfo shard_info;

        /// If we connect to replicas lazily.
        /// (When there is a local replica with big delay).
        bool lazy = false;
        time_t local_delay = 0;
    };

    using Shards = std::vector<Shard>;

    SelectStreamFactory(
        const Block & header_,
        const ColumnsDescriptionByShardNum & objects_by_shard_,
        const StorageSnapshotPtr & storage_snapshot_,
        QueryProcessingStage::Enum processed_stage_);

    void createForShard(
        const Cluster::ShardInfo & shard_info,
        const ASTPtr & query_ast,
        const StorageID & main_table,
        const ASTPtr & table_func_ptr,
        ContextPtr context,
        std::vector<QueryPlanPtr> & local_plans,
        Shards & remote_shards,
        UInt32 shard_count,
        bool parallel_replicas_enabled);

    struct ShardPlans
    {
        /// If a shard has local replicas this won't be nullptr
        std::unique_ptr<QueryPlan> local_plan;

        /// Contains several steps to read from all remote replicas
        std::unique_ptr<QueryPlan> remote_plan;
    };

    const Block header;
    const ColumnsDescriptionByShardNum objects_by_shard;
    const StorageSnapshotPtr storage_snapshot;
    QueryProcessingStage::Enum processed_stage;
};

}

}