aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Common/OvercommitTracker.cpp
blob: 2f067b7c1933b45f129a22ac4982e9e5e6eb2e11 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
#include "OvercommitTracker.h"

#include <chrono>
#include <mutex>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Interpreters/ProcessList.h>

// Metric defined elsewhere; needToStopQuery references it through a
// CurrentMetrics::Increment object.
namespace CurrentMetrics
{
    extern const Metric ThreadsInOvercommitTracker;
}

// Event defined elsewhere; needToStopQuery adds to it the time it spent
// waiting on the condition variable, expressed in microseconds.
namespace ProfileEvents
{
    extern const Event MemoryOvercommitWaitTimeMicroseconds;
}

// Brings the `us` duration literal into scope for this translation unit.
using namespace std::chrono_literals;

// A waiting time equal to this value makes needToStopQuery return
// OvercommitResult::DISABLED without waiting.
constexpr std::chrono::microseconds ZERO_MICROSEC = 0us;

OvercommitTracker::OvercommitTracker(DB::ProcessList * process_list_)
    : picked_tracker(nullptr)
    , process_list(process_list_)
    , cancellation_state(QueryCancellationState::NONE)
    , freed_memory(0)
    , required_memory(0)
    , next_id(0)
    , id_to_release(0)
    , allow_release(true)
{}

// Called for the query owning `tracker` when it needs `amount` more bytes.
// Picks a query to exclude and, unless the caller itself was picked, blocks the
// calling thread until it is released, the cancellation state returns to NONE,
// or the tracker's overcommit waiting time expires.
//
// Returns:
//   NONE             - overcommit tracking is blocked in this thread.
//   DISABLED         - the waiting time is zero, or no query could be picked.
//   SELECTED         - the query owning `tracker` was picked; it should stop.
//   TIMEOUTED        - the wait expired before the wake-up condition held.
//   NOT_ENOUGH_FREED - woken up, but this waiter's id was not released.
//   MEMORY_FREED     - this waiter's id was released.
OvercommitResult OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int64 amount)
{
    DENY_ALLOCATIONS_IN_SCOPE;

    if (OvercommitTrackerBlockerInThread::isBlocked())
        return OvercommitResult::NONE;

    CurrentMetrics::Increment metric_increment(CurrentMetrics::ThreadsInOvercommitTracker);
    // NOTE: Do not change the order of locks
    //
    // global mutex must be acquired before overcommit_m, because
    // method OvercommitTracker::onQueryStop(MemoryTracker *) is
    // always called with already acquired global mutex in
    // ProcessListEntry::~ProcessListEntry().
    auto global_lock = process_list->unsafeLock();
    std::unique_lock<std::mutex> lk(overcommit_m);

    // Ticket of this waiter; waiters with id < id_to_release are considered released.
    size_t id = next_id++;

    auto max_wait_time = tracker->getOvercommitWaitingTime();

    if (max_wait_time == ZERO_MICROSEC)
        return OvercommitResult::DISABLED;

    pickQueryToExclude();
    assert(cancellation_state != QueryCancellationState::NONE);
    // The query list is no longer needed; only overcommit_m stays locked from here on.
    global_lock.unlock();

    // If no query was chosen we need to stop current query.
    // This may happen if no soft limit is set.
    if (picked_tracker == nullptr)
    {
        // Here state can not be RUNNING, because it requires
        // picked_tracker to be not null pointer.
        assert(cancellation_state == QueryCancellationState::SELECTED);
        cancellation_state = QueryCancellationState::NONE;
        return OvercommitResult::DISABLED;
    }
    if (picked_tracker == tracker)
    {
        // Query of the provided as an argument memory tracker was chosen.
        // It may happen even when current state is RUNNING, because
        // ThreadStatus::~ThreadStatus may call MemoryTracker::alloc.
        cancellation_state = QueryCancellationState::RUNNING;
        return OvercommitResult::SELECTED;
    }

    allow_release = true;

    required_memory += amount;
    // Measure the wait with a monotonic clock: the wall clock may be adjusted
    // while we sleep, which would yield a bogus (even negative) interval.
    const auto wait_start_time = std::chrono::steady_clock::now();
    bool timeout = !cv.wait_for(lk, max_wait_time, [this, id]()
    {
        return id < id_to_release || cancellation_state == QueryCancellationState::NONE;
    });
    const auto wait_end_time = std::chrono::steady_clock::now();
    ProfileEvents::increment(ProfileEvents::MemoryOvercommitWaitTimeMicroseconds, (wait_end_time - wait_start_time) / 1us);

    required_memory -= amount;
    bool still_need = !(id < id_to_release); // True if thread wasn't released

    // If threads were not released since last call of this method,
    // we can release them now.
    if (allow_release && required_memory <= freed_memory && still_need)
        releaseThreads();

    // All required amount of memory is free now and selected query to stop doesn't know about it.
    // As we don't need to free memory, we can continue execution of the selected query.
    if (required_memory == 0 && cancellation_state == QueryCancellationState::SELECTED)
        reset();
    if (timeout)
        return OvercommitResult::TIMEOUTED;
    if (still_need)
        return OvercommitResult::NOT_ENOUGH_FREED;
    else
        return OvercommitResult::MEMORY_FREED;
}

