summaryrefslogtreecommitdiffstats
path: root/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp
diff options
context:
space:
mode:
authorvitalyisaev <[email protected]>2023-11-14 09:58:56 +0300
committervitalyisaev <[email protected]>2023-11-14 10:20:20 +0300
commitc2b2dfd9827a400a8495e172a56343462e3ceb82 (patch)
treecd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp
parentd4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff)
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp')
-rw-r--r--contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp763
1 files changed, 763 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp b/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp
new file mode 100644
index 00000000000..eebe9797051
--- /dev/null
+++ b/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp
@@ -0,0 +1,763 @@
+#include <Common/ConcurrentBoundedQueue.h>
+#include <QueryPipeline/RemoteQueryExecutor.h>
+#include <QueryPipeline/RemoteQueryExecutorReadContext.h>
+
+#include <Columns/ColumnConst.h>
+#include <Common/CurrentThread.h>
+#include "Core/Protocol.h"
+#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
+#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
+#include <Processors/Sources/SourceFromSingleChunk.h>
+#include <Processors/Transforms/LimitsCheckingTransform.h>
+#include <Processors/QueryPlan/QueryPlan.h>
+#include <QueryPipeline/QueryPipelineBuilder.h>
+#include <Storages/SelectQueryInfo.h>
+#include <Interpreters/castColumn.h>
+#include <Interpreters/Cluster.h>
+#include <Interpreters/Context.h>
+#include <Interpreters/InternalTextLogsQueue.h>
+#include <IO/ConnectionTimeouts.h>
+#include <Client/MultiplexedConnections.h>
+#include <Client/HedgedConnections.h>
+#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
+#include <Storages/StorageMemory.h>
+
+
+namespace ProfileEvents
+{
+ extern const Event SuspendSendingQueryToShard; /// Incremented in sendQueryAsync() when the send is suspended and a file descriptor is returned.
+ extern const Event ReadTaskRequestsReceived; /// Incremented in processReadTaskRequest().
+ extern const Event MergeTreeReadTaskRequestsReceived; /// Incremented in processMergeTreeReadTaskRequest().
+}
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int LOGICAL_ERROR; /// Thrown when a task iterator / parallel reading coordinator is required but not set in `extension`.
+ extern const int UNKNOWN_PACKET_FROM_SERVER; /// Thrown by processPacket() and finish() for unexpected packet types.
+ extern const int DUPLICATED_PART_UUIDS; /// Thrown when duplicate part UUIDs are found again after the query was already resent once.
+ extern const int SYSTEM_ERROR; /// Thrown when a ProfileEvents block cannot be pushed into the profile events queue.
+}
+
+/// Common constructor: stores the query text, the expected result header, the context,
+/// the scalars, the external tables, the processing stage and the optional extension.
+/// It does not install `create_connections`; the other constructors in this file do that.
+RemoteQueryExecutor::RemoteQueryExecutor(
+    const String & query_,
+    const Block & header_,
+    ContextPtr context_,
+    const Scalars & scalars_,
+    const Tables & external_tables_,
+    QueryProcessingStage::Enum stage_,
+    std::optional<Extension> extension_)
+    : header(header_)
+    , query(query_)
+    , context(context_)
+    , scalars(scalars_)
+    , external_tables(external_tables_)
+    , stage(stage_)
+    , extension(extension_)
+{
+}
+
+/// Executor over a single connection passed by reference.
+/// NOTE: `connection` is captured by reference in the stored `create_connections` callback,
+/// so it has to stay alive for as long as that callback may be invoked.
+RemoteQueryExecutor::RemoteQueryExecutor(
+    Connection & connection,
+    const String & query_,
+    const Block & header_,
+    ContextPtr context_,
+    ThrottlerPtr throttler,
+    const Scalars & scalars_,
+    const Tables & external_tables_,
+    QueryProcessingStage::Enum stage_,
+    std::optional<Extension> extension_)
+    : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_)
+{
+    create_connections = [this, &connection, throttler, extension_](AsyncCallback)
+    {
+        auto multiplexed = std::make_unique<MultiplexedConnections>(connection, context->getSettingsRef(), throttler);
+
+        /// Replica info is forwarded only when the extension carries it.
+        const bool has_replica_info = extension_ && extension_->replica_info;
+        if (has_replica_info)
+            multiplexed->setReplicaInfo(*extension_->replica_info);
+
+        return multiplexed;
+    };
+}
+
+/// Executor over a single shared connection.
+/// The shared pointer is copied into the stored `create_connections` callback,
+/// so the callback co-owns the connection.
+RemoteQueryExecutor::RemoteQueryExecutor(
+    std::shared_ptr<Connection> connection_ptr,
+    const String & query_,
+    const Block & header_,
+    ContextPtr context_,
+    ThrottlerPtr throttler,
+    const Scalars & scalars_,
+    const Tables & external_tables_,
+    QueryProcessingStage::Enum stage_,
+    std::optional<Extension> extension_)
+    : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_)
+{
+    create_connections = [this, connection_ptr, throttler, extension_](AsyncCallback)
+    {
+        auto multiplexed = std::make_unique<MultiplexedConnections>(connection_ptr, context->getSettingsRef(), throttler);
+
+        /// Replica info is forwarded only when the extension carries it.
+        const bool has_replica_info = extension_ && extension_->replica_info;
+        if (has_replica_info)
+            multiplexed->setReplicaInfo(*extension_->replica_info);
+
+        return multiplexed;
+    };
+}
+
+/// Executor over an already obtained set of pool entries.
+/// The entries are moved into the stored `create_connections` callback (the parameter is an
+/// rvalue reference, so the caller has already given them up) instead of being copied.
+/// The callback moves them on into MultiplexedConnections when it is invoked, so a second
+/// invocation of the callback sees a moved-from vector.
+RemoteQueryExecutor::RemoteQueryExecutor(
+    std::vector<IConnectionPool::Entry> && connections_,
+    const String & query_, const Block & header_, ContextPtr context_,
+    const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
+    QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
+    : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_)
+{
+    create_connections = [this, entries = std::move(connections_), throttler, extension_](AsyncCallback) mutable
+    {
+        auto res = std::make_unique<MultiplexedConnections>(std::move(entries), context->getSettingsRef(), throttler);
+        if (extension_ && extension_->replica_info)
+            res->setReplicaInfo(*extension_->replica_info);
+        return res;
+    };
+}
+
+/// Executor that obtains its connections from a failover pool.
+/// The stored `create_connections` callback builds HedgedConnections when the
+/// `use_hedged_requests` setting is on (only compiled under OS_LINUX). Otherwise it takes
+/// entries from the pool and wraps them in MultiplexedConnections.
+RemoteQueryExecutor::RemoteQueryExecutor(
+    const ConnectionPoolWithFailoverPtr & pool,
+    const String & query_,
+    const Block & header_,
+    ContextPtr context_,
+    const ThrottlerPtr & throttler,
+    const Scalars & scalars_,
+    const Tables & external_tables_,
+    QueryProcessingStage::Enum stage_,
+    std::optional<Extension> extension_)
+    : header(header_)
+    , query(query_)
+    , context(context_)
+    , scalars(scalars_)
+    , external_tables(external_tables_)
+    , stage(stage_)
+    , extension(extension_)
+{
+    create_connections = [this, pool, throttler](AsyncCallback async_callback) -> std::unique_ptr<IConnections>
+    {
+        const Settings & settings = context->getSettingsRef();
+        auto connect_timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
+
+#if defined(OS_LINUX)
+        if (settings.use_hedged_requests)
+        {
+            /// The table name is passed only when a main table is set.
+            std::shared_ptr<QualifiedTableName> table_to_check;
+            if (main_table)
+                table_to_check = std::make_shared<QualifiedTableName>(main_table.getQualifiedName());
+
+            auto hedged = std::make_unique<HedgedConnections>(pool, context, connect_timeouts, throttler, pool_mode, table_to_check, std::move(async_callback));
+            if (extension && extension->replica_info)
+                hedged->setReplicaInfo(*extension->replica_info);
+            return hedged;
+        }
+#endif
+
+        /// Left unset unless a parallel reading coordinator is present; then forced to true.
+        std::optional<bool> skip_unavailable_endpoints;
+        if (extension && extension->parallel_reading_coordinator)
+            skip_unavailable_endpoints = true;
+
+        std::vector<IConnectionPool::Entry> entries;
+        if (!main_table)
+        {
+            entries = pool->getMany(connect_timeouts, &settings, pool_mode, std::move(async_callback), skip_unavailable_endpoints);
+        }
+        else
+        {
+            /// With a main table the pool is asked for checked results; only their entries are kept.
+            auto checked = pool->getManyChecked(connect_timeouts, &settings, pool_mode, main_table.getQualifiedName(), std::move(async_callback), skip_unavailable_endpoints);
+            entries.reserve(checked.size());
+            for (auto & attempt : checked)
+                entries.emplace_back(std::move(attempt.entry));
+        }
+
+        auto multiplexed = std::make_unique<MultiplexedConnections>(std::move(entries), settings, throttler);
+        if (extension && extension->replica_info)
+            multiplexed->setReplicaInfo(*extension->replica_info);
+        return multiplexed;
+    };
+}
+
+RemoteQueryExecutor::~RemoteQueryExecutor()
+{
+    /// A read context exists but the connections are not marked as established:
+    /// connection establishment has to be finished so the connections can be disconnected
+    /// below and do not stay in an out-of-sync state.
+    const bool has_unestablished_read_context = read_context && !established;
+    if (has_unestablished_read_context)
+    {
+        /// With was_cancelled set, the query will not be sent once connections are created.
+        was_cancelled = true;
+        read_context->cancel();
+    }
+
+    /// If communication with replicas was interrupted midway, drop all connections so that
+    /// none of them is left hanging in the out-of-sync state.
+    const bool must_disconnect = established || (isQueryPending() && connections);
+    if (must_disconnect)
+        connections->disconnect();
+}
+
+/** Converts a received block to the structure described by `header`:
+  * columns are looked up by name, cast to the expected types, and columns that are
+  * not mentioned in the header are dropped.
+  */
+static Block adaptBlockStructure(const Block & block, const Block & header)
+{
+    /// An empty header means the reader does not care about the result structure.
+    /// Deprecated and used only in Benchmark, PerformanceTest.
+    if (!header)
+        return block;
+
+    Block adapted;
+    adapted.info = block.info;
+
+    const size_t num_rows = block.rows();
+
+    for (const auto & expected : header)
+    {
+        const bool expect_const = expected.column && isColumnConst(*expected.column);
+
+        ColumnPtr column;
+        if (!expect_const)
+        {
+            /// Ordinary column: take it from the block by name and cast to the expected type.
+            column = castColumn(block.getByName(expected.name), expected.type);
+        }
+        else if (num_rows == 0 || !block.has(expected.name))
+        {
+            /// Nothing to take the value from: reuse the constant from the header.
+            column = expected.column->cloneResized(num_rows);
+        }
+        else
+        {
+            /// The constant arrives materialized. Its value is taken from the received block
+            /// because it may differ on the remote server (version(), uptime(), ...).
+            /// Only the first row is used.
+            ///
+            /// TODO: check that column contains the same value.
+            /// TODO: serialize const columns.
+            auto received = block.getByName(expected.name);
+            received.column = received.column->cut(0, 1);
+
+            column = castColumn(received, expected.type);
+
+            if (isColumnConst(*column))
+                /// It is not possible now. Just in case we support const columns serialization.
+                column = column->cloneResized(num_rows);
+            else
+                column = ColumnConst::create(column, num_rows);
+        }
+
+        adapted.insert({column, expected.type, expected.name});
+    }
+
+    return adapted;
+}
+
+/// Locking wrapper around sendQueryUnlocked(): the whole send runs under `was_cancelled_mutex`.
+void RemoteQueryExecutor::sendQuery(ClientInfo::QueryKind query_kind, AsyncCallback async_callback)
+{
+    /// Sending a query is made of several packets:
+    /// - Query
+    /// - Data (multiple times)
+    ///
+    /// It must not be interleaved with cancellation: once the Cancel packet is out,
+    /// no Data packet may follow, otherwise the remote side will throw:
+    ///
+    ///     Unexpected packet Data received from client
+    ///
+    std::lock_guard cancel_lock(was_cancelled_mutex);
+    sendQueryUnlocked(query_kind, async_callback);
+}
+
+/// Creates the connections and sends the query, then the scalars (if enabled by settings)
+/// and the external tables. Does nothing if the query was already sent or cancelled.
+/// The caller is expected to hold `was_cancelled_mutex` (see sendQuery()).
+void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, AsyncCallback async_callback)
+{
+    const bool nothing_to_send = sent_query || was_cancelled;
+    if (nothing_to_send)
+        return;
+
+    connections = create_connections(async_callback);
+    AsyncCallbackSetter callback_guard(connections.get(), async_callback);
+
+    const auto & current_settings = context->getSettingsRef();
+
+    if (isReplicaUnavailable() || needToSkipUnavailableShard())
+    {
+        /// Mark the executor as done so that read() does not try to send the query again.
+        was_cancelled = true;
+        finished = true;
+        sent_query = true;
+
+        /// The coordinator must not wait for this replica.
+        if (extension && extension->parallel_reading_coordinator)
+        {
+            chassert(extension->replica_info);
+            extension->parallel_reading_coordinator->markReplicaAsUnavailable(extension->replica_info->number_of_current_replica);
+        }
+
+        return;
+    }
+
+    /// `established` stays true only while the query itself is being sent.
+    established = true;
+
+    auto send_timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
+
+    ClientInfo client_info = context->getClientInfo();
+    client_info.query_kind = query_kind;
+
+    if (!duplicated_part_uuids.empty())
+        connections->sendIgnoredPartUUIDs(duplicated_part_uuids);
+
+    connections->sendQuery(send_timeouts, query, query_id, stage, client_info, true);
+
+    established = false;
+    sent_query = true;
+
+    if (current_settings.enable_scalar_subquery_optimization)
+        sendScalars();
+
+    sendExternalTables();
+}
+
+/// Starts or continues sending the query through the read context.
+/// Returns -1 when there is nothing to wait for (cancelled, or the query is already sent);
+/// otherwise returns the file descriptor of the read context.
+/// Without OS_LINUX it sends the query synchronously and returns -1.
+int RemoteQueryExecutor::sendQueryAsync()
+{
+#if defined(OS_LINUX)
+    std::lock_guard cancel_lock(was_cancelled_mutex);
+    if (was_cancelled)
+        return -1;
+
+    if (!read_context)
+        read_context = std::make_unique<ReadContext>(*this, /*suspend_when_query_sent*/ true);
+
+    /// The sent_query flag cannot be used to detect "already sent" here,
+    /// because scalars or external tables may still be in the process of being sent.
+    if (!read_context->isQuerySent())
+    {
+        read_context->resume();
+
+        if (!read_context->isQuerySent())
+        {
+            ProfileEvents::increment(ProfileEvents::SuspendSendingQueryToShard); /// Mostly for testing purposes.
+            return read_context->getFileDescriptor();
+        }
+    }
+
+    return -1;
+#else
+    sendQuery();
+    return -1;
+#endif
+}
+
+/// Calls read() repeatedly, skipping every result that is not of type Data,
+/// and returns the block of the first Data result.
+Block RemoteQueryExecutor::readBlock()
+{
+    for (;;)
+    {
+        auto result = read();
+
+        if (result.getType() != ReadResult::Type::Data)
+            continue;
+
+        return result.getBlock();
+    }
+}
+
+
+/// Synchronous read: sends the query on first use, then receives and processes packets
+/// until one produces a Data or ParallelReplicasToken result.
+/// Returns a result built from an empty Block if the query was cancelled, or if
+/// `skip_unavailable_shards` is set and there are no connections.
+/// If duplicated part UUIDs were reported, the query is restarted without them.
+RemoteQueryExecutor::ReadResult RemoteQueryExecutor::read()
+{
+    if (!sent_query)
+    {
+        sendQuery();
+
+        /// sendQueryUnlocked() returns before creating connections when the executor is
+        /// already cancelled, so `connections` may still be null here. In that case the
+        /// loop below returns the empty result after checking `was_cancelled`.
+        if (connections && context->getSettingsRef().skip_unavailable_shards && (0 == connections->size()))
+            return ReadResult(Block());
+    }
+
+    while (true)
+    {
+        /// Receiving and processing one packet is done under the cancellation mutex.
+        std::lock_guard lock(was_cancelled_mutex);
+        if (was_cancelled)
+            return ReadResult(Block());
+
+        auto packet = connections->receivePacket();
+        auto anything = processPacket(std::move(packet));
+
+        if (anything.getType() == ReadResult::Type::Data || anything.getType() == ReadResult::Type::ParallelReplicasToken)
+            return anything;
+
+        if (got_duplicated_part_uuids)
+            break;
+    }
+
+    /// The lock is released here; the restart takes it again itself.
+    return restartQueryWithoutDuplicatedUUIDs();
+}
+
+/// Asynchronous read through the read context (OS_LINUX only; otherwise falls back to read()).
+/// Returns a file descriptor result while a packet is not ready yet, a Data or
+/// ParallelReplicasToken result once a packet produced one, and a result built from an empty
+/// Block when cancelled or when the replica/shard is unavailable.
+/// If duplicated part UUIDs were reported, the query is restarted without them.
+RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
+{
+#if defined(OS_LINUX)
+    /// Create the read context on first use, or recreate it after the query was resent.
+    if (!read_context || (resent_query && recreate_read_context))
+    {
+        std::lock_guard lock(was_cancelled_mutex);
+        if (was_cancelled)
+            return ReadResult(Block());
+
+        read_context = std::make_unique<ReadContext>(*this);
+        recreate_read_context = false;
+    }
+
+    while (true)
+    {
+        std::lock_guard lock(was_cancelled_mutex);
+        if (was_cancelled)
+            return ReadResult(Block());
+
+        read_context->resume();
+
+        if (isReplicaUnavailable() || needToSkipUnavailableShard())
+        {
+            /// We need to tell the coordinator not to wait for this replica.
+            /// But at this point it may lead to an incomplete result set, because
+            /// this replica committed to read some part of the data and then died.
+            if (extension && extension->parallel_reading_coordinator)
+            {
+                /// The coordinator is already checked by the condition above; what is
+                /// dereferenced below is the replica info (same check as in sendQueryUnlocked()).
+                chassert(extension->replica_info);
+                extension->parallel_reading_coordinator->markReplicaAsUnavailable(extension->replica_info->number_of_current_replica);
+            }
+
+            return ReadResult(Block());
+        }
+
+        /// Check if packet is not ready yet.
+        if (read_context->isInProgress())
+            return ReadResult(read_context->getFileDescriptor());
+
+        auto anything = processPacket(read_context->getPacket());
+
+        if (anything.getType() == ReadResult::Type::Data || anything.getType() == ReadResult::Type::ParallelReplicasToken)
+            return anything;
+
+        if (got_duplicated_part_uuids)
+            break;
+    }
+
+    /// The lock is released here; the restart takes it again itself.
+    return restartQueryWithoutDuplicatedUUIDs();
+#else
+    return read();
+#endif
+}
+
+
+/// Cancels the current query, disconnects, resets the sending state and reads again,
+/// which implicitly resends the query (the duplicated part UUIDs are sent as ignored).
+/// Only one resend is allowed: a second attempt throws DUPLICATED_PART_UUIDS.
+/// Returns a result built from an empty Block if the executor was cancelled meanwhile.
+RemoteQueryExecutor::ReadResult RemoteQueryExecutor::restartQueryWithoutDuplicatedUUIDs()
+{
+    {
+        std::lock_guard cancel_lock(was_cancelled_mutex);
+        if (was_cancelled)
+            return ReadResult(Block());
+
+        /// The previous query is cancelled and its connections dropped before any retry.
+        cancelUnlocked();
+        connections->disconnect();
+
+        /// A second round of duplicates is an error.
+        if (resent_query)
+            throw Exception(ErrorCodes::DUPLICATED_PART_UUIDS, "Found duplicate uuids while processing query");
+
+        if (log)
+            LOG_DEBUG(log, "Found duplicate UUIDs, will retry query without those parts");
+
+        /// Reset the state so that the next read sends the query from scratch.
+        resent_query = true;
+        recreate_read_context = true;
+        sent_query = false;
+        got_duplicated_part_uuids = false;
+        was_cancelled = false;
+    }
+
+    /// The following read sends the query first; the async path is used if a read context exists.
+    return read_context ? readAsync() : read();
+}
+
+/// Handles one packet received from a replica.
+/// Returns:
+///  - a result built from the adapted block for a non-empty Data packet;
+///  - a ParallelReplicasToken result for MergeTreeReadTaskRequest / MergeTreeAllRangesAnnounecement;
+///  - a result built from an empty Block on EndOfStream when no active connections remain;
+///  - a Type::Nothing result for every other handled packet.
+/// Rethrows the exception carried by an Exception packet and throws
+/// UNKNOWN_PACKET_FROM_SERVER for an unexpected packet type.
+RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet)
+{
+    switch (packet.type)
+    {
+        case Protocol::Server::MergeTreeReadTaskRequest:
+            chassert(packet.request.has_value());
+            /// `packet` is our own copy, so the request is moved out rather than copied.
+            processMergeTreeReadTaskRequest(std::move(packet.request.value()));
+            return ReadResult(ReadResult::Type::ParallelReplicasToken);
+
+        case Protocol::Server::MergeTreeAllRangesAnnounecement:
+            chassert(packet.announcement.has_value());
+            /// Same as above: move the announcement out of our own packet copy.
+            processMergeTreeInitialReadAnnounecement(std::move(packet.announcement.value()));
+            return ReadResult(ReadResult::Type::ParallelReplicasToken);
+
+        case Protocol::Server::ReadTaskRequest:
+            processReadTaskRequest();
+            break;
+
+        case Protocol::Server::PartUUIDs:
+            if (!setPartUUIDs(packet.part_uuids))
+                got_duplicated_part_uuids = true;
+            break;
+
+        case Protocol::Server::Data:
+            /// Note: `packet.block.rows() == 0` means it's a header block.
+            /// We could actually return it, and the first call to RemoteQueryExecutor::read
+            /// would return earlier. We should consider doing it.
+            if (packet.block && (packet.block.rows() > 0))
+                return ReadResult(adaptBlockStructure(packet.block, header));
+            break; /// If the block is empty - we will receive other packets before EndOfStream.
+
+        case Protocol::Server::Exception:
+            got_exception_from_replica = true;
+            packet.exception->rethrow();
+            break;
+
+        case Protocol::Server::EndOfStream:
+            if (!connections->hasActiveConnections())
+            {
+                finished = true;
+                /// TODO: Replace with Type::Finished
+                return ReadResult(Block{});
+            }
+            break;
+
+        case Protocol::Server::Progress:
+            /** We use the progress from a remote server.
+              * We also include in ProcessList,
+              * and we use it to check
+              * constraints (for example, the minimum speed of query execution)
+              * and quotas (for example, the number of lines to read).
+              */
+            if (progress_callback)
+                progress_callback(packet.progress);
+            break;
+
+        case Protocol::Server::ProfileInfo:
+            /// Use own (client-side) info about read bytes, it is more correct info than server-side one.
+            if (profile_info_callback)
+                profile_info_callback(packet.profile_info);
+            break;
+
+        case Protocol::Server::Totals:
+            /// Stored as received; adapted to the header only when the block is not empty.
+            totals = packet.block;
+            if (totals)
+                totals = adaptBlockStructure(packet.block, header);
+            break;
+
+        case Protocol::Server::Extremes:
+            /// Stored as received; adapted to the header only when the block is not empty.
+            extremes = packet.block;
+            if (extremes)
+                extremes = adaptBlockStructure(packet.block, header);
+            break;
+
+        case Protocol::Server::Log:
+            /// Pass logs from remote server to client
+            if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
+                log_queue->pushBlock(std::move(packet.block));
+            break;
+
+        case Protocol::Server::ProfileEvents:
+            /// Pass profile events from remote server to client
+            if (auto profile_queue = CurrentThread::getInternalProfileEventsQueue())
+                if (!profile_queue->emplace(std::move(packet.block)))
+                    throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push into profile queue");
+            break;
+
+        case Protocol::Server::TimezoneUpdate:
+            /// Intentionally ignored here.
+            break;
+
+        default:
+            got_unknown_packet_from_replica = true;
+            /// Formatted the same way as in finish().
+            throw Exception(
+                ErrorCodes::UNKNOWN_PACKET_FROM_SERVER,
+                "Unknown packet {} from one of the following replicas: {}",
+                toString(packet.type),
+                connections->dumpAddresses());
+    }
+
+    return ReadResult(ReadResult::Type::Nothing);
+}
+
+/// Registers the part UUIDs reported by a replica in the query context.
+/// Returns true if none of them was reported as a duplicate. Otherwise the duplicates are
+/// prepended to `duplicated_part_uuids` and false is returned.
+bool RemoteQueryExecutor::setPartUUIDs(const std::vector<UUID> & uuids)
+{
+    auto already_seen = context->getQueryContext()->getPartUUIDs()->add(uuids);
+
+    if (already_seen.empty())
+        return true;
+
+    duplicated_part_uuids.insert(duplicated_part_uuids.begin(), already_seen.begin(), already_seen.end());
+    return false;
+}
+
+/// Answers a ReadTaskRequest packet with the next value produced by the task iterator
+/// from `extension`. Throws LOGICAL_ERROR if the iterator is not set.
+void RemoteQueryExecutor::processReadTaskRequest()
+{
+    const bool has_task_iterator = extension && extension->task_iterator;
+    if (!has_task_iterator)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized");
+
+    ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived);
+
+    auto next_task = (*extension->task_iterator)();
+    connections->sendReadTaskResponse(next_task);
+}
+
+/// Passes a read request to the parallel reading coordinator from `extension`
+/// and sends the coordinator's response back over the connections.
+/// Throws LOGICAL_ERROR if the coordinator is not set.
+void RemoteQueryExecutor::processMergeTreeReadTaskRequest(ParallelReadRequest request)
+{
+    const bool has_coordinator = extension && extension->parallel_reading_coordinator;
+    if (!has_coordinator)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");
+
+    ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsReceived);
+
+    auto coordinator_response = extension->parallel_reading_coordinator->handleRequest(std::move(request));
+    connections->sendMergeTreeReadTaskResponse(coordinator_response);
+}
+
+/// Passes the initial all-ranges announcement to the parallel reading coordinator
+/// from `extension`. Throws LOGICAL_ERROR if the coordinator is not set.
+void RemoteQueryExecutor::processMergeTreeInitialReadAnnounecement(InitialAllRangesAnnouncement announcement)
+{
+    const bool has_coordinator = extension && extension->parallel_reading_coordinator;
+    if (!has_coordinator)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");
+
+    extension->parallel_reading_coordinator->handleInitialAllRangesAnnouncement(announcement);
+}
+
+/// Stops reading when the remaining data is no longer needed: cancels the query (if it is
+/// still pending) and drains the connections so they do not stay out of sync.
+/// Rethrows an exception received while draining and throws UNKNOWN_PACKET_FROM_SERVER
+/// for an unexpected packet type.
+void RemoteQueryExecutor::finish()
+{
+    std::lock_guard cancel_lock(was_cancelled_mutex);
+
+    /** Nothing has to be read if one of:
+      * - nothing started to do;
+      * - received all packets before EndOfStream;
+      * - received exception from one replica;
+      * - received an unknown packet from one replica.
+      */
+    const bool nothing_to_finish = !isQueryPending() || hasThrownException();
+    if (nothing_to_finish)
+        return;
+
+    /** Not all the data was read, but it is not needed any more
+      * (for example, enough rows were collected for LIMIT).
+      * Ask the remote side to abort the query, if that was not done already.
+      */
+    tryCancel("Cancelling query because enough data has been read");
+
+    /// No connections yet, query not sent, or query already finished: nothing to drain.
+    const bool can_drain = connections && sent_query && !finished;
+    if (!can_drain)
+        return;
+
+    /// Fetch what is left so that the connections to the replicas are not out of sync.
+    Packet last_packet = connections->drain();
+    switch (last_packet.type)
+    {
+        case Protocol::Server::EndOfStream:
+            finished = true;
+            break;
+
+        case Protocol::Server::Exception:
+            got_exception_from_replica = true;
+            last_packet.exception->rethrow();
+            break;
+
+        case Protocol::Server::Log:
+            /// Forward logs of the remote server to the client.
+            if (auto text_log_queue = CurrentThread::getInternalTextLogsQueue())
+                text_log_queue->pushBlock(std::move(last_packet.block));
+            break;
+
+        case Protocol::Server::ProfileEvents:
+            /// Forward profile events of the remote server to the client.
+            if (auto events_queue = CurrentThread::getInternalProfileEventsQueue())
+            {
+                if (!events_queue->emplace(std::move(last_packet.block)))
+                    throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push into profile queue");
+            }
+            break;
+
+        case Protocol::Server::TimezoneUpdate:
+            break;
+
+        default:
+            got_unknown_packet_from_replica = true;
+            throw Exception(
+                ErrorCodes::UNKNOWN_PACKET_FROM_SERVER,
+                "Unknown packet {} from one of the following replicas: {}",
+                toString(last_packet.type),
+                connections->dumpAddresses());
+    }
+}
+
+/// Locking wrapper around cancelUnlocked(): takes `was_cancelled_mutex` for the duration of the call.
+void RemoteQueryExecutor::cancel()
+{
+    std::lock_guard cancel_lock(was_cancelled_mutex);
+    cancelUnlocked();
+}
+
+/// Marks all external table data as cancelled and, if the query is still pending and no
+/// exception was thrown so far, cancels the query.
+/// The caller is expected to hold `was_cancelled_mutex` (see cancel()).
+void RemoteQueryExecutor::cancelUnlocked()
+{
+    {
+        std::lock_guard tables_lock(external_tables_mutex);
+
+        /// Stop sending external data.
+        for (auto & tables_for_connection : external_tables_data)
+            for (auto & table_data : tables_for_connection)
+                table_data->is_cancelled = true;
+    }
+
+    if (isQueryPending() && !hasThrownException())
+        tryCancel("Cancelling query");
+}
+
+/// Sends the stored scalars over the current connections.
+void RemoteQueryExecutor::sendScalars()
+{
+    connections->sendScalarsData(scalars);
+}
+
+/// Rebuilds `external_tables_data` (one list of tables per connection) under
+/// `external_tables_mutex` and then sends it over the connections.
+/// Only tables whose storage is StorageMemory are included.
+void RemoteQueryExecutor::sendExternalTables()
+{
+    const size_t connection_count = connections->size();
+
+    {
+        std::lock_guard tables_lock(external_tables_mutex);
+
+        external_tables_data.clear();
+        external_tables_data.reserve(connection_count);
+
+        /// Limits applied to every pipeline that reads an external table.
+        const auto & current_settings = context->getSettingsRef();
+        StreamLocalLimits limits;
+        limits.mode = LimitsMode::LIMITS_TOTAL;
+        limits.speed_limits.max_execution_time = current_settings.max_execution_time;
+        limits.timeout_overflow_mode = current_settings.timeout_overflow_mode;
+
+        for (size_t connection_idx = 0; connection_idx != connection_count; ++connection_idx)
+        {
+            ExternalTablesData tables_for_connection;
+
+            for (const auto & name_and_storage : external_tables)
+            {
+                StoragePtr storage = name_and_storage.second;
+
+                /// Send only temporary tables with StorageMemory
+                if (!std::dynamic_pointer_cast<StorageMemory>(storage))
+                    continue;
+
+                auto table_data = std::make_unique<ExternalTableData>();
+                table_data->table_name = name_and_storage.first;
+
+                /// Builds a single-stream pipeline that reads all physical columns of the
+                /// table and checks the limits above.
+                table_data->creating_pipe_callback = [storage, limits, my_context = this->context]()
+                {
+                    SelectQueryInfo query_info;
+                    auto metadata_snapshot = storage->getInMemoryMetadataPtr();
+                    auto storage_snapshot = storage->getStorageSnapshot(metadata_snapshot, my_context);
+                    QueryProcessingStage::Enum read_from_table_stage = storage->getQueryProcessingStage(
+                        my_context, QueryProcessingStage::Complete, storage_snapshot, query_info);
+
+                    QueryPlan plan;
+                    storage->read(
+                        plan,
+                        metadata_snapshot->getColumns().getNamesOfPhysical(),
+                        storage_snapshot,
+                        query_info,
+                        my_context,
+                        read_from_table_stage,
+                        DEFAULT_BLOCK_SIZE,
+                        1);
+
+                    auto builder = plan.buildQueryPipeline(
+                        QueryPlanOptimizationSettings::fromContext(my_context),
+                        BuildQueryPipelineSettings::fromContext(my_context));
+
+                    builder->resize(1);
+                    builder->addTransform(std::make_shared<LimitsCheckingTransform>(builder->getHeader(), limits));
+
+                    return builder;
+                };
+
+                table_data->pipe = table_data->creating_pipe_callback();
+                tables_for_connection.emplace_back(std::move(table_data));
+            }
+
+            external_tables_data.push_back(std::move(tables_for_connection));
+        }
+    }
+
+    /// The send itself happens after the mutex is released.
+    connections->sendExternalTablesData(external_tables_data);
+}
+
+/// Marks the executor as cancelled (once), cancels the read context if there is one and
+/// sends a cancel request to the replicas when a sent query is still running.
+/// `reason` is only used for the trace log message.
+void RemoteQueryExecutor::tryCancel(const char * reason)
+{
+    if (was_cancelled)
+        return;
+
+    was_cancelled = true;
+
+    if (read_context)
+        read_context->cancel();
+
+    /// Cancellation may arrive while connections are being created, while the query is being
+    /// sent or while data is being received. The cancel request is sent only if connections
+    /// already exist, the query was sent and the remote query is not finished.
+    const bool query_in_flight = connections && sent_query && !finished;
+    if (!query_in_flight)
+        return;
+
+    connections->sendCancel();
+
+    if (log)
+        LOG_TRACE(log, "({}) {}", connections->dumpAddresses(), reason);
+}
+
+/// True when the query is not finished and either it was sent or a read context exists.
+bool RemoteQueryExecutor::isQueryPending() const
+{
+    if (finished)
+        return false;
+
+    return sent_query || read_context;
+}
+
+/// True when an exception packet or an unknown packet was received from a replica.
+bool RemoteQueryExecutor::hasThrownException() const
+{
+    if (got_exception_from_replica)
+        return true;
+
+    return got_unknown_packet_from_replica;
+}
+
+}