aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Common/ConcurrencyControl.h
blob: 2f41c23d10e444e5dfa2262852843e909e6bdd87 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#pragma once

#include <base/types.h>
#include <boost/core/noncopyable.hpp>
#include <atomic>
#include <limits>
#include <list>
#include <memory>
#include <mutex>
#include <optional>
#include <utility>

namespace DB
{

/*
 * Controls how many threads can be allocated for a query (or another activity).
 * There is a limited number of slots for threads. The limit can be set with `setMaxConcurrency(limit)`.
 *
 * Lifecycle of a slot: free -> granted -> acquired -> free.
 * free: slot is available to be allocated by any query.
 * granted: slot is allocated by a specific query, but not yet acquired by any thread.
 * acquired: slot is allocated by a specific query and acquired by a thread.
 *
 * USAGE:
 *   1. Create an allocation for a query:
 *      `auto slots = ConcurrencyControl::instance().allocate(min, max);`
 *      It will allocate at least `min` and at most `max` slots.
 *      Note that `min` slots are granted immediately, but other `max - min` may be granted later.
 *   2. For every thread a slot has to be acquired from that allocation:
 *      `while (auto slot = slots->tryAcquire()) createYourThread([slot = std::move(slot)] { ... });`
 *      This snippet can be used at query startup and for upscaling later.
 * (both functions are non-blocking)
 *
 * Released slots are distributed among waiting allocations in a round-robin manner to provide fairness.
 * Oversubscription is possible: the total number of allocated slots can exceed the limit set by `setMaxConcurrency(limit)`,
 * because `min` slots are allocated for each query unconditionally.
 */
class ConcurrencyControl : boost::noncopyable
{
public:
    struct Allocation;
    using AllocationPtr = std::shared_ptr<Allocation>;
    using SlotCount = UInt64;
    using Waiters = std::list<Allocation *>;

    // Largest representable slot count; `max_concurrency` starts out with this value.
    static constexpr SlotCount Unlimited = std::numeric_limits<SlotCount>::max();

    // Scoped guard for a single acquired slot, handed out by Allocation::tryAcquire().
    // Keeps a shared reference to the allocation it belongs to.
    struct Slot : boost::noncopyable
    {
        ~Slot();

    private:
        friend struct Allocation; // lets Allocation call the private ctor

        explicit Slot(AllocationPtr && allocation_);

        AllocationPtr allocation;
    };

    // FIXME: should be unique_ptr, but ThreadFromGlobalPool does not support move semantics yet
    using SlotPtr = std::shared_ptr<Slot>;

    // Group of slots owned by a single query, created by ConcurrencyControl::allocate(min, max)
    struct Allocation : std::enable_shared_from_this<Allocation>, boost::noncopyable
    {
        ~Allocation();

        // Acquire one of the already granted slots, if there is one.
        // Lock-free iff there is no granted slot.
        [[nodiscard]] SlotPtr tryAcquire();

        // Defined out of line; presumably reports the current value of `granted`.
        SlotCount grantedCount() const;

    private:
        friend struct Slot; // needs release()
        friend class ConcurrencyControl; // needs the private members below (ctor, grant(), ...)

        Allocation(ConcurrencyControl & parent_, SlotCount limit_, SlotCount granted_, Waiters::iterator waiter_ = {});

        // Reads the allocation state under `mutex` and returns a pair:
        //  - first: number of slots that are allocated but not yet released;
        //  - second: `waiter` if `allocated < limit` (the only case where that iterator is valid),
        //    an empty optional otherwise.
        auto cancel()
        {
            std::unique_lock lock{mutex};

            std::optional<Waiters::iterator> pending_waiter;
            if (allocated < limit)
                pending_waiter = waiter;

            return std::pair{allocated - released, pending_waiter};
        }

        // Give one more slot to this allocation; the result is true iff it still needs more slot(s)
        bool grant();

        // Give one slot back; it is handed over to another allocation if one requires it
        void release();

        ConcurrencyControl & parent;
        const SlotCount limit;

        std::mutex mutex; // guards `allocated` and `released`
        SlotCount allocated; // total allocated so far (already `released` slots are counted too)
        SlotCount released = 0;

        std::atomic<SlotCount> granted; // allocated, but not yet acquired

        const Waiters::iterator waiter; // position of this allocation in the Waiters list; valid iff allocated < limit
    };

    ConcurrencyControl();

    // WARNING: every Allocation object MUST be destructed before ConcurrencyControl
    // NOTE: The recommended way to guarantee this is to use `instance()` and shut queries down gracefully
    ~ConcurrencyControl();

    static ConcurrencyControl & instance();

    // Allocate no fewer than `min` and no more than `max` slots.
    // If fewer than `max` slots could be allocated right away, a subscription for later allocation is created.
    // Call `Allocation::tryAcquire()` to acquire an allocated slot before running a thread.
    [[nodiscard]] AllocationPtr allocate(SlotCount min, SlotCount max);

    void setMaxConcurrency(SlotCount value);

private:
    friend struct Allocation; // needs free() and release()

    void free(Allocation * allocation);

    void release(SlotCount amount);

    // Hands available slots to waiting allocations in round-robin order.
    // Takes the caller's lock by reference (presumably a lock on `mutex` -- confirm in the .cpp).
    void schedule(std::unique_lock<std::mutex> &);

    // Same lock-by-reference convention as schedule().
    SlotCount available(std::unique_lock<std::mutex> &) const;

    std::mutex mutex;
    Waiters waiters;
    Waiters::iterator cur_waiter; // round-robin cursor into `waiters`
    SlotCount max_concurrency = Unlimited;
    SlotCount cur_concurrency = 0;
};

}