1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
#include <Common/Throttler.h>
#include <Common/ProfileEvents.h>
#include <Common/Exception.h>
#include <Common/Stopwatch.h>
#include <IO/WriteHelpers.h>
namespace ProfileEvents
{
extern const Event ThrottlerSleepMicroseconds;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LIMIT_EXCEEDED;
}
/// Just 10^9.
static constexpr auto NS = 1000000000UL;
Throttler::Throttler(size_t max_speed_, const std::shared_ptr<Throttler> & parent_)
: max_speed(max_speed_)
, max_burst(max_speed_ * default_burst_seconds)
, limit_exceeded_exception_message("")
, tokens(max_burst)
, parent(parent_)
{}
Throttler::Throttler(size_t max_speed_, size_t limit_, const char * limit_exceeded_exception_message_,
const std::shared_ptr<Throttler> & parent_)
: max_speed(max_speed_)
, max_burst(max_speed_ * default_burst_seconds)
, limit(limit_)
, limit_exceeded_exception_message(limit_exceeded_exception_message_)
, tokens(max_burst)
, parent(parent_)
{}
UInt64 Throttler::add(size_t amount)
{
// Values obtained under lock to be checked after release
size_t count_value;
double tokens_value;
{
std::lock_guard lock(mutex);
auto now = clock_gettime_ns_adjusted(prev_ns);
if (max_speed)
{
double delta_seconds = prev_ns ? static_cast<double>(now - prev_ns) / NS : 0;
tokens = std::min<double>(tokens + max_speed * delta_seconds - amount, max_burst);
}
count += amount;
count_value = count;
tokens_value = tokens;
prev_ns = now;
}
if (limit && count_value > limit)
throw Exception::createDeprecated(limit_exceeded_exception_message + std::string(" Maximum: ") + toString(limit), ErrorCodes::LIMIT_EXCEEDED);
/// Wait unless there is positive amount of tokens - throttling
Int64 sleep_time_ns = 0;
if (max_speed && tokens_value < 0)
{
sleep_time_ns = static_cast<Int64>(-tokens_value / max_speed * NS);
accumulated_sleep += sleep_time_ns;
sleepForNanoseconds(sleep_time_ns);
accumulated_sleep -= sleep_time_ns;
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_time_ns / 1000UL);
}
if (parent)
sleep_time_ns += parent->add(amount);
return static_cast<UInt64>(sleep_time_ns);
}
void Throttler::reset()
{
std::lock_guard lock(mutex);
count = 0;
tokens = max_burst;
prev_ns = 0;
// NOTE: do not zero `accumulated_sleep` to avoid races
}
bool Throttler::isThrottling() const
{
if (accumulated_sleep != 0)
return true;
if (parent)
return parent->isThrottling();
return false;
}
}
|