1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
|
#pragma once
#include <unordered_map>
#include <vector>
#include <Coordination/ACLMap.h>
#include <Coordination/SessionExpiryQueue.h>
#include <Coordination/SnapshotableHashTable.h>
#include <IO/WriteBufferFromString.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Coordination/KeeperContext.h>
#include <absl/container/flat_hash_set.h>
namespace DB
{
/// Forward declaration only; the definition is not part of this header section.
struct KeeperStorageRequestProcessor;
/// Shared-ownership handle to a request processor.
using KeeperStorageRequestProcessorPtr = std::shared_ptr<KeeperStorageRequestProcessor>;
/// Callback that receives a ZooKeeper response.
using ResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr &)>;
/// Set type used by KeeperStorage::Node to hold its children (elements are StringRef).
using ChildrenSet = absl::flat_hash_set<StringRef, StringRefHash>;
/// Maps a session id to the timeout it was registered with
/// (see KeeperStorage::getSessionID / addSessionID, which take the timeout in milliseconds).
using SessionAndTimeout = std::unordered_map<int64_t, int64_t>;
/// Forward declaration only; the definition is not part of this header section.
struct KeeperStorageSnapshot;
/// Keeper state machine, almost identical to ZooKeeper's state machine.
/// Implements all the logic of operations, data changes and session allocation.
/// In-memory and not thread-safe.
class KeeperStorage
{
public:
/// A single node kept in the storage container: its data, its set of children,
/// its stat and the id of its ACL, plus a cached size and a cached digest.
struct Node
{
uint64_t acl_id = 0; /// 0 -- no ACL by default
/// NOTE(review): presumably true for nodes created as sequential; the identifier is
/// spelled "sequental" throughout this header, so it cannot be renamed locally.
bool is_sequental = false;
/// Node stat, value-initialized.
Coordination::Stat stat{};
/// NOTE(review): presumably the counter used to number sequential children -- confirm in the .cpp.
int32_t seq_num = 0;
uint64_t size_bytes; // cached size, so that it is not recalculated on every sizeInBytes() call
/// A fresh node starts with the size of the Node object itself.
Node() : size_bytes(sizeof(Node)) { }
/// Object memory size (returns the cached size_bytes value)
uint64_t sizeInBytes() const { return size_bytes; }
/// Replaces the node data; takes the new value by value. Defined out of line.
void setData(String new_data);
/// Read-only access to the node data.
const auto & getData() const noexcept { return data; }
/// Adds a child to the children set. Defined out of line.
/// NOTE(review): update_size presumably controls whether size_bytes is adjusted -- confirm.
void addChild(StringRef child_path, bool update_size = true);
/// Removes a child from the children set. Defined out of line.
void removeChild(StringRef child_path);
/// Read-only access to the children set.
const auto & getChildren() const noexcept { return children; }
// Invalidate the calculated digest so it's recalculated again on the next
// getDigest call
void invalidateDigestCache() const;
// get the calculated digest of the node
UInt64 getDigest(std::string_view path) const;
// copy only necessary information for preprocessing and digest calculation
// (e.g. we don't need to copy list of children)
void shallowCopy(const Node & other);
/// Recomputes size_bytes. Defined out of line.
void recalculateSize();
private:
String data;
ChildrenSet children{};
/// Digest cache; mutable so that the const methods getDigest / invalidateDigestCache can update it.
mutable std::optional<UInt64> cached_digest;
};
/// Version tag stored next to a digest value (see Digest and checkDigest).
enum DigestVersion : uint8_t
{
NO_DIGEST = 0,
V1 = 1,
// added system nodes that modify the digest on startup, so digests from the previous version are invalid
// NOTE(review): this comment originally said "V0"; there is no V0 enumerator, presumably V1 is meant.
V2 = 2
};
/// Digest version used by this build of the storage.
static constexpr auto CURRENT_DIGEST_VERSION = DigestVersion::V2;
/// A response paired with the id of the session it belongs to.
struct ResponseForSession
{
int64_t session_id;
Coordination::ZooKeeperResponsePtr response;
};
/// List of responses, each tagged with its session id (returned by processRequest).
using ResponsesForSessions = std::vector<ResponseForSession>;
/// A digest value tagged with its DigestVersion.
/// Defaults to NO_DIGEST with value 0.
struct Digest
{
DigestVersion version{DigestVersion::NO_DIGEST};
uint64_t value{0};
};
/// Tells whether two digests are consistent with each other.
/// Digests are only comparable when both carry the same version and that version is not
/// NO_DIGEST; digests that are not comparable are reported as matching (returns true).
/// Comparable digests match only when their values are equal.
static bool checkDigest(const Digest & first, const Digest & second)
{
    const bool comparable = first.version == second.version && first.version != DigestVersion::NO_DIGEST;
    return !comparable || first.value == second.value;
}
/// Produces a digest string from `userdata`. Defined out of line.
/// NOTE(review): presumably `userdata` is a "user:password" pair as used for auth -- confirm in the .cpp.
static String generateDigest(const String & userdata);
/// A request together with the session that issued it and its bookkeeping data.
struct RequestForSession
{
int64_t session_id;
/// NOTE(review): unit and clock of `time` are not visible here -- confirm against the producer.
int64_t time{0};
Coordination::ZooKeeperRequestPtr request;
/// zxid attached to the request; 0 by default.
int64_t zxid{0};
/// Optional digest attached to the request; empty when none was provided.
std::optional<Digest> digest;
};
/// Authentication identity: a scheme name plus an id within that scheme.
struct AuthID
{
    std::string scheme;
    std::string id;

