1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
|
#include <Client/ConnectionPoolWithFailover.h>
#include <Client/ConnectionEstablisher.h>
#include <Poco/Net/NetException.h>
#include <Poco/Net/DNS.h>
#include <Common/BitHelpers.h>
#include <Common/quoteString.h>
#include <base/getFQDNOrHostName.h>
#include <Common/isLocalAddress.h>
#include <Common/ProfileEvents.h>
#include <Core/Settings.h>
#include <IO/ConnectionTimeouts.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ALL_CONNECTION_TRIES_FAILED;
}
ConnectionPoolWithFailover::ConnectionPoolWithFailover(
ConnectionPoolPtrs nested_pools_,
LoadBalancing load_balancing,
time_t decrease_error_period_,
size_t max_error_cap_)
: Base(std::move(nested_pools_), decrease_error_period_, max_error_cap_, &Poco::Logger::get("ConnectionPoolWithFailover"))
, get_priority_load_balancing(load_balancing)
{
const std::string & local_hostname = getFQDNOrHostName();
get_priority_load_balancing.hostname_differences.resize(nested_pools.size());
for (size_t i = 0; i < nested_pools.size(); ++i)
{
ConnectionPool & connection_pool = dynamic_cast<ConnectionPool &>(*nested_pools[i]);
get_priority_load_balancing.hostname_differences[i] = getHostNameDifference(local_hostname, connection_pool.getHost());
}
}
IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts & timeouts,
const Settings * settings,
bool /*force_connected*/)
{
if (nested_pools.empty())
throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED,
"Cannot get connection from ConnectionPoolWithFailover cause nested pools are empty");
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, timeouts, fail_message, settings, {});
};
size_t offset = 0;
LoadBalancing load_balancing = get_priority_load_balancing.load_balancing;
if (settings)
{
offset = settings->load_balancing_first_offset % nested_pools.size();
load_balancing = LoadBalancing(settings->load_balancing);
}
GetPriorityFunc get_priority = get_priority_load_balancing.getPriorityFunc(load_balancing, offset, nested_pools.size());
UInt64 max_ignored_errors = settings ? settings->distributed_replica_max_ignored_errors.value : 0;
bool fallback_to_stale_replicas = settings ? settings->fallback_to_stale_replicas_for_distributed_queries.value : true;
return Base::get(max_ignored_errors, fallback_to_stale_replicas, try_get_entry, get_priority);
}
Priority ConnectionPoolWithFailover::getPriority() const
{
return (*std::max_element(nested_pools.begin(), nested_pools.end(), [](const auto & a, const auto & b)
{
return a->getPriority() < b->getPriority();
}))->getPriority();
}
ConnectionPoolWithFailover::Status ConnectionPoolWithFailover::getStatus() const
{
const auto [states, pools, error_decrease_time] = getPoolExtendedStates();
// NOTE: to avoid data races do not touch any data of ConnectionPoolWithFailover or PoolWithFailoverBase in the code below.
assert(states.size() == pools.size());
ConnectionPoolWithFailover::Status result;
result.reserve(states.size());
const time_t since_last_error_decrease = time(nullptr) - error_decrease_time;
/// Update error_count and slowdown_count in states to return actual information.
auto updated_states = states;
auto updated_error_decrease_time = error_decrease_time;
Base::updateErrorCounts(updated_states, updated_error_decrease_time);
for (size_t i = 0; i < states.size(); ++i)
{
const auto rounds_to_zero_errors = states[i].error_count ? bitScanReverse(states[i].error_count) + 1 : 0;
const auto rounds_to_zero_slowdowns = states[i].slowdown_count ? bitScanReverse(states[i].slowdown_count) + 1 : 0;
const auto seconds_to_zero_errors = std::max(static_cast<time_t>(0), std::max(rounds_to_zero_errors, rounds_to_zero_slowdowns) * decrease_error_period - since_last_error_decrease);
result.emplace_back(NestedPoolStatus{
pools[i],
updated_states[i].error_count,
updated_states[i].slowdown_count,
std::chrono::seconds{seconds_to_zero_errors}
});
}
return result;
}
std::vector<IConnectionPool::Entry> ConnectionPoolWithFailover::getMany(const ConnectionTimeouts & timeouts,
const Settings * settings,
PoolMode pool_mode,
AsyncCallback async_callback,
std::optional<bool> skip_unavailable_endpoints)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, timeouts, fail_message, settings, nullptr, async_callback);
};
std::vector<TryResult> results = getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints);
std::vector<Entry> entries;
entries.reserve(results.size());
for (auto & result : results)
entries.emplace_back(std::move(result.entry));
return entries;
}
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyForTableFunction(
const ConnectionTimeouts & timeouts,
const Settings * settings,
PoolMode pool_mode)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, timeouts, fail_message, settings);
};
return getManyImpl(settings, pool_mode, try_get_entry);
}
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyChecked(
const ConnectionTimeouts & timeouts,
const Settings * settings, PoolMode pool_mode,
const QualifiedTableName & table_to_check,
AsyncCallback async_callback,
std::optional<bool> skip_unavailable_endpoints)
{
TryGetEntryFunc try_get_entry = [&](NestedPool & pool, std::string & fail_message)
{
return tryGetEntry(pool, timeouts, fail_message, settings, &table_to_check, async_callback);
};
return getManyImpl(settings, pool_mode, try_get_entry, skip_unavailable_endpoints);
}
ConnectionPoolWithFailover::Base::GetPriorityFunc ConnectionPoolWithFailover::makeGetPriorityFunc(const Settings * settings)
{
size_t offset = 0;
LoadBalancing load_balancing = get_priority_load_balancing.load_balancing;
if (settings)
{
offset = settings->load_balancing_first_offset % nested_pools.size();
load_balancing = LoadBalancing(settings->load_balancing);
}
return get_priority_load_balancing.getPriorityFunc(load_balancing, offset, nested_pools.size());
}
std::vector<ConnectionPoolWithFailover::TryResult> ConnectionPoolWithFailover::getManyImpl(
const Settings * settings,
PoolMode pool_mode,
const TryGetEntryFunc & try_get_entry,
std::optional<bool> skip_unavailable_endpoints)
{
if (nested_pools.empty())
throw DB::Exception(DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED,
"Cannot get connection from ConnectionPoolWithFailover cause nested pools are empty");
if (!skip_unavailable_endpoints.has_value())
skip_unavailable_endpoints = (settings && settings->skip_unavailable_shards);
size_t min_entries = skip_unavailable_endpoints.value() ? 0 : 1;
size_t max_tries = (settings ?
size_t{settings->connections_with_failover_max_tries} :
size_t{DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES});
size_t max_entries;
if (pool_mode == PoolMode::GET_ALL)
{
min_entries = nested_pools.size();
max_entries = nested_pools.size();
}
else if (pool_mode == PoolMode::GET_ONE)
max_entries = 1;
else if (pool_mode == PoolMode::GET_MANY)
max_entries = settings ? size_t(settings->max_parallel_replicas) : 1;
else
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown pool allocation mode");
GetPriorityFunc get_priority = makeGetPriorityFunc(settings);
UInt64 max_ignored_errors = settings ? settings->distributed_replica_max_ignored_errors.value : 0;
bool fallback_to_stale_replicas = settings ? settings->fallback_to_stale_replicas_for_distributed_queries.value : true;
return Base::getMany(min_entries, max_entries, max_tries,
max_ignored_errors, fallback_to_stale_replicas,
try_get_entry, get_priority);
}
ConnectionPoolWithFailover::TryResult
ConnectionPoolWithFailover::tryGetEntry(
IConnectionPool & pool,
const ConnectionTimeouts & timeouts,
std::string & fail_message,
const Settings * settings,
const QualifiedTableName * table_to_check,
[[maybe_unused]] AsyncCallback async_callback)
{
#if defined(OS_LINUX)
if (async_callback)
{
ConnectionEstablisherAsync connection_establisher_async(&pool, &timeouts, settings, log, table_to_check);
while (true)
{
connection_establisher_async.resume();
if (connection_establisher_async.isFinished())
break;
async_callback(
connection_establisher_async.getFileDescriptor(),
0,
AsyncEventTimeoutType::NONE,
"Connection establisher file descriptor",
AsyncTaskExecutor::Event::READ | AsyncTaskExecutor::Event::ERROR);
}
fail_message = connection_establisher_async.getFailMessage();
return connection_establisher_async.getResult();
}
#endif
ConnectionEstablisher connection_establisher(&pool, &timeouts, settings, log, table_to_check);
TryResult result;
connection_establisher.run(result, fail_message);
return result;
}
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool> ConnectionPoolWithFailover::getShuffledPools(const Settings * settings)
{
GetPriorityFunc get_priority = makeGetPriorityFunc(settings);
UInt64 max_ignored_errors = settings ? settings->distributed_replica_max_ignored_errors.value : 0;
return Base::getShuffledPools(max_ignored_errors, get_priority);
}
}
|