aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/QueryPipeline/ReadProgressCallback.cpp
blob: 4d7c7aa0f2a70ff5acfb8fd676173b9c87dd5c66 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#include <QueryPipeline/ReadProgressCallback.h>
#include <Interpreters/ProcessList.h>
#include <Access/EnabledQuota.h>


namespace ProfileEvents
{
    /// Global counters incremented in onProgress() (when update_profile_events
    /// is set) to account rows/bytes selected by this query.
    extern const Event SelectedRows;
    extern const Event SelectedBytes;
}

namespace DB
{

namespace ErrorCodes
{
    /// Error codes passed to SizeLimits::check() in onProgress(); raised when
    /// the configured row/byte read limits are exceeded in "throw" mode.
    extern const int TOO_MANY_ROWS;
    extern const int TOO_MANY_BYTES;
}

void ReadProgressCallback::setProcessListElement(QueryStatusPtr elem)
{
    process_list_elem = elem;
    if (!elem)
        return;

    /// Flush any pending total_rows_approx estimate as soon as possible.
    ///
    /// Without this, a correct total_rows_approx would only show up after the
    /// query starts reading all parts (when it needs to read from multiple
    /// parts), which is especially noticeable with max_threads=1.
    ///
    /// NOTE: The estimate may only be consumed once a progress callback is
    /// installed; otherwise the value would simply be lost.
    if (!progress_callback)
        return;

    const size_t pending_rows = total_rows_approx.exchange(0);
    if (pending_rows == 0)
        return;

    Progress estimated_progress = {0, 0, pending_rows};

    progress_callback(estimated_progress);
    process_list_elem->updateProgressIn(estimated_progress);
}

/// Called by sources as they produce data. Accounts the newly read rows/bytes,
/// propagates progress to the progress callback and the query status element,
/// and enforces per-storage limits, throttling, and quotas.
///
/// Returns false when reading must stop quietly (a limit with "break" overflow
/// mode was hit, or updateProgressIn() reported cancellation); limit checks in
/// "throw" mode raise exceptions instead (e.g. TOO_MANY_ROWS/TOO_MANY_BYTES).
bool ReadProgressCallback::onProgress(uint64_t read_rows, uint64_t read_bytes, const StorageLimitsList & storage_limits)
{
    /// Elapsed-time limits are checked first; in "break" timeout mode this
    /// returns false rather than throwing.
    for (const auto & limits : storage_limits)
    {
        if (!limits.local_limits.speed_limits.checkTimeLimit(total_stopwatch, limits.local_limits.timeout_overflow_mode))
            return false;
    }

    /// Publish the pending estimate of total rows exactly once: exchange(0)
    /// makes the hand-off atomic between concurrently reporting sources.
    size_t rows_approx = 0;
    if ((rows_approx = total_rows_approx.exchange(0)) != 0)
    {
        Progress total_rows_progress = {0, 0, rows_approx};

        if (progress_callback)
            progress_callback(total_rows_progress);

        if (process_list_elem)
            process_list_elem->updateProgressIn(total_rows_progress);
    }

    /// Same single-shot atomic hand-off for the pending total-bytes estimate.
    size_t bytes = 0;
    if ((bytes = total_bytes.exchange(0)) != 0)
    {
        Progress total_bytes_progress = {0, 0, 0, bytes};

        if (progress_callback)
            progress_callback(total_bytes_progress);

        if (process_list_elem)
            process_list_elem->updateProgressIn(total_bytes_progress);
    }

    /// Account the rows/bytes actually read by this call.
    Progress value {read_rows, read_bytes};

    if (progress_callback)
        progress_callback(value);

    if (process_list_elem)
    {
        /// updateProgressIn() returns false if the query was cancelled.
        if (!process_list_elem->updateProgressIn(value))
            return false;

        /// The total amount of data processed or intended for processing in all sources, possibly on remote servers.

        ProgressValues progress = process_list_elem->getProgressIn();

        for (const auto & limits : storage_limits)
        {
            /// If the mode is "throw" and estimate of total rows is known, then throw early if an estimate is too high.
            /// If the mode is "break", then allow to read before limit even if estimate is very high.

            size_t rows_to_check_limit = progress.read_rows;
            if (limits.local_limits.size_limits.overflow_mode == OverflowMode::THROW && progress.total_rows_to_read > progress.read_rows)
                rows_to_check_limit = progress.total_rows_to_read;

            /// Check the restrictions on the
            ///  * amount of data to read
            ///  * speed of the query
            ///  * quota on the amount of data to read
            /// NOTE: Maybe it makes sense to have them checked directly in ProcessList?

            if (limits.local_limits.mode == LimitsMode::LIMITS_TOTAL)
            {
                /// check() either throws or returns false depending on the
                /// configured overflow mode; false means "stop reading quietly".
                if (!limits.local_limits.size_limits.check(
                        rows_to_check_limit, progress.read_bytes, "rows or bytes to read",
                        ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES))
                {
                    return false;
                }
            }

            /// Leaf-node limits are enforced regardless of the limits mode.
            if (!limits.leaf_limits.check(
                    rows_to_check_limit, progress.read_bytes, "rows or bytes to read on leaf node",
                    ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES))
            {
                return false;
            }
        }

        size_t total_rows = progress.total_rows_to_read;

        CurrentThread::updatePerformanceCountersIfNeeded();

        /// Throttling and quota accounting are serialized across threads.
        std::lock_guard lock(limits_and_quotas_mutex);

        /// TODO: Should be done in PipelineExecutor.
        for (const auto & limits : storage_limits)
            limits.local_limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_stopwatch.elapsedMicroseconds());

        /// Charge the quota only with this call's increment, not the cumulative totals.
        if (quota)
            quota->used({QuotaType::READ_ROWS, value.read_rows}, {QuotaType::READ_BYTES, value.read_bytes});
    }

    if (update_profile_events)
    {
        ProfileEvents::increment(ProfileEvents::SelectedRows, value.read_rows);
        ProfileEvents::increment(ProfileEvents::SelectedBytes, value.read_bytes);
    }

    return true;
}

}