path: root/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp
author    AlexSm <[email protected]>    2024-01-04 15:09:05 +0100
committer GitHub <[email protected]>    2024-01-04 15:09:05 +0100
commit    dab291146f6cd7d35684e3a1150e5bb1c412982c (patch)
tree      36ef35f6cacb6432845a4a33f940c95871036b32 /contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp
parent    63660ad5e7512029fd0218e7a636580695a24e1f (diff)
Library import 5, delete go dependencies (#832)
* Library import 5, delete go dependencies
* Fix yt client
Diffstat (limited to 'contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp')
-rw-r--r--    contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp    763
1 file changed, 0 insertions, 763 deletions
diff --git a/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp b/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp
deleted file mode 100644
index eebe9797051..00000000000
--- a/contrib/clickhouse/src/QueryPipeline/RemoteQueryExecutor.cpp
+++ /dev/null
@@ -1,763 +0,0 @@
-#include <Common/ConcurrentBoundedQueue.h>
-#include <QueryPipeline/RemoteQueryExecutor.h>
-#include <QueryPipeline/RemoteQueryExecutorReadContext.h>
-
-#include <Columns/ColumnConst.h>
-#include <Common/CurrentThread.h>
-#include "Core/Protocol.h"
-#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
-#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
-#include <Processors/Sources/SourceFromSingleChunk.h>
-#include <Processors/Transforms/LimitsCheckingTransform.h>
-#include <Processors/QueryPlan/QueryPlan.h>
-#include <QueryPipeline/QueryPipelineBuilder.h>
-#include <Storages/SelectQueryInfo.h>
-#include <Interpreters/castColumn.h>
-#include <Interpreters/Cluster.h>
-#include <Interpreters/Context.h>
-#include <Interpreters/InternalTextLogsQueue.h>
-#include <IO/ConnectionTimeouts.h>
-#include <Client/MultiplexedConnections.h>
-#include <Client/HedgedConnections.h>
-#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
-#include <Storages/StorageMemory.h>
-
-
-namespace ProfileEvents
-{
- extern const Event SuspendSendingQueryToShard;
- extern const Event ReadTaskRequestsReceived;
- extern const Event MergeTreeReadTaskRequestsReceived;
-}
-
-namespace DB
-{
-
-namespace ErrorCodes
-{
- extern const int LOGICAL_ERROR;
- extern const int UNKNOWN_PACKET_FROM_SERVER;
- extern const int DUPLICATED_PART_UUIDS;
- extern const int SYSTEM_ERROR;
-}
-
-RemoteQueryExecutor::RemoteQueryExecutor(
- const String & query_, const Block & header_, ContextPtr context_,
- const Scalars & scalars_, const Tables & external_tables_,
- QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
- : header(header_), query(query_), context(context_), scalars(scalars_)
- , external_tables(external_tables_), stage(stage_)
- , extension(extension_)
-{}
-
-RemoteQueryExecutor::RemoteQueryExecutor(
- Connection & connection,
- const String & query_, const Block & header_, ContextPtr context_,
- ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_,
- QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
- : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_)
-{
- create_connections = [this, &connection, throttler, extension_](AsyncCallback)
- {
- auto res = std::make_unique<MultiplexedConnections>(connection, context->getSettingsRef(), throttler);
- if (extension_ && extension_->replica_info)
- res->setReplicaInfo(*extension_->replica_info);
- return res;
- };
-}
-
-RemoteQueryExecutor::RemoteQueryExecutor(
- std::shared_ptr<Connection> connection_ptr,
- const String & query_, const Block & header_, ContextPtr context_,
- ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_,
- QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
- : RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_)
-{
- create_connections = [this, connection_ptr, throttler, extension_](AsyncCallback)
- {
- auto res = std::make_unique<MultiplexedConnections>(connection_ptr, context->getSettingsRef(), throttler);
- if (extension_ && extension_->replica_info)
- res->setReplicaInfo(*extension_->replica_info);
- return res;
- };
-}
-
-RemoteQueryExecutor::RemoteQueryExecutor(
- std::vector<IConnectionPool::Entry> && connections_,
- const String & query_, const Block & header_, ContextPtr context_,
- const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
- QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
- : header(header_), query(query_), context(context_)
- , scalars(scalars_), external_tables(external_tables_), stage(stage_)
- , extension(extension_)
-{
- create_connections = [this, connections_, throttler, extension_](AsyncCallback) mutable {
- auto res = std::make_unique<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler);
- if (extension_ && extension_->replica_info)
- res->setReplicaInfo(*extension_->replica_info);
- return res;
- };
-}
-
-RemoteQueryExecutor::RemoteQueryExecutor(
- const ConnectionPoolWithFailoverPtr & pool,
- const String & query_, const Block & header_, ContextPtr context_,
- const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
- QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
- : header(header_), query(query_), context(context_)
- , scalars(scalars_), external_tables(external_tables_), stage(stage_)
- , extension(extension_)
-{
- create_connections = [this, pool, throttler](AsyncCallback async_callback)->std::unique_ptr<IConnections>
- {
- const Settings & current_settings = context->getSettingsRef();
- auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
-
-#if defined(OS_LINUX)
- if (current_settings.use_hedged_requests)
- {
- std::shared_ptr<QualifiedTableName> table_to_check = nullptr;
- if (main_table)
- table_to_check = std::make_shared<QualifiedTableName>(main_table.getQualifiedName());
-
- auto res = std::make_unique<HedgedConnections>(pool, context, timeouts, throttler, pool_mode, table_to_check, std::move(async_callback));
- if (extension && extension->replica_info)
- res->setReplicaInfo(*extension->replica_info);
- return res;
- }
-#endif
-
- std::vector<IConnectionPool::Entry> connection_entries;
- std::optional<bool> skip_unavailable_endpoints;
- if (extension && extension->parallel_reading_coordinator)
- skip_unavailable_endpoints = true;
-
- if (main_table)
- {
- auto try_results = pool->getManyChecked(timeouts, &current_settings, pool_mode, main_table.getQualifiedName(), std::move(async_callback), skip_unavailable_endpoints);
- connection_entries.reserve(try_results.size());
- for (auto & try_result : try_results)
- connection_entries.emplace_back(std::move(try_result.entry));
- }
- else
- {
- connection_entries = pool->getMany(timeouts, &current_settings, pool_mode, std::move(async_callback), skip_unavailable_endpoints);
- }
-
- auto res = std::make_unique<MultiplexedConnections>(std::move(connection_entries), current_settings, throttler);
- if (extension && extension->replica_info)
- res->setReplicaInfo(*extension->replica_info);
- return res;
- };
-}
-
-RemoteQueryExecutor::~RemoteQueryExecutor()
-{
- /// We should finish establishing connections so that we can disconnect them later,
- /// so these connections won't be left in an out-of-sync state.
- if (read_context && !established)
- {
- /// Set was_cancelled, so the query won't be sent after creating connections.
- was_cancelled = true;
- read_context->cancel();
- }
-
- /** If interrupted in the middle of the loop of communication with replicas, then interrupt
- * all connections, then read and skip the remaining packets to make sure
- * these connections did not remain hanging in the out-of-sync state.
- */
- if (established || (isQueryPending() && connections))
- connections->disconnect();
-}
-
-/** If we receive a block with slightly different column types, or with excessive columns,
- * we will adapt it to expected structure.
- */
-static Block adaptBlockStructure(const Block & block, const Block & header)
-{
- /// Special case when reader doesn't care about result structure. Deprecated and used only in Benchmark, PerformanceTest.
- if (!header)
- return block;
-
- Block res;
- res.info = block.info;
-
- for (const auto & elem : header)
- {
- ColumnPtr column;
-
- if (elem.column && isColumnConst(*elem.column))
- {
- /// We expect a constant column in the block.
- /// If the block is not empty, take the constant's value from it,
- /// because it may differ on the remote server for functions like version(), uptime(), ...
- if (block.rows() > 0 && block.has(elem.name))
- {
- /// Const column is passed as materialized. Get first value from it.
- ///
- /// TODO: check that column contains the same value.
- /// TODO: serialize const columns.
- auto col = block.getByName(elem.name);
- col.column = block.getByName(elem.name).column->cut(0, 1);
-
- column = castColumn(col, elem.type);
-
- if (!isColumnConst(*column))
- column = ColumnConst::create(column, block.rows());
- else
- /// This is not possible now; kept just in case const column serialization is supported.
- column = column->cloneResized(block.rows());
- }
- else
- column = elem.column->cloneResized(block.rows());
- }
- else
- column = castColumn(block.getByName(elem.name), elem.type);
-
- res.insert({column, elem.type, elem.name});
- }
- return res;
-}
-
-void RemoteQueryExecutor::sendQuery(ClientInfo::QueryKind query_kind, AsyncCallback async_callback)
-{
- /// Query cannot be canceled in the middle of sending the query,
- /// since there are multiple packets:
- /// - Query
- /// - Data (multiple times)
- ///
- /// And after the Cancel packet no Data packet can be sent, otherwise the remote side will throw:
- ///
- /// Unexpected packet Data received from client
- ///
- std::lock_guard guard(was_cancelled_mutex);
- sendQueryUnlocked(query_kind, async_callback);
-}
-
-void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, AsyncCallback async_callback)
-{
- if (sent_query || was_cancelled)
- return;
-
- connections = create_connections(async_callback);
- AsyncCallbackSetter async_callback_setter(connections.get(), async_callback);
-
- const auto & settings = context->getSettingsRef();
- if (isReplicaUnavailable() || needToSkipUnavailableShard())
- {
- /// To avoid sending the query again in the read(), we need to update the following flags:
- was_cancelled = true;
- finished = true;
- sent_query = true;
-
- /// We need to tell the coordinator not to wait for this replica.
- if (extension && extension->parallel_reading_coordinator)
- {
- chassert(extension->replica_info);
- extension->parallel_reading_coordinator->markReplicaAsUnavailable(extension->replica_info->number_of_current_replica);
- }
-
- return;
- }
-
- established = true;
-
- auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
- ClientInfo modified_client_info = context->getClientInfo();
- modified_client_info.query_kind = query_kind;
-
- if (!duplicated_part_uuids.empty())
- connections->sendIgnoredPartUUIDs(duplicated_part_uuids);
-
- connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true);
-
- established = false;
- sent_query = true;
-
- if (settings.enable_scalar_subquery_optimization)
- sendScalars();
- sendExternalTables();
-}
-
-int RemoteQueryExecutor::sendQueryAsync()
-{
-#if defined(OS_LINUX)
- std::lock_guard lock(was_cancelled_mutex);
- if (was_cancelled)
- return -1;
-
- if (!read_context)
- read_context = std::make_unique<ReadContext>(*this, /*suspend_when_query_sent*/ true);
-
- /// If the query was already sent, do nothing. Note that we cannot use the sent_query flag here,
- /// because we may still be in the process of sending scalars or external tables.
- if (read_context->isQuerySent())
- return -1;
-
- read_context->resume();
-
- if (read_context->isQuerySent())
- return -1;
-
- ProfileEvents::increment(ProfileEvents::SuspendSendingQueryToShard); /// Mostly for testing purposes.
- return read_context->getFileDescriptor();
-#else
- sendQuery();
- return -1;
-#endif
-}
-
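/// A hedged usage sketch (everything except sendQueryAsync() above is illustrative):
/// a caller repeatedly resumes sendQueryAsync() and waits on the returned file
/// descriptor until the call reports -1, i.e. the query, scalars and external tables
/// have been fully sent (or the query was cancelled).
///
///     #include <poll.h>
///     for (int fd; (fd = executor.sendQueryAsync()) != -1;)
///     {
///         pollfd pfd{fd, POLLIN, 0};
///         ::poll(&pfd, 1, /*timeout_ms*/ -1);  /// Block until the suspended send can make progress.
///     }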
-Block RemoteQueryExecutor::readBlock()
-{
- while (true)
- {
- auto res = read();
-
- if (res.getType() == ReadResult::Type::Data)
- return res.getBlock();
- }
-}
-
-
-RemoteQueryExecutor::ReadResult RemoteQueryExecutor::read()
-{
- if (!sent_query)
- {
- sendQuery();
-
- if (context->getSettingsRef().skip_unavailable_shards && (0 == connections->size()))
- return ReadResult(Block());
- }
-
- while (true)
- {
- std::lock_guard lock(was_cancelled_mutex);
- if (was_cancelled)
- return ReadResult(Block());
-
- auto packet = connections->receivePacket();
- auto anything = processPacket(std::move(packet));
-
- if (anything.getType() == ReadResult::Type::Data || anything.getType() == ReadResult::Type::ParallelReplicasToken)
- return anything;
-
- if (got_duplicated_part_uuids)
- break;
- }
-
- return restartQueryWithoutDuplicatedUUIDs();
-}
-
-RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
-{
-#if defined(OS_LINUX)
- if (!read_context || (resent_query && recreate_read_context))
- {
- std::lock_guard lock(was_cancelled_mutex);
- if (was_cancelled)
- return ReadResult(Block());
-
- read_context = std::make_unique<ReadContext>(*this);
- recreate_read_context = false;
- }
-
- while (true)
- {
- std::lock_guard lock(was_cancelled_mutex);
- if (was_cancelled)
- return ReadResult(Block());
-
- read_context->resume();
-
- if (isReplicaUnavailable() || needToSkipUnavailableShard())
- {
- /// We need to tell the coordinator not to wait for this replica.
- /// But at this point it may lead to an incomplete result set, because
- /// this replica committed to reading some part of the data and then died.
- if (extension && extension->parallel_reading_coordinator)
- {
- chassert(extension->parallel_reading_coordinator);
- extension->parallel_reading_coordinator->markReplicaAsUnavailable(extension->replica_info->number_of_current_replica);
- }
-
- return ReadResult(Block());
- }
-
- /// Check if packet is not ready yet.
- if (read_context->isInProgress())
- return ReadResult(read_context->getFileDescriptor());
-
- auto anything = processPacket(read_context->getPacket());
-
- if (anything.getType() == ReadResult::Type::Data || anything.getType() == ReadResult::Type::ParallelReplicasToken)
- return anything;
-
- if (got_duplicated_part_uuids)
- break;
- }
-
- return restartQueryWithoutDuplicatedUUIDs();
-#else
- return read();
-#endif
-}
-
-
-RemoteQueryExecutor::ReadResult RemoteQueryExecutor::restartQueryWithoutDuplicatedUUIDs()
-{
- {
- std::lock_guard lock(was_cancelled_mutex);
- if (was_cancelled)
- return ReadResult(Block());
-
- /// Cancel previous query and disconnect before retry.
- cancelUnlocked();
- connections->disconnect();
-
- /// Only resend once, otherwise throw an exception
- if (resent_query)
- throw Exception(ErrorCodes::DUPLICATED_PART_UUIDS, "Found duplicate uuids while processing query");
-
- if (log)
- LOG_DEBUG(log, "Found duplicate UUIDs, will retry query without those parts");
-
- resent_query = true;
- recreate_read_context = true;
- sent_query = false;
- got_duplicated_part_uuids = false;
- was_cancelled = false;
- }
-
- /// Consecutive read will implicitly send query first.
- if (!read_context)
- return read();
- else
- return readAsync();
-}
-
-RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet)
-{
- switch (packet.type)
- {
- case Protocol::Server::MergeTreeReadTaskRequest:
- chassert(packet.request.has_value());
- processMergeTreeReadTaskRequest(packet.request.value());
- return ReadResult(ReadResult::Type::ParallelReplicasToken);
-
- case Protocol::Server::MergeTreeAllRangesAnnounecement:
- chassert(packet.announcement.has_value());
- processMergeTreeInitialReadAnnounecement(packet.announcement.value());
- return ReadResult(ReadResult::Type::ParallelReplicasToken);
-
- case Protocol::Server::ReadTaskRequest:
- processReadTaskRequest();
- break;
- case Protocol::Server::PartUUIDs:
- if (!setPartUUIDs(packet.part_uuids))
- got_duplicated_part_uuids = true;
- break;
- case Protocol::Server::Data:
- /// Note: an empty block (`packet.block.rows() == 0`) is the header block.
- /// We could actually return it too, and the first call to RemoteQueryExecutor::read
- /// would then return earlier. We should consider doing that.
- if (packet.block && (packet.block.rows() > 0))
- return ReadResult(adaptBlockStructure(packet.block, header));
- break; /// If the block is empty - we will receive other packets before EndOfStream.
-
- case Protocol::Server::Exception:
- got_exception_from_replica = true;
- packet.exception->rethrow();
- break;
-
- case Protocol::Server::EndOfStream:
- if (!connections->hasActiveConnections())
- {
- finished = true;
- /// TODO: Replace with Type::Finished
- return ReadResult(Block{});
- }
- break;
-
- case Protocol::Server::Progress:
- /** We use the progress from the remote server.
- * We also account for it in the ProcessList,
- * and we use it to check
- * constraints (for example, the minimum speed of query execution)
- * and quotas (for example, the number of rows to read).
- */
- if (progress_callback)
- progress_callback(packet.progress);
- break;
-
- case Protocol::Server::ProfileInfo:
- /// Use our own (client-side) info about read bytes; it is more accurate than the server-side one.
- if (profile_info_callback)
- profile_info_callback(packet.profile_info);
- break;
-
- case Protocol::Server::Totals:
- totals = packet.block;
- if (totals)
- totals = adaptBlockStructure(totals, header);
- break;
-
- case Protocol::Server::Extremes:
- extremes = packet.block;
- if (extremes)
- extremes = adaptBlockStructure(packet.block, header);
- break;
-
- case Protocol::Server::Log:
- /// Pass logs from remote server to client
- if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
- log_queue->pushBlock(std::move(packet.block));
- break;
-
- case Protocol::Server::ProfileEvents:
- /// Pass profile events from remote server to client
- if (auto profile_queue = CurrentThread::getInternalProfileEventsQueue())
- if (!profile_queue->emplace(std::move(packet.block)))
- throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push into profile queue");
- break;
-
- case Protocol::Server::TimezoneUpdate:
- break;
-
- default:
- got_unknown_packet_from_replica = true;
- throw Exception(
- ErrorCodes::UNKNOWN_PACKET_FROM_SERVER,
- "Unknown packet {} from one of the following replicas: {}",
- packet.type,
- connections->dumpAddresses());
- }
-
- return ReadResult(ReadResult::Type::Nothing);
-}
-
-bool RemoteQueryExecutor::setPartUUIDs(const std::vector<UUID> & uuids)
-{
- auto query_context = context->getQueryContext();
- auto duplicates = query_context->getPartUUIDs()->add(uuids);
-
- if (!duplicates.empty())
- {
- duplicated_part_uuids.insert(duplicated_part_uuids.begin(), duplicates.begin(), duplicates.end());
- return false;
- }
- return true;
-}
-
-void RemoteQueryExecutor::processReadTaskRequest()
-{
- if (!extension || !extension->task_iterator)
- throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized");
-
- ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived);
- auto response = (*extension->task_iterator)();
- connections->sendReadTaskResponse(response);
-}
-
-void RemoteQueryExecutor::processMergeTreeReadTaskRequest(ParallelReadRequest request)
-{
- if (!extension || !extension->parallel_reading_coordinator)
- throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");
-
- ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsReceived);
- auto response = extension->parallel_reading_coordinator->handleRequest(std::move(request));
- connections->sendMergeTreeReadTaskResponse(response);
-}
-
-void RemoteQueryExecutor::processMergeTreeInitialReadAnnounecement(InitialAllRangesAnnouncement announcement)
-{
- if (!extension || !extension->parallel_reading_coordinator)
- throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");
-
- extension->parallel_reading_coordinator->handleInitialAllRangesAnnouncement(announcement);
-}
-
-void RemoteQueryExecutor::finish()
-{
- std::lock_guard guard(was_cancelled_mutex);
-
- /** If any of the following holds:
- * - nothing has been started yet;
- * - all packets before EndOfStream have been received;
- * - an exception was received from one replica;
- * - an unknown packet was received from one replica;
- * then there is nothing left to read.
- */
- if (!isQueryPending() || hasThrownException())
- return;
-
- /** We may not have read all the data yet, but it is no longer needed.
- * This can happen when the data already read is sufficient (for example, when using LIMIT).
- */
-
- /// Send a request to abort the execution of the query, if it has not already been sent.
- tryCancel("Cancelling query because enough data has been read");
-
- /// If connections weren't created yet, query wasn't sent or was already finished, nothing to do.
- if (!connections || !sent_query || finished)
- return;
-
- /// Drain the remaining packets so that the connections to the replicas do not get out of sync.
- Packet packet = connections->drain();
- switch (packet.type)
- {
- case Protocol::Server::EndOfStream:
- finished = true;
- break;
-
- case Protocol::Server::Log:
- /// Pass logs from remote server to client
- if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
- log_queue->pushBlock(std::move(packet.block));
- break;
-
- case Protocol::Server::Exception:
- got_exception_from_replica = true;
- packet.exception->rethrow();
- break;
-
- case Protocol::Server::ProfileEvents:
- /// Pass profile events from remote server to client
- if (auto profile_queue = CurrentThread::getInternalProfileEventsQueue())
- if (!profile_queue->emplace(std::move(packet.block)))
- throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push into profile queue");
- break;
-
- case Protocol::Server::TimezoneUpdate:
- break;
-
- default:
- got_unknown_packet_from_replica = true;
- throw Exception(ErrorCodes::UNKNOWN_PACKET_FROM_SERVER, "Unknown packet {} from one of the following replicas: {}",
- toString(packet.type),
- connections->dumpAddresses());
- }
-}
-
-void RemoteQueryExecutor::cancel()
-{
- std::lock_guard guard(was_cancelled_mutex);
- cancelUnlocked();
-}
-
-void RemoteQueryExecutor::cancelUnlocked()
-{
- {
- std::lock_guard lock(external_tables_mutex);
-
- /// Stop sending external data.
- for (auto & vec : external_tables_data)
- for (auto & elem : vec)
- elem->is_cancelled = true;
- }
-
- if (!isQueryPending() || hasThrownException())
- return;
-
- tryCancel("Cancelling query");
-}
-
-void RemoteQueryExecutor::sendScalars()
-{
- connections->sendScalarsData(scalars);
-}
-
-void RemoteQueryExecutor::sendExternalTables()
-{
- size_t count = connections->size();
-
- {
- std::lock_guard lock(external_tables_mutex);
-
- external_tables_data.clear();
- external_tables_data.reserve(count);
-
- StreamLocalLimits limits;
- const auto & settings = context->getSettingsRef();
- limits.mode = LimitsMode::LIMITS_TOTAL;
- limits.speed_limits.max_execution_time = settings.max_execution_time;
- limits.timeout_overflow_mode = settings.timeout_overflow_mode;
-
- for (size_t i = 0; i < count; ++i)
- {
- ExternalTablesData res;
- for (const auto & table : external_tables)
- {
- StoragePtr cur = table.second;
- /// Send only temporary tables with StorageMemory
- if (!std::dynamic_pointer_cast<StorageMemory>(cur))
- continue;
-
- auto data = std::make_unique<ExternalTableData>();
- data->table_name = table.first;
- data->creating_pipe_callback = [cur, limits, my_context = this->context]()
- {
- SelectQueryInfo query_info;
- auto metadata_snapshot = cur->getInMemoryMetadataPtr();
- auto storage_snapshot = cur->getStorageSnapshot(metadata_snapshot, my_context);
- QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage(
- my_context, QueryProcessingStage::Complete, storage_snapshot, query_info);
-
- QueryPlan plan;
- cur->read(
- plan,
- metadata_snapshot->getColumns().getNamesOfPhysical(),
- storage_snapshot, query_info, my_context,
- read_from_table_stage, DEFAULT_BLOCK_SIZE, 1);
-
- auto builder = plan.buildQueryPipeline(
- QueryPlanOptimizationSettings::fromContext(my_context),
- BuildQueryPipelineSettings::fromContext(my_context));
-
- builder->resize(1);
- builder->addTransform(std::make_shared<LimitsCheckingTransform>(builder->getHeader(), limits));
-
- return builder;
- };
-
- data->pipe = data->creating_pipe_callback();
- res.emplace_back(std::move(data));
- }
- external_tables_data.push_back(std::move(res));
- }
- }
-
- connections->sendExternalTablesData(external_tables_data);
-}
-
-void RemoteQueryExecutor::tryCancel(const char * reason)
-{
- if (was_cancelled)
- return;
-
- was_cancelled = true;
-
- if (read_context)
- read_context->cancel();
-
- /// Query could be cancelled during connection creation, query sending or data receiving.
- /// We should send a cancel request only if the connections were already created, the query was sent,
- /// and the remote query has not finished yet.
- if (connections && sent_query && !finished)
- {
- connections->sendCancel();
- if (log)
- LOG_TRACE(log, "({}) {}", connections->dumpAddresses(), reason);
- }
-}
-
-bool RemoteQueryExecutor::isQueryPending() const
-{
- return (sent_query || read_context) && !finished;
-}
-
-bool RemoteQueryExecutor::hasThrownException() const
-{
- return got_exception_from_replica || got_unknown_packet_from_replica;
-}
-
-}
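For orientation, a minimal caller-side sketch of the class removed above, assuming the defaulted trailing constructor and sendQuery() parameters declared in RemoteQueryExecutor.h; the connection, header and context are presumed to exist already, and the query string is only an example.

#include <QueryPipeline/RemoteQueryExecutor.h>

using namespace DB;

/// Runs a query on one remote server and drains the resulting blocks.
size_t runRemoteCount(Connection & connection, const Block & header, ContextPtr context)
{
    RemoteQueryExecutor executor(connection, "SELECT number FROM system.numbers LIMIT 10", header, context);

    executor.sendQuery();                          /// Sends the Query packet, then scalars and external tables.

    size_t total_rows = 0;
    while (Block block = executor.readBlock())     /// An empty block signals EndOfStream.
        total_rows += block.rows();

    executor.finish();                             /// Drains remaining packets so the connection stays in sync.
    return total_rows;
}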