diff options
| author | vitalyisaev <[email protected]> | 2023-11-14 09:58:56 +0300 |
|---|---|---|
| committer | vitalyisaev <[email protected]> | 2023-11-14 10:20:20 +0300 |
| commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
| tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/QueryPipeline/ReadProgressCallback.cpp | |
| parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/QueryPipeline/ReadProgressCallback.cpp')
| -rw-r--r-- | contrib/clickhouse/src/QueryPipeline/ReadProgressCallback.cpp | 148 |
1 file changed, 148 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/QueryPipeline/ReadProgressCallback.cpp b/contrib/clickhouse/src/QueryPipeline/ReadProgressCallback.cpp new file mode 100644 index 00000000000..4d7c7aa0f2a --- /dev/null +++ b/contrib/clickhouse/src/QueryPipeline/ReadProgressCallback.cpp @@ -0,0 +1,148 @@ +#include <QueryPipeline/ReadProgressCallback.h> +#include <Interpreters/ProcessList.h> +#include <Access/EnabledQuota.h> + + +namespace ProfileEvents +{ + extern const Event SelectedRows; + extern const Event SelectedBytes; +} + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int TOO_MANY_ROWS; + extern const int TOO_MANY_BYTES; +} + +void ReadProgressCallback::setProcessListElement(QueryStatusPtr elem) +{ + process_list_elem = elem; + if (!elem) + return; + + /// Update total_rows_approx as soon as possible. + /// + /// It is important to do this, since you will not get correct + /// total_rows_approx until the query starts reading all parts (in case + /// the query needs to read from multiple parts), and this is especially a + /// problem in case of max_threads=1. + /// + /// NOTE: This can be done only if the progress callback is already set, since + /// otherwise total_rows_approx will be lost. 
+ size_t rows_approx = 0; + if (progress_callback && (rows_approx = total_rows_approx.exchange(0)) != 0) + { + Progress total_rows_progress = {0, 0, rows_approx}; + + progress_callback(total_rows_progress); + process_list_elem->updateProgressIn(total_rows_progress); + } +} + +bool ReadProgressCallback::onProgress(uint64_t read_rows, uint64_t read_bytes, const StorageLimitsList & storage_limits) +{ + for (const auto & limits : storage_limits) + { + if (!limits.local_limits.speed_limits.checkTimeLimit(total_stopwatch, limits.local_limits.timeout_overflow_mode)) + return false; + } + + size_t rows_approx = 0; + if ((rows_approx = total_rows_approx.exchange(0)) != 0) + { + Progress total_rows_progress = {0, 0, rows_approx}; + + if (progress_callback) + progress_callback(total_rows_progress); + + if (process_list_elem) + process_list_elem->updateProgressIn(total_rows_progress); + } + + size_t bytes = 0; + if ((bytes = total_bytes.exchange(0)) != 0) + { + Progress total_bytes_progress = {0, 0, 0, bytes}; + + if (progress_callback) + progress_callback(total_bytes_progress); + + if (process_list_elem) + process_list_elem->updateProgressIn(total_bytes_progress); + } + + Progress value {read_rows, read_bytes}; + + if (progress_callback) + progress_callback(value); + + if (process_list_elem) + { + if (!process_list_elem->updateProgressIn(value)) + return false; + + /// The total amount of data processed or intended for processing in all sources, possibly on remote servers. + + ProgressValues progress = process_list_elem->getProgressIn(); + + for (const auto & limits : storage_limits) + { + /// If the mode is "throw" and estimate of total rows is known, then throw early if an estimate is too high. + /// If the mode is "break", then allow reading before the limit even if the estimate is very high. 
+ + size_t rows_to_check_limit = progress.read_rows; + if (limits.local_limits.size_limits.overflow_mode == OverflowMode::THROW && progress.total_rows_to_read > progress.read_rows) + rows_to_check_limit = progress.total_rows_to_read; + + /// Check the restrictions on the + /// * amount of data to read + /// * speed of the query + /// * quota on the amount of data to read + /// NOTE: Maybe it makes sense to have them checked directly in ProcessList? + + if (limits.local_limits.mode == LimitsMode::LIMITS_TOTAL) + { + if (!limits.local_limits.size_limits.check( + rows_to_check_limit, progress.read_bytes, "rows or bytes to read", + ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES)) + { + return false; + } + } + + if (!limits.leaf_limits.check( + rows_to_check_limit, progress.read_bytes, "rows or bytes to read on leaf node", + ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES)) + { + return false; + } + } + + size_t total_rows = progress.total_rows_to_read; + + CurrentThread::updatePerformanceCountersIfNeeded(); + + std::lock_guard lock(limits_and_quotas_mutex); + + /// TODO: Should be done in PipelineExecutor. + for (const auto & limits : storage_limits) + limits.local_limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_stopwatch.elapsedMicroseconds()); + + if (quota) + quota->used({QuotaType::READ_ROWS, value.read_rows}, {QuotaType::READ_BYTES, value.read_bytes}); + } + + if (update_profile_events) + { + ProfileEvents::increment(ProfileEvents::SelectedRows, value.read_rows); + ProfileEvents::increment(ProfileEvents::SelectedBytes, value.read_bytes); + } + + return true; +} + +} |
