/// Source: contrib/clickhouse/src/Common/Throttler.cpp (blob 4c1320db27af77ff52e53fc6514337a16370ea71)
#include <Common/Throttler.h>
#include <Common/ProfileEvents.h>
#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <IO/WriteHelpers.h>

namespace ProfileEvents
{
    /// Incremented by Throttler::add() with the time it spent sleeping, in microseconds.
    extern const Event ThrottlerSleepMicroseconds;
}

namespace DB
{

namespace ErrorCodes
{
    /// Code of the exception thrown by Throttler::add() when the accumulated count goes above `limit`.
    extern const int LIMIT_EXCEEDED;
}

/// Nanoseconds per second (10^9). Used to convert between the nanosecond clock
/// readings and the seconds-based token refill rate.
static constexpr auto NS = 1000000000UL;

/// Creates a throttler that only limits speed.
///
/// `max_speed_` - refill rate in tokens per second; add() never sleeps when it is 0.
/// `parent_`    - optional parent throttler; when set, every add() is also forwarded to it.
///
/// The bucket capacity `max_burst` is `max_speed_ * default_burst_seconds`, and the bucket
/// starts full (`tokens` is initialized from `max_burst`). The limit message is set to an
/// empty string and `limit` is not touched here, so it keeps its in-class default
/// (presumably 0, i.e. no total limit - confirm in Throttler.h).
///
/// NOTE(review): `tokens(max_burst)` reads a member initialized earlier in this list, which
/// is only correct if `max_burst` is declared before `tokens` in the class - confirm.
Throttler::Throttler(size_t max_speed_, const std::shared_ptr<Throttler> & parent_)
    : max_speed(max_speed_)
    , max_burst(max_speed_ * default_burst_seconds)
    , limit_exceeded_exception_message("")
    , tokens(max_burst)
    , parent(parent_)
{}

/// Creates a throttler that limits speed and, optionally, the total amount ever added.
///
/// `max_speed_` - refill rate in tokens per second; add() never sleeps when it is 0.
/// `limit_`     - cap on the accumulated count; add() throws LIMIT_EXCEEDED once the count
///                goes above it. A value of 0 disables the check.
/// `limit_exceeded_exception_message_` - prefix of the exception text; add() appends
///                " Maximum: <limit>" to it.
/// `parent_`    - optional parent throttler; when set, every add() is also forwarded to it.
///
/// The bucket capacity `max_burst` is `max_speed_ * default_burst_seconds`, and the bucket
/// starts full (`tokens` is initialized from `max_burst`).
///
/// NOTE(review): if the message member is a raw `const char *`, the pointed-to string must
/// be non-null and outlive the throttler - confirm against Throttler.h.
/// NOTE(review): `tokens(max_burst)` reads a member initialized earlier in this list, which
/// is only correct if `max_burst` is declared before `tokens` in the class - confirm.
Throttler::Throttler(size_t max_speed_, size_t limit_, const char * limit_exceeded_exception_message_,
            const std::shared_ptr<Throttler> & parent_)
    : max_speed(max_speed_)
    , max_burst(max_speed_ * default_burst_seconds)
    , limit(limit_)
    , limit_exceeded_exception_message(limit_exceeded_exception_message_)
    , tokens(max_burst)
    , parent(parent_)
{}

/// Charges `amount` tokens to this throttler and then to its parent, if there is one.
///
/// What happens, in order:
///  1. Under the mutex: when throttling is enabled (`max_speed != 0`), the bucket is refilled
///     for the time elapsed since the previous call, `amount` is subtracted and the result is
///     capped at `max_burst`. The running `count` is increased by `amount` in any case.
///  2. If `limit` is non-zero and `count` is now above it, LIMIT_EXCEEDED is thrown
///     (no sleep happens and the parent is not charged in that case).
///  3. If throttling is enabled and the bucket went negative, the calling thread sleeps for
///     the time needed to refill the deficit at `max_speed` tokens per second.
///  4. The same `amount` is forwarded to `parent`.
///
/// Returns the nanoseconds slept here plus the value returned by the parent's add().
UInt64 Throttler::add(size_t amount)
{
    /// Snapshots of the shared state: captured under the lock, examined after it is released.
    size_t total_count = 0;
    double bucket = 0;
    {
        std::lock_guard guard(mutex);

        const auto current_ns = clock_gettime_ns_adjusted(prev_ns);
        if (max_speed)
        {
            /// With no previous timestamp recorded (prev_ns == 0) nothing is refilled.
            double elapsed_seconds = 0;
            if (prev_ns)
                elapsed_seconds = static_cast<double>(current_ns - prev_ns) / NS;

            tokens = std::min<double>(tokens + max_speed * elapsed_seconds - amount, max_burst);
        }
        prev_ns = current_ns;

        count += amount;
        total_count = count;
        bucket = tokens;
    }

    const bool over_limit = limit && total_count > limit;
    if (over_limit)
        throw Exception::createDeprecated(limit_exceeded_exception_message + std::string(" Maximum: ") + toString(limit), ErrorCodes::LIMIT_EXCEEDED);

    /// A negative bucket is a debt: wait until it would have been refilled.
    Int64 slept_ns = 0;
    const bool in_debt = max_speed && bucket < 0;
    if (in_debt)
    {
        slept_ns = static_cast<Int64>(-bucket / max_speed * NS);

        /// The pending sleep is visible through `accumulated_sleep` only while this thread sleeps.
        accumulated_sleep += slept_ns;
        sleepForNanoseconds(slept_ns);
        accumulated_sleep -= slept_ns;

        ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, slept_ns / 1000UL);
    }

    /// The parent is charged after the local sleep; its sleep time is added to ours.
    if (parent)
        slept_ns += parent->add(amount);

    return static_cast<UInt64>(slept_ns);
}

/// Returns the throttler to its initial accounting state: zero accumulated count,
/// a full bucket (`tokens == max_burst`) and no previous timestamp.
void Throttler::reset()
{
    std::lock_guard guard(mutex);

    prev_ns = 0;
    tokens = max_burst;
    count = 0;

    /// `accumulated_sleep` is deliberately left alone: threads sleeping in add() subtract
    /// their own contribution when they wake up, so zeroing it here would race with them.
}

/// Tells whether this throttler, or any throttler up its parent chain, currently has a
/// non-zero `accumulated_sleep` (i.e. some thread is inside the sleep in add()).
bool Throttler::isThrottling() const
{
    /// Short-circuit evaluation checks our own state first and consults the parent only if needed.
    return accumulated_sleep != 0 || (parent && parent->isThrottling());
}

}