diff options
author | Iuliia Sidorina <ulya.sidorina@gmail.com> | 2022-06-23 20:50:34 +0300 |
---|---|---|
committer | Iuliia Sidorina <ulya.sidorina@gmail.com> | 2022-06-23 20:50:34 +0300 |
commit | 1ae3fabb11fa55eae8d16909eaccaa84e68ba0b5 (patch) | |
tree | a796b63f21b03a15cde2966ed9dd63d91ec771a7 | |
parent | 926a69807718910badd754b5328bab4d5570009b (diff) | |
download | ydb-1ae3fabb11fa55eae8d16909eaccaa84e68ba0b5.tar.gz |
KIKIMR-14294: impplement stream lookup for scan query
ref:3190ff51d4170603656dd557d7fdeeec8a6f03b9
31 files changed, 968 insertions, 124 deletions
diff --git a/ydb/core/kqp/compile/kqp_compile.cpp b/ydb/core/kqp/compile/kqp_compile.cpp index 7836fff357..15f17500ba 100644 --- a/ydb/core/kqp/compile/kqp_compile.cpp +++ b/ydb/core/kqp/compile/kqp_compile.cpp @@ -12,6 +12,7 @@ #include <ydb/library/yql/dq/opt/dq_opt.h> #include <ydb/library/yql/dq/tasks/dq_task_program.h> #include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h> +#include <ydb/library/yql/minikql/mkql_node_serialization.h> namespace NKikimr { namespace NKqp { @@ -81,8 +82,9 @@ void FillTable(const TKqpTable& table, NKqpProto::TKqpPhyTable& tableProto) { tableProto.SetVersion(FromString<ui64>(table.Version())); } +template <typename TProto> void FillColumns(const TCoAtomList& columns, const TKikimrTableMetadata& tableMeta, - NKqpProto::TKqpPhyTableOperation& opProto, bool allowSystemColumns) + TProto& opProto, bool allowSystemColumns) { for (const auto& columnNode : columns) { TString columnName(columnNode); @@ -329,74 +331,6 @@ void FillOlapProgram(const TCoLambda& process, const TKikimrTableMetadata& table CompileOlapProgram(process, tableMeta, readProto); } -void FillConnection(const TDqConnection& connection, const TMap<ui64, ui32>& stagesMap, - NKqpProto::TKqpPhyConnection& connectionProto, TExprContext& ctx) -{ - auto inputStageIndex = stagesMap.FindPtr(connection.Output().Stage().Ref().UniqueId()); - YQL_ENSURE(inputStageIndex, "stage #" << connection.Output().Stage().Ref().UniqueId() << " not found in stages map: " - << PrintKqpStageOnly(connection.Output().Stage(), ctx)); - - auto outputIndex = FromString<ui32>(connection.Output().Index().Value()); - - connectionProto.SetStageIndex(*inputStageIndex); - connectionProto.SetOutputIndex(outputIndex); - - if (connection.Maybe<TDqCnUnionAll>()) { - connectionProto.MutableUnionAll(); - return; - } - - if (auto maybeShuffle = connection.Maybe<TDqCnHashShuffle>()) { - auto& shuffleProto = *connectionProto.MutableHashShuffle(); - for (const auto& keyColumn : maybeShuffle.Cast().KeyColumns()) { - shuffleProto.AddKeyColumns(TString(keyColumn)); - } - return; - } - - if (connection.Maybe<TDqCnMap>()) { - connectionProto.MutableMap(); - return; - } - - if (connection.Maybe<TDqCnBroadcast>()) { - connectionProto.MutableBroadcast(); - return; - } - - if (connection.Maybe<TDqCnResult>()) { - connectionProto.MutableResult(); - return; - } - - if (connection.Maybe<TDqCnValue>()) { - connectionProto.MutableValue(); - return; - } - - if (connection.Maybe<TKqpCnMapShard>()) { - connectionProto.MutableMapShard(); - return; - } - - if (connection.Maybe<TKqpCnShuffleShard>()) { - connectionProto.MutableShuffleShard(); - return; - } - - if (auto maybeMerge = connection.Maybe<TDqCnMerge>()) { - auto& mergeProto = *connectionProto.MutableMerge(); - for (const auto& sortColumn : maybeMerge.Cast().SortColumns()) { - auto newSortColumn = mergeProto.AddSortColumns(); - newSortColumn->SetColumn(sortColumn.Column().StringValue()); - newSortColumn->SetAscending(sortColumn.SortDirection().Value() == TTopSortSettings::AscendingSort); - } - return; - } - - YQL_ENSURE(false, "Unexpected connection type: " << connection.CallableName()); -} - class TKqpQueryCompiler : public IKqpQueryCompiler { public: TKqpQueryCompiler(const TString& cluster, const TIntrusivePtr<TKikimrTablesData> tablesData, @@ -460,6 +394,14 @@ public: } private: + NKikimr::NMiniKQL::TType* CompileType(TProgramBuilder& pgmBuilder, const TTypeAnnotationNode& inputType) { + TStringStream errorStream; + + auto type = NCommon::BuildType(inputType, pgmBuilder, errorStream); + Y_ENSURE(type, "Failed to compile type: " << errorStream.Str()); + return type; + } + void CompileStage(const TDqPhyStage& stage, NKqpProto::TKqpPhyStage& stageProto, TExprContext& ctx, const TMap<ui64, ui32>& stagesMap) { @@ -661,11 +603,7 @@ private: } YQL_ENSURE(itemType); - TStringStream errorStream; - auto type = NCommon::BuildType(*itemType, pgmBuilder, errorStream); - YQL_ENSURE(type); - - ExportTypeToProto(type, *resultProto.MutableItemType()); + ExportTypeToProto(CompileType(pgmBuilder, *itemType), *resultProto.MutableItemType()); TMaybeNode<TCoAtomList> maybeColumnHints; if (connection.Maybe<TDqCnResult>()) { @@ -687,6 +625,99 @@ private: } } + void FillConnection(const TDqConnection& connection, const TMap<ui64, ui32>& stagesMap, + NKqpProto::TKqpPhyConnection& connectionProto, TExprContext& ctx) + { + auto inputStageIndex = stagesMap.FindPtr(connection.Output().Stage().Ref().UniqueId()); + YQL_ENSURE(inputStageIndex, "stage #" << connection.Output().Stage().Ref().UniqueId() << " not found in stages map: " + << PrintKqpStageOnly(connection.Output().Stage(), ctx)); + + auto outputIndex = FromString<ui32>(connection.Output().Index().Value()); + + connectionProto.SetStageIndex(*inputStageIndex); + connectionProto.SetOutputIndex(outputIndex); + + if (connection.Maybe<TDqCnUnionAll>()) { + connectionProto.MutableUnionAll(); + return; + } + + if (auto maybeShuffle = connection.Maybe<TDqCnHashShuffle>()) { + auto& shuffleProto = *connectionProto.MutableHashShuffle(); + for (const auto& keyColumn : maybeShuffle.Cast().KeyColumns()) { + shuffleProto.AddKeyColumns(TString(keyColumn)); + } + return; + } + + if (connection.Maybe<TDqCnMap>()) { + connectionProto.MutableMap(); + return; + } + + if (connection.Maybe<TDqCnBroadcast>()) { + connectionProto.MutableBroadcast(); + return; + } + + if (connection.Maybe<TDqCnResult>()) { + connectionProto.MutableResult(); + return; + } + + if (connection.Maybe<TDqCnValue>()) { + connectionProto.MutableValue(); + return; + } + + if (connection.Maybe<TKqpCnMapShard>()) { + connectionProto.MutableMapShard(); + return; + } + + if (connection.Maybe<TKqpCnShuffleShard>()) { + connectionProto.MutableShuffleShard(); + return; + } + + if (auto maybeMerge = connection.Maybe<TDqCnMerge>()) { + auto& mergeProto = *connectionProto.MutableMerge(); + for (const auto& sortColumn : maybeMerge.Cast().SortColumns()) { + auto newSortColumn = mergeProto.AddSortColumns(); + newSortColumn->SetColumn(sortColumn.Column().StringValue()); + newSortColumn->SetAscending(sortColumn.SortDirection().Value() == TTopSortSettings::AscendingSort); + } + return; + } + + if (auto maybeStreamLookup = connection.Maybe<TKqpCnStreamLookup>()) { + TProgramBuilder pgmBuilder(TypeEnv, FuncRegistry); + auto& streamLookupProto = *connectionProto.MutableStreamLookup(); + auto streamLookup = maybeStreamLookup.Cast(); + auto tableMeta = TablesData->ExistingTable(Cluster, streamLookup.Table().Path()).Metadata; + YQL_ENSURE(tableMeta); + + FillTable(streamLookup.Table(), *streamLookupProto.MutableTable()); + FillColumns(streamLookup.Columns(), *tableMeta, streamLookupProto, true); + + const auto lookupKeysType = streamLookup.LookupKeysType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + YQL_ENSURE(lookupKeysType, "Empty stream lookup keys type"); + YQL_ENSURE(lookupKeysType->GetKind() == ETypeAnnotationKind::List, "Unexpected stream lookup keys type"); + const auto lookupKeysItemType = lookupKeysType->Cast<TListExprType>()->GetItemType(); + streamLookupProto.SetLookupKeysType(NMiniKQL::SerializeNode(CompileType(pgmBuilder, *lookupKeysItemType), TypeEnv)); + + const auto resultType = streamLookup.Ref().GetTypeAnn(); + YQL_ENSURE(resultType, "Empty stream lookup result type"); + YQL_ENSURE(resultType->GetKind() == ETypeAnnotationKind::Stream, "Unexpected stream lookup result type"); + const auto resultItemType = resultType->Cast<TStreamExprType>()->GetItemType(); + streamLookupProto.SetResultType(NMiniKQL::SerializeNode(CompileType(pgmBuilder, *resultItemType), TypeEnv)); + + return; + } + + YQL_ENSURE(false, "Unexpected connection type: " << connection.CallableName()); + } + private: TString Cluster; const TIntrusivePtr<TKikimrTablesData> TablesData; diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h index db5e0187f6..5f84f70ce4 100644 --- a/ydb/core/kqp/executer/kqp_executer_impl.h +++ b/ydb/core/kqp/executer/kqp_executer_impl.h @@ -381,6 +381,14 @@ protected: auto& channelDesc = *inputDesc.AddChannels(); static_cast<TDerived*>(this)->FillChannelDesc(channelDesc, TasksGraph.GetChannel(channel)); } + + if (input.Transform) { + auto* transformProto = inputDesc.MutableTransform(); + transformProto->SetType(input.Transform->Type); + transformProto->SetInputType(input.Transform->InputType); + transformProto->SetOutputType(input.Transform->OutputType); + *transformProto->MutableSettings() = input.Transform->Settings; + } } void FillOutputDesc(NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output) { diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp index 135890f36e..263cd10011 100644 --- a/ydb/core/kqp/executer/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp @@ -584,6 +584,7 @@ private: case NKqpProto::TKqpPhyConnection::kHashShuffle: case NKqpProto::TKqpPhyConnection::kUnionAll: case NKqpProto::TKqpPhyConnection::kMerge: + case NKqpProto::TKqpPhyConnection::kStreamLookup: break; default: YQL_ENSURE(false, "Unexpected connection type: " << (ui32)input.GetTypeCase()); @@ -606,6 +607,7 @@ private: } case NKqpProto::TKqpPhyConnection::kMap: + case NKqpProto::TKqpPhyConnection::kStreamLookup: partitionsCount = originStageInfo.Tasks.size(); break; @@ -647,7 +649,7 @@ private: YQL_ENSURE(false, "Unexpected stage type " << (int) stageInfo.Meta.TableKind); } - BuildKqpStageChannels(TasksGraph, TableKeys, stageInfo, TxId, AppData()->EnableKqpSpilling); + BuildKqpStageChannels(TasksGraph, TableKeys, stageInfo, TxId, AppData()->EnableKqpSpilling, Request.Snapshot); } BuildKqpExecuterResults(*tx.Body, Results); diff --git a/ydb/core/kqp/executer/kqp_table_resolver.cpp b/ydb/core/kqp/executer/kqp_table_resolver.cpp index 86613a29d8..3cb402c15b 100644 --- a/ydb/core/kqp/executer/kqp_table_resolver.cpp +++ b/ydb/core/kqp/executer/kqp_table_resolver.cpp @@ -223,12 +223,22 @@ private: private: // TODO: Get rid of ResolveTables & TableKeys, get table information from phy tx proto. void ResolveTables() { + auto addTable = [](const auto& proto, auto& tables) { + auto& table = tables.GetOrAddTable(MakeTableId(proto.GetTable()), proto.GetTable().GetPath()); + for (auto& column : proto.GetColumns()) { + table.Columns.emplace(column.GetName(), TKqpTableKeys::TColumn()); + } + }; + for (auto& tx : Transactions) { for (auto& stage : tx.Body->GetStages()) { for (auto& op : stage.GetTableOps()) { - auto& table = TableKeys.GetOrAddTable(MakeTableId(op.GetTable()), op.GetTable().GetPath()); - for (auto& column : op.GetColumns()) { - table.Columns.emplace(column.GetName(), TKqpTableKeys::TColumn()); + addTable(op, TableKeys); + } + + for (const auto& input : stage.GetInputs()) { + if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) { + addTable(input.GetStreamLookup(), TableKeys); } } } diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.cpp b/ydb/core/kqp/executer/kqp_tasks_graph.cpp index ac9792ed9f..8ba9b9fbf8 100644 --- a/ydb/core/kqp/executer/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer/kqp_tasks_graph.cpp @@ -195,8 +195,64 @@ void BuildShuffleShardChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf } } +void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo, ui32 inputIndex, + const TStageInfo& inputStageInfo, ui32 outputIndex, const TKqpTableKeys& tableKeys, + const NKqpProto::TKqpPhyCnStreamLookup& streamLookup, const IKqpGateway::TKqpSnapshot& snapshot, + bool enableSpilling, const TChannelLogFunc& logFunc) { + YQL_ENSURE(stageInfo.Tasks.size() == inputStageInfo.Tasks.size()); + YQL_ENSURE(snapshot.IsValid()); + + NKikimrKqp::TKqpStreamLookupSettings settings; + settings.MutableTable()->CopyFrom(streamLookup.GetTable()); + settings.MutableSnapshot()->SetStep(snapshot.Step); + settings.MutableSnapshot()->SetTxId(snapshot.TxId); + + auto table = tableKeys.GetTable(MakeTableId(streamLookup.GetTable())); + for (const auto& keyColumnType : table.KeyColumnTypes) { + settings.AddKeyColumnTypes(keyColumnType); + } + + for (const auto& column : streamLookup.GetColumns()) { + auto columnIt = table.Columns.find(column.GetName()); + YQL_ENSURE(columnIt != table.Columns.end()); + + auto newColumn = settings.AddColumns(); + newColumn->SetName(columnIt->first); + newColumn->SetId(columnIt->second.Id); + newColumn->SetTypeId(columnIt->second.Type); + } + + TTransform streamLookupTransform; + streamLookupTransform.Type = "StreamLookupInputTransformer"; + streamLookupTransform.InputType = streamLookup.GetLookupKeysType(); + streamLookupTransform.OutputType = streamLookup.GetResultType(); + streamLookupTransform.Settings.PackFrom(settings); + + for (ui32 taskId = 0; taskId < inputStageInfo.Tasks.size(); ++taskId) { + auto& originTask = graph.GetTask(inputStageInfo.Tasks[taskId]); + auto& targetTask = graph.GetTask(stageInfo.Tasks[taskId]); + + auto& channel = graph.AddChannel(); + channel.SrcTask = originTask.Id; + channel.SrcOutputIndex = outputIndex; + channel.DstTask = targetTask.Id; + channel.DstInputIndex = inputIndex; + channel.InMemory = !enableSpilling || inputStageInfo.OutputsCount == 1; + + auto& taskInput = targetTask.Inputs[inputIndex]; + taskInput.Transform = streamLookupTransform; + taskInput.Channels.push_back(channel.Id); + + auto& taskOutput = originTask.Outputs[outputIndex]; + taskOutput.Type = TTaskOutputType::Map; + taskOutput.Channels.push_back(channel.Id); + + logFunc(channel.Id, originTask.Id, targetTask.Id, "StreamLookup/Map", !channel.InMemory); + } +} + void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo, - ui64 txId, bool enableSpilling) + ui64 txId, bool enableSpilling, const IKqpGateway::TKqpSnapshot& snapshot) { auto& stage = GetStage(stageInfo); @@ -256,6 +312,12 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tabl BuildMergeChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, sortColumns, log); break; } + case NKqpProto::TKqpPhyConnection::kStreamLookup: { + BuildStreamLookupChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, tableKeys, + input.GetStreamLookup(), snapshot, enableSpilling, log); + break; + } + default: YQL_ENSURE(false, "Unexpected stage input type: " << (ui32)input.GetTypeCase()); } diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.h b/ydb/core/kqp/executer/kqp_tasks_graph.h index 582be0f039..1523ed12b3 100644 --- a/ydb/core/kqp/executer/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer/kqp_tasks_graph.h @@ -148,7 +148,7 @@ using TKqpTasksGraph = NYql::NDq::TDqTasksGraph<TStageInfoMeta, TTaskMeta, TTask void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGateway::TPhysicalTxData>& txs); void BuildKqpTaskGraphResultChannels(TKqpTasksGraph& tasksGraph, const NKqpProto::TKqpPhyTx& tx, ui64 txIdx); void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo, - ui64 txId, bool enableSpilling); + ui64 txId, bool enableSpilling, const IKqpGateway::TKqpSnapshot& snapshot = {}); TVector<TTaskMeta::TColumn> BuildKqpColumns(const NKqpProto::TKqpPhyTableOperation& op, const TKqpTableKeys::TTable& table); struct TKqpTaskOutputType { diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index 2faa33d5e1..724c7d9b22 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -149,6 +149,11 @@ "Match": {"Type": "Callable", "Name": "KqpLookupTable"} }, { + "Name": "TKqlStreamLookupTable", + "Base": "TKqlLookupTableBase", + "Match": {"Type": "Callable", "Name": "KqlStreamLookupTable"} + }, + { "Name": "TKqlTableEffect", "Base": "TExprBase", "Match": {"Type": "CallableBase"}, @@ -327,6 +332,16 @@ "Match": {"Type": "Callable", "Name": "KqpCnShuffleShard"} }, { + "Name": "TKqpCnStreamLookup", + "Base": "TKqpConnection", + "Match": {"Type": "Callable", "Name": "KqpCnStreamLookup"}, + "Children": [ + {"Index": 1, "Name": "Table", "Type": "TKqpTable"}, + {"Index": 2, "Name": "Columns", "Type": "TCoAtomList"}, + {"Index": 3, "Name": "LookupKeysType", "Type": "TExprBase"} + ] + }, + { "Name": "TKqpProgram", "Base": "TCallable", "Match": {"Type": "Callable", "Name": "KqpProgram"}, diff --git a/ydb/core/kqp/host/kqp_run_scan.cpp b/ydb/core/kqp/host/kqp_run_scan.cpp index 38a67850eb..70b2a661a0 100644 --- a/ydb/core/kqp/host/kqp_run_scan.cpp +++ b/ydb/core/kqp/host/kqp_run_scan.cpp @@ -86,6 +86,12 @@ public: for (const auto& tableOp: stage.GetTableOps()) { tablesSet.insert(tableOp.GetTable().GetPath()); } + + for (const auto& input : stage.GetInputs()) { + if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) { + tablesSet.insert(input.GetStreamLookup().GetTable().GetPath()); + } + } } } TVector<TString> tables(tablesSet.begin(), tablesSet.end()); diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 8151927387..fe12058da3 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -495,6 +495,12 @@ public: for (const auto& tableOp: stage.GetTableOps()) { tablesSet.insert(tableOp.GetTable().GetPath()); } + + for (const auto& input : stage.GetInputs()) { + if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) { + tablesSet.insert(input.GetStreamLookup().GetTable().GetPath()); + } + } } } TVector<TString> tables(tablesSet.begin(), tablesSet.end()); diff --git a/ydb/core/kqp/node/kqp_node.cpp b/ydb/core/kqp/node/kqp_node.cpp index bf7d7f69c2..632b36b924 100644 --- a/ydb/core/kqp/node/kqp_node.cpp +++ b/ydb/core/kqp/node/kqp_node.cpp @@ -12,6 +12,9 @@ #include <ydb/core/kqp/rm/kqp_resource_estimation.h> #include <ydb/core/kqp/rm/kqp_rm.h> #include <ydb/core/kqp/common/kqp_resolve.h> +#include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h> + +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/monlib/service/pages/templates.h> @@ -311,12 +314,12 @@ private: IActor* computeActor; if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) { computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask), - nullptr, nullptr, runtimeSettings, memoryLimits, Counters); + CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, Counters); taskCtx.ComputeActorId = Register(computeActor); } else { if (Y_LIKELY(!CaFactory)) { - computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), nullptr, nullptr, runtimeSettings, - memoryLimits); + computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateAsyncIoFactory(), + nullptr, runtimeSettings, memoryLimits); taskCtx.ComputeActorId = Register(computeActor); } else { computeActor = CaFactory->CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), @@ -548,6 +551,12 @@ private: return ResourceManager_; } + NYql::NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory() { + auto factory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>(); + RegisterStreamLookupActorFactory(*factory); + return factory; + } + private: NKikimrConfig::TTableServiceConfig::TResourceManager Config; TIntrusivePtr<TKqpCounters> Counters; diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp index bc0241286a..0ce9229abe 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp @@ -71,8 +71,13 @@ TMaybeNode<TKqlKeyInc> GetRightTableKeyPrefix(const TKqlKeyRange& range) { } TExprBase BuildLookupIndex(TExprContext& ctx, const TPositionHandle pos, const TKqlReadTableBase& read, - const TExprBase& keysToLookup, const TVector<TCoAtom>& lookupNames, const TString& indexName) + const TExprBase& keysToLookup, const TVector<TCoAtom>& lookupNames, const TString& indexName, + const TKqpOptimizeContext& kqpCtx) { + if (kqpCtx.IsScanQuery()) { + YQL_ENSURE(false, "StreamLookupIndex is not implemented"); + } + return Build<TKqlLookupIndex>(ctx, pos) .Table(read.Table()) .LookupKeys<TCoSkipNullMembers>() @@ -88,8 +93,22 @@ TExprBase BuildLookupIndex(TExprContext& ctx, const TPositionHandle pos, const T } TExprBase BuildLookupTable(TExprContext& ctx, const TPositionHandle pos, const TKqlReadTableBase& read, - const TExprBase& keysToLookup, const TVector<TCoAtom>& lookupNames) + const TExprBase& keysToLookup, const TVector<TCoAtom>& lookupNames, const TKqpOptimizeContext& kqpCtx) { + if (kqpCtx.IsScanQuery()) { + YQL_ENSURE(kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup(), "Stream lookup is not enabled"); + return Build<TKqlStreamLookupTable>(ctx, pos) + .Table(read.Table()) + .LookupKeys<TCoSkipNullMembers>() + .Input(keysToLookup) + .Members() + .Add(lookupNames) + .Build() + .Build() + .Columns(read.Columns()) + .Done(); + } + return Build<TKqlLookupTable>(ctx, pos) .Table(read.Table()) .LookupKeys<TCoSkipNullMembers>() @@ -311,8 +330,8 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext return {}; } - bool needPrecomputeLeft = !join.LeftInput().Maybe<TCoParameter>() && - !IsParameterToListOfStructsRepack(join.LeftInput()); + bool needPrecomputeLeft = kqpCtx.IsDataQuery() && !join.LeftInput().Maybe<TCoParameter>() && + !IsParameterToListOfStructsRepack(join.LeftInput()); TExprBase leftData = needPrecomputeLeft ? Build<TDqPrecompute>(ctx, join.Pos()) @@ -374,8 +393,8 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext .Done(); TExprBase lookup = indexName - ? BuildLookupIndex(ctx, join.Pos(), read, keysToLookup, lookupNames, indexName) - : BuildLookupTable(ctx, join.Pos(), read, keysToLookup, lookupNames); + ? BuildLookupIndex(ctx, join.Pos(), read, keysToLookup, lookupNames, indexName, kqpCtx) + : BuildLookupTable(ctx, join.Pos(), read, keysToLookup, lookupNames, kqpCtx); // Skip null keys in lookup part as for equijoin semantics null != null, // so we can't have nulls in lookup part @@ -424,7 +443,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext TExprBase KqpJoinToIndexLookup(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, const NYql::TKikimrConfiguration::TPtr& config) { - if (!kqpCtx.IsDataQuery() || !node.Maybe<TDqJoin>()) { + if ((kqpCtx.IsScanQuery() && !kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup()) || !node.Maybe<TDqJoin>()) { return node; } auto join = node.Cast<TDqJoin>(); diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp index a09a982c56..ea9b7825f0 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp @@ -17,7 +17,7 @@ using namespace NYql::NNodes; TExprBase KqpRewriteSqlInToEquiJoin(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, const TKikimrConfiguration::TPtr& config) { - if (!kqpCtx.IsDataQuery()) { + if (kqpCtx.IsScanQuery() && !kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup()) { return node; } diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 27046fb0c7..c57f9cc6f2 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -29,6 +29,7 @@ public: AddHandler(0, &TKqlReadTable::Match, HNDL(BuildReadTableStage)); AddHandler(0, &TKqlReadTableRanges::Match, HNDL(BuildReadTableRangesStage)); AddHandler(0, &TKqlLookupTable::Match, HNDL(BuildLookupTableStage)); + AddHandler(0, &TKqlStreamLookupTable::Match, HNDL(BuildStreamLookupTableStages)); AddHandler(0, [](const TExprNode* node) { return TCoSort::Match(node) || TCoTopSort::Match(node); }, HNDL(RemoveRedundantSortByPk)); AddHandler(0, &TCoTake::Match, HNDL(ApplyLimitToReadTable)); @@ -109,6 +110,12 @@ protected: return output; } + TMaybeNode<TExprBase> BuildStreamLookupTableStages(TExprBase node, TExprContext& ctx) { + TExprBase output = KqpBuildStreamLookupTableStages(node, ctx); + DumpAppliedRule("BuildStreamLookupTableStages", node.Ptr(), output.Ptr(), ctx); + return output; + } + TMaybeNode<TExprBase> RemoveRedundantSortByPk(TExprBase node, TExprContext& ctx) { TExprBase output = KqpRemoveRedundantSortByPk(node, ctx, KqpCtx); DumpAppliedRule("RemoveRedundantSortByPk", node.Ptr(), output.Ptr(), ctx); diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index 578d8b7877..db94e41950 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -383,4 +383,67 @@ TExprBase KqpBuildLookupTableStage(TExprBase node, TExprContext& ctx) { .Done(); } +NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx) { + if (!node.Maybe<TKqlStreamLookupTable>()) { + return node; + } + + const auto& lookup = node.Cast<TKqlStreamLookupTable>(); + + TMaybeNode<TKqpCnStreamLookup> cnStreamLookup; + if (IsDqPureExpr(lookup.LookupKeys())) { + YQL_ENSURE(lookup.LookupKeys().Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::List, + "" << lookup.LookupKeys().Ref().Dump()); + + cnStreamLookup = Build<TKqpCnStreamLookup>(ctx, lookup.Pos()) + .Output() + .Stage<TDqStage>() + .Inputs() + .Build() + .Program() + .Args({}) + .Body<TCoIterator>() + .List(lookup.LookupKeys()) + .Build() + .Build() + .Settings(TDqStageSettings().BuildNode(ctx, lookup.Pos())) + .Build() + .Index().Build("0") + .Build() + .Table(lookup.Table()) + .Columns(lookup.Columns()) + .LookupKeysType(ExpandType(lookup.Pos(), *lookup.LookupKeys().Ref().GetTypeAnn(), ctx)) + .Done(); + + } else if (lookup.LookupKeys().Maybe<TDqCnUnionAll>()) { + auto output = lookup.LookupKeys().Cast<TDqCnUnionAll>().Output(); + + cnStreamLookup = Build<TKqpCnStreamLookup>(ctx, lookup.Pos()) + .Output(output) + .Table(lookup.Table()) + .Columns(lookup.Columns()) + .LookupKeysType(ExpandType(lookup.Pos(), *output.Ref().GetTypeAnn(), ctx)) + .Done(); + } else { + return node; + } + + return Build<TDqCnUnionAll>(ctx, node.Pos()) + .Output() + .Stage<TDqStage>() + .Inputs() + .Add(cnStreamLookup.Cast()) + .Build() + .Program() + .Args({"stream_lookup_output"}) + .Body<TCoToStream>() + .Input("stream_lookup_output") + .Build() + .Build() + .Settings(TDqStageSettings().BuildNode(ctx, node.Pos())) + .Build() + .Index().Build("0") + .Build().Done(); +} + } // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h index 6ec28394c7..bde2856792 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h @@ -19,6 +19,8 @@ NYql::NNodes::TExprBase KqpBuildReadTableRangesStage(NYql::NNodes::TExprBase nod NYql::NNodes::TExprBase KqpBuildLookupTableStage(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx); +NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx); + NYql::NNodes::TExprBase KqpRemoveRedundantSortByPk(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); diff --git a/ydb/core/kqp/prepare/kqp_type_ann.cpp b/ydb/core/kqp/prepare/kqp_type_ann.cpp index f111792122..a19a9bed8f 100644 --- a/ydb/core/kqp/prepare/kqp_type_ann.cpp +++ b/ydb/core/kqp/prepare/kqp_type_ann.cpp @@ -1029,6 +1029,43 @@ TStatus AnnotateKqpEnsure(const TExprNode::TPtr& node, TExprContext& ctx) { return TStatus::Ok; } +TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster, + const TKikimrTablesData& tablesData, bool withSystemColumns) { + + if (!EnsureArgsCount(*node, 4, ctx)) { + return TStatus::Error; + } + + if (!EnsureCallable(*node->Child(TKqpCnStreamLookup::idx_Output), ctx)) { + return TStatus::Error; + } + + if (!TDqOutput::Match(node->Child(TKqpCnStreamLookup::idx_Output))) { + ctx.AddError(TIssue(ctx.GetPosition(node->Child(TDqCnMerge::idx_Output)->Pos()), + TStringBuilder() << "Expected " << TDqOutput::CallableName())); + return TStatus::Error; + } + + auto table = ResolveTable(node->Child(TKqpCnStreamLookup::idx_Table), ctx, cluster, tablesData); + if (!table.second) { + return TStatus::Error; + } + + if (!EnsureTupleOfAtoms(*node->Child(TKqpCnStreamLookup::idx_Columns), ctx)) { + return TStatus::Error; + } + + TCoAtomList columns{node->ChildPtr(TKqlLookupTableBase::idx_Columns)}; + + auto rowType = GetReadTableRowType(ctx, tablesData, cluster, table.first, columns, withSystemColumns); + if (!rowType) { + return TStatus::Error; + } + + node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType)); + return TStatus::Ok; +} + } // namespace TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cluster, @@ -1099,6 +1136,10 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl return AnnotateDqConnection(input, ctx); } + if (TKqpCnStreamLookup::Match(input.Get())) { + return AnnotateStreamLookupConnection(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled()); + } + if (TKqpTxResultBinding::Match(input.Get())) { return AnnotateKqpTxResultBinding(input, ctx); } diff --git a/ydb/core/kqp/runtime/CMakeLists.txt b/ydb/core/kqp/runtime/CMakeLists.txt index 1083d8408f..d12069e05c 100644 --- a/ydb/core/kqp/runtime/CMakeLists.txt +++ b/ydb/core/kqp/runtime/CMakeLists.txt @@ -41,6 +41,8 @@ target_sources(core-kqp-runtime PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_read_table.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp ) diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp new file mode 100644 index 0000000000..3a06dd106d --- /dev/null +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -0,0 +1,440 @@ +#include "kqp_stream_lookup_actor.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> + +#include <ydb/core/actorlib_impl/long_timer.h> +#include <ydb/core/base/tablet_pipecache.h> +#include <ydb/core/engine/minikql/minikql_engine_host.h> +#include <ydb/core/kqp/common/kqp_resolve.h> +#include <ydb/core/kqp/common/kqp_gateway.h> +#include <ydb/core/protos/kqp.pb.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/core/tx/datashard/datashard.h> + +namespace NKikimr { +namespace NKqp { + +namespace { + +static constexpr TDuration RESOLVE_SHARDS_TIMEOUT = TDuration::Seconds(5); +static constexpr TDuration RETRY_READ_TIMEOUT = TDuration::Seconds(10); + +class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLookupActor>, public NYql::NDq::IDqComputeActorAsyncInput { +public: + TKqpStreamLookupActor(ui64 inputIndex, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, + const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, + NKikimrKqp::TKqpStreamLookupSettings&& settings) + : InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv) + , HolderFactory(holderFactory), TableId(MakeTableId(settings.GetTable())) + , KeyColumnTypes(settings.GetKeyColumnTypes().begin(), settings.GetKeyColumnTypes().end()) + , Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId()) + , ResolveTableShardsTimeout(RESOLVE_SHARDS_TIMEOUT) + , RetryReadTimeout(RETRY_READ_TIMEOUT) { + + for (const auto& column : settings.GetColumns()) { + Columns.emplace_back(&column); + } + }; + + void Bootstrap() { + ResolveTableShards(); + + Become(&TKqpStreamLookupActor::StateFunc); + } + +private: + enum class EReadState { + Initial, + Running, + Finished, + }; + + std::string_view ReadStateToString(EReadState state) { + switch (state) { + case EReadState::Initial: return "Initial"sv; + case EReadState::Running: return "Running"sv; + case EReadState::Finished: return "Finished"sv; + } + } + + struct TReadState { + TReadState(ui64 id, ui64 shardId, std::vector<TOwnedTableRange>&& keys) + : Id(id) + , ShardId(shardId) + , Keys(std::move(keys)) + , State(EReadState::Initial) + , Retried(false) {} + + void SetFinished(const NActors::TActorContext& ctx) { + Keys.clear(); + State = EReadState::Finished; + + if (RetryDeadlineTimerId) { + ctx.Send(RetryDeadlineTimerId, new TEvents::TEvPoisonPill()); + RetryDeadlineTimerId = {}; + } + } + + bool Finished() const { + return (State == EReadState::Finished); + } + + const ui64 Id; + const ui64 ShardId; + std::vector<TOwnedTableRange> Keys; + EReadState State; + TActorId RetryDeadlineTimerId; + bool Retried; + }; + + struct TEvPrivate { + enum EEv { + EvRetryReadTimeout = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), + EvResolveTableShardsTimeout, + }; + + struct TEvResolveTableShardsTimeout : public TEventLocal<TEvResolveTableShardsTimeout, EvResolveTableShardsTimeout> { + }; + + struct TEvRetryReadTimeout : public TEventLocal<TEvRetryReadTimeout, EvRetryReadTimeout> { + TEvRetryReadTimeout(ui64 readId) : ReadId(readId) {} + + ui64 ReadId; + }; + }; + +private: + void SaveState(const NYql::NDqProto::TCheckpoint&, NYql::NDqProto::TSourceState&) final {} + void LoadState(const NYql::NDqProto::TSourceState&) final {} + void CommitState(const NYql::NDqProto::TCheckpoint&) final {} + + ui64 GetInputIndex() const final { + return InputIndex; + } + + void PassAway() final { + { + auto alloc = BindAllocator(); + Input.Clear(); + } + + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0)); + TActorBootstrapped<TKqpStreamLookupActor>::PassAway(); + } + + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, bool& finished, i64) final { + i64 totalDataSize = 0; + + for (; !Results.empty(); Results.pop_front()) { + const auto& result = Results.front(); + YQL_ENSURE(result.size() == Columns.size(), "Result columns mismatch"); + + NUdf::TUnboxedValue* rowItems = nullptr; + auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems); + + for (ui32 colId = 0; colId < Columns.size(); ++colId) { + totalDataSize += result[colId].Size(); + rowItems[colId] = NMiniKQL::GetCellValue(result[colId], Columns[colId].TypeId); + } + + batch.push_back(std::move(row)); + } + + NUdf::EFetchStatus status; + NUdf::TUnboxedValue key; + while ((status = Input.Fetch(key)) == NUdf::EFetchStatus::Ok) { + std::vector<TCell> keyCells(KeyColumnTypes.size()); + for (ui32 colId = 0; colId < KeyColumnTypes.size(); ++colId) { + keyCells[colId] = MakeCell(KeyColumnTypes[colId], key.GetElement(colId), TypeEnv, /* copy */ true); + } + + UnprocessedKeys.emplace_back(std::move(keyCells)); + } + + if (Partitioning) { + ProcessLookupKeys(); + } + + finished = (status == NUdf::EFetchStatus::Finish) && UnprocessedKeys.empty() && AllReadsFinished(); + return totalDataSize; + } + + STFUNC(StateFunc) { + Y_UNUSED(ctx); + + try { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle); + hFunc(TEvDataShard::TEvReadResult, Handle); + hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); + hFunc(TEvPrivate::TEvResolveTableShardsTimeout, Handle); + hFunc(TEvPrivate::TEvRetryReadTimeout, Handle); + IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult); + default: + RuntimeError(TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite()); + } + } catch (const yexception& e) { + RuntimeError(e.what()); + } + } + + void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) { + if (ev->Get()->Request->ErrorCount > 0) { + return RuntimeError(TStringBuilder() << "Failed to get partitioning for table: " << TableId); + } + + auto& resultSet = ev->Get()->Request->ResultSet; + YQL_ENSURE(resultSet.size() == 1, "Expected one result for range (-inf, +inf)"); + Partitioning = resultSet[0].KeyDescription->Partitioning; + + ProcessLookupKeys(); + } + + void Handle(TEvDataShard::TEvReadResult::TPtr& ev) { + const auto& record = ev->Get()->Record; + + auto readIt = Reads.find(record.GetReadId()); + YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << record.GetReadId()); + auto& read = readIt->second; + + if (read.State != EReadState::Running) { + return; + } + + // TODO: refactor after KIKIMR-15102 + if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) { + NKikimrTxDataShard::TReadContinuationToken continuationToken; + bool parseResult = continuationToken.ParseFromString(record.GetContinuationToken()); + YQL_ENSURE(parseResult, "Failed to parse continuation token"); + YQL_ENSURE(continuationToken.GetFirstUnprocessedQuery() <= read.Keys.size()); + + return RetryTableRead(read, continuationToken); + } + + YQL_ENSURE(record.GetResultFormat() == NKikimrTxDataShard::EScanDataFormat::CELLVEC); + auto nrows = ev->Get()->GetRowsCount(); + for (ui64 rowId = 0; rowId < nrows; ++rowId) { + Results.emplace_back(ev->Get()->GetCells(rowId)); + } + + Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); + + if (record.GetFinished()) { + read.SetFinished(TlsActivationContext->AsActorContext()); + } else { + THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck()); + request->Record.SetReadId(record.GetReadId()); + request->Record.SetSeqNo(record.GetSeqNo()); + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), read.ShardId, true), + IEventHandle::FlagTrackDelivery); + } + } + + void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { + const auto& tabletId = ev->Get()->TabletId; + auto shardIt = ReadsPerShard.find(tabletId); + YQL_ENSURE(shardIt != ReadsPerShard.end()); + + for (auto readId : shardIt->second) { + auto readIt = Reads.find(readId); + YQL_ENSURE(readIt != Reads.end()); + auto& read = readIt->second; + + if (read.State == EReadState::Running) { + for (auto& key : read.Keys) { + UnprocessedKeys.emplace_back(std::move(key)); + } + + read.SetFinished(TlsActivationContext->AsActorContext()); + } + } + + ReadsPerShard.erase(shardIt); + ResolveTableShards(); + } + + void Handle(TEvPrivate::TEvResolveTableShardsTimeout::TPtr&) { + if (!Partitioning) { + RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId + << " (request timeout exceeded)"); + } + } + + void Handle(TEvPrivate::TEvRetryReadTimeout::TPtr& ev) { + auto readIt = Reads.find(ev->Get()->ReadId); + YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << ev->Get()->ReadId); + auto& read = readIt->second; + + if (read.Retried) { + RuntimeError(TStringBuilder() << "Retry timeout exceeded for read: " << ev->Get()->ReadId); + } + } + + void ProcessLookupKeys() { + YQL_ENSURE(Partitioning, "Table partitioning should be initialized before lookup keys processing"); + + std::map<ui64, std::vector<TOwnedTableRange>> shardKeys; + for (; !UnprocessedKeys.empty(); UnprocessedKeys.pop_front()) { + const auto& key = UnprocessedKeys.front(); + YQL_ENSURE(key.Point); + + auto partitionInfo = LowerBound( + Partitioning->begin(), Partitioning->end(), /* value */ true, + [&](const auto& partition, bool) { + const int result = CompareBorders<true, false>( + partition.Range->EndKeyPrefix.GetCells(), key.From, + partition.Range->IsInclusive || partition.Range->IsPoint, + key.InclusiveFrom || key.Point, KeyColumnTypes + ); + + return (result < 0); + } + ); + + shardKeys[partitionInfo->ShardId].emplace_back(std::move(key)); + } + + for (auto& [shardId, keys] : shardKeys) { + StartTableRead(shardId, std::move(keys)); + } + } + + TReadState& StartTableRead(ui64 shardId, std::vector<TOwnedTableRange>&& keys) { + const auto readId = GetNextReadId(); + TReadState read(readId, shardId, std::move(keys)); + + THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead()); + auto& record = request->Record; + + record.MutableSnapshot()->SetStep(Snapshot.Step); + record.MutableSnapshot()->SetTxId(Snapshot.TxId); + + record.SetReadId(read.Id); + record.SetResultFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC); + + record.MutableTableId()->SetOwnerId(TableId.PathId.OwnerId); + record.MutableTableId()->SetTableId(TableId.PathId.LocalPathId); + record.MutableTableId()->SetSchemaVersion(TableId.SchemaVersion); + + for (const auto& column : Columns) { + record.AddColumns(column.Id); + } + + for (auto& key : read.Keys) { + YQL_ENSURE(key.Point); + request->Keys.emplace_back(TSerializedCellVec::Serialize(key.From)); + } + + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), shardId, true), + IEventHandle::FlagTrackDelivery); + + read.State = EReadState::Running; + + const auto [readIt, succeeded] = Reads.insert({readId, std::move(read)}); + YQL_ENSURE(succeeded); + ReadsPerShard[shardId].insert(readId); + + return readIt->second; + } + + void RetryTableRead(TReadState& failedRead, NKikimrTxDataShard::TReadContinuationToken& token) { + YQL_ENSURE(token.GetFirstUnprocessedQuery() <= failedRead.Keys.size()); + std::vector<TOwnedTableRange> unprocessedKeys(failedRead.Keys.size() - token.GetFirstUnprocessedQuery()); + for (ui64 idx = token.GetFirstUnprocessedQuery(); idx < failedRead.Keys.size(); ++idx) { + unprocessedKeys.emplace_back(std::move(failedRead.Keys[idx])); + } + + auto& newRead = StartTableRead(failedRead.ShardId, std::move(unprocessedKeys)); + if (failedRead.Retried) { + newRead.RetryDeadlineTimerId = failedRead.RetryDeadlineTimerId; + failedRead.RetryDeadlineTimerId = {}; + } else { + failedRead.Retried = true; + newRead.RetryDeadlineTimerId = CreateLongTimer(TlsActivationContext->AsActorContext(), RetryReadTimeout, + new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryReadTimeout(newRead.Id))); + } + + failedRead.SetFinished(TlsActivationContext->AsActorContext()); + } + + void ResolveTableShards() { + Partitioning.reset(); + + auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>(); + + TVector<TCell> minusInf(KeyColumnTypes.size()); + TVector<TCell> plusInf; + TTableRange range(minusInf, true, plusInf, true, false); + + request->ResultSet.emplace_back(MakeHolder<TKeyDesc>(TableId, range, TKeyDesc::ERowOperation::Read, + KeyColumnTypes, TVector<TKeyDesc::TColumnOp>{})); + + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {})); + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request)); + + ResolveTableShardsTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), ResolveTableShardsTimeout, + new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvResolveTableShardsTimeout())); + } + + bool AllReadsFinished() const { + for (const auto& [_, read] : Reads) { + if (!read.Finished()) { + return false; + } + } + + return true; + } + + ui64 GetNextReadId() { + static ui64 readId = 0; + return ++readId; + } + + TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator() { + return TypeEnv.BindAllocator(); + } + + void RuntimeError(const TString& message, const NYql::TIssues& subIssues = {}) { + NYql::TIssue issue(message); + for (const auto& i : subIssues) { + issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i)); + } + + NYql::TIssues issues; + issues.AddIssue(std::move(issue)); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), true)); + } + +private: + const ui64 InputIndex; + NUdf::TUnboxedValue Input; + const NActors::TActorId ComputeActorId; + const NMiniKQL::TTypeEnvironment& TypeEnv; + const NMiniKQL::THolderFactory& HolderFactory; + const TTableId TableId; + const TVector<NKikimr::NScheme::TTypeId> KeyColumnTypes; + std::vector<NYql::TKikimrColumnMetadata> Columns; + const IKqpGateway::TKqpSnapshot Snapshot; + std::deque<TOwnedCellVec> Results; + std::unordered_map<ui64, TReadState> Reads; + std::unordered_map<ui64, std::set<ui64>> ReadsPerShard; + std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning; + std::deque<TOwnedTableRange> UnprocessedKeys; + const TDuration ResolveTableShardsTimeout; + NActors::TActorId ResolveTableShardsTimeoutTimer; + const TDuration RetryReadTimeout; +}; + +} // namespace + +std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex, + const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv, + const NMiniKQL::THolderFactory& holderFactory, NKikimrKqp::TKqpStreamLookupSettings&& settings) { + auto actor = new TKqpStreamLookupActor(inputIndex, input, computeActorId, typeEnv, holderFactory, + std::move(settings)); + return {actor, actor}; +} + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h new file mode 100644 index 0000000000..59d8d83b87 --- /dev/null +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h @@ -0,0 +1,14 @@ +#pragma once + +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> +#include <ydb/core/protos/kqp.pb.h> + +namespace NKikimr { +namespace NKqp { + +std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex, + const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv, + const NMiniKQL::THolderFactory& holderFactory, NKikimrKqp::TKqpStreamLookupSettings&& settings); + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp new file mode 100644 index 0000000000..dab7ce8f5c --- /dev/null +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp @@ -0,0 +1,16 @@ +#include "kqp_stream_lookup_factory.h" +#include "kqp_stream_lookup_actor.h" + +namespace NKikimr { +namespace NKqp { + +void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory) { + factory.RegisterInputTransform<NKikimrKqp::TKqpStreamLookupSettings>("StreamLookupInputTransformer", [](NKikimrKqp::TKqpStreamLookupSettings&& settings, + NYql::NDq::TDqAsyncIoFactory::TInputTransformArguments&& args) { + return CreateStreamLookupActor(args.InputIndex, args.TransformInput, args.ComputeActorId, args.TypeEnv, + args.HolderFactory, std::move(settings)); + }); +} + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.h b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.h new file mode 100644 index 0000000000..1eded0576b --- /dev/null +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.h @@ -0,0 +1,11 @@ +#pragma once + +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> + +namespace NKikimr { +namespace NKqp { + +void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory); + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 8b1d4426d3..46b5de9186 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -110,6 +110,7 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) { ServerSettings->SetKeepSnapshotTimeout(settings.KeepSnapshotTimeout); ServerSettings->SetFrFactory(&UdfFrFactory); ServerSettings->SetEnableNotNullColumns(true); + ServerSettings->SetEnableKqpScanQueryStreamLookup(true); if (settings.LogStream) ServerSettings->SetLogBackend(new TStreamLogBackend(settings.LogStream)); diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp index 2087a2b1e5..d81eaa9199 100644 --- a/ydb/core/kqp/ut/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp @@ -1939,6 +1939,35 @@ Y_UNIT_TEST_SUITE(KqpScan) { UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); CompareYson("[[%false]]", StreamResultToYson(result)); } + + Y_UNIT_TEST_TWIN(StreamLookup, UseSessionActor) { + auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor); + auto db = kikimr.GetTableClient(); + CreateSampleTables(kikimr); + + { + auto result = db.CreateSession().GetValueSync().GetSession().ExecuteDataQuery(R"( + REPLACE INTO `/Root/EightShard` (Key, Text, Data) VALUES + (1u, "Value1", 1), + (2u, "Value2", 1), + (3u, "Value3", 1), + (4u, "Value4", 1), + (5u, "Value5", 1); + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } + + { + auto result = db.StreamExecuteScanQuery(R"( + PRAGMA kikimr.UseNewEngine = "true"; + PRAGMA kikimr.OptEnablePredicateExtract = "false"; + $keys = SELECT Key FROM `/Root/EightShard`; + SELECT * FROM `/Root/KeyValue` WHERE Key IN $keys ORDER BY Key; + )").GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + CompareYson(R"([[[1u];["One"]];[[2u];["Two"]]])", StreamResultToYson(result)); + } + } } } // namespace NKqp diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 8e1327f846..8c93d3fcac 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -699,6 +699,7 @@ message TFeatureFlags { optional bool AllowVDiskDefrag = 63 [default = true]; optional bool EnableAsyncHttpMon = 64 [default = true]; optional bool EnableChangefeeds = 65 [default = true]; + optional bool EnableKqpScanQueryStreamLookup = 66 [default = false]; } diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index b60b518aed..2bd82e48e5 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -574,3 +574,10 @@ message TEvRemoteScanDataAck { message TEvKillScanTablet { } + +message TKqpStreamLookupSettings { + optional NKqpProto.TKqpPhyTable Table = 1; + repeated uint32 KeyColumnTypes = 2; + repeated TKqpColumnMetadataProto Columns = 3; + optional TKqpSnapshot Snapshot = 4; +} diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index cae8f32e05..d0c652a324 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -208,6 +208,13 @@ message TKqpPhyCnMerge { repeated TKqpPhySortColumn SortColumns = 1; } +message TKqpPhyCnStreamLookup { + TKqpPhyTable Table = 1; + repeated TKqpPhyColumn Columns = 2; + bytes LookupKeysType = 3; + bytes ResultType = 4; +} + message TKqpPhyConnection { uint32 StageIndex = 1; uint32 OutputIndex = 2; @@ -222,6 +229,7 @@ message TKqpPhyConnection { TKqpPhyCnResult Result = 9; TKqpPhyCnValue Value = 10; TKqpPhyCnMerge Merge = 11; + TKqpPhyCnStreamLookup StreamLookup = 12; }; } diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index 5f58521a27..a7ed81b02e 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -35,6 +35,7 @@ public: FEATURE_FLAG_SETTER(EnableBulkUpsertToAsyncIndexedTables) FEATURE_FLAG_SETTER(EnableChangefeeds) FEATURE_FLAG_SETTER(EnableKqpSessionActor) + FEATURE_FLAG_SETTER(EnableKqpScanQueryStreamLookup) TDerived& SetEnableMvcc(std::optional<bool> value) { if (value) { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 41f0e58817..acabdd7d8d 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -69,14 +69,14 @@ struct TSinkCallbacks : public IDqComputeActorAsyncOutput::ICallbacks { struct TOutputTransformCallbacks : public IDqComputeActorAsyncOutput::ICallbacks { void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, bool isFatal) override final { - OnTransformError(outputIndex, issues, isFatal); + OnOutputTransformError(outputIndex, issues, isFatal); } void OnAsyncOutputStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override final { OnTransformStateSaved(std::move(state), outputIndex, checkpoint); } - virtual void OnTransformError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0; + virtual void OnOutputTransformError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0; virtual void OnTransformStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0; }; @@ -1416,18 +1416,38 @@ protected: } void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) { - Y_VERIFY(SourcesMap.FindPtr(ev->Get()->InputIndex)); + Y_VERIFY(SourcesMap.FindPtr(ev->Get()->InputIndex) || InputTransformsMap.FindPtr(ev->Get()->InputIndex)); ContinueExecute(); } void OnAsyncInputError(const IDqComputeActorAsyncInput::TEvAsyncInputError::TPtr& ev) { - if (!ev->Get()->IsFatal) { - SourcesMap.at(ev->Get()->InputIndex).IssuesBuffer.Push(ev->Get()->Issues); + if (SourcesMap.FindPtr(ev->Get()->InputIndex)) { + OnSourceError(ev->Get()->InputIndex, ev->Get()->Issues, ev->Get()->IsFatal); + } else if (InputTransformsMap.FindPtr(ev->Get()->InputIndex)) { + OnInputTransformError(ev->Get()->InputIndex, ev->Get()->Issues, ev->Get()->IsFatal); + } else { + YQL_ENSURE(false, "Unexpected input index: " << ev->Get()->InputIndex); + } + } + + void OnSourceError(ui64 inputIndex, const TIssues& issues, bool isFatal) { + if (!isFatal) { + SourcesMap.at(inputIndex).IssuesBuffer.Push(issues); return; } - CA_LOG_E("Source[" << ev->Get()->InputIndex << "] fatal error: " << ev->Get()->Issues.ToString()); - InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, ev->Get()->Issues); + CA_LOG_E("Source[" << inputIndex << "] fatal error: " << issues.ToOneLineString()); + InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, issues); + } + + void OnInputTransformError(ui64 inputIndex, const TIssues& issues, bool isFatal) { + if (!isFatal) { + InputTransformsMap.at(inputIndex).IssuesBuffer.Push(issues); + return; + } + + CA_LOG_E("InputTransform[" << inputIndex << "] fatal error: " << issues.ToOneLineString()); + InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, issues); } void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) override { @@ -1440,7 +1460,7 @@ protected: InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, issues); } - void OnTransformError(ui64 outputIndex, const TIssues& issues, bool isFatal) override { + void OnOutputTransformError(ui64 outputIndex, const TIssues& issues, bool isFatal) override { if (!isFatal) { OutputTransformsMap.at(outputIndex).IssuesBuffer.Push(issues); return; diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index bb0a0a5baf..42de67823f 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -422,13 +422,13 @@ public: Y_VERIFY(!transform->TransformInput); Y_VERIFY(!transform->TransformOutput); - TStringBuilder err; - transform->TransformInputType = NCommon::ParseTypeFromYson(TStringBuf{transformDesc.GetInputType()}, *pb, err.Out); - YQL_ENSURE(transform->TransformInputType, "Can't parse transform input type: " << err); + auto inputTypeNode = NMiniKQL::DeserializeNode(TStringBuf{transformDesc.GetInputType()}, typeEnv); + YQL_ENSURE(inputTypeNode, "Failed to deserialize transform input type"); + transform->TransformInputType = static_cast<TType*>(inputTypeNode); - err.clear(); - TType* outputType = NCommon::ParseTypeFromYson(TStringBuf{transformDesc.GetOutputType()}, *pb, err.Out); - YQL_ENSURE(outputType, "Can't parse transform output type: " << err); + auto outputTypeNode = NMiniKQL::DeserializeNode(TStringBuf{transformDesc.GetOutputType()}, typeEnv); + YQL_ENSURE(outputTypeNode, "Failed to deserialize transform output type"); + TType* outputType = static_cast<TType*>(outputTypeNode); YQL_ENSURE(outputType->IsSameType(*ProgramParsed.InputItemTypes[i])); LOG(TStringBuilder() << "Task: " << TaskId << " has transform by " << transformDesc.GetType() << " with input type: " << *transform->TransformInputType @@ -488,13 +488,13 @@ public: Y_VERIFY(!transform->TransformInput); Y_VERIFY(!transform->TransformOutput); - TStringBuilder err; - transform->TransformOutputType = NCommon::ParseTypeFromYson(TStringBuf{transformDesc.GetOutputType()}, *pb, err.Out); - YQL_ENSURE(transform->TransformOutputType, "Can't parse transform output type: " << err); + auto outputTypeNode = NMiniKQL::DeserializeNode(TStringBuf{transformDesc.GetOutputType()}, typeEnv); + YQL_ENSURE(outputTypeNode, "Failed to deserialize transform output type"); + transform->TransformOutputType = static_cast<TType*>(outputTypeNode); - err.clear(); - TType* inputType = NCommon::ParseTypeFromYson(TStringBuf{transformDesc.GetInputType()}, *pb, err.Out); - YQL_ENSURE(inputType, "Can't parse transform input type: " << err); + auto inputTypeNode = NMiniKQL::DeserializeNode(TStringBuf{transformDesc.GetInputType()}, typeEnv); + YQL_ENSURE(inputTypeNode, "Failed to deserialize transform input type"); + TType* inputType = static_cast<TType*>(inputTypeNode); YQL_ENSURE(inputType->IsSameType(*ProgramParsed.OutputItemTypes[i])); LOG(TStringBuilder() << "Task: " << TaskId << " has transform by " << transformDesc.GetType() << " with input type: " << *inputType diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h index ee7cba6b71..780f500903 100644 --- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h +++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h @@ -102,6 +102,15 @@ enum class TTaskInputType { Merge }; +struct TTransform { + TString Type; + + TString InputType; + TString OutputType; + + ::google::protobuf::Any Settings; +}; + template <class TInputMeta> struct TTaskInput { std::variant<std::monostate, TMergeTaskInput> ConnectionInfo; @@ -109,6 +118,7 @@ struct TTaskInput { TMaybe<::google::protobuf::Any> SourceSettings; TString SourceType; TInputMeta Meta; + TMaybe<TTransform> Transform; TTaskInputType Type() const { return static_cast<TTaskInputType>(ConnectionInfo.index()); @@ -127,15 +137,6 @@ struct TTaskOutputType { }; }; -struct TTransform { - TString Type; - - TString InputType; - TString OutputType; - - ::google::protobuf::Any Settings; -}; - template <class TOutputMeta> struct TTaskOutput { ui32 Type = TTaskOutputType::Undefined; diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index 5cf9450d05..06c8112192 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -158,6 +158,10 @@ namespace NYql::NDqs { // Sinks if (auto maybeDqOutputsList = stage.Outputs()) { + TScopedAlloc alloc; + TTypeEnvironment typeEnv(alloc); + TProgramBuilder pgmBuilder(typeEnv, *FunctionRegistry); + auto dqOutputsList = maybeDqOutputsList.Cast(); for (const auto& output : dqOutputsList) { const ui64 index = FromString(output.Ptr()->Child(TDqOutputAnnotationBase::idx_Index)->Content()); @@ -179,12 +183,20 @@ namespace NYql::NDqs { YQL_ENSURE(!sinkSettings.type_url().empty(), "Data sink provider \"" << dataSinkName << "\" did't fill dq sink settings for its dq sink node"); YQL_ENSURE(sinkType, "Data sink provider \"" << dataSinkName << "\" did't fill dq sink settings type for its dq sink node"); } else if (output.Maybe<NNodes::TDqTransform>()) { + TStringStream errorStream; + auto transform = output.Cast<NNodes::TDqTransform>(); outputTransform.Type = transform.Type(); - const auto inputType = transform.InputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); - outputTransform.InputType = NCommon::WriteTypeToYson(inputType); - const auto outputType = transform.OutputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); - outputTransform.OutputType = NCommon::WriteTypeToYson(outputType); + const auto inputTypeAnnotation = transform.InputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + auto inputType = NCommon::BuildType(*inputTypeAnnotation, pgmBuilder, errorStream); + Y_ENSURE(inputType, "Failed to build transform input type: " << errorStream.Str()); + outputTransform.InputType = NKikimr::NMiniKQL::SerializeNode(inputType, typeEnv); + + errorStream.clear(); + const auto outputTypeAnnotation = transform.OutputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + auto outputType = NCommon::BuildType(*outputTypeAnnotation, pgmBuilder, errorStream); + Y_ENSURE(outputType, "Failed to build transform output type: " << errorStream.Str()); + outputTransform.OutputType = NKikimr::NMiniKQL::SerializeNode(outputType, typeEnv); dqIntegration->FillTransformSettings(transform.Ref(), outputTransform.Settings); } else { YQL_ENSURE(false, "Unknown stage output type"); |