// Accounts `amount` bytes as freed while a cancellation is in progress and
// releases the waiting threads once the freed total covers what they require.
// Does nothing when overcommit tracking is blocked in this thread or when the
// cancellation state is NONE.
void OvercommitTracker::tryContinueQueryExecutionAfterFree(Int64 amount)
{
    DENY_ALLOCATIONS_IN_SCOPE;

    if (OvercommitTrackerBlockerInThread::isBlocked())
        return;

    std::lock_guard guard(overcommit_m);

    // Nothing is being cancelled, so freed memory is not tracked.
    if (cancellation_state == QueryCancellationState::NONE)
        return;

    freed_memory += amount;
    if (required_memory <= freed_memory)
        releaseThreads();
}

// Notification that the query owning `tracker` has stopped. If that query is
// the one currently picked, calls reset() and wakes every thread waiting on
// the condition variable; otherwise leaves the state untouched.
void OvercommitTracker::onQueryStop(MemoryTracker * tracker)
{
    DENY_ALLOCATIONS_IN_SCOPE;

    std::lock_guard lk(overcommit_m);

    // Some other query stopped; the picked one is unaffected.
    if (picked_tracker != tracker)
        return;

    reset();
    cv.notify_all();
}

// Marks every waiter registered so far as released and wakes them up.
// Both callers in this file invoke it while holding overcommit_m.
void OvercommitTracker::releaseThreads()
{
    // Prevents OvercommitTracker::needToStopQuery from calling this method again
    // until it sets the flag back.
    allow_release = false;
    // The freed amount has been consumed by this release.
    freed_memory = 0;
    // Every id handed out so far is below next_id, so all current waiters qualify.
    id_to_release = next_id;
    cv.notify_all();
}

UserOvercommitTracker::UserOvercommitTracker(DB::ProcessList * process_list_, DB::ProcessListForUser * user_process_list_)
    : OvercommitTracker(process_list_)
    , user_process_list(user_process_list_)
{}

void UserOvercommitTracker::pickQueryToExcludeImpl()
{
    MemoryTracker * query_tracker = nullptr;
    OvercommitRatio current_ratio{0, 0};
    // At this moment query list must be read only.
    // This is guaranteed by locking global_mutex in OvercommitTracker::needToStopQuery.
    auto & queries = user_process_list->queries;
    for (auto const & query : queries)
    {
        if (query.second->isKilled())
            continue;

        auto * memory_tracker = query.second->getMemoryTracker();
        if (!memory_tracker)
            continue;

        auto ratio = memory_tracker->getOvercommitRatio();
        if (ratio.soft_limit != 0 && current_ratio < ratio)
        {
            query_tracker = memory_tracker;
            current_ratio   = ratio;
        }
    }
    picked_tracker = query_tracker;
}

GlobalOvercommitTracker::GlobalOvercommitTracker(DB::ProcessList * process_list_)
    : OvercommitTracker(process_list_)
{
}

void GlobalOvercommitTracker::pickQueryToExcludeImpl()
{
    MemoryTracker * query_tracker = nullptr;
    OvercommitRatio current_ratio{0, 0};
    // At this moment query list must be read only.
    // This is guaranteed by locking global_mutex in OvercommitTracker::needToStopQuery.
    for (auto const & query : process_list->processes)
    {
        if (query->isKilled())
            continue;

        Int64 user_soft_limit = 0;
        if (auto const * user_process_list = query->getUserProcessList())
            user_soft_limit = user_process_list->user_memory_tracker.getSoftLimit();
        if (user_soft_limit == 0)
            continue;

        auto * memory_tracker = query->getMemoryTracker();
        if (!memory_tracker)
            continue;
        auto ratio = memory_tracker->getOvercommitRatio(user_soft_limit);
        if (current_ratio < ratio)
        {
            query_tracker = memory_tracker;
            current_ratio   = ratio;
        }
    }
    picked_tracker = query_tracker;
}

// Per-thread counter of OvercommitTrackerBlockerInThread, starting at zero.
// NOTE(review): presumably isBlocked() reports true while it is non-zero — confirm in the header.
thread_local size_t OvercommitTrackerBlockerInThread::counter{0};