1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
|
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
#include <base/types.h>
#include <base/sort.h>
#include <optional>
#include <mutex>
#include <city.h>
#include <algorithm>
#include <atomic>
#include <utility>
namespace DB
{
/// Minimum number of seconds to wait between two checks of whether the chosen replica has finished the merge.
static constexpr int RECHECK_MERGE_READYNESS_INTERVAL_SECONDS = 1;
/// Do not refresh the state more often than once per this many seconds (limits the number of ZooKeeper operations).
static constexpr int REFRESH_STATE_MINIMUM_INTERVAL_SECONDS = 3;
/// Refresh the state automatically if it has not been refreshed for longer than this many seconds.
static constexpr int REFRESH_STATE_MAXIMUM_INTERVAL_SECONDS = 30;
ReplicatedMergeTreeMergeStrategyPicker::ReplicatedMergeTreeMergeStrategyPicker(StorageReplicatedMergeTree & storage_)
: storage(storage_)
{}
/// Checks whether `replica` already has the part that `entry` is going to produce (`entry.new_part_name`).
///
/// The actual check is delegated to `storage.checkReplicaHavePart`, but it is only performed when at least
/// RECHECK_MERGE_READYNESS_INTERVAL_SECONDS have passed since the entry was last postponed
/// (or since it was created, when `last_postpone_time` is zero). Otherwise `false` is returned right away.
bool ReplicatedMergeTreeMergeStrategyPicker::isMergeFinishedByReplica(const String & replica, const ReplicatedMergeTreeLogEntryData & entry)
{
    /// Both timestamps have only seconds resolution, so the recheck period is rather coarse.
    auto last_attempt_time = entry.last_postpone_time;
    if (last_attempt_time == 0)
        last_attempt_time = entry.create_time;

    /// Too early for another check: we do not want to ask ZooKeeper too frequently.
    if (time(nullptr) - last_attempt_time < RECHECK_MERGE_READYNESS_INTERVAL_SECONDS)
        return false;

    return storage.checkReplicaHavePart(replica, entry.new_part_name);
}
/// Tells whether `entry` should be executed on a single replica only.
///
/// Returns `true` when all of the following hold:
///  - `execute_merges_on_single_replica_time_threshold` is positive (the feature is turned on);
///  - the entry is a MERGE_PARTS log entry;
///  - fewer than `threshold` seconds have passed since the entry was created.
bool ReplicatedMergeTreeMergeStrategyPicker::shouldMergeOnSingleReplica(const ReplicatedMergeTreeLogEntryData & entry) const
{
    /// Read the threshold once so that every check below sees the same value.
    const time_t threshold = execute_merges_on_single_replica_time_threshold;

    /// Feature turned off.
    if (threshold <= 0)
        return false;

    /// Not a merge log entry.
    if (entry.type != ReplicatedMergeTreeLogEntry::MERGE_PARTS)
        return false;

    /// Keep waiting only while not too much time has passed since the entry was created.
    return entry.create_time + threshold > time(nullptr);
}
/// that will return the same replica name for ReplicatedMergeTreeLogEntry on all the replicas (if the replica set is the same).
/// that way each replica knows who is responsible for doing a certain merge.
/// in some corner cases (added / removed / deactivated replica)
/// nodes can pick different replicas to execute merge and wait for it (or to execute the same merge together)
/// but that doesn't have a significant impact (in one case it will wait for the execute_merges_on_single_replica_time_threshold,
/// in another just 2 replicas will do the merge)
std::optional<String> ReplicatedMergeTreeMergeStrategyPicker::pickReplicaToExecuteMerge(const ReplicatedMergeTreeLogEntryData & entry)
{
/// last state refresh was too long ago, need to sync up the replicas list
if (time(nullptr) - last_refresh_time > REFRESH_STATE_MAXIMUM_INTERVAL_SECONDS)
refreshState();
auto hash = getEntryHash(entry);
std::lock_guard lock(mutex);
auto num_replicas = active_replicas.size();
if (num_replicas == 0)
return std::nullopt;
auto replica_index = static_cast<int>(hash % num_replicas);
if (replica_index == current_replica_index)
return std::nullopt;
return active_replicas.at(replica_index);
}
void ReplicatedMergeTreeMergeStrategyPicker::refreshState()
{
const auto settings = storage.getSettings();
time_t threshold = settings->execute_merges_on_single_replica_time_threshold.totalSeconds();
time_t threshold_init = 0;
if (settings->allow_remote_fs_zero_copy_replication)
threshold_init = settings->remote_fs_execute_merges_on_single_replica_time_threshold.totalSeconds();
if (threshold == 0)
/// we can reset the settings without lock (it's atomic)
execute_merges_on_single_replica_time_threshold = threshold;
if (threshold_init == 0)
remote_fs_execute_merges_on_single_replica_time_threshold = threshold_init;
if (threshold == 0 && threshold_init == 0)
return;
auto now = time(nullptr);
/// the setting was already enabled, and last state refresh was done recently
if (((threshold != 0 && execute_merges_on_single_replica_time_threshold != 0)
|| (threshold_init != 0 && remote_fs_execute_merges_on_single_replica_time_threshold != 0))
&& now - last_refresh_time < REFRESH_STATE_MINIMUM_INTERVAL_SECONDS)
return;
LOG_DEBUG(storage.log, "Updating strategy picker state");
auto zookeeper = storage.getZooKeeper();
auto all_replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas");
::sort(all_replicas.begin(), all_replicas.end());
std::vector<String> active_replicas_tmp;
int current_replica_index_tmp = -1;
for (const String & replica : all_replicas)
{
if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active"))
{
active_replicas_tmp.push_back(replica);
if (replica == storage.replica_name)
{
current_replica_index_tmp = static_cast<int>(active_replicas_tmp.size() - 1);
}
}
}
if (current_replica_index_tmp < 0 || active_replicas_tmp.size() < 2)
{
if (execute_merges_on_single_replica_time_threshold > 0)
{
LOG_WARNING(storage.log, "Can't find current replica in the active replicas list, or too few active replicas to use 'execute_merges_on_single_replica_time_threshold'");
/// we can reset the settings without lock (it's atomic)
execute_merges_on_single_replica_time_threshold = 0;
}
/// default value of remote_fs_execute_merges_on_single_replica_time_threshold is not 0
/// so we write no warning in log here
remote_fs_execute_merges_on_single_replica_time_threshold = 0;
return;
}
std::lock_guard lock(mutex);
if (threshold != 0) /// Zeros already reset
execute_merges_on_single_replica_time_threshold = threshold;
if (threshold_init != 0)
remote_fs_execute_merges_on_single_replica_time_threshold = threshold_init;
last_refresh_time = now;
current_replica_index = current_replica_index_tmp;
active_replicas = active_replicas_tmp;
LOG_DEBUG(storage.log, "Strategy picker state updated, current replica: {}, active replicas: [{}]", current_replica_index, fmt::join(active_replicas, ", "));
}
/// Computes the hash used to pick a replica for `entry`:
/// CityHash64 over the concatenation of the storage ZooKeeper path and the name of the part the entry produces.
uint64_t ReplicatedMergeTreeMergeStrategyPicker::getEntryHash(const ReplicatedMergeTreeLogEntryData & entry) const
{
    const auto hash_key = storage.zookeeper_path + entry.new_part_name;
    return CityHash_v1_0_2::CityHash64(hash_key.data(), hash_key.size());
}
}
|