aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-02-25 12:01:25 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-02-25 12:01:25 +0300
commit8b9cb025d0b297c75fd8e5cd249ca067e33f4d8c (patch)
tree1e561893284f46da332f4e31738f24cd19549286
parente632b99859c092407fd47c40c7b552a6b47afe7f (diff)
downloadydb-8b9cb025d0b297c75fd8e5cd249ca067e33f4d8c.tar.gz
construct predictors with prepared queue object
refactoring
-rw-r--r--ydb/core/kqp/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/kqp/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/CMakeLists.linux.txt1
-rw-r--r--ydb/core/kqp/common/CMakeLists.darwin.txt2
-rw-r--r--ydb/core/kqp/common/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/kqp/common/CMakeLists.linux.txt2
-rw-r--r--ydb/core/kqp/common/kqp.h2
-rw-r--r--ydb/core/kqp/common/kqp_resolve.cpp3
-rw-r--r--ydb/core/kqp/common/kqp_yql.cpp3
-rw-r--r--ydb/core/kqp/common/kqp_yql.h3
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp26
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer.h4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp18
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h39
-rw-r--r--ydb/core/kqp/executer_actor/kqp_literal_executer.cpp2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp47
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp25
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.h25
-rw-r--r--ydb/core/kqp/gateway/CMakeLists.darwin.txt3
-rw-r--r--ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/core/kqp/gateway/CMakeLists.linux.txt3
-rw-r--r--ydb/core/kqp/gateway/kqp_gateway.h3
-rw-r--r--ydb/core/kqp/opt/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/kqp/opt/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/opt/CMakeLists.linux.txt1
-rw-r--r--ydb/core/kqp/opt/kqp_opt_build_txs.cpp1
-rw-r--r--ydb/core/kqp/provider/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/provider/CMakeLists.linux.txt1
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.h3
-rw-r--r--ydb/core/kqp/query_compiler/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/query_compiler/CMakeLists.linux.txt1
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp2
-rw-r--r--ydb/core/kqp/query_data/CMakeLists.darwin.txt28
-rw-r--r--ydb/core/kqp/query_data/CMakeLists.linux-aarch64.txt29
-rw-r--r--ydb/core/kqp/query_data/CMakeLists.linux.txt29
-rw-r--r--ydb/core/kqp/query_data/CMakeLists.txt15
-rw-r--r--ydb/core/kqp/query_data/kqp_predictor.cpp (renamed from ydb/core/kqp/query_compiler/kqp_predictor.cpp)4
-rw-r--r--ydb/core/kqp/query_data/kqp_predictor.h (renamed from ydb/core/kqp/query_compiler/kqp_predictor.h)0
-rw-r--r--ydb/core/kqp/query_data/kqp_prepared_query.cpp (renamed from ydb/core/kqp/common/kqp_prepared_query.cpp)18
-rw-r--r--ydb/core/kqp/query_data/kqp_prepared_query.h (renamed from ydb/core/kqp/common/kqp_prepared_query.h)5
-rw-r--r--ydb/core/kqp/query_data/kqp_query_data.cpp (renamed from ydb/core/kqp/gateway/kqp_query_data.cpp)0
-rw-r--r--ydb/core/kqp/query_data/kqp_query_data.h (renamed from ydb/core/kqp/gateway/kqp_query_data.h)2
-rw-r--r--ydb/library/yql/dq/tasks/dq_tasks_graph.h10
45 files changed, 239 insertions, 134 deletions
diff --git a/ydb/core/kqp/CMakeLists.darwin.txt b/ydb/core/kqp/CMakeLists.darwin.txt
index 106d4102b4f..10eb5407b30 100644
--- a/ydb/core/kqp/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/CMakeLists.darwin.txt
@@ -19,6 +19,7 @@ add_subdirectory(opt)
add_subdirectory(provider)
add_subdirectory(proxy_service)
add_subdirectory(query_compiler)
+add_subdirectory(query_data)
add_subdirectory(rm_service)
add_subdirectory(run_script_actor)
add_subdirectory(runtime)
diff --git a/ydb/core/kqp/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/CMakeLists.linux-aarch64.txt
index 071a51d8abf..2a2bdf0d97c 100644
--- a/ydb/core/kqp/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/CMakeLists.linux-aarch64.txt
@@ -19,6 +19,7 @@ add_subdirectory(opt)
add_subdirectory(provider)
add_subdirectory(proxy_service)
add_subdirectory(query_compiler)
+add_subdirectory(query_data)
add_subdirectory(rm_service)
add_subdirectory(run_script_actor)
add_subdirectory(runtime)
diff --git a/ydb/core/kqp/CMakeLists.linux.txt b/ydb/core/kqp/CMakeLists.linux.txt
index 071a51d8abf..2a2bdf0d97c 100644
--- a/ydb/core/kqp/CMakeLists.linux.txt
+++ b/ydb/core/kqp/CMakeLists.linux.txt
@@ -19,6 +19,7 @@ add_subdirectory(opt)
add_subdirectory(provider)
add_subdirectory(proxy_service)
add_subdirectory(query_compiler)
+add_subdirectory(query_data)
add_subdirectory(rm_service)
add_subdirectory(run_script_actor)
add_subdirectory(runtime)
diff --git a/ydb/core/kqp/common/CMakeLists.darwin.txt b/ydb/core/kqp/common/CMakeLists.darwin.txt
index b7c74e48b60..e8d477e1826 100644
--- a/ydb/core/kqp/common/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/common/CMakeLists.darwin.txt
@@ -19,6 +19,7 @@ target_link_libraries(core-kqp-common PUBLIC
core-kqp-expr_nodes
core-kqp-provider
tx-long_tx_service-public
+ yql-dq-expr_nodes
ydb-library-aclib
yql-core-issue
yql-dq-actors
@@ -36,7 +37,6 @@ target_sources(core-kqp-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_prepared_query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp.cpp
)
generate_enum_serilization(core-kqp-common
diff --git a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
index aa4bfc498cc..f1f616100c8 100644
--- a/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/common/CMakeLists.linux-aarch64.txt
@@ -20,6 +20,7 @@ target_link_libraries(core-kqp-common PUBLIC
core-kqp-expr_nodes
core-kqp-provider
tx-long_tx_service-public
+ yql-dq-expr_nodes
ydb-library-aclib
yql-core-issue
yql-dq-actors
@@ -37,7 +38,6 @@ target_sources(core-kqp-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_prepared_query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp.cpp
)
generate_enum_serilization(core-kqp-common
diff --git a/ydb/core/kqp/common/CMakeLists.linux.txt b/ydb/core/kqp/common/CMakeLists.linux.txt
index aa4bfc498cc..f1f616100c8 100644
--- a/ydb/core/kqp/common/CMakeLists.linux.txt
+++ b/ydb/core/kqp/common/CMakeLists.linux.txt
@@ -20,6 +20,7 @@ target_link_libraries(core-kqp-common PUBLIC
core-kqp-expr_nodes
core-kqp-provider
tx-long_tx_service-public
+ yql-dq-expr_nodes
ydb-library-aclib
yql-core-issue
yql-dq-actors
@@ -37,7 +38,6 @@ target_sources(core-kqp-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_types.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_prepared_query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp.cpp
)
generate_enum_serilization(core-kqp-common
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index cdd7119ae5f..a0d303ebbf3 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -1,10 +1,10 @@
#pragma once
#include "kqp_event_ids.h"
-#include "kqp_prepared_query.h"
#include <library/cpp/lwtrace/shuttle.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
+#include <ydb/core/kqp/query_data/kqp_prepared_query.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/api/protos/draft/ydb_query.pb.h>
diff --git a/ydb/core/kqp/common/kqp_resolve.cpp b/ydb/core/kqp/common/kqp_resolve.cpp
index 482a0e419c4..473b3574e10 100644
--- a/ydb/core/kqp/common/kqp_resolve.cpp
+++ b/ydb/core/kqp/common/kqp_resolve.cpp
@@ -1,13 +1,12 @@
#include "kqp_resolve.h"
-#include <ydb/core/kqp/provider/yql_kikimr_gateway.h>
-
// #define DBG_TRACE
#ifdef DBG_TRACE
#include <ydb/core/base/appdata.h>
#include <ydb/core/tx/datashard/range_ops.h>
#endif
+#include <ydb/core/kqp/provider/yql_kikimr_gateway.h>
namespace NKikimr {
namespace NKqp {
diff --git a/ydb/core/kqp/common/kqp_yql.cpp b/ydb/core/kqp/common/kqp_yql.cpp
index a7b5196d997..2334424b4ab 100644
--- a/ydb/core/kqp/common/kqp_yql.cpp
+++ b/ydb/core/kqp/common/kqp_yql.cpp
@@ -1,11 +1,12 @@
#include "kqp_yql.h"
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
+#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
namespace NYql {
using namespace NKikimr;
-using namespace NKikimr::NKqp;
+//using namespace NKikimr::NKqp;
using namespace NNodes;
static EPhysicalQueryType GetPhysicalQueryType(const TStringBuf& value) {
diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h
index 0f5236461d5..0c29de4df31 100644
--- a/ydb/core/kqp/common/kqp_yql.h
+++ b/ydb/core/kqp/common/kqp_yql.h
@@ -1,7 +1,8 @@
#pragma once
#include <ydb/core/kqp/expr_nodes/kqp_expr_nodes.h>
-#include <ydb/core/kqp/gateway/kqp_gateway.h>
+#include <ydb/library/yql/ast/yql_pos_handle.h>
+#include <ydb/library/yql/ast/yql_expr.h>
namespace NYql {
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 8907790a18c..f92e5aa72c0 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -852,10 +852,10 @@ private:
auto& task = TasksGraph.GetTask(taskId);
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
- if (HasReads(stageInfo)) {
+ if (stageInfo.Meta.HasReads()) {
affectedFlags |= NFlatTxCoordinator::TTransactionProposal::TAffectedEntry::AffectedRead;
}
- if (HasWrites(stageInfo)) {
+ if (stageInfo.Meta.HasWrites()) {
affectedFlags |= NFlatTxCoordinator::TTransactionProposal::TAffectedEntry::AffectedWrite;
}
}
@@ -1307,7 +1307,7 @@ private:
return task;
};
- auto& stage = GetStage(stageInfo);
+ auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
const auto& table = TableKeys.GetTable(stageInfo.Meta.TableId);
const auto& keyTypes = table.KeyColumnTypes;;
@@ -1439,7 +1439,7 @@ private:
}
void BuildComputeTasks(TStageInfo& stageInfo) {
- auto& stage = GetStage(stageInfo);
+ auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
ui32 partitionsCount = 1;
for (ui32 inputIndex = 0; inputIndex < stage.InputsSize(); ++inputIndex) {
@@ -1696,25 +1696,9 @@ private:
for (auto& task : TasksGraph.GetTasks()) {
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
- auto& stage = GetStage(stageInfo);
-
- NDqProto::TDqTask taskDesc;
- taskDesc.SetId(task.Id);
- taskDesc.SetStageId(stageInfo.Id.StageId);
+ NDqProto::TDqTask taskDesc = PrepareKqpTaskParameters(stageInfo, task, TypeEnv());
ActorIdToProto(SelfId(), taskDesc.MutableExecuter()->MutableActorId());
- for (auto& input : task.Inputs) {
- FillInputDesc(*taskDesc.AddInputs(), input);
- }
-
- for (auto& output : task.Outputs) {
- FillOutputDesc(*taskDesc.AddOutputs(), output);
- }
-
- taskDesc.MutableProgram()->CopyFrom(stage.GetProgram());
-
- PrepareKqpTaskParameters(stage, stageInfo, task, taskDesc, TypeEnv(), HolderFactory());
-
if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) {
NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta protoTaskMeta;
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index 275a0858d4d..62c087af63d 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -2,7 +2,7 @@
#include <library/cpp/lwtrace/shuttle.h>
#include <ydb/core/kqp/common/kqp_event_ids.h>
-#include <ydb/core/kqp/gateway/kqp_query_data.h>
+#include <ydb/core/kqp/query_data/kqp_query_data.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
@@ -84,7 +84,7 @@ struct TEvKqpExecuter {
};
IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
- const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters,
+ const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 450b5eda1a3..8fd996bb449 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -61,23 +61,9 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnb
serializer.Deserialize(buffer, txResult.MkqlItemType, txResult.Rows);
}
-void PrepareKqpTaskParameters(const NKqpProto::TKqpPhyStage& stage, const TStageInfo& stageInfo, const TTask& task,
- NDqProto::TDqTask& dqTask, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory&)
-{
- auto g = typeEnv.BindAllocator();
- for (auto& paramName : stage.GetProgramParameters()) {
- auto& dqParams = *dqTask.MutableParameters();
- if (auto* taskParam = task.Meta.Params.FindPtr(paramName)) {
- dqParams[paramName] = *taskParam;
- } else {
- dqParams[paramName] = stageInfo.Meta.Tx.Params->SerializeParamValue(paramName);
- }
- }
-}
-
-std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const NKqpProto::TKqpPhyStage& stage,
- const TStageInfo& stageInfo, const TTask& task)
+std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo& stageInfo, const TTask& task)
{
+ const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
std::vector<std::shared_ptr<arrow::Field>> columns;
std::vector<std::shared_ptr<arrow::Array>> data;
auto& parameterNames = task.Meta.ReadInfo.OlapProgram.ParameterNames;
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index d417e0252e1..d163396401c 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -61,11 +61,7 @@ enum class EExecType {
const ui64 MaxTaskSize = 48_MB;
-void PrepareKqpTaskParameters(const NKqpProto::TKqpPhyStage& stage, const TStageInfo& stageInfo, const TTask& task,
- NYql::NDqProto::TDqTask& dqTask, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory);
-
-std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const NKqpProto::TKqpPhyStage& stage,
- const TStageInfo& stageInfo, const TTask& task);
+std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo& stageInfo, const TTask& task);
inline bool IsDebugLogEnabled() {
return TlsActivationContext->LoggerSettings() &&
@@ -670,7 +666,7 @@ protected:
void BuildSysViewScanTasks(TStageInfo& stageInfo) {
Y_VERIFY_DEBUG(stageInfo.Meta.IsSysView());
- auto& stage = GetStage(stageInfo);
+ auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
const auto& table = TableKeys.GetTable(stageInfo.Meta.TableId);
const auto& keyTypes = table.KeyColumnTypes;
@@ -716,7 +712,7 @@ protected:
THashMap<ui64, std::vector<ui64>> nodeTasks;
THashMap<ui64, ui64> assignedShardsCount;
- auto& stage = GetStage(stageInfo);
+ auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
YQL_ENSURE(stage.GetSources(0).HasReadRangesSource());
YQL_ENSURE(stage.InputsSize() == 0 && stage.SourcesSize() == 1, "multiple sources or sources mixed with connections");
@@ -918,6 +914,33 @@ protected:
}
}
+ NYql::NDqProto::TDqTask PrepareKqpTaskParameters(const TStageInfo& stageInfo, const TTask& task, const NMiniKQL::TTypeEnvironment& typeEnv) {
+ NYql::NDqProto::TDqTask result;
+ result.SetId(task.Id);
+ result.SetStageId(stageInfo.Id.StageId);
+
+ for (auto& input : task.Inputs) {
+ FillInputDesc(*result.AddInputs(), input);
+ }
+
+ for (auto& output : task.Outputs) {
+ FillOutputDesc(*result.AddOutputs(), output);
+ }
+
+ const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
+ result.MutableProgram()->CopyFrom(stage.GetProgram());
+ auto g = typeEnv.BindAllocator();
+ for (auto& paramName : stage.GetProgramParameters()) {
+ auto& dqParams = *result.MutableParameters();
+ if (auto* taskParam = task.Meta.Params.FindPtr(paramName)) {
+ dqParams[paramName] = *taskParam;
+ } else {
+ dqParams[paramName] = stageInfo.Meta.Tx.Params->SerializeParamValue(paramName);
+ }
+ }
+ return result;
+ }
+
void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransaction_TTableMeta* meta) {
meta->SetTablePath(stageInfo.Meta.TablePath);
meta->MutableTableId()->SetTableId(stageInfo.Meta.TableId.PathId.LocalPathId);
@@ -1197,7 +1220,7 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig);
IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
- const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters,
+ const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig);
diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
index 00730303de9..b5d916c2bb5 100644
--- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
@@ -175,7 +175,7 @@ public:
void RunTask(TTask& task, const TDqTaskRunnerContext& context, const TDqTaskRunnerSettings& settings) {
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
- auto& stage = GetStage(stageInfo);
+ auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
NDqProto::TDqTask protoTask;
protoTask.SetId(task.Id);
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index db98b839d12..85542fbcdac 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -12,7 +12,7 @@
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h>
#include <ydb/core/kqp/common/kqp.h>
-#include <ydb/core/kqp/query_compiler/kqp_predictor.h>
+#include <ydb/core/kqp/query_data/kqp_predictor.h>
#include <ydb/core/kqp/node_service/kqp_node_service.h>
#include <ydb/core/kqp/runtime/kqp_transport.h>
#include <ydb/core/kqp/opt/kqp_query_plan.h>
@@ -212,21 +212,15 @@ private:
ui32 GetMaxTasksPerNodeEstimate(TStageInfo& stageInfo, const bool isOlapScan, const ui64 /*nodeId*/) const {
ui32 result = 0;
- const auto& stage = GetStage(stageInfo);
if (isOlapScan) {
if (AggregationSettings.HasCSScanMinimalThreads()) {
result = AggregationSettings.GetCSScanMinimalThreads();
} else {
- TStagePredictor predictor;
- const ui32 threadsCount = TStagePredictor::GetUsableThreads();
- if (!predictor.DeserializeFromKqpSettings(stage.GetProgram().GetSettings())) {
- ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "cannot parse program settings for data prediction";
- return threadsCount;
- } else {
- return std::max<ui32>(1, predictor.CalcTasksOptimalCount(threadsCount, {}));
- }
+ const TStagePredictor& predictor = stageInfo.Meta.Tx.Body->GetCalculationPredictor(stageInfo.Id.StageId);
+ return std::max<ui32>(1, predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), {}));
}
} else {
+ const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
result = AggregationSettings.GetDSScanMinimalThreads();
if (stage.GetProgram().GetSettings().GetHasSort()) {
result = std::max(result, AggregationSettings.GetDSBaseSortScanThreads());
@@ -242,14 +236,8 @@ private:
if (AggregationSettings.HasAggregationComputeThreads()) {
return std::max<ui32>(1, AggregationSettings.GetAggregationComputeThreads());
} else {
- const auto& stage = GetStage(stageInfo);
- TStagePredictor predictor;
- if (!predictor.DeserializeFromKqpSettings(stage.GetProgram().GetSettings())) {
- ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "cannot parse program settings for data prediction";
- return std::max<ui32>(1, previousTasksCount * 0.75);
- } else {
- return predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), previousTasksCount / nodesCount) * nodesCount;
- }
+ const TStagePredictor& predictor = stageInfo.Meta.Tx.Body->GetCalculationPredictor(stageInfo.Id.StageId);
+ return predictor.CalcTasksOptimalCount(TStagePredictor::GetUsableThreads(), previousTasksCount / nodesCount) * nodesCount;
}
}
@@ -285,7 +273,7 @@ private:
void BuildScanTasks(TStageInfo& stageInfo) {
THashMap<ui64, std::vector<ui64>> nodeTasks;
THashMap<ui64, ui64> assignedShardsCount;
- auto& stage = GetStage(stageInfo);
+ auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
const auto& table = TableKeys.GetTable(stageInfo.Meta.TableId);
const auto& keyTypes = table.KeyColumnTypes;
@@ -379,7 +367,7 @@ private:
}
void BuildComputeTasks(TStageInfo& stageInfo) {
- auto& stage = GetStage(stageInfo);
+ auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
ui32 partitionsCount = 1;
ui32 inputTasks = 0;
@@ -528,25 +516,10 @@ private:
for (auto& task : TasksGraph.GetTasks()) {
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
- auto& stage = GetStage(stageInfo);
- NYql::NDqProto::TDqTask taskDesc;
- taskDesc.SetId(task.Id);
+ NYql::NDqProto::TDqTask taskDesc = PrepareKqpTaskParameters(stageInfo, task, TypeEnv());
ActorIdToProto(SelfId(), taskDesc.MutableExecuter()->MutableActorId());
- for (auto& input : task.Inputs) {
- FillInputDesc(*taskDesc.AddInputs(), input);
- }
-
- for (auto& output : task.Outputs) {
- FillOutputDesc(*taskDesc.AddOutputs(), output);
- }
-
- taskDesc.MutableProgram()->CopyFrom(stage.GetProgram());
- taskDesc.SetStageId(task.StageId.StageId);
-
- PrepareKqpTaskParameters(stage, stageInfo, task, taskDesc, TypeEnv(), HolderFactory());
-
if (task.Meta.NodeId || stageInfo.Meta.IsSysView()) {
NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta protoTaskMeta;
@@ -617,7 +590,7 @@ private:
auto* olapProgram = protoTaskMeta.MutableOlapProgram();
olapProgram->SetProgram(task.Meta.ReadInfo.OlapProgram.Program);
- auto [schema, parameters] = SerializeKqpTasksParametersForOlap(stage, stageInfo, task);
+ auto [schema, parameters] = SerializeKqpTasksParametersForOlap(stageInfo, task);
olapProgram->SetParametersSchema(schema);
olapProgram->SetParameters(parameters);
} else {
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index 2dd09e99135..e26e121428d 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -19,19 +19,7 @@ using namespace NYql::NNodes;
// #define DBG_TRACE
void LogStage(const NActors::TActorContext& ctx, const TStageInfo& stageInfo) {
- // TODO: Print stage details, including input types and program.
- LOG_DEBUG_S(ctx, NKikimrServices::KQP_EXECUTER, "StageInfo: StageId #" << stageInfo.Id
- << ", InputsCount: " << stageInfo.InputsCount
- << ", OutputsCount: " << stageInfo.OutputsCount);
-}
-
-bool HasReads(const TStageInfo& stageInfo) {
- return stageInfo.Meta.ShardOperations.contains(TKeyDesc::ERowOperation::Read);
-}
-
-bool HasWrites(const TStageInfo& stageInfo) {
- return stageInfo.Meta.ShardOperations.contains(TKeyDesc::ERowOperation::Update) ||
- stageInfo.Meta.ShardOperations.contains(TKeyDesc::ERowOperation::Erase);
+ LOG_DEBUG_S(ctx, NKikimrServices::KQP_EXECUTER, stageInfo.DebugString());
}
void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGateway::TPhysicalTxData>& txs) {
@@ -93,7 +81,7 @@ void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGatew
}
YQL_ENSURE(tables.empty() || tables.size() == 1);
- YQL_ENSURE(!HasReads(stageInfo) || !HasWrites(stageInfo));
+ YQL_ENSURE(!stageInfo.Meta.HasReads() || !stageInfo.Meta.HasWrites());
}
}
}
@@ -272,7 +260,7 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo,
ui64 txId, bool enableSpilling)
{
- auto& stage = GetStage(stageInfo);
+ auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
if (stage.GetIsEffectsStage()) {
YQL_ENSURE(stageInfo.OutputsCount == 1);
@@ -357,13 +345,6 @@ bool IsCrossShardChannel(TKqpTasksGraph& tasksGraph, const TChannel& channel) {
return targetShard != tasksGraph.GetTask(channel.SrcTask).Meta.ShardId;
}
-const NKqpProto::TKqpPhyStage& GetStage(const TStageInfo& stageInfo) {
- auto& txBody = stageInfo.Meta.Tx.Body;
- YQL_ENSURE(stageInfo.Id.StageId < txBody->StagesSize());
-
- return txBody->GetStages(stageInfo.Id.StageId);
-}
-
void TShardKeyRanges::AddPoint(TSerializedCellVec&& point) {
if (!IsFullRange()) {
Ranges.emplace_back(std::move(point));
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
index b6bcc4c7d32..ae9ae1b5e5e 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
@@ -40,6 +40,26 @@ struct TStageInfoMeta {
THolder<TKeyDesc> ShardKey;
NSchemeCache::TSchemeCacheRequest::EKind ShardKind = NSchemeCache::TSchemeCacheRequest::EKind::KindUnknown;
+ const NKqpProto::TKqpPhyStage& GetStage(const size_t idx) const {
+ auto& txBody = Tx.Body;
+ YQL_ENSURE(idx < txBody->StagesSize());
+ return txBody->GetStages(idx);
+ }
+
+ template <class TStageIdExt>
+ const NKqpProto::TKqpPhyStage& GetStage(const TStageIdExt& stageId) const {
+ return GetStage(stageId.StageId);
+ }
+
+ bool HasReads() const {
+ return ShardOperations.contains(TKeyDesc::ERowOperation::Read);
+ }
+
+ bool HasWrites() const {
+ return ShardOperations.contains(TKeyDesc::ERowOperation::Update) ||
+ ShardOperations.contains(TKeyDesc::ERowOperation::Erase);
+ }
+
explicit TStageInfoMeta(const IKqpGateway::TPhysicalTxData& tx)
: Tx(tx)
, TableKind(ETableKind::Unknown)
@@ -204,13 +224,8 @@ struct TKqpTaskOutputType {
};
};
-const NKqpProto::TKqpPhyStage& GetStage(const TStageInfo& stageInfo);
-
void LogStage(const NActors::TActorContext& ctx, const TStageInfo& stageInfo);
-bool HasReads(const TStageInfo& stageInfo);
-bool HasWrites(const TStageInfo& stageInfo);
-
bool IsCrossShardChannel(TKqpTasksGraph& tasksGraph, const NYql::NDq::TChannel& channel);
} // namespace NKqp
diff --git a/ydb/core/kqp/gateway/CMakeLists.darwin.txt b/ydb/core/kqp/gateway/CMakeLists.darwin.txt
index ca44b59ecb7..1f61f525b37 100644
--- a/ydb/core/kqp/gateway/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/gateway/CMakeLists.darwin.txt
@@ -18,10 +18,11 @@ target_link_libraries(core-kqp-gateway PUBLIC
ydb-core-actorlib_impl
ydb-core-base
core-kqp-common
+ core-kqp-provider
+ core-kqp-query_data
providers-result-expr_nodes
)
target_sources(core-kqp-gateway PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_query_data.cpp
)
diff --git a/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt
index a70cfb046b1..d863010351e 100644
--- a/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt
@@ -19,10 +19,11 @@ target_link_libraries(core-kqp-gateway PUBLIC
ydb-core-actorlib_impl
ydb-core-base
core-kqp-common
+ core-kqp-provider
+ core-kqp-query_data
providers-result-expr_nodes
)
target_sources(core-kqp-gateway PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_query_data.cpp
)
diff --git a/ydb/core/kqp/gateway/CMakeLists.linux.txt b/ydb/core/kqp/gateway/CMakeLists.linux.txt
index a70cfb046b1..d863010351e 100644
--- a/ydb/core/kqp/gateway/CMakeLists.linux.txt
+++ b/ydb/core/kqp/gateway/CMakeLists.linux.txt
@@ -19,10 +19,11 @@ target_link_libraries(core-kqp-gateway PUBLIC
ydb-core-actorlib_impl
ydb-core-base
core-kqp-common
+ core-kqp-provider
+ core-kqp-query_data
providers-result-expr_nodes
)
target_sources(core-kqp-gateway PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/gateway/kqp_query_data.cpp
)
diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h
index 53c243a84fa..dffc95ae034 100644
--- a/ydb/core/kqp/gateway/kqp_gateway.h
+++ b/ydb/core/kqp/gateway/kqp_gateway.h
@@ -1,7 +1,6 @@
#pragma once
-#include "kqp_query_data.h"
-
+#include <ydb/core/kqp/query_data/kqp_query_data.h>
#include <ydb/core/protos/kqp_physical.pb.h>
#include <ydb/core/protos/tx_proxy.pb.h>
#include <ydb/core/protos/tx_datashard.pb.h>
diff --git a/ydb/core/kqp/opt/CMakeLists.darwin.txt b/ydb/core/kqp/opt/CMakeLists.darwin.txt
index c7580faeb24..b74f9c9bb43 100644
--- a/ydb/core/kqp/opt/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/opt/CMakeLists.darwin.txt
@@ -23,6 +23,7 @@ target_link_libraries(core-kqp-opt PUBLIC
kqp-opt-physical
yql-dq-common
yql-dq-opt
+ core-kqp-provider
tools-enum_parser-enum_serialization_runtime
)
target_sources(core-kqp-opt PRIVATE
diff --git a/ydb/core/kqp/opt/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/opt/CMakeLists.linux-aarch64.txt
index db53e7fb6a7..f007468e437 100644
--- a/ydb/core/kqp/opt/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/opt/CMakeLists.linux-aarch64.txt
@@ -24,6 +24,7 @@ target_link_libraries(core-kqp-opt PUBLIC
kqp-opt-physical
yql-dq-common
yql-dq-opt
+ core-kqp-provider
tools-enum_parser-enum_serialization_runtime
)
target_sources(core-kqp-opt PRIVATE
diff --git a/ydb/core/kqp/opt/CMakeLists.linux.txt b/ydb/core/kqp/opt/CMakeLists.linux.txt
index db53e7fb6a7..f007468e437 100644
--- a/ydb/core/kqp/opt/CMakeLists.linux.txt
+++ b/ydb/core/kqp/opt/CMakeLists.linux.txt
@@ -24,6 +24,7 @@ target_link_libraries(core-kqp-opt PUBLIC
kqp-opt-physical
yql-dq-common
yql-dq-opt
+ core-kqp-provider
tools-enum_parser-enum_serialization_runtime
)
target_sources(core-kqp-opt PRIVATE
diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
index 42d2c444c5e..4a3c500f3ad 100644
--- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
@@ -9,6 +9,7 @@
#include <ydb/library/yql/core/services/yql_out_transformers.h>
#include <ydb/library/yql/core/services/yql_transform_pipeline.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
+#include <ydb/core/kqp/gateway/kqp_gateway.h>
namespace NKikimr::NKqp::NOpt {
diff --git a/ydb/core/kqp/provider/CMakeLists.darwin.txt b/ydb/core/kqp/provider/CMakeLists.darwin.txt
index 644aa54605e..162bf742425 100644
--- a/ydb/core/kqp/provider/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/provider/CMakeLists.darwin.txt
@@ -18,6 +18,7 @@ target_link_libraries(core-kqp-provider PUBLIC
yutil
ydb-core-base
ydb-core-protos
+ core-kqp-query_data
ydb-library-aclib
library-aclib-protos
ydb-library-binary_json
diff --git a/ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt
index aa5f414c690..18fd77e095f 100644
--- a/ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/provider/CMakeLists.linux-aarch64.txt
@@ -19,6 +19,7 @@ target_link_libraries(core-kqp-provider PUBLIC
yutil
ydb-core-base
ydb-core-protos
+ core-kqp-query_data
ydb-library-aclib
library-aclib-protos
ydb-library-binary_json
diff --git a/ydb/core/kqp/provider/CMakeLists.linux.txt b/ydb/core/kqp/provider/CMakeLists.linux.txt
index aa5f414c690..18fd77e095f 100644
--- a/ydb/core/kqp/provider/CMakeLists.linux.txt
+++ b/ydb/core/kqp/provider/CMakeLists.linux.txt
@@ -19,6 +19,7 @@ target_link_libraries(core-kqp-provider PUBLIC
yutil
ydb-core-base
ydb-core-protos
+ core-kqp-query_data
ydb-library-aclib
library-aclib-protos
ydb-library-binary_json
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h
index 9079b5040b5..9b920d1499a 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.h
@@ -3,12 +3,11 @@
#include "yql_kikimr_gateway.h"
#include "yql_kikimr_settings.h"
+#include <ydb/core/kqp/query_data/kqp_query_data.h>
#include <ydb/library/yql/ast/yql_gc_nodes.h>
#include <ydb/library/yql/core/yql_type_annotation.h>
#include <ydb/library/yql/minikql/mkql_function_registry.h>
-#include <ydb/core/kqp/gateway/kqp_query_data.h>
-
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/cache/cache.h>
diff --git a/ydb/core/kqp/query_compiler/CMakeLists.darwin.txt b/ydb/core/kqp/query_compiler/CMakeLists.darwin.txt
index 18165e775a3..c848e589bdd 100644
--- a/ydb/core/kqp/query_compiler/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/query_compiler/CMakeLists.darwin.txt
@@ -23,5 +23,4 @@ target_sources(core-kqp-query_compiler PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_predictor.cpp
)
diff --git a/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt
index 731299f3f56..bd6045c8f04 100644
--- a/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/query_compiler/CMakeLists.linux-aarch64.txt
@@ -24,5 +24,4 @@ target_sources(core-kqp-query_compiler PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_predictor.cpp
)
diff --git a/ydb/core/kqp/query_compiler/CMakeLists.linux.txt b/ydb/core/kqp/query_compiler/CMakeLists.linux.txt
index 731299f3f56..bd6045c8f04 100644
--- a/ydb/core/kqp/query_compiler/CMakeLists.linux.txt
+++ b/ydb/core/kqp/query_compiler/CMakeLists.linux.txt
@@ -24,5 +24,4 @@ target_sources(core-kqp-query_compiler PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_olap_compiler.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_compiler/kqp_predictor.cpp
)
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index db64a5d2325..38fd6788bdb 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -1,7 +1,7 @@
#include "kqp_query_compiler.h"
-#include "kqp_predictor.h"
#include <ydb/core/kqp/common/kqp_yql.h>
+#include <ydb/core/kqp/query_data/kqp_predictor.h>
#include <ydb/core/kqp/query_compiler/kqp_mkql_compiler.h>
#include <ydb/core/kqp/query_compiler/kqp_olap_compiler.h>
#include <ydb/core/kqp/opt/kqp_opt.h>
diff --git a/ydb/core/kqp/query_data/CMakeLists.darwin.txt b/ydb/core/kqp/query_data/CMakeLists.darwin.txt
new file mode 100644
index 00000000000..bf9b5d02025
--- /dev/null
+++ b/ydb/core/kqp/query_data/CMakeLists.darwin.txt
@@ -0,0 +1,28 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-kqp-query_data)
+target_compile_options(core-kqp-query_data PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-kqp-query_data PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-actorlib_impl
+ ydb-core-base
+ yql-dq-expr_nodes
+ yql-dq-proto
+ core-kqp-expr_nodes
+)
+target_sources(core-kqp-query_data PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_query_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_prepared_query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_predictor.cpp
+)
diff --git a/ydb/core/kqp/query_data/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/query_data/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..a37be072140
--- /dev/null
+++ b/ydb/core/kqp/query_data/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,29 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-kqp-query_data)
+target_compile_options(core-kqp-query_data PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-kqp-query_data PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-actorlib_impl
+ ydb-core-base
+ yql-dq-expr_nodes
+ yql-dq-proto
+ core-kqp-expr_nodes
+)
+target_sources(core-kqp-query_data PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_query_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_prepared_query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_predictor.cpp
+)
diff --git a/ydb/core/kqp/query_data/CMakeLists.linux.txt b/ydb/core/kqp/query_data/CMakeLists.linux.txt
new file mode 100644
index 00000000000..a37be072140
--- /dev/null
+++ b/ydb/core/kqp/query_data/CMakeLists.linux.txt
@@ -0,0 +1,29 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-kqp-query_data)
+target_compile_options(core-kqp-query_data PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-kqp-query_data PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-actorlib_impl
+ ydb-core-base
+ yql-dq-expr_nodes
+ yql-dq-proto
+ core-kqp-expr_nodes
+)
+target_sources(core-kqp-query_data PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_query_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_prepared_query.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/query_data/kqp_predictor.cpp
+)
diff --git a/ydb/core/kqp/query_data/CMakeLists.txt b/ydb/core/kqp/query_data/CMakeLists.txt
new file mode 100644
index 00000000000..5bb4faffb40
--- /dev/null
+++ b/ydb/core/kqp/query_data/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/core/kqp/query_compiler/kqp_predictor.cpp b/ydb/core/kqp/query_data/kqp_predictor.cpp
index 293ad98e812..cd743b89eec 100644
--- a/ydb/core/kqp/query_compiler/kqp_predictor.cpp
+++ b/ydb/core/kqp/query_data/kqp_predictor.cpp
@@ -1,8 +1,10 @@
#include "kqp_predictor.h"
-#include <ydb/core/kqp/opt/kqp_opt.h>
#include <ydb/core/base/appdata.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <util/system/info.h>
+#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
+#include <ydb/core/kqp/expr_nodes/kqp_expr_nodes.h>
+#include <ydb/core/kqp/common/kqp_yql.h>
namespace NKikimr::NKqp {
diff --git a/ydb/core/kqp/query_compiler/kqp_predictor.h b/ydb/core/kqp/query_data/kqp_predictor.h
index 8195583876f..8195583876f 100644
--- a/ydb/core/kqp/query_compiler/kqp_predictor.h
+++ b/ydb/core/kqp/query_data/kqp_predictor.h
diff --git a/ydb/core/kqp/common/kqp_prepared_query.cpp b/ydb/core/kqp/query_data/kqp_prepared_query.cpp
index 594179454bb..3c615c0088d 100644
--- a/ydb/core/kqp/common/kqp_prepared_query.cpp
+++ b/ydb/core/kqp/query_data/kqp_prepared_query.cpp
@@ -6,6 +6,9 @@
#include <ydb/library/yql/minikql/mkql_node.h>
#include <ydb/library/mkql_proto/mkql_proto.h>
#include <ydb/core/protos/kqp_physical.pb.h>
+#include <ydb/core/protos/services.pb.h>
+
+#include <library/cpp/actors/core/log.h>
namespace NKikimr::NKqp {
@@ -55,6 +58,16 @@ TKqpPhyTxHolder::TKqpPhyTxHolder(const std::shared_ptr<const NKikimrKqp::TPrepar
, Alloc(alloc)
{
TxResultsMeta.resize(Proto->GetResults().size());
+ for (auto&& i : Proto->GetStages()) {
+ TStagePredictor predictor;
+ if (!predictor.DeserializeFromKqpSettings(i.GetProgram().GetSettings())) {
+ ALS_ERROR(NKikimrServices::KQP_EXECUTER) << "cannot parse program settings for data prediction";
+ Predictors.emplace_back();
+ } else {
+ Predictors.emplace_back(std::move(predictor));
+ }
+ }
+
ui32 i = 0;
for (const auto& txResult : Proto->GetResults()) {
auto& result = TxResultsMeta[i++];
@@ -81,6 +94,11 @@ bool TKqpPhyTxHolder::IsPureTx() const {
return PureTx;
}
+const NKikimr::NKqp::TStagePredictor& TKqpPhyTxHolder::GetCalculationPredictor(const size_t stageIdx) const {
+ YQL_ENSURE(stageIdx < Predictors.size(), "incorrect stage idx for predictor");
+ return Predictors[stageIdx];
+}
+
TPreparedQueryHolder::TPreparedQueryHolder(NKikimrKqp::TPreparedQuery* proto,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry)
: Proto(proto)
diff --git a/ydb/core/kqp/common/kqp_prepared_query.h b/ydb/core/kqp/query_data/kqp_prepared_query.h
index 2d1475c58aa..4705cafa669 100644
--- a/ydb/core/kqp/common/kqp_prepared_query.h
+++ b/ydb/core/kqp/query_data/kqp_prepared_query.h
@@ -1,5 +1,6 @@
#pragma once
+#include <ydb/core/kqp/query_data/kqp_predictor.h>
#include <ydb/core/protos/kqp.pb.h>
#include <util/generic/vector.h>
@@ -36,10 +37,12 @@ class TKqpPhyTxHolder {
bool PureTx = false;
TVector<TPhyTxResultMetadata> TxResultsMeta;
std::shared_ptr<TPreparedQueryAllocHolder> Alloc;
-
+ std::vector<TStagePredictor> Predictors;
public:
using TConstPtr = std::shared_ptr<const TKqpPhyTxHolder>;
+ const TStagePredictor& GetCalculationPredictor(const size_t stageIdx) const;
+
const TVector<TPhyTxResultMetadata>& GetTxResultsMeta() const { return TxResultsMeta; }
const NKqpProto::TKqpPhyStage& GetStages(size_t index) const {
diff --git a/ydb/core/kqp/gateway/kqp_query_data.cpp b/ydb/core/kqp/query_data/kqp_query_data.cpp
index d16c3eabc8d..d16c3eabc8d 100644
--- a/ydb/core/kqp/gateway/kqp_query_data.cpp
+++ b/ydb/core/kqp/query_data/kqp_query_data.cpp
diff --git a/ydb/core/kqp/gateway/kqp_query_data.h b/ydb/core/kqp/query_data/kqp_query_data.h
index ec25b7e8155..de2ca09978a 100644
--- a/ydb/core/kqp/gateway/kqp_query_data.h
+++ b/ydb/core/kqp/query_data/kqp_query_data.h
@@ -1,10 +1,10 @@
#pragma once
+#include <ydb/core/kqp/query_data/kqp_prepared_query.h>
#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/yql/minikql/mkql_node.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/mkql_proto/protos/minikql.pb.h>
-#include <ydb/core/kqp/common/kqp_prepared_query.h>
#include <library/cpp/random_provider/random_provider.h>
#include <library/cpp/time_provider/time_provider.h>
diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
index 9907f4e0bf0..4f63059d8fa 100644
--- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h
+++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
@@ -64,6 +64,16 @@ struct TStageInfo : private TMoveOnly {
TVector<ui64> Tasks;
TStageInfoMeta Meta;
+
+ TString DebugString() const {
+ // TODO: Print stage details, including input types and program.
+ TStringBuilder result;
+ result << "StageInfo: StageId #" << Id
+ << ", InputsCount: " << InputsCount
+ << ", OutputsCount: " << OutputsCount;
+ return result;
+ }
+
};
struct TChannel {