1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
#pragma once

#include <atomic>
#include <chrono>
#include <map>
#include <memory>
#include <mutex>
#include <string_view>
#include <thread>
#include <unordered_set>

#include <Poco/Timespan.h>

#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperArgs.h>
#include <Common/ThreadPool.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Coordination/KeeperFeatureFlags.h>
namespace Coordination
{
struct TestKeeperRequest;
using TestKeeperRequestPtr = std::shared_ptr<TestKeeperRequest>;
/** Looks like ZooKeeper but stores all data in memory of server process.
* All data is not shared between different servers and is lost after server restart.
*
* The only purpose is to more simple testing for interaction with ZooKeeper within a single server.
* This still makes sense, because multiple replicas of a single table can be created on a single server,
* and it is used to test replication logic.
*
* Does not support ACLs. Does not support NULL node values.
*
* NOTE: You can add various failure modes for better testing.
*/
class TestKeeper final : public IKeeper
{
public:
explicit TestKeeper(const zkutil::ZooKeeperArgs & args_);
~TestKeeper() override;
/// True after finalize() has run; an expired "session" rejects further requests.
bool isExpired() const override { return expired; }
/// No deadline concept in the in-memory mock — never reports a reached deadline.
bool hasReachedDeadline() const override { return false; }
/// There is no real session; a fixed dummy id is returned.
int64_t getSessionID() const override { return 0; }
/// Asynchronous node creation. `acls` are accepted but ignored
/// (the class comment above states ACLs are not supported).
void create(
const String & path,
const String & data,
bool is_ephemeral,
bool is_sequential,
const ACLs & acls,
CreateCallback callback) override;
/// Asynchronous removal; `version` = -1 presumably means "any version",
/// matching ZooKeeper semantics — confirm against the .cpp.
void remove(
const String & path,
int32_t version,
RemoveCallback callback) override;
/// Existence check; `watch` (if non-null) is registered for the path.
void exists(
const String & path,
ExistsCallback callback,
WatchCallbackPtr watch) override;
/// Read node data with an optional watch.
void get(
const String & path,
GetCallback callback,
WatchCallbackPtr watch) override;
/// Conditional write: succeeds only if `version` matches (ZooKeeper-style CAS).
void set(
const String & path,
const String & data,
int32_t version,
SetCallback callback) override;
/// List children; `watch` goes into `list_watches` (child watches),
/// separate from data watches.
void list(
const String & path,
ListRequestType list_request_type,
ListCallback callback,
WatchCallbackPtr watch) override;
/// Version check without reading data.
void check(
const String & path,
int32_t version,
CheckCallback callback) override;
/// No replication here, so sync has nothing real to wait for;
/// kept for interface compatibility.
void sync(
const String & path,
SyncCallback callback) override;
/// Cluster reconfiguration stub — declared `final` to forbid further overriding.
void reconfig(
std::string_view joining,
std::string_view leaving,
std::string_view new_members,
int32_t version,
ReconfigCallback callback) final;
/// Atomic multi-request (transaction of several ops).
void multi(
const Requests & requests,
MultiCallback callback) override;
/// Terminates the "session": marks it expired and shuts down processing.
void finalize(const String & reason) override;
/// The mock advertises no optional Keeper features.
bool isFeatureEnabled(DB::KeeperFeatureFlag) const override
{
return false;
}
/// In-memory representation of a single znode.
struct Node
{
String data;
ACLs acls; /// Stored but not enforced (ACLs unsupported, see class comment).
bool is_ephemeral = false;
bool is_sequental = false; /// NOTE(review): typo for "is_sequential" — kept as-is; renaming this public member would break code outside this file.
Stat stat{};
int32_t seq_num = 0; /// Counter for naming sequential children of this node.
};
/// Full tree keyed by absolute path; std::map keeps paths ordered,
/// which makes child enumeration by prefix possible.
using Container = std::map<std::string, Node>;
using WatchCallbacks = std::unordered_set<WatchCallbackPtr>;
using Watches = std::map<String /* path, relative of root_path */, WatchCallbacks>;
private:
using clock = std::chrono::steady_clock;
/// One queued operation: the request, its completion callback,
/// an optional watch to register, and the enqueue timestamp.
struct RequestInfo
{
TestKeeperRequestPtr request;
ResponseCallback callback;
WatchCallbackPtr watch;
clock::time_point time;
};
Container container;
zkutil::ZooKeeperArgs args;
/// Serializes producers pushing into `requests_queue`.
std::mutex push_request_mutex;
std::atomic<bool> expired{false};
/// Last assigned transaction id (ZooKeeper zxid analogue); only the
/// processing thread touches it, hence no synchronization — TODO confirm in .cpp.
int64_t zxid = 0;
Watches watches;
Watches list_watches; /// Watches for 'list' request (watches on children).
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
/// Capacity 1: a push blocks until the processing thread drains the
/// previous request — presumably intentional back-pressure; verify before changing.
RequestsQueue requests_queue{1};
void pushRequest(RequestInfo && request);
/// Single consumer thread that executes requests and fires callbacks/watches.
ThreadFromGlobalPool processing_thread;
void processingThread();
};
}
|