diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-02-25 12:01:25 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-02-25 12:01:25 +0300 |
commit | 8b9cb025d0b297c75fd8e5cd249ca067e33f4d8c (patch) | |
tree | 1e561893284f46da332f4e31738f24cd19549286 | |
parent | e632b99859c092407fd47c40c7b552a6b47afe7f (diff) | |
download | ydb-8b9cb025d0b297c75fd8e5cd249ca067e33f4d8c.tar.gz |
construct predictors with prepared queue object
refactoring
45 files changed, 239 insertions, 134 deletions
diff --git a/ydb/core/kqp/CMakeLists.darwin.txt b/ydb/core/kqp/CMakeLists.darwin.txt index 106d4102b4f..10eb5407b30 100644 --- a/ydb/core/kqp/CMakeLists.darwin.txt +++ b/ydb/core/kqp/CMakeLists.darwin.txt @@ -19,6 +19,7 @@ add_subdirectory(opt) add_subdirectory(provider) add_subdirectory(proxy_service) add_subdirectory(query_compiler) +add_subdirectory(query_data) add_subdirectory(rm_service) add_subdirectory(run_script_actor) add_subdirectory(runtime) diff --git a/ydb/core/kqp/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/CMakeLists.linux-aarch64.txt index 071a51d8abf..2a2bdf0d97c 100644 --- a/ydb/core/kqp/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/CMakeLists.linux-aarch64.txt @@ -19,6 +19,7 @@ add_subdirectory(opt) add_subdirectory(provider) add_subdirectory(proxy_service) add_subdirectory(query_compiler) +add_subdirectory(query_data) add_subdirectory(rm_service) add_subdirectory(run_script_actor) add_subdirectory(runtime) diff --git a/ydb/core/kqp/CMakeLists.linux.txt b/ydb/core/kqp/CMakeLists.linux.txt index 071a51d8abf..2a2bdf0d97c 100644 --- a/ydb/core/kqp/CMakeLists.linux.txt +++ b/ydb/core/kqp/CMakeLists.linux.txt @@ -19,6 +19,7 @@ add_subdirectory(opt) add_subdirectory(provider) add_subdirectory(proxy_service) add_subdirectory(query_compiler) +add_subdirectory(query_data) add_subdirectory(rm_service) add_subdirectory(run_script_actor) add_subdirectory(runtime) diff --git a/ydb/core/kqp/common/CMakeLists.darwin.txt b/ydb/core/kqp/common/CMakeLists.darwin.txt index b7c74e48b60..e8d477e1826 100644 --- a/ydb/core/kqp/common/CMakeLists.darwin.txt +++ b/ydb/core/kqp/common/CMakeLists.darwin.txt @@ -19,6 +19,7 @@ target_link_libraries(core-kqp-common PUBLIC core-kqp-expr_nodes core-kqp-provider tx-long_tx_service-public + yql-dq-expr_nodes ydb-library-aclib yql-core-issue yql-dq-actors @@ -36,7 +37,6 @@ target_sources(core-kqp-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_prepared_query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp.cpp ) generate_enum_serilization(core-kqp-common diff --git a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt index aa4bfc498cc..f1f616100c8 100644 --- a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt @@ -20,6 +20,7 @@ target_link_libraries(core-kqp-common PUBLIC core-kqp-expr_nodes core-kqp-provider tx-long_tx_service-public + yql-dq-expr_nodes ydb-library-aclib yql-core-issue yql-dq-actors @@ -37,7 +38,6 @@ target_sources(core-kqp-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_prepared_query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp.cpp ) generate_enum_serilization(core-kqp-common diff --git a/ydb/core/kqp/common/CMakeLists.linux.txt b/ydb/core/kqp/common/CMakeLists.linux.txt index aa4bfc498cc..f1f616100c8 100644 --- a/ydb/core/kqp/common/CMakeLists.linux.txt +++ b/ydb/core/kqp/common/CMakeLists.linux.txt @@ -20,6 +20,7 @@ target_link_libraries(core-kqp-common PUBLIC core-kqp-expr_nodes core-kqp-provider tx-long_tx_service-public + yql-dq-expr_nodes ydb-library-aclib yql-core-issue yql-dq-actors @@ -37,7 +38,6 @@ target_sources(core-kqp-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_prepared_query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp.cpp ) generate_enum_serilization(core-kqp-common diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index cdd7119ae5f..a0d303ebbf3 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -1,10 +1,10 @@ #pragma once #include "kqp_event_ids.h" -#include "kqp_prepared_query.h" #include <library/cpp/lwtrace/shuttle.h> #include <ydb/core/kqp/counters/kqp_counters.h> +#include <ydb/core/kqp/query_data/kqp_prepared_query.h> #include <ydb/public/api/protos/ydb_status_codes.pb.h> #include <ydb/public/api/protos/draft/ydb_query.pb.h> diff --git a/ydb/core/kqp/common/kqp_resolve.cpp b/ydb/core/kqp/common/kqp_resolve.cpp index 482a0e419c4..473b3574e10 100644 --- a/ydb/core/kqp/common/kqp_resolve.cpp +++ b/ydb/core/kqp/common/kqp_resolve.cpp @@ -1,13 +1,12 @@ #include "kqp_resolve.h" -#include <ydb/core/kqp/provider/yql_kikimr_gateway.h> - // #define DBG_TRACE #ifdef DBG_TRACE #include <ydb/core/base/appdata.h> #include <ydb/core/tx/datashard/range_ops.h> #endif +#include <ydb/core/kqp/provider/yql_kikimr_gateway.h> namespace NKikimr { namespace NKqp { diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp index a7b5196d997..2334424b4ab 100644 --- a/ydb/core/kqp/common/kqp_yql.cpp +++ b/ydb/core/kqp/common/kqp_yql.cpp @@ -1,11 +1,12 @@ #include "kqp_yql.h" #include <ydb/library/yql/core/yql_expr_type_annotation.h> +#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> namespace NYql { using namespace NKikimr; -using namespace NKikimr::NKqp; +//using namespace NKikimr::NKqp; using namespace NNodes; static EPhysicalQueryType GetPhysicalQueryType(const TStringBuf& value) { diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h index 0f5236461d5..0c29de4df31 100644 --- a/ydb/core/kqp/common/kqp_yql.h +++ b/ydb/core/kqp/common/kqp_yql.h @@ -1,7 +1,8 @@ #pragma once #include <ydb/core/kqp/expr_nodes/kqp_expr_nodes.h> -#include <ydb/core/kqp/gateway/kqp_gateway.h> +#include <ydb/library/yql/ast/yql_pos_handle.h> +#include <ydb/library/yql/ast/yql_expr.h> namespace NYql { diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 8907790a18c..f92e5aa72c0 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -852,10 +852,10 @@ private: auto& task = TasksGraph.GetTask(taskId); auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); - if (HasReads(stageInfo)) { + if (stageInfo.Meta.HasReads()) { affectedFlags |= NFlatTxCoordinator::TTransactionProposal::TAffectedEntry::AffectedRead; } - if (HasWrites(stageInfo)) { + if (stageInfo.Meta.HasWrites()) { affectedFlags |= NFlatTxCoordinator::TTransactionProposal::TAffectedEntry::AffectedWrite; } } @@ -1307,7 +1307,7 @@ private: return task; }; - auto& stage = GetStage(stageInfo); + auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); const auto& table = TableKeys.GetTable(stageInfo.Meta.TableId); const auto& keyTypes = table.KeyColumnTypes;; @@ -1439,7 +1439,7 @@ private: } void BuildComputeTasks(TStageInfo& stageInfo) { - auto& stage = GetStage(stageInfo); + auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); ui32 partitionsCount = 1; for (ui32 inputIndex = 0; inputIndex < stage.InputsSize(); ++inputIndex) { @@ -1696,25 +1696,9 @@ private: for (auto& task : TasksGraph.GetTasks()) { auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); - auto& stage = GetStage(stageInfo); - - NDqProto::TDqTask taskDesc; - taskDesc.SetId(task.Id); - taskDesc.SetStageId(stageInfo.Id.StageId); + NDqProto::TDqTask taskDesc = PrepareKqpTaskParameters(stageInfo, task, TypeEnv()); ActorIdToProto(SelfId(), taskDesc.MutableExecuter()->MutableActorId()); - for (auto& input : task.Inputs) { - FillInputDesc(*taskDesc.AddInputs(), input); - } - - for (auto& output : task.Outputs) { - FillOutputDesc(*taskDesc.AddOutputs(), output); - } - - taskDesc.MutableProgram()->CopyFrom(stage.GetProgram()); - - PrepareKqpTaskParameters(stage, stageInfo, task, taskDesc, TypeEnv(), HolderFactory()); - if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) { NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta protoTaskMeta; diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 275a0858d4d..62c087af63d 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -2,7 +2,7 @@ #include <library/cpp/lwtrace/shuttle.h> #include <ydb/core/kqp/common/kqp_event_ids.h> -#include <ydb/core/kqp/gateway/kqp_query_data.h> +#include <ydb/core/kqp/query_data/kqp_query_data.h> #include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/kqp/counters/kqp_counters.h> #include <ydb/core/tx/long_tx_service/public/lock_handle.h> @@ -84,7 +84,7 @@ struct TEvKqpExecuter { }; IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, - const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, + const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 450b5eda1a3..8fd996bb449 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -61,23 +61,9 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnb serializer.Deserialize(buffer, txResult.MkqlItemType, txResult.Rows); } -void PrepareKqpTaskParameters(const NKqpProto::TKqpPhyStage& stage, const TStageInfo& stageInfo, const TTask& task, - NDqProto::TDqTask& dqTask, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory&) -{ - auto g = typeEnv.BindAllocator(); - for (auto& paramName : stage.GetProgramParameters()) { - auto& dqParams = *dqTask.MutableParameters(); - if (auto* taskParam = task.Meta.Params.FindPtr(paramName)) { - dqParams[paramName] = *taskParam; - } else { - dqParams[paramName] = stageInfo.Meta.Tx.Params->SerializeParamValue(paramName); - } - } -} - -std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const NKqpProto::TKqpPhyStage& stage, - const TStageInfo& stageInfo, const TTask& task) +std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo& stageInfo, const TTask& task) { + const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id); std::vector<std::shared_ptr<arrow::Field>> columns; std::vector<std::shared_ptr<arrow::Array>> data; auto& parameterNames = task.Meta.ReadInfo.OlapProgram.ParameterNames; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index d417e0252e1..d163396401c 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -61,11 +61,7 @@ enum class EExecType { const ui64 MaxTaskSize = 48_MB; -void PrepareKqpTaskParameters(const NKqpProto::TKqpPhyStage& stage, const TStageInfo& stageInfo, const TTask& task, - NYql::NDqProto::TDqTask& dqTask, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory); - -std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const NKqpProto::TKqpPhyStage& stage, - const TStageInfo& stageInfo, const TTask& task); +std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo& stageInfo, const TTask& task); inline bool IsDebugLogEnabled() { return TlsActivationContext->LoggerSettings() && @@ -670,7 +666,7 @@ protected: void BuildSysViewScanTasks(TStageInfo& stageInfo) { Y_VERIFY_DEBUG(stageInfo.Meta.IsSysView()); - auto& stage = GetStage(stageInfo); + auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); const auto& table = TableKeys.GetTable(stageInfo.Meta.TableId); const auto& keyTypes = table.KeyColumnTypes; @@ -716,7 +712,7 @@ protected: THashMap<ui64, std::vector<ui64>> nodeTasks; THashMap<ui64, ui64> assignedShardsCount; - auto& stage = GetStage(stageInfo); + auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); YQL_ENSURE(stage.GetSources(0).HasReadRangesSource()); YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1, "multiple sources or sources mixed with connections"); @@ -918,6 +914,33 @@ protected: } } + NYql::NDqProto::TDqTask PrepareKqpTaskParameters(const TStageInfo& stageInfo, const TTask& task, const NMiniKQL::TTypeEnvironment& typeEnv) { + NYql::NDqProto::TDqTask result; + result.SetId(task.Id); + result.SetStageId(stageInfo.Id.StageId); + + for (auto& input : task.Inputs) { + FillInputDesc(*result.AddInputs(), input); + } + + for (auto& output : task.Outputs) { + FillOutputDesc(*result.AddOutputs(), output); + } + + const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id); + result.MutableProgram()->CopyFrom(stage.GetProgram()); + auto g = typeEnv.BindAllocator(); + for (auto& paramName : stage.GetProgramParameters()) { + auto& dqParams = *result.MutableParameters(); + if (auto* taskParam = task.Meta.Params.FindPtr(paramName)) { + dqParams[paramName] = *taskParam; + } else { + dqParams[paramName] = stageInfo.Meta.Tx.Params->SerializeParamValue(paramName); + } + } + return result; + } + void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransaction_TTableMeta* meta) { meta->SetTablePath(stageInfo.Meta.TablePath); meta->MutableTableId()->SetTableId(stageInfo.Meta.TableId.PathId.LocalPathId); @@ -1197,7 +1220,7 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, - const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, + const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig); diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp index 00730303de9..b5d916c2bb5 100644 --- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp @@ -175,7 +175,7 @@ public: void RunTask(TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) { auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); - auto& stage = GetStage(stageInfo); + auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); NDqProto::TDqTask protoTask; protoTask.SetId(task.Id); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index db98b839d12..85542fbcdac 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -12,7 +12,7 @@ #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/compute_actor/kqp_compute_actor.h> #include <ydb/core/kqp/common/kqp.h> -#include <ydb/core/kqp/query_compiler/kqp_predictor.h> +#include <ydb/core/kqp/query_data/kqp_predictor.h> #include <ydb/core/kqp/node_service/kqp_node_service.h> #include <ydb/core/kqp/runtime/kqp_transport.h> #include <ydb/core/kqp/opt/kqp_query_plan.h> @@ -212,21 +212,15 @@ private: ui32 GetMaxTasksPerNodeEstimate(TStageInfo& stageInfo, const bool isOlapScan, const ui64 /*nodeId*/) const { ui32 result = 0; - const auto& stage = GetStage(stageInfo); if (isOlapScan) { if (AggregationSettings.HasCSScanMinimalThreads()) { result = AggregationSettings.GetCSScanMinimalThreads(); } else { - TStagePredictor predictor; - const ui32 threadsCount = TStagePredictor::GetUsableThreads(); - if (!predictor.DeserializeFromKqpSettings(stage.GetProgram().GetSettings())) { - ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "cannot parse program settings for data prediction"; - return threadsCount; - } else { - return std::max<ui32>(1, predictor.CalcTasksOptimalCount(threadsCount, {})); - } + const TStagePredictor& predictor = stageInfo.Meta.Tx.Body->GetCalculationPredictor(stageInfo.Id.StageId); + return std::max<ui32>(1, predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), {})); } } else { + const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); result = AggregationSettings.GetDSScanMinimalThreads(); if (stage.GetProgram().GetSettings().GetHasSort()) { result = std::max(result, AggregationSettings.GetDSBaseSortScanThreads()); @@ -242,14 +236,8 @@ private: if (AggregationSettings.HasAggregationComputeThreads()) { return std::max<ui32>(1, AggregationSettings.GetAggregationComputeThreads()); } else { - const auto& stage = GetStage(stageInfo); - TStagePredictor predictor; - if (!predictor.DeserializeFromKqpSettings(stage.GetProgram().GetSettings())) { - ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "cannot parse program settings for data prediction"; - return std::max<ui32>(1, previousTasksCount * 0.75); - } else { - return predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), previousTasksCount / nodesCount) * nodesCount; - } + const TStagePredictor& predictor = stageInfo.Meta.Tx.Body->GetCalculationPredictor(stageInfo.Id.StageId); + return predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), previousTasksCount / nodesCount) * nodesCount; } } @@ -285,7 +273,7 @@ private: void BuildScanTasks(TStageInfo& stageInfo) { THashMap<ui64, std::vector<ui64>> nodeTasks; THashMap<ui64, ui64> assignedShardsCount; - auto& stage = GetStage(stageInfo); + auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); const auto& table = TableKeys.GetTable(stageInfo.Meta.TableId); const auto& keyTypes = table.KeyColumnTypes; @@ -379,7 +367,7 @@ private: } void BuildComputeTasks(TStageInfo& stageInfo) { - auto& stage = GetStage(stageInfo); + auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); ui32 partitionsCount = 1; ui32 inputTasks = 0; @@ -528,25 +516,10 @@ private: for (auto& task : TasksGraph.GetTasks()) { auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); - auto& stage = GetStage(stageInfo); - NYql::NDqProto::TDqTask taskDesc; - taskDesc.SetId(task.Id); + NYql::NDqProto::TDqTask taskDesc = PrepareKqpTaskParameters(stageInfo, task, TypeEnv()); ActorIdToProto(SelfId(), taskDesc.MutableExecuter()->MutableActorId()); - for (auto& input : task.Inputs) { - FillInputDesc(*taskDesc.AddInputs(), input); - } - - for (auto& output : task.Outputs) { - FillOutputDesc(*taskDesc.AddOutputs(), output); - } - - taskDesc.MutableProgram()->CopyFrom(stage.GetProgram()); - taskDesc.SetStageId(task.StageId.StageId); - - PrepareKqpTaskParameters(stage, stageInfo, task, taskDesc, TypeEnv(), HolderFactory()); - if (task.Meta.NodeId || stageInfo.Meta.IsSysView()) { NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta protoTaskMeta; @@ -617,7 +590,7 @@ private: auto* olapProgram = protoTaskMeta.MutableOlapProgram(); olapProgram->SetProgram(task.Meta.ReadInfo.OlapProgram.Program); - auto [schema, parameters] = SerializeKqpTasksParametersForOlap(stage, stageInfo, task); + auto [schema, parameters] = SerializeKqpTasksParametersForOlap(stageInfo, task); olapProgram->SetParametersSchema(schema); olapProgram->SetParameters(parameters); } else { diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 2dd09e99135..e26e121428d 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -19,19 +19,7 @@ using namespace NYql::NNodes; // #define DBG_TRACE void LogStage(const NActors::TActorContext& ctx, const TStageInfo& stageInfo) { - // TODO: Print stage details, including input types and program. - LOG_DEBUG_S(ctx, NKikimrServices::KQP_EXECUTER, "StageInfo: StageId #" << stageInfo.Id - << ", InputsCount: " << stageInfo.InputsCount - << ", OutputsCount: " << stageInfo.OutputsCount); -} - -bool HasReads(const TStageInfo& stageInfo) { - return stageInfo.Meta.ShardOperations.contains(TKeyDesc::ERowOperation::Read); -} - -bool HasWrites(const TStageInfo& stageInfo) { - return stageInfo.Meta.ShardOperations.contains(TKeyDesc::ERowOperation::Update) || - stageInfo.Meta.ShardOperations.contains(TKeyDesc::ERowOperation::Erase); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_EXECUTER, stageInfo.DebugString()); } void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGateway::TPhysicalTxData>& txs) { @@ -93,7 +81,7 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew } YQL_ENSURE(tables.empty() || tables.size() == 1); - YQL_ENSURE(!HasReads(stageInfo) || !HasWrites(stageInfo)); + YQL_ENSURE(!stageInfo.Meta.HasReads() || !stageInfo.Meta.HasWrites()); } } } @@ -272,7 +260,7 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo, ui64 txId, bool enableSpilling) { - auto& stage = GetStage(stageInfo); + auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); if (stage.GetIsEffectsStage()) { YQL_ENSURE(stageInfo.OutputsCount == 1); @@ -357,13 +345,6 @@ bool IsCrossShardChannel(TKqpTasksGraph& tasksGraph, const TChannel& channel) { return targetShard != tasksGraph.GetTask(channel.SrcTask).Meta.ShardId; } -const NKqpProto::TKqpPhyStage& GetStage(const TStageInfo& stageInfo) { - auto& txBody = stageInfo.Meta.Tx.Body; - YQL_ENSURE(stageInfo.Id.StageId < txBody->StagesSize()); - - return txBody->GetStages(stageInfo.Id.StageId); -} - void TShardKeyRanges::AddPoint(TSerializedCellVec&& point) { if (!IsFullRange()) { Ranges.emplace_back(std::move(point)); diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index b6bcc4c7d32..ae9ae1b5e5e 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -40,6 +40,26 @@ struct TStageInfoMeta { THolder<TKeyDesc> ShardKey; NSchemeCache::TSchemeCacheRequest::EKind ShardKind = NSchemeCache::TSchemeCacheRequest::EKind::KindUnknown; + const NKqpProto::TKqpPhyStage& GetStage(const size_t idx) const { + auto& txBody = Tx.Body; + YQL_ENSURE(idx < txBody->StagesSize()); + return txBody->GetStages(idx); + } + + template <class TStageIdExt> + const NKqpProto::TKqpPhyStage& GetStage(const TStageIdExt& stageId) const { + return GetStage(stageId.StageId); + } + + bool HasReads() const { + return ShardOperations.contains(TKeyDesc::ERowOperation::Read); + } + + bool HasWrites() const { + return ShardOperations.contains(TKeyDesc::ERowOperation::Update) || + ShardOperations.contains(TKeyDesc::ERowOperation::Erase); + } + explicit TStageInfoMeta(const IKqpGateway::TPhysicalTxData& tx) : Tx(tx) , TableKind(ETableKind::Unknown) @@ -204,13 +224,8 @@ struct TKqpTaskOutputType { }; }; -const NKqpProto::TKqpPhyStage& GetStage(const TStageInfo& stageInfo); - void LogStage(const NActors::TActorContext& ctx, const TStageInfo& stageInfo); -bool HasReads(const TStageInfo& stageInfo); -bool HasWrites(const TStageInfo& stageInfo); - bool IsCrossShardChannel(TKqpTasksGraph& tasksGraph, const NYql::NDq::TChannel& channel); } // namespace NKqp diff --git a/ydb/core/kqp/gateway/CMakeLists.darwin.txt b/ydb/core/kqp/gateway/CMakeLists.darwin.txt index ca44b59ecb7..1f61f525b37 100644 --- a/ydb/core/kqp/gateway/CMakeLists.darwin.txt +++ b/ydb/core/kqp/gateway/CMakeLists.darwin.txt @@ -18,10 +18,11 @@ target_link_libraries(core-kqp-gateway PUBLIC ydb-core-actorlib_impl ydb-core-base core-kqp-common + core-kqp-provider + core-kqp-query_data providers-result-expr_nodes ) target_sources(core-kqp-gateway PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_ic_gateway.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_metadata_loader.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_query_data.cpp ) diff --git a/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt index a70cfb046b1..d863010351e 100644 --- a/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt @@ -19,10 +19,11 @@ target_link_libraries(core-kqp-gateway PUBLIC ydb-core-actorlib_impl ydb-core-base core-kqp-common + core-kqp-provider + core-kqp-query_data providers-result-expr_nodes ) target_sources(core-kqp-gateway PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_ic_gateway.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_metadata_loader.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_query_data.cpp ) diff --git a/ydb/core/kqp/gateway/CMakeLists.linux.txt b/ydb/core/kqp/gateway/CMakeLists.linux.txt index a70cfb046b1..d863010351e 100644 --- a/ydb/core/kqp/gateway/CMakeLists.linux.txt +++ b/ydb/core/kqp/gateway/CMakeLists.linux.txt @@ -19,10 +19,11 @@ target_link_libraries(core-kqp-gateway PUBLIC ydb-core-actorlib_impl ydb-core-base core-kqp-common + core-kqp-provider + core-kqp-query_data providers-result-expr_nodes ) target_sources(core-kqp-gateway PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_ic_gateway.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_metadata_loader.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_query_data.cpp ) diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h index 53c243a84fa..dffc95ae034 100644 --- a/ydb/core/kqp/gateway/kqp_gateway.h +++ b/ydb/core/kqp/gateway/kqp_gateway.h @@ -1,7 +1,6 @@ #pragma once -#include "kqp_query_data.h" - +#include <ydb/core/kqp/query_data/kqp_query_data.h> #include <ydb/core/protos/kqp_physical.pb.h> #include <ydb/core/protos/tx_proxy.pb.h> #include <ydb/core/protos/tx_datashard.pb.h> diff --git a/ydb/core/kqp/opt/CMakeLists.darwin.txt b/ydb/core/kqp/opt/CMakeLists.darwin.txt index c7580faeb24..b74f9c9bb43 100644 --- a/ydb/core/kqp/opt/CMakeLists.darwin.txt +++ b/ydb/core/kqp/opt/CMakeLists.darwin.txt @@ -23,6 +23,7 @@ target_link_libraries(core-kqp-opt PUBLIC kqp-opt-physical yql-dq-common yql-dq-opt + core-kqp-provider tools-enum_parser-enum_serialization_runtime ) target_sources(core-kqp-opt PRIVATE diff --git a/ydb/core/kqp/opt/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/opt/CMakeLists.linux-aarch64.txt index db53e7fb6a7..f007468e437 100644 --- a/ydb/core/kqp/opt/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/opt/CMakeLists.linux-aarch64.txt @@ -24,6 +24,7 @@ target_link_libraries(core-kqp-opt PUBLIC kqp-opt-physical yql-dq-common yql-dq-opt + core-kqp-provider tools-enum_parser-enum_serialization_runtime ) target_sources(core-kqp-opt PRIVATE diff --git a/ydb/core/kqp/opt/CMakeLists.linux.txt b/ydb/core/kqp/opt/CMakeLists.linux.txt index db53e7fb6a7..f007468e437 100644 --- a/ydb/core/kqp/opt/CMakeLists.linux.txt +++ b/ydb/core/kqp/opt/CMakeLists.linux.txt @@ -24,6 +24,7 @@ target_link_libraries(core-kqp-opt PUBLIC kqp-opt-physical yql-dq-common yql-dq-opt + core-kqp-provider tools-enum_parser-enum_serialization_runtime ) target_sources(core-kqp-opt PRIVATE diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp index 42d2c444c5e..4a3c500f3ad 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp @@ -9,6 +9,7 @@ #include <ydb/library/yql/core/services/yql_out_transformers.h> #include <ydb/library/yql/core/services/yql_transform_pipeline.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> +#include <ydb/core/kqp/gateway/kqp_gateway.h> namespace NKikimr::NKqp::NOpt { diff --git a/ydb/core/kqp/provider/CMakeLists.darwin.txt b/ydb/core/kqp/provider/CMakeLists.darwin.txt index 644aa54605e..162bf742425 100644 --- a/ydb/core/kqp/provider/CMakeLists.darwin.txt +++ b/ydb/core/kqp/provider/CMakeLists.darwin.txt @@ -18,6 +18,7 @@ target_link_libraries(core-kqp-provider PUBLIC yutil ydb-core-base ydb-core-protos + core-kqp-query_data ydb-library-aclib library-aclib-protos ydb-library-binary_json diff --git a/ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt index aa5f414c690..18fd77e095f 100644 --- a/ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt @@ -19,6 +19,7 @@ target_link_libraries(core-kqp-provider PUBLIC yutil ydb-core-base ydb-core-protos + core-kqp-query_data ydb-library-aclib library-aclib-protos ydb-library-binary_json diff --git a/ydb/core/kqp/provider/CMakeLists.linux.txt b/ydb/core/kqp/provider/CMakeLists.linux.txt index aa5f414c690..18fd77e095f 100644 --- a/ydb/core/kqp/provider/CMakeLists.linux.txt +++ b/ydb/core/kqp/provider/CMakeLists.linux.txt @@ -19,6 +19,7 @@ target_link_libraries(core-kqp-provider PUBLIC yutil ydb-core-base ydb-core-protos + core-kqp-query_data ydb-library-aclib library-aclib-protos ydb-library-binary_json diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 9079b5040b5..9b920d1499a 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -3,12 +3,11 @@ #include "yql_kikimr_gateway.h" #include "yql_kikimr_settings.h" +#include <ydb/core/kqp/query_data/kqp_query_data.h> #include <ydb/library/yql/ast/yql_gc_nodes.h> #include <ydb/library/yql/core/yql_type_annotation.h> #include <ydb/library/yql/minikql/mkql_function_registry.h> -#include <ydb/core/kqp/gateway/kqp_query_data.h> - #include <library/cpp/actors/core/actor.h> #include <library/cpp/cache/cache.h> diff --git a/ydb/core/kqp/query_compiler/CMakeLists.darwin.txt b/ydb/core/kqp/query_compiler/CMakeLists.darwin.txt index 18165e775a3..c848e589bdd 100644 --- a/ydb/core/kqp/query_compiler/CMakeLists.darwin.txt +++ b/ydb/core/kqp/query_compiler/CMakeLists.darwin.txt @@ -23,5 +23,4 @@ target_sources(core-kqp-query_compiler PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_predictor.cpp ) diff --git a/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt index 731299f3f56..bd6045c8f04 100644 --- a/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt @@ -24,5 +24,4 @@ target_sources(core-kqp-query_compiler PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_predictor.cpp ) diff --git a/ydb/core/kqp/query_compiler/CMakeLists.linux.txt b/ydb/core/kqp/query_compiler/CMakeLists.linux.txt index 731299f3f56..bd6045c8f04 100644 --- a/ydb/core/kqp/query_compiler/CMakeLists.linux.txt +++ b/ydb/core/kqp/query_compiler/CMakeLists.linux.txt @@ -24,5 +24,4 @@ target_sources(core-kqp-query_compiler PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_predictor.cpp ) diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index db64a5d2325..38fd6788bdb 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -1,7 +1,7 @@ #include "kqp_query_compiler.h" -#include "kqp_predictor.h" #include <ydb/core/kqp/common/kqp_yql.h> +#include <ydb/core/kqp/query_data/kqp_predictor.h> #include <ydb/core/kqp/query_compiler/kqp_mkql_compiler.h> #include <ydb/core/kqp/query_compiler/kqp_olap_compiler.h> #include <ydb/core/kqp/opt/kqp_opt.h> diff --git a/ydb/core/kqp/query_data/CMakeLists.darwin.txt b/ydb/core/kqp/query_data/CMakeLists.darwin.txt new file mode 100644 index 00000000000..bf9b5d02025 --- /dev/null +++ b/ydb/core/kqp/query_data/CMakeLists.darwin.txt @@ -0,0 +1,28 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-kqp-query_data) +target_compile_options(core-kqp-query_data PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-kqp-query_data PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-actorlib_impl + ydb-core-base + yql-dq-expr_nodes + yql-dq-proto + core-kqp-expr_nodes +) +target_sources(core-kqp-query_data PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_query_data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_prepared_query.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_predictor.cpp +) diff --git a/ydb/core/kqp/query_data/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/query_data/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..a37be072140 --- /dev/null +++ b/ydb/core/kqp/query_data/CMakeLists.linux-aarch64.txt @@ -0,0 +1,29 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-kqp-query_data) +target_compile_options(core-kqp-query_data PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-kqp-query_data PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-actorlib_impl + ydb-core-base + yql-dq-expr_nodes + yql-dq-proto + core-kqp-expr_nodes +) +target_sources(core-kqp-query_data PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_query_data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_prepared_query.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_predictor.cpp +) diff --git a/ydb/core/kqp/query_data/CMakeLists.linux.txt b/ydb/core/kqp/query_data/CMakeLists.linux.txt new file mode 100644 index 00000000000..a37be072140 --- /dev/null +++ b/ydb/core/kqp/query_data/CMakeLists.linux.txt @@ -0,0 +1,29 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-kqp-query_data) +target_compile_options(core-kqp-query_data PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-kqp-query_data PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-actorlib_impl + ydb-core-base + yql-dq-expr_nodes + yql-dq-proto + core-kqp-expr_nodes +) +target_sources(core-kqp-query_data PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_query_data.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_prepared_query.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_predictor.cpp +) diff --git a/ydb/core/kqp/query_data/CMakeLists.txt b/ydb/core/kqp/query_data/CMakeLists.txt new file mode 100644 index 00000000000..5bb4faffb40 --- /dev/null +++ b/ydb/core/kqp/query_data/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/core/kqp/query_compiler/kqp_predictor.cpp b/ydb/core/kqp/query_data/kqp_predictor.cpp index 293ad98e812..cd743b89eec 100644 --- a/ydb/core/kqp/query_compiler/kqp_predictor.cpp +++ b/ydb/core/kqp/query_data/kqp_predictor.cpp @@ -1,8 +1,10 @@ #include "kqp_predictor.h" -#include <ydb/core/kqp/opt/kqp_opt.h> #include <ydb/core/base/appdata.h> #include <ydb/library/yql/core/yql_expr_optimize.h> #include <util/system/info.h> +#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> +#include <ydb/core/kqp/expr_nodes/kqp_expr_nodes.h> +#include <ydb/core/kqp/common/kqp_yql.h> namespace NKikimr::NKqp { diff --git a/ydb/core/kqp/query_compiler/kqp_predictor.h b/ydb/core/kqp/query_data/kqp_predictor.h index 8195583876f..8195583876f 100644 --- a/ydb/core/kqp/query_compiler/kqp_predictor.h +++ b/ydb/core/kqp/query_data/kqp_predictor.h diff --git a/ydb/core/kqp/common/kqp_prepared_query.cpp b/ydb/core/kqp/query_data/kqp_prepared_query.cpp index 594179454bb..3c615c0088d 100644 --- a/ydb/core/kqp/common/kqp_prepared_query.cpp +++ b/ydb/core/kqp/query_data/kqp_prepared_query.cpp @@ -6,6 +6,9 @@ #include <ydb/library/yql/minikql/mkql_node.h> #include <ydb/library/mkql_proto/mkql_proto.h> #include <ydb/core/protos/kqp_physical.pb.h> +#include <ydb/core/protos/services.pb.h> + +#include <library/cpp/actors/core/log.h> namespace NKikimr::NKqp { @@ -55,6 +58,16 @@ TKqpPhyTxHolder::TKqpPhyTxHolder(const std::shared_ptr<const NKikimrKqp::TPrepar , Alloc(alloc) { TxResultsMeta.resize(Proto->GetResults().size()); + for (auto&& i : Proto->GetStages()) { + TStagePredictor predictor; + if (!predictor.DeserializeFromKqpSettings(i.GetProgram().GetSettings())) { + ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "cannot parse program settings for data prediction"; + Predictors.emplace_back(); + } else { + Predictors.emplace_back(std::move(predictor)); + } + } + ui32 i = 0; for (const auto& txResult : Proto->GetResults()) { auto& result = TxResultsMeta[i++]; @@ -81,6 +94,11 @@ bool TKqpPhyTxHolder::IsPureTx() const { return PureTx; } +const NKikimr::NKqp::TStagePredictor& TKqpPhyTxHolder::GetCalculationPredictor(const size_t stageIdx) const { + YQL_ENSURE(stageIdx < Predictors.size(), "incorrect stage idx for predictor"); + return Predictors[stageIdx]; +} + TPreparedQueryHolder::TPreparedQueryHolder(NKikimrKqp::TPreparedQuery* proto, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry) : Proto(proto) diff --git a/ydb/core/kqp/common/kqp_prepared_query.h b/ydb/core/kqp/query_data/kqp_prepared_query.h index 2d1475c58aa..4705cafa669 100644 --- a/ydb/core/kqp/common/kqp_prepared_query.h +++ b/ydb/core/kqp/query_data/kqp_prepared_query.h @@ -1,5 +1,6 @@ #pragma once +#include <ydb/core/kqp/query_data/kqp_predictor.h> #include <ydb/core/protos/kqp.pb.h> #include <util/generic/vector.h> @@ -36,10 +37,12 @@ class TKqpPhyTxHolder { bool PureTx = false; TVector<TPhyTxResultMetadata> TxResultsMeta; std::shared_ptr<TPreparedQueryAllocHolder> Alloc; - + std::vector<TStagePredictor> Predictors; public: using TConstPtr = std::shared_ptr<const TKqpPhyTxHolder>; + const TStagePredictor& GetCalculationPredictor(const size_t stageIdx) const; + const TVector<TPhyTxResultMetadata>& GetTxResultsMeta() const { return TxResultsMeta; } const NKqpProto::TKqpPhyStage& GetStages(size_t index) const { diff --git a/ydb/core/kqp/gateway/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp index d16c3eabc8d..d16c3eabc8d 100644 --- a/ydb/core/kqp/gateway/kqp_query_data.cpp +++ b/ydb/core/kqp/query_data/kqp_query_data.cpp diff --git a/ydb/core/kqp/gateway/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h index ec25b7e8155..de2ca09978a 100644 --- a/ydb/core/kqp/gateway/kqp_query_data.h +++ b/ydb/core/kqp/query_data/kqp_query_data.h @@ -1,10 +1,10 @@ #pragma once +#include <ydb/core/kqp/query_data/kqp_prepared_query.h> #include <ydb/library/yql/public/udf/udf_data_type.h> #include <ydb/library/yql/minikql/mkql_node.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/mkql_proto/protos/minikql.pb.h> -#include <ydb/core/kqp/common/kqp_prepared_query.h> #include <library/cpp/random_provider/random_provider.h> #include <library/cpp/time_provider/time_provider.h> diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h index 9907f4e0bf0..4f63059d8fa 100644 --- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h +++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h @@ -64,6 +64,16 @@ struct TStageInfo : private TMoveOnly { TVector<ui64> Tasks; TStageInfoMeta Meta; + + TString DebugString() const { + // TODO: Print stage details, including input types and program. + TStringBuilder result; + result << "StageInfo: StageId #" << Id + << ", InputsCount: " << InputsCount + << ", OutputsCount: " << OutputsCount; + return result; + } + }; struct TChannel { |