1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Access/EnabledQuota.h>
namespace DB
{
/// Error codes referenced by this translation unit; they are only declared
/// here (extern), the definitions live elsewhere.
namespace ErrorCodes
{
extern const int TOO_MANY_ROWS;
/// Passed to the result size-limit check in LimitsCheckingTransform::transform.
extern const int TOO_MANY_ROWS_OR_BYTES;
}
/// Folds one more chunk into the running statistics: bumps the block counter
/// and adds the chunk's row count and byte size to the accumulated totals.
void ProcessorProfileInfo::update(const Chunk & block)
{
    const auto chunk_rows = block.getNumRows();
    const auto chunk_bytes = block.bytes();

    ++blocks;
    rows += chunk_rows;
    bytes += chunk_bytes;
}
/// Builds the transform from a header and a set of limits.
/// The same header_ is forwarded for both header arguments of ISimpleTransform,
/// and the limits are taken by value and moved into the member.
LimitsCheckingTransform::LimitsCheckingTransform(const Block & header_, StreamLocalLimits limits_)
    : ISimpleTransform(header_, header_, false),
      limits(std::move(limits_))
{
}
/// Processes one chunk: enforces the time limit, accounts the chunk in the
/// profile info, enforces the result size limits (LIMITS_CURRENT mode only)
/// and, when a quota is attached, reports the consumption to it.
void LimitsCheckingTransform::transform(Chunk & chunk)
{
    /// The stopwatch is started on the first call rather than at construction.
    if (!info.started)
    {
        info.total_stopwatch.start();
        info.started = true;
    }

    /// A failed time-limit check stops reading and skips all further accounting.
    if (!limits.speed_limits.checkTimeLimit(info.total_stopwatch, limits.timeout_overflow_mode))
    {
        stopReading();
        return;
    }

    /// Nothing to account for an empty chunk.
    if (!chunk)
        return;

    info.update(chunk);

    /// The size limits are checked against the accumulated totals, not just this chunk.
    if (limits.mode == LimitsMode::LIMITS_CURRENT)
    {
        if (!limits.size_limits.check(info.rows, info.bytes, "result", ErrorCodes::TOO_MANY_ROWS_OR_BYTES))
            stopReading();
    }

    if (quota)
        checkQuota(chunk);
}
/// Reports the resources consumed by this chunk to the quota.
/// In LIMITS_CURRENT mode it reports the chunk's rows and bytes plus the
/// stopwatch time (nanoseconds) elapsed since the previous report.
/// In LIMITS_TOTAL mode nothing is reported here.
void LimitsCheckingTransform::checkQuota(Chunk & chunk)
{
    switch (limits.mode)
    {
        case LimitsMode::LIMITS_CURRENT:
        {
            const UInt64 elapsed_now = info.total_stopwatch.elapsedNanoseconds();
            /// Only the time since the last report is charged, so repeated calls do not double count.
            const auto elapsed_since_last_report = elapsed_now - prev_elapsed;

            quota->used(
                {QuotaType::RESULT_ROWS, chunk.getNumRows()},
                {QuotaType::RESULT_BYTES, chunk.bytes()},
                {QuotaType::EXECUTION_TIME, elapsed_since_last_report});

            prev_elapsed = elapsed_now;
            break;
        }

        case LimitsMode::LIMITS_TOTAL:
            /// Checked in ISource::progress method.
            break;
    }
}
}
|