aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/Common/OvercommitTracker.h
blob: f40a70fe7cdea94c08e678b8bff097de856a2606 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#pragma once

#include <base/types.h>
#include <Core/Types.h>
#include <boost/core/noncopyable.hpp>
#include <Poco/Logger.h>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <unordered_map>

// This struct is used for the comparison of query memory usage.
struct OvercommitRatio
{
    OvercommitRatio(Int64 committed_, Int64 soft_limit_)
        : committed(committed_)
        , soft_limit(soft_limit_)
    {}

    friend bool operator<(OvercommitRatio const & lhs, OvercommitRatio const & rhs) noexcept
    {
        Int128 lhs_committed = lhs.committed, lhs_soft_limit = lhs.soft_limit;
        Int128 rhs_committed = rhs.committed, rhs_soft_limit = rhs.soft_limit;
        // (a / b < c / d) <=> (a * d < c * b)
        return (lhs_committed * rhs_soft_limit) < (rhs_committed * lhs_soft_limit)
            || (lhs_soft_limit == 0 && rhs_soft_limit > 0)
            || (lhs_committed == 0 && rhs_committed == 0 && lhs_soft_limit > rhs_soft_limit);
    }

    // actual query memory usage
    Int64 committed;
    // guaranteed amount of memory query can use
    Int64 soft_limit;
};

// Forward declarations: this header only refers to these types through
// pointers, so their full definitions are not required here.
class MemoryTracker;

namespace DB
{
    class ProcessList;
    struct ProcessListForUser;
}

// Result type returned by OvercommitTracker::needToStopQuery().
// The numeric values are spelled out explicitly; they are the same values
// the enumerators receive implicitly from their declaration order.
enum class OvercommitResult
{
    NONE             = 0,
    DISABLED         = 1,
    MEMORY_FREED     = 2,
    SELECTED         = 3,
    TIMEOUTED        = 4,
    NOT_ENOUGH_FREED = 5,
};

// Phases of the "pick a query and cancel it" procedure.
enum class QueryCancellationState
{
    // The hard limit has not been reached; no query is selected to be killed.
    NONE = 0,
    // The hard limit has been reached and a query to stop has been chosen,
    // but that query is not yet aware of the cancellation.
    SELECTED = 1,
    // The hard limit has been reached and the selected query has started
    // the process of cancellation.
    RUNNING = 2,
};

// Choosing a reasonable hard memory limit (the default value in particular)
// is usually difficult, so this class introduces another mechanism for
// limiting memory usage.
// The soft limit is the amount of memory a query/user is guaranteed to be
// able to use, and exceeding it is allowed. Once the hard limit is reached,
// however, the query with the biggest overcommit ratio is killed in order
// to free memory.
struct OvercommitTracker : boost::noncopyable
{
    OvercommitResult needToStopQuery(MemoryTracker * tracker, Int64 amount);

    void tryContinueQueryExecutionAfterFree(Int64 amount);

    void onQueryStop(MemoryTracker * tracker);

    virtual ~OvercommitTracker() = default;

protected:
    explicit OvercommitTracker(DB::ProcessList * process_list_);

    // Selection policy supplied by the concrete tracker.
    virtual void pickQueryToExcludeImpl() = 0;

    // Guards picked_tracker and cancellation_state against concurrent access.
    std::mutex overcommit_m;
    std::condition_variable cv;

    // Memory tracker of the query that was chosen to be stopped.
    // When no soft limit is set, every query that reaches the hard limit
    // has to stop; that situation is encoded as picked_tracker == nullptr
    // while the overcommit tracker is in the SELECTED state.
    MemoryTracker * picked_tracker;

    // The global mutex stored in ProcessList synchronizes insertion and
    // deletion of queries. Implementations of
    // OvercommitTracker::pickQueryToExcludeImpl() need that mutex to be
    // locked because they read the list (or a sublist) of queries.
    DB::ProcessList * process_list;
private:

    // Runs the selection policy only if no query is currently selected,
    // then moves the tracker into the SELECTED state.
    void pickQueryToExclude()
    {
        if (cancellation_state != QueryCancellationState::NONE)
            return;

        pickQueryToExcludeImpl();
        cancellation_state = QueryCancellationState::SELECTED;
    }

    // Returns the tracker to its idle state.
    // Note that required_memory is not touched here.
    void reset() noexcept
    {
        picked_tracker = nullptr;
        cancellation_state = QueryCancellationState::NONE;
        freed_memory = 0;

        // Restart the numbering of incoming threads.
        next_id = 0;
        id_to_release = 0;

        allow_release = true;
    }

    void releaseThreads();

    QueryCancellationState cancellation_state;

    Int64 freed_memory;
    Int64 required_memory;

    // Id that will be handed to the next thread entering OvercommitTracker.
    size_t next_id;
    // Every thread whose id is smaller than this one may be released.
    size_t id_to_release;

    bool allow_release;
};

// OvercommitTracker variant that additionally holds a pointer to a
// DB::ProcessListForUser (presumably the query list of a single user; the
// selection logic itself is defined out of line).
struct UserOvercommitTracker : public OvercommitTracker
{
    explicit UserOvercommitTracker(
        DB::ProcessList * process_list_,
        DB::ProcessListForUser * user_process_list_);

    ~UserOvercommitTracker() override = default;

protected:
    // Selection policy of this tracker; implemented outside of this header.
    void pickQueryToExcludeImpl() override;

private:
    // Non-owning pointer received through the constructor's second parameter
    // (assumed; the constructor body is not visible in this header).
    DB::ProcessListForUser * user_process_list;
};

// OvercommitTracker variant constructed from the DB::ProcessList alone
// (presumably it considers all queries; the selection logic is defined
// out of line).
struct GlobalOvercommitTracker : public OvercommitTracker
{
    explicit GlobalOvercommitTracker(DB::ProcessList * process_list_);

    ~GlobalOvercommitTracker() override = default;

protected:
    // Selection policy of this tracker; implemented outside of this header.
    void pickQueryToExcludeImpl() override;
};

// Scope guard used to disallow tracking during logging in order to avoid
// deadlocks. Each live instance contributes one to a per-thread counter, so
// guards may be nested; isBlocked() reports whether the counter of the
// calling thread is non-zero.
struct OvercommitTrackerBlockerInThread
{
    // A guard is tied to the scope that created it and cannot be copied.
    OvercommitTrackerBlockerInThread(OvercommitTrackerBlockerInThread const &) = delete;
    OvercommitTrackerBlockerInThread & operator=(OvercommitTrackerBlockerInThread const &) = delete;

    OvercommitTrackerBlockerInThread() { counter += 1; }
    ~OvercommitTrackerBlockerInThread() { counter -= 1; }

    static bool isBlocked() { return counter != 0; }

private:
    // Number of guards currently alive on this thread.
    // The definition (and initial value) lives outside of this header.
    static thread_local size_t counter;
};