    /// Two identities are equal only when both the scheme and the id are equal.
    bool operator==(const AuthID & other) const
    {
        if (scheme != other.scheme)
            return false;
        return id == other.id;
    }
};
/// List of requests, each tagged with its session.
using RequestsForSessions = std::vector<RequestForSession>;
/// Type of the main node container.
using Container = SnapshotableHashTable<Node>;
/// session id -> set of paths (used for the `ephemerals` member).
using Ephemerals = std::unordered_map<int64_t, std::unordered_set<std::string>>;
/// session id -> set of paths (used for the `sessions_and_watchers` member).
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<std::string>>;
/// Set of session ids.
using SessionIDs = std::unordered_set<int64_t>;
/// Just a vector of SHA1 from user:password
using AuthIDs = std::vector<AuthID>;
/// session id -> auth ids of that session.
using SessionAndAuth = std::unordered_map<int64_t, AuthIDs>;
/// path -> ids of the sessions watching it; ordered by path because it is a std::map.
using Watches = std::map<String /* path, relative to root_path */, SessionIDs>;
/// Next session id handed out by getSessionID(); starts at 1.
int64_t session_id_counter{1};
/// Auth ids per session (session id -> AuthIDs).
SessionAndAuth session_and_auth;
/// Main hashtable with nodes. Contains all information about data.
/// All other structures except session_and_timeout can be restored from
/// the container.
Container container;
// Applying a ZooKeeper request to the storage consists of two steps:
// - preprocessing which, instead of applying the changes directly to storage,
// generates deltas with those changes, denoted with the request ZXID
// - processing which applies deltas with the correct ZXID to the storage
//
// Delta objects allow us two things:
// - fetch the latest, uncommitted state of an object by getting the committed
// state of that same object from the storage and applying the deltas
// in the same order as they are defined
// - quickly commit the changes to the storage

/// Payload for creating a node: its stat, sequential flag, ACLs and data.
struct CreateNodeDelta
{
Coordination::Stat stat;
bool is_sequental;
Coordination::ACLs acls;
String data;
};
/// Payload for removing a node.
/// NOTE(review): version -1 presumably means "do not check the version" -- confirm in the .cpp.
struct RemoveNodeDelta
{
int32_t version{-1};
int64_t ephemeral_owner{0};
};
/// Payload for updating a node: `update_fn` receives the node by mutable reference.
struct UpdateNodeDelta
{
std::function<void(Node &)> update_fn;
int32_t version{-1};
};
/// Payload for replacing the ACLs of a node.
struct SetACLDelta
{
Coordination::ACLs acls;
int32_t version{-1};
};
/// Payload carrying an error code instead of a change.
struct ErrorDelta
{
Coordination::Error error;
};
/// Payload carrying a list of error codes.
/// NOTE(review): presumably one per subrequest of a failed multi request -- confirm.
struct FailedMultiDelta
{
std::vector<Coordination::Error> error_codes;
};
// Denotes end of a subrequest in multi request
struct SubDeltaEnd
{
};
/// Payload for adding an auth id to a session.
struct AddAuthDelta
{
int64_t session_id;
AuthID auth_id;
};
/// Any one of the delta payloads above.
using Operation = std::
variant<CreateNodeDelta, RemoveNodeDelta, UpdateNodeDelta, SetACLDelta, AddAuthDelta, ErrorDelta, SubDeltaEnd, FailedMultiDelta>;
struct Delta
{
Delta(String path_, int64_t zxid_, Operation operation_) : path(std::move(path_)), zxid(zxid_), operation(std::move(operation_)) { }
Delta(int64_t zxid_, Coordination::Error error) : Delta("", zxid_, ErrorDelta{error}) { }
Delta(int64_t zxid_, Operation subdelta) : Delta("", zxid_, subdelta) { }
String path;
int64_t zxid;
Operation operation;
};
/// Changes produced by preprocessing that are not yet applied to the storage, kept as a list
/// of deltas plus lookup structures over them. Holds a reference to the owning storage.
struct UncommittedState
{
    explicit UncommittedState(KeeperStorage & storage_) : storage(storage_) { }

