aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIuliia Sidorina <ulya.sidorina@gmail.com>2022-06-23 20:50:34 +0300
committerIuliia Sidorina <ulya.sidorina@gmail.com>2022-06-23 20:50:34 +0300
commit1ae3fabb11fa55eae8d16909eaccaa84e68ba0b5 (patch)
treea796b63f21b03a15cde2966ed9dd63d91ec771a7
parent926a69807718910badd754b5328bab4d5570009b (diff)
downloadydb-1ae3fabb11fa55eae8d16909eaccaa84e68ba0b5.tar.gz
KIKIMR-14294: implement stream lookup for scan query
ref:3190ff51d4170603656dd557d7fdeeec8a6f03b9
-rw-r--r--ydb/core/kqp/compile/kqp_compile.cpp179
-rw-r--r--ydb/core/kqp/executer/kqp_executer_impl.h8
-rw-r--r--ydb/core/kqp/executer/kqp_scan_executer.cpp4
-rw-r--r--ydb/core/kqp/executer/kqp_table_resolver.cpp16
-rw-r--r--ydb/core/kqp/executer/kqp_tasks_graph.cpp64
-rw-r--r--ydb/core/kqp/executer/kqp_tasks_graph.h2
-rw-r--r--ydb/core/kqp/expr_nodes/kqp_expr_nodes.json15
-rw-r--r--ydb/core/kqp/host/kqp_run_scan.cpp6
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp6
-rw-r--r--ydb/core/kqp/node/kqp_node.cpp15
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp33
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp2
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy.cpp7
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp63
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h2
-rw-r--r--ydb/core/kqp/prepare/kqp_type_ann.cpp41
-rw-r--r--ydb/core/kqp/runtime/CMakeLists.txt2
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp440
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.h14
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp16
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_factory.h11
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp1
-rw-r--r--ydb/core/kqp/ut/kqp_scan_ut.cpp29
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/protos/kqp.proto7
-rw-r--r--ydb/core/protos/kqp_physical.proto8
-rw-r--r--ydb/core/testlib/basics/feature_flags.h1
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h36
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp24
-rw-r--r--ydb/library/yql/dq/tasks/dq_tasks_graph.h19
-rw-r--r--ydb/library/yql/providers/dq/planner/execution_planner.cpp20
31 files changed, 968 insertions, 124 deletions
diff --git a/ydb/core/kqp/compile/kqp_compile.cpp b/ydb/core/kqp/compile/kqp_compile.cpp
index 7836fff357..15f17500ba 100644
--- a/ydb/core/kqp/compile/kqp_compile.cpp
+++ b/ydb/core/kqp/compile/kqp_compile.cpp
@@ -12,6 +12,7 @@
#include <ydb/library/yql/dq/opt/dq_opt.h>
#include <ydb/library/yql/dq/tasks/dq_task_program.h>
#include <ydb/library/yql/providers/common/mkql/yql_type_mkql.h>
+#include <ydb/library/yql/minikql/mkql_node_serialization.h>
namespace NKikimr {
namespace NKqp {
@@ -81,8 +82,9 @@ void FillTable(const TKqpTable& table, NKqpProto::TKqpPhyTable& tableProto) {
tableProto.SetVersion(FromString<ui64>(table.Version()));
}
+template <typename TProto>
void FillColumns(const TCoAtomList& columns, const TKikimrTableMetadata& tableMeta,
- NKqpProto::TKqpPhyTableOperation& opProto, bool allowSystemColumns)
+ TProto& opProto, bool allowSystemColumns)
{
for (const auto& columnNode : columns) {
TString columnName(columnNode);
@@ -329,74 +331,6 @@ void FillOlapProgram(const TCoLambda& process, const TKikimrTableMetadata& table
CompileOlapProgram(process, tableMeta, readProto);
}
-void FillConnection(const TDqConnection& connection, const TMap<ui64, ui32>& stagesMap,
- NKqpProto::TKqpPhyConnection& connectionProto, TExprContext& ctx)
-{
- auto inputStageIndex = stagesMap.FindPtr(connection.Output().Stage().Ref().UniqueId());
- YQL_ENSURE(inputStageIndex, "stage #" << connection.Output().Stage().Ref().UniqueId() << " not found in stages map: "
- << PrintKqpStageOnly(connection.Output().Stage(), ctx));
-
- auto outputIndex = FromString<ui32>(connection.Output().Index().Value());
-
- connectionProto.SetStageIndex(*inputStageIndex);
- connectionProto.SetOutputIndex(outputIndex);
-
- if (connection.Maybe<TDqCnUnionAll>()) {
- connectionProto.MutableUnionAll();
- return;
- }
-
- if (auto maybeShuffle = connection.Maybe<TDqCnHashShuffle>()) {
- auto& shuffleProto = *connectionProto.MutableHashShuffle();
- for (const auto& keyColumn : maybeShuffle.Cast().KeyColumns()) {
- shuffleProto.AddKeyColumns(TString(keyColumn));
- }
- return;
- }
-
- if (connection.Maybe<TDqCnMap>()) {
- connectionProto.MutableMap();
- return;
- }
-
- if (connection.Maybe<TDqCnBroadcast>()) {
- connectionProto.MutableBroadcast();
- return;
- }
-
- if (connection.Maybe<TDqCnResult>()) {
- connectionProto.MutableResult();
- return;
- }
-
- if (connection.Maybe<TDqCnValue>()) {
- connectionProto.MutableValue();
- return;
- }
-
- if (connection.Maybe<TKqpCnMapShard>()) {
- connectionProto.MutableMapShard();
- return;
- }
-
- if (connection.Maybe<TKqpCnShuffleShard>()) {
- connectionProto.MutableShuffleShard();
- return;
- }
-
- if (auto maybeMerge = connection.Maybe<TDqCnMerge>()) {
- auto& mergeProto = *connectionProto.MutableMerge();
- for (const auto& sortColumn : maybeMerge.Cast().SortColumns()) {
- auto newSortColumn = mergeProto.AddSortColumns();
- newSortColumn->SetColumn(sortColumn.Column().StringValue());
- newSortColumn->SetAscending(sortColumn.SortDirection().Value() == TTopSortSettings::AscendingSort);
- }
- return;
- }
-
- YQL_ENSURE(false, "Unexpected connection type: " << connection.CallableName());
-}
-
class TKqpQueryCompiler : public IKqpQueryCompiler {
public:
TKqpQueryCompiler(const TString& cluster, const TIntrusivePtr<TKikimrTablesData> tablesData,
@@ -460,6 +394,14 @@ public:
}
private:
+    // Builds the MiniKQL type for a type annotation using the given program builder.
+    // Fails via Y_ENSURE, reporting the text collected in the error stream, when no type is built.
+    NKikimr::NMiniKQL::TType* CompileType(TProgramBuilder& pgmBuilder, const TTypeAnnotationNode& inputType) {
+        TStringStream buildErrors;
+        auto compiledType = NCommon::BuildType(inputType, pgmBuilder, buildErrors);
+
+        Y_ENSURE(compiledType, "Failed to compile type: " << buildErrors.Str());
+        return compiledType;
+    }
+
+
void CompileStage(const TDqPhyStage& stage, NKqpProto::TKqpPhyStage& stageProto, TExprContext& ctx,
const TMap<ui64, ui32>& stagesMap)
{
@@ -661,11 +603,7 @@ private:
}
YQL_ENSURE(itemType);
- TStringStream errorStream;
- auto type = NCommon::BuildType(*itemType, pgmBuilder, errorStream);
- YQL_ENSURE(type);
-
- ExportTypeToProto(type, *resultProto.MutableItemType());
+ ExportTypeToProto(CompileType(pgmBuilder, *itemType), *resultProto.MutableItemType());
TMaybeNode<TCoAtomList> maybeColumnHints;
if (connection.Maybe<TDqCnResult>()) {
@@ -687,6 +625,99 @@ private:
}
}
+    // Fills the physical-plan proto for one stage connection.
+    // First records which stage (looked up in stagesMap by the producer stage's unique id) and
+    // which of its outputs feed the connection, then selects the proto variant that matches the
+    // connection's node type. Fails via YQL_ENSURE when the producer stage is not in stagesMap
+    // or when the connection type is not one of the handled kinds.
+    void FillConnection(const TDqConnection& connection, const TMap<ui64, ui32>& stagesMap,
+        NKqpProto::TKqpPhyConnection& connectionProto, TExprContext& ctx)
+    {
+        const auto producerStageId = connection.Output().Stage().Ref().UniqueId();
+        auto producerStageIndex = stagesMap.FindPtr(producerStageId);
+        YQL_ENSURE(producerStageIndex, "stage #" << producerStageId << " not found in stages map: "
+            << PrintKqpStageOnly(connection.Output().Stage(), ctx));
+
+        const auto producerOutputIndex = FromString<ui32>(connection.Output().Index().Value());
+
+        connectionProto.SetStageIndex(*producerStageIndex);
+        connectionProto.SetOutputIndex(producerOutputIndex);
+
+        if (connection.Maybe<TDqCnUnionAll>()) {
+            connectionProto.MutableUnionAll();
+        } else if (auto hashShuffle = connection.Maybe<TDqCnHashShuffle>()) {
+            auto* shuffleProto = connectionProto.MutableHashShuffle();
+            for (const auto& keyColumn : hashShuffle.Cast().KeyColumns()) {
+                shuffleProto->AddKeyColumns(TString(keyColumn));
+            }
+        } else if (connection.Maybe<TDqCnMap>()) {
+            connectionProto.MutableMap();
+        } else if (connection.Maybe<TDqCnBroadcast>()) {
+            connectionProto.MutableBroadcast();
+        } else if (connection.Maybe<TDqCnResult>()) {
+            connectionProto.MutableResult();
+        } else if (connection.Maybe<TDqCnValue>()) {
+            connectionProto.MutableValue();
+        } else if (connection.Maybe<TKqpCnMapShard>()) {
+            connectionProto.MutableMapShard();
+        } else if (connection.Maybe<TKqpCnShuffleShard>()) {
+            connectionProto.MutableShuffleShard();
+        } else if (auto merge = connection.Maybe<TDqCnMerge>()) {
+            auto* mergeProto = connectionProto.MutableMerge();
+            for (const auto& sortColumn : merge.Cast().SortColumns()) {
+                auto* sortColumnProto = mergeProto->AddSortColumns();
+                sortColumnProto->SetColumn(sortColumn.Column().StringValue());
+                sortColumnProto->SetAscending(sortColumn.SortDirection().Value() == TTopSortSettings::AscendingSort);
+            }
+        } else if (auto streamLookupNode = connection.Maybe<TKqpCnStreamLookup>()) {
+            TProgramBuilder pgmBuilder(TypeEnv, FuncRegistry);
+            auto& lookupProto = *connectionProto.MutableStreamLookup();
+            auto streamLookup = streamLookupNode.Cast();
+            auto tableMeta = TablesData->ExistingTable(Cluster, streamLookup.Table().Path()).Metadata;
+            YQL_ENSURE(tableMeta);
+
+            FillTable(streamLookup.Table(), *lookupProto.MutableTable());
+            FillColumns(streamLookup.Columns(), *tableMeta, lookupProto, true);
+
+            // The keys child holds a type expression for a list; only the list item type is serialized.
+            const auto keysType = streamLookup.LookupKeysType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+            YQL_ENSURE(keysType, "Empty stream lookup keys type");
+            YQL_ENSURE(keysType->GetKind() == ETypeAnnotationKind::List, "Unexpected stream lookup keys type");
+            const auto keysItemType = keysType->Cast<TListExprType>()->GetItemType();
+            lookupProto.SetLookupKeysType(NMiniKQL::SerializeNode(CompileType(pgmBuilder, *keysItemType), TypeEnv));
+
+            // The connection itself is typed as a stream; only the stream item type is serialized.
+            const auto outputType = streamLookup.Ref().GetTypeAnn();
+            YQL_ENSURE(outputType, "Empty stream lookup result type");
+            YQL_ENSURE(outputType->GetKind() == ETypeAnnotationKind::Stream, "Unexpected stream lookup result type");
+            const auto outputItemType = outputType->Cast<TStreamExprType>()->GetItemType();
+            lookupProto.SetResultType(NMiniKQL::SerializeNode(CompileType(pgmBuilder, *outputItemType), TypeEnv));
+        } else {
+            YQL_ENSURE(false, "Unexpected connection type: " << connection.CallableName());
+        }
+    }
+
+
private:
TString Cluster;
const TIntrusivePtr<TKikimrTablesData> TablesData;
diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h
index db5e0187f6..5f84f70ce4 100644
--- a/ydb/core/kqp/executer/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer/kqp_executer_impl.h
@@ -381,6 +381,14 @@ protected:
auto& channelDesc = *inputDesc.AddChannels();
static_cast<TDerived*>(this)->FillChannelDesc(channelDesc, TasksGraph.GetChannel(channel));
}
+
+ if (input.Transform) {
+ auto* transformProto = inputDesc.MutableTransform();
+ transformProto->SetType(input.Transform->Type);
+ transformProto->SetInputType(input.Transform->InputType);
+ transformProto->SetOutputType(input.Transform->OutputType);
+ *transformProto->MutableSettings() = input.Transform->Settings;
+ }
}
void FillOutputDesc(NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output) {
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index 135890f36e..263cd10011 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -584,6 +584,7 @@ private:
case NKqpProto::TKqpPhyConnection::kHashShuffle:
case NKqpProto::TKqpPhyConnection::kUnionAll:
case NKqpProto::TKqpPhyConnection::kMerge:
+ case NKqpProto::TKqpPhyConnection::kStreamLookup:
break;
default:
YQL_ENSURE(false, "Unexpected connection type: " << (ui32)input.GetTypeCase());
@@ -606,6 +607,7 @@ private:
}
case NKqpProto::TKqpPhyConnection::kMap:
+ case NKqpProto::TKqpPhyConnection::kStreamLookup:
partitionsCount = originStageInfo.Tasks.size();
break;
@@ -647,7 +649,7 @@ private:
YQL_ENSURE(false, "Unexpected stage type " << (int) stageInfo.Meta.TableKind);
}
- BuildKqpStageChannels(TasksGraph, TableKeys, stageInfo, TxId, AppData()->EnableKqpSpilling);
+ BuildKqpStageChannels(TasksGraph, TableKeys, stageInfo, TxId, AppData()->EnableKqpSpilling, Request.Snapshot);
}
BuildKqpExecuterResults(*tx.Body, Results);
diff --git a/ydb/core/kqp/executer/kqp_table_resolver.cpp b/ydb/core/kqp/executer/kqp_table_resolver.cpp
index 86613a29d8..3cb402c15b 100644
--- a/ydb/core/kqp/executer/kqp_table_resolver.cpp
+++ b/ydb/core/kqp/executer/kqp_table_resolver.cpp
@@ -223,12 +223,22 @@ private:
private:
// TODO: Get rid of ResolveTables & TableKeys, get table information from phy tx proto.
void ResolveTables() {
+ auto addTable = [](const auto& proto, auto& tables) {
+ auto& table = tables.GetOrAddTable(MakeTableId(proto.GetTable()), proto.GetTable().GetPath());
+ for (auto& column : proto.GetColumns()) {
+ table.Columns.emplace(column.GetName(), TKqpTableKeys::TColumn());
+ }
+ };
+
for (auto& tx : Transactions) {
for (auto& stage : tx.Body->GetStages()) {
for (auto& op : stage.GetTableOps()) {
- auto& table = TableKeys.GetOrAddTable(MakeTableId(op.GetTable()), op.GetTable().GetPath());
- for (auto& column : op.GetColumns()) {
- table.Columns.emplace(column.GetName(), TKqpTableKeys::TColumn());
+ addTable(op, TableKeys);
+ }
+
+ for (const auto& input : stage.GetInputs()) {
+ if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
+ addTable(input.GetStreamLookup(), TableKeys);
}
}
}
diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.cpp b/ydb/core/kqp/executer/kqp_tasks_graph.cpp
index ac9792ed9f..8ba9b9fbf8 100644
--- a/ydb/core/kqp/executer/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer/kqp_tasks_graph.cpp
@@ -195,8 +195,64 @@ void BuildShuffleShardChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
}
}
+// Builds the channels for a stream lookup connection.
+// Task i of the producer stage (inputStageInfo) is linked to task i of the consumer stage
+// (stageInfo), so both stages must have the same number of tasks. Every consumer input gets a
+// "StreamLookupInputTransformer" transform whose settings carry the table, the read snapshot,
+// the table's key column types and the name/id/type of each requested column.
+// Fails via YQL_ENSURE on mismatched task counts, an invalid snapshot, or a requested column
+// that is missing from the resolved table.
+void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInfo, ui32 inputIndex,
+    const TStageInfo& inputStageInfo, ui32 outputIndex, const TKqpTableKeys& tableKeys,
+    const NKqpProto::TKqpPhyCnStreamLookup& streamLookup, const IKqpGateway::TKqpSnapshot& snapshot,
+    bool enableSpilling, const TChannelLogFunc& logFunc) {
+    YQL_ENSURE(stageInfo.Tasks.size() == inputStageInfo.Tasks.size());
+    YQL_ENSURE(snapshot.IsValid());
+
+    NKikimrKqp::TKqpStreamLookupSettings settings;
+    settings.MutableTable()->CopyFrom(streamLookup.GetTable());
+    settings.MutableSnapshot()->SetStep(snapshot.Step);
+    settings.MutableSnapshot()->SetTxId(snapshot.TxId);
+
+    // Bind by reference: the table description is only read below, so copying it is wasted work.
+    const auto& table = tableKeys.GetTable(MakeTableId(streamLookup.GetTable()));
+    for (const auto& keyColumnType : table.KeyColumnTypes) {
+        settings.AddKeyColumnTypes(keyColumnType);
+    }
+
+    for (const auto& column : streamLookup.GetColumns()) {
+        auto columnIt = table.Columns.find(column.GetName());
+        YQL_ENSURE(columnIt != table.Columns.end(), "Unknown stream lookup column: " << column.GetName());
+
+        auto newColumn = settings.AddColumns();
+        newColumn->SetName(columnIt->first);
+        newColumn->SetId(columnIt->second.Id);
+        newColumn->SetTypeId(columnIt->second.Type);
+    }
+
+    // The same transform description is attached to every consumer task input.
+    TTransform streamLookupTransform;
+    streamLookupTransform.Type = "StreamLookupInputTransformer";
+    streamLookupTransform.InputType = streamLookup.GetLookupKeysType();
+    streamLookupTransform.OutputType = streamLookup.GetResultType();
+    streamLookupTransform.Settings.PackFrom(settings);
+
+    for (ui32 taskId = 0; taskId < inputStageInfo.Tasks.size(); ++taskId) {
+        auto& originTask = graph.GetTask(inputStageInfo.Tasks[taskId]);
+        auto& targetTask = graph.GetTask(stageInfo.Tasks[taskId]);
+
+        auto& channel = graph.AddChannel();
+        channel.SrcTask = originTask.Id;
+        channel.SrcOutputIndex = outputIndex;
+        channel.DstTask = targetTask.Id;
+        channel.DstInputIndex = inputIndex;
+        channel.InMemory = !enableSpilling || inputStageInfo.OutputsCount == 1;
+
+        auto& taskInput = targetTask.Inputs[inputIndex];
+        taskInput.Transform = streamLookupTransform;
+        taskInput.Channels.push_back(channel.Id);
+
+        auto& taskOutput = originTask.Outputs[outputIndex];
+        taskOutput.Type = TTaskOutputType::Map;
+        taskOutput.Channels.push_back(channel.Id);
+
+        logFunc(channel.Id, originTask.Id, targetTask.Id, "StreamLookup/Map", !channel.InMemory);
+    }
+}
+
+
void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo,
- ui64 txId, bool enableSpilling)
+ ui64 txId, bool enableSpilling, const IKqpGateway::TKqpSnapshot& snapshot)
{
auto& stage = GetStage(stageInfo);
@@ -256,6 +312,12 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tabl
BuildMergeChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, sortColumns, log);
break;
}
+ case NKqpProto::TKqpPhyConnection::kStreamLookup: {
+ BuildStreamLookupChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, tableKeys,
+ input.GetStreamLookup(), snapshot, enableSpilling, log);
+ break;
+ }
+
default:
YQL_ENSURE(false, "Unexpected stage input type: " << (ui32)input.GetTypeCase());
}
diff --git a/ydb/core/kqp/executer/kqp_tasks_graph.h b/ydb/core/kqp/executer/kqp_tasks_graph.h
index 582be0f039..1523ed12b3 100644
--- a/ydb/core/kqp/executer/kqp_tasks_graph.h
+++ b/ydb/core/kqp/executer/kqp_tasks_graph.h
@@ -148,7 +148,7 @@ using TKqpTasksGraph = NYql::NDq::TDqTasksGraph<TStageInfoMeta, TTaskMeta, TTask
void FillKqpTasksGraphStages(TKqpTasksGraph& tasksGraph, const TVector<IKqpGateway::TPhysicalTxData>& txs);
void BuildKqpTaskGraphResultChannels(TKqpTasksGraph& tasksGraph, const NKqpProto::TKqpPhyTx& tx, ui64 txIdx);
void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TKqpTableKeys& tableKeys, const TStageInfo& stageInfo,
- ui64 txId, bool enableSpilling);
+ ui64 txId, bool enableSpilling, const IKqpGateway::TKqpSnapshot& snapshot = {});
TVector<TTaskMeta::TColumn> BuildKqpColumns(const NKqpProto::TKqpPhyTableOperation& op, const TKqpTableKeys::TTable& table);
struct TKqpTaskOutputType {
diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
index 2faa33d5e1..724c7d9b22 100644
--- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
+++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
@@ -149,6 +149,11 @@
"Match": {"Type": "Callable", "Name": "KqpLookupTable"}
},
{
+ "Name": "TKqlStreamLookupTable",
+ "Base": "TKqlLookupTableBase",
+ "Match": {"Type": "Callable", "Name": "KqlStreamLookupTable"}
+ },
+ {
"Name": "TKqlTableEffect",
"Base": "TExprBase",
"Match": {"Type": "CallableBase"},
@@ -327,6 +332,16 @@
"Match": {"Type": "Callable", "Name": "KqpCnShuffleShard"}
},
{
+ "Name": "TKqpCnStreamLookup",
+ "Base": "TKqpConnection",
+ "Match": {"Type": "Callable", "Name": "KqpCnStreamLookup"},
+ "Children": [
+ {"Index": 1, "Name": "Table", "Type": "TKqpTable"},
+ {"Index": 2, "Name": "Columns", "Type": "TCoAtomList"},
+ {"Index": 3, "Name": "LookupKeysType", "Type": "TExprBase"}
+ ]
+ },
+ {
"Name": "TKqpProgram",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "KqpProgram"},
diff --git a/ydb/core/kqp/host/kqp_run_scan.cpp b/ydb/core/kqp/host/kqp_run_scan.cpp
index 38a67850eb..70b2a661a0 100644
--- a/ydb/core/kqp/host/kqp_run_scan.cpp
+++ b/ydb/core/kqp/host/kqp_run_scan.cpp
@@ -86,6 +86,12 @@ public:
for (const auto& tableOp: stage.GetTableOps()) {
tablesSet.insert(tableOp.GetTable().GetPath());
}
+
+ for (const auto& input : stage.GetInputs()) {
+ if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
+ tablesSet.insert(input.GetStreamLookup().GetTable().GetPath());
+ }
+ }
}
}
TVector<TString> tables(tablesSet.begin(), tablesSet.end());
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 8151927387..fe12058da3 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -495,6 +495,12 @@ public:
for (const auto& tableOp: stage.GetTableOps()) {
tablesSet.insert(tableOp.GetTable().GetPath());
}
+
+ for (const auto& input : stage.GetInputs()) {
+ if (input.GetTypeCase() == NKqpProto::TKqpPhyConnection::kStreamLookup) {
+ tablesSet.insert(input.GetStreamLookup().GetTable().GetPath());
+ }
+ }
}
}
TVector<TString> tables(tablesSet.begin(), tablesSet.end());
diff --git a/ydb/core/kqp/node/kqp_node.cpp b/ydb/core/kqp/node/kqp_node.cpp
index bf7d7f69c2..632b36b924 100644
--- a/ydb/core/kqp/node/kqp_node.cpp
+++ b/ydb/core/kqp/node/kqp_node.cpp
@@ -12,6 +12,9 @@
#include <ydb/core/kqp/rm/kqp_resource_estimation.h>
#include <ydb/core/kqp/rm/kqp_rm.h>
#include <ydb/core/kqp/common/kqp_resolve.h>
+#include <ydb/core/kqp/runtime/kqp_stream_lookup_factory.h>
+
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/monlib/service/pages/templates.h>
@@ -311,12 +314,12 @@ private:
IActor* computeActor;
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
computeActor = CreateKqpScanComputeActor(msg.GetSnapshot(), request.Executer, txId, std::move(dqTask),
- nullptr, nullptr, runtimeSettings, memoryLimits, Counters);
+ CreateAsyncIoFactory(), nullptr, runtimeSettings, memoryLimits, Counters);
taskCtx.ComputeActorId = Register(computeActor);
} else {
if (Y_LIKELY(!CaFactory)) {
- computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), nullptr, nullptr, runtimeSettings,
- memoryLimits);
+ computeActor = CreateKqpComputeActor(request.Executer, txId, std::move(dqTask), CreateAsyncIoFactory(),
+ nullptr, runtimeSettings, memoryLimits);
taskCtx.ComputeActorId = Register(computeActor);
} else {
computeActor = CaFactory->CreateKqpComputeActor(request.Executer, txId, std::move(dqTask),
@@ -548,6 +551,12 @@ private:
return ResourceManager_;
}
+    // Creates a fresh async IO factory with the stream lookup actor factory registered on it.
+    NYql::NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory() {
+        auto asyncIoFactory = MakeIntrusive<NYql::NDq::TDqAsyncIoFactory>();
+        RegisterStreamLookupActorFactory(*asyncIoFactory);
+
+        return asyncIoFactory;
+    }
+
private:
NKikimrConfig::TTableServiceConfig::TResourceManager Config;
TIntrusivePtr<TKqpCounters> Counters;
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
index bc0241286a..0ce9229abe 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
@@ -71,8 +71,13 @@ TMaybeNode<TKqlKeyInc> GetRightTableKeyPrefix(const TKqlKeyRange& range) {
}
TExprBase BuildLookupIndex(TExprContext& ctx, const TPositionHandle pos, const TKqlReadTableBase& read,
- const TExprBase& keysToLookup, const TVector<TCoAtom>& lookupNames, const TString& indexName)
+ const TExprBase& keysToLookup, const TVector<TCoAtom>& lookupNames, const TString& indexName,
+ const TKqpOptimizeContext& kqpCtx)
{
+ if (kqpCtx.IsScanQuery()) {
+ YQL_ENSURE(false, "StreamLookupIndex is not implemented");
+ }
+
return Build<TKqlLookupIndex>(ctx, pos)
.Table(read.Table())
.LookupKeys<TCoSkipNullMembers>()
@@ -88,8 +93,22 @@ TExprBase BuildLookupIndex(TExprContext& ctx, const TPositionHandle pos, const T
}
TExprBase BuildLookupTable(TExprContext& ctx, const TPositionHandle pos, const TKqlReadTableBase& read,
- const TExprBase& keysToLookup, const TVector<TCoAtom>& lookupNames)
+ const TExprBase& keysToLookup, const TVector<TCoAtom>& lookupNames, const TKqpOptimizeContext& kqpCtx)
{
+ if (kqpCtx.IsScanQuery()) {
+ YQL_ENSURE(kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup(), "Stream lookup is not enabled");
+ return Build<TKqlStreamLookupTable>(ctx, pos)
+ .Table(read.Table())
+ .LookupKeys<TCoSkipNullMembers>()
+ .Input(keysToLookup)
+ .Members()
+ .Add(lookupNames)
+ .Build()
+ .Build()
+ .Columns(read.Columns())
+ .Done();
+ }
+
return Build<TKqlLookupTable>(ctx, pos)
.Table(read.Table())
.LookupKeys<TCoSkipNullMembers>()
@@ -311,8 +330,8 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
return {};
}
- bool needPrecomputeLeft = !join.LeftInput().Maybe<TCoParameter>() &&
- !IsParameterToListOfStructsRepack(join.LeftInput());
+ bool needPrecomputeLeft = kqpCtx.IsDataQuery() && !join.LeftInput().Maybe<TCoParameter>() &&
+ !IsParameterToListOfStructsRepack(join.LeftInput());
TExprBase leftData = needPrecomputeLeft
? Build<TDqPrecompute>(ctx, join.Pos())
@@ -374,8 +393,8 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
.Done();
TExprBase lookup = indexName
- ? BuildLookupIndex(ctx, join.Pos(), read, keysToLookup, lookupNames, indexName)
- : BuildLookupTable(ctx, join.Pos(), read, keysToLookup, lookupNames);
+ ? BuildLookupIndex(ctx, join.Pos(), read, keysToLookup, lookupNames, indexName, kqpCtx)
+ : BuildLookupTable(ctx, join.Pos(), read, keysToLookup, lookupNames, kqpCtx);
// Skip null keys in lookup part as for equijoin semantics null != null,
// so we can't have nulls in lookup part
@@ -424,7 +443,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
TExprBase KqpJoinToIndexLookup(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx,
const NYql::TKikimrConfiguration::TPtr& config)
{
- if (!kqpCtx.IsDataQuery() || !node.Maybe<TDqJoin>()) {
+ if ((kqpCtx.IsScanQuery() && !kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup()) || !node.Maybe<TDqJoin>()) {
return node;
}
auto join = node.Cast<TDqJoin>();
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp
index a09a982c56..ea9b7825f0 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_sqlin.cpp
@@ -17,7 +17,7 @@ using namespace NYql::NNodes;
TExprBase KqpRewriteSqlInToEquiJoin(const TExprBase& node, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx,
const TKikimrConfiguration::TPtr& config)
{
- if (!kqpCtx.IsDataQuery()) {
+ if (kqpCtx.IsScanQuery() && !kqpCtx.Config->FeatureFlags.GetEnableKqpScanQueryStreamLookup()) {
return node;
}
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index 27046fb0c7..c57f9cc6f2 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -29,6 +29,7 @@ public:
AddHandler(0, &TKqlReadTable::Match, HNDL(BuildReadTableStage));
AddHandler(0, &TKqlReadTableRanges::Match, HNDL(BuildReadTableRangesStage));
AddHandler(0, &TKqlLookupTable::Match, HNDL(BuildLookupTableStage));
+ AddHandler(0, &TKqlStreamLookupTable::Match, HNDL(BuildStreamLookupTableStages));
AddHandler(0, [](const TExprNode* node) { return TCoSort::Match(node) || TCoTopSort::Match(node); },
HNDL(RemoveRedundantSortByPk));
AddHandler(0, &TCoTake::Match, HNDL(ApplyLimitToReadTable));
@@ -109,6 +110,12 @@ protected:
return output;
}
+    // Optimizer handler: applies KqpBuildStreamLookupTableStages to the node and dumps the
+    // applied rule together with the input and rewritten expressions.
+    TMaybeNode<TExprBase> BuildStreamLookupTableStages(TExprBase node, TExprContext& ctx) {
+        TExprBase rewritten = KqpBuildStreamLookupTableStages(node, ctx);
+        DumpAppliedRule("BuildStreamLookupTableStages", node.Ptr(), rewritten.Ptr(), ctx);
+
+        return rewritten;
+    }
+
TMaybeNode<TExprBase> RemoveRedundantSortByPk(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpRemoveRedundantSortByPk(node, ctx, KqpCtx);
DumpAppliedRule("RemoveRedundantSortByPk", node.Ptr(), output.Ptr(), ctx);
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
index 578d8b7877..db94e41950 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
@@ -383,4 +383,67 @@ TExprBase KqpBuildLookupTableStage(TExprBase node, TExprContext& ctx) {
.Done();
}
+// Rewrites a KqlStreamLookupTable node into a stage whose single input is a KqpCnStreamLookup
+// connection, wrapped in a DqCnUnionAll over that stage's output "0".
+// Two shapes of lookup keys are handled:
+//   * a DQ-pure expression: the keys (which must be a list) are emitted from a new source stage
+//     through an iterator;
+//   * a DqCnUnionAll connection: its producer output is reused directly.
+// Any other node, or any other shape of lookup keys, is returned unchanged.
+NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx) {
+    if (!node.Maybe<TKqlStreamLookupTable>()) {
+        return node;
+    }
+
+    const auto lookup = node.Cast<TKqlStreamLookupTable>();
+    const auto lookupKeys = lookup.LookupKeys();
+
+    TMaybeNode<TKqpCnStreamLookup> lookupConnection;
+    if (IsDqPureExpr(lookupKeys)) {
+        YQL_ENSURE(lookupKeys.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::List,
+            "" << lookupKeys.Ref().Dump());
+
+        lookupConnection = Build<TKqpCnStreamLookup>(ctx, lookup.Pos())
+            .Output()
+                .Stage<TDqStage>()
+                    .Inputs()
+                        .Build()
+                    .Program()
+                        .Args({})
+                        .Body<TCoIterator>()
+                            .List(lookupKeys)
+                            .Build()
+                        .Build()
+                    .Settings(TDqStageSettings().BuildNode(ctx, lookup.Pos()))
+                    .Build()
+                .Index().Build("0")
+                .Build()
+            .Table(lookup.Table())
+            .Columns(lookup.Columns())
+            .LookupKeysType(ExpandType(lookup.Pos(), *lookupKeys.Ref().GetTypeAnn(), ctx))
+            .Done();
+    } else if (auto keysUnionAll = lookupKeys.Maybe<TDqCnUnionAll>()) {
+        auto keysOutput = keysUnionAll.Cast().Output();
+
+        lookupConnection = Build<TKqpCnStreamLookup>(ctx, lookup.Pos())
+            .Output(keysOutput)
+            .Table(lookup.Table())
+            .Columns(lookup.Columns())
+            .LookupKeysType(ExpandType(lookup.Pos(), *keysOutput.Ref().GetTypeAnn(), ctx))
+            .Done();
+    } else {
+        return node;
+    }
+
+    // The consumer stage simply forwards the looked-up rows as a stream.
+    return Build<TDqCnUnionAll>(ctx, node.Pos())
+        .Output()
+            .Stage<TDqStage>()
+                .Inputs()
+                    .Add(lookupConnection.Cast())
+                    .Build()
+                .Program()
+                    .Args({"stream_lookup_output"})
+                    .Body<TCoToStream>()
+                        .Input("stream_lookup_output")
+                        .Build()
+                    .Build()
+                .Settings(TDqStageSettings().BuildNode(ctx, node.Pos()))
+                .Build()
+            .Index().Build("0")
+            .Build().Done();
+}
+
+
} // namespace NKikimr::NKqp::NOpt
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
index 6ec28394c7..bde2856792 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
@@ -19,6 +19,8 @@ NYql::NNodes::TExprBase KqpBuildReadTableRangesStage(NYql::NNodes::TExprBase nod
NYql::NNodes::TExprBase KqpBuildLookupTableStage(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx);
+NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx);
+
NYql::NNodes::TExprBase KqpRemoveRedundantSortByPk(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);
diff --git a/ydb/core/kqp/prepare/kqp_type_ann.cpp b/ydb/core/kqp/prepare/kqp_type_ann.cpp
index f111792122..a19a9bed8f 100644
--- a/ydb/core/kqp/prepare/kqp_type_ann.cpp
+++ b/ydb/core/kqp/prepare/kqp_type_ann.cpp
@@ -1029,6 +1029,43 @@ TStatus AnnotateKqpEnsure(const TExprNode::TPtr& node, TExprContext& ctx) {
return TStatus::Ok;
}
+// Type-annotates a KqpCnStreamLookup connection node.
+// Checks that the node has exactly four children, that the output child is a DqOutput callable,
+// that the table child resolves against tablesData, and that the columns child is a tuple of
+// atoms. On success the node is typed as a stream of the row type built from those columns.
+// Returns TStatus::Error (with issues added to ctx by the failing check) otherwise.
+// NOTE(review): the LookupKeysType child is not validated here.
+TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
+    const TKikimrTablesData& tablesData, bool withSystemColumns) {
+
+    if (!EnsureArgsCount(*node, 4, ctx)) {
+        return TStatus::Error;
+    }
+
+    if (!EnsureCallable(*node->Child(TKqpCnStreamLookup::idx_Output), ctx)) {
+        return TStatus::Error;
+    }
+
+    if (!TDqOutput::Match(node->Child(TKqpCnStreamLookup::idx_Output))) {
+        // Report the position of this node's own output child (index constant of this node type).
+        ctx.AddError(TIssue(ctx.GetPosition(node->Child(TKqpCnStreamLookup::idx_Output)->Pos()),
+            TStringBuilder() << "Expected " << TDqOutput::CallableName()));
+        return TStatus::Error;
+    }
+
+    auto table = ResolveTable(node->Child(TKqpCnStreamLookup::idx_Table), ctx, cluster, tablesData);
+    if (!table.second) {
+        return TStatus::Error;
+    }
+
+    if (!EnsureTupleOfAtoms(*node->Child(TKqpCnStreamLookup::idx_Columns), ctx)) {
+        return TStatus::Error;
+    }
+
+    // Read the same child that was validated just above.
+    TCoAtomList columns{node->ChildPtr(TKqpCnStreamLookup::idx_Columns)};
+
+    auto rowType = GetReadTableRowType(ctx, tablesData, cluster, table.first, columns, withSystemColumns);
+    if (!rowType) {
+        return TStatus::Error;
+    }
+
+    node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType));
+    return TStatus::Ok;
+}
+
+
} // namespace
TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cluster,
@@ -1099,6 +1136,10 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl
return AnnotateDqConnection(input, ctx);
}
+ if (TKqpCnStreamLookup::Match(input.Get())) {
+ return AnnotateStreamLookupConnection(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled());
+ }
+
if (TKqpTxResultBinding::Match(input.Get())) {
return AnnotateKqpTxResultBinding(input, ctx);
}
diff --git a/ydb/core/kqp/runtime/CMakeLists.txt b/ydb/core/kqp/runtime/CMakeLists.txt
index 1083d8408f..d12069e05c 100644
--- a/ydb/core/kqp/runtime/CMakeLists.txt
+++ b/ydb/core/kqp/runtime/CMakeLists.txt
@@ -41,6 +41,8 @@ target_sources(core-kqp-runtime PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_read_table.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_scan_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp
)
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
new file mode 100644
index 0000000000..3a06dd106d
--- /dev/null
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -0,0 +1,440 @@
+#include "kqp_stream_lookup_actor.h"
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+#include <ydb/core/actorlib_impl/long_timer.h>
+#include <ydb/core/base/tablet_pipecache.h>
+#include <ydb/core/engine/minikql/minikql_engine_host.h>
+#include <ydb/core/kqp/common/kqp_resolve.h>
+#include <ydb/core/kqp/common/kqp_gateway.h>
+#include <ydb/core/protos/kqp.pb.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/core/tx/datashard/datashard.h>
+
+namespace NKikimr {
+namespace NKqp {
+
+namespace {
+
+static constexpr TDuration RESOLVE_SHARDS_TIMEOUT = TDuration::Seconds(5);
+static constexpr TDuration RETRY_READ_TIMEOUT = TDuration::Seconds(10);
+
+class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLookupActor>, public NYql::NDq::IDqComputeActorAsyncInput {
+public:
+ TKqpStreamLookupActor(ui64 inputIndex, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId,
+ const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
+ NKikimrKqp::TKqpStreamLookupSettings&& settings)
+ : InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv)
+ , HolderFactory(holderFactory), TableId(MakeTableId(settings.GetTable()))
+ , KeyColumnTypes(settings.GetKeyColumnTypes().begin(), settings.GetKeyColumnTypes().end()) // copied out of the settings proto
+ , Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId()) // snapshot (step, txId) attached to every shard read
+ , ResolveTableShardsTimeout(RESOLVE_SHARDS_TIMEOUT)
+ , RetryReadTimeout(RETRY_READ_TIMEOUT) {
+ // Builds the lookup actor from the transform settings: table id, key column types, snapshot and result columns.
+ for (const auto& column : settings.GetColumns()) {
+ Columns.emplace_back(&column); // one column-metadata entry per column listed in the settings
+ }
+ };
+
+ void Bootstrap() { // Actor entry point: requests table partitioning, then switches to the main state function.
+ ResolveTableShards();
+
+ Become(&TKqpStreamLookupActor::StateFunc);
+ }
+
+private:
+ enum class EReadState { // Lifecycle of a single shard read.
+ Initial,
+ Running,
+ Finished,
+ };
+ std::string_view ReadStateToString(EReadState state) { // Human-readable name of a read state.
+ switch (state) {
+ case EReadState::Initial: return "Initial"sv;
+ case EReadState::Running: return "Running"sv;
+ case EReadState::Finished: return "Finished"sv;
+ }
+ return "Unknown"sv; // only reached for an out-of-range value; avoids flowing off the end of a non-void function
+ }
+
+ struct TReadState { // Bookkeeping for one read request sent to one shard.
+ TReadState(ui64 id, ui64 shardId, std::vector<TOwnedTableRange>&& keys)
+ : Id(id)
+ , ShardId(shardId)
+ , Keys(std::move(keys))
+ , State(EReadState::Initial)
+ , Retried(false) {}
+ // Marks the read as finished, drops its keys and cancels the retry deadline timer if one is attached.
+ void SetFinished(const NActors::TActorContext& ctx) {
+ Keys.clear();
+ State = EReadState::Finished;
+
+ if (RetryDeadlineTimerId) {
+ ctx.Send(RetryDeadlineTimerId, new TEvents::TEvPoisonPill()); // stop the pending timer actor
+ RetryDeadlineTimerId = {};
+ }
+ }
+
+ bool Finished() const {
+ return (State == EReadState::Finished);
+ }
+
+ const ui64 Id; // read id, also placed into the TEvRead record
+ const ui64 ShardId; // target shard (tablet) id
+ std::vector<TOwnedTableRange> Keys; // keys requested by this read; cleared on finish
+ EReadState State;
+ TActorId RetryDeadlineTimerId; // timer actor guarding the retry deadline, empty when none
+ bool Retried; // set once this read has been re-issued by RetryTableRead
+ };
+
+ struct TEvPrivate { // Actor-private events; both are delivered to self by long timers.
+ enum EEv {
+ EvRetryReadTimeout = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
+ EvResolveTableShardsTimeout,
+ };
+ // Fired when the shard resolution deadline elapses.
+ struct TEvResolveTableShardsTimeout : public TEventLocal<TEvResolveTableShardsTimeout, EvResolveTableShardsTimeout> {
+ };
+ // Fired when the retry deadline of a read elapses.
+ struct TEvRetryReadTimeout : public TEventLocal<TEvRetryReadTimeout, EvRetryReadTimeout> {
+ TEvRetryReadTimeout(ui64 readId) : ReadId(readId) {}
+
+ ui64 ReadId; // id of the read the deadline belongs to
+ };
+ };
+
+private:
+ void SaveState(const NYql::NDqProto::TCheckpoint&, NYql::NDqProto::TSourceState&) final {} // checkpoint hooks are no-ops in this actor
+ void LoadState(const NYql::NDqProto::TSourceState&) final {}
+ void CommitState(const NYql::NDqProto::TCheckpoint&) final {}
+
+ ui64 GetInputIndex() const final { // index of the compute-actor input this transform is attached to
+ return InputIndex;
+ }
+
+ void PassAway() final { // Releases the input value, unlinks from the pipe cache and terminates the actor.
+ {
+ auto alloc = BindAllocator(); // Input is cleared while the type environment's allocator is bound
+ Input.Clear();
+ }
+
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0));
+ TActorBootstrapped<TKqpStreamLookupActor>::PassAway();
+ }
+
+ i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, bool& finished, i64) final { // the third argument is unnamed and unused
+ i64 totalDataSize = 0;
+ // Step 1: move every buffered result row into `batch`, summing the cell sizes for the return value.
+ for (; !Results.empty(); Results.pop_front()) {
+ const auto& result = Results.front();
+ YQL_ENSURE(result.size() == Columns.size(), "Result columns mismatch");
+
+ NUdf::TUnboxedValue* rowItems = nullptr;
+ auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);
+
+ for (ui32 colId = 0; colId < Columns.size(); ++colId) {
+ totalDataSize += result[colId].Size();
+ rowItems[colId] = NMiniKQL::GetCellValue(result[colId], Columns[colId].TypeId);
+ }
+
+ batch.push_back(std::move(row));
+ }
+ // Step 2: pull every key currently available from the input and queue it as cells in UnprocessedKeys.
+ NUdf::EFetchStatus status;
+ NUdf::TUnboxedValue key;
+ while ((status = Input.Fetch(key)) == NUdf::EFetchStatus::Ok) {
+ std::vector<TCell> keyCells(KeyColumnTypes.size());
+ for (ui32 colId = 0; colId < KeyColumnTypes.size(); ++colId) {
+ keyCells[colId] = MakeCell(KeyColumnTypes[colId], key.GetElement(colId), TypeEnv, /* copy */ true);
+ }
+
+ UnprocessedKeys.emplace_back(std::move(keyCells));
+ }
+ // Step 3: keys are dispatched only when partitioning is known; otherwise they stay queued.
+ if (Partitioning) {
+ ProcessLookupKeys();
+ }
+ // Finished only when the input is exhausted, no key is queued and every started read has completed.
+ finished = (status == NUdf::EFetchStatus::Finish) && UnprocessedKeys.empty() && AllReadsFinished();
+ return totalDataSize;
+ }
+
+ STFUNC(StateFunc) { // Main event dispatcher; every failure is reported to the compute actor via RuntimeError.
+ Y_UNUSED(ctx);
+
+ try {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, Handle);
+ hFunc(TEvDataShard::TEvReadResult, Handle);
+ hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
+ hFunc(TEvPrivate::TEvResolveTableShardsTimeout, Handle);
+ hFunc(TEvPrivate::TEvRetryReadTimeout, Handle);
+ IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult);
+ default:
+ RuntimeError(TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite());
+ }
+ } catch (const yexception& e) { // exceptions thrown by handlers (e.g. failed YQL_ENSURE checks) end up here
+ RuntimeError(e.what());
+ }
+ }
+
+ void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
+ // Scheme cache reply: remember the table partitioning and dispatch any keys queued so far.
+ auto& request = *ev->Get()->Request;
+ if (request.ErrorCount > 0) {
+ RuntimeError(TStringBuilder() << "Failed to get partitioning for table: " << TableId);
+ return;
+ }
+ YQL_ENSURE(request.ResultSet.size() == 1, "Expected one result for range (-inf, +inf)");
+ Partitioning = request.ResultSet[0].KeyDescription->Partitioning;
+ ProcessLookupKeys();
+ }
+
+ void Handle(TEvDataShard::TEvReadResult::TPtr& ev) { // Processes one result portion of a shard read.
+ const auto& record = ev->Get()->Record;
+
+ auto readIt = Reads.find(record.GetReadId());
+ YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << record.GetReadId());
+ auto& read = readIt->second;
+
+ if (read.State != EReadState::Running) { // results for reads that are not running are dropped
+ return;
+ }
+
+ // TODO: refactor after KIKIMR-15102
+ if (record.GetStatus().GetCode() != Ydb::StatusIds::SUCCESS) {
+ NKikimrTxDataShard::TReadContinuationToken continuationToken;
+ bool parseResult = continuationToken.ParseFromString(record.GetContinuationToken());
+ YQL_ENSURE(parseResult, "Failed to parse continuation token");
+ YQL_ENSURE(continuationToken.GetFirstUnprocessedQuery() <= read.Keys.size());
+ // Any non-success status re-issues the read, starting from the first unprocessed key.
+ return RetryTableRead(read, continuationToken);
+ }
+
+ YQL_ENSURE(record.GetResultFormat() == NKikimrTxDataShard::EScanDataFormat::CELLVEC);
+ auto nrows = ev->Get()->GetRowsCount();
+ for (ui64 rowId = 0; rowId < nrows; ++rowId) {
+ Results.emplace_back(ev->Get()->GetCells(rowId)); // buffered until GetAsyncInputData drains Results
+ }
+
+ Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
+
+ if (record.GetFinished()) {
+ read.SetFinished(TlsActivationContext->AsActorContext());
+ } else { // more portions expected: acknowledge this one through the pipe cache
+ THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck());
+ request->Record.SetReadId(record.GetReadId());
+ request->Record.SetSeqNo(record.GetSeqNo());
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), read.ShardId, true),
+ IEventHandle::FlagTrackDelivery);
+ }
+ }
+
+ void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { // A shard became unreachable: re-queue its running reads and re-resolve shards.
+ const auto& tabletId = ev->Get()->TabletId;
+ auto shardIt = ReadsPerShard.find(tabletId);
+ YQL_ENSURE(shardIt != ReadsPerShard.end());
+
+ for (auto readId : shardIt->second) {
+ auto readIt = Reads.find(readId);
+ YQL_ENSURE(readIt != Reads.end());
+ auto& read = readIt->second;
+
+ if (read.State == EReadState::Running) {
+ for (auto& key : read.Keys) {
+ UnprocessedKeys.emplace_back(std::move(key)); // every key of the read goes back to the queue
+ }
+
+ read.SetFinished(TlsActivationContext->AsActorContext());
+ }
+ }
+
+ ReadsPerShard.erase(shardIt);
+ ResolveTableShards(); // resets Partitioning; re-queued keys are dispatched once it is resolved again
+ }
+
+ void Handle(TEvPrivate::TEvResolveTableShardsTimeout::TPtr&) {
+ if (Partitioning) {
+ return; // partitioning is known, nothing to report
+ }
+ RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId << " (request timeout exceeded)");
+ }
+
+ void Handle(TEvPrivate::TEvRetryReadTimeout::TPtr& ev) { // Retry deadline elapsed for the read identified by ReadId.
+ auto readIt = Reads.find(ev->Get()->ReadId);
+ YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << ev->Get()->ReadId);
+ auto& read = readIt->second;
+ // NOTE(review): RetryTableRead sets Retried on the failed read but arms the timer with the new read's id - confirm this check fires as intended.
+ if (read.Retried) {
+ RuntimeError(TStringBuilder() << "Retry timeout exceeded for read: " << ev->Get()->ReadId);
+ }
+ }
+
+ void ProcessLookupKeys() { // Groups all queued point keys by owning shard and starts one read per shard.
+ YQL_ENSURE(Partitioning, "Table partitioning should be initialized before lookup keys processing");
+
+ std::map<ui64, std::vector<TOwnedTableRange>> shardKeys;
+ for (; !UnprocessedKeys.empty(); UnprocessedKeys.pop_front()) {
+ auto& key = UnprocessedKeys.front(); // non-const so that std::move below really moves instead of copying
+ YQL_ENSURE(key.Point);
+
+ auto partitionInfo = LowerBound(
+ Partitioning->begin(), Partitioning->end(), /* value */ true,
+ [&](const auto& partition, bool) {
+ const int result = CompareBorders<true, false>(
+ partition.Range->EndKeyPrefix.GetCells(), key.From,
+ partition.Range->IsInclusive || partition.Range->IsPoint,
+ key.InclusiveFrom || key.Point, KeyColumnTypes
+ );
+
+ return (result < 0);
+ }
+ );
+ YQL_ENSURE(partitionInfo != Partitioning->end(), "Failed to find partition for lookup key"); // never dereference end()
+ shardKeys[partitionInfo->ShardId].emplace_back(std::move(key));
+ }
+
+ for (auto& [shardId, keys] : shardKeys) {
+ StartTableRead(shardId, std::move(keys));
+ }
+ }
+
+ TReadState& StartTableRead(ui64 shardId, std::vector<TOwnedTableRange>&& keys) { // Sends a TEvRead for `keys` to `shardId`; returns the registered read state.
+ const auto readId = GetNextReadId();
+ TReadState read(readId, shardId, std::move(keys));
+
+ THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead());
+ auto& record = request->Record;
+
+ record.MutableSnapshot()->SetStep(Snapshot.Step);
+ record.MutableSnapshot()->SetTxId(Snapshot.TxId);
+
+ record.SetReadId(read.Id);
+ record.SetResultFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC);
+
+ record.MutableTableId()->SetOwnerId(TableId.PathId.OwnerId);
+ record.MutableTableId()->SetTableId(TableId.PathId.LocalPathId);
+ record.MutableTableId()->SetSchemaVersion(TableId.SchemaVersion);
+
+ for (const auto& column : Columns) {
+ record.AddColumns(column.Id);
+ }
+
+ for (auto& key : read.Keys) {
+ YQL_ENSURE(key.Point); // only point lookups are supported by this actor
+ request->Keys.emplace_back(TSerializedCellVec::Serialize(key.From));
+ }
+
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), shardId, true),
+ IEventHandle::FlagTrackDelivery);
+
+ read.State = EReadState::Running;
+ // Register the read by id and by shard so that results and delivery problems can find it.
+ const auto [readIt, succeeded] = Reads.insert({readId, std::move(read)});
+ YQL_ENSURE(succeeded);
+ ReadsPerShard[shardId].insert(readId);
+
+ return readIt->second;
+ }
+
+ void RetryTableRead(TReadState& failedRead, NKikimrTxDataShard::TReadContinuationToken& token) { // Re-issues the keys of failedRead starting at token's first unprocessed query, then finishes failedRead.
+ YQL_ENSURE(token.GetFirstUnprocessedQuery() <= failedRead.Keys.size());
+ std::vector<TOwnedTableRange> unprocessedKeys; // starts empty: a sized constructor would prepend default (non-point) ranges
+ unprocessedKeys.reserve(failedRead.Keys.size() - token.GetFirstUnprocessedQuery());
+ for (ui64 idx = token.GetFirstUnprocessedQuery(); idx < failedRead.Keys.size(); ++idx) {
+ unprocessedKeys.emplace_back(std::move(failedRead.Keys[idx]));
+ }
+ auto& newRead = StartTableRead(failedRead.ShardId, std::move(unprocessedKeys));
+ if (failedRead.Retried) {
+ newRead.RetryDeadlineTimerId = failedRead.RetryDeadlineTimerId; // hand the already running deadline over to the new read
+ failedRead.RetryDeadlineTimerId = {};
+ } else {
+ failedRead.Retried = true;
+ newRead.RetryDeadlineTimerId = CreateLongTimer(TlsActivationContext->AsActorContext(), RetryReadTimeout,
+ new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryReadTimeout(newRead.Id)));
+ }
+
+ failedRead.SetFinished(TlsActivationContext->AsActorContext());
+ }
+
+ void ResolveTableShards() { // Drops the known partitioning and asks the scheme cache to resolve the table's full key range.
+ Partitioning.reset();
+
+ auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
+
+ TVector<TCell> minusInf(KeyColumnTypes.size());
+ TVector<TCell> plusInf;
+ TTableRange range(minusInf, true, plusInf, true, false);
+
+ request->ResultSet.emplace_back(MakeHolder<TKeyDesc>(TableId, range, TKeyDesc::ERowOperation::Read,
+ KeyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));
+ // The table is invalidated in the scheme cache first, then the key set is resolved.
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
+ // NOTE(review): a previously stored timer id is overwritten here without being cancelled - confirm that is intended.
+ ResolveTableShardsTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), ResolveTableShardsTimeout,
+ new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvResolveTableShardsTimeout()));
+ }
+
+ bool AllReadsFinished() const {
+ // True when no tracked read is still unfinished (also true when there are no reads at all).
+ auto readIt = Reads.begin();
+ while (readIt != Reads.end() && readIt->second.Finished()) {
+ ++readIt;
+ }
+ // Reaching the end means no unfinished read was encountered.
+ return readIt == Reads.end();
+ }
+
+ ui64 GetNextReadId() { // Returns a fresh read id; the first id handed out is 1.
+ static ui64 readId = 0; // NOTE(review): function-local static, so the counter is shared by all actor instances and incremented without synchronization - confirm this is acceptable
+ return ++readId;
+ }
+
+ TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator() { // Returns the guard produced by the type environment's BindAllocator().
+ return TypeEnv.BindAllocator();
+ }
+
+ void RuntimeError(const TString& message, const NYql::TIssues& subIssues = {}) {
+ // Reports a fatal error to the compute actor: one top-level issue carrying subIssues as children.
+ NYql::TIssue rootIssue(message);
+ for (auto subIt = subIssues.begin(); subIt != subIssues.end(); ++subIt) {
+ rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(*subIt));
+ }
+ NYql::TIssues report;
+ report.AddIssue(std::move(rootIssue));
+ Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(report), true));
+ }
+
+private:
+ const ui64 InputIndex;
+ NUdf::TUnboxedValue Input;
+ const NActors::TActorId ComputeActorId;
+ const NMiniKQL::TTypeEnvironment& TypeEnv;
+ const NMiniKQL::THolderFactory& HolderFactory;
+ const TTableId TableId;
+ const TVector<NKikimr::NScheme::TTypeId> KeyColumnTypes;
+ std::vector<NYql::TKikimrColumnMetadata> Columns;
+ const IKqpGateway::TKqpSnapshot Snapshot;
+ std::deque<TOwnedCellVec> Results;
+ std::unordered_map<ui64, TReadState> Reads;
+ std::unordered_map<ui64, std::set<ui64>> ReadsPerShard;
+ std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning;
+ std::deque<TOwnedTableRange> UnprocessedKeys;
+ const TDuration ResolveTableShardsTimeout;
+ NActors::TActorId ResolveTableShardsTimeoutTimer;
+ const TDuration RetryReadTimeout;
+};
+
+} // namespace
+
+std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex,
+ const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv,
+ const NMiniKQL::THolderFactory& holderFactory, NKikimrKqp::TKqpStreamLookupSettings&& settings) {
+ // A single object is returned through both of its interfaces: async input and actor.
+ TKqpStreamLookupActor* const lookupActor = new TKqpStreamLookupActor(inputIndex, input, computeActorId, typeEnv, holderFactory, std::move(settings));
+ return {lookupActor, lookupActor};
+}
+
+} // namespace NKqp
+} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h
new file mode 100644
index 0000000000..59d8d83b87
--- /dev/null
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h
@@ -0,0 +1,14 @@
+#pragma once
+
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
+#include <ydb/core/protos/kqp.pb.h>
+
+namespace NKikimr {
+namespace NKqp {
+
+std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex,
+ const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv,
+ const NMiniKQL::THolderFactory& holderFactory, NKikimrKqp::TKqpStreamLookupSettings&& settings); // Creates the stream lookup actor; returns its async-input and actor interfaces.
+
+} // namespace NKqp
+} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
new file mode 100644
index 0000000000..dab7ce8f5c
--- /dev/null
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
@@ -0,0 +1,16 @@
+#include "kqp_stream_lookup_factory.h"
+#include "kqp_stream_lookup_actor.h"
+
+namespace NKikimr {
+namespace NKqp {
+
+void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory) { // Registers the stream lookup actor as the "StreamLookupInputTransformer" input transform.
+ factory.RegisterInputTransform<NKikimrKqp::TKqpStreamLookupSettings>("StreamLookupInputTransformer", [](NKikimrKqp::TKqpStreamLookupSettings&& settings,
+ NYql::NDq::TDqAsyncIoFactory::TInputTransformArguments&& args) {
+ return CreateStreamLookupActor(args.InputIndex, args.TransformInput, args.ComputeActorId, args.TypeEnv,
+ args.HolderFactory, std::move(settings)); // the transform's input is handed to the actor as its key source
+ });
+}
+
+} // namespace NKqp
+} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.h b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.h
new file mode 100644
index 0000000000..1eded0576b
--- /dev/null
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
+
+namespace NKikimr {
+namespace NKqp {
+
+void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory);
+
+} // namespace NKqp
+} // namespace NKikimr
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 8b1d4426d3..46b5de9186 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -110,6 +110,7 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) {
ServerSettings->SetKeepSnapshotTimeout(settings.KeepSnapshotTimeout);
ServerSettings->SetFrFactory(&UdfFrFactory);
ServerSettings->SetEnableNotNullColumns(true);
+ ServerSettings->SetEnableKqpScanQueryStreamLookup(true);
if (settings.LogStream)
ServerSettings->SetLogBackend(new TStreamLogBackend(settings.LogStream));
diff --git a/ydb/core/kqp/ut/kqp_scan_ut.cpp b/ydb/core/kqp/ut/kqp_scan_ut.cpp
index 2087a2b1e5..d81eaa9199 100644
--- a/ydb/core/kqp/ut/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_scan_ut.cpp
@@ -1939,6 +1939,35 @@ Y_UNIT_TEST_SUITE(KqpScan) {
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
CompareYson("[[%false]]", StreamResultToYson(result));
}
+
+ Y_UNIT_TEST_TWIN(StreamLookup, UseSessionActor) { // Scan query with `Key IN $keys`, where $keys is read from another table.
+ auto kikimr = KikimrRunnerEnableSessionActor(UseSessionActor);
+ auto db = kikimr.GetTableClient();
+ CreateSampleTables(kikimr);
+
+ { // Write keys 1..5 into EightShard; they form the lookup key set of the scan query below.
+ auto result = db.CreateSession().GetValueSync().GetSession().ExecuteDataQuery(R"(
+ REPLACE INTO `/Root/EightShard` (Key, Text, Data) VALUES
+ (1u, "Value1", 1),
+ (2u, "Value2", 1),
+ (3u, "Value3", 1),
+ (4u, "Value4", 1),
+ (5u, "Value5", 1);
+ )", TTxControl::BeginTx().CommitTx()).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+
+ { // Look the keys up in KeyValue; only keys 1 and 2 are expected back (presumably the rows created by CreateSampleTables - verify).
+ auto result = db.StreamExecuteScanQuery(R"(
+ PRAGMA kikimr.UseNewEngine = "true";
+ PRAGMA kikimr.OptEnablePredicateExtract = "false";
+ $keys = SELECT Key FROM `/Root/EightShard`;
+ SELECT * FROM `/Root/KeyValue` WHERE Key IN $keys ORDER BY Key;
+ )").GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ CompareYson(R"([[[1u];["One"]];[[2u];["Two"]]])", StreamResultToYson(result));
+ }
+ }
}
} // namespace NKqp
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 8e1327f846..8c93d3fcac 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -699,6 +699,7 @@ message TFeatureFlags {
optional bool AllowVDiskDefrag = 63 [default = true];
optional bool EnableAsyncHttpMon = 64 [default = true];
optional bool EnableChangefeeds = 65 [default = true];
+ optional bool EnableKqpScanQueryStreamLookup = 66 [default = false];
}
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index b60b518aed..2bd82e48e5 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -574,3 +574,10 @@ message TEvRemoteScanDataAck {
message TEvKillScanTablet {
}
+
+message TKqpStreamLookupSettings { // Settings passed to the stream lookup input transform actor.
+ optional NKqpProto.TKqpPhyTable Table = 1; // table to look rows up in
+ repeated uint32 KeyColumnTypes = 2; // type ids of the table's key columns, used to build lookup key cells
+ repeated TKqpColumnMetadataProto Columns = 3; // columns requested from shards and returned in result rows
+ optional TKqpSnapshot Snapshot = 4; // snapshot (step, tx id) attached to every shard read
+}
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index cae8f32e05..d0c652a324 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -208,6 +208,13 @@ message TKqpPhyCnMerge {
repeated TKqpPhySortColumn SortColumns = 1;
}
+message TKqpPhyCnStreamLookup {
+ TKqpPhyTable Table = 1;
+ repeated TKqpPhyColumn Columns = 2;
+ bytes LookupKeysType = 3;
+ bytes ResultType = 4;
+}
+
message TKqpPhyConnection {
uint32 StageIndex = 1;
uint32 OutputIndex = 2;
@@ -222,6 +229,7 @@ message TKqpPhyConnection {
TKqpPhyCnResult Result = 9;
TKqpPhyCnValue Value = 10;
TKqpPhyCnMerge Merge = 11;
+ TKqpPhyCnStreamLookup StreamLookup = 12;
};
}
diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h
index 5f58521a27..a7ed81b02e 100644
--- a/ydb/core/testlib/basics/feature_flags.h
+++ b/ydb/core/testlib/basics/feature_flags.h
@@ -35,6 +35,7 @@ public:
FEATURE_FLAG_SETTER(EnableBulkUpsertToAsyncIndexedTables)
FEATURE_FLAG_SETTER(EnableChangefeeds)
FEATURE_FLAG_SETTER(EnableKqpSessionActor)
+ FEATURE_FLAG_SETTER(EnableKqpScanQueryStreamLookup)
TDerived& SetEnableMvcc(std::optional<bool> value) {
if (value) {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 41f0e58817..acabdd7d8d 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -69,14 +69,14 @@ struct TSinkCallbacks : public IDqComputeActorAsyncOutput::ICallbacks {
struct TOutputTransformCallbacks : public IDqComputeActorAsyncOutput::ICallbacks {
void OnAsyncOutputError(ui64 outputIndex, const TIssues& issues, bool isFatal) override final {
- OnTransformError(outputIndex, issues, isFatal);
+ OnOutputTransformError(outputIndex, issues, isFatal);
}
void OnAsyncOutputStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override final {
OnTransformStateSaved(std::move(state), outputIndex, checkpoint);
}
- virtual void OnTransformError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0;
+ virtual void OnOutputTransformError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0;
virtual void OnTransformStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0;
};
@@ -1416,18 +1416,38 @@ protected:
}
void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) {
- Y_VERIFY(SourcesMap.FindPtr(ev->Get()->InputIndex));
+ Y_VERIFY(SourcesMap.FindPtr(ev->Get()->InputIndex) || InputTransformsMap.FindPtr(ev->Get()->InputIndex));
ContinueExecute();
}
void OnAsyncInputError(const IDqComputeActorAsyncInput::TEvAsyncInputError::TPtr& ev) {
- if (!ev->Get()->IsFatal) {
- SourcesMap.at(ev->Get()->InputIndex).IssuesBuffer.Push(ev->Get()->Issues);
+ if (SourcesMap.FindPtr(ev->Get()->InputIndex)) {
+ OnSourceError(ev->Get()->InputIndex, ev->Get()->Issues, ev->Get()->IsFatal);
+ } else if (InputTransformsMap.FindPtr(ev->Get()->InputIndex)) {
+ OnInputTransformError(ev->Get()->InputIndex, ev->Get()->Issues, ev->Get()->IsFatal);
+ } else {
+ YQL_ENSURE(false, "Unexpected input index: " << ev->Get()->InputIndex);
+ }
+ }
+
+ void OnSourceError(ui64 inputIndex, const TIssues& issues, bool isFatal) {
+ if (!isFatal) {
+ SourcesMap.at(inputIndex).IssuesBuffer.Push(issues);
return;
}
- CA_LOG_E("Source[" << ev->Get()->InputIndex << "] fatal error: " << ev->Get()->Issues.ToString());
- InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, ev->Get()->Issues);
+ CA_LOG_E("Source[" << inputIndex << "] fatal error: " << issues.ToOneLineString());
+ InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, issues);
+ }
+
+ void OnInputTransformError(ui64 inputIndex, const TIssues& issues, bool isFatal) {
+ if (!isFatal) {
+ InputTransformsMap.at(inputIndex).IssuesBuffer.Push(issues);
+ return;
+ }
+
+ CA_LOG_E("InputTransform[" << inputIndex << "] fatal error: " << issues.ToOneLineString());
+ InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, issues);
}
void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) override {
@@ -1440,7 +1460,7 @@ protected:
InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, issues);
}
- void OnTransformError(ui64 outputIndex, const TIssues& issues, bool isFatal) override {
+ void OnOutputTransformError(ui64 outputIndex, const TIssues& issues, bool isFatal) override {
if (!isFatal) {
OutputTransformsMap.at(outputIndex).IssuesBuffer.Push(issues);
return;
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index bb0a0a5baf..42de67823f 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -422,13 +422,13 @@ public:
Y_VERIFY(!transform->TransformInput);
Y_VERIFY(!transform->TransformOutput);
- TStringBuilder err;
- transform->TransformInputType = NCommon::ParseTypeFromYson(TStringBuf{transformDesc.GetInputType()}, *pb, err.Out);
- YQL_ENSURE(transform->TransformInputType, "Can't parse transform input type: " << err);
+ auto inputTypeNode = NMiniKQL::DeserializeNode(TStringBuf{transformDesc.GetInputType()}, typeEnv);
+ YQL_ENSURE(inputTypeNode, "Failed to deserialize transform input type");
+ transform->TransformInputType = static_cast<TType*>(inputTypeNode);
- err.clear();
- TType* outputType = NCommon::ParseTypeFromYson(TStringBuf{transformDesc.GetOutputType()}, *pb, err.Out);
- YQL_ENSURE(outputType, "Can't parse transform output type: " << err);
+ auto outputTypeNode = NMiniKQL::DeserializeNode(TStringBuf{transformDesc.GetOutputType()}, typeEnv);
+ YQL_ENSURE(outputTypeNode, "Failed to deserialize transform output type");
+ TType* outputType = static_cast<TType*>(outputTypeNode);
YQL_ENSURE(outputType->IsSameType(*ProgramParsed.InputItemTypes[i]));
LOG(TStringBuilder() << "Task: " << TaskId << " has transform by "
<< transformDesc.GetType() << " with input type: " << *transform->TransformInputType
@@ -488,13 +488,13 @@ public:
Y_VERIFY(!transform->TransformInput);
Y_VERIFY(!transform->TransformOutput);
- TStringBuilder err;
- transform->TransformOutputType = NCommon::ParseTypeFromYson(TStringBuf{transformDesc.GetOutputType()}, *pb, err.Out);
- YQL_ENSURE(transform->TransformOutputType, "Can't parse transform output type: " << err);
+ auto outputTypeNode = NMiniKQL::DeserializeNode(TStringBuf{transformDesc.GetOutputType()}, typeEnv);
+ YQL_ENSURE(outputTypeNode, "Failed to deserialize transform output type");
+ transform->TransformOutputType = static_cast<TType*>(outputTypeNode);
- err.clear();
- TType* inputType = NCommon::ParseTypeFromYson(TStringBuf{transformDesc.GetInputType()}, *pb, err.Out);
- YQL_ENSURE(inputType, "Can't parse transform input type: " << err);
+ auto inputTypeNode = NMiniKQL::DeserializeNode(TStringBuf{transformDesc.GetInputType()}, typeEnv);
+ YQL_ENSURE(inputTypeNode, "Failed to deserialize transform input type");
+ TType* inputType = static_cast<TType*>(inputTypeNode);
YQL_ENSURE(inputType->IsSameType(*ProgramParsed.OutputItemTypes[i]));
LOG(TStringBuilder() << "Task: " << TaskId << " has transform by "
<< transformDesc.GetType() << " with input type: " << *inputType
diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
index ee7cba6b71..780f500903 100644
--- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h
+++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
@@ -102,6 +102,15 @@ enum class TTaskInputType {
Merge
};
+struct TTransform {
+ TString Type;
+
+ TString InputType;
+ TString OutputType;
+
+ ::google::protobuf::Any Settings;
+};
+
template <class TInputMeta>
struct TTaskInput {
std::variant<std::monostate, TMergeTaskInput> ConnectionInfo;
@@ -109,6 +118,7 @@ struct TTaskInput {
TMaybe<::google::protobuf::Any> SourceSettings;
TString SourceType;
TInputMeta Meta;
+ TMaybe<TTransform> Transform;
TTaskInputType Type() const {
return static_cast<TTaskInputType>(ConnectionInfo.index());
@@ -127,15 +137,6 @@ struct TTaskOutputType {
};
};
-struct TTransform {
- TString Type;
-
- TString InputType;
- TString OutputType;
-
- ::google::protobuf::Any Settings;
-};
-
template <class TOutputMeta>
struct TTaskOutput {
ui32 Type = TTaskOutputType::Undefined;
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
index 5cf9450d05..06c8112192 100644
--- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp
+++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
@@ -158,6 +158,10 @@ namespace NYql::NDqs {
// Sinks
if (auto maybeDqOutputsList = stage.Outputs()) {
+ TScopedAlloc alloc;
+ TTypeEnvironment typeEnv(alloc);
+ TProgramBuilder pgmBuilder(typeEnv, *FunctionRegistry);
+
auto dqOutputsList = maybeDqOutputsList.Cast();
for (const auto& output : dqOutputsList) {
const ui64 index = FromString(output.Ptr()->Child(TDqOutputAnnotationBase::idx_Index)->Content());
@@ -179,12 +183,20 @@ namespace NYql::NDqs {
YQL_ENSURE(!sinkSettings.type_url().empty(), "Data sink provider \"" << dataSinkName << "\" did't fill dq sink settings for its dq sink node");
YQL_ENSURE(sinkType, "Data sink provider \"" << dataSinkName << "\" did't fill dq sink settings type for its dq sink node");
} else if (output.Maybe<NNodes::TDqTransform>()) {
+ TStringStream errorStream;
+
auto transform = output.Cast<NNodes::TDqTransform>();
outputTransform.Type = transform.Type();
- const auto inputType = transform.InputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
- outputTransform.InputType = NCommon::WriteTypeToYson(inputType);
- const auto outputType = transform.OutputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
- outputTransform.OutputType = NCommon::WriteTypeToYson(outputType);
+ const auto inputTypeAnnotation = transform.InputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ auto inputType = NCommon::BuildType(*inputTypeAnnotation, pgmBuilder, errorStream);
+ Y_ENSURE(inputType, "Failed to build transform input type: " << errorStream.Str());
+ outputTransform.InputType = NKikimr::NMiniKQL::SerializeNode(inputType, typeEnv);
+
+ errorStream.clear();
+ const auto outputTypeAnnotation = transform.OutputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ auto outputType = NCommon::BuildType(*outputTypeAnnotation, pgmBuilder, errorStream);
+ Y_ENSURE(outputType, "Failed to build transform output type: " << errorStream.Str());
+ outputTransform.OutputType = NKikimr::NMiniKQL::SerializeNode(outputType, typeEnv);
dqIntegration->FillTransformSettings(transform.Ref(), outputTransform.Settings);
} else {
YQL_ENSURE(false, "Unknown stage output type");