1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
#pragma once
#if defined(OS_LINUX)
#include <Common/TimerDescriptor.h>
#include <Common/Epoll.h>
#include <Common/FiberStack.h>
#include <Common/Fiber.h>
#include <Client/ConnectionEstablisher.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <Core/Settings.h>
#include <unordered_map>
#include <memory>
namespace DB
{
/** Class for establishing hedged connections with replicas.
  * The process of establishing a connection is divided into stages. On each stage, if
  * a replica doesn't respond for a long time, we start establishing a connection with
  * the next replica, without cancelling the work with the previous one.
  * It works with multiple replicas simultaneously without blocking by using epoll.
  */
class HedgedConnectionsFactory
{
public:
    using ShuffledPool = ConnectionPoolWithFailover::Base::ShuffledPool;
    using TryResult = PoolWithFailoverBase<IConnectionPool>::TryResult;

    /// Outcome of an attempt to obtain a connection to a replica.
    enum class State
    {
        /// Connection established successfully.
        READY,
        /// There are no ready events now.
        NOT_READY,
        /// Cannot produce a new connection anymore.
        CANNOT_CHOOSE,
    };

    /// Per-replica bookkeeping: the asynchronous connection establisher, the timer
    /// descriptor for the "change replica" timeout and a readiness flag.
    struct ReplicaStatus
    {
        /// Takes ownership of the establisher; the replica starts as not ready.
        explicit ReplicaStatus(std::unique_ptr<ConnectionEstablisherAsync> connection_establisher_)
            : connection_establisher(std::move(connection_establisher_))
        {
        }

        std::unique_ptr<ConnectionEstablisherAsync> connection_establisher;
        TimerDescriptor change_replica_timeout;
        bool is_ready = false;
    };

    /// @param pool_            connection pool (stored in `pool`).
    /// @param settings_        settings pointer (stored in `settings`).
    /// @param timeouts_        connection timeouts (stored by value, see getConnectionTimeouts()).
    /// @param table_to_check_  optional table name; defaults to nullptr.
    HedgedConnectionsFactory(const ConnectionPoolWithFailoverPtr & pool_,
                             const Settings * settings_,
                             const ConnectionTimeouts & timeouts_,
                             std::shared_ptr<QualifiedTableName> table_to_check_ = nullptr);

    /// Create and return active connections according to pool_mode.
    std::vector<Connection *> getManyConnections(PoolMode pool_mode, AsyncCallback async_callback = {});

    /// Try to get connection to the new replica without blocking.
    /// Process all current events in epoll (connections, timeouts).
    /// Returned state might be READY (connection established successfully),
    /// NOT_READY (there are no ready events now) and CANNOT_CHOOSE (cannot produce new connection anymore).
    /// If state is READY, replica connection will be written in connection_out.
    State waitForReadyConnections(Connection *& connection_out);

    /// NOTE(review): presumably follows the same State / connection_out contract as
    /// waitForReadyConnections - confirm against the definition.
    State startNewConnection(Connection *& connection_out);

    /// Stop working with all replicas that are not READY.
    void stopChoosingReplicas();

    /// True while epoll still has registered descriptors.
    bool hasEventsInProcess() const { return !epoll.empty(); }

    /// File descriptor of the internal epoll instance.
    int getFileDescriptor() const { return epoll.getFileDescriptor(); }

    const ConnectionTimeouts & getConnectionTimeouts() const { return timeouts; }

    size_t numberOfProcessingReplicas() const;

    /// Tell Factory to not return connections with two level aggregation incompatibility.
    void skipReplicasWithTwoLevelAggregationIncompatibility() { skip_replicas_with_two_level_aggregation_incompatibility = true; }

    ~HedgedConnectionsFactory();

private:
    State waitForReadyConnectionsImpl(bool blocking, Connection *& connection_out, AsyncCallback & async_callback);

    /// Try to start establishing connection to the new replica.
    /// NOTE(review): a previous comment claimed this returns the replica index or -1,
    /// but the function returns State and reports the connection through connection_out.
    /// Confirm the exact contract against the definition.
    State startNewConnectionImpl(Connection *& connection_out);

    /// Find an index of the next free replica to start connection.
    /// Return -1 if there is no free replica.
    int getNextIndex();

    int getReadyFileDescriptor(bool blocking, AsyncCallback & async_callback);

    void processFailedConnection(int index, const std::string & fail_message);

    State resumeConnectionEstablisher(int index, Connection *& connection_out);

    State processFinishedConnection(int index, TryResult result, Connection *& connection_out);

    void removeReplicaFromEpoll(int index, int fd);

    void addNewReplicaToEpoll(int index, int fd);

    /// Return NOT_READY state if there are no ready events, READY if replica is ready
    /// and CANNOT_CHOOSE if there are no more events in epoll.
    State processEpollEvents(bool blocking, Connection *& connection_out, AsyncCallback & async_callback);

    State setBestUsableReplica(Connection *& connection_out);

    bool isTwoLevelAggregationIncompatible(Connection * connection);

    /// NOTE: the declaration order of the data members below is kept unchanged on purpose,
    /// because it determines their initialization order.

    const ConnectionPoolWithFailoverPtr pool;
    const Settings * settings;
    const ConnectionTimeouts timeouts;

    std::vector<ShuffledPool> shuffled_pools;
    std::vector<ReplicaStatus> replicas;

    /// Map socket file descriptor to replica index.
    std::unordered_map<int, int> fd_to_replica_index;

    /// Map timeout for changing replica to replica index.
    std::unordered_map<int, int> timeout_fd_to_replica_index;

    /// If this flag is true, don't return connections with
    /// two level aggregation incompatibility.
    bool skip_replicas_with_two_level_aggregation_incompatibility = false;

    std::shared_ptr<QualifiedTableName> table_to_check;
    int last_used_index = -1;

    /// Defaulted so the flag never holds an indeterminate value; the constructor
    /// (defined elsewhere) is expected to assign the effective value.
    bool fallback_to_stale_replicas = false;

    Epoll epoll;

    /// Defaulted to nullptr for the same reason; expected to be set by the constructor.
    Poco::Logger * log = nullptr;
    std::string fail_messages;

    /// The maximum number of attempts to connect to replicas.
    /// Defaulted to 0 here; expected to be set by the constructor.
    size_t max_tries = 0;

    /// Total number of established connections.
    size_t entries_count = 0;

    /// The number of established connections that are usable.
    size_t usable_count = 0;

    /// The number of established connections that are up to date.
    size_t up_to_date_count = 0;

    /// The number of failed connections (replica is considered failed after max_tries attempts to connect).
    size_t failed_pools_count = 0;

    /// The number of replicas that are in process of connection.
    size_t replicas_in_process_count = 0;

    /// The number of ready replicas (replica is considered ready when its
    /// connection has been returned outside).
    size_t ready_replicas_count = 0;

    /// The number of replicas requested in startNewConnection (it's needed for
    /// checking the number of requested replicas that are still in process).
    size_t requested_connections_count = 0;
};
}
#endif
|