aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Coordination/KeeperStateMachine.h
blob: 9b821903864585403d326f09db5ea17f2fa25bed (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#pragma once

#include <Coordination/CoordinationSettings.h>
#include <Coordination/KeeperSnapshotManager.h>
#include <Coordination/KeeperSnapshotManagerS3.h>
#include <Coordination/KeeperContext.h>
#include <Coordination/KeeperStorage.h>

#error #include <libnuraft/nuraft.hxx>
#include <Common/ConcurrentBoundedQueue.h>


namespace DB
{
/// Queue of per-session responses; KeeperStateMachine puts processed responses into it.
using ResponsesQueue = ConcurrentBoundedQueue<KeeperStorage::ResponseForSession>;
/// Queue of snapshot-creation tasks; KeeperStateMachine fills it with snapshots to be created by the snapshot thread.
using SnapshotsQueue = ConcurrentBoundedQueue<CreateSnapshotTask>;

/// ClickHouse Keeper state machine. Wrapper for KeeperStorage.
/// Responsible for committing log entries, creating snapshots and so on.
///
/// Implements the nuraft::state_machine interface: every method marked `override`
/// below is part of that interface and keeps NuRaft's snake_case naming.
///
/// Locking overview (see the member comments for details):
///  - storage_and_responses_lock guards `storage` and pushes to `responses_queue`;
///  - snapshots_lock guards the latest snapshot state;
///  - request_cache_mutex guards `parsed_request_cache`;
///  - cluster_config_lock guards `cluster_config`.
class KeeperStateMachine : public nuraft::state_machine
{
public:
    /// Callback that receives a request for a session; kept in `commit_callback`.
    using CommitCallback = std::function<void(const KeeperStorage::RequestForSession &)>;

    /// @param responses_queue_ queue the state machine puts processed responses into (stored by reference)
    /// @param snapshots_queue_ queue of snapshots to be created by the snapshot thread (stored by reference)
    /// @param coordination_settings_ coordination settings
    /// @param keeper_context_ shared Keeper context
    /// @param snapshot_manager_s3_ S3 snapshot manager; the raw pointer is stored as-is
    ///        (NOTE(review): presumably non-owning and possibly null -- confirm in the .cpp)
    /// @param commit_callback_ optional callback, empty by default
    /// @param superdigest_ superdigest from the server config (special part of the ACL system), empty by default
    KeeperStateMachine(
        ResponsesQueue & responses_queue_,
        SnapshotsQueue & snapshots_queue_,
        const CoordinationSettingsPtr & coordination_settings_,
        const KeeperContextPtr & keeper_context_,
        KeeperSnapshotManagerS3 * snapshot_manager_s3_,
        CommitCallback commit_callback_ = {},
        const std::string & superdigest_ = "");

    /// Read state from the latest snapshot
    void init();

    /// Version of the serialized request format. parseRequest reports it so the caller knows
    /// which fields were parsed from the buffer.
    /// Kept as a plain (unscoped) enum on purpose: existing callers use the unqualified enumerators.
    enum ZooKeeperLogSerializationVersion
    {
        INITIAL = 0,
        WITH_TIME = 1,
        WITH_ZXID_DIGEST = 2,
    };

    /// Parse a request from a Raft log buffer, using the parsed request cache.
    ///
    /// The lifetime of a parsed request is one of:
    ///   [preprocess/PreAppendLog -> commit]
    ///   [preprocess/PreAppendLog -> rollback]
    /// On events like commit and rollback we can remove the parsed request to keep memory usage at a minimum.
    /// The request cache is also cleaned on session close in case something strange happened.
    ///
    /// @param data buffer holding the serialized request
    /// @param final whether this is the final time we will fetch the request, so it can be safely removed from the cache
    /// @param serialization_version optional output: which fields were parsed from the buffer,
    ///        so the buffer can be modified accordingly
    std::shared_ptr<KeeperStorage::RequestForSession> parseRequest(nuraft::buffer & data, bool final, ZooKeeperLogSerializationVersion * serialization_version = nullptr);

    /// Preprocess a request before it is committed (first step of the request lifetime described at parseRequest).
    /// NOTE(review): the meaning of the returned bool is defined in the .cpp -- confirm before relying on it.
    bool preprocess(const KeeperStorage::RequestForSession & request_for_session);

    /// nuraft::state_machine interface: called for a log entry before it is committed.
    nuraft::ptr<nuraft::buffer> pre_commit(uint64_t log_idx, nuraft::buffer & data) override;

    /// nuraft::state_machine interface: commit the log entry with index `log_idx`.
    nuraft::ptr<nuraft::buffer> commit(const uint64_t log_idx, nuraft::buffer & data) override; /// NOLINT

    /// Save new cluster config to our snapshot (copy of the config stored in StateManager)
    void commit_config(const uint64_t log_idx, nuraft::ptr<nuraft::cluster_config> & new_conf) override; /// NOLINT

    /// nuraft::state_machine interface: roll back the log entry with index `log_idx`.
    void rollback(uint64_t log_idx, nuraft::buffer & data) override;

    /// Roll back an already parsed request.
    /// @param allow_missing whether the transaction we want to roll back can be missing from storage
    ///        (can happen in case of an exception during preprocessing)
    void rollbackRequest(const KeeperStorage::RequestForSession & request_for_session, bool allow_missing);

    /// Variant of rollbackRequest that is excluded from thread-safety analysis.
    /// NOTE(review): judging by the name, the caller is expected to already hold
    /// storage_and_responses_lock -- confirm against the .cpp.
    void rollbackRequestNoLock(
        const KeeperStorage::RequestForSession & request_for_session,
        bool allow_missing) TSA_NO_THREAD_SAFETY_ANALYSIS;

    /// Last committed Raft log index (atomic read of last_committed_idx).
    uint64_t last_commit_index() override { return last_committed_idx; }

    /// Apply preliminarily saved (save_logical_snp_obj) snapshot to our state.
    bool apply_snapshot(nuraft::snapshot & s) override;

    /// nuraft::state_machine interface: return the latest snapshot.
    nuraft::ptr<nuraft::snapshot> last_snapshot() override;

    /// Create new snapshot from current state.
    void create_snapshot(nuraft::snapshot & s, nuraft::async_result<bool>::handler_type & when_done) override;

    /// Save a snapshot which was sent to us by the leader. After that we will apply it in apply_snapshot.
    void save_logical_snp_obj(nuraft::snapshot & s, uint64_t & obj_id, nuraft::buffer & data, bool is_first_obj, bool is_last_obj) override;

    /// A better name would be `serialize snapshot` -- save an existing snapshot (created by create_snapshot)
    /// into the in-memory buffer data_out.
    int read_logical_snp_obj(
        nuraft::snapshot & s, void *& user_snp_ctx, uint64_t obj_id, nuraft::ptr<nuraft::buffer> & data_out, bool & is_last_obj) override;

    /// Direct access to the storage WITHOUT taking storage_and_responses_lock.
    /// This should be used only for tests or keeper-data-dumper because it violates
    /// TSA -- we can't acquire the lock outside of this class or return a storage under lock
    /// in a reasonable way.
    KeeperStorage & getStorageUnsafe() TSA_NO_THREAD_SAFETY_ANALYSIS
    {
        return *storage;
    }

    void shutdownStorage();

    ClusterConfigPtr getClusterConfig() const;

    /// Process local read request
    void processReadRequest(const KeeperStorage::RequestForSession & request_for_session);

    std::vector<int64_t> getDeadSessions();

    int64_t getNextZxid() const;

    KeeperStorage::Digest getNodesDigest() const;

    /// Introspection functions for 4lw commands
    uint64_t getLastProcessedZxid() const;

    uint64_t getNodesCount() const;
    uint64_t getTotalWatchesCount() const;
    uint64_t getWatchedPathsCount() const;
    uint64_t getSessionsWithWatchesCount() const;

    void dumpWatches(WriteBufferFromOwnString & buf) const;
    void dumpWatchesByPath(WriteBufferFromOwnString & buf) const;
    void dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const;

    uint64_t getSessionWithEphemeralNodesCount() const;
    uint64_t getTotalEphemeralNodesCount() const;
    uint64_t getApproximateDataSize() const;
    uint64_t getKeyArenaSize() const;
    uint64_t getLatestSnapshotBufSize() const;

    void recalculateStorageStats();

    void reconfigure(const KeeperStorage::RequestForSession & request_for_session);

private:
    /// NOTE: the declaration order of the data members below is significant (it is the
    /// initialization order used by the constructor) -- do not reorder.

    CommitCallback commit_callback;
    /// In our state machine we always have a single snapshot which is stored
    /// in memory in compressed (serialized) format.
    SnapshotMetadataPtr latest_snapshot_meta = nullptr;
    SnapshotFileInfo latest_snapshot_info;
    nuraft::ptr<nuraft::buffer> latest_snapshot_buf = nullptr;

    CoordinationSettingsPtr coordination_settings;

    /// Main state machine logic
    KeeperStoragePtr storage TSA_PT_GUARDED_BY(storage_and_responses_lock);

    /// Save/Load and Serialize/Deserialize logic for snapshots.
    KeeperSnapshotManager snapshot_manager;

    /// Put processed responses into this queue
    ResponsesQueue & responses_queue;

    /// Snapshots to create by snapshot thread
    SnapshotsQueue & snapshots_queue;

    /// Mutex for snapshots
    mutable std::mutex snapshots_lock;

    /// Lock for storage and responses_queue. It's important to process requests
    /// and push them to the responses queue while holding this lock. Otherwise
    /// we can get strange cases when, for example, a client sends a read request with
    /// a watch, then receives the watch response first and only after that receives
    /// the response for the request itself.
    mutable std::mutex storage_and_responses_lock;

    /// Cache of parsed requests: session id -> (XID -> parsed request). See parseRequest for the lifetime rules.
    std::unordered_map<int64_t, std::unordered_map<Coordination::XID, std::shared_ptr<KeeperStorage::RequestForSession>>> parsed_request_cache;
    uint64_t min_request_size_to_cache{0};
    /// We only need to protect the access to the map itself.
    /// Requests can be modified from anywhere without a lock because a single request
    /// can be processed only in 1 thread at any point.
    std::mutex request_cache_mutex;

    /// Last committed Raft log number.
    /// Explicitly zero-initialized: before C++20 a default-constructed std::atomic holds an
    /// indeterminate value. A constructor mem-initializer, if present, still takes precedence.
    std::atomic<uint64_t> last_committed_idx{0};

    /// Logger; defaults to null so that it is never an indeterminate pointer.
    Poco::Logger * log = nullptr;

    /// Cluster config for our quorum.
    /// It's a copy of config stored in StateManager, but here
    /// we also write it to disk during snapshot. Must be used with lock.
    mutable std::mutex cluster_config_lock;
    ClusterConfigPtr cluster_config;

    /// Special part of ACL system -- superdigest specified in server config.
    const std::string superdigest;

    KeeperContextPtr keeper_context;

    /// S3 snapshot manager passed to the constructor; defaults to null so that it is never
    /// an indeterminate pointer.
    KeeperSnapshotManagerS3 * snapshot_manager_s3 = nullptr;

    /// Handle a reconfiguration request and produce the response for the session.
    /// The caller must hold storage_and_responses_lock (enforced by thread-safety analysis).
    KeeperStorage::ResponseForSession processReconfiguration(
        const KeeperStorage::RequestForSession & request_for_session)
        TSA_REQUIRES(storage_and_responses_lock);
};
}