    /// The methods below are defined out of line.
    void addDelta(Delta new_delta);
    void addDeltas(std::vector<Delta> new_deltas);
    void commit(int64_t commit_zxid);
    void rollback(int64_t rollback_zxid);

    std::shared_ptr<Node> getNode(StringRef path) const;
    Coordination::ACLs getACLs(StringRef path) const;

    void applyDelta(const Delta & delta);

    /// Returns true if `predicate` holds for at least one auth id of `session_id`.
    /// The auth ids stored in the storage are checked first. When `is_local` is false, the
    /// auth ids tracked by this uncommitted state are checked as well.
    bool hasACL(int64_t session_id, bool is_local, std::function<bool(const AuthID &)> predicate)
    {
        // Works for containers of AuthID as well as containers of pointers to AuthID.
        const auto check_auth = [&](const auto & auth_ids)
        {
            for (const auto & auth : auth_ids)
            {
                using TAuth = std::remove_reference_t<decltype(auth)>;

                const AuthID * auth_ptr = nullptr;
                if constexpr (std::is_pointer_v<TAuth>)
                    auth_ptr = auth;
                else
                    auth_ptr = &auth;

                if (predicate(*auth_ptr))
                    return true;
            }
            return false;
        };

        // Look the session up with find(): operator[] would default-insert an empty entry
        // into the storage map for every session that has no auth, i.e. a read-only check
        // would mutate (and grow) the storage state. A missing entry and an empty one give
        // the same answer here.
        const auto committed_it = storage.session_and_auth.find(session_id);
        if (committed_it != storage.session_and_auth.end() && check_auth(committed_it->second))
            return true;

        // Local requests only see what is already stored in the storage.
        if (is_local)
            return false;

        // check if there are uncommitted
        const auto auth_it = session_and_auth.find(session_id);
        if (auth_it == session_and_auth.end())
            return false;

        return check_auth(auth_it->second);
    }

    void forEachAuthInSession(int64_t session_id, std::function<void(const AuthID &)> func) const;

    std::shared_ptr<Node> tryGetNodeFromStorage(StringRef path) const;

    /// session id -> non-owning pointers to auth ids.
    /// NOTE(review): presumably these point into AddAuthDelta payloads held by `deltas` -- confirm.
    std::unordered_map<int64_t, std::list<const AuthID *>> session_and_auth;

    /// Node state, ACLs and zxid kept per path in `nodes`.
    struct UncommittedNode
    {
        std::shared_ptr<Node> node{nullptr};
        Coordination::ACLs acls{};
        int64_t zxid{0};
    };

    /// SipHash-based 64-bit hash over a string view.
    struct Hash
    {
        auto operator()(const std::string_view view) const
        {
            SipHash hash;
            hash.update(view);
            return hash.get64();
        }

        using is_transparent = void; // required to make find() work with different type than key_type
    };

    /// Equality over string views.
    struct Equal
    {
        auto operator()(const std::string_view a,
                        const std::string_view b) const
        {
            return a == b;
        }

        using is_transparent = void; // required to make find() work with different type than key_type
    };

    /// path -> uncommitted node; mutable so that const member functions can update it.
    mutable std::unordered_map<std::string, UncommittedNode, Hash, Equal> nodes;
    /// path -> non-owning pointers to deltas.
    /// NOTE(review): presumably pointers to elements of `deltas` -- confirm.
    std::unordered_map<std::string, std::list<const Delta *>, Hash, Equal> deltas_for_path;

