diff options
| author | vitalyisaev <[email protected]> | 2023-11-14 09:58:56 +0300 |
|---|---|---|
| committer | vitalyisaev <[email protected]> | 2023-11-14 10:20:20 +0300 |
| commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
| tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/QueryPipeline/BlockIO.cpp | |
| parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/QueryPipeline/BlockIO.cpp')
| -rw-r--r-- | contrib/clickhouse/src/QueryPipeline/BlockIO.cpp | 101 |
1 files changed, 101 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/QueryPipeline/BlockIO.cpp b/contrib/clickhouse/src/QueryPipeline/BlockIO.cpp new file mode 100644 index 00000000000..76da01bee0e --- /dev/null +++ b/contrib/clickhouse/src/QueryPipeline/BlockIO.cpp @@ -0,0 +1,101 @@ +#include <QueryPipeline/BlockIO.h> +#include <Interpreters/ProcessList.h> + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int QUERY_WAS_CANCELLED; +} + +void BlockIO::reset() +{ + /** process_list_entry should be destroyed after in, after out and after pipeline, + * since in, out and pipeline contain pointer to objects inside process_list_entry (query-level MemoryTracker for example), + * which could be used before destroying of in and out. + * + * However, QueryStatus inside process_list_entry holds shared pointers to streams for some reason. + * Streams must be destroyed before storage locks, storages and contexts inside pipeline, + * so releaseQueryStreams() is required. + */ + /// TODO simplify it all + + pipeline.reset(); + process_list_entry.reset(); + + /// TODO Do we need also reset callbacks? In which order? +} + +BlockIO & BlockIO::operator= (BlockIO && rhs) noexcept +{ + if (this == &rhs) + return *this; + + /// Explicitly reset fields, so everything is destructed in right order + reset(); + + process_list_entry = std::move(rhs.process_list_entry); + pipeline = std::move(rhs.pipeline); + + finish_callback = std::move(rhs.finish_callback); + exception_callback = std::move(rhs.exception_callback); + + null_format = rhs.null_format; + + return *this; +} + +BlockIO::~BlockIO() +{ + reset(); +} + +void BlockIO::onFinish() +{ + if (finish_callback) + finish_callback(pipeline); + + pipeline.reset(); +} + +void BlockIO::onException() +{ + if (exception_callback) + exception_callback(/* log_error */ true); + + pipeline.reset(); +} + +void BlockIO::onCancelOrConnectionLoss() +{ + /// Query was not finished gracefully, so we should call exception_callback + /// But we don't have a real exception + try + { + throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled or a client has unexpectedly dropped the connection"); + } + catch (...) + { + if (exception_callback) + { + exception_callback(/* log_error */ false); + } + + /// destroy pipeline and write buffers with an exception context + pipeline.reset(); + } + +} + +void BlockIO::setAllDataSent() const +{ + /// The following queries does not have process_list_entry: + /// - internal + /// - SHOW PROCESSLIST + if (process_list_entry) + process_list_entry->getQueryStatus()->setAllDataSent(); +} + + +} |
