diff options
| author | vitalyisaev <[email protected]> | 2023-11-14 09:58:56 +0300 |
|---|---|---|
| committer | vitalyisaev <[email protected]> | 2023-11-14 10:20:20 +0300 |
| commit | c2b2dfd9827a400a8495e172a56343462e3ceb82 (patch) | |
| tree | cd4e4f597d01bede4c82dffeb2d780d0a9046bd0 /contrib/clickhouse/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp | |
| parent | d4ae8f119e67808cb0cf776ba6e0cf95296f2df7 (diff) | |
YQ Connector: move tests from yql to ydb (OSS)
Перенос папки с тестами на Коннектор из папки yql в папку ydb (синхронизируется с github).
Diffstat (limited to 'contrib/clickhouse/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp')
| -rw-r--r-- | contrib/clickhouse/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp | 105 |
1 files changed, 105 insertions, 0 deletions
diff --git a/contrib/clickhouse/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp b/contrib/clickhouse/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp new file mode 100644 index 00000000000..2ff50ca4fe3 --- /dev/null +++ b/contrib/clickhouse/src/Processors/QueryPlan/DistributedCreateLocalPlan.cpp @@ -0,0 +1,105 @@ +#include <Processors/QueryPlan/DistributedCreateLocalPlan.h> + +#include "config_version.h" +#include <Common/checkStackSize.h> +#include <Core/ProtocolDefines.h> +#include <Interpreters/ActionsDAG.h> +#include <Interpreters/InterpreterSelectQuery.h> +#include <Interpreters/InterpreterSelectQueryAnalyzer.h> +#include <Processors/QueryPlan/ExpressionStep.h> + +namespace DB +{ + +namespace +{ + +void addConvertingActions(QueryPlan & plan, const Block & header) +{ + if (blocksHaveEqualStructure(plan.getCurrentDataStream().header, header)) + return; + + auto get_converting_dag = [](const Block & block_, const Block & header_) + { + /// Convert header structure to expected. + /// Also we ignore constants from result and replace it with constants from header. + /// It is needed for functions like `now64()` or `randConstant()` because their values may be different. + return ActionsDAG::makeConvertingActions( + block_.getColumnsWithTypeAndName(), + header_.getColumnsWithTypeAndName(), + ActionsDAG::MatchColumnsMode::Name, + true); + }; + + auto convert_actions_dag = get_converting_dag(plan.getCurrentDataStream().header, header); + auto converting = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), convert_actions_dag); + plan.addStep(std::move(converting)); +} + +} + +std::unique_ptr<QueryPlan> createLocalPlan( + const ASTPtr & query_ast, + const Block & header, + ContextPtr context, + QueryProcessingStage::Enum processed_stage, + size_t shard_num, + size_t shard_count, + size_t replica_num, + size_t replica_count, + std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator, + UUID group_uuid) +{ + checkStackSize(); + + auto query_plan = std::make_unique<QueryPlan>(); + auto new_context = Context::createCopy(context); + + /// Do not push down limit to local plan, as it will break `rows_before_limit_at_least` counter. + if (processed_stage == QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit) + processed_stage = QueryProcessingStage::WithMergeableStateAfterAggregation; + + /// Do not apply AST optimizations, because query + /// is already optimized and some optimizations + /// can be applied only for non-distributed tables + /// and we can produce query, inconsistent with remote plans. + auto select_query_options = SelectQueryOptions(processed_stage) + .setShardInfo(static_cast<UInt32>(shard_num), static_cast<UInt32>(shard_count)) + .ignoreASTOptimizations(); + + /// There are much things that are needed for coordination + /// during reading with parallel replicas + if (coordinator) + { + new_context->parallel_reading_coordinator = coordinator; + new_context->setClientInterface(ClientInfo::Interface::LOCAL); + new_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY); + new_context->setReplicaInfo(true, replica_count, replica_num); + new_context->setConnectionClientVersion(VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH, DBMS_TCP_PROTOCOL_VERSION); + new_context->setParallelReplicasGroupUUID(group_uuid); + new_context->setMergeTreeAllRangesCallback([coordinator](InitialAllRangesAnnouncement announcement) + { + coordinator->handleInitialAllRangesAnnouncement(announcement); + }); + new_context->setMergeTreeReadTaskCallback([coordinator](ParallelReadRequest request) -> std::optional<ParallelReadResponse> + { + return coordinator->handleRequest(request); + }); + } + + if (context->getSettingsRef().allow_experimental_analyzer) + { + auto interpreter = InterpreterSelectQueryAnalyzer(query_ast, new_context, select_query_options); + query_plan = std::make_unique<QueryPlan>(std::move(interpreter).extractQueryPlan()); + } + else + { + auto interpreter = InterpreterSelectQuery(query_ast, new_context, select_query_options); + interpreter.buildQueryPlan(*query_plan); + } + + addConvertingActions(*query_plan, header); + return query_plan; +} + +} |
