diff options
| author | AlexSm <[email protected]> | 2024-01-04 15:09:05 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-01-04 15:09:05 +0100 |
| commit | dab291146f6cd7d35684e3a1150e5bb1c412982c (patch) | |
| tree | 36ef35f6cacb6432845a4a33f940c95871036b32 /contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp | |
| parent | 63660ad5e7512029fd0218e7a636580695a24e1f (diff) | |
Library import 5, delete go dependencies (#832)
* Library import 5, delete go dependencies
* Fix yt client
Diffstat (limited to 'contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp')
| -rw-r--r-- | contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp | 763 |
1 files changed, 0 insertions, 763 deletions
diff --git a/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp b/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp deleted file mode 100644 index eebe9797051..00000000000 --- a/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp +++ /dev/null @@ -1,763 +0,0 @@ -#include <Common/ConcurrentBoundedQueue.h> -#include <QueryPipeline/RemoteQueryExecutor.h> -#include <QueryPipeline/RemoteQueryExecutorReadContext.h> - -#include <Columns/ColumnConst.h> -#include <Common/CurrentThread.h> -#include "Core/Protocol.h" -#include <Processors/QueryPlan/BuildQueryPipelineSettings.h> -#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h> -#include <Processors/Sources/SourceFromSingleChunk.h> -#include <Processors/Transforms/LimitsCheckingTransform.h> -#include <Processors/QueryPlan/QueryPlan.h> -#include <QueryPipeline/QueryPipelineBuilder.h> -#include <Storages/SelectQueryInfo.h> -#include <Interpreters/castColumn.h> -#include <Interpreters/Cluster.h> -#include <Interpreters/Context.h> -#include <Interpreters/InternalTextLogsQueue.h> -#include <IO/ConnectionTimeouts.h> -#include <Client/MultiplexedConnections.h> -#include <Client/HedgedConnections.h> -#include <Storages/MergeTree/MergeTreeDataPartUUID.h> -#include <Storages/StorageMemory.h> - - -namespace ProfileEvents -{ - extern const Event SuspendSendingQueryToShard; - extern const Event ReadTaskRequestsReceived; - extern const Event MergeTreeReadTaskRequestsReceived; -} - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; - extern const int UNKNOWN_PACKET_FROM_SERVER; - extern const int DUPLICATED_PART_UUIDS; - extern const int SYSTEM_ERROR; -} - -RemoteQueryExecutor::RemoteQueryExecutor( - const String & query_, const Block & header_, ContextPtr context_, - const Scalars & scalars_, const Tables & external_tables_, - QueryProcessingStage::Enum stage_, std::optional<Extension> extension_) - : header(header_), query(query_), context(context_), scalars(scalars_) - , external_tables(external_tables_), stage(stage_) - , extension(extension_) -{} - -RemoteQueryExecutor::RemoteQueryExecutor( - Connection & connection, - const String & query_, const Block & header_, ContextPtr context_, - ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_, - QueryProcessingStage::Enum stage_, std::optional<Extension> extension_) - : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_) -{ - create_connections = [this, &connection, throttler, extension_](AsyncCallback) - { - auto res = std::make_unique<MultiplexedConnections>(connection, context->getSettingsRef(), throttler); - if (extension_ && extension_->replica_info) - res->setReplicaInfo(*extension_->replica_info); - return res; - }; -} - -RemoteQueryExecutor::RemoteQueryExecutor( - std::shared_ptr<Connection> connection_ptr, - const String & query_, const Block & header_, ContextPtr context_, - ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_, - QueryProcessingStage::Enum stage_, std::optional<Extension> extension_) - : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_) -{ - create_connections = [this, connection_ptr, throttler, extension_](AsyncCallback) - { - auto res = std::make_unique<MultiplexedConnections>(connection_ptr, context->getSettingsRef(), throttler); - if (extension_ && extension_->replica_info) - res->setReplicaInfo(*extension_->replica_info); - return res; - }; -} - -RemoteQueryExecutor::RemoteQueryExecutor( - std::vector<IConnectionPool::Entry> && connections_, - const String & query_, const Block & header_, ContextPtr context_, - const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, - QueryProcessingStage::Enum stage_, std::optional<Extension> extension_) - : header(header_), query(query_), context(context_) - , scalars(scalars_), external_tables(external_tables_), stage(stage_) - , extension(extension_) -{ - create_connections = [this, connections_, throttler, extension_](AsyncCallback) mutable { - auto res = std::make_unique<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler); - if (extension_ && extension_->replica_info) - res->setReplicaInfo(*extension_->replica_info); - return res; - }; -} - -RemoteQueryExecutor::RemoteQueryExecutor( - const ConnectionPoolWithFailoverPtr & pool, - const String & query_, const Block & header_, ContextPtr context_, - const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, - QueryProcessingStage::Enum stage_, std::optional<Extension> extension_) - : header(header_), query(query_), context(context_) - , scalars(scalars_), external_tables(external_tables_), stage(stage_) - , extension(extension_) -{ - create_connections = [this, pool, throttler](AsyncCallback async_callback)->std::unique_ptr<IConnections> - { - const Settings & current_settings = context->getSettingsRef(); - auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings); - -#if defined(OS_LINUX) - if (current_settings.use_hedged_requests) - { - std::shared_ptr<QualifiedTableName> table_to_check = nullptr; - if (main_table) - table_to_check = std::make_shared<QualifiedTableName>(main_table.getQualifiedName()); - - auto res = std::make_unique<HedgedConnections>(pool, context, timeouts, throttler, pool_mode, table_to_check, std::move(async_callback)); - if (extension && extension->replica_info) - res->setReplicaInfo(*extension->replica_info); - return res; - } -#endif - - std::vector<IConnectionPool::Entry> connection_entries; - std::optional<bool> skip_unavailable_endpoints; - if (extension && extension->parallel_reading_coordinator) - skip_unavailable_endpoints = true; - - if (main_table) - { - auto try_results = pool->getManyChecked(timeouts, ¤t_settings, pool_mode, main_table.getQualifiedName(), std::move(async_callback), skip_unavailable_endpoints); - connection_entries.reserve(try_results.size()); - for (auto & try_result : try_results) - connection_entries.emplace_back(std::move(try_result.entry)); - } - else - { - connection_entries = pool->getMany(timeouts, ¤t_settings, pool_mode, std::move(async_callback), skip_unavailable_endpoints); - } - - auto res = std::make_unique<MultiplexedConnections>(std::move(connection_entries), current_settings, throttler); - if (extension && extension->replica_info) - res->setReplicaInfo(*extension->replica_info); - return res; - }; -} - -RemoteQueryExecutor::~RemoteQueryExecutor() -{ - /// We should finish establishing connections to disconnect it later, - /// so these connections won't be in the out-of-sync state. - if (read_context && !established) - { - /// Set was_cancelled, so the query won't be sent after creating connections. - was_cancelled = true; - read_context->cancel(); - } - - /** If interrupted in the middle of the loop of communication with replicas, then interrupt - * all connections, then read and skip the remaining packets to make sure - * these connections did not remain hanging in the out-of-sync state. - */ - if (established || (isQueryPending() && connections)) - connections->disconnect(); -} - -/** If we receive a block with slightly different column types, or with excessive columns, - * we will adapt it to expected structure. - */ -static Block adaptBlockStructure(const Block & block, const Block & header) -{ - /// Special case when reader doesn't care about result structure. Deprecated and used only in Benchmark, PerformanceTest. - if (!header) - return block; - - Block res; - res.info = block.info; - - for (const auto & elem : header) - { - ColumnPtr column; - - if (elem.column && isColumnConst(*elem.column)) - { - /// We expect constant column in block. - /// If block is not empty, then get value for constant from it, - /// because it may be different for remote server for functions like version(), uptime(), ... - if (block.rows() > 0 && block.has(elem.name)) - { - /// Const column is passed as materialized. Get first value from it. - /// - /// TODO: check that column contains the same value. - /// TODO: serialize const columns. - auto col = block.getByName(elem.name); - col.column = block.getByName(elem.name).column->cut(0, 1); - - column = castColumn(col, elem.type); - - if (!isColumnConst(*column)) - column = ColumnConst::create(column, block.rows()); - else - /// It is not possible now. Just in case we support const columns serialization. - column = column->cloneResized(block.rows()); - } - else - column = elem.column->cloneResized(block.rows()); - } - else - column = castColumn(block.getByName(elem.name), elem.type); - - res.insert({column, elem.type, elem.name}); - } - return res; -} - -void RemoteQueryExecutor::sendQuery(ClientInfo::QueryKind query_kind, AsyncCallback async_callback) -{ - /// Query cannot be canceled in the middle of the send query, - /// since there are multiple packets: - /// - Query - /// - Data (multiple times) - /// - /// And after the Cancel packet none Data packet can be sent, otherwise the remote side will throw: - /// - /// Unexpected packet Data received from client - /// - std::lock_guard guard(was_cancelled_mutex); - sendQueryUnlocked(query_kind, async_callback); -} - -void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, AsyncCallback async_callback) -{ - if (sent_query || was_cancelled) - return; - - connections = create_connections(async_callback); - AsyncCallbackSetter async_callback_setter(connections.get(), async_callback); - - const auto & settings = context->getSettingsRef(); - if (isReplicaUnavailable() || needToSkipUnavailableShard()) - { - /// To avoid sending the query again in the read(), we need to update the following flags: - was_cancelled = true; - finished = true; - sent_query = true; - - /// We need to tell the coordinator not to wait for this replica. - if (extension && extension->parallel_reading_coordinator) - { - chassert(extension->replica_info); - extension->parallel_reading_coordinator->markReplicaAsUnavailable(extension->replica_info->number_of_current_replica); - } - - return; - } - - established = true; - - auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings); - ClientInfo modified_client_info = context->getClientInfo(); - modified_client_info.query_kind = query_kind; - - if (!duplicated_part_uuids.empty()) - connections->sendIgnoredPartUUIDs(duplicated_part_uuids); - - connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true); - - established = false; - sent_query = true; - - if (settings.enable_scalar_subquery_optimization) - sendScalars(); - sendExternalTables(); -} - -int RemoteQueryExecutor::sendQueryAsync() -{ -#if defined(OS_LINUX) - std::lock_guard lock(was_cancelled_mutex); - if (was_cancelled) - return -1; - - if (!read_context) - read_context = std::make_unique<ReadContext>(*this, /*suspend_when_query_sent*/ true); - - /// If query already sent, do nothing. Note that we cannot use sent_query flag here, - /// because we can still be in process of sending scalars or external tables. - if (read_context->isQuerySent()) - return -1; - - read_context->resume(); - - if (read_context->isQuerySent()) - return -1; - - ProfileEvents::increment(ProfileEvents::SuspendSendingQueryToShard); /// Mostly for testing purposes. - return read_context->getFileDescriptor(); -#else - sendQuery(); - return -1; -#endif -} - -Block RemoteQueryExecutor::readBlock() -{ - while (true) - { - auto res = read(); - - if (res.getType() == ReadResult::Type::Data) - return res.getBlock(); - } -} - - -RemoteQueryExecutor::ReadResult RemoteQueryExecutor::read() -{ - if (!sent_query) - { - sendQuery(); - - if (context->getSettingsRef().skip_unavailable_shards && (0 == connections->size())) - return ReadResult(Block()); - } - - while (true) - { - std::lock_guard lock(was_cancelled_mutex); - if (was_cancelled) - return ReadResult(Block()); - - auto packet = connections->receivePacket(); - auto anything = processPacket(std::move(packet)); - - if (anything.getType() == ReadResult::Type::Data || anything.getType() == ReadResult::Type::ParallelReplicasToken) - return anything; - - if (got_duplicated_part_uuids) - break; - } - - return restartQueryWithoutDuplicatedUUIDs(); -} - -RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync() -{ -#if defined(OS_LINUX) - if (!read_context || (resent_query && recreate_read_context)) - { - std::lock_guard lock(was_cancelled_mutex); - if (was_cancelled) - return ReadResult(Block()); - - read_context = std::make_unique<ReadContext>(*this); - recreate_read_context = false; - } - - while (true) - { - std::lock_guard lock(was_cancelled_mutex); - if (was_cancelled) - return ReadResult(Block()); - - read_context->resume(); - - if (isReplicaUnavailable() || needToSkipUnavailableShard()) - { - /// We need to tell the coordinator not to wait for this replica. - /// But at this point it may lead to an incomplete result set, because - /// this replica committed to read some part of there data and then died. - if (extension && extension->parallel_reading_coordinator) - { - chassert(extension->parallel_reading_coordinator); - extension->parallel_reading_coordinator->markReplicaAsUnavailable(extension->replica_info->number_of_current_replica); - } - - return ReadResult(Block()); - } - - /// Check if packet is not ready yet. - if (read_context->isInProgress()) - return ReadResult(read_context->getFileDescriptor()); - - auto anything = processPacket(read_context->getPacket()); - - if (anything.getType() == ReadResult::Type::Data || anything.getType() == ReadResult::Type::ParallelReplicasToken) - return anything; - - if (got_duplicated_part_uuids) - break; - } - - return restartQueryWithoutDuplicatedUUIDs(); -#else - return read(); -#endif -} - - -RemoteQueryExecutor::ReadResult RemoteQueryExecutor::restartQueryWithoutDuplicatedUUIDs() -{ - { - std::lock_guard lock(was_cancelled_mutex); - if (was_cancelled) - return ReadResult(Block()); - - /// Cancel previous query and disconnect before retry. - cancelUnlocked(); - connections->disconnect(); - - /// Only resend once, otherwise throw an exception - if (resent_query) - throw Exception(ErrorCodes::DUPLICATED_PART_UUIDS, "Found duplicate uuids while processing query"); - - if (log) - LOG_DEBUG(log, "Found duplicate UUIDs, will retry query without those parts"); - - resent_query = true; - recreate_read_context = true; - sent_query = false; - got_duplicated_part_uuids = false; - was_cancelled = false; - } - - /// Consecutive read will implicitly send query first. - if (!read_context) - return read(); - else - return readAsync(); -} - -RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet) -{ - switch (packet.type) - { - case Protocol::Server::MergeTreeReadTaskRequest: - chassert(packet.request.has_value()); - processMergeTreeReadTaskRequest(packet.request.value()); - return ReadResult(ReadResult::Type::ParallelReplicasToken); - - case Protocol::Server::MergeTreeAllRangesAnnounecement: - chassert(packet.announcement.has_value()); - processMergeTreeInitialReadAnnounecement(packet.announcement.value()); - return ReadResult(ReadResult::Type::ParallelReplicasToken); - - case Protocol::Server::ReadTaskRequest: - processReadTaskRequest(); - break; - case Protocol::Server::PartUUIDs: - if (!setPartUUIDs(packet.part_uuids)) - got_duplicated_part_uuids = true; - break; - case Protocol::Server::Data: - /// Note: `packet.block.rows() > 0` means it's a header block. - /// We can actually return it, and the first call to RemoteQueryExecutor::read - /// will return earlier. We should consider doing it. - if (packet.block && (packet.block.rows() > 0)) - return ReadResult(adaptBlockStructure(packet.block, header)); - break; /// If the block is empty - we will receive other packets before EndOfStream. - - case Protocol::Server::Exception: - got_exception_from_replica = true; - packet.exception->rethrow(); - break; - - case Protocol::Server::EndOfStream: - if (!connections->hasActiveConnections()) - { - finished = true; - /// TODO: Replace with Type::Finished - return ReadResult(Block{}); - } - break; - - case Protocol::Server::Progress: - /** We use the progress from a remote server. - * We also include in ProcessList, - * and we use it to check - * constraints (for example, the minimum speed of query execution) - * and quotas (for example, the number of lines to read). - */ - if (progress_callback) - progress_callback(packet.progress); - break; - - case Protocol::Server::ProfileInfo: - /// Use own (client-side) info about read bytes, it is more correct info than server-side one. - if (profile_info_callback) - profile_info_callback(packet.profile_info); - break; - - case Protocol::Server::Totals: - totals = packet.block; - if (totals) - totals = adaptBlockStructure(totals, header); - break; - - case Protocol::Server::Extremes: - extremes = packet.block; - if (extremes) - extremes = adaptBlockStructure(packet.block, header); - break; - - case Protocol::Server::Log: - /// Pass logs from remote server to client - if (auto log_queue = CurrentThread::getInternalTextLogsQueue()) - log_queue->pushBlock(std::move(packet.block)); - break; - - case Protocol::Server::ProfileEvents: - /// Pass profile events from remote server to client - if (auto profile_queue = CurrentThread::getInternalProfileEventsQueue()) - if (!profile_queue->emplace(std::move(packet.block))) - throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push into profile queue"); - break; - - case Protocol::Server::TimezoneUpdate: - break; - - default: - got_unknown_packet_from_replica = true; - throw Exception( - ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, - "Unknown packet {} from one of the following replicas: {}", - packet.type, - connections->dumpAddresses()); - } - - return ReadResult(ReadResult::Type::Nothing); -} - -bool RemoteQueryExecutor::setPartUUIDs(const std::vector<UUID> & uuids) -{ - auto query_context = context->getQueryContext(); - auto duplicates = query_context->getPartUUIDs()->add(uuids); - - if (!duplicates.empty()) - { - duplicated_part_uuids.insert(duplicated_part_uuids.begin(), duplicates.begin(), duplicates.end()); - return false; - } - return true; -} - -void RemoteQueryExecutor::processReadTaskRequest() -{ - if (!extension || !extension->task_iterator) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized"); - - ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived); - auto response = (*extension->task_iterator)(); - connections->sendReadTaskResponse(response); -} - -void RemoteQueryExecutor::processMergeTreeReadTaskRequest(ParallelReadRequest request) -{ - if (!extension || !extension->parallel_reading_coordinator) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized"); - - ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsReceived); - auto response = extension->parallel_reading_coordinator->handleRequest(std::move(request)); - connections->sendMergeTreeReadTaskResponse(response); -} - -void RemoteQueryExecutor::processMergeTreeInitialReadAnnounecement(InitialAllRangesAnnouncement announcement) -{ - if (!extension || !extension->parallel_reading_coordinator) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized"); - - extension->parallel_reading_coordinator->handleInitialAllRangesAnnouncement(announcement); -} - -void RemoteQueryExecutor::finish() -{ - std::lock_guard guard(was_cancelled_mutex); - - /** If one of: - * - nothing started to do; - * - received all packets before EndOfStream; - * - received exception from one replica; - * - received an unknown packet from one replica; - * then you do not need to read anything. - */ - if (!isQueryPending() || hasThrownException()) - return; - - /** If you have not read all the data yet, but they are no longer needed. - * This may be due to the fact that the data is sufficient (for example, when using LIMIT). - */ - - /// Send the request to abort the execution of the request, if not already sent. - tryCancel("Cancelling query because enough data has been read"); - - /// If connections weren't created yet, query wasn't sent or was already finished, nothing to do. - if (!connections || !sent_query || finished) - return; - - /// Get the remaining packets so that there is no out of sync in the connections to the replicas. - Packet packet = connections->drain(); - switch (packet.type) - { - case Protocol::Server::EndOfStream: - finished = true; - break; - - case Protocol::Server::Log: - /// Pass logs from remote server to client - if (auto log_queue = CurrentThread::getInternalTextLogsQueue()) - log_queue->pushBlock(std::move(packet.block)); - break; - - case Protocol::Server::Exception: - got_exception_from_replica = true; - packet.exception->rethrow(); - break; - - case Protocol::Server::ProfileEvents: - /// Pass profile events from remote server to client - if (auto profile_queue = CurrentThread::getInternalProfileEventsQueue()) - if (!profile_queue->emplace(std::move(packet.block))) - throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push into profile queue"); - break; - - case Protocol::Server::TimezoneUpdate: - break; - - default: - got_unknown_packet_from_replica = true; - throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from one of the following replicas: {}", - toString(packet.type), - connections->dumpAddresses()); - } -} - -void RemoteQueryExecutor::cancel() -{ - std::lock_guard guard(was_cancelled_mutex); - cancelUnlocked(); -} - -void RemoteQueryExecutor::cancelUnlocked() -{ - { - std::lock_guard lock(external_tables_mutex); - - /// Stop sending external data. - for (auto & vec : external_tables_data) - for (auto & elem : vec) - elem->is_cancelled = true; - } - - if (!isQueryPending() || hasThrownException()) - return; - - tryCancel("Cancelling query"); -} - -void RemoteQueryExecutor::sendScalars() -{ - connections->sendScalarsData(scalars); -} - -void RemoteQueryExecutor::sendExternalTables() -{ - size_t count = connections->size(); - - { - std::lock_guard lock(external_tables_mutex); - - external_tables_data.clear(); - external_tables_data.reserve(count); - - StreamLocalLimits limits; - const auto & settings = context->getSettingsRef(); - limits.mode = LimitsMode::LIMITS_TOTAL; - limits.speed_limits.max_execution_time = settings.max_execution_time; - limits.timeout_overflow_mode = settings.timeout_overflow_mode; - - for (size_t i = 0; i < count; ++i) - { - ExternalTablesData res; - for (const auto & table : external_tables) - { - StoragePtr cur = table.second; - /// Send only temporary tables with StorageMemory - if (!std::dynamic_pointer_cast<StorageMemory>(cur)) - continue; - - auto data = std::make_unique<ExternalTableData>(); - data->table_name = table.first; - data->creating_pipe_callback = [cur, limits, my_context = this->context]() - { - SelectQueryInfo query_info; - auto metadata_snapshot = cur->getInMemoryMetadataPtr(); - auto storage_snapshot = cur->getStorageSnapshot(metadata_snapshot, my_context); - QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage( - my_context, QueryProcessingStage::Complete, storage_snapshot, query_info); - - QueryPlan plan; - cur->read( - plan, - metadata_snapshot->getColumns().getNamesOfPhysical(), - storage_snapshot, query_info, my_context, - read_from_table_stage, DEFAULT_BLOCK_SIZE, 1); - - auto builder = plan.buildQueryPipeline( - QueryPlanOptimizationSettings::fromContext(my_context), - BuildQueryPipelineSettings::fromContext(my_context)); - - builder->resize(1); - builder->addTransform(std::make_shared<LimitsCheckingTransform>(builder->getHeader(), limits)); - - return builder; - }; - - data->pipe = data->creating_pipe_callback(); - res.emplace_back(std::move(data)); - } - external_tables_data.push_back(std::move(res)); - } - } - - connections->sendExternalTablesData(external_tables_data); -} - -void RemoteQueryExecutor::tryCancel(const char * reason) -{ - if (was_cancelled) - return; - - was_cancelled = true; - - if (read_context) - read_context->cancel(); - - /// Query could be cancelled during connection creation, query sending or data receiving. - /// We should send cancel request if connections were already created, query were sent - /// and remote query is not finished. - if (connections && sent_query && !finished) - { - connections->sendCancel(); - if (log) - LOG_TRACE(log, "({}) {}", connections->dumpAddresses(), reason); - } -} - -bool RemoteQueryExecutor::isQueryPending() const -{ - return (sent_query || read_context) && !finished; -} - -bool RemoteQueryExecutor::hasThrownException() const -{ - return got_exception_from_replica || got_unknown_packet_from_replica; -} - -} |
