aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Common/ConcurrencyControl.cpp
blob: de46f0e90cab43a2a2147d2d334824b173f734b3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#include <Common/ConcurrencyControl.h>
#include <Common/Exception.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

// Destroying a slot gives it back: the allocation it was acquired from is asked to release one slot.
ConcurrencyControl::Slot::~Slot()
{
    allocation->release();
}

// Builds a slot on top of `allocation_`; the pointer is moved in, so the slot holds
// its own AllocationPtr to the allocation it belongs to.
ConcurrencyControl::Slot::Slot(AllocationPtr && allocation_)
    : allocation(std::move(allocation_))
{
}

// Hands this allocation back to the parent ConcurrencyControl. ConcurrencyControl::free() cancels it,
// subtracts the amount reported by cancel() from the parent's concurrency counter and, if it was
// still waiting, removes it from the waiters list.
ConcurrencyControl::Allocation::~Allocation()
{
    // We have to lock parent's mutex to avoid race with grant()
    // NOTE: shortcut can be added, but it requires Allocation::mutex lock even to check if shortcut is possible
    parent.free(this);
}

// Take one of the slots already granted to this allocation, if there is any.
// Returns an empty pointer when the granted counter is zero; never waits for a slot to appear.
[[nodiscard]] ConcurrencyControl::SlotPtr ConcurrencyControl::Allocation::tryAcquire()
{
    SlotCount expected = granted.load();
    while (expected != 0)
    {
        // A failed exchange reloads `expected` with the current counter value, so simply retry
        if (!granted.compare_exchange_strong(expected, expected - 1))
            continue;

        // One granted slot is now ours: wrap it into a Slot object
        std::unique_lock guard{mutex};
        return SlotPtr(new Slot(shared_from_this())); // std::make_shared is not usable here: Slot ctor is private
    }
    return {}; // nothing to acquire -- the mutex is not touched on this path
}

// Snapshot of the atomic counter of granted slots that have not been taken by tryAcquire() yet.
ConcurrencyControl::SlotCount ConcurrencyControl::Allocation::grantedCount() const
{
    return granted.load();
}

// Creates an allocation that starts with `granted_` slots and wants up to `limit_` slots in total.
//  - `parent_`: the ConcurrencyControl this allocation reports back to
//  - `limit_`: total number of slots wanted
//  - `granted_`: slots handed out at creation; both `allocated` and `granted` start from it
//  - `waiter_`: position in the parent's waiters list; it is dereferenced only if granted_ < limit_
ConcurrencyControl::Allocation::Allocation(ConcurrencyControl & parent_, SlotCount limit_, SlotCount granted_, Waiters::iterator waiter_)
    : parent(parent_)
    , limit(limit_)
    , allocated(granted_)
    , granted(granted_)
    , waiter(waiter_)
{
    // More slots are still wanted: fill the waiters list entry (inserted as nullptr by allocate()) with this allocation
    if (allocated < limit)
        *waiter = this;
}

// Hand one more slot to this allocation.
// Returns true iff the allocation still needs more slot(s) after this one.
bool ConcurrencyControl::Allocation::grant()
{
    std::unique_lock guard{mutex};
    ++granted;   // makes the slot visible to tryAcquire()
    ++allocated; // total number of slots ever given to this allocation
    return limit > allocated;
}

// Give one slot back to the parent, which grants it to another waiting allocation if required,
// then account for the released slot locally.
void ConcurrencyControl::Allocation::release()
{
    // The parent is updated first, before our own mutex is taken
    parent.release(1);

    std::unique_lock guard{mutex};
    if (++released > allocated)
        abort(); // more slots released than were ever allocated
}

// Starts with the round-robin position `cur_waiter` set to the end of the waiters list;
// schedule() moves it to the beginning of the list when it finds it at the end.
ConcurrencyControl::ConcurrencyControl()
    : cur_waiter(waiters.end())
{
}

// The waiters list must be empty by the time the controller is destroyed;
// otherwise the process is aborted.
ConcurrencyControl::~ConcurrencyControl()
{
    const bool has_waiters = !waiters.empty();
    if (has_waiters)
        abort();
}

