#pragma once

#include <vector>

#include <base/types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Core/UUID.h>
#include <Core/BackgroundSchedulePool.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/CancellationCode.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

class StorageReplicatedMergeTree;

/**
 * Cross-shard part movement workflow orchestration.
 *
 * TODO(nv):
 *  * Usage of `format_version` when acting on behalf of the remote shard.
 *    There needs to be some sort of API to coordinate with remote replicas.
 *  * Only one movement at a time can be coordinated. This can easily be fixed
 *    by cycling through the different tasks and checking their status, using a
 *    priority queue and back-off for failing tasks:
 *    `min(backoff * num_tries, max_backoff)`.
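 *
 * A minimal usage sketch (part name and destination shard path below are
 * placeholders; the statements belong to the experimental cross-shard move
 * feature, so verify the exact syntax against your ClickHouse version):
 *
 *     ALTER TABLE t MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/tables/s1/t';
 *     SELECT * FROM system.part_moves_between_shards;   -- observe task progress
 *     KILL PART_MOVE_TO_SHARD WHERE task_uuid = '...';  -- request rollback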
 */
class PartMovesBetweenShardsOrchestrator
{
public:
    struct EntryState
    {
        // State transitions are linear. When a kill query is issued the rollback
        // flag is set and the transition order is reversed.
        //
        // SOURCE_DROP is a critical state after which rollback is not possible,
        // and we must ensure that the task can always succeed after that.
        //
        // Similarly for rollback: it should always be possible to roll back
        // before the SOURCE_DROP state, and the rollback should terminate.
        //
        // Note: this is fragile. If you change the states, please add an entry
        // to the changelog about forward/backward compatibility. Better not to
        // have any active move tasks while doing upgrade/downgrade operations.
        enum Value
        {
            CANCELLED,
            TODO,
            SYNC_SOURCE,
            SYNC_DESTINATION,
            DESTINATION_FETCH,
            DESTINATION_ATTACH,
            SOURCE_DROP_PRE_DELAY,
            SOURCE_DROP,
            SOURCE_DROP_POST_DELAY,
            REMOVE_UUID_PIN,
            DONE,
        };

        EntryState(): value(TODO) {}
        EntryState(Value value_): value(value_) {} /// NOLINT

        Value value;

        String toString() const
        {
            switch (value)
            {
                case TODO: return "TODO";
                case SYNC_SOURCE: return "SYNC_SOURCE";
                case SYNC_DESTINATION: return "SYNC_DESTINATION";
                case DESTINATION_FETCH: return "DESTINATION_FETCH";
                case DESTINATION_ATTACH: return "DESTINATION_ATTACH";
                case SOURCE_DROP_PRE_DELAY: return "SOURCE_DROP_PRE_DELAY";
                case SOURCE_DROP: return "SOURCE_DROP";
                case SOURCE_DROP_POST_DELAY: return "SOURCE_DROP_POST_DELAY";
                case REMOVE_UUID_PIN: return "REMOVE_UUID_PIN";
                case DONE: return "DONE";
                case CANCELLED: return "CANCELLED";
            }

            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown EntryState: {}", DB::toString<int>(value));
        }

        static EntryState::Value fromString(String in)
        {
            if (in == "TODO") return TODO;
            else if (in == "SYNC_SOURCE") return SYNC_SOURCE;
            else if (in == "SYNC_DESTINATION") return SYNC_DESTINATION;
            else if (in == "DESTINATION_FETCH") return DESTINATION_FETCH;
            else if (in == "DESTINATION_ATTACH") return DESTINATION_ATTACH;
            else if (in == "SOURCE_DROP_PRE_DELAY") return SOURCE_DROP_PRE_DELAY;
            else if (in == "SOURCE_DROP") return SOURCE_DROP;
            else if (in == "SOURCE_DROP_POST_DELAY") return SOURCE_DROP_POST_DELAY;
            else if (in == "REMOVE_UUID_PIN") return REMOVE_UUID_PIN;
            else if (in == "DONE") return DONE;
            else if (in == "CANCELLED") return CANCELLED;
            else throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown state: {}", in);
        }
    };
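
    /// A single part-move task. It is persisted as a JSON document in ZooKeeper
    /// (see the JSON_KEY_* constants below) and mirrored in memory by the orchestrator.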
    struct Entry
    {
        friend class PartMovesBetweenShardsOrchestrator;

        time_t create_time = 0;
        time_t update_time = 0;

        /// Globally unique identifier used for attaching parts on destination.
        /// Using `part_uuid` results in part names being reused when moving parts back and forth.
        UUID task_uuid;

        String part_name;
        UUID part_uuid;
        String to_shard;
        String dst_part_name;

        EntryState state;
        bool rollback = false;

        /// Reset on successful transitions.
        String last_exception_msg;
        UInt64 num_tries = 0;

        String znode_name;

    private:
        /// Transient value for CAS.
        uint32_t version = 0;

        String znode_path;

    public:
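        /// Serialization to / from the JSON document stored in the task's znode.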
        String toString() const;
        void fromString(const String & buf);
    };

private:
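    /// Field names of the JSON document produced by Entry::toString() and parsed by Entry::fromString().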
    static constexpr auto JSON_KEY_CREATE_TIME = "create_time";
    static constexpr auto JSON_KEY_UPDATE_TIME = "update_time";
    static constexpr auto JSON_KEY_TASK_UUID = "task_uuid";
    static constexpr auto JSON_KEY_PART_NAME = "part_name";
    static constexpr auto JSON_KEY_PART_UUID = "part_uuid";
    static constexpr auto JSON_KEY_TO_SHARD = "to_shard";
    static constexpr auto JSON_KEY_DST_PART_NAME = "dst_part_name";
    static constexpr auto JSON_KEY_STATE = "state";
    static constexpr auto JSON_KEY_ROLLBACK = "rollback";
    static constexpr auto JSON_KEY_LAST_EX_MSG = "last_exception";
    static constexpr auto JSON_KEY_NUM_TRIES = "num_tries";

public:
    explicit PartMovesBetweenShardsOrchestrator(StorageReplicatedMergeTree & storage_);
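
    /// Lifecycle of the background task that drives the task entries forward.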
    void start() { task->activateAndSchedule(); }
    void wakeup() { task->schedule(); }
    void shutdown();
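
    /// Request cancellation of the move task identified by `task_uuid`;
    /// a cancelled task is rolled back through the reverse sequence of states.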
    CancellationCode killPartMoveToShard(const UUID & task_uuid);
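
    /// Snapshot of all currently known task entries (e.g. for introspection via system tables).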
    std::vector<Entry> getEntries();

private:
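    /// Background processing: run() is the scheduled task body; step() and stepEntry()
    /// advance task entries through their state machine.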
    void run();
    bool step();
    Entry stepEntry(Entry entry, zkutil::ZooKeeperPtr zk);

    Entry getEntryByUUID(const UUID & task_uuid);
    void removePins(const Entry & entry, zkutil::ZooKeeperPtr zk);
    void syncStateFromZK();

    StorageReplicatedMergeTree & storage;

    String zookeeper_path;
    String logger_name;
    Poco::Logger * log = nullptr;
    std::atomic<bool> need_stop{false};

    BackgroundSchedulePool::TaskHolder task;
    mutable std::mutex state_mutex;
    std::vector<Entry> entries;

public:
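    /// ZooKeeper path under which the task entries are stored.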
    String entries_znode_path;
};

}