aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-04-05 00:15:48 +0300
committergvit <gvit@ydb.tech>2023-04-05 00:15:48 +0300
commita6800d9e4bb42789f36e52866d29ddf0b97f8a71 (patch)
tree33382d2fe5fb30c3b618ed688b1c9144061a26cd
parentd4d3f75e15c428924032f43a71439f301b319edb (diff)
downloadydb-a6800d9e4bb42789f36e52866d29ddf0b97f8a71.tar.gz
move task serialization code to separate methods
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp98
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp48
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h10
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp131
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp234
-rw-r--r--ydb/core/kqp/executer_actor/kqp_tasks_graph.h6
6 files changed, 251 insertions, 276 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index ca22a1bd79c..1daefa0ff7d 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1742,107 +1742,11 @@ private:
for (auto& task : TasksGraph.GetTasks()) {
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
- NDqProto::TDqTask taskDesc = SerializeTaskToProto(TasksGraph, ResultChannelProxies, task, TypeEnv());
- ActorIdToProto(SelfId(), taskDesc.MutableExecuter()->MutableActorId());
+ NDqProto::TDqTask taskDesc = SerializeTaskToProto(TasksGraph, task, TableKeys, ResultChannelProxies, TypeEnv());
if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) {
- NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta protoTaskMeta;
-
- FillTableMeta(stageInfo, protoTaskMeta.MutableTable());
-
- if (task.Meta.Reads) {
- for (auto& read : *task.Meta.Reads) {
- auto* protoReadMeta = protoTaskMeta.AddReads();
- read.Ranges.SerializeTo(protoReadMeta->MutableRange());
- for (auto& column : read.Columns) {
- auto* protoColumn = protoReadMeta->AddColumns();
- protoColumn->SetId(column.Id);
- auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(column.Type, column.TypeMod);
- protoColumn->SetType(columnType.TypeId);
- if (columnType.TypeInfo) {
- *protoColumn->MutableTypeInfo() = *columnType.TypeInfo;
- }
- protoColumn->SetName(column.Name);
- }
- protoReadMeta->SetItemsLimit(task.Meta.ReadInfo.ItemsLimit);
- protoReadMeta->SetReverse(task.Meta.ReadInfo.Reverse);
- }
- }
- if (task.Meta.Writes) {
- auto* protoWrites = protoTaskMeta.MutableWrites();
- task.Meta.Writes->Ranges.SerializeTo(protoWrites->MutableRange());
- if (task.Meta.Writes->IsPureEraseOp()) {
- protoWrites->SetIsPureEraseOp(true);
- }
-
- for (const auto& [_, columnWrite] : task.Meta.Writes->ColumnWrites) {
- auto& protoColumnWrite = *protoWrites->AddColumns();
-
- auto& protoColumn = *protoColumnWrite.MutableColumn();
- protoColumn.SetId(columnWrite.Column.Id);
- auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(columnWrite.Column.Type, columnWrite.Column.TypeMod);
- protoColumn.SetType(columnType.TypeId);
- if (columnType.TypeInfo) {
- *protoColumn.MutableTypeInfo() = *columnType.TypeInfo;
- }
- protoColumn.SetName(columnWrite.Column.Name);
-
- protoColumnWrite.SetMaxValueSizeBytes(columnWrite.MaxValueSizeBytes);
- }
- }
-
- taskDesc.MutableMeta()->PackFrom(protoTaskMeta);
- LOG_D("Task: " << task.Id << ", shard: " << task.Meta.ShardId << ", meta: " << protoTaskMeta.ShortDebugString());
-
datashardTasks[task.Meta.ShardId].emplace_back(std::move(taskDesc));
} else if (stageInfo.Meta.IsSysView()) {
- NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta protoTaskMeta;
-
- FillTableMeta(stageInfo, protoTaskMeta.MutableTable());
-
- const auto& tableInfo = TableKeys.GetTable(stageInfo.Meta.TableId);
- for (const auto& keyColumnName : tableInfo.KeyColumns) {
- const auto& keyColumn = tableInfo.Columns.at(keyColumnName);
- auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(keyColumn.Type, keyColumn.TypeMod);
- protoTaskMeta.AddKeyColumnTypes(columnType.TypeId);
- if (columnType.TypeInfo) {
- *protoTaskMeta.AddKeyColumnTypeInfos() = *columnType.TypeInfo;
- }
- }
-
- for (bool skipNullKey : stageInfo.Meta.SkipNullKeys) {
- protoTaskMeta.AddSkipNullKeys(skipNullKey);
- }
-
- YQL_ENSURE(task.Meta.Reads);
- YQL_ENSURE(!task.Meta.Writes);
-
- for (auto& column : task.Meta.Reads->front().Columns) {
- auto* protoColumn = protoTaskMeta.AddColumns();
- protoColumn->SetId(column.Id);
- auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(column.Type, column.TypeMod);
- protoColumn->SetType(columnType.TypeId);
- if (columnType.TypeInfo) {
- *protoColumn->MutableTypeInfo() = *columnType.TypeInfo;
- }
- protoColumn->SetName(column.Name);
- }
-
- for (auto& read : *task.Meta.Reads) {
- auto* protoReadMeta = protoTaskMeta.AddReads();
- protoReadMeta->SetShardId(read.ShardId);
- read.Ranges.SerializeTo(protoReadMeta);
-
- YQL_ENSURE((int) read.Columns.size() == protoTaskMeta.GetColumns().size());
- for (ui64 i = 0; i < read.Columns.size(); ++i) {
- YQL_ENSURE(read.Columns[i].Id == protoTaskMeta.GetColumns()[i].GetId());
- YQL_ENSURE(read.Columns[i].Type.GetTypeId() == protoTaskMeta.GetColumns()[i].GetType());
- }
- }
-
- LOG_D("task: " << task.Id << ", node: " << task.Meta.NodeId << ", meta: " << protoTaskMeta.ShortDebugString());
-
- taskDesc.MutableMeta()->PackFrom(protoTaskMeta);
computeTasks.emplace_back(std::move(taskDesc));
} else {
if (task.Meta.ShardId) {
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 1d597aa7584..47e93cbf112 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -61,54 +61,6 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnb
serializer.Deserialize(buffer, txResult.MkqlItemType, txResult.Rows);
}
-std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo& stageInfo, const TTask& task)
-{
- const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
- std::vector<std::shared_ptr<arrow::Field>> columns;
- std::vector<std::shared_ptr<arrow::Array>> data;
- auto& parameterNames = task.Meta.ReadInfo.OlapProgram.ParameterNames;
-
- columns.reserve(parameterNames.size());
- data.reserve(parameterNames.size());
-
- for (auto& name : stage.GetProgramParameters()) {
- if (!parameterNames.contains(name)) {
- continue;
- }
-
- if (auto* taskParam = task.Meta.Params.FindPtr(name)) {
- // This parameter is the list, holding type from task.Meta.ParamTypes
- // Those parameters can't be used in Olap programs now
- YQL_ENSURE(false, "OLAP program contains task parameter, not supported yet.");
- continue;
- }
-
- auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(name);
- YQL_ENSURE(NYql::NArrow::IsArrowCompatible(type), "Incompatible parameter type. Can't convert to arrow");
-
- std::unique_ptr<arrow::ArrayBuilder> builder = NYql::NArrow::MakeArrowBuilder(type);
- NYql::NArrow::AppendElement(value, builder.get(), type);
-
- std::shared_ptr<arrow::Array> array;
- auto status = builder->Finish(&array);
-
- YQL_ENSURE(status.ok(), "Failed to build arrow array of variables.");
-
- auto field = std::make_shared<arrow::Field>(name, array->type());
-
- columns.emplace_back(std::move(field));
- data.emplace_back(std::move(array));
- }
-
- auto schema = std::make_shared<arrow::Schema>(std::move(columns));
- auto recordBatch = arrow::RecordBatch::Make(schema, 1, data);
-
- return std::make_pair<TString, TString>(
- NArrow::SerializeSchema(*schema),
- NArrow::SerializeBatchNoCompression(recordBatch)
- );
-}
-
TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken,
const NKikimrKqp::TRlPath& path)
{
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index e99e322462f..c31ea277956 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -802,16 +802,6 @@ protected:
}
protected:
- void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransaction_TTableMeta* meta) {
- meta->SetTablePath(stageInfo.Meta.TablePath);
- meta->MutableTableId()->SetTableId(stageInfo.Meta.TableId.PathId.LocalPathId);
- meta->MutableTableId()->SetOwnerId(stageInfo.Meta.TableId.PathId.OwnerId);
- meta->SetSchemaVersion(stageInfo.Meta.TableId.SchemaVersion);
- meta->SetSysViewInfo(stageInfo.Meta.TableId.SysViewInfo);
- meta->SetTableKind((ui32)stageInfo.Meta.TableKind);
- }
-
-protected:
void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
for (const auto& task : this->TasksGraph.GetTasks()) {
if (task.ComputeActorId) {
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 76c7dd7efe4..189c1efca66 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -37,17 +37,6 @@ using namespace NYql::NDq;
namespace {
-NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::EReadType ReadTypeToProto(const TTaskMeta::TReadInfo::EReadType& type) {
- switch (type) {
- case TTaskMeta::TReadInfo::EReadType::Rows:
- return NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::ROWS;
- case TTaskMeta::TReadInfo::EReadType::Blocks:
- return NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::BLOCKS;
- }
-
- YQL_ENSURE(false, "Invalid read type in task meta.");
-}
-
TTaskMeta::TReadInfo::EReadType ReadTypeFromProto(const NKqpProto::TKqpPhyOpReadOlapRanges::EReadType& type) {
switch (type) {
case NKqpProto::TKqpPhyOpReadOlapRanges::ROWS:
@@ -253,6 +242,7 @@ private:
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.ExecuterId = SelfId();
task.Meta.NodeId = nodeId;
+ task.Meta.ScanTask = true;
return task;
}
@@ -262,6 +252,7 @@ private:
if (cnt < maxScansPerNode) {
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.NodeId = nodeId;
+ task.Meta.ScanTask = true;
tasks.push_back(task.Id);
++cnt;
return task;
@@ -525,50 +516,9 @@ private:
for (auto& task : TasksGraph.GetTasks()) {
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
- NYql::NDqProto::TDqTask taskDesc = SerializeTaskToProto(TasksGraph, ResultChannelProxies, task, TypeEnv());
- ActorIdToProto(SelfId(), taskDesc.MutableExecuter()->MutableActorId());
+ NYql::NDqProto::TDqTask taskDesc = SerializeTaskToProto(TasksGraph, task, TableKeys, ResultChannelProxies, TypeEnv());
if (task.Meta.NodeId || stageInfo.Meta.IsSysView()) {
- NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta protoTaskMeta;
-
- FillTableMeta(stageInfo, protoTaskMeta.MutableTable());
-
- const auto& tableInfo = TableKeys.GetTable(stageInfo.Meta.TableId);
- for (const auto& keyColumnName : tableInfo.KeyColumns) {
- const auto& keyColumn = tableInfo.Columns.at(keyColumnName);
- auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(keyColumn.Type, keyColumn.TypeMod);
- protoTaskMeta.AddKeyColumnTypes(columnType.TypeId);
- if (columnType.TypeInfo) {
- *protoTaskMeta.AddKeyColumnTypeInfos() = *columnType.TypeInfo;
- } else {
- *protoTaskMeta.AddKeyColumnTypeInfos() = NKikimrProto::TTypeInfo();
- }
- }
-
- switch (tableInfo.TableKind) {
- case ETableKind::Unknown:
- case ETableKind::SysView: {
- protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC);
- break;
- }
- case ETableKind::Datashard: {
- if (AppData()->FeatureFlags.GetEnableArrowFormatAtDatashard()) {
- protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::ARROW);
- } else {
- protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC);
- }
- break;
- }
- case ETableKind::Olap: {
- protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::ARROW);
- break;
- }
- }
-
- for (bool skipNullKey : stageInfo.Meta.SkipNullKeys) {
- protoTaskMeta.AddSkipNullKeys(skipNullKey);
- }
-
// Task with source
if (!task.Meta.Reads) {
scanTasks[task.Meta.NodeId].emplace_back(std::move(taskDesc));
@@ -576,83 +526,26 @@ private:
continue;
}
- YQL_ENSURE(!task.Meta.Writes);
-
- if (!task.Meta.Reads->empty()) {
- protoTaskMeta.SetReverse(task.Meta.ReadInfo.Reverse);
- protoTaskMeta.SetItemsLimit(task.Meta.ReadInfo.ItemsLimit);
- protoTaskMeta.SetSorted(task.Meta.ReadInfo.Sorted);
- protoTaskMeta.SetReadType(ReadTypeToProto(task.Meta.ReadInfo.ReadType));
-
- for (auto columnType : task.Meta.ReadInfo.ResultColumnsTypes) {
- auto* protoResultColumn = protoTaskMeta.AddResultColumns();
- protoResultColumn->SetId(0);
- auto protoColumnType = NScheme::ProtoColumnTypeFromTypeInfoMod(columnType, "");
- protoResultColumn->SetType(protoColumnType.TypeId);
- if (protoColumnType.TypeInfo) {
- *protoResultColumn->MutableTypeInfo() = *protoColumnType.TypeInfo;
- }
- }
-
- if (tableInfo.TableKind == ETableKind::Olap) {
- auto* olapProgram = protoTaskMeta.MutableOlapProgram();
- olapProgram->SetProgram(task.Meta.ReadInfo.OlapProgram.Program);
-
- auto [schema, parameters] = SerializeKqpTasksParametersForOlap(stageInfo, task);
- olapProgram->SetParametersSchema(schema);
- olapProgram->SetParameters(parameters);
- } else {
- YQL_ENSURE(task.Meta.ReadInfo.OlapProgram.Program.empty());
- }
-
- for (auto& column : task.Meta.Reads->front().Columns) {
- auto* protoColumn = protoTaskMeta.AddColumns();
- protoColumn->SetId(column.Id);
- auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(column.Type, "");
- protoColumn->SetType(columnType.TypeId);
- if (columnType.TypeInfo) {
- *protoColumn->MutableTypeInfo() = *columnType.TypeInfo;
- }
- protoColumn->SetName(column.Name);
- }
- }
-
- for (auto& read : *task.Meta.Reads) {
- auto* protoReadMeta = protoTaskMeta.AddReads();
- protoReadMeta->SetShardId(read.ShardId);
- read.Ranges.SerializeTo(protoReadMeta);
-
- YQL_ENSURE((int) read.Columns.size() == protoTaskMeta.GetColumns().size());
- for (ui64 i = 0; i < read.Columns.size(); ++i) {
- YQL_ENSURE(read.Columns[i].Id == protoTaskMeta.GetColumns()[i].GetId());
- YQL_ENSURE(read.Columns[i].Type.GetTypeId() == protoTaskMeta.GetColumns()[i].GetType());
- }
-
- nShardScans++;
- if (Stats) {
- Stats->AffectedShards.insert(read.ShardId);
- }
- }
-
- LOG_D(
- "task: " << task.Id <<
- ", node: " << task.Meta.NodeId <<
- ", meta: " << protoTaskMeta.ShortDebugString()
- );
-
- taskDesc.MutableMeta()->PackFrom(protoTaskMeta);
-
if (stageInfo.Meta.IsSysView()) {
computeTasks.emplace_back(std::move(taskDesc));
} else {
scanTasks[task.Meta.NodeId].emplace_back(std::move(taskDesc));
nScanTasks++;
}
+
+ nShardScans += task.Meta.Reads->size();
+ if (Stats) {
+ for(const auto& read: *task.Meta.Reads) {
+ Stats->AffectedShards.insert(read.ShardId);
+ }
+ }
+
} else {
computeTasks.emplace_back(std::move(taskDesc));
}
}
+
if (computeTasks.size() + nScanTasks > Request.MaxComputeActors) {
LOG_N("Too many compute actors: computeTasks=" << computeTasks.size() << ", scanTasks=" << nScanTasks);
TBase::ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED,
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index 43f0a29060a..5b3cee3a0af 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -6,6 +6,7 @@
#include <ydb/core/tx/datashard/range_ops.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
+#include <ydb/library/yql/dq/runtime/dq_arrow_helpers.h>
#include <library/cpp/actors/core/log.h>
@@ -21,6 +22,77 @@ void LogStage(const NActors::TActorContext& ctx, const TStageInfo& stageInfo) {
LOG_DEBUG_S(ctx, NKikimrServices::KQP_EXECUTER, stageInfo.DebugString());
}
+NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::EReadType ReadTypeToProto(const TTaskMeta::TReadInfo::EReadType& type) {
+ switch (type) {
+ case TTaskMeta::TReadInfo::EReadType::Rows:
+ return NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::ROWS;
+ case TTaskMeta::TReadInfo::EReadType::Blocks:
+ return NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::BLOCKS;
+ }
+
+ YQL_ENSURE(false, "Invalid read type in task meta.");
+}
+
+TTaskMeta::TReadInfo::EReadType ReadTypeFromProto(const NKqpProto::TKqpPhyOpReadOlapRanges::EReadType& type) {
+ switch (type) {
+ case NKqpProto::TKqpPhyOpReadOlapRanges::ROWS:
+ return TTaskMeta::TReadInfo::EReadType::Rows;
+ case NKqpProto::TKqpPhyOpReadOlapRanges::BLOCKS:
+ return TTaskMeta::TReadInfo::EReadType::Blocks;
+ default:
+ YQL_ENSURE(false, "Invalid read type from TKqpPhyOpReadOlapRanges protobuf.");
+ }
+}
+
+
+std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo& stageInfo, const TTask& task)
+{
+ const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
+ std::vector<std::shared_ptr<arrow::Field>> columns;
+ std::vector<std::shared_ptr<arrow::Array>> data;
+ auto& parameterNames = task.Meta.ReadInfo.OlapProgram.ParameterNames;
+
+ columns.reserve(parameterNames.size());
+ data.reserve(parameterNames.size());
+
+ for (auto& name : stage.GetProgramParameters()) {
+ if (!parameterNames.contains(name)) {
+ continue;
+ }
+
+ if (auto* taskParam = task.Meta.Params.FindPtr(name)) {
+ // This parameter is the list, holding type from task.Meta.ParamTypes
+ // Those parameters can't be used in Olap programs now
+ YQL_ENSURE(false, "OLAP program contains task parameter, not supported yet.");
+ continue;
+ }
+
+ auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(name);
+ YQL_ENSURE(NYql::NArrow::IsArrowCompatible(type), "Incompatible parameter type. Can't convert to arrow");
+
+ std::unique_ptr<arrow::ArrayBuilder> builder = NYql::NArrow::MakeArrowBuilder(type);
+ NYql::NArrow::AppendElement(value, builder.get(), type);
+
+ std::shared_ptr<arrow::Array> array;
+ auto status = builder->Finish(&array);
+
+ YQL_ENSURE(status.ok(), "Failed to build arrow array of variables.");
+
+ auto field = std::make_shared<arrow::Field>(name, array->type());
+
+ columns.emplace_back(std::move(field));
+ data.emplace_back(std::move(array));
+ }
+
+ auto schema = std::make_shared<arrow::Schema>(std::move(columns));
+ auto recordBatch = arrow::RecordBatch::Make(schema, 1, data);
+
+ return std::make_pair<TString, TString>(
+ NArrow::SerializeSchema(*schema),
+ NArrow::SerializeBatchNoCompression(recordBatch)
+ );
+}
+
void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGateway::TPhysicalTxData>& txs) {
for (size_t txIdx = 0; txIdx < txs.size(); ++txIdx) {
auto& tx = txs[txIdx];
@@ -605,14 +677,171 @@ void FillChannelDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<
channelDesc.SetInMemory(channel.InMemory);
}
+void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransaction_TTableMeta* meta) {
+ meta->SetTablePath(stageInfo.Meta.TablePath);
+ meta->MutableTableId()->SetTableId(stageInfo.Meta.TableId.PathId.LocalPathId);
+ meta->MutableTableId()->SetOwnerId(stageInfo.Meta.TableId.PathId.OwnerId);
+ meta->SetSchemaVersion(stageInfo.Meta.TableId.SchemaVersion);
+ meta->SetSysViewInfo(stageInfo.Meta.TableId.SysViewInfo);
+ meta->SetTableKind((ui32)stageInfo.Meta.TableKind);
+}
+
+void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, const TKqpTableKeys& tableKeys, NYql::NDqProto::TDqTask& taskDesc) {
+ if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) {
+ NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta protoTaskMeta;
+
+ FillTableMeta(stageInfo, protoTaskMeta.MutableTable());
+
+ if (task.Meta.Reads) {
+ for (auto& read : *task.Meta.Reads) {
+ auto* protoReadMeta = protoTaskMeta.AddReads();
+ read.Ranges.SerializeTo(protoReadMeta->MutableRange());
+ for (auto& column : read.Columns) {
+ auto* protoColumn = protoReadMeta->AddColumns();
+ protoColumn->SetId(column.Id);
+ auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(column.Type, column.TypeMod);
+ protoColumn->SetType(columnType.TypeId);
+ if (columnType.TypeInfo) {
+ *protoColumn->MutableTypeInfo() = *columnType.TypeInfo;
+ }
+ protoColumn->SetName(column.Name);
+ }
+ protoReadMeta->SetItemsLimit(task.Meta.ReadInfo.ItemsLimit);
+ protoReadMeta->SetReverse(task.Meta.ReadInfo.Reverse);
+ }
+ }
+ if (task.Meta.Writes) {
+ auto* protoWrites = protoTaskMeta.MutableWrites();
+ task.Meta.Writes->Ranges.SerializeTo(protoWrites->MutableRange());
+ if (task.Meta.Writes->IsPureEraseOp()) {
+ protoWrites->SetIsPureEraseOp(true);
+ }
+
+ for (const auto& [_, columnWrite] : task.Meta.Writes->ColumnWrites) {
+ auto& protoColumnWrite = *protoWrites->AddColumns();
+
+ auto& protoColumn = *protoColumnWrite.MutableColumn();
+ protoColumn.SetId(columnWrite.Column.Id);
+ auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(columnWrite.Column.Type, columnWrite.Column.TypeMod);
+ protoColumn.SetType(columnType.TypeId);
+ if (columnType.TypeInfo) {
+ *protoColumn.MutableTypeInfo() = *columnType.TypeInfo;
+ }
+ protoColumn.SetName(columnWrite.Column.Name);
+
+ protoColumnWrite.SetMaxValueSizeBytes(columnWrite.MaxValueSizeBytes);
+ }
+ }
+
+ taskDesc.MutableMeta()->PackFrom(protoTaskMeta);
+ } else if (task.Meta.ScanTask || stageInfo.Meta.IsSysView()) {
+ NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta protoTaskMeta;
+
+ FillTableMeta(stageInfo, protoTaskMeta.MutableTable());
+
+ const auto& tableInfo = tableKeys.GetTable(stageInfo.Meta.TableId);
+ for (const auto& keyColumnName : tableInfo.KeyColumns) {
+ const auto& keyColumn = tableInfo.Columns.at(keyColumnName);
+ auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(keyColumn.Type, keyColumn.TypeMod);
+ protoTaskMeta.AddKeyColumnTypes(columnType.TypeId);
+ if (columnType.TypeInfo) {
+ *protoTaskMeta.AddKeyColumnTypeInfos() = *columnType.TypeInfo;
+ }
+ }
+
+ for (bool skipNullKey : stageInfo.Meta.SkipNullKeys) {
+ protoTaskMeta.AddSkipNullKeys(skipNullKey);
+ }
+
+ switch (tableInfo.TableKind) {
+ case ETableKind::Unknown:
+ case ETableKind::SysView: {
+ protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC);
+ break;
+ }
+ case ETableKind::Datashard: {
+ if (AppData()->FeatureFlags.GetEnableArrowFormatAtDatashard()) {
+ protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::ARROW);
+ } else {
+ protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC);
+ }
+ break;
+ }
+ case ETableKind::Olap: {
+ protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::ARROW);
+ break;
+ }
+ }
+
+ YQL_ENSURE(!task.Meta.Writes);
+
+ if (!task.Meta.Reads->empty()) {
+ protoTaskMeta.SetReverse(task.Meta.ReadInfo.Reverse);
+ protoTaskMeta.SetItemsLimit(task.Meta.ReadInfo.ItemsLimit);
+ protoTaskMeta.SetSorted(task.Meta.ReadInfo.Sorted);
+ protoTaskMeta.SetReadType(ReadTypeToProto(task.Meta.ReadInfo.ReadType));
+
+ for (auto columnType : task.Meta.ReadInfo.ResultColumnsTypes) {
+ auto* protoResultColumn = protoTaskMeta.AddResultColumns();
+ protoResultColumn->SetId(0);
+ auto protoColumnType = NScheme::ProtoColumnTypeFromTypeInfoMod(columnType, "");
+ protoResultColumn->SetType(protoColumnType.TypeId);
+ if (protoColumnType.TypeInfo) {
+ *protoResultColumn->MutableTypeInfo() = *protoColumnType.TypeInfo;
+ }
+ }
+
+ if (tableInfo.TableKind == ETableKind::Olap) {
+ auto* olapProgram = protoTaskMeta.MutableOlapProgram();
+ olapProgram->SetProgram(task.Meta.ReadInfo.OlapProgram.Program);
+
+ auto [schema, parameters] = SerializeKqpTasksParametersForOlap(stageInfo, task);
+ olapProgram->SetParametersSchema(schema);
+ olapProgram->SetParameters(parameters);
+ } else {
+ YQL_ENSURE(task.Meta.ReadInfo.OlapProgram.Program.empty());
+ }
+
+ for (auto& column : task.Meta.Reads->front().Columns) {
+ auto* protoColumn = protoTaskMeta.AddColumns();
+ protoColumn->SetId(column.Id);
+ auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(column.Type, "");
+ protoColumn->SetType(columnType.TypeId);
+ if (columnType.TypeInfo) {
+ *protoColumn->MutableTypeInfo() = *columnType.TypeInfo;
+ }
+ protoColumn->SetName(column.Name);
+ }
+ }
+
+ for (auto& read : *task.Meta.Reads) {
+ auto* protoReadMeta = protoTaskMeta.AddReads();
+ protoReadMeta->SetShardId(read.ShardId);
+ read.Ranges.SerializeTo(protoReadMeta);
+
+ YQL_ENSURE((int) read.Columns.size() == protoTaskMeta.GetColumns().size());
+ for (ui64 i = 0; i < read.Columns.size(); ++i) {
+ YQL_ENSURE(read.Columns[i].Id == protoTaskMeta.GetColumns()[i].GetId());
+ YQL_ENSURE(read.Columns[i].Type.GetTypeId() == protoTaskMeta.GetColumns()[i].GetType());
+ }
+ }
+
+
+ taskDesc.MutableMeta()->PackFrom(protoTaskMeta);
+ }
+}
+
NYql::NDqProto::TDqTask SerializeTaskToProto(
const TKqpTasksGraph& tasksGraph,
+ const TTask& task,
+ const TKqpTableKeys& tableKeys,
const std::unordered_map<ui64, IActor*>& resultChannelProxies,
- const TTask& task, const NMiniKQL::TTypeEnvironment& typeEnv)
+ const NMiniKQL::TTypeEnvironment& typeEnv)
{
auto& stageInfo = tasksGraph.GetStageInfo(task.StageId);
NYql::NDqProto::TDqTask result;
+ ActorIdToProto(task.Meta.ExecuterId, result.MutableExecuter()->MutableActorId());
result.SetId(task.Id);
result.SetStageId(stageInfo.Id.StageId);
@@ -643,6 +872,9 @@ NYql::NDqProto::TDqTask SerializeTaskToProto(
dqParams[paramName] = stageInfo.Meta.Tx.Params->SerializeParamValue(paramName);
}
}
+
+ FillTaskMeta(stageInfo, task, tableKeys, result);
+
return result;
}
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
index 6cfe34f4bf1..f81c0f85104 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h
@@ -121,6 +121,7 @@ struct TShardKeyRanges {
struct TTaskMeta {
ui64 ShardId = 0; // only in case of non-scans (data-query & legacy scans)
ui64 NodeId = 0; // only in case of scans over persistent snapshots
+ bool ScanTask = false;
TActorId ExecuterId;
TMap<TString, NYql::NDqProto::TData> Params;
@@ -202,7 +203,10 @@ void BuildKqpTaskGraphResultChannels(TKqpTasksGraph& tasksGraph, const TKqpPhyTx
void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo,
ui64 txId, bool enableSpilling);
-NYql::NDqProto::TDqTask SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, const TTask& task, const NMiniKQL::TTypeEnvironment& typeEnv);
+NYql::NDqProto::TDqTask SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task,
+ const TKqpTableKeys& tableKeys,
+ const std::unordered_map<ui64, IActor*>& resultChannelProxies, const NMiniKQL::TTypeEnvironment& typeEnv);
+void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransaction_TTableMeta* meta);
void FillChannelDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NYql::NDqProto::TChannel& channelDesc, const NYql::NDq::TChannel& channel);
void FillInputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NYql::NDqProto::TTaskInput& inputDesc, const TTaskInput& input);
void FillOutputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output);