// Create an allocation of at least `min` and at most `max` slots.
// `min` slots are granted immediately even if fewer are currently available; anything above that,
// up to `max`, is granted immediately only while slots are available. If fewer than `max` slots
// were granted, the allocation is put into the waiters list.
// Throws LOGICAL_ERROR if `min` is greater than `max`.
[[nodiscard]] ConcurrencyControl::AllocationPtr ConcurrencyControl::allocate(SlotCount min, SlotCount max)
{
    if (max < min)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "ConcurrencyControl: invalid allocation requirements");

    std::unique_lock guard{mutex};

    // Take whatever is available up to `max`, but never less than `min`
    const SlotCount free_slots = available(guard);
    const SlotCount granted = std::max(min, std::min(max, free_slots));
    cur_concurrency += granted;

    // Fully satisfied: no need to wait for anything
    if (granted >= max)
        return AllocationPtr(new Allocation(*this, max, granted));

    // More slots are required: register a waiter in front of the current round-robin position
    return AllocationPtr(new Allocation(*this, max, granted,
        waiters.insert(cur_waiter, nullptr /* the Allocation ctor stores its own pointer here */)));
}

// Change the total slot limit and immediately hand newly available slots to waiting allocations.
// Values below 1 are replaced by 1, so the limit never becomes zero.
void ConcurrencyControl::setMaxConcurrency(ConcurrencyControl::SlotCount value)
{
    const SlotCount effective_limit = std::max<SlotCount>(1, value);

    std::unique_lock guard{mutex};
    max_concurrency = effective_limit;
    schedule(guard);
}

// Access to the shared controller: a function-local static object, returned by reference on every call.
ConcurrencyControl & ConcurrencyControl::instance()
{
    static ConcurrencyControl singleton;
    return singleton;
}

// Called from the Allocation destructor: cancel the allocation, take its slots back
// and give them to other waiting allocations.
void ConcurrencyControl::free(Allocation * allocation)
{
    // An allocation may be canceled while it still has:
    //  - granted slots that were never acquired (acquired ones are impossible here, because Slot holds AllocationPtr)
    //  - an active waiter entry, i.e. it is still waiting for more slots
    // Hence the lock: destruction must not race with scheduling
    std::unique_lock guard{mutex};
    auto [slots_to_return, waiting_position] = allocation->cancel();

    cur_concurrency -= slots_to_return;
    if (waiting_position)
    {
        // Decide before erasing whether the round-robin position points at the entry being removed
        const bool is_current = (cur_waiter == *waiting_position);
        auto next_waiter = waiters.erase(*waiting_position);
        if (is_current)
            cur_waiter = next_waiter;
    }
    schedule(guard);
}

// Return `amount` slots to the controller and redistribute them among waiting allocations.
void ConcurrencyControl::release(SlotCount amount)
{
    std::unique_lock guard{mutex};
    cur_concurrency -= amount;
    schedule(guard);
}

// Distribute available slots among waiting allocations in round-robin order, one slot at a time.
// Takes a reference to a lock as an argument; it is not used in the body.
void ConcurrencyControl::schedule(std::unique_lock<std::mutex> &)
{
    while (!waiters.empty() && cur_concurrency < max_concurrency)
    {
        ++cur_concurrency;

        // Wrap around once the end of the list is reached
        if (cur_waiter == waiters.end())
            cur_waiter = waiters.begin();

        const bool wants_more = (*cur_waiter)->grant();
        if (!wants_more)
            cur_waiter = waiters.erase(cur_waiter); // last required slot has just been granted -- stop waiting
        else
            ++cur_waiter;
    }
}

// Number of slots that can be granted right now without exceeding `max_concurrency`.
// Returns zero when the current concurrency has reached or passed the limit.
ConcurrencyControl::SlotCount ConcurrencyControl::available(std::unique_lock<std::mutex> &) const
{
    if (cur_concurrency >= max_concurrency)
        return 0;
    return max_concurrency - cur_concurrency;
}

}