    std::list<Delta> deltas;
    KeeperStorage & storage;
};
/// Uncommitted changes of this storage; constructed with a reference to the storage itself.
UncommittedState uncommitted_state{*this};
// Apply uncommitted state to another storage using only transactions
// with zxid > last_zxid
void applyUncommittedState(KeeperStorage & other, int64_t last_zxid);
/// Commits the changes recorded for `zxid` and returns the resulting error code. Defined out of line.
Coordination::Error commit(int64_t zxid);
// Create node in the storage
// Returns false if it failed to create the node, true otherwise
// We don't care about the exact failure because we should've caught it during preprocessing
bool createNode(
const std::string & path,
String data,
const Coordination::Stat & stat,
bool is_sequental,
Coordination::ACLs node_acls);
// Remove node in the storage
// Returns false if it failed to remove the node, true otherwise
// We don't care about the exact failure because we should've caught it during preprocessing
bool removeNode(const std::string & path, int32_t version);
/// Checks `permissions` on `path` for the given session. Defined out of line.
/// NOTE(review): presumably returns true when access is allowed -- confirm in the .cpp.
bool checkACL(StringRef path, int32_t permissions, int64_t session_id, bool is_local);
/// Defined out of line.
/// NOTE(review): presumably removes `path` from the session's entry in `ephemerals` -- confirm.
void unregisterEphemeralPath(int64_t session_id, const std::string & path);
/// Mapping session_id -> set of ephemeral node paths
Ephemerals ephemerals;
/// Mapping session_id -> set of watched node paths
SessionAndWatcher sessions_and_watchers;
/// Expiration queue for sessions; allows getting dead sessions at some point in time
SessionExpiryQueue session_expiry_queue;
/// All active sessions with their timeouts
SessionAndTimeout session_and_timeout;
/// ACLMap for more compact ACLs storage inside nodes.
ACLMap acl_map;
/// Global id of all requests applied to storage
int64_t zxid{0};
// An older Keeper node (pre-V5 snapshots) can create snapshots and receive logs from newer Keeper nodes.
// This can lead to some inconsistencies, e.g. from the snapshot it will use log_idx as zxid
// while the log will have a smaller zxid because it's generated by the newer nodes.
// We save the value loaded from the snapshot to know when it is okay to have
// a smaller zxid in newer requests.
int64_t old_snapshot_zxid{0};
/// zxid of a transaction together with a nodes digest recorded for it.
struct TransactionInfo
{
int64_t zxid;
Digest nodes_digest;
};
/// Transactions not yet committed; getNextZXID() reads the zxid of the last element.
std::deque<TransactionInfo> uncommitted_transactions;
/// NOTE(review): presumably the digest over all committed nodes -- confirm in the .cpp.
uint64_t nodes_digest{0};
/// Finalization flag, false by default (see finalize() / isFinalized()).
bool finalized{false};
/// Currently active watches (node_path -> subscribed sessions)
Watches watches;
Watches list_watches; /// Watches for 'list' request (watches on children).
/// Defined out of line.
/// NOTE(review): presumably drops all watches registered by `session_id` -- confirm.
void clearDeadWatches(int64_t session_id);
/// Get current committed zxid
int64_t getZXID() const { return zxid; }
/// Returns the zxid that follows the most recent known one: the zxid of the last uncommitted
/// transaction if there is any, otherwise the committed zxid.
int64_t getNextZXID() const
{
    const int64_t last_known_zxid = uncommitted_transactions.empty() ? zxid : uncommitted_transactions.back().zxid;
    return last_known_zxid + 1;
}
/// Returns the nodes digest. Defined out of line.
/// NOTE(review): `committed` presumably selects the committed digest over the latest uncommitted one -- confirm.
Digest getNodesDigest(bool committed) const;
KeeperContextPtr keeper_context;
/// Immutable after construction (const member).
const String superdigest;
/// Initialization flag, false by default.
/// NOTE(review): presumably set by initializeSystemNodes() -- confirm in the .cpp.
bool initialized{false};
/// Defined out of line. `initialize_system_nodes` defaults to true.
/// NOTE(review): tick_time_ms is presumably forwarded to the session expiry queue -- confirm.
KeeperStorage(int64_t tick_time_ms, const String & superdigest_, const KeeperContextPtr & keeper_context_, bool initialize_system_nodes = true);
/// Defined out of line.
void initializeSystemNodes();
/// Allocates a new session id with the specified timeout.
/// Takes the current counter value as the id, advances the counter, then registers the
/// session both in session_and_timeout and in the expiry queue. Returns the new id.
int64_t getSessionID(int64_t session_timeout_ms)
{
    const int64_t new_session_id = session_id_counter;
    ++session_id_counter;

    session_and_timeout.emplace(new_session_id, session_timeout_ms);
    session_expiry_queue.addNewSessionOrUpdate(new_session_id, session_timeout_ms);
    return new_session_id;
}
/// Add session id. Used when restoring KeeperStorage from snapshot.
/// Registers the given id and timeout in session_and_timeout and in the expiry queue;
/// session_id_counter is not touched here.
void addSessionID(int64_t session_id, int64_t session_timeout_ms)
{
// emplace() keeps an already existing entry for this session id unchanged
session_and_timeout.emplace(session_id, session_timeout_ms);
session_expiry_queue.addNewSessionOrUpdate(session_id, session_timeout_ms);
}
/// Computes a digest from `current_digest` and `new_deltas`. Defined out of line.
UInt64 calculateNodesDigest(UInt64 current_digest, const std::vector<Delta> & new_deltas) const;
/// Process user request and return response.
/// check_acl = false only when converting data from ZooKeeper.
ResponsesForSessions processRequest(
const Coordination::ZooKeeperRequestPtr & request,
int64_t session_id,
std::optional<int64_t> new_last_zxid,
bool check_acl = true,
bool is_local = false);
/// Preprocessing step for a request (see the comment above the delta structs). Defined out of line.
/// `digest` is optional and defaults to std::nullopt.
void preprocessRequest(
const Coordination::ZooKeeperRequestPtr & request,
int64_t session_id,
int64_t time,
int64_t new_last_zxid,
bool check_acl = true,
std::optional<Digest> digest = std::nullopt);
/// Defined out of line.
/// NOTE(review): presumably discards the uncommitted changes of `rollback_zxid`; the exact
/// meaning of `allow_missing` is not visible here -- confirm in the .cpp.
void rollbackRequest(int64_t rollback_zxid, bool allow_missing);
/// Defined out of line (see the `finalized` member).
void finalize();
/// Defined out of line (see the `finalized` member).
bool isFinalized() const;
/// Set of methods for creating snapshots
/// Turn on snapshot mode, so data inside Container is not deleted, but replaced with new version.
void enableSnapshotMode(size_t up_to_version) { container.enableSnapshotMode(up_to_version); }
/// Turn off snapshot mode.
void disableSnapshotMode() { container.disableSnapshotMode(); }
/// Iterator to the beginning of the container.
Container::const_iterator getSnapshotIteratorBegin() const { return container.begin(); }
/// Clear outdated data from internal container.
void clearGarbageAfterSnapshot() { container.clearOutdatedNodes(); }
/// Get all active sessions
const SessionAndTimeout & getActiveSessions() const { return session_and_timeout; }
/// Get all dead sessions (the sessions the expiry queue reports as expired)
std::vector<int64_t> getDeadSessions() const { return session_expiry_queue.getExpiredSessions(); }
/// Introspection functions mostly used in 4-letter commands
/// Number of entries in the container.
uint64_t getNodesCount() const { return container.size(); }
/// Approximate data size as reported by the container.
uint64_t getApproximateDataSize() const { return container.getApproximateDataSize(); }
/// Size of the container's key arena.
uint64_t getArenaDataSize() const { return container.keyArenaSize(); }
/// Defined out of line.
uint64_t getTotalWatchesCount() const;
/// Number of entries in `watches` plus number of entries in `list_watches`
/// (a path present in both maps is counted twice).
uint64_t getWatchedPathsCount() const { return watches.size() + list_watches.size(); }
/// Defined out of line.
uint64_t getSessionsWithWatchesCount() const;
/// Number of sessions that have an entry in `ephemerals`.
uint64_t getSessionWithEphemeralNodesCount() const { return ephemerals.size(); }
/// Defined out of line.
uint64_t getTotalEphemeralNodesCount() const;
/// The dump* functions write into `buf`. Defined out of line.
void dumpWatches(WriteBufferFromOwnString & buf) const;
void dumpWatchesByPath(WriteBufferFromOwnString & buf) const;
void dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const;
/// Defined out of line.
void recalculateStats();
private:
/// Digest bookkeeping helpers. Defined out of line.
/// NOTE(review): presumably they subtract / add the digest of `node` at `path`
/// from / to `nodes_digest` -- confirm in the .cpp.
void removeDigest(const Node & node, std::string_view path);
void addDigest(const Node & node, std::string_view path);
};
/// Unique-ownership handle to a KeeperStorage.
using KeeperStoragePtr = std::unique_ptr<KeeperStorage>;
}
|