aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Interpreters/ClusterDiscovery.h
blob: 140e3691c038d9cd8ed53600aaef066a7428afb2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#pragma once

#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ThreadPool.h>
#include <Common/ZooKeeper/Common.h>
#include <Interpreters/Cluster.h>

#include <Poco/Logger.h>

#include <base/defines.h>

#include <unordered_map>

namespace DB
{

/*
 * Discover cluster nodes.
 *
 * Each node adds ephemernal node into specified path in zookeeper (each cluster have own path).
 * Also node subscribed for updates for these paths, and at each child node chanhe cluster updated.
 * When node goes down ephemernal node are destroyed, cluster configuration is updated on other node and gone node is removed from cluster.
 */
class ClusterDiscovery
{

public:
    ClusterDiscovery(
        const Poco::Util::AbstractConfiguration & config,
        ContextPtr context_,
        const String & config_prefix = "remote_servers");

    void start();

    ClusterPtr getCluster(const String & cluster_name) const;
    std::unordered_map<String, ClusterPtr> getClusters() const;

    ~ClusterDiscovery();

private:
    struct NodeInfo
    {
        /// versioning for format of data stored in zk
        static constexpr size_t data_ver = 1;

        /// host:port
        String address;
        /// is secure tcp port user
        bool secure = false;
        /// shard number
        size_t shard_id = 0;

        NodeInfo() = default;
        explicit NodeInfo(const String & address_, bool secure_, size_t shard_id_)
            : address(address_)
            , secure(secure_)
            , shard_id(shard_id_)
        {}

        static bool parse(const String & data, NodeInfo & result);
        String serialize() const;
    };

    // node uuid -> address ("host:port")
    using NodesInfo = std::unordered_map<String, NodeInfo>;

    struct ClusterInfo
    {
        const String name;
        const String zk_root;
        NodesInfo nodes_info;

        /// Track last update time
        Stopwatch watch;

        NodeInfo current_node;
        /// Current node may not belong to cluster, to be just an observer.
        bool current_node_is_observer = false;

        /// For internal management need.
        /// Is it designed that when deploying multiple compute groups,
        /// they are mutually invisible to each other.
        bool current_cluster_is_invisible = false;

        explicit ClusterInfo(const String & name_,
                             const String & zk_root_,
                             const String & host_name,
                             UInt16 port,
                             bool secure,
                             size_t shard_id,
                             bool observer_mode,
                             bool invisible)
            : name(name_)
            , zk_root(zk_root_)
            , current_node(host_name + ":" + toString(port), secure, shard_id)
            , current_node_is_observer(observer_mode)
            , current_cluster_is_invisible(invisible)
        {
        }
    };

    void initialUpdate();

    void registerInZk(zkutil::ZooKeeperPtr & zk, ClusterInfo & info);

    Strings getNodeNames(zkutil::ZooKeeperPtr & zk,
                         const String & zk_root,
                         const String & cluster_name,
                         int * version = nullptr,
                         bool set_callback = true);

    NodesInfo getNodes(zkutil::ZooKeeperPtr & zk, const String & zk_root, const Strings & node_uuids);

    ClusterPtr makeCluster(const ClusterInfo & cluster_info);

    bool needUpdate(const Strings & node_uuids, const NodesInfo & nodes);
    bool updateCluster(ClusterInfo & cluster_info);

    bool runMainThread(std::function<void()> up_to_date_callback);
    void shutdown();

    /// cluster name -> cluster info (zk root, set of nodes)
    std::unordered_map<String, ClusterInfo> clusters_info;

    ContextMutablePtr context;

    String current_node_name;

    template <typename T> class ConcurrentFlags;
    using UpdateFlags = ConcurrentFlags<std::string>;

    /// Cluster names to update.
    /// The `shared_ptr` is used because it's passed to watch callback.
    /// It prevents accessing to invalid object after ClusterDiscovery is destroyed.
    std::shared_ptr<UpdateFlags> clusters_to_update;

    mutable std::mutex mutex;
    std::unordered_map<String, ClusterPtr> cluster_impls;

    ThreadFromGlobalPool main_thread;

    Poco::Logger * log;
};

}