diff options
| author | vitalyisaev <[email protected]> | 2023-11-14 09:58:56 +0300 |
|---|---|---|
| committer | vitalyisaev <[email protected]> | 2023-11-14 10:20:20 +0300 |
| commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
| tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp | |
| parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp')
| -rw-r--r-- | contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp | 763 |
1 files changed, 763 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp b/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp new file mode 100644 index 00000000000..eebe9797051 --- /dev/null +++ b/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -0,0 +1,763 @@ +#include <Common/ConcurrentBoundedQueue.h> +#include <QueryPipeline/RemoteQueryExecutor.h> +#include <QueryPipeline/RemoteQueryExecutorReadContext.h> + +#include <Columns/ColumnConst.h> +#include <Common/CurrentThread.h> +#include "Core/Protocol.h" +#include <Processors/QueryPlan/BuildQueryPipelineSettings.h> +#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h> +#include <Processors/Sources/SourceFromSingleChunk.h> +#include <Processors/Transforms/LimitsCheckingTransform.h> +#include <Processors/QueryPlan/QueryPlan.h> +#include <QueryPipeline/QueryPipelineBuilder.h> +#include <Storages/SelectQueryInfo.h> +#include <Interpreters/castColumn.h> +#include <Interpreters/Cluster.h> +#include <Interpreters/Context.h> +#include <Interpreters/InternalTextLogsQueue.h> +#include <IO/ConnectionTimeouts.h> +#include <Client/MultiplexedConnections.h> +#include <Client/HedgedConnections.h> +#include <Storages/MergeTree/MergeTreeDataPartUUID.h> +#include <Storages/StorageMemory.h> + + +namespace ProfileEvents +{ + extern const Event SuspendSendingQueryToShard; + extern const Event ReadTaskRequestsReceived; + extern const Event MergeTreeReadTaskRequestsReceived; +} + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int UNKNOWN_PACKET_FROM_SERVER; + extern const int DUPLICATED_PART_UUIDS; + extern const int SYSTEM_ERROR; +} + +RemoteQueryExecutor::RemoteQueryExecutor( + const String & query_, const Block & header_, ContextPtr context_, + const Scalars & scalars_, const Tables & external_tables_, + QueryProcessingStage::Enum stage_, std::optional<Extension> extension_) + : header(header_), query(query_), context(context_), scalars(scalars_) + , external_tables(external_tables_), stage(stage_) + , extension(extension_) +{} + +RemoteQueryExecutor::RemoteQueryExecutor( + Connection & connection, + const String & query_, const Block & header_, ContextPtr context_, + ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_, + QueryProcessingStage::Enum stage_, std::optional<Extension> extension_) + : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_) +{ + create_connections = [this, &connection, throttler, extension_](AsyncCallback) + { + auto res = std::make_unique<MultiplexedConnections>(connection, context->getSettingsRef(), throttler); + if (extension_ && extension_->replica_info) + res->setReplicaInfo(*extension_->replica_info); + return res; + }; +} + +RemoteQueryExecutor::RemoteQueryExecutor( + std::shared_ptr<Connection> connection_ptr, + const String & query_, const Block & header_, ContextPtr context_, + ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_, + QueryProcessingStage::Enum stage_, std::optional<Extension> extension_) + : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_) +{ + create_connections = [this, connection_ptr, throttler, extension_](AsyncCallback) + { + auto res = std::make_unique<MultiplexedConnections>(connection_ptr, context->getSettingsRef(), throttler); + if (extension_ && extension_->replica_info) + res->setReplicaInfo(*extension_->replica_info); + return res; + }; +} + +RemoteQueryExecutor::RemoteQueryExecutor( + std::vector<IConnectionPool::Entry> && connections_, + const String & query_, const Block & header_, ContextPtr context_, + const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, + QueryProcessingStage::Enum stage_, std::optional<Extension> extension_) + : header(header_), query(query_), context(context_) + , scalars(scalars_), external_tables(external_tables_), stage(stage_) + , extension(extension_) +{ + create_connections = [this, connections_, throttler, extension_](AsyncCallback) mutable { + auto res = std::make_unique<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler); + if (extension_ && extension_->replica_info) + res->setReplicaInfo(*extension_->replica_info); + return res; + }; +} + +RemoteQueryExecutor::RemoteQueryExecutor( + const ConnectionPoolWithFailoverPtr & pool, + const String & query_, const Block & header_, ContextPtr context_, + const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, + QueryProcessingStage::Enum stage_, std::optional<Extension> extension_) + : header(header_), query(query_), context(context_) + , scalars(scalars_), external_tables(external_tables_), stage(stage_) + , extension(extension_) +{ + create_connections = [this, pool, throttler](AsyncCallback async_callback)->std::unique_ptr<IConnections> + { + const Settings & current_settings = context->getSettingsRef(); + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); + +#if defined(OS_LINUX) + if (current_settings.use_hedged_requests) + { + std::shared_ptr<QualifiedTableName> table_to_check = nullptr; + if (main_table) + table_to_check = std::make_shared<QualifiedTableName>(main_table.getQualifiedName()); + + auto res = std::make_unique<HedgedConnections>(pool, context, timeouts, throttler, pool_mode, table_to_check, std::move(async_callback)); + if (extension && extension->replica_info) + res->setReplicaInfo(*extension->replica_info); + return res; + } +#endif + + std::vector<IConnectionPool::Entry> connection_entries; + std::optional<bool> skip_unavailable_endpoints; + if (extension && extension->parallel_reading_coordinator) + skip_unavailable_endpoints = true; + + if (main_table) + { + auto try_results = pool->getManyChecked(timeouts, ¤t_settings, pool_mode, main_table.getQualifiedName(), std::move(async_callback), skip_unavailable_endpoints); + connection_entries.reserve(try_results.size()); + for (auto & try_result : try_results) + connection_entries.emplace_back(std::move(try_result.entry)); + } + else + { + connection_entries = pool->getMany(timeouts, ¤t_settings, pool_mode, std::move(async_callback), skip_unavailable_endpoints); + } + + auto res = std::make_unique<MultiplexedConnections>(std::move(connection_entries), current_settings, throttler); + if (extension && extension->replica_info) + res->setReplicaInfo(*extension->replica_info); + return res; + }; +} + +RemoteQueryExecutor::~RemoteQueryExecutor() +{ + /// We should finish establishing connections to disconnect it later, + /// so these connections won't be in the out-of-sync state. + if (read_context && !established) + { + /// Set was_cancelled, so the query won't be sent after creating connections. + was_cancelled = true; + read_context->cancel(); + } + + /** If interrupted in the middle of the loop of communication with replicas, then interrupt + * all connections, then read and skip the remaining packets to make sure + * these connections did not remain hanging in the out-of-sync state. + */ + if (established || (isQueryPending() && connections)) + connections->disconnect(); +} + +/** If we receive a block with slightly different column types, or with excessive columns, + * we will adapt it to expected structure. + */ +static Block adaptBlockStructure(const Block & block, const Block & header) +{ + /// Special case when reader doesn't care about result structure. Deprecated and used only in Benchmark, PerformanceTest. + if (!header) + return block; + + Block res; + res.info = block.info; + + for (const auto & elem : header) + { + ColumnPtr column; + + if (elem.column && isColumnConst(*elem.column)) + { + /// We expect constant column in block. + /// If block is not empty, then get value for constant from it, + /// because it may be different for remote server for functions like version(), uptime(), ... + if (block.rows() > 0 && block.has(elem.name)) + { + /// Const column is passed as materialized. Get first value from it. + /// + /// TODO: check that column contains the same value. + /// TODO: serialize const columns. + auto col = block.getByName(elem.name); + col.column = block.getByName(elem.name).column->cut(0, 1); + + column = castColumn(col, elem.type); + + if (!isColumnConst(*column)) + column = ColumnConst::create(column, block.rows()); + else + /// It is not possible now. Just in case we support const columns serialization. + column = column->cloneResized(block.rows()); + } + else + column = elem.column->cloneResized(block.rows()); + } + else + column = castColumn(block.getByName(elem.name), elem.type); + + res.insert({column, elem.type, elem.name}); + } + return res; +} + +void RemoteQueryExecutor::sendQuery(ClientInfo::QueryKind query_kind, AsyncCallback async_callback) +{ + /// Query cannot be canceled in the middle of the send query, + /// since there are multiple packets: + /// - Query + /// - Data (multiple times) + /// + /// And after the Cancel packet none Data packet can be sent, otherwise the remote side will throw: + /// + /// Unexpected packet Data received from client + /// + std::lock_guard guard(was_cancelled_mutex); + sendQueryUnlocked(query_kind, async_callback); +} + +void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, AsyncCallback async_callback) +{ + if (sent_query || was_cancelled) + return; + + connections = create_connections(async_callback); + AsyncCallbackSetter async_callback_setter(connections.get(), async_callback); + + const auto & settings = context->getSettingsRef(); + if (isReplicaUnavailable() || needToSkipUnavailableShard()) + { + /// To avoid sending the query again in the read(), we need to update the following flags: + was_cancelled = true; + finished = true; + sent_query = true; + + /// We need to tell the coordinator not to wait for this replica. + if (extension && extension->parallel_reading_coordinator) + { + chassert(extension->replica_info); + extension->parallel_reading_coordinator->markReplicaAsUnavailable(extension->replica_info->number_of_current_replica); + } + + return; + } + + established = true; + + auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); + ClientInfo modified_client_info = context->getClientInfo(); + modified_client_info.query_kind = query_kind; + + if (!duplicated_part_uuids.empty()) + connections->sendIgnoredPartUUIDs(duplicated_part_uuids); + + connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true); + + established = false; + sent_query = true; + + if (settings.enable_scalar_subquery_optimization) + sendScalars(); + sendExternalTables(); +} + +int RemoteQueryExecutor::sendQueryAsync() +{ +#if defined(OS_LINUX) + std::lock_guard lock(was_cancelled_mutex); + if (was_cancelled) + return -1; + + if (!read_context) + read_context = std::make_unique<ReadContext>(*this, /*suspend_when_query_sent*/ true); + + /// If query already sent, do nothing. Note that we cannot use sent_query flag here, + /// because we can still be in process of sending scalars or external tables. + if (read_context->isQuerySent()) + return -1; + + read_context->resume(); + + if (read_context->isQuerySent()) + return -1; + + ProfileEvents::increment(ProfileEvents::SuspendSendingQueryToShard); /// Mostly for testing purposes. + return read_context->getFileDescriptor(); +#else + sendQuery(); + return -1; +#endif +} + +Block RemoteQueryExecutor::readBlock() +{ + while (true) + { + auto res = read(); + + if (res.getType() == ReadResult::Type::Data) + return res.getBlock(); + } +} + + +RemoteQueryExecutor::ReadResult RemoteQueryExecutor::read() +{ + if (!sent_query) + { + sendQuery(); + + if (context->getSettingsRef().skip_unavailable_shards && (0 == connections->size())) + return ReadResult(Block()); + } + + while (true) + { + std::lock_guard lock(was_cancelled_mutex); + if (was_cancelled) + return ReadResult(Block()); + + auto packet = connections->receivePacket(); + auto anything = processPacket(std::move(packet)); + + if (anything.getType() == ReadResult::Type::Data || anything.getType() == ReadResult::Type::ParallelReplicasToken) + return anything; + + if (got_duplicated_part_uuids) + break; + } + + return restartQueryWithoutDuplicatedUUIDs(); +} + +RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync() +{ +#if defined(OS_LINUX) + if (!read_context || (resent_query && recreate_read_context)) + { + std::lock_guard lock(was_cancelled_mutex); + if (was_cancelled) + return ReadResult(Block()); + + read_context = std::make_unique<ReadContext>(*this); + recreate_read_context = false; + } + + while (true) + { + std::lock_guard lock(was_cancelled_mutex); + if (was_cancelled) + return ReadResult(Block()); + + read_context->resume(); + + if (isReplicaUnavailable() || needToSkipUnavailableShard()) + { + /// We need to tell the coordinator not to wait for this replica. + /// But at this point it may lead to an incomplete result set, because + /// this replica committed to read some part of there data and then died. + if (extension && extension->parallel_reading_coordinator) + { + chassert(extension->parallel_reading_coordinator); + extension->parallel_reading_coordinator->markReplicaAsUnavailable(extension->replica_info->number_of_current_replica); + } + + return ReadResult(Block()); + } + + /// Check if packet is not ready yet. + if (read_context->isInProgress()) + return ReadResult(read_context->getFileDescriptor()); + + auto anything = processPacket(read_context->getPacket()); + + if (anything.getType() == ReadResult::Type::Data || anything.getType() == ReadResult::Type::ParallelReplicasToken) + return anything; + + if (got_duplicated_part_uuids) + break; + } + + return restartQueryWithoutDuplicatedUUIDs(); +#else + return read(); +#endif +} + + +RemoteQueryExecutor::ReadResult RemoteQueryExecutor::restartQueryWithoutDuplicatedUUIDs() +{ + { + std::lock_guard lock(was_cancelled_mutex); + if (was_cancelled) + return ReadResult(Block()); + + /// Cancel previous query and disconnect before retry. + cancelUnlocked(); + connections->disconnect(); + + /// Only resend once, otherwise throw an exception + if (resent_query) + throw Exception(ErrorCodes::DUPLICATED_PART_UUIDS, "Found duplicate uuids while processing query"); + + if (log) + LOG_DEBUG(log, "Found duplicate UUIDs, will retry query without those parts"); + + resent_query = true; + recreate_read_context = true; + sent_query = false; + got_duplicated_part_uuids = false; + was_cancelled = false; + } + + /// Consecutive read will implicitly send query first. + if (!read_context) + return read(); + else + return readAsync(); +} + +RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet) +{ + switch (packet.type) + { + case Protocol::Server::MergeTreeReadTaskRequest: + chassert(packet.request.has_value()); + processMergeTreeReadTaskRequest(packet.request.value()); + return ReadResult(ReadResult::Type::ParallelReplicasToken); + + case Protocol::Server::MergeTreeAllRangesAnnounecement: + chassert(packet.announcement.has_value()); + processMergeTreeInitialReadAnnounecement(packet.announcement.value()); + return ReadResult(ReadResult::Type::ParallelReplicasToken); + + case Protocol::Server::ReadTaskRequest: + processReadTaskRequest(); + break; + case Protocol::Server::PartUUIDs: + if (!setPartUUIDs(packet.part_uuids)) + got_duplicated_part_uuids = true; + break; + case Protocol::Server::Data: + /// Note: `packet.block.rows() > 0` means it's a header block. + /// We can actually return it, and the first call to RemoteQueryExecutor::read + /// will return earlier. We should consider doing it. + if (packet.block && (packet.block.rows() > 0)) + return ReadResult(adaptBlockStructure(packet.block, header)); + break; /// If the block is empty - we will receive other packets before EndOfStream. + + case Protocol::Server::Exception: + got_exception_from_replica = true; + packet.exception->rethrow(); + break; + + case Protocol::Server::EndOfStream: + if (!connections->hasActiveConnections()) + { + finished = true; + /// TODO: Replace with Type::Finished + return ReadResult(Block{}); + } + break; + + case Protocol::Server::Progress: + /** We use the progress from a remote server. + * We also include in ProcessList, + * and we use it to check + * constraints (for example, the minimum speed of query execution) + * and quotas (for example, the number of lines to read). + */ + if (progress_callback) + progress_callback(packet.progress); + break; + + case Protocol::Server::ProfileInfo: + /// Use own (client-side) info about read bytes, it is more correct info than server-side one. + if (profile_info_callback) + profile_info_callback(packet.profile_info); + break; + + case Protocol::Server::Totals: + totals = packet.block; + if (totals) + totals = adaptBlockStructure(totals, header); + break; + + case Protocol::Server::Extremes: + extremes = packet.block; + if (extremes) + extremes = adaptBlockStructure(packet.block, header); + break; + + case Protocol::Server::Log: + /// Pass logs from remote server to client + if (auto log_queue = CurrentThread::getInternalTextLogsQueue()) + log_queue->pushBlock(std::move(packet.block)); + break; + + case Protocol::Server::ProfileEvents: + /// Pass profile events from remote server to client + if (auto profile_queue = CurrentThread::getInternalProfileEventsQueue()) + if (!profile_queue->emplace(std::move(packet.block))) + throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push into profile queue"); + break; + + case Protocol::Server::TimezoneUpdate: + break; + + default: + got_unknown_packet_from_replica = true; + throw Exception( + ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, + "Unknown packet {} from one of the following replicas: {}", + packet.type, + connections->dumpAddresses()); + } + + return ReadResult(ReadResult::Type::Nothing); +} + +bool RemoteQueryExecutor::setPartUUIDs(const std::vector<UUID> & uuids) +{ + auto query_context = context->getQueryContext(); + auto duplicates = query_context->getPartUUIDs()->add(uuids); + + if (!duplicates.empty()) + { + duplicated_part_uuids.insert(duplicated_part_uuids.begin(), duplicates.begin(), duplicates.end()); + return false; + } + return true; +} + +void RemoteQueryExecutor::processReadTaskRequest() +{ + if (!extension || !extension->task_iterator) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized"); + + ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived); + auto response = (*extension->task_iterator)(); + connections->sendReadTaskResponse(response); +} + +void RemoteQueryExecutor::processMergeTreeReadTaskRequest(ParallelReadRequest request) +{ + if (!extension || !extension->parallel_reading_coordinator) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized"); + + ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsReceived); + auto response = extension->parallel_reading_coordinator->handleRequest(std::move(request)); + connections->sendMergeTreeReadTaskResponse(response); +} + +void RemoteQueryExecutor::processMergeTreeInitialReadAnnounecement(InitialAllRangesAnnouncement announcement) +{ + if (!extension || !extension->parallel_reading_coordinator) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized"); + + extension->parallel_reading_coordinator->handleInitialAllRangesAnnouncement(announcement); +} + +void RemoteQueryExecutor::finish() +{ + std::lock_guard guard(was_cancelled_mutex); + + /** If one of: + * - nothing started to do; + * - received all packets before EndOfStream; + * - received exception from one replica; + * - received an unknown packet from one replica; + * then you do not need to read anything. + */ + if (!isQueryPending() || hasThrownException()) + return; + + /** If you have not read all the data yet, but they are no longer needed. + * This may be due to the fact that the data is sufficient (for example, when using LIMIT). + */ + + /// Send the request to abort the execution of the request, if not already sent. + tryCancel("Cancelling query because enough data has been read"); + + /// If connections weren't created yet, query wasn't sent or was already finished, nothing to do. + if (!connections || !sent_query || finished) + return; + + /// Get the remaining packets so that there is no out of sync in the connections to the replicas. + Packet packet = connections->drain(); + switch (packet.type) + { + case Protocol::Server::EndOfStream: + finished = true; + break; + + case Protocol::Server::Log: + /// Pass logs from remote server to client + if (auto log_queue = CurrentThread::getInternalTextLogsQueue()) + log_queue->pushBlock(std::move(packet.block)); + break; + + case Protocol::Server::Exception: + got_exception_from_replica = true; + packet.exception->rethrow(); + break; + + case Protocol::Server::ProfileEvents: + /// Pass profile events from remote server to client + if (auto profile_queue = CurrentThread::getInternalProfileEventsQueue()) + if (!profile_queue->emplace(std::move(packet.block))) + throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push into profile queue"); + break; + + case Protocol::Server::TimezoneUpdate: + break; + + default: + got_unknown_packet_from_replica = true; + throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from one of the following replicas: {}", + toString(packet.type), + connections->dumpAddresses()); + } +} + +void RemoteQueryExecutor::cancel() +{ + std::lock_guard guard(was_cancelled_mutex); + cancelUnlocked(); +} + +void RemoteQueryExecutor::cancelUnlocked() +{ + { + std::lock_guard lock(external_tables_mutex); + + /// Stop sending external data. + for (auto & vec : external_tables_data) + for (auto & elem : vec) + elem->is_cancelled = true; + } + + if (!isQueryPending() || hasThrownException()) + return; + + tryCancel("Cancelling query"); +} + +void RemoteQueryExecutor::sendScalars() +{ + connections->sendScalarsData(scalars); +} + +void RemoteQueryExecutor::sendExternalTables() +{ + size_t count = connections->size(); + + { + std::lock_guard lock(external_tables_mutex); + + external_tables_data.clear(); + external_tables_data.reserve(count); + + StreamLocalLimits limits; + const auto & settings = context->getSettingsRef(); + limits.mode = LimitsMode::LIMITS_TOTAL; + limits.speed_limits.max_execution_time = settings.max_execution_time; + limits.timeout_overflow_mode = settings.timeout_overflow_mode; + + for (size_t i = 0; i < count; ++i) + { + ExternalTablesData res; + for (const auto & table : external_tables) + { + StoragePtr cur = table.second; + /// Send only temporary tables with StorageMemory + if (!std::dynamic_pointer_cast<StorageMemory>(cur)) + continue; + + auto data = std::make_unique<ExternalTableData>(); + data->table_name = table.first; + data->creating_pipe_callback = [cur, limits, my_context = this->context]() + { + SelectQueryInfo query_info; + auto metadata_snapshot = cur->getInMemoryMetadataPtr(); + auto storage_snapshot = cur->getStorageSnapshot(metadata_snapshot, my_context); + QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage( + my_context, QueryProcessingStage::Complete, storage_snapshot, query_info); + + QueryPlan plan; + cur->read( + plan, + metadata_snapshot->getColumns().getNamesOfPhysical(), + storage_snapshot, query_info, my_context, + read_from_table_stage, DEFAULT_BLOCK_SIZE, 1); + + auto builder = plan.buildQueryPipeline( + QueryPlanOptimizationSettings::fromContext(my_context), + BuildQueryPipelineSettings::fromContext(my_context)); + + builder->resize(1); + builder->addTransform(std::make_shared<LimitsCheckingTransform>(builder->getHeader(), limits)); + + return builder; + }; + + data->pipe = data->creating_pipe_callback(); + res.emplace_back(std::move(data)); + } + external_tables_data.push_back(std::move(res)); + } + } + + connections->sendExternalTablesData(external_tables_data); +} + +void RemoteQueryExecutor::tryCancel(const char * reason) +{ + if (was_cancelled) + return; + + was_cancelled = true; + + if (read_context) + read_context->cancel(); + + /// Query could be cancelled during connection creation, query sending or data receiving. + /// We should send cancel request if connections were already created, query were sent + /// and remote query is not finished. + if (connections && sent_query && !finished) + { + connections->sendCancel(); + if (log) + LOG_TRACE(log, "({}) {}", connections->dumpAddresses(), reason); + } +} + +bool RemoteQueryExecutor::isQueryPending() const +{ + return (sent_query || read_context) && !finished; +} + +bool RemoteQueryExecutor::hasThrownException() const +{ + return got_exception_from_replica || got_unknown_packet_from_replica; +} + +} |
