aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/QueryPipeline/ExecutionSpeedLimits.cpp
blob: 111ba7c9a953cd76e5d0572cf5e8e8ddc7342d52 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#include <QueryPipeline/ExecutionSpeedLimits.h>

#include <Common/ProfileEvents.h>
#include <Common/CurrentThread.h>
#include <IO/WriteHelpers.h>
#include <Common/Stopwatch.h>
#include <base/sleep.h>

/// Profile event counters used by this file (declared here as extern).
namespace ProfileEvents
{
    /// Incremented in limitProgressingSpeed() by the number of microseconds slept;
    /// read in ExecutionSpeedLimits::throttle() to exclude throttler sleeps from the elapsed time.
    extern const Event ThrottlerSleepMicroseconds;
    /// Incremented in handleOverflowMode() when the overflow mode is OverflowMode::BREAK.
    extern const Event OverflowBreak;
    /// Incremented in handleOverflowMode() when the overflow mode is OverflowMode::THROW.
    extern const Event OverflowThrow;
}


namespace DB
{

/// Error codes used by the exceptions thrown in this file (declared here as extern).
namespace ErrorCodes
{
    /// Thrown by ExecutionSpeedLimits::throttle() when the query is slower than the configured minimum speed
    /// or when its estimated execution time exceeds `max_execution_time`.
    extern const int TOO_SLOW;
    /// Thrown by handleOverflowMode() for an overflow mode it does not handle.
    extern const int LOGICAL_ERROR;
    /// Passed by ExecutionSpeedLimits::checkTimeLimit() to handleOverflowMode() when the time limit is exceeded.
    extern const int TIMEOUT_EXCEEDED;
}

static void limitProgressingSpeed(size_t total_progress_size, size_t max_speed_in_seconds, UInt64 total_elapsed_microseconds)
{
    /// How much time to wait for the average speed to become `max_speed_in_seconds`.
    UInt64 desired_microseconds = total_progress_size * 1000000 / max_speed_in_seconds;

    if (desired_microseconds > total_elapsed_microseconds)
    {
        UInt64 sleep_microseconds = desired_microseconds - total_elapsed_microseconds;

        /// Never sleep more than one second (it should be enough to limit speed for a reasonable amount,
        /// and otherwise it's too easy to make query hang).
        sleep_microseconds = std::min(static_cast<UInt64>(1000000), sleep_microseconds);

        sleepForMicroseconds(sleep_microseconds);

        ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_microseconds);
    }
}

void ExecutionSpeedLimits::throttle(
    size_t read_rows, size_t read_bytes,
    size_t total_rows_to_read, UInt64 total_elapsed_microseconds) const
{
    if ((min_execution_rps != 0 || max_execution_rps != 0
         || min_execution_bps != 0 || max_execution_bps != 0
         || (total_rows_to_read != 0 && timeout_before_checking_execution_speed != 0))
        && (static_cast<Int64>(total_elapsed_microseconds) > timeout_before_checking_execution_speed.totalMicroseconds()))
    {
        /// Do not count sleeps in throttlers
        UInt64 throttler_sleep_microseconds = CurrentThread::getProfileEvents()[ProfileEvents::ThrottlerSleepMicroseconds];

        double elapsed_seconds = 0;
        if (total_elapsed_microseconds > throttler_sleep_microseconds)
            elapsed_seconds = static_cast<double>(total_elapsed_microseconds - throttler_sleep_microseconds) / 1000000.0;

        if (elapsed_seconds > 0)
        {
            auto rows_per_second = read_rows / elapsed_seconds;
            if (min_execution_rps && rows_per_second < min_execution_rps)
                throw Exception(
                    ErrorCodes::TOO_SLOW,
                    "Query is executing too slow: {} rows/sec., minimum: {}",
                    read_rows / elapsed_seconds,
                    min_execution_rps);

            auto bytes_per_second = read_bytes / elapsed_seconds;
            if (min_execution_bps && bytes_per_second < min_execution_bps)
                throw Exception(
                    ErrorCodes::TOO_SLOW,
                    "Query is executing too slow: {} bytes/sec., minimum: {}",
                    read_bytes / elapsed_seconds,
                    min_execution_bps);

            /// If the predicted execution time is longer than `max_execution_time`.
            if (max_execution_time != 0 && total_rows_to_read && read_rows)
            {
                double estimated_execution_time_seconds = elapsed_seconds * (static_cast<double>(total_rows_to_read) / read_rows);

                if (estimated_execution_time_seconds > max_execution_time.totalSeconds())
                    throw Exception(
                        ErrorCodes::TOO_SLOW,
                        "Estimated query execution time ({} seconds) is too long. Maximum: {}. Estimated rows to process: {}",
                        estimated_execution_time_seconds,
                        max_execution_time.totalSeconds(),
                        total_rows_to_read);
            }

            if (max_execution_rps && rows_per_second >= max_execution_rps)
                limitProgressingSpeed(read_rows, max_execution_rps, total_elapsed_microseconds);

            if (max_execution_bps && bytes_per_second >= max_execution_bps)
                limitProgressingSpeed(read_bytes, max_execution_bps, total_elapsed_microseconds);
        }
    }
}

/// Reacts to an exceeded limit according to `mode`.
///
/// THROW: increments the `OverflowThrow` profile event and throws Exception with `code` and the formatted message.
/// BREAK: increments the `OverflowBreak` profile event and returns false.
/// Any other mode: throws Exception with LOGICAL_ERROR.
template <typename... Args>
static bool handleOverflowMode(OverflowMode mode, int code, FormatStringHelper<Args...> fmt, Args &&... args)
{
    if (mode == OverflowMode::THROW)
    {
        ProfileEvents::increment(ProfileEvents::OverflowThrow);
        throw Exception(code, std::move(fmt), std::forward<Args>(args)...);
    }

    if (mode == OverflowMode::BREAK)
    {
        ProfileEvents::increment(ProfileEvents::OverflowBreak);
        return false;
    }

    throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: unknown overflow mode");
}

bool ExecutionSpeedLimits::checkTimeLimit(const Stopwatch & stopwatch, OverflowMode overflow_mode) const
{
    if (max_execution_time != 0)
    {
        auto elapsed_ns = stopwatch.elapsed();

        if (elapsed_ns > static_cast<UInt64>(max_execution_time.totalMicroseconds()) * 1000)
            return handleOverflowMode(
                overflow_mode,
                ErrorCodes::TIMEOUT_EXCEEDED,
                "Timeout exceeded: elapsed {} seconds, maximum: {}",
                static_cast<double>(elapsed_ns) / 1000000000ULL,
                max_execution_time.totalMicroseconds() / 1000000.0);
    }

    return true;
}

}