aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Coordination/KeeperServer.h
blob: 23441f4716914514ac8592247bf374f1ae0534ae (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#pragma once

#include <Coordination/CoordinationSettings.h>
#include <Coordination/InMemoryLogStore.h>
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/KeeperStateManager.h>
#include <Coordination/KeeperStorage.h>
#error #include <libnuraft/raft_params.hxx>
#error #include <libnuraft/raft_server.hxx>
#include <Poco/Util/AbstractConfiguration.h>
#include <Coordination/Keeper4LWInfo.h>
#include <Coordination/KeeperContext.h>
#include <Coordination/RaftServerConfig.h>

namespace DB
{

/// Result handle returned by KeeperServer::putRequestBatch(): a NuRaft command result
/// whose payload is a NuRaft buffer pointer.
using RaftAppendResult = nuraft::ptr<nuraft::cmd_result<nuraft::ptr<nuraft::buffer>>>;

/// Keeper server built on top of NuRaft. Holds the Keeper state machine, the state manager
/// and the Raft server instance, and exposes request submission, role/status queries,
/// cluster configuration updates and snapshot creation.
/// All non-trivial member functions are implemented out of line; this header only declares them.
class KeeperServer
{
private:
    /// ID of this server; exposed through getServerID().
    const int server_id;

    CoordinationSettingsPtr coordination_settings;

    /// State machine; exposed through getKeeperStateMachine().
    nuraft::ptr<KeeperStateMachine> state_machine;

    nuraft::ptr<KeeperStateManager> state_manager;

    /// Type of raft_instance; only forward-declared in this header.
    struct KeeperRaftServer;
    nuraft::ptr<KeeperRaftServer> raft_instance; // TSA_GUARDED_BY(server_write_mutex);
    nuraft::ptr<nuraft::asio_service> asio_service;
    std::vector<nuraft::ptr<nuraft::rpc_listener>> asio_listeners;

    // Some actions (e.g. recovery) can be applied only when we are sure
    // that no requests are currently being processed, so all write
    // actions on the raft server are done under this mutex.
    // NOTE(review): the thread-safety annotation on raft_instance above is commented out,
    // so this locking discipline is not checked by the compiler.
    mutable std::mutex server_write_mutex;

    /// Initialization state. initialized_flag is the value returned by checkInit().
    /// NOTE(review): the mutex and condition variable presumably back waitInit() and are
    /// signalled from callbackFunc() - confirm against the implementation.
    std::mutex initialized_mutex;
    std::atomic<bool> initialized_flag = false;
    std::condition_variable initialized_cv;
    /// NOTE(review): presumably set once the initial batch of log entries is committed - confirm.
    std::atomic<bool> initial_batch_committed = false;

    /// NOTE(review): presumably the most recent cluster config seen locally - confirm.
    nuraft::ptr<nuraft::cluster_config> last_local_config;

    Poco::Logger * log;

    /// Callback func which is called by NuRaft on all internal events.
    /// Used to determine the moment when raft is ready to serve new requests.
    nuraft::cb_func::ReturnCode callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * param);

    /// Almost copy-paste from nuraft::launcher, but with separated server init and start.
    /// Allows to avoid race conditions.
    void launchRaftServer(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6);

    void shutdownRaftServer();

    void loadLatestConfig();

    /// Takes the Raft parameters by non-const reference, so it may modify them.
    void enterRecoveryMode(nuraft::raft_params & params);

    /// Recovery flag; exposed through isRecovering().
    std::atomic_bool is_recovering = false;

    std::shared_ptr<KeeperContext> keeper_context;

    /// NOTE(review): presumably controls whether a snapshot is created during shutdown - confirm.
    const bool create_snapshot_on_exit;
    /// Whether reconfiguration is enabled; exposed through reconfigEnabled().
    const bool enable_reconfiguration;

public:
    /// Construct the server from settings and configuration.
    /// The queues and the S3 snapshot manager are taken by non-const reference.
    KeeperServer(
        const KeeperConfigurationAndSettingsPtr & settings_,
        const Poco::Util::AbstractConfiguration & config_,
        ResponsesQueue & responses_queue_,
        SnapshotsQueue & snapshots_queue_,
        KeeperContextPtr keeper_context_,
        KeeperSnapshotManagerS3 & snapshot_manager_s3,
        KeeperStateMachine::CommitCallback commit_callback);

    /// Load state machine from the latest snapshot and load log storage. Start NuRaft with required settings.
    void startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6 = true);

    /// Put a local read request, execute it in the state machine directly and put the response
    /// into the responses queue.
    void putLocalReadRequest(const KeeperStorage::RequestForSession & request);

    /// Current value of the recovery flag.
    bool isRecovering() const { return is_recovering; }
    /// Whether reconfiguration is enabled for this server.
    bool reconfigEnabled() const { return enable_reconfiguration; }

    /// Put batch of requests into Raft and get result of put. Responses will be set separately into
    /// responses_queue.
    RaftAppendResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests);

    /// Return set of the non-active sessions
    std::vector<int64_t> getDeadSessions();

    /// Returns the state machine pointer held by this server.
    nuraft::ptr<KeeperStateMachine> getKeeperStateMachine() const { return state_machine; }

    void forceRecovery();

    /// Role and leader-liveness queries (implemented out of line).
    bool isLeader() const;

    bool isFollower() const;

    bool isObserver() const;

    bool isLeaderAlive() const;

    /// Returns a Keeper4LWInfo that, as the name says, is only partially filled.
    /// NOTE(review): which fields are filled is decided by the implementation - confirm there.
    Keeper4LWInfo getPartiallyFilled4LWInfo() const;

    /// @return follower count if node is not leader return 0
    uint64_t getFollowerCount() const;

    /// @return synced follower count if node is not leader return 0
    uint64_t getSyncedFollowerCount() const;

    /// Wait server initialization (see callbackFunc)
    void waitInit();

    /// Return true if KeeperServer initialized
    bool checkInit() const { return initialized_flag; }

    void shutdown();

    /// ID of this server.
    int getServerID() const { return server_id; }

    /// Apply a single cluster update action.
    /// NOTE(review): the meaning of the returned bool is defined by the implementation - confirm there.
    bool applyConfigUpdate(const ClusterUpdateAction& action);

    // TODO (myrrc) these functions should be removed once "reconfig" is stabilized
    void applyConfigUpdateWithReconfigDisabled(const ClusterUpdateAction& action);
    bool waitForConfigUpdateWithReconfigDisabled(const ClusterUpdateAction& action);
    ClusterUpdateActions getRaftConfigurationDiff(const Poco::Util::AbstractConfiguration & config);

    /// NOTE(review): the meaning of the returned value is not visible in this header
    /// (presumably identifies the created snapshot) - confirm against the implementation.
    uint64_t createSnapshot();

    KeeperLogInfo getKeeperLogInfo();

    /// NOTE(review): presumably asks Raft to make this node the leader; the meaning of the
    /// returned bool is defined by the implementation - confirm.
    bool requestLeader();

    void recalculateStorageStats();
};

}