summaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/QueryPipeline/BlockIO.cpp
diff options
context:
space:
mode:
authorvitalyisaev <[email protected]>2023-11-14 09:58:56 +0300
committervitalyisaev <[email protected]>2023-11-14 10:20:20 +0300
commitc2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
treecd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/QueryPipeline/BlockIO.cpp
parentd4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/QueryPipeline/BlockIO.cpp')
-rw-r--r--contrib/clickhouse/src/QueryPipeline/BlockIO.cpp101
1 files changed, 101 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/QueryPipeline/BlockIO.cpp b/contrib/clickhouse/src/QueryPipeline/BlockIO.cpp
new file mode 100644
index 00000000000..76da01bee0e
--- /dev/null
+++ b/contrib/clickhouse/src/QueryPipeline/BlockIO.cpp
@@ -0,0 +1,101 @@
+#include <QueryPipeline/BlockIO.h>
+#include <Interpreters/ProcessList.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int QUERY_WAS_CANCELLED;
+}
+
+void BlockIO::reset()
+{
+ /** process_list_entry should be destroyed after in, after out and after pipeline,
+ * since in, out and pipeline contain pointer to objects inside process_list_entry (query-level MemoryTracker for example),
+ * which could be used before destroying of in and out.
+ *
+ * However, QueryStatus inside process_list_entry holds shared pointers to streams for some reason.
+ * Streams must be destroyed before storage locks, storages and contexts inside pipeline,
+ * so releaseQueryStreams() is required.
+ */
+ /// TODO simplify it all
+
+ pipeline.reset();
+ process_list_entry.reset();
+
+ /// TODO Do we need also reset callbacks? In which order?
+}
+
+BlockIO & BlockIO::operator= (BlockIO && rhs) noexcept
+{
+ if (this == &rhs)
+ return *this;
+
+ /// Explicitly reset fields, so everything is destructed in right order
+ reset();
+
+ process_list_entry = std::move(rhs.process_list_entry);
+ pipeline = std::move(rhs.pipeline);
+
+ finish_callback = std::move(rhs.finish_callback);
+ exception_callback = std::move(rhs.exception_callback);
+
+ null_format = rhs.null_format;
+
+ return *this;
+}
+
+BlockIO::~BlockIO()
+{
+ reset();
+}
+
+void BlockIO::onFinish()
+{
+ if (finish_callback)
+ finish_callback(pipeline);
+
+ pipeline.reset();
+}
+
+void BlockIO::onException()
+{
+ if (exception_callback)
+ exception_callback(/* log_error */ true);
+
+ pipeline.reset();
+}
+
+void BlockIO::onCancelOrConnectionLoss()
+{
+ /// Query was not finished gracefully, so we should call exception_callback
+ /// But we don't have a real exception
+ try
+ {
+ throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Query was cancelled or a client has unexpectedly dropped the connection");
+ }
+ catch (...)
+ {
+ if (exception_callback)
+ {
+ exception_callback(/* log_error */ false);
+ }
+
+ /// destroy pipeline and write buffers with an exception context
+ pipeline.reset();
+ }
+
+}
+
+void BlockIO::setAllDataSent() const
+{
+ /// The following queries does not have process_list_entry:
+ /// - internal
+ /// - SHOW PROCESSLIST
+ if (process_list_entry)
+ process_list_entry->getQueryStatus()->setAllDataSent();
+}
+
+
+}