diff options
author | gvit <gvit@ydb.tech> | 2023-04-05 00:15:48 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-04-05 00:15:48 +0300 |
commit | a6800d9e4bb42789f36e52866d29ddf0b97f8a71 (patch) | |
tree | 33382d2fe5fb30c3b618ed688b1c9144061a26cd | |
parent | d4d3f75e15c428924032f43a71439f301b319edb (diff) | |
download | ydb-a6800d9e4bb42789f36e52866d29ddf0b97f8a71.tar.gz |
move task serialization code to separate methods
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 98 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.cpp | 48 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 10 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 131 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp | 234 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.h | 6 |
6 files changed, 251 insertions, 276 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index ca22a1bd79c..1daefa0ff7d 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1742,107 +1742,11 @@ private: for (auto& task : TasksGraph.GetTasks()) { auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); - NDqProto::TDqTask taskDesc = SerializeTaskToProto(TasksGraph, ResultChannelProxies, task, TypeEnv()); - ActorIdToProto(SelfId(), taskDesc.MutableExecuter()->MutableActorId()); + NDqProto::TDqTask taskDesc = SerializeTaskToProto(TasksGraph, task, TableKeys, ResultChannelProxies, TypeEnv()); if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) { - NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta protoTaskMeta; - - FillTableMeta(stageInfo, protoTaskMeta.MutableTable()); - - if (task.Meta.Reads) { - for (auto& read : *task.Meta.Reads) { - auto* protoReadMeta = protoTaskMeta.AddReads(); - read.Ranges.SerializeTo(protoReadMeta->MutableRange()); - for (auto& column : read.Columns) { - auto* protoColumn = protoReadMeta->AddColumns(); - protoColumn->SetId(column.Id); - auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(column.Type, column.TypeMod); - protoColumn->SetType(columnType.TypeId); - if (columnType.TypeInfo) { - *protoColumn->MutableTypeInfo() = *columnType.TypeInfo; - } - protoColumn->SetName(column.Name); - } - protoReadMeta->SetItemsLimit(task.Meta.ReadInfo.ItemsLimit); - protoReadMeta->SetReverse(task.Meta.ReadInfo.Reverse); - } - } - if (task.Meta.Writes) { - auto* protoWrites = protoTaskMeta.MutableWrites(); - task.Meta.Writes->Ranges.SerializeTo(protoWrites->MutableRange()); - if (task.Meta.Writes->IsPureEraseOp()) { - protoWrites->SetIsPureEraseOp(true); - } - - for (const auto& [_, columnWrite] : task.Meta.Writes->ColumnWrites) { - auto& protoColumnWrite = *protoWrites->AddColumns(); - - auto& protoColumn = *protoColumnWrite.MutableColumn(); - protoColumn.SetId(columnWrite.Column.Id); - auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(columnWrite.Column.Type, columnWrite.Column.TypeMod); - protoColumn.SetType(columnType.TypeId); - if (columnType.TypeInfo) { - *protoColumn.MutableTypeInfo() = *columnType.TypeInfo; - } - protoColumn.SetName(columnWrite.Column.Name); - - protoColumnWrite.SetMaxValueSizeBytes(columnWrite.MaxValueSizeBytes); - } - } - - taskDesc.MutableMeta()->PackFrom(protoTaskMeta); - LOG_D("Task: " << task.Id << ", shard: " << task.Meta.ShardId << ", meta: " << protoTaskMeta.ShortDebugString()); - datashardTasks[task.Meta.ShardId].emplace_back(std::move(taskDesc)); } else if (stageInfo.Meta.IsSysView()) { - NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta protoTaskMeta; - - FillTableMeta(stageInfo, protoTaskMeta.MutableTable()); - - const auto& tableInfo = TableKeys.GetTable(stageInfo.Meta.TableId); - for (const auto& keyColumnName : tableInfo.KeyColumns) { - const auto& keyColumn = tableInfo.Columns.at(keyColumnName); - auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(keyColumn.Type, keyColumn.TypeMod); - protoTaskMeta.AddKeyColumnTypes(columnType.TypeId); - if (columnType.TypeInfo) { - *protoTaskMeta.AddKeyColumnTypeInfos() = *columnType.TypeInfo; - } - } - - for (bool skipNullKey : stageInfo.Meta.SkipNullKeys) { - protoTaskMeta.AddSkipNullKeys(skipNullKey); - } - - YQL_ENSURE(task.Meta.Reads); - YQL_ENSURE(!task.Meta.Writes); - - for (auto& column : task.Meta.Reads->front().Columns) { - auto* protoColumn = protoTaskMeta.AddColumns(); - protoColumn->SetId(column.Id); - auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(column.Type, column.TypeMod); - protoColumn->SetType(columnType.TypeId); - if (columnType.TypeInfo) { - *protoColumn->MutableTypeInfo() = *columnType.TypeInfo; - } - protoColumn->SetName(column.Name); - } - - for (auto& read : *task.Meta.Reads) { - auto* protoReadMeta = protoTaskMeta.AddReads(); - protoReadMeta->SetShardId(read.ShardId); - read.Ranges.SerializeTo(protoReadMeta); - - YQL_ENSURE((int) read.Columns.size() == protoTaskMeta.GetColumns().size()); - for (ui64 i = 0; i < read.Columns.size(); ++i) { - YQL_ENSURE(read.Columns[i].Id == protoTaskMeta.GetColumns()[i].GetId()); - YQL_ENSURE(read.Columns[i].Type.GetTypeId() == protoTaskMeta.GetColumns()[i].GetType()); - } - } - - LOG_D("task: " << task.Id << ", node: " << task.Meta.NodeId << ", meta: " << protoTaskMeta.ShortDebugString()); - - taskDesc.MutableMeta()->PackFrom(protoTaskMeta); computeTasks.emplace_back(std::move(taskDesc)); } else { if (task.Meta.ShardId) { diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 1d597aa7584..47e93cbf112 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -61,54 +61,6 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnb serializer.Deserialize(buffer, txResult.MkqlItemType, txResult.Rows); } -std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo& stageInfo, const TTask& task) -{ - const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id); - std::vector<std::shared_ptr<arrow::Field>> columns; - std::vector<std::shared_ptr<arrow::Array>> data; - auto& parameterNames = task.Meta.ReadInfo.OlapProgram.ParameterNames; - - columns.reserve(parameterNames.size()); - data.reserve(parameterNames.size()); - - for (auto& name : stage.GetProgramParameters()) { - if (!parameterNames.contains(name)) { - continue; - } - - if (auto* taskParam = task.Meta.Params.FindPtr(name)) { - // This parameter is the list, holding type from task.Meta.ParamTypes - // Those parameters can't be used in Olap programs now - YQL_ENSURE(false, "OLAP program contains task parameter, not supported yet."); - continue; - } - - auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(name); - YQL_ENSURE(NYql::NArrow::IsArrowCompatible(type), "Incompatible parameter type. Can't convert to arrow"); - - std::unique_ptr<arrow::ArrayBuilder> builder = NYql::NArrow::MakeArrowBuilder(type); - NYql::NArrow::AppendElement(value, builder.get(), type); - - std::shared_ptr<arrow::Array> array; - auto status = builder->Finish(&array); - - YQL_ENSURE(status.ok(), "Failed to build arrow array of variables."); - - auto field = std::make_shared<arrow::Field>(name, array->type()); - - columns.emplace_back(std::move(field)); - data.emplace_back(std::move(array)); - } - - auto schema = std::make_shared<arrow::Schema>(std::move(columns)); - auto recordBatch = arrow::RecordBatch::Make(schema, 1, data); - - return std::make_pair<TString, TString>( - NArrow::SerializeSchema(*schema), - NArrow::SerializeBatchNoCompression(recordBatch) - ); -} - TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken, const NKikimrKqp::TRlPath& path) { diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index e99e322462f..c31ea277956 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -802,16 +802,6 @@ protected: } protected: - void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransaction_TTableMeta* meta) { - meta->SetTablePath(stageInfo.Meta.TablePath); - meta->MutableTableId()->SetTableId(stageInfo.Meta.TableId.PathId.LocalPathId); - meta->MutableTableId()->SetOwnerId(stageInfo.Meta.TableId.PathId.OwnerId); - meta->SetSchemaVersion(stageInfo.Meta.TableId.SchemaVersion); - meta->SetSysViewInfo(stageInfo.Meta.TableId.SysViewInfo); - meta->SetTableKind((ui32)stageInfo.Meta.TableKind); - } - -protected: void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) { for (const auto& task : this->TasksGraph.GetTasks()) { if (task.ComputeActorId) { diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 76c7dd7efe4..189c1efca66 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -37,17 +37,6 @@ using namespace NYql::NDq; namespace { -NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::EReadType ReadTypeToProto(const TTaskMeta::TReadInfo::EReadType& type) { - switch (type) { - case TTaskMeta::TReadInfo::EReadType::Rows: - return NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::ROWS; - case TTaskMeta::TReadInfo::EReadType::Blocks: - return NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::BLOCKS; - } - - YQL_ENSURE(false, "Invalid read type in task meta."); -} - TTaskMeta::TReadInfo::EReadType ReadTypeFromProto(const NKqpProto::TKqpPhyOpReadOlapRanges::EReadType& type) { switch (type) { case NKqpProto::TKqpPhyOpReadOlapRanges::ROWS: @@ -253,6 +242,7 @@ private: auto& task = TasksGraph.AddTask(stageInfo); task.Meta.ExecuterId = SelfId(); task.Meta.NodeId = nodeId; + task.Meta.ScanTask = true; return task; } @@ -262,6 +252,7 @@ private: if (cnt < maxScansPerNode) { auto& task = TasksGraph.AddTask(stageInfo); task.Meta.NodeId = nodeId; + task.Meta.ScanTask = true; tasks.push_back(task.Id); ++cnt; return task; @@ -525,50 +516,9 @@ private: for (auto& task : TasksGraph.GetTasks()) { auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); - NYql::NDqProto::TDqTask taskDesc = SerializeTaskToProto(TasksGraph, ResultChannelProxies, task, TypeEnv()); - ActorIdToProto(SelfId(), taskDesc.MutableExecuter()->MutableActorId()); + NYql::NDqProto::TDqTask taskDesc = SerializeTaskToProto(TasksGraph, task, TableKeys, ResultChannelProxies, TypeEnv()); if (task.Meta.NodeId || stageInfo.Meta.IsSysView()) { - NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta protoTaskMeta; - - FillTableMeta(stageInfo, protoTaskMeta.MutableTable()); - - const auto& tableInfo = TableKeys.GetTable(stageInfo.Meta.TableId); - for (const auto& keyColumnName : tableInfo.KeyColumns) { - const auto& keyColumn = tableInfo.Columns.at(keyColumnName); - auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(keyColumn.Type, keyColumn.TypeMod); - protoTaskMeta.AddKeyColumnTypes(columnType.TypeId); - if (columnType.TypeInfo) { - *protoTaskMeta.AddKeyColumnTypeInfos() = *columnType.TypeInfo; - } else { - *protoTaskMeta.AddKeyColumnTypeInfos() = NKikimrProto::TTypeInfo(); - } - } - - switch (tableInfo.TableKind) { - case ETableKind::Unknown: - case ETableKind::SysView: { - protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC); - break; - } - case ETableKind::Datashard: { - if (AppData()->FeatureFlags.GetEnableArrowFormatAtDatashard()) { - protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::ARROW); - } else { - protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC); - } - break; - } - case ETableKind::Olap: { - protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::ARROW); - break; - } - } - - for (bool skipNullKey : stageInfo.Meta.SkipNullKeys) { - protoTaskMeta.AddSkipNullKeys(skipNullKey); - } - // Task with source if (!task.Meta.Reads) { scanTasks[task.Meta.NodeId].emplace_back(std::move(taskDesc)); @@ -576,83 +526,26 @@ private: continue; } - YQL_ENSURE(!task.Meta.Writes); - - if (!task.Meta.Reads->empty()) { - protoTaskMeta.SetReverse(task.Meta.ReadInfo.Reverse); - protoTaskMeta.SetItemsLimit(task.Meta.ReadInfo.ItemsLimit); - protoTaskMeta.SetSorted(task.Meta.ReadInfo.Sorted); - protoTaskMeta.SetReadType(ReadTypeToProto(task.Meta.ReadInfo.ReadType)); - - for (auto columnType : task.Meta.ReadInfo.ResultColumnsTypes) { - auto* protoResultColumn = protoTaskMeta.AddResultColumns(); - protoResultColumn->SetId(0); - auto protoColumnType = NScheme::ProtoColumnTypeFromTypeInfoMod(columnType, ""); - protoResultColumn->SetType(protoColumnType.TypeId); - if (protoColumnType.TypeInfo) { - *protoResultColumn->MutableTypeInfo() = *protoColumnType.TypeInfo; - } - } - - if (tableInfo.TableKind == ETableKind::Olap) { - auto* olapProgram = protoTaskMeta.MutableOlapProgram(); - olapProgram->SetProgram(task.Meta.ReadInfo.OlapProgram.Program); - - auto [schema, parameters] = SerializeKqpTasksParametersForOlap(stageInfo, task); - olapProgram->SetParametersSchema(schema); - olapProgram->SetParameters(parameters); - } else { - YQL_ENSURE(task.Meta.ReadInfo.OlapProgram.Program.empty()); - } - - for (auto& column : task.Meta.Reads->front().Columns) { - auto* protoColumn = protoTaskMeta.AddColumns(); - protoColumn->SetId(column.Id); - auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(column.Type, ""); - protoColumn->SetType(columnType.TypeId); - if (columnType.TypeInfo) { - *protoColumn->MutableTypeInfo() = *columnType.TypeInfo; - } - protoColumn->SetName(column.Name); - } - } - - for (auto& read : *task.Meta.Reads) { - auto* protoReadMeta = protoTaskMeta.AddReads(); - protoReadMeta->SetShardId(read.ShardId); - read.Ranges.SerializeTo(protoReadMeta); - - YQL_ENSURE((int) read.Columns.size() == protoTaskMeta.GetColumns().size()); - for (ui64 i = 0; i < read.Columns.size(); ++i) { - YQL_ENSURE(read.Columns[i].Id == protoTaskMeta.GetColumns()[i].GetId()); - YQL_ENSURE(read.Columns[i].Type.GetTypeId() == protoTaskMeta.GetColumns()[i].GetType()); - } - - nShardScans++; - if (Stats) { - Stats->AffectedShards.insert(read.ShardId); - } - } - - LOG_D( - "task: " << task.Id << - ", node: " << task.Meta.NodeId << - ", meta: " << protoTaskMeta.ShortDebugString() - ); - - taskDesc.MutableMeta()->PackFrom(protoTaskMeta); - if (stageInfo.Meta.IsSysView()) { computeTasks.emplace_back(std::move(taskDesc)); } else { scanTasks[task.Meta.NodeId].emplace_back(std::move(taskDesc)); nScanTasks++; } + + nShardScans += task.Meta.Reads->size(); + if (Stats) { + for(const auto& read: *task.Meta.Reads) { + Stats->AffectedShards.insert(read.ShardId); + } + } + } else { computeTasks.emplace_back(std::move(taskDesc)); } } + if (computeTasks.size() + nScanTasks > Request.MaxComputeActors) { LOG_N("Too many compute actors: computeTasks=" << computeTasks.size() << ", scanTasks=" << nScanTasks); TBase::ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 43f0a29060a..5b3cee3a0af 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -6,6 +6,7 @@ #include <ydb/core/tx/datashard/range_ops.h> #include <ydb/library/yql/core/yql_expr_optimize.h> +#include <ydb/library/yql/dq/runtime/dq_arrow_helpers.h> #include <library/cpp/actors/core/log.h> @@ -21,6 +22,77 @@ void LogStage(const NActors::TActorContext& ctx, const TStageInfo& stageInfo) { LOG_DEBUG_S(ctx, NKikimrServices::KQP_EXECUTER, stageInfo.DebugString()); } +NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::EReadType ReadTypeToProto(const TTaskMeta::TReadInfo::EReadType& type) { + switch (type) { + case TTaskMeta::TReadInfo::EReadType::Rows: + return NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::ROWS; + case TTaskMeta::TReadInfo::EReadType::Blocks: + return NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::BLOCKS; + } + + YQL_ENSURE(false, "Invalid read type in task meta."); +} + +TTaskMeta::TReadInfo::EReadType ReadTypeFromProto(const NKqpProto::TKqpPhyOpReadOlapRanges::EReadType& type) { + switch (type) { + case NKqpProto::TKqpPhyOpReadOlapRanges::ROWS: + return TTaskMeta::TReadInfo::EReadType::Rows; + case NKqpProto::TKqpPhyOpReadOlapRanges::BLOCKS: + return TTaskMeta::TReadInfo::EReadType::Blocks; + default: + YQL_ENSURE(false, "Invalid read type from TKqpPhyOpReadOlapRanges protobuf."); + } +} + + +std::pair<TString, TString> SerializeKqpTasksParametersForOlap(const TStageInfo& stageInfo, const TTask& task) +{ + const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id); + std::vector<std::shared_ptr<arrow::Field>> columns; + std::vector<std::shared_ptr<arrow::Array>> data; + auto& parameterNames = task.Meta.ReadInfo.OlapProgram.ParameterNames; + + columns.reserve(parameterNames.size()); + data.reserve(parameterNames.size()); + + for (auto& name : stage.GetProgramParameters()) { + if (!parameterNames.contains(name)) { + continue; + } + + if (auto* taskParam = task.Meta.Params.FindPtr(name)) { + // This parameter is the list, holding type from task.Meta.ParamTypes + // Those parameters can't be used in Olap programs now + YQL_ENSURE(false, "OLAP program contains task parameter, not supported yet."); + continue; + } + + auto [type, value] = stageInfo.Meta.Tx.Params->GetParameterUnboxedValue(name); + YQL_ENSURE(NYql::NArrow::IsArrowCompatible(type), "Incompatible parameter type. Can't convert to arrow"); + + std::unique_ptr<arrow::ArrayBuilder> builder = NYql::NArrow::MakeArrowBuilder(type); + NYql::NArrow::AppendElement(value, builder.get(), type); + + std::shared_ptr<arrow::Array> array; + auto status = builder->Finish(&array); + + YQL_ENSURE(status.ok(), "Failed to build arrow array of variables."); + + auto field = std::make_shared<arrow::Field>(name, array->type()); + + columns.emplace_back(std::move(field)); + data.emplace_back(std::move(array)); + } + + auto schema = std::make_shared<arrow::Schema>(std::move(columns)); + auto recordBatch = arrow::RecordBatch::Make(schema, 1, data); + + return std::make_pair<TString, TString>( + NArrow::SerializeSchema(*schema), + NArrow::SerializeBatchNoCompression(recordBatch) + ); +} + void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGateway::TPhysicalTxData>& txs) { for (size_t txIdx = 0; txIdx < txs.size(); ++txIdx) { auto& tx = txs[txIdx]; @@ -605,14 +677,171 @@ void FillChannelDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map< channelDesc.SetInMemory(channel.InMemory); } +void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransaction_TTableMeta* meta) { + meta->SetTablePath(stageInfo.Meta.TablePath); + meta->MutableTableId()->SetTableId(stageInfo.Meta.TableId.PathId.LocalPathId); + meta->MutableTableId()->SetOwnerId(stageInfo.Meta.TableId.PathId.OwnerId); + meta->SetSchemaVersion(stageInfo.Meta.TableId.SchemaVersion); + meta->SetSysViewInfo(stageInfo.Meta.TableId.SysViewInfo); + meta->SetTableKind((ui32)stageInfo.Meta.TableKind); +} + +void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, const TKqpTableKeys& tableKeys, NYql::NDqProto::TDqTask& taskDesc) { + if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) { + NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta protoTaskMeta; + + FillTableMeta(stageInfo, protoTaskMeta.MutableTable()); + + if (task.Meta.Reads) { + for (auto& read : *task.Meta.Reads) { + auto* protoReadMeta = protoTaskMeta.AddReads(); + read.Ranges.SerializeTo(protoReadMeta->MutableRange()); + for (auto& column : read.Columns) { + auto* protoColumn = protoReadMeta->AddColumns(); + protoColumn->SetId(column.Id); + auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(column.Type, column.TypeMod); + protoColumn->SetType(columnType.TypeId); + if (columnType.TypeInfo) { + *protoColumn->MutableTypeInfo() = *columnType.TypeInfo; + } + protoColumn->SetName(column.Name); + } + protoReadMeta->SetItemsLimit(task.Meta.ReadInfo.ItemsLimit); + protoReadMeta->SetReverse(task.Meta.ReadInfo.Reverse); + } + } + if (task.Meta.Writes) { + auto* protoWrites = protoTaskMeta.MutableWrites(); + task.Meta.Writes->Ranges.SerializeTo(protoWrites->MutableRange()); + if (task.Meta.Writes->IsPureEraseOp()) { + protoWrites->SetIsPureEraseOp(true); + } + + for (const auto& [_, columnWrite] : task.Meta.Writes->ColumnWrites) { + auto& protoColumnWrite = *protoWrites->AddColumns(); + + auto& protoColumn = *protoColumnWrite.MutableColumn(); + protoColumn.SetId(columnWrite.Column.Id); + auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(columnWrite.Column.Type, columnWrite.Column.TypeMod); + protoColumn.SetType(columnType.TypeId); + if (columnType.TypeInfo) { + *protoColumn.MutableTypeInfo() = *columnType.TypeInfo; + } + protoColumn.SetName(columnWrite.Column.Name); + + protoColumnWrite.SetMaxValueSizeBytes(columnWrite.MaxValueSizeBytes); + } + } + + taskDesc.MutableMeta()->PackFrom(protoTaskMeta); + } else if (task.Meta.ScanTask || stageInfo.Meta.IsSysView()) { + NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta protoTaskMeta; + + FillTableMeta(stageInfo, protoTaskMeta.MutableTable()); + + const auto& tableInfo = tableKeys.GetTable(stageInfo.Meta.TableId); + for (const auto& keyColumnName : tableInfo.KeyColumns) { + const auto& keyColumn = tableInfo.Columns.at(keyColumnName); + auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(keyColumn.Type, keyColumn.TypeMod); + protoTaskMeta.AddKeyColumnTypes(columnType.TypeId); + if (columnType.TypeInfo) { + *protoTaskMeta.AddKeyColumnTypeInfos() = *columnType.TypeInfo; + } + } + + for (bool skipNullKey : stageInfo.Meta.SkipNullKeys) { + protoTaskMeta.AddSkipNullKeys(skipNullKey); + } + + switch (tableInfo.TableKind) { + case ETableKind::Unknown: + case ETableKind::SysView: { + protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC); + break; + } + case ETableKind::Datashard: { + if (AppData()->FeatureFlags.GetEnableArrowFormatAtDatashard()) { + protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::ARROW); + } else { + protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC); + } + break; + } + case ETableKind::Olap: { + protoTaskMeta.SetDataFormat(NKikimrTxDataShard::EScanDataFormat::ARROW); + break; + } + } + + YQL_ENSURE(!task.Meta.Writes); + + if (!task.Meta.Reads->empty()) { + protoTaskMeta.SetReverse(task.Meta.ReadInfo.Reverse); + protoTaskMeta.SetItemsLimit(task.Meta.ReadInfo.ItemsLimit); + protoTaskMeta.SetSorted(task.Meta.ReadInfo.Sorted); + protoTaskMeta.SetReadType(ReadTypeToProto(task.Meta.ReadInfo.ReadType)); + + for (auto columnType : task.Meta.ReadInfo.ResultColumnsTypes) { + auto* protoResultColumn = protoTaskMeta.AddResultColumns(); + protoResultColumn->SetId(0); + auto protoColumnType = NScheme::ProtoColumnTypeFromTypeInfoMod(columnType, ""); + protoResultColumn->SetType(protoColumnType.TypeId); + if (protoColumnType.TypeInfo) { + *protoResultColumn->MutableTypeInfo() = *protoColumnType.TypeInfo; + } + } + + if (tableInfo.TableKind == ETableKind::Olap) { + auto* olapProgram = protoTaskMeta.MutableOlapProgram(); + olapProgram->SetProgram(task.Meta.ReadInfo.OlapProgram.Program); + + auto [schema, parameters] = SerializeKqpTasksParametersForOlap(stageInfo, task); + olapProgram->SetParametersSchema(schema); + olapProgram->SetParameters(parameters); + } else { + YQL_ENSURE(task.Meta.ReadInfo.OlapProgram.Program.empty()); + } + + for (auto& column : task.Meta.Reads->front().Columns) { + auto* protoColumn = protoTaskMeta.AddColumns(); + protoColumn->SetId(column.Id); + auto columnType = NScheme::ProtoColumnTypeFromTypeInfoMod(column.Type, ""); + protoColumn->SetType(columnType.TypeId); + if (columnType.TypeInfo) { + *protoColumn->MutableTypeInfo() = *columnType.TypeInfo; + } + protoColumn->SetName(column.Name); + } + } + + for (auto& read : *task.Meta.Reads) { + auto* protoReadMeta = protoTaskMeta.AddReads(); + protoReadMeta->SetShardId(read.ShardId); + read.Ranges.SerializeTo(protoReadMeta); + + YQL_ENSURE((int) read.Columns.size() == protoTaskMeta.GetColumns().size()); + for (ui64 i = 0; i < read.Columns.size(); ++i) { + YQL_ENSURE(read.Columns[i].Id == protoTaskMeta.GetColumns()[i].GetId()); + YQL_ENSURE(read.Columns[i].Type.GetTypeId() == protoTaskMeta.GetColumns()[i].GetType()); + } + } + + + taskDesc.MutableMeta()->PackFrom(protoTaskMeta); + } +} + NYql::NDqProto::TDqTask SerializeTaskToProto( const TKqpTasksGraph& tasksGraph, + const TTask& task, + const TKqpTableKeys& tableKeys, const std::unordered_map<ui64, IActor*>& resultChannelProxies, - const TTask& task, const NMiniKQL::TTypeEnvironment& typeEnv) + const NMiniKQL::TTypeEnvironment& typeEnv) { auto& stageInfo = tasksGraph.GetStageInfo(task.StageId); NYql::NDqProto::TDqTask result; + ActorIdToProto(task.Meta.ExecuterId, result.MutableExecuter()->MutableActorId()); result.SetId(task.Id); result.SetStageId(stageInfo.Id.StageId); @@ -643,6 +872,9 @@ NYql::NDqProto::TDqTask SerializeTaskToProto( dqParams[paramName] = stageInfo.Meta.Tx.Params->SerializeParamValue(paramName); } } + + FillTaskMeta(stageInfo, task, tableKeys, result); + return result; } diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index 6cfe34f4bf1..f81c0f85104 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -121,6 +121,7 @@ struct TShardKeyRanges { struct TTaskMeta { ui64 ShardId = 0; // only in case of non-scans (data-query & legacy scans) ui64 NodeId = 0; // only in case of scans over persistent snapshots + bool ScanTask = false; TActorId ExecuterId; TMap<TString, NYql::NDqProto::TData> Params; @@ -202,7 +203,10 @@ void BuildKqpTaskGraphResultChannels(TKqpTasksGraph& tasksGraph, const TKqpPhyTx void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo, ui64 txId, bool enableSpilling); -NYql::NDqProto::TDqTask SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, const TTask& task, const NMiniKQL::TTypeEnvironment& typeEnv); +NYql::NDqProto::TDqTask SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task, + const TKqpTableKeys& tableKeys, + const std::unordered_map<ui64, IActor*>& resultChannelProxies, const NMiniKQL::TTypeEnvironment& typeEnv); +void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransaction_TTableMeta* meta); void FillChannelDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NYql::NDqProto::TChannel& channelDesc, const NYql::NDq::TChannel& channel); void FillInputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NYql::NDqProto::TTaskInput& inputDesc, const TTaskInput& input); void FillOutputDesc(const TKqpTasksGraph& tasksGraph, const std::unordered_map<ui64, IActor*>& resultChannelProxies, NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output); |