aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Client/ConnectionPoolWithFailover.h
blob: 72a441fe3d6bd1af7c55e09fa30c31661a74f8e3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#pragma once

#include <Common/PoolWithFailoverBase.h>
#include <Common/GetPriorityForLoadBalancing.h>
#include <Client/ConnectionPool.h>

#include <chrono>
#include <vector>


namespace DB
{

/** Connection pool with fault tolerance (describes ConnectionPoolWithFailover, declared below).
  * It is initialized with several other IConnectionPools.
  * When a connection is requested, it tries to create or select a live connection from one of the nested pools,
  *  trying the pools in some order and using no more than the specified number of attempts.
  * Pools with fewer errors are preferred;
  *  pools with the same number of errors are tried in random order.
  *
  * Note: if one of the nested pools is blocked due to overflow, then this pool will also be blocked.
  */

/// Selects how many connections ConnectionPoolWithFailover::getMany() hands back to the caller.
enum class PoolMode
{
    GET_ONE = 0,  /// Exactly one connection.
    GET_MANY = 1, /// Several connections; the count is determined by the max_parallel_replicas setting.
    GET_ALL = 2,  /// One connection from each nested pool.
};

/// Connection pool with fault tolerance, built on top of several nested IConnectionPools.
/// It is itself an IConnectionPool (public base), so it can be used wherever a single pool is expected.
/// Shared failover logic is reused from the privately inherited PoolWithFailoverBase<IConnectionPool>
/// (aliased as Base below), e.g. Base::getMany() and Base::updateSharedErrorCounts().
class ConnectionPoolWithFailover : public IConnectionPool, private PoolWithFailoverBase<IConnectionPool>
{
public:
    /// Constructs the pool from the nested pools and a load balancing mode.
    /// decrease_error_period_ and max_error_cap take the DBMS_CONNECTION_POOL_WITH_FAILOVER_* defaults when omitted.
    /// NOTE(review): presumably nested_pools_, decrease_error_period_ and max_error_cap are forwarded to
    /// PoolWithFailoverBase, and load_balancing configures get_priority_load_balancing - confirm in the .cpp.
    ConnectionPoolWithFailover(
            ConnectionPoolPtrs nested_pools_,
            LoadBalancing load_balancing,
            time_t decrease_error_period_ = DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_DECREASE_ERROR_PERIOD,
            size_t max_error_cap = DBMS_CONNECTION_POOL_WITH_FAILOVER_MAX_ERROR_COUNT);

    /// Connection handle type; the same type that IConnectionPool uses.
    using Entry = IConnectionPool::Entry;

    /** Allocates a connection to work with. */
    Entry get(const ConnectionTimeouts & timeouts,
              const Settings * settings,
              bool force_connected) override; /// From IConnectionPool

    Priority getPriority() const override; /// From IConnectionPool

    /** Allocates up to the specified number of connections to work.
      * Connections provide access to different replicas of one shard.
      * pool_mode selects how many connections are returned (see PoolMode).
      * async_callback and skip_unavailable_endpoints are optional; when omitted they are
      *  a default-constructed AsyncCallback and std::nullopt respectively.
      * NOTE(review): std::nullopt presumably means "take the value from settings" - confirm in the .cpp.
      */
    std::vector<Entry> getMany(const ConnectionTimeouts & timeouts,
                               const Settings * settings, PoolMode pool_mode,
                               AsyncCallback async_callback = {},
                               std::optional<bool> skip_unavailable_endpoints = std::nullopt);

    /// The same as getMany(), but return std::vector<TryResult>.
    /// Note: TryResult is used here before the local alias below is declared; at this point the name is found
    /// as a member of the PoolWithFailoverBase<IConnectionPool> base, which is the same type the alias names.
    std::vector<TryResult> getManyForTableFunction(const ConnectionTimeouts & timeouts,
                                                   const Settings * settings, PoolMode pool_mode);

    /// Short names for the private base class and its per-attempt result type.
    using Base = PoolWithFailoverBase<IConnectionPool>;
    using TryResult = Base::TryResult;

    /// The same as getMany(), but check that replication delay for table_to_check is acceptable.
    /// Delay threshold is taken from settings.
    /// Returns TryResult objects rather than bare entries.
    std::vector<TryResult> getManyChecked(
            const ConnectionTimeouts & timeouts,
            const Settings * settings,
            PoolMode pool_mode,
            const QualifiedTableName & table_to_check,
            AsyncCallback async_callback = {},
            std::optional<bool> skip_unavailable_endpoints = std::nullopt);

    /// Status record for a single nested pool; getStatus() returns a vector of these.
    struct NestedPoolStatus
    {
        /// The nested pool this record refers to.
        /// Being a const member, it makes NestedPoolStatus non-assignable.
        const Base::NestedPoolPtr pool;
        /// NOTE(review): the three fields below presumably hold the pool's current error counter, slowdown counter
        /// and the estimated time until its errors are forgiven - confirm against getStatus() in the .cpp.
        size_t error_count;
        size_t slowdown_count;
        std::chrono::seconds estimated_recovery_time;
    };

    using Status = std::vector<NestedPoolStatus>;
    /// Returns a vector of NestedPoolStatus records (presumably one per nested pool - confirm in the .cpp).
    Status getStatus() const;

    /// Returns a vector of Base::ShuffledPool built with regard to the given settings.
    /// NOTE(review): presumably the nested pools ordered by the priority function from makeGetPriorityFunc() - confirm.
    std::vector<Base::ShuffledPool> getShuffledPools(const Settings * settings);

    /// Returns Base::max_error_cap.
    /// NOTE(review): "Cup" looks like a typo for "Cap"; the name is part of the public interface, so it is kept as is.
    size_t getMaxErrorCup() const { return Base::max_error_cap; }

    /// Public forwarder to Base::updateSharedErrorCounts() (the base is inherited privately).
    void updateSharedError(std::vector<ShuffledPool> & shuffled_pools)
    {
        Base::updateSharedErrorCounts(shuffled_pools);
    }

private:
    /// Get the values of relevant settings and call Base::getMany()
    /// try_get_entry is the callback used to obtain an entry from a nested pool.
    std::vector<TryResult> getManyImpl(
            const Settings * settings,
            PoolMode pool_mode,
            const TryGetEntryFunc & try_get_entry,
            std::optional<bool> skip_unavailable_endpoints = std::nullopt);

    /// Try to get a connection from the pool and check that it is good.
    /// If table_to_check is not null and the check is enabled in settings, check that replication delay
    /// for this table is not too large.
    /// fail_message is an output parameter (non-const reference); presumably it receives the failure reason - confirm.
    TryResult tryGetEntry(
            IConnectionPool & pool,
            const ConnectionTimeouts & timeouts,
            std::string & fail_message,
            const Settings * settings,
            const QualifiedTableName * table_to_check = nullptr,
            AsyncCallback async_callback = {});

    /// Builds the GetPriorityFunc to use for the given settings.
    /// NOTE(review): presumably derived from get_priority_load_balancing - confirm in the .cpp.
    GetPriorityFunc makeGetPriorityFunc(const Settings * settings);

    /// Load-balancing priority helper (see Common/GetPriorityForLoadBalancing.h).
    GetPriorityForLoadBalancing get_priority_load_balancing;
};

/// Shared-ownership pointer to a ConnectionPoolWithFailover.
using ConnectionPoolWithFailoverPtr = std::shared_ptr<ConnectionPoolWithFailover>;
/// A vector of shared pointers to ConnectionPoolWithFailover.
using ConnectionPoolWithFailoverPtrs = std::vector<ConnectionPoolWithFailoverPtr>;

}