aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author ulya-sidorina <yulia@ydb.tech> 2023-10-06 13:11:20 +0300
committer ulya-sidorina <yulia@ydb.tech> 2023-10-06 13:47:02 +0300
commit fdac813cf5d2394785944db3cd146c863a87bb92 (patch)
tree d112f68851ddae693111bcaf25e4936eff25ebce
parent 7775f59248540408cbd58e95b8b3a285fcf51e2d (diff)
download ydb-fdac813cf5d2394785944db3cd146c863a87bb92.tar.gz
KIKIMR-16746: implement stream index lookup join
feature(kqp): implement stream index lookup join
-rw-r--r-- ydb/core/kqp/common/kqp_yql.h 3
-rw-r--r-- ydb/core/kqp/compile_service/kqp_compile_actor.cpp 1
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_compute_actor.cpp 4
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp 2
-rw-r--r-- ydb/core/kqp/expr_nodes/kqp_expr_nodes.json 28
-rw-r--r-- ydb/core/kqp/host/kqp_type_ann.cpp 257
-rw-r--r-- ydb/core/kqp/opt/kqp_query_plan.cpp 27
-rw-r--r-- ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp 40
-rw-r--r-- ydb/core/kqp/opt/physical/kqp_opt_phy.cpp 7
-rw-r--r-- ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp 50
-rw-r--r-- ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h 2
-rw-r--r-- ydb/core/kqp/provider/yql_kikimr_settings.h 1
-rw-r--r-- ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp 13
-rw-r--r-- ydb/core/kqp/query_compiler/kqp_query_compiler.cpp 97
-rw-r--r-- ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt 1
-rw-r--r-- ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt 1
-rw-r--r-- ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt 1
-rw-r--r-- ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt 1
-rw-r--r-- ydb/core/kqp/runtime/kqp_compute.cpp 144
-rw-r--r-- ydb/core/kqp/runtime/kqp_compute.h 1
-rw-r--r-- ydb/core/kqp/runtime/kqp_program_builder.cpp 49
-rw-r--r-- ydb/core/kqp/runtime/kqp_program_builder.h 2
-rw-r--r-- ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp 340
-rw-r--r-- ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp 761
-rw-r--r-- ydb/core/kqp/runtime/kqp_stream_lookup_worker.h 60
-rw-r--r-- ydb/core/kqp/runtime/ya.make 2
-rw-r--r-- ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp 186
-rw-r--r-- ydb/core/protos/config.proto 1
-rw-r--r-- ydb/core/protos/kqp.proto 1
-rw-r--r-- ydb/core/protos/kqp_physical.proto 7
30 files changed, 1761 insertions, 329 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h
index 3549a1a9854..3aab7b5d085 100644
--- a/ydb/core/kqp/common/kqp_yql.h
+++ b/ydb/core/kqp/common/kqp_yql.h
@@ -45,6 +45,9 @@ struct TKqpPhyTxSettings {
constexpr TStringBuf KqpReadRangesSourceName = "KqpReadRangesSource";
+static constexpr std::string_view TKqpStreamLookupStrategyName = "LookupRows"sv;
+static constexpr std::string_view TKqpStreamLookupJoinStrategyName = "LookupJoinRows"sv;
+
struct TKqpReadTableSettings {
static constexpr TStringBuf SkipNullKeysSettingName = "SkipNullKeys";
static constexpr TStringBuf ItemsLimitSettingName = "ItemsLimit";
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index 29485c077cb..276c0469573 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -422,6 +422,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableKqpDataQueryStreamLookup = serviceConfig.GetEnableKqpDataQueryStreamLookup();
kqpConfig.EnableKqpScanQueryStreamLookup = serviceConfig.GetEnableKqpScanQueryStreamLookup();
kqpConfig.EnableKqpScanQueryStreamIdxLookupJoin = serviceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin();
+ kqpConfig.EnableKqpDataQueryStreamIdxLookupJoin = serviceConfig.GetEnableKqpDataQueryStreamIdxLookupJoin();
kqpConfig.EnablePredicateExtractForDataQuery = serviceConfig.GetEnablePredicateExtractForDataQueries();
kqpConfig.EnablePredicateExtractForScanQuery = serviceConfig.GetEnablePredicateExtractForScanQueries();
kqpConfig.EnableSequentialReads = serviceConfig.GetEnableSequentialReads();
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
index d0232c0ea66..e8e18bbfdbd 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
@@ -50,6 +50,10 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput
return WrapKqpEnsure(callable, ctx);
}
+ if (name == "KqpIndexLookupJoin"sv) {
+ return WrapKqpIndexLookupJoin(callable, ctx);
+ }
+
return nullptr;
};
}
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
index 9d76aaf9ef4..a49088a0c92 100644
--- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
@@ -384,6 +384,8 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf
columnProto->SetTypeId(columnIt->second.Type.GetTypeId());
}
+ settings->SetLookupStrategy(streamLookup.GetLookupStrategy());
+
TTransform streamLookupTransform;
streamLookupTransform.Type = "StreamLookupInputTransformer";
streamLookupTransform.InputType = streamLookup.GetLookupKeysType();
diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
index d208142f542..09413f17157 100644
--- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
+++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json
@@ -183,6 +183,20 @@
"Match": {"Type": "Callable", "Name": "KqlStreamLookupTable"}
},
{
+ "Name": "TKqlStreamIdxLookupJoin",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "KqlStreamIdxLookupJoin"},
+ "Children": [
+ {"Index": 0, "Name": "LeftInput", "Type": "TExprBase"},
+ {"Index": 1, "Name": "LeftLabel", "Type": "TCoAtom"},
+ {"Index": 2, "Name": "RightTable", "Type": "TKqpTable"},
+ {"Index": 3, "Name": "RightColumns", "Type": "TCoAtomList"},
+ {"Index": 4, "Name": "RightLabel", "Type": "TCoAtom"},
+ {"Index": 5, "Name": "JoinType", "Type": "TCoAtom"}
+ ]
+
+ },
+ {
"Name": "TKqlSequencer",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "KqlSequencer"},
@@ -416,7 +430,19 @@
"Children": [
{"Index": 1, "Name": "Table", "Type": "TKqpTable"},
{"Index": 2, "Name": "Columns", "Type": "TCoAtomList"},
- {"Index": 3, "Name": "LookupKeysType", "Type": "TExprBase"}
+ {"Index": 3, "Name": "InputType", "Type": "TExprBase"},
+ {"Index": 4, "Name": "LookupStrategy", "Type": "TCoAtom"}
+ ]
+ },
+ {
+ "Name": "TKqpIndexLookupJoin",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "KqpIndexLookupJoin"},
+ "Children": [
+ {"Index": 0, "Name": "Input", "Type": "TExprBase"},
+ {"Index": 1, "Name": "JoinType", "Type": "TCoAtom"},
+ {"Index": 2, "Name": "LeftLabel", "Type": "TCoAtom"},
+ {"Index": 3, "Name": "RightLabel", "Type": "TCoAtom"}
]
},
{
diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp
index 2d9174ddb55..57c6677cb07 100644
--- a/ydb/core/kqp/host/kqp_type_ann.cpp
+++ b/ydb/core/kqp/host/kqp_type_ann.cpp
@@ -1281,6 +1281,96 @@ TStatus AnnotateSequencer(const TExprNode::TPtr& node, TExprContext& ctx, const
return TStatus::Ok;
}
+// Type-annotates a KqlStreamIdxLookupJoin callable.
+//
+// The node must have six children: left input, left label, right table,
+// right columns, right label and join type. The left input has to pass
+// EnsureNewSeqType<false> and its items must be 2-tuples of structs:
+// item 0 holds the lookup keys, item 1 holds the left row. Every lookup key
+// member must be a key column of the right table, and every requested right
+// column must be known to the right table.
+//
+// On success the node is annotated as a list of structs whose members are the
+// left row members named "<leftLabel>.<member>" followed by the right row
+// members (as returned by GetReadTableRowType) named "<rightLabel>.<member>".
+//
+// Returns TStatus::Error on any violation. The two checks performed by hand
+// below attach an explicit issue to ctx so the failure is never silent.
+//
+// NOTE(review): right-side members keep the item types reported by
+// GetReadTableRowType regardless of the join type -- confirm this is the
+// intended row type for "Left" joins.
+TStatus AnnotateStreamIdxLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
+    const TKikimrTablesData& tablesData, bool withSystemColumns)
+{
+    if (!EnsureArgsCount(*node, 6, ctx)) {
+        return TStatus::Error;
+    }
+
+    auto leftInputType = node->Child(TKqlStreamIdxLookupJoin::idx_LeftInput)->GetTypeAnn();
+    const TTypeAnnotationNode* leftInputItemType;
+    if (!EnsureNewSeqType<false>(node->Pos(), *leftInputType, ctx, &leftInputItemType)) {
+        return TStatus::Error;
+    }
+
+    YQL_ENSURE(leftInputItemType);
+    if (!EnsureTupleType(node->Pos(), *leftInputItemType, ctx)) {
+        return TStatus::Error;
+    }
+
+    if (!EnsureTupleTypeSize(node->Pos(), leftInputItemType, 2, ctx)) {
+        return TStatus::Error;
+    }
+
+    // Tuple layout: [0] lookup keys struct, [1] left row struct.
+    auto leftInputTupleType = leftInputItemType->Cast<TTupleExprType>();
+    if (!EnsureStructType(node->Pos(), *leftInputTupleType->GetItems()[0], ctx)) {
+        return TStatus::Error;
+    }
+
+    if (!EnsureStructType(node->Pos(), *leftInputTupleType->GetItems()[1], ctx)) {
+        return TStatus::Error;
+    }
+
+    if (!EnsureAtom(*node->Child(TKqlStreamIdxLookupJoin::idx_LeftLabel), ctx)) {
+        return TStatus::Error;
+    }
+
+    TCoAtom leftLabel(node->Child(TKqlStreamIdxLookupJoin::idx_LeftLabel));
+
+    auto rightTable = ResolveTable(node->Child(TKqlStreamIdxLookupJoin::idx_RightTable), ctx, cluster, tablesData);
+    if (!rightTable.second) {
+        return TStatus::Error;
+    }
+
+    const TStructExprType* inputKeysType = leftInputTupleType->GetItems()[0]->Cast<TStructExprType>();
+    for (const auto& inputKey : inputKeysType->GetItems()) {
+        if (!rightTable.second->GetKeyColumnIndex(TString(inputKey->GetName()))) {
+            // Report the offending member instead of failing without any issue.
+            ctx.AddError(TIssue(ctx.GetPosition(node->Pos()),
+                TStringBuilder() << "Lookup key is not a key column of the right table: " << inputKey->GetName()));
+            return TStatus::Error;
+        }
+    }
+
+    if (!EnsureTupleOfAtoms(*node->Child(TKqlStreamIdxLookupJoin::idx_RightColumns), ctx)) {
+        return TStatus::Error;
+    }
+
+    TCoAtomList rightColumns{node->ChildPtr(TKqlStreamIdxLookupJoin::idx_RightColumns)};
+    for (const auto& rightColumn : rightColumns) {
+        if (!rightTable.second->GetColumnType(TString(rightColumn.Value()))) {
+            // Report the offending column instead of failing without any issue.
+            ctx.AddError(TIssue(ctx.GetPosition(rightColumn.Pos()),
+                TStringBuilder() << "Unknown column of the right table: " << rightColumn.Value()));
+            return TStatus::Error;
+        }
+    }
+
+    auto rightDataType = GetReadTableRowType(ctx, tablesData, cluster, rightTable.first, rightColumns, withSystemColumns);
+    if (!rightDataType) {
+        return TStatus::Error;
+    }
+
+    if (!EnsureAtom(*node->Child(TKqlStreamIdxLookupJoin::idx_RightLabel), ctx)) {
+        return TStatus::Error;
+    }
+
+    TCoAtom rightLabel(node->Child(TKqlStreamIdxLookupJoin::idx_RightLabel));
+
+    // The join type is declared as an atom child; validate it like the labels.
+    if (!EnsureAtom(*node->Child(TKqlStreamIdxLookupJoin::idx_JoinType), ctx)) {
+        return TStatus::Error;
+    }
+
+    const TStructExprType* leftDataType = leftInputTupleType->GetItems()[1]->Cast<TStructExprType>();
+    TVector<const TItemExprType*> resultStructItems;
+    for (const auto& member : leftDataType->GetItems()) {
+        resultStructItems.emplace_back(
+            ctx.MakeType<TItemExprType>(TString::Join(leftLabel.Value(), ".", member->GetName()), member->GetItemType())
+        );
+    }
+
+    for (const auto& member : rightDataType->Cast<TStructExprType>()->GetItems()) {
+        resultStructItems.emplace_back(
+            ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", member->GetName()), member->GetItemType())
+        );
+    }
+
+    auto rowType = ctx.MakeType<TStructExprType>(resultStructItems);
+    node->SetTypeAnn(ctx.MakeType<TListExprType>(rowType));
+    return TStatus::Ok;
+}
+
TStatus AnnotateKqpProgram(const TExprNode::TPtr& node, TExprContext& ctx) {
if (!EnsureArgsCount(*node, 2, ctx)) {
return TStatus::Error;
@@ -1420,7 +1510,7 @@ TStatus AnnotateSequencerConnection(const TExprNode::TPtr& node, TExprContext& c
TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster,
const TKikimrTablesData& tablesData, bool withSystemColumns) {
- if (!EnsureArgsCount(*node, 4, ctx)) {
+ if (!EnsureArgsCount(*node, 5, ctx)) {
return TStatus::Error;
}
@@ -1429,7 +1519,7 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext
}
if (!TDqOutput::Match(node->Child(TKqpCnStreamLookup::idx_Output))) {
- ctx.AddError(TIssue(ctx.GetPosition(node->Child(TDqCnMerge::idx_Output)->Pos()),
+ ctx.AddError(TIssue(ctx.GetPosition(node->Child(TKqpCnStreamLookup::idx_Output)->Pos()),
TStringBuilder() << "Expected " << TDqOutput::CallableName()));
return TStatus::Error;
}
@@ -1439,40 +1529,167 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext
return TStatus::Error;
}
- auto lookupKeysTypeNode = node->Child(TKqpCnStreamLookup::idx_LookupKeysType);
- if (!EnsureType(*lookupKeysTypeNode, ctx)) {
+ if (!EnsureTupleOfAtoms(*node->Child(TKqpCnStreamLookup::idx_Columns), ctx)) {
return TStatus::Error;
}
- auto lookupKeysType = lookupKeysTypeNode->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
- if (!EnsureListType(node->Pos(), *lookupKeysType, ctx)) {
+ TCoAtomList columns{node->ChildPtr(TKqpCnStreamLookup::idx_Columns)};
+
+ if (!EnsureAtom(*node->Child(TKqpCnStreamLookup::idx_LookupStrategy), ctx)) {
+ return TStatus::Error;
+ }
+
+ TCoAtom lookupStrategy(node->Child(TKqpCnStreamLookup::idx_LookupStrategy));
+
+ auto inputTypeNode = node->Child(TKqpCnStreamLookup::idx_InputType);
+
+ if (!EnsureType(*inputTypeNode, ctx)) {
return TStatus::Error;
}
- auto lookupKeyType = lookupKeysType->Cast<TListExprType>()->GetItemType();
- if (!EnsureStructType(node->Pos(), *lookupKeyType, ctx)) {
+ auto inputType = inputTypeNode->GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ const TTypeAnnotationNode* inputItemType;
+ if (!EnsureNewSeqType<false>(node->Pos(), *inputType, ctx, &inputItemType)) {
return TStatus::Error;
}
- const auto& lookupKeyColumns = lookupKeyType->Cast<TStructExprType>()->GetItems();
- for (const auto& keyColumn : lookupKeyColumns) {
- if (!table.second->GetKeyColumnIndex(TString(keyColumn->GetName()))) {
+ YQL_ENSURE(inputItemType);
+
+ if (lookupStrategy.Value() == "LookupRows") {
+ if (!EnsureStructType(node->Pos(), *inputItemType, ctx)) {
+ return TStatus::Error;
+ }
+
+ const auto& lookupKeyColumns = inputItemType->Cast<TStructExprType>()->GetItems();
+ for (const auto& keyColumn : lookupKeyColumns) {
+ if (!table.second->GetKeyColumnIndex(TString(keyColumn->GetName()))) {
+ return TStatus::Error;
+ }
+ }
+
+ auto rowType = GetReadTableRowType(ctx, tablesData, cluster, table.first, columns, withSystemColumns);
+ if (!rowType) {
return TStatus::Error;
}
+
+ node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType));
+
+ } else if (lookupStrategy.Value() == "LookupJoinRows") {
+ if (!EnsureTupleType(node->Pos(), *inputItemType, ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!EnsureTupleTypeSize(node->Pos(), inputItemType, 2, ctx)) {
+ return TStatus::Error;
+ }
+
+ auto inputTupleType = inputItemType->Cast<TTupleExprType>();
+ if (!EnsureStructType(node->Pos(), *inputTupleType->GetItems()[0], ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!EnsureStructType(node->Pos(), *inputTupleType->GetItems()[1], ctx)) {
+ return TStatus::Error;
+ }
+
+ const TStructExprType* joinKeys = inputTupleType->GetItems()[0]->Cast<TStructExprType>();
+ const TStructExprType* leftRowType = inputTupleType->GetItems()[1]->Cast<TStructExprType>();
+
+ for (const auto& inputKey : joinKeys->GetItems()) {
+ if (!table.second->GetKeyColumnIndex(TString(inputKey->GetName()))) {
+ return TStatus::Error;
+ }
+ }
+
+ auto rightRowType = GetReadTableRowType(ctx, tablesData, cluster, table.first, columns, withSystemColumns);
+ if (!rightRowType) {
+ return TStatus::Error;
+ }
+
+ TVector<const TTypeAnnotationNode*> outputTypes;
+ outputTypes.push_back(leftRowType);
+ outputTypes.push_back(ctx.MakeType<TOptionalExprType>(rightRowType));
+
+ auto outputItemType = ctx.MakeType<TTupleExprType>(outputTypes);
+ node->SetTypeAnn(ctx.MakeType<TStreamExprType>(outputItemType));
+
+ } else {
+ ctx.AddError(TIssue(ctx.GetPosition(node->Child(TKqpCnStreamLookup::idx_LookupStrategy)->Pos()),
+ TStringBuilder() << "Unexpected lookup strategy: " << lookupStrategy.Value()));
+ return TStatus::Error;
}
- if (!EnsureTupleOfAtoms(*node->Child(TKqpCnStreamLookup::idx_Columns), ctx)) {
+ return TStatus::Ok;
+}
+
+TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx) {
+
+ if (!EnsureArgsCount(*node, 4, ctx)) {
return TStatus::Error;
}
- TCoAtomList columns{node->ChildPtr(TKqpCnStreamLookup::idx_Columns)};
+ auto inputType = node->Child(TKqpIndexLookupJoin::idx_Input)->GetTypeAnn();
+ const TTypeAnnotationNode* inputItemType;
+ if (!EnsureNewSeqType<false>(node->Pos(), *inputType, ctx, &inputItemType)) {
+ return TStatus::Error;
+ }
- auto rowType = GetReadTableRowType(ctx, tablesData, cluster, table.first, columns, withSystemColumns);
- if (!rowType) {
+ YQL_ENSURE(inputItemType);
+ if (!EnsureTupleType(node->Pos(), *inputItemType, ctx)) {
return TStatus::Error;
}
- node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType));
+ if (!EnsureTupleTypeSize(node->Pos(), inputItemType, 2, ctx)) {
+ return TStatus::Error;
+ }
+
+ auto inputTupleType = inputItemType->Cast<TTupleExprType>();
+ if (!EnsureStructType(node->Pos(), *inputTupleType->GetItems()[0], ctx)) {
+ return TStatus::Error;
+ }
+
+ const TStructExprType* leftRowType = inputTupleType->GetItems()[0]->Cast<TStructExprType>();
+
+ if (!EnsureOptionalType(node->Pos(), *inputTupleType->GetItems()[1], ctx)) {
+ return TStatus::Error;
+ }
+
+ auto rightRowType = inputTupleType->GetItems()[1]->Cast<TOptionalExprType>()->GetItemType();
+ if (!EnsureStructType(node->Pos(), *rightRowType, ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!EnsureAtom(*node->Child(TKqpIndexLookupJoin::idx_JoinType), ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!EnsureAtom(*node->Child(TKqpIndexLookupJoin::idx_LeftLabel), ctx)) {
+ return TStatus::Error;
+ }
+
+ TCoAtom leftLabel(node->Child(TKqpIndexLookupJoin::idx_LeftLabel));
+
+ if (!EnsureAtom(*node->Child(TKqpIndexLookupJoin::idx_RightLabel), ctx)) {
+ return TStatus::Error;
+ }
+
+ TCoAtom rightLabel(node->Child(TKqpIndexLookupJoin::idx_RightLabel));
+
+ TVector<const TItemExprType*> resultStructItems;
+ for (const auto& item : leftRowType->GetItems()) {
+ resultStructItems.emplace_back(
+ ctx.MakeType<TItemExprType>(TString::Join(leftLabel.Value(), ".", item->GetName()), item->GetItemType())
+ );
+ }
+
+ for (const auto& item : rightRowType->Cast<TStructExprType>()->GetItems()) {
+ resultStructItems.emplace_back(
+ ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", item->GetName()), item->GetItemType())
+ );
+ }
+
+ auto outputRowType = ctx.MakeType<TStructExprType>(resultStructItems);
+ node->SetTypeAnn(ctx.MakeType<TStreamExprType>(outputRowType));
return TStatus::Ok;
}
@@ -1600,6 +1817,10 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl
return AnnotateStreamLookupConnection(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled());
}
+ if (TKqpIndexLookupJoin::Match(input.Get())) {
+ return AnnotateIndexLookupJoin(input, ctx);
+ }
+
if (TKqpTxResultBinding::Match(input.Get())) {
return AnnotateKqpTxResultBinding(input, ctx);
}
@@ -1628,6 +1849,10 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl
return AnnotateSequencer(input, ctx, cluster, *tablesData);
}
+ if (TKqlStreamIdxLookupJoin::Match(input.Get())) {
+ return AnnotateStreamIdxLookupJoin(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled());
+ }
+
if (TKqpProgram::Match(input.Get())) {
return AnnotateKqpProgram(input, ctx);
}
diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp
index 9d849f1f977..f072ae20da9 100644
--- a/ydb/core/kqp/opt/kqp_query_plan.cpp
+++ b/ydb/core/kqp/opt/kqp_query_plan.cpp
@@ -560,7 +560,6 @@ private:
TTableRead readInfo;
readInfo.Type = EPlanTableReadType::Lookup;
- planNode.TypeName = "TableLookup";
TString table(tableLookup.Table().Path().Value());
auto& tableData = SerializerCtx.TablesData->GetTable(SerializerCtx.Cluster, table);
planNode.NodeInfo["Table"] = tableData.RelativePath ? *tableData.RelativePath : table;
@@ -572,15 +571,25 @@ private:
readInfo.Columns.push_back(TString(column.Value()));
}
- const auto lookupKeysType = tableLookup.LookupKeysType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
- YQL_ENSURE(lookupKeysType);
- YQL_ENSURE(lookupKeysType->GetKind() == ETypeAnnotationKind::List);
- const auto lookupKeysItemType = lookupKeysType->Cast<TListExprType>()->GetItemType();
- YQL_ENSURE(lookupKeysItemType->GetKind() == ETypeAnnotationKind::Struct);
- const auto& lookupKeyColumnsStruct = lookupKeysItemType->Cast<TStructExprType>()->GetItems();
- readInfo.LookupBy.reserve(lookupKeyColumnsStruct.size());
+ const auto inputType = tableLookup.InputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ YQL_ENSURE(inputType);
+ YQL_ENSURE(inputType->GetKind() == ETypeAnnotationKind::List);
+ const auto inputItemType = inputType->Cast<TListExprType>()->GetItemType();
+
+ const TStructExprType* lookupKeyColumnsStruct = nullptr;
+ if (inputItemType->GetKind() == ETypeAnnotationKind::Struct) {
+ planNode.TypeName = "TableLookup";
+ lookupKeyColumnsStruct = inputItemType->Cast<TStructExprType>();
+ } else if (inputItemType->GetKind() == ETypeAnnotationKind::Tuple) {
+ planNode.TypeName = "TableLookupJoin";
+ const auto inputTupleType = inputItemType->Cast<TTupleExprType>();
+ lookupKeyColumnsStruct = inputTupleType->GetItems()[0]->Cast<TStructExprType>();
+ }
+
+ YQL_ENSURE(lookupKeyColumnsStruct);
+ readInfo.LookupBy.reserve(lookupKeyColumnsStruct->GetItems().size());
auto& lookupKeyColumns = planNode.NodeInfo["LookupKeyColumns"];
- for (const auto keyColumn : lookupKeyColumnsStruct) {
+ for (const auto keyColumn : lookupKeyColumnsStruct->GetItems()) {
lookupKeyColumns.AppendValue(keyColumn->GetName());
readInfo.LookupBy.push_back(TString(keyColumn->GetName()));
}
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
index d4f994c0b04..e5e5585b6e1 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp
@@ -338,6 +338,21 @@ bool IsParameterToListOfStructsRepack(const TExprBase& expr) {
#define DBG(...)
+// Wraps the prepared left input and the right-side read into a
+// KqlStreamIdxLookupJoin node positioned at the original join.
+// A join label that is not a single atom is passed on as an empty string.
template<typename ReadType>
+TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(const TDqJoin& join, TExprBase leftInput, ReadType rightRead, TExprContext& ctx) {
+    TString leftLabel;
+    if (join.LeftLabel().Maybe<TCoAtom>()) {
+        leftLabel = TString(join.LeftLabel().Cast<TCoAtom>().Value());
+    }
+
+    TString rightLabel;
+    if (join.RightLabel().Maybe<TCoAtom>()) {
+        rightLabel = TString(join.RightLabel().Cast<TCoAtom>().Value());
+    }
+
+    return Build<TKqlStreamIdxLookupJoin>(ctx, join.Pos())
+        .LeftInput(leftInput)
+        .LeftLabel().Build(leftLabel)
+        .RightTable(rightRead.Table())
+        .RightColumns(rightRead.Columns())
+        .RightLabel().Build(rightLabel)
+        .JoinType(join.JoinType())
+        .Done();
+}
+
+template<typename ReadType>
TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) {
static_assert(std::is_same_v<ReadType, TKqlReadTableBase> || std::is_same_v<ReadType, TKqlReadTableRangesBase>, "unsupported read type");
@@ -347,6 +362,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
}
static THashSet<TStringBuf> supportedJoinKinds = {"Inner", "Left", "LeftOnly", "LeftSemi", "RightSemi"};
+ static THashSet<TStringBuf> supportedStreamJoinKinds = {"Inner", "Left"};
if (!supportedJoinKinds.contains(join.JoinType().Value())) {
return {};
}
@@ -554,9 +570,14 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
return {};
}
+ const bool useStreamIndexLookupJoin = kqpCtx.IsDataQuery()
+ && kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin
+ && supportedStreamJoinKinds.contains(join.JoinType().Value());
+
bool needPrecomputeLeft = kqpCtx.IsDataQuery()
&& !join.LeftInput().Maybe<TCoParameter>()
- && !IsParameterToListOfStructsRepack(join.LeftInput());
+ && !IsParameterToListOfStructsRepack(join.LeftInput())
+ && !useStreamIndexLookupJoin;
TExprBase leftData = needPrecomputeLeft
? Build<TDqPrecompute>(ctx, join.Pos())
@@ -604,6 +625,23 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext
.Done();
}
+ if (useStreamIndexLookupJoin) {
+ auto leftInput = Build<TCoMap>(ctx, join.Pos())
+ .Input(leftData)
+ .Lambda()
+ .Args({leftRowArg})
+ .Body<TExprList>()
+ .Add<TCoAsStruct>()
+ .Add(lookupMembers)
+ .Build()
+ .Add(leftRowArg)
+ .Build()
+ .Build()
+ .Done();
+
+ return BuildKqpStreamIndexLookupJoin(join, leftInput, rightRead, ctx);
+ }
+
auto leftDataDeduplicated = DeduplicateByMembers(leftData, filter, deduplicateLeftColumns, ctx, join.Pos());
auto keysToLookup = Build<TCoMap>(ctx, join.Pos())
.Input(leftDataDeduplicated)
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index 1f3a531fc91..77b029b0028 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -34,6 +34,7 @@ public:
AddHandler(0, &TKqlReadTableRanges::Match, HNDL(BuildReadTableRangesStage));
AddHandler(0, &TKqlLookupTable::Match, HNDL(BuildLookupTableStage));
AddHandler(0, &TKqlStreamLookupTable::Match, HNDL(BuildStreamLookupTableStages));
+ AddHandler(0, &TKqlStreamIdxLookupJoin::Match, HNDL(BuildStreamIdxLookupJoinStages));
AddHandler(0, &TKqlSequencer::Match, HNDL(BuildSequencerStages));
AddHandler(0, [](auto) { return true; }, HNDL(RemoveRedundantSortByPk));
AddHandler(0, &TCoTake::Match, HNDL(ApplyLimitToReadTable));
@@ -150,6 +151,12 @@ protected:
return output;
}
+    // Rule handler: delegates to KqpBuildStreamIdxLookupJoinStages and hands
+    // the node before and after the rewrite to DumpAppliedRule.
+    TMaybeNode<TExprBase> BuildStreamIdxLookupJoinStages(TExprBase node, TExprContext& ctx) {
+        TExprBase rewritten = KqpBuildStreamIdxLookupJoinStages(node, ctx);
+        DumpAppliedRule("BuildStreamIdxLookupJoinStages", node.Ptr(), rewritten.Ptr(), ctx);
+        return rewritten;
+    }
+
TMaybeNode<TExprBase> BuildSequencerStages(TExprBase node, TExprContext& ctx) {
TExprBase output = KqpBuildSequencerStages(node, ctx);
DumpAppliedRule("BuildSequencerStages", node.Ptr(), output.Ptr(), ctx);
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
index 3cf59734a43..22688e7505a 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp
@@ -613,7 +613,8 @@ NYql::NNodes::TExprBase KqpRewriteLookupTable(NYql::NNodes::TExprBase node, NYql
.Build()
.Table(lookupTable.Table())
.Columns(lookupTable.Columns())
- .LookupKeysType(ExpandType(node.Pos(), *keysPrecompute.Ref().GetTypeAnn(), ctx))
+ .InputType(ExpandType(node.Pos(), *keysPrecompute.Ref().GetTypeAnn(), ctx))
+ .LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Done();
newInputs.emplace_back(std::move(cnStreamLookup));
@@ -647,7 +648,8 @@ NYql::NNodes::TExprBase KqpRewriteLookupTable(NYql::NNodes::TExprBase node, NYql
.Build()
.Table(lookupTable.Table())
.Columns(lookupTable.Columns())
- .LookupKeysType(ExpandType(node.Pos(), *lookupKeysList.Ref().GetTypeAnn(), ctx))
+ .InputType(ExpandType(node.Pos(), *lookupKeysList.Ref().GetTypeAnn(), ctx))
+ .LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Done();
newInputs.emplace_back(std::move(cnStreamLookup));
@@ -698,7 +700,8 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase
.Build()
.Table(lookup.Table())
.Columns(lookup.Columns())
- .LookupKeysType(ExpandType(lookup.Pos(), *lookup.LookupKeys().Ref().GetTypeAnn(), ctx))
+ .InputType(ExpandType(lookup.Pos(), *lookup.LookupKeys().Ref().GetTypeAnn(), ctx))
+ .LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Done();
} else if (lookup.LookupKeys().Maybe<TDqCnUnionAll>()) {
@@ -708,7 +711,8 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase
.Output(output)
.Table(lookup.Table())
.Columns(lookup.Columns())
- .LookupKeysType(ExpandType(lookup.Pos(), *output.Ref().GetTypeAnn(), ctx))
+ .InputType(ExpandType(lookup.Pos(), *output.Ref().GetTypeAnn(), ctx))
+ .LookupStrategy().Build(TKqpStreamLookupStrategyName)
.Done();
} else {
return node;
@@ -732,4 +736,42 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase
.Build().Done();
}
+// Rewrites a KqlStreamIdxLookupJoin whose left input is a DqCnUnionAll into a
+// new stage: the stage reads the left output through a KqpCnStreamLookup
+// connection using the "LookupJoinRows" strategy, and its program applies
+// KqpIndexLookupJoin with the original join type and labels. The result is a
+// DqCnUnionAll over output "0" of that stage.
+//
+// Any other node, or a join whose left input is not (yet) a DqCnUnionAll, is
+// returned unchanged. This matches KqpBuildStreamLookupTableStages, which also
+// leaves the node alone when its input has an unsupported shape, so the rule
+// does not abort the whole optimization by throwing.
+// NOTE(review): confirm every supported left input eventually becomes a
+// DqCnUnionAll; otherwise the unrewritten node will surface at a later step.
+NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStages(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx) {
+    if (!node.Maybe<TKqlStreamIdxLookupJoin>()) {
+        return node;
+    }
+
+    const auto idxLookupJoin = node.Cast<TKqlStreamIdxLookupJoin>();
+    if (!idxLookupJoin.LeftInput().Maybe<TDqCnUnionAll>()) {
+        return node;
+    }
+
+    auto output = idxLookupJoin.LeftInput().Cast<TDqCnUnionAll>().Output();
+    auto cnStreamIdxLookupJoin = Build<TKqpCnStreamLookup>(ctx, idxLookupJoin.Pos())
+        .Output(output)
+        .Table(idxLookupJoin.RightTable())
+        .Columns(idxLookupJoin.RightColumns())
+        .InputType(ExpandType(idxLookupJoin.Pos(), *output.Ref().GetTypeAnn(), ctx))
+        .LookupStrategy().Build(TKqpStreamLookupJoinStrategyName)
+        .Done();
+
+    return Build<TDqCnUnionAll>(ctx, node.Pos())
+        .Output()
+            .Stage<TDqStage>()
+                .Inputs()
+                    .Add(cnStreamIdxLookupJoin)
+                    .Build()
+                .Program()
+                    .Args({"stream_lookup_join_output"})
+                    .Body<TKqpIndexLookupJoin>()
+                        .Input("stream_lookup_join_output")
+                        .JoinType(idxLookupJoin.JoinType())
+                        .LeftLabel(idxLookupJoin.LeftLabel())
+                        .RightLabel(idxLookupJoin.RightLabel())
+                        .Build()
+                    .Build()
+                .Settings(TDqStageSettings().BuildNode(ctx, node.Pos()))
+                .Build()
+            .Index().Build("0")
+        .Build().Done();
+}
+
} // namespace NKikimr::NKqp::NOpt
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
index f274ae765a8..64c58a73773 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h
@@ -29,6 +29,8 @@ NYql::NNodes::TExprBase KqpBuildSequencerStages(NYql::NNodes::TExprBase node, NY
NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx);
+NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStages(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx);
+
NYql::NNodes::TExprBase KqpRemoveRedundantSortByPk(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx);
diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h
index 5887882481f..7ade5af9d84 100644
--- a/ydb/core/kqp/provider/yql_kikimr_settings.h
+++ b/ydb/core/kqp/provider/yql_kikimr_settings.h
@@ -144,6 +144,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
bool EnableKqpScanQueryStreamLookup = false;
bool EnableKqpDataQueryStreamLookup = false;
bool EnableKqpScanQueryStreamIdxLookupJoin = false;
+ bool EnableKqpDataQueryStreamIdxLookupJoin = false;
bool EnablePredicateExtractForScanQuery = true;
bool EnablePredicateExtractForDataQuery = false;
bool PredicateExtract20 = false;
diff --git a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
index 72a77a87a2b..92fd414df23 100644
--- a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
@@ -425,6 +425,19 @@ TIntrusivePtr<IMkqlCallableCompiler> CreateKqlCompiler(const TKqlCompileContext&
return ctx.PgmBuilder().KqpEnsure(value, predicate, issueCode, message);
});
+ compiler->AddCallable(TKqpIndexLookupJoin::CallableName(),
+ [&ctx](const TExprNode& node, TMkqlBuildContext& buildCtx) {
+ TKqpIndexLookupJoin indexLookupJoin(&node);
+
+ const TString joinType(indexLookupJoin.JoinType().Value());
+ const TString leftLabel(indexLookupJoin.LeftLabel().Value());
+ const TString rightLabel(indexLookupJoin.RightLabel().Value());
+
+ auto input = MkqlBuildExpr(indexLookupJoin.Input().Ref(), buildCtx);
+
+ return ctx.PgmBuilder().KqpIndexLookupJoin(input, joinType, leftLabel, rightLabel);
+ });
+
return compiler;
}
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index c9017e14b82..f26ff59b284 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -81,6 +81,20 @@ NKqpProto::TKqpPhyInternalBinding::EType GetPhyInternalBindingType(const std::st
return bindingType;
}
+// Maps the textual lookup strategy ("LookupRows" / "LookupJoinRows") to its
+// protobuf enum value. Any other name fails the YQL_ENSURE check.
+NKqpProto::EStreamLookupStrategy GetStreamLookupStrategy(const std::string_view strategy) {
+    const NKqpProto::EStreamLookupStrategy resolved =
+        (strategy == "LookupRows"sv) ? NKqpProto::EStreamLookupStrategy::LOOKUP
+        : (strategy == "LookupJoinRows"sv) ? NKqpProto::EStreamLookupStrategy::JOIN
+        : NKqpProto::EStreamLookupStrategy::UNSPECIFIED;
+
+    YQL_ENSURE(resolved != NKqpProto::EStreamLookupStrategy::UNSPECIFIED,
+        "Unexpected stream lookup strategy: " << strategy);
+    return resolved;
+}
+
void FillTableId(const TKqpTable& table, NKqpProto::TKqpPhyTableId& tableProto) {
auto pathId = TKikimrPathId::Parse(table.PathId());
@@ -1088,18 +1102,11 @@ private:
FillTablesMap(streamLookup.Table(), streamLookup.Columns(), tablesMap);
FillTableId(streamLookup.Table(), *streamLookupProto.MutableTable());
- const auto lookupKeysType = streamLookup.LookupKeysType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
- YQL_ENSURE(lookupKeysType, "Empty stream lookup keys type");
- YQL_ENSURE(lookupKeysType->GetKind() == ETypeAnnotationKind::List, "Unexpected stream lookup keys type");
- const auto lookupKeysItemType = lookupKeysType->Cast<TListExprType>()->GetItemType();
- streamLookupProto.SetLookupKeysType(NMiniKQL::SerializeNode(CompileType(pgmBuilder, *lookupKeysItemType), TypeEnv));
-
- YQL_ENSURE(lookupKeysItemType->GetKind() == ETypeAnnotationKind::Struct);
- const auto& lookupKeyColumns = lookupKeysItemType->Cast<TStructExprType>()->GetItems();
- for (const auto keyColumn : lookupKeyColumns) {
- YQL_ENSURE(tableMeta->Columns.FindPtr(keyColumn->GetName()), "Unknown column: " << keyColumn->GetName());
- streamLookupProto.AddKeyColumns(TString(keyColumn->GetName()));
- }
+ const auto inputType = streamLookup.InputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType();
+ YQL_ENSURE(inputType, "Empty stream lookup input type");
+ YQL_ENSURE(inputType->GetKind() == ETypeAnnotationKind::List, "Unexpected stream lookup input type");
+ const auto inputItemType = inputType->Cast<TListExprType>()->GetItemType();
+ streamLookupProto.SetLookupKeysType(NMiniKQL::SerializeNode(CompileType(pgmBuilder, *inputItemType), TypeEnv));
const auto resultType = streamLookup.Ref().GetTypeAnn();
YQL_ENSURE(resultType, "Empty stream lookup result type");
@@ -1107,13 +1114,65 @@ private:
const auto resultItemType = resultType->Cast<TStreamExprType>()->GetItemType();
streamLookupProto.SetResultType(NMiniKQL::SerializeNode(CompileType(pgmBuilder, *resultItemType), TypeEnv));
- YQL_ENSURE(resultItemType->GetKind() == ETypeAnnotationKind::Struct);
- const auto& resultColumns = resultItemType->Cast<TStructExprType>()->GetItems();
- for (const auto column : resultColumns) {
- const auto& systemColumns = GetSystemColumns();
- YQL_ENSURE(tableMeta->Columns.FindPtr(column->GetName()) || systemColumns.find(column->GetName()) != systemColumns.end(),
- "Unknown column: " << column->GetName());
- streamLookupProto.AddColumns(TString(column->GetName()));
+ YQL_ENSURE(streamLookup.LookupStrategy().Maybe<TCoAtom>());
+ TString lookupStrategy = streamLookup.LookupStrategy().Maybe<TCoAtom>().Cast().StringValue();
+ streamLookupProto.SetLookupStrategy(GetStreamLookupStrategy(lookupStrategy));
+
+ switch (streamLookupProto.GetLookupStrategy()) {
+ case NKqpProto::EStreamLookupStrategy::LOOKUP: {
+ YQL_ENSURE(inputItemType->GetKind() == ETypeAnnotationKind::Struct);
+ const auto& lookupKeyColumns = inputItemType->Cast<TStructExprType>()->GetItems();
+ for (const auto keyColumn : lookupKeyColumns) {
+ YQL_ENSURE(tableMeta->Columns.FindPtr(keyColumn->GetName()),
+ "Unknown column: " << keyColumn->GetName());
+ streamLookupProto.AddKeyColumns(TString(keyColumn->GetName()));
+ }
+
+ YQL_ENSURE(resultItemType->GetKind() == ETypeAnnotationKind::Struct);
+ const auto& resultColumns = resultItemType->Cast<TStructExprType>()->GetItems();
+ for (const auto column : resultColumns) {
+ const auto &systemColumns = GetSystemColumns();
+ YQL_ENSURE(tableMeta->Columns.FindPtr(column->GetName())
+ || systemColumns.find(column->GetName()) != systemColumns.end(),
+ "Unknown column: " << column->GetName());
+ streamLookupProto.AddColumns(TString(column->GetName()));
+ }
+
+ break;
+ }
+ case NKqpProto::EStreamLookupStrategy::JOIN: {
+ YQL_ENSURE(inputItemType->GetKind() == ETypeAnnotationKind::Tuple);
+ const auto inputTupleType = inputItemType->Cast<TTupleExprType>();
+ YQL_ENSURE(inputTupleType->GetSize() == 2);
+
+ YQL_ENSURE(inputTupleType->GetItems()[0]->GetKind() == ETypeAnnotationKind::Struct);
+ const auto& joinKeyColumns = inputTupleType->GetItems()[0]->Cast<TStructExprType>()->GetItems();
+ for (const auto keyColumn : joinKeyColumns) {
+ YQL_ENSURE(tableMeta->Columns.FindPtr(keyColumn->GetName()),
+ "Unknown column: " << keyColumn->GetName());
+ streamLookupProto.AddKeyColumns(TString(keyColumn->GetName()));
+ }
+
+ YQL_ENSURE(resultItemType->GetKind() == ETypeAnnotationKind::Tuple);
+ const auto resultTupleType = resultItemType->Cast<TTupleExprType>();
+ YQL_ENSURE(resultTupleType->GetSize() == 2);
+
+ YQL_ENSURE(resultTupleType->GetItems()[1]->GetKind() == ETypeAnnotationKind::Optional);
+ auto rightRowOptionalType = resultTupleType->GetItems()[1]->Cast<TOptionalExprType>()->GetItemType();
+ YQL_ENSURE(rightRowOptionalType->GetKind() == ETypeAnnotationKind::Struct);
+ const auto& rightColumns = rightRowOptionalType->Cast<TStructExprType>()->GetItems();
+ for (const auto column : rightColumns) {
+ const auto& systemColumns = GetSystemColumns();
+ YQL_ENSURE(tableMeta->Columns.FindPtr(column->GetName())
+ || systemColumns.find(column->GetName()) != systemColumns.end(),
+ "Unknown column: " << column->GetName());
+ streamLookupProto.AddColumns(TString(column->GetName()));
+ }
+
+ break;
+ }
+ default:
+ YQL_ENSURE(false, "Unexpected lookup strategy for stream lookup: " << lookupStrategy);
}
return;
diff --git a/ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt
index 6f6ea6b4954..e8854f07a98 100644
--- a/ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt
@@ -54,6 +54,7 @@ target_sources(core-kqp-runtime PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp
)
diff --git a/ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt
index 8cd64049312..8a8cb70240a 100644
--- a/ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt
@@ -55,6 +55,7 @@ target_sources(core-kqp-runtime PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp
)
diff --git a/ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt
index 8cd64049312..8a8cb70240a 100644
--- a/ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt
@@ -55,6 +55,7 @@ target_sources(core-kqp-runtime PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp
)
diff --git a/ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt
index 6f6ea6b4954..e8854f07a98 100644
--- a/ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt
@@ -54,6 +54,7 @@ target_sources(core-kqp-runtime PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp
)
diff --git a/ydb/core/kqp/runtime/kqp_compute.cpp b/ydb/core/kqp/runtime/kqp_compute.cpp
index bb973df45ec..b2ea8ba2cb3 100644
--- a/ydb/core/kqp/runtime/kqp_compute.cpp
+++ b/ydb/core/kqp/runtime/kqp_compute.cpp
@@ -2,7 +2,7 @@
#include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h>
#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
-#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/minikql/mkql_node_cast.h>
#include <ydb/library/yql/minikql/mkql_program_builder.h>
#include <ydb/library/yql/public/udf/udf_terminator.h>
#include <ydb/library/yql/public/udf/udf_type_builder.h>
@@ -83,6 +83,137 @@ private:
IComputationNode* const Message;
};
+// Computation node that consumes a stream of two-element rows (left row, right row) and
+// produces a stream of flat rows: left columns first, optionally followed by right columns,
+// filtered according to the configured join kind.
+class TKqpIndexLookupJoinWrapper : public TMutableComputationNode<TKqpIndexLookupJoinWrapper> {
+public:
+    // Stream value returned by DoCalculate(); wraps the input stream and rewrites its rows.
+    class TStreamValue : public TComputationValue<TStreamValue> {
+    public:
+        TStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& stream, TComputationContext& ctx,
+            const TKqpIndexLookupJoinWrapper* self)
+            : TComputationValue<TStreamValue>(memInfo)
+            , Stream(std::move(stream))
+            , Self(self)
+            , Ctx(ctx) {
+        }
+
+    private:
+        // Which parts of the input pair go into the output row.
+        enum class EOutputMode {
+            OnlyLeftRow,
+            Both
+        };
+
+        // Allocates an output row through the owner's row cache and fills it.
+        // Left columns are always copied. In Both mode the right columns follow; when the
+        // right row has no value, those slots are filled with empty values instead.
+        NUdf::TUnboxedValue FillResultItems(NUdf::TUnboxedValue leftRow, NUdf::TUnboxedValue rightRow, EOutputMode mode) {
+            const bool withRightPart = (mode == EOutputMode::Both);
+
+            ui64 outputWidth = Self->LeftColumnsCount;
+            if (withRightPart) {
+                outputWidth += Self->RightColumnsCount;
+            }
+
+            auto outputRow = Self->ResultRowCache.NewArray(Ctx, outputWidth, ResultItems);
+
+            size_t outIdx = 0;
+            for (size_t col = 0; col < Self->LeftColumnsCount; ++col, ++outIdx) {
+                ResultItems[outIdx] = leftRow.GetElement(col);
+            }
+
+            if (withRightPart) {
+                const bool rightPresent = rightRow.HasValue();
+                for (size_t col = 0; col < Self->RightColumnsCount; ++col, ++outIdx) {
+                    if (rightPresent) {
+                        ResultItems[outIdx] = rightRow.GetElement(col);
+                    } else {
+                        ResultItems[outIdx] = NUdf::TUnboxedValuePod();
+                    }
+                }
+            }
+
+            return outputRow;
+        }
+
+        // Applies the join kind to one input pair.
+        // Returns false when the pair yields no output row (Inner without a right row,
+        // LeftOnly with a right row); otherwise stores the built row in `result` and returns true.
+        bool TryBuildResultRow(NUdf::TUnboxedValue inputRow, NUdf::TUnboxedValue& result) {
+            auto leftRow = inputRow.GetElement(0);
+            auto rightRow = inputRow.GetElement(1);
+
+            switch (Self->JoinType) {
+                case EJoinKind::Inner:
+                    if (!rightRow.HasValue()) {
+                        return false;
+                    }
+                    result = FillResultItems(std::move(leftRow), std::move(rightRow), EOutputMode::Both);
+                    return true;
+
+                case EJoinKind::Left:
+                    result = FillResultItems(std::move(leftRow), std::move(rightRow), EOutputMode::Both);
+                    return true;
+
+                case EJoinKind::LeftOnly:
+                    if (rightRow.HasValue()) {
+                        return false;
+                    }
+                    result = FillResultItems(std::move(leftRow), std::move(rightRow), EOutputMode::OnlyLeftRow);
+                    return true;
+
+                default:
+                    MKQL_ENSURE(false, "Unsupported join kind");
+            }
+
+            return true;
+        }
+
+        // Pulls pairs from the wrapped stream until one of them produces an output row
+        // or the stream reports a status other than Ok; that status is returned as is.
+        NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override {
+            for (;;) {
+                NUdf::TUnboxedValue row;
+                const NUdf::EFetchStatus status = Stream.Fetch(row);
+
+                if (status != NUdf::EFetchStatus::Ok || TryBuildResultRow(std::move(row), result)) {
+                    return status;
+                }
+            }
+        }
+
+    private:
+        NUdf::TUnboxedValue Stream;
+        const TKqpIndexLookupJoinWrapper* Self;
+        TComputationContext& Ctx;
+        // Points at the items of the row most recently obtained from ResultRowCache.
+        NUdf::TUnboxedValue* ResultItems = nullptr;
+    };
+
+public:
+    TKqpIndexLookupJoinWrapper(TComputationMutables& mutables, IComputationNode* inputNode,
+        EJoinKind joinType, ui64 leftColumnsCount, ui64 rightColumnsCount)
+        : TMutableComputationNode<TKqpIndexLookupJoinWrapper>(mutables)
+        , InputNode(inputNode)
+        , JoinType(joinType)
+        , LeftColumnsCount(leftColumnsCount)
+        , RightColumnsCount(rightColumnsCount)
+        , ResultRowCache(mutables) {
+    }
+
+    // Wraps the current value of the input node into a TStreamValue bound to this node.
+    NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+        return ctx.HolderFactory.Create<TStreamValue>(InputNode->GetValue(ctx), ctx, this);
+    }
+
+private:
+    void RegisterDependencies() const final {
+        this->DependsOn(InputNode);
+    }
+
+private:
+    IComputationNode* InputNode;
+    const EJoinKind JoinType;
+    const ui64 LeftColumnsCount;
+    const ui64 RightColumnsCount;
+    const TContainerCacheOnContext ResultRowCache;
+};
+
} // namespace
IComputationNode* WrapKqpEnsure(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
@@ -99,5 +230,16 @@ IComputationNode* WrapKqpEnsure(TCallable& callable, const TComputationNodeFacto
return new TKqpEnsureWrapper(ctx.Mutables, value, predicate, issueCode, message);
}
+// Creates the KqpIndexLookupJoin computation node from its callable.
+// Expected inputs: [0] input stream node, [1] join kind as ui32,
+// [2] left columns count as ui64, [3] right columns count as ui64.
+IComputationNode* WrapKqpIndexLookupJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+    MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");
+
+    auto streamNode = LocateNode(ctx.NodeLocator, callable, 0);
+    const ui32 rawJoinKind = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui32>();
+    const ui64 leftWidth = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().Get<ui64>();
+    const ui64 rightWidth = AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().Get<ui64>();
+
+    return new TKqpIndexLookupJoinWrapper(
+        ctx.Mutables, streamNode, GetJoinKind(rawJoinKind), leftWidth, rightWidth);
+}
+
} // namespace NMiniKQL
} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_compute.h b/ydb/core/kqp/runtime/kqp_compute.h
index b3cfc21d2e6..9065325a276 100644
--- a/ydb/core/kqp/runtime/kqp_compute.h
+++ b/ydb/core/kqp/runtime/kqp_compute.h
@@ -48,6 +48,7 @@ private:
};
IComputationNode* WrapKqpEnsure(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+IComputationNode* WrapKqpIndexLookupJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx);
} // namespace NMiniKQL
} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_program_builder.cpp b/ydb/core/kqp/runtime/kqp_program_builder.cpp
index 2cf24451483..43fa17bfb3f 100644
--- a/ydb/core/kqp/runtime/kqp_program_builder.cpp
+++ b/ydb/core/kqp/runtime/kqp_program_builder.cpp
@@ -143,6 +143,22 @@ TType* MakeBlockType(TProgramBuilder& builder, TStructType* rowType) {
return builder.NewBlockType(builder.NewTupleType(tupleItems), TBlockType::EShape::Many);
}
+// Converts the textual join kind of an index lookup join into EJoinKind.
+// Recognized names: "Inner", "Left", "LeftOnly", "RightSemi", "LeftSemi"; anything else
+// fails the MKQL_ENSURE_S check.
+// NOTE(review): TKqpIndexLookupJoinWrapper in kqp_compute.cpp handles only Inner, Left and
+// LeftOnly and reports "Unsupported join kind" for the rest - confirm the Semi kinds are
+// meant to be accepted here.
+EJoinKind GetIndexLookupJoinKind(const TString& joinKind) {
+    if (joinKind == "Inner") {
+        return EJoinKind::Inner;
+    }
+    if (joinKind == "Left") {
+        return EJoinKind::Left;
+    }
+    if (joinKind == "LeftOnly") {
+        return EJoinKind::LeftOnly;
+    }
+    if (joinKind == "RightSemi") {
+        return EJoinKind::RightSemi;
+    }
+    if (joinKind == "LeftSemi") {
+        return EJoinKind::LeftSemi;
+    }
+
+    MKQL_ENSURE_S(false, "Unexpected join kind: " << joinKind);
+}
+
} // namespace
TKqpProgramBuilder::TKqpProgramBuilder(const TTypeEnvironment& env, const IFunctionRegistry& functionRegistry)
@@ -314,5 +330,38 @@ TRuntimeNode TKqpProgramBuilder::KqpEnsure(TRuntimeNode value, TRuntimeNode pred
return TRuntimeNode(callableBuilder.Build(), false);
}
+// Builds the KqpIndexLookupJoin callable.
+//
+// `input` must be a stream of 2-element tuples: (left row struct, optional right row struct).
+// The returned node is a stream of structs containing every left member followed by every
+// right member; a member is renamed to "<label>.<name>" when the corresponding label is
+// not empty. The callable receives the input, the join kind as ui32 and both member counts
+// as ui64.
+//
+// NOTE(review): the result type always lists right members, while the runtime wrapper emits
+// only left columns for LeftOnly - confirm the declared type matches in that case.
+// NOTE(review): the runtime fills columns positionally (left, then right); confirm that
+// rowTypeBuilder.Build() keeps members in the order they were added here.
+TRuntimeNode TKqpProgramBuilder::KqpIndexLookupJoin(const TRuntimeNode& input, const TString& joinType,
+    const TString& leftLabel, const TString& rightLabel) {
+
+    auto inputRowItems = AS_TYPE(TTupleType, AS_TYPE(TStreamType, input.GetStaticType())->GetItemType());
+    MKQL_ENSURE(inputRowItems->GetElementsCount() == 2, "Expected 2 elements");
+
+    auto leftStruct = AS_TYPE(TStructType, inputRowItems->GetElementType(0));
+    auto rightStruct = AS_TYPE(TStructType, AS_TYPE(TOptionalType, inputRowItems->GetElementType(1))->GetItemType());
+
+    TStructTypeBuilder rowTypeBuilder(GetTypeEnvironment());
+
+    // Appends all members of `rowType` to the result struct, prefixing names with the label if set.
+    const auto appendMembers = [&rowTypeBuilder](auto rowType, const TString& label) {
+        const ui32 membersCount = rowType->GetMembersCount();
+        for (ui32 idx = 0; idx < membersCount; ++idx) {
+            TString memberName = label.empty() ? TString(rowType->GetMemberName(idx))
+                : TString::Join(label, ".", rowType->GetMemberName(idx));
+            rowTypeBuilder.Add(memberName, rowType->GetMemberType(idx));
+        }
+    };
+
+    appendMembers(leftStruct, leftLabel);
+    appendMembers(rightStruct, rightLabel);
+
+    auto streamType = NewStreamType(rowTypeBuilder.Build());
+
+    // __func__ is evaluated here, in the method itself, so the callable keeps this method's name.
+    TCallableBuilder callableBuilder(Env, __func__, streamType);
+    callableBuilder.Add(input);
+    callableBuilder.Add(NewDataLiteral<ui32>((ui32)GetIndexLookupJoinKind(joinType)));
+    callableBuilder.Add(NewDataLiteral<ui64>(leftStruct->GetMembersCount()));
+    callableBuilder.Add(NewDataLiteral<ui64>(rightStruct->GetMembersCount()));
+    return TRuntimeNode(callableBuilder.Build(), false);
+}
+
} // namespace NMiniKQL
} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_program_builder.h b/ydb/core/kqp/runtime/kqp_program_builder.h
index a26938b8db3..3c4825001f0 100644
--- a/ydb/core/kqp/runtime/kqp_program_builder.h
+++ b/ydb/core/kqp/runtime/kqp_program_builder.h
@@ -69,6 +69,8 @@ public:
TRuntimeNode KqpEffects(const TArrayRef<const TRuntimeNode>& effects);
TRuntimeNode KqpEnsure(TRuntimeNode value, TRuntimeNode predicate, TRuntimeNode issueCode, TRuntimeNode message);
+
+ TRuntimeNode KqpIndexLookupJoin(const TRuntimeNode& input, const TString& joinType, const TString& leftLabel, const TString& rightLabel);
};
} // namespace NMiniKQL
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index f35e58bda98..e343ce9ef0d 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -7,13 +7,12 @@
#include <ydb/core/engine/minikql/minikql_engine_host.h>
#include <ydb/core/kqp/common/kqp_resolve.h>
#include <ydb/core/kqp/gateway/kqp_gateway.h>
-#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/protos/kqp_stats.pb.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
-#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/kqp/common/kqp_event_ids.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
+#include <ydb/core/kqp/runtime/kqp_stream_lookup_worker.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
namespace NKikimr {
@@ -31,50 +30,24 @@ public:
std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings,
TIntrusivePtr<TKqpCounters> counters)
: LogPrefix(TStringBuilder() << "StreamLookupActor, inputIndex: " << inputIndex << ", CA Id " << computeActorId)
- , InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv)
- , HolderFactory(holderFactory), Alloc(alloc), TablePath(settings.GetTable().GetPath())
- , TableId(MakeTableId(settings.GetTable()))
+ , InputIndex(inputIndex)
+ , Input(input)
+ , ComputeActorId(computeActorId)
+ , TypeEnv(typeEnv)
+ , Alloc(alloc)
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
+ , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), typeEnv, holderFactory))
, Counters(counters)
{
- KeyColumns.reserve(settings.GetKeyColumns().size());
- i32 keyOrder = 0;
- for (const auto& keyColumn : settings.GetKeyColumns()) {
- KeyColumns.emplace(
- keyColumn.GetName(),
- TSysTables::TTableColumnInfo{
- keyColumn.GetName(),
- keyColumn.GetId(),
- NScheme::TTypeInfo{static_cast<NScheme::TTypeId>(keyColumn.GetTypeId())},
- "",
- keyOrder++
- }
- );
- }
-
- LookupKeyColumns.reserve(KeyColumns.size());
- for (const auto& lookupKeyColumn : settings.GetLookupKeyColumns()) {
- auto columnIt = KeyColumns.find(lookupKeyColumn);
- YQL_ENSURE(columnIt != KeyColumns.end());
- LookupKeyColumns.push_back(&columnIt->second);
- }
-
- Columns.reserve(settings.GetColumns().size());
- for (const auto& column : settings.GetColumns()) {
- Columns.emplace_back(TSysTables::TTableColumnInfo{
- column.GetName(),
- column.GetId(),
- NScheme::TTypeInfo{static_cast<NScheme::TTypeId>(column.GetTypeId())}
- });
- }
};
virtual ~TKqpStreamLookupActor() {
- if (Input.HasValue() && Alloc) {
+ if (Alloc) {
TGuard<NMiniKQL::TScopedAlloc> allocGuard(*Alloc);
Input.Clear();
+ StreamLookupWorker.reset();
}
}
@@ -94,14 +67,14 @@ public:
if (last) {
NYql::NDqProto::TDqTableStats* tableStats = nullptr;
for (auto& table : *stats->MutableTables()) {
- if (table.GetTablePath() == TablePath) {
+ if (table.GetTablePath() == StreamLookupWorker->GetTablePath()) {
tableStats = &table;
}
}
if (!tableStats) {
tableStats = stats->AddTables();
- tableStats->SetTablePath(TablePath);
+ tableStats->SetTablePath(StreamLookupWorker->GetTablePath());
}
// TODO: use evread statistics after KIKIMR-16924
@@ -135,14 +108,12 @@ private:
}
struct TReadState {
- TReadState(ui64 id, ui64 shardId, std::vector<TOwnedTableRange>&& keys)
+ TReadState(ui64 id, ui64 shardId)
: Id(id)
, ShardId(shardId)
- , Keys(std::move(keys))
, State(EReadState::Initial) {}
void SetFinished() {
- Keys.clear();
State = EReadState::Finished;
}
@@ -152,8 +123,9 @@ private:
const ui64 Id;
const ui64 ShardId;
- std::vector<TOwnedTableRange> Keys;
EReadState State;
+ TMaybe<TOwnedCellVec> LastProcessedKey;
+ ui32 FirstUnprocessedQuery = 0;
};
struct TShardState {
@@ -161,12 +133,6 @@ private:
std::vector<TReadState*> Reads;
};
- struct TResult {
- const ui64 ShardId;
- THolder<TEventHandle<TEvDataShard::TEvReadResult>> ReadResult;
- size_t UnprocessedResultRow = 0;
- };
-
struct TEvPrivate {
enum EEv {
EvRetryReadTimeout = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
@@ -191,6 +157,7 @@ private:
{
auto alloc = BindAllocator();
Input.Clear();
+ StreamLookupWorker.reset();
for (auto& [id, state] : Reads) {
Counters->SentIteratorCancels->Inc();
auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
@@ -204,24 +171,26 @@ private:
}
i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
- i64 totalDataSize = 0;
-
YQL_ENSURE(!batch.IsWide(), "Wide stream is not supported");
- totalDataSize = PackResults(batch, freeSpace);
- auto status = FetchLookupKeys();
+ auto replyResultStats = StreamLookupWorker->ReplyResult(batch, freeSpace);
+ ReadRowsCount += replyResultStats.RowsCount;
+ ReadBytesCount += replyResultStats.BytesCount;
+
+ auto status = FetchInputRows();
if (Partitioning) {
- ProcessLookupKeys();
+ ProcessInputRows();
}
finished = (status == NUdf::EFetchStatus::Finish)
- && UnprocessedKeys.empty()
&& AllReadsFinished()
- && Results.empty();
+ && StreamLookupWorker->AllRowsProcessed();
+
+ CA_LOG_D("Returned " << replyResultStats.BytesCount << " bytes, " << replyResultStats.RowsCount
+ << " rows, finished: " << finished);
- CA_LOG_D("Returned " << totalDataSize << " bytes, finished: " << finished);
- return totalDataSize;
+ return replyResultStats.BytesCount;
}
TMaybe<google::protobuf::Any> ExtraData() override {
@@ -257,23 +226,23 @@ private:
}
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
- CA_LOG_D("TEvResolveKeySetResult was received for table: " << TablePath);
+ CA_LOG_D("TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath());
if (ev->Get()->Request->ErrorCount > 0) {
- return RuntimeError(TStringBuilder() << "Failed to get partitioning for table: " << TableId,
- NYql::NDqProto::StatusIds::SCHEME_ERROR);
+ return RuntimeError(TStringBuilder() << "Failed to get partitioning for table: "
+ << StreamLookupWorker->GetTablePath(), NYql::NDqProto::StatusIds::SCHEME_ERROR);
}
auto& resultSet = ev->Get()->Request->ResultSet;
YQL_ENSURE(resultSet.size() == 1, "Expected one result for range [NULL, +inf)");
Partitioning = resultSet[0].KeyDescription->Partitioning;
- ProcessLookupKeys();
+ ProcessInputRows();
}
void Handle(TEvDataShard::TEvReadResult::TPtr& ev) {
const auto& record = ev->Get()->Record;
- CA_LOG_D("TEvReadResult was received for table: " << TablePath <<
+ CA_LOG_D("TEvReadResult was received for table: " << StreamLookupWorker->GetTablePath() <<
", readId: " << record.GetReadId() << ", finished: " << record.GetFinished());
auto readIt = Reads.find(record.GetReadId());
@@ -303,13 +272,7 @@ private:
case Ydb::StatusIds::NOT_FOUND:
case Ydb::StatusIds::OVERLOADED:
case Ydb::StatusIds::INTERNAL_ERROR: {
- TMaybe<NKikimrTxDataShard::TReadContinuationToken> continuationToken;
- if (record.HasContinuationToken()) {
- bool parseResult = continuationToken->ParseFromString(record.GetContinuationToken());
- YQL_ENSURE(parseResult, "Failed to parse continuation token");
- }
-
- return RetryTableRead(read, continuationToken);
+ return RetryTableRead(read);
}
default: {
NYql::TIssues issues;
@@ -321,6 +284,17 @@ private:
if (record.GetFinished()) {
read.SetFinished();
} else {
+ YQL_ENSURE(record.HasContinuationToken(), "Successful TEvReadResult should contain continuation token");
+ NKikimrTxDataShard::TReadContinuationToken continuationToken;
+ bool parseResult = continuationToken.ParseFromString(record.GetContinuationToken());
+ YQL_ENSURE(parseResult, "Failed to parse continuation token");
+ read.FirstUnprocessedQuery = continuationToken.GetFirstUnprocessedQuery();
+
+ if (continuationToken.HasLastProcessedKey()) {
+ TSerializedCellVec lastKey(continuationToken.GetLastProcessedKey());
+ read.LastProcessedKey = TOwnedCellVec(lastKey.GetCells());
+ }
+
Counters->SentIteratorAcks->Inc();
THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck());
request->Record.SetReadId(record.GetReadId());
@@ -333,7 +307,9 @@ private:
CA_LOG_D("TEvReadAck was sent to shard: " << read.ShardId);
}
- Results.emplace_back(TResult{read.ShardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())});
+ StreamLookupWorker->AddResult(TKqpStreamLookupWorker::TShardReadResult{
+ read.ShardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())
+ });
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
@@ -347,10 +323,7 @@ private:
for (auto* read : shardIt->second.Reads) {
if (read->State == EReadState::Running) {
Counters->IteratorDeliveryProblems->Inc();
- for (auto& key : read->Keys) {
- UnprocessedKeys.emplace_back(std::move(key));
- }
-
+ StreamLookupWorker->ResetRowsProcessing(read->Id, read->FirstUnprocessedQuery, read->LastProcessedKey);
read->SetFinished();
}
}
@@ -359,170 +332,47 @@ private:
}
void Handle(TEvPrivate::TEvSchemeCacheRequestTimeout::TPtr&) {
- CA_LOG_D("TEvSchemeCacheRequestTimeout was received, shards for table " << TablePath
+ CA_LOG_D("TEvSchemeCacheRequestTimeout was received, shards for table " << StreamLookupWorker->GetTablePath()
<< " was resolved: " << !!Partitioning);
if (!Partitioning) {
- RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId
+ RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << StreamLookupWorker->GetTablePath()
<< " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT);
}
}
- ui64 PackResults(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) {
- i64 totalSize = 0;
- bool sizeLimitExceeded = false;
- batch.clear();
-
- while (!Results.empty() && !sizeLimitExceeded) {
- auto& result = Results.front();
- for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) {
- const auto& resultRow = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow);
- YQL_ENSURE(resultRow.size() <= Columns.size(), "Result columns mismatch");
-
- NUdf::TUnboxedValue* rowItems = nullptr;
- auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);
-
- i64 rowSize = 0;
- for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) {
- const auto& column = Columns[colIndex];
- if (IsSystemColumn(column.Name)) {
- NMiniKQL::FillSystemColumn(rowItems[colIndex], result.ShardId, column.Id, column.PType);
- rowSize += sizeof(NUdf::TUnboxedValue);
- } else {
- YQL_ENSURE(resultColIndex < resultRow.size());
- rowItems[colIndex] = NMiniKQL::GetCellValue(resultRow[resultColIndex], column.PType);
- rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes;
- ++resultColIndex;
- }
- }
-
- if (totalSize + rowSize > freeSpace) {
- row.DeleteUnreferenced();
- sizeLimitExceeded = true;
- break;
- }
-
- batch.push_back(std::move(row));
- ++ReadRowsCount;
- ReadBytesCount += rowSize;
- totalSize += rowSize;
- }
-
- if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
- Results.pop_front();
- }
- }
-
- CA_LOG_D("Total batch size: " << totalSize << ", size limit exceeded: " << sizeLimitExceeded);
- return totalSize;
- }
-
- NUdf::EFetchStatus FetchLookupKeys() {
- YQL_ENSURE(LookupKeyColumns.size() <= KeyColumns.size());
+ NUdf::EFetchStatus FetchInputRows() {
+ auto guard = BindAllocator();
NUdf::EFetchStatus status;
- NUdf::TUnboxedValue key;
- while ((status = Input.Fetch(key)) == NUdf::EFetchStatus::Ok) {
- std::vector<TCell> keyCells(LookupKeyColumns.size());
- for (size_t colId = 0; colId < LookupKeyColumns.size(); ++colId) {
- const auto* lookupKeyColumn = LookupKeyColumns[colId];
- YQL_ENSURE(lookupKeyColumn->KeyOrder < static_cast<i64>(keyCells.size()));
- keyCells[lookupKeyColumn->KeyOrder] = MakeCell(lookupKeyColumn->PType,
- key.GetElement(colId), TypeEnv, /* copy */ true);
- }
-
- UnprocessedKeys.emplace_back(std::move(keyCells));
+ NUdf::TUnboxedValue row;
+ while ((status = Input.Fetch(row)) == NUdf::EFetchStatus::Ok) {
+ StreamLookupWorker->AddInputRow(std::move(row));
}
return status;
}
- void ProcessLookupKeys() {
+ void ProcessInputRows() {
YQL_ENSURE(Partitioning, "Table partitioning should be initialized before lookup keys processing");
- std::unordered_map<ui64, std::vector<TOwnedTableRange>> shardKeys;
- for (; !UnprocessedKeys.empty(); UnprocessedKeys.pop_front()) {
- const auto& key = UnprocessedKeys.front();
- YQL_ENSURE(key.Point);
-
- std::vector<ui64> shardIds;
- if (LookupKeyColumns.size() < KeyColumns.size()) {
- /* build range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) */
- std::vector<TCell> fromCells(KeyColumns.size());
- fromCells.insert(fromCells.begin(), key.From.begin(), key.From.end());
- std::vector<TCell> toCells(key.From.begin(), key.From.end());
-
- shardIds = GetRangePartitioning(TOwnedTableRange{std::move(fromCells), /* inclusiveFrom */ true,
- std::move(toCells), /* inclusiveTo */ false});
- } else {
- shardIds = GetRangePartitioning(key);
- }
+ auto guard = BindAllocator();
- for (auto shardId : shardIds) {
- shardKeys[shardId].emplace_back(std::move(key));
- }
- }
-
- for (auto& [shardId, keys] : shardKeys) {
- StartTableRead(shardId, std::move(keys));
+ auto requests = StreamLookupWorker->BuildRequests(Partitioning, ReadId);
+ for (auto& [shardId, request] : requests) {
+ StartTableRead(shardId, std::move(request));
}
}
- std::vector<ui64> GetRangePartitioning(const TOwnedTableRange& range) {
- YQL_ENSURE(Partitioning);
-
- std::vector<NScheme::TTypeInfo> keyColumnTypes(KeyColumns.size());
- for (const auto& [_, columnInfo] : KeyColumns) {
- YQL_ENSURE(columnInfo.KeyOrder < static_cast<i64>(keyColumnTypes.size()));
- keyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType;
- }
-
- auto it = LowerBound(Partitioning->begin(), Partitioning->end(), /* value */ true,
- [&](const auto& partition, bool) {
- const int result = CompareBorders<true, false>(
- partition.Range->EndKeyPrefix.GetCells(), range.From,
- partition.Range->IsInclusive || partition.Range->IsPoint,
- range.InclusiveFrom || range.Point, keyColumnTypes
- );
-
- return (result < 0);
- }
- );
-
- YQL_ENSURE(it != Partitioning->end());
-
- std::vector<ui64> rangePartitions;
- for (; it != Partitioning->end(); ++it) {
- rangePartitions.push_back(it->ShardId);
-
- if (range.Point) {
- break;
- }
-
- auto cmp = CompareBorders<true, true>(
- it->Range->EndKeyPrefix.GetCells(), range.To,
- it->Range->IsInclusive || it->Range->IsPoint,
- range.InclusiveTo || range.Point, keyColumnTypes
- );
-
- if (cmp >= 0) {
- break;
- }
- }
-
- return rangePartitions;
- }
-
- TReadState& StartTableRead(ui64 shardId, std::vector<TOwnedTableRange>&& keys) {
- const auto readId = GetNextReadId();
- TReadState read(readId, shardId, std::move(keys));
-
- CA_LOG_D("Start reading of table: " << TablePath << ", readId: " << readId << ", shardId: " << shardId);
-
+ TReadState& StartTableRead(ui64 shardId, THolder<TEvDataShard::TEvRead> request) {
Counters->CreatedIterators->Inc();
- THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead());
auto& record = request->Record;
+ CA_LOG_D("Start reading of table: " << StreamLookupWorker->GetTablePath() << ", readId: " << record.GetReadId()
+ << ", shardId: " << shardId);
+
+ TReadState read(record.GetReadId(), shardId);
+
YQL_ENSURE(Snapshot.IsValid(), "Invalid snapshot value");
record.MutableSnapshot()->SetStep(Snapshot.Step);
record.MutableSnapshot()->SetTxId(Snapshot.TxId);
@@ -531,48 +381,27 @@ private:
record.SetLockTxId(*LockTxId);
}
- record.SetReadId(read.Id);
record.SetMaxRows(Max<ui16>());
record.SetMaxBytes(5_MB);
record.SetResultFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC);
- record.MutableTableId()->SetOwnerId(TableId.PathId.OwnerId);
- record.MutableTableId()->SetTableId(TableId.PathId.LocalPathId);
- record.MutableTableId()->SetSchemaVersion(TableId.SchemaVersion);
-
- for (const auto& column : Columns) {
- if (!IsSystemColumn(column.Name)) {
- record.AddColumns(column.Id);
- }
- }
-
- for (auto& key : read.Keys) {
- YQL_ENSURE(key.Point);
- request->Keys.emplace_back(key.From);
- }
-
Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), shardId, true),
IEventHandle::FlagTrackDelivery);
read.State = EReadState::Running;
- const auto [readIt, succeeded] = Reads.insert({readId, std::move(read)});
+ const auto [readIt, succeeded] = Reads.insert({read.Id, std::move(read)});
YQL_ENSURE(succeeded);
ReadsPerShard[shardId].Reads.push_back(&readIt->second);
return readIt->second;
}
- void RetryTableRead(TReadState& failedRead, TMaybe<NKikimrTxDataShard::TReadContinuationToken>& token) {
- CA_LOG_D("Retry reading of table: " << TablePath << ", readId: " << failedRead.Id
+ void RetryTableRead(TReadState& failedRead) {
+ CA_LOG_D("Retry reading of table: " << StreamLookupWorker->GetTablePath() << ", readId: " << failedRead.Id
<< ", shardId: " << failedRead.ShardId);
- size_t firstUnprocessedQuery = token ? token->GetFirstUnprocessedQuery() : 0;
- YQL_ENSURE(firstUnprocessedQuery <= failedRead.Keys.size());
- for (ui64 idx = firstUnprocessedQuery; idx < failedRead.Keys.size(); ++idx) {
- UnprocessedKeys.emplace_back(std::move(failedRead.Keys[idx]));
- }
-
+ StreamLookupWorker->ResetRowsProcessing(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey);
failedRead.SetFinished();
auto& shardState = ReadsPerShard[failedRead.ShardId];
@@ -586,26 +415,23 @@ private:
}
void ResolveTableShards() {
- CA_LOG_D("Resolve shards for table: " << TablePath);
+ CA_LOG_D("Resolve shards for table: " << StreamLookupWorker->GetTablePath());
Partitioning.reset();
auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
- TVector<TCell> minusInf(KeyColumns.size());
+ auto keyColumnTypes = StreamLookupWorker->GetKeyColumnTypes();
+
+ TVector<TCell> minusInf(keyColumnTypes.size());
TVector<TCell> plusInf;
TTableRange range(minusInf, true, plusInf, true, false);
- std::vector<NScheme::TTypeInfo> keyColumnTypes(KeyColumns.size());
- for (const auto& [_, columnInfo] : KeyColumns) {
- keyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType;
- }
-
- request->ResultSet.emplace_back(MakeHolder<TKeyDesc>(TableId, range, TKeyDesc::ERowOperation::Read,
+ request->ResultSet.emplace_back(MakeHolder<TKeyDesc>(StreamLookupWorker->GetTableId(), range, TKeyDesc::ERowOperation::Read,
keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));
Counters->IteratorsShardResolve->Inc();
- Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
SchemeCacheRequestTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), SchemeCacheRequestTimeout,
@@ -622,10 +448,6 @@ private:
return true;
}
- ui64 GetNextReadId() {
- return ++ReadId;
- }
-
TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator() {
return TypeEnv.BindAllocator();
}
@@ -647,32 +469,24 @@ private:
NUdf::TUnboxedValue Input;
const NActors::TActorId ComputeActorId;
const NMiniKQL::TTypeEnvironment& TypeEnv;
- const NMiniKQL::THolderFactory& HolderFactory;
std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
- const TString TablePath;
- const TTableId TableId;
IKqpGateway::TKqpSnapshot Snapshot;
const TMaybe<ui64> LockTxId;
- std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns;
- std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns;
- std::vector<TSysTables::TTableColumnInfo> Columns;
- std::deque<TResult> Results;
std::unordered_map<ui64, TReadState> Reads;
std::unordered_map<ui64, TShardState> ReadsPerShard;
std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning;
- std::deque<TOwnedTableRange> UnprocessedKeys;
const TDuration SchemeCacheRequestTimeout;
NActors::TActorId SchemeCacheRequestTimeoutTimer;
TVector<NKikimrTxDataShard::TLock> Locks;
TVector<NKikimrTxDataShard::TLock> BrokenLocks;
+ std::unique_ptr<TKqpStreamLookupWorker> StreamLookupWorker;
+ ui64 ReadId = 0;
// stats
ui64 ReadRowsCount = 0;
ui64 ReadBytesCount = 0;
TIntrusivePtr<TKqpCounters> Counters;
-
- ui64 ReadId = 0;
};
} // namespace
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
new file mode 100644
index 00000000000..62613dc376c
--- /dev/null
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
@@ -0,0 +1,761 @@
+#include "kqp_stream_lookup_worker.h"
+
+#include <ydb/core/kqp/common/kqp_resolve.h>
+#include <ydb/core/kqp/runtime/kqp_scan_data.h>
+#include <ydb/core/tx/datashard/range_ops.h>
+
+namespace NKikimr {
+namespace NKqp {
+
+namespace {
+// Maps `range` onto the table partitions it touches.
+//
+// Partition `idx` is treated as the key interval that starts where partition `idx - 1` ends
+// (or at -inf for the first one) and ends at its own EndKeyPrefix. For every partition the
+// requested range (or point) intersects, a (shardId, sub-range) pair is returned; a point is
+// forwarded unchanged, a range is clipped to the partition borders.
+std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpStreamLookupWorker::TPartitionInfo& partitionInfo,
+    const std::vector<NScheme::TTypeInfo>& keyColumnTypes, const TOwnedTableRange& range) {
+
+    YQL_ENSURE(partitionInfo);
+
+    const auto& partitions = *partitionInfo;
+    std::vector<TCell> minusInf(keyColumnTypes.size());
+
+    std::vector<std::pair<ui64, TOwnedTableRange>> shardRanges;
+    for (size_t idx = 0; idx < partitions.size(); ++idx) {
+        // Lower border of the partition: -inf (inclusive) for the first partition,
+        // otherwise the end of the previous partition with inverted inclusiveness.
+        TConstArrayRef<TCell> lowerBorder = minusInf;
+        bool lowerInclusive = true;
+        if (idx != 0) {
+            lowerBorder = partitions[idx - 1].Range->EndKeyPrefix.GetCells();
+            lowerInclusive = !partitions[idx - 1].Range->IsInclusive;
+        }
+
+        TTableRange partitionRange{
+            lowerBorder,
+            lowerInclusive,
+            partitions[idx].Range->EndKeyPrefix.GetCells(),
+            partitions[idx].Range->IsInclusive
+        };
+
+        const int intersection = range.Point
+            ? ComparePointAndRange(range.From, partitionRange, keyColumnTypes, keyColumnTypes)
+            : CompareRanges(range, partitionRange, keyColumnTypes);
+
+        if (intersection < 0) {
+            // a negative result stops the scan: later partitions are not examined
+            break;
+        }
+
+        if (intersection > 0) {
+            continue;
+        }
+
+        if (range.Point) {
+            shardRanges.emplace_back(partitions[idx].ShardId, range);
+        } else {
+            auto rangeIntersection = Intersect(keyColumnTypes, range, partitionRange);
+            shardRanges.emplace_back(partitions[idx].ShardId, rangeIntersection);
+        }
+    }
+
+    return shardRanges;
+}
+
+// Thin wrapper that makes a cell array hashable through absl::Hash.
+struct THashableKey {
+    TConstArrayRef<TCell> Cells;
+
+    // Mixes the number of cells and, per cell, its null flag plus (for non-null cells)
+    // its size and raw bytes into the hash state.
+    template <typename H>
+    friend H AbslHashValue(H h, const THashableKey& key) {
+        h = H::combine(std::move(h), key.Cells.size());
+        for (const TCell& cell : key.Cells) {
+            const bool isNull = cell.IsNull();
+            h = H::combine(std::move(h), isNull);
+            if (isNull) {
+                continue;
+            }
+
+            h = H::combine(std::move(h), cell.Size());
+            h = H::combine_contiguous(std::move(h), cell.Data(), cell.Size());
+        }
+        return h;
+    }
+};
+
+// Transparent hash functor for containers keyed by cell vectors: hashes any
+// TConstArrayRef<TCell>-convertible key through THashableKey.
+struct TKeyHash {
+    using is_transparent = void;
+
+    // Must return the full hash value: returning `bool` would collapse every key
+    // into one of two hash values and make all keys in the table collide.
+    size_t operator()(TConstArrayRef<TCell> key) const {
+        return absl::Hash<THashableKey>()(THashableKey{ key });
+    }
+};
+
+// Transparent equality functor matching TKeyHash: two keys are equal when they have the
+// same number of cells and every pair of cells is either both null or byte-wise identical.
+struct TKeyEq {
+    using is_transparent = void;
+
+    bool operator()(TConstArrayRef<TCell> a, TConstArrayRef<TCell> b) const {
+        if (a.size() != b.size()) {
+            return false;
+        }
+
+        // same storage and same length => same key
+        if (a.data() == b.data()) {
+            return true;
+        }
+
+        for (size_t i = 0; i < a.size(); ++i) {
+            const TCell& lhs = a[i];
+            const TCell& rhs = b[i];
+
+            if (lhs.IsNull() != rhs.IsNull()) {
+                return false;
+            }
+
+            if (lhs.IsNull()) {
+                continue;
+            }
+
+            if (lhs.Size() != rhs.Size()) {
+                return false;
+            }
+
+            if (lhs.Size() > 0 && ::memcmp(lhs.Data(), rhs.Data(), lhs.Size()) != 0) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+};
+} // !namespace
+
+// Unpacks the lookup settings: table identity, the table key columns (in the order they
+// are listed, which becomes KeyOrder), the subset of key columns used for the lookup and
+// the columns to be returned.
+TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
+    const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory)
+    : TypeEnv(typeEnv)
+    , HolderFactory(holderFactory)
+    , TablePath(settings.GetTable().GetPath())
+    , TableId(MakeTableId(settings.GetTable()))
+{
+    const auto& keyColumns = settings.GetKeyColumns();
+    KeyColumns.reserve(keyColumns.size());
+    i32 keyOrder = 0;
+    for (const auto& keyColumn : keyColumns) {
+        KeyColumns.emplace(
+            keyColumn.GetName(),
+            TSysTables::TTableColumnInfo{
+                keyColumn.GetName(),
+                keyColumn.GetId(),
+                NScheme::TTypeInfo{static_cast<NScheme::TTypeId>(keyColumn.GetTypeId())},
+                "",
+                keyOrder
+            }
+        );
+        ++keyOrder;
+    }
+
+    // Lookup key columns are stored as pointers to the entries of KeyColumns,
+    // so every lookup column has to be one of the key columns.
+    const auto& lookupKeyColumns = settings.GetLookupKeyColumns();
+    LookupKeyColumns.reserve(lookupKeyColumns.size());
+    for (const auto& lookupKey : lookupKeyColumns) {
+        auto columnIt = KeyColumns.find(lookupKey);
+        YQL_ENSURE(columnIt != KeyColumns.end());
+        LookupKeyColumns.push_back(&columnIt->second);
+    }
+
+    const auto& resultColumns = settings.GetColumns();
+    Columns.reserve(resultColumns.size());
+    for (const auto& column : resultColumns) {
+        Columns.emplace_back(TSysTables::TTableColumnInfo{
+            column.GetName(),
+            column.GetId(),
+            NScheme::TTypeInfo{static_cast<NScheme::TTypeId>(column.GetTypeId())}
+        });
+    }
+}
+
+// Nothing to release explicitly; members clean up after themselves.
+TKqpStreamLookupWorker::~TKqpStreamLookupWorker() = default;
+
+// Returns a copy of the table path taken from the lookup settings.
+std::string TKqpStreamLookupWorker::GetTablePath() const {
+ return TablePath;
+}
+
+// Returns the id of the table being looked up (built once in the constructor via MakeTableId).
+TTableId TKqpStreamLookupWorker::GetTableId() const {
+ return TableId;
+}
+
+// Builds the list of key column types ordered by KeyOrder.
+// A fresh vector is assembled on every call.
+std::vector<NScheme::TTypeInfo> TKqpStreamLookupWorker::GetKeyColumnTypes() const {
+    std::vector<NScheme::TTypeInfo> keyColumnTypes(KeyColumns.size());
+
+    for (const auto& nameAndInfo : KeyColumns) {
+        const auto& columnInfo = nameAndInfo.second;
+        // KeyOrder is used as an index, so it has to stay inside the vector
+        YQL_ENSURE(columnInfo.KeyOrder < static_cast<i64>(keyColumnTypes.size()));
+        keyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType;
+    }
+
+    return keyColumnTypes;
+}
+
+class TKqpLookupRows : public TKqpStreamLookupWorker {
+public:
+ // Forwards the settings and the runtime helpers to the base worker; all settings
+ // parsing happens in the TKqpStreamLookupWorker constructor.
+ TKqpLookupRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,
+ const NMiniKQL::THolderFactory& holderFactory) : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory) {
+ }
+
+ // No explicit cleanup is performed here.
+ virtual ~TKqpLookupRows() {}
+
+ // Converts one input tuple into a lookup key and queues it in UnprocessedKeys.
+ // Element `colId` of the tuple is the value of LookupKeyColumns[colId]; the cell is
+ // placed at that column's KeyOrder position. A key covering all key columns becomes a
+ // point, a shorter one becomes a prefix range.
+ void AddInputRow(NUdf::TUnboxedValue inputRow) final {
+     std::vector<TCell> keyCells(LookupKeyColumns.size());
+     size_t colId = 0;
+     for (const auto* lookupKeyColumn : LookupKeyColumns) {
+         YQL_ENSURE(lookupKeyColumn->KeyOrder < static_cast<i64>(keyCells.size()));
+         keyCells[lookupKeyColumn->KeyOrder] = MakeCell(lookupKeyColumn->PType,
+             inputRow.GetElement(colId), TypeEnv, /* copy */ true);
+         ++colId;
+     }
+
+     if (keyCells.size() >= KeyColumns.size()) {
+         // full pk, build point
+         UnprocessedKeys.emplace_back(std::move(keyCells));
+         return;
+     }
+
+     // build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf])
+     std::vector<TCell> fromCells(keyCells);
+     fromCells.resize(KeyColumns.size());
+
+     UnprocessedKeys.emplace_back(fromCells, /* inclusiveFrom */ true, keyCells, /* inclusiveTo */ false);
+ }
+
+ // Drains UnprocessedKeys, splits every key between the shards of `partitioning` and
+ // builds one read request per shard for points and one per shard for ranges.
+ //
+ // `readId` is incremented for every request built; the keys sent with a request are
+ // remembered in PendingKeysByReadId under that id.
+ // Returns (shardId, request) pairs.
+ TReadList BuildRequests(const TPartitionInfo& partitioning, ui64& readId) final {
+     YQL_ENSURE(partitioning);
+
+     // The key column types are the same for every key, so build the vector once
+     // instead of once per key.
+     const auto keyColumnTypes = GetKeyColumnTypes();
+
+     std::unordered_map<ui64, std::vector<TOwnedTableRange>> rangesPerShard;
+     std::unordered_map<ui64, std::vector<TOwnedTableRange>> pointsPerShard;
+
+     while (!UnprocessedKeys.empty()) {
+         auto key = std::move(UnprocessedKeys.front());
+         UnprocessedKeys.pop_front();
+
+         // bind by reference: the per-shard ranges are moved out, not copied
+         for (auto& [shardId, shardRange] : GetRangePartitioning(partitioning, keyColumnTypes, key)) {
+             auto& shardKeys = shardRange.Point ? pointsPerShard[shardId] : rangesPerShard[shardId];
+             shardKeys.push_back(std::move(shardRange));
+         }
+     }
+
+     TReadList readRequests;
+     readRequests.reserve(rangesPerShard.size() + pointsPerShard.size());
+
+     // Points and ranges go into separate requests (FillReadRequest expects a homogeneous list).
+     auto addRequests = [&](auto& keysPerShard) {
+         for (auto& [shardId, keys] : keysPerShard) {
+             THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead());
+             FillReadRequest(++readId, request, keys);
+             readRequests.emplace_back(shardId, std::move(request));
+             PendingKeysByReadId.insert({readId, std::move(keys)});
+         }
+     };
+
+     addRequests(pointsPerShard);
+     addRequests(rangesPerShard);
+
+     return readRequests;
+ }
+
+ // Queues a shard read result for ReplyResult().
+ // Only successful results that belong to a read still tracked in
+ // PendingKeysByReadId are accepted.
+ void AddResult(TShardReadResult result) final {
+     const auto& record = result.ReadResult->Get()->Record;
+     YQL_ENSURE(record.GetStatus().GetCode() == Ydb::StatusIds::SUCCESS);
+
+     auto it = PendingKeysByReadId.find(record.GetReadId());
+     YQL_ENSURE(it != PendingKeysByReadId.end());
+
+     ReadResults.push_back(std::move(result));
+ }
+
+ // Moves rows from the queued read results into `batch` until either all results are
+ // consumed or the next row would not fit into `freeSpace` bytes.
+ //
+ // `batch` is cleared first. A partially consumed result stays at the front of the queue
+ // and is resumed from UnprocessedResultRow on the next call.
+ // Returns the number of rows and bytes put into `batch`.
+ TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
+     TReadResultStats resultStats;
+     bool sizeLimitExceeded = false;
+     batch.clear();
+
+     while (!ReadResults.empty() && !sizeLimitExceeded) {
+         auto& result = ReadResults.front();
+         for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) {
+             const auto& resultRow = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow);
+             YQL_ENSURE(resultRow.size() <= Columns.size(), "Result columns mismatch");
+
+             NUdf::TUnboxedValue* rowItems = nullptr;
+             auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);
+
+             i64 rowSize = 0;
+             // System columns are not requested from the shard (see FillReadRequest), so the
+             // shard row is indexed separately from the output row.
+             for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) {
+                 const auto& column = Columns[colIndex];
+                 if (IsSystemColumn(column.Name)) {
+                     NMiniKQL::FillSystemColumn(rowItems[colIndex], result.ShardId, column.Id, column.PType);
+                     rowSize += sizeof(NUdf::TUnboxedValue);
+                 } else {
+                     YQL_ENSURE(resultColIndex < resultRow.size());
+                     rowItems[colIndex] = NMiniKQL::GetCellValue(resultRow[resultColIndex], column.PType);
+                     rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes;
+                     ++resultColIndex;
+                 }
+             }
+
+             if (rowSize > freeSpace - (i64)resultStats.BytesCount) {
+                 // the row does not fit: drop it and retry it on the next call
+                 row.DeleteUnreferenced();
+                 sizeLimitExceeded = true;
+                 break;
+             }
+
+             batch.push_back(std::move(row));
+
+             resultStats.RowsCount += 1;
+             resultStats.BytesCount += rowSize;
+         }
+
+         if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
+             if (result.ReadResult->Get()->Record.GetFinished()) {
+                 // delete finished read
+                 auto it = PendingKeysByReadId.find(result.ReadResult->Get()->Record.GetReadId());
+                 // erasing end() is undefined behaviour, so fail loudly if the read is unknown
+                 YQL_ENSURE(it != PendingKeysByReadId.end());
+                 PendingKeysByReadId.erase(it);
+             }
+
+             ReadResults.pop_front();
+         }
+     }
+
+     return resultStats;
+ }
+
+ // True when there is nothing left to do: no keys waiting to be sent, no reads in
+ // flight and no results waiting to be replied.
+ bool AllRowsProcessed() final {
+     const bool nothingQueued = UnprocessedKeys.empty();
+     const bool nothingInFlight = PendingKeysByReadId.empty();
+     const bool nothingToReply = ReadResults.empty();
+
+     return nothingQueued && nothingInFlight && nothingToReply;
+ }
+
+ // Returns the not yet processed keys of read `readId` to UnprocessedKeys and forgets the read.
+ //
+ // Keys with index >= firstUnprocessedQuery are re-queued. When `lastProcessedKey` is set,
+ // the key at firstUnprocessedQuery must be a range; only its remainder
+ // (lastProcessedKey, To] / (lastProcessedKey, To) is re-queued.
+ void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) final {
+     auto it = PendingKeysByReadId.find(readId);
+     YQL_ENSURE(it != PendingKeysByReadId.end());
+
+     auto& readKeys = it->second;
+
+     if (lastProcessedKey) {
+         YQL_ENSURE(firstUnprocessedQuery < it->second.size());
+         auto unprocessedRange = readKeys[firstUnprocessedQuery];
+         YQL_ENSURE(!unprocessedRange.Point);
+
+         // continue right after the last key that was already returned
+         UnprocessedKeys.emplace_back(*lastProcessedKey, false,
+             unprocessedRange.GetOwnedTo(), unprocessedRange.InclusiveTo);
+         ++firstUnprocessedQuery;
+     }
+
+     for (size_t keyIdx = firstUnprocessedQuery; keyIdx < readKeys.size(); ++keyIdx) {
+         UnprocessedKeys.emplace_back(std::move(readKeys[keyIdx]));
+     }
+
+     PendingKeysByReadId.erase(it);
+ }
+
+private:
+ // Fills `request` with the read id, the table id, the non-system result columns and the keys.
+ // `ranges` must be non-empty and homogeneous: either all points (sent as Keys) or all
+ // ranges (sent as Ranges); the kind is decided by the first element.
+ void FillReadRequest(ui64 readId, THolder<TEvDataShard::TEvRead>& request, const std::vector<TOwnedTableRange>& ranges) {
+     auto& record = request->Record;
+
+     record.SetReadId(readId);
+
+     auto& tableId = *record.MutableTableId();
+     tableId.SetOwnerId(TableId.PathId.OwnerId);
+     tableId.SetTableId(TableId.PathId.LocalPathId);
+     tableId.SetSchemaVersion(TableId.SchemaVersion);
+
+     for (const auto& column : Columns) {
+         if (IsSystemColumn(column.Name)) {
+             continue;
+         }
+
+         record.AddColumns(column.Id);
+     }
+
+     YQL_ENSURE(!ranges.empty());
+
+     if (ranges.front().Point) {
+         request->Keys.reserve(ranges.size());
+         for (auto& range : ranges) {
+             YQL_ENSURE(range.Point);
+             request->Keys.emplace_back(TSerializedCellVec(range.From));
+         }
+         return;
+     }
+
+     request->Ranges.reserve(ranges.size());
+     for (auto& range : ranges) {
+         YQL_ENSURE(!range.Point);
+
+         const bool toIsPrefix = range.To.size() < KeyColumns.size();
+         if (toIsPrefix) {
+             // absent cells mean infinity => in prefix notation `To` should be inclusive
+             request->Ranges.emplace_back(TSerializedTableRange(range.From, range.InclusiveFrom, range.To, true));
+         } else {
+             request->Ranges.emplace_back(TSerializedTableRange(range));
+         }
+     }
+ }
+
+private:
+ // Keys (points or prefix ranges) that have not been sent to a shard yet; filled by
+ // AddInputRow and ResetRowsProcessing, drained by BuildRequests.
+ std::deque<TOwnedTableRange> UnprocessedKeys;
+ // Keys sent with every read that is still tracked, by read id; an entry is removed when
+ // the finished result is replied or when the read is reset.
+ std::unordered_map<ui64, std::vector<TOwnedTableRange>> PendingKeysByReadId;
+ // Shard results accepted by AddResult and not yet fully copied out by ReplyResult.
+ std::deque<TShardReadResult> ReadResults;
+};
+
+class TKqpJoinRows : public TKqpStreamLookupWorker {
+public:
+ // Besides the base initialisation, collects the set of columns to read from the right
+ // table: the join key columns plus the requested result columns, keyed (and therefore
+ // ordered) by column name.
+ TKqpJoinRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,
+     const NMiniKQL::THolderFactory& holderFactory)
+     : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory)
+ {
+     // read columns should contain join key and result columns
+     for (const auto* joinKeyColumn : LookupKeyColumns) {
+         ReadColumns.emplace(joinKeyColumn->Name, *joinKeyColumn);
+     }
+
+     for (const auto& resultColumn : Columns) {
+         ReadColumns.emplace(resultColumn.Name, resultColumn);
+     }
+ }
+
+ // Queues one left row. The input is a pair: element 0 is the join key tuple, element 1 is
+ // the left row payload. The join key is converted to cells (placed by KeyOrder) and
+ // stored together with the payload in UnprocessedRows.
+ void AddInputRow(NUdf::TUnboxedValue inputRow) final {
+     auto joinKey = inputRow.GetElement(0);
+
+     std::vector<TCell> joinKeyCells(LookupKeyColumns.size());
+     size_t colId = 0;
+     for (const auto* joinKeyColumn : LookupKeyColumns) {
+         YQL_ENSURE(joinKeyColumn->KeyOrder < static_cast<i64>(joinKeyCells.size()));
+         joinKeyCells[joinKeyColumn->KeyOrder] = MakeCell(joinKeyColumn->PType,
+             joinKey.GetElement(colId), TypeEnv, true);
+         ++colId;
+     }
+
+     auto leftRow = inputRow.GetElement(1);
+     UnprocessedRows.emplace_back(TOwnedCellVec(joinKeyCells), std::move(leftRow));
+ }
+
+ // Builds shard read requests for everything that is waiting to be read:
+ //   * keys returned by ResetRowsProcessing (UnprocessedKeys), and
+ //   * new left rows (UnprocessedRows), each turned into a point (full key) or a prefix range.
+ //
+ // New left rows are taken in order and the scan stops at the first row whose join key is
+ // already pending. `readId` is incremented per request; every left row records the ids of
+ // the reads it waits for. Returns (shardId, request) pairs.
+ TReadList BuildRequests(const TPartitionInfo& partitioning, ui64& readId) final {
+     YQL_ENSURE(partitioning);
+
+     // The key column types are the same for every key, so build the vector once.
+     const auto keyColumnTypes = GetKeyColumnTypes();
+
+     std::unordered_map<ui64, std::vector<TOwnedTableRange>> rangesPerShard;
+     std::unordered_map<ui64, std::vector<TOwnedTableRange>> pointsPerShard;
+
+     // Splits `key` between shards and appends the pieces to the per-shard lists.
+     auto distribute = [&](const TOwnedTableRange& key) {
+         for (auto& [shardId, shardRange] : GetRangePartitioning(partitioning, keyColumnTypes, key)) {
+             auto& shardKeys = shardRange.Point ? pointsPerShard[shardId] : rangesPerShard[shardId];
+             shardKeys.push_back(std::move(shardRange));
+         }
+     };
+
+     while (!UnprocessedKeys.empty()) {
+         auto key = std::move(UnprocessedKeys.front());
+         UnprocessedKeys.pop_front();
+         distribute(key);
+     }
+
+     while (!UnprocessedRows.empty()) {
+         auto& [queuedKey, queuedRow] = UnprocessedRows.front();
+
+         if (PendingLeftRowsByKey.contains(queuedKey)) {
+             // TODO: skip key duplicate
+             break;
+         }
+
+         auto joinKey = std::move(queuedKey);
+         auto leftData = std::move(queuedRow);
+         UnprocessedRows.pop_front();
+
+         if (joinKey.size() < KeyColumns.size()) {
+             // build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf])
+             // `From` must have exactly KeyColumns.size() cells: the prefix followed by
+             // NULL cells (inserting the prefix in front of a full-size vector would make it longer).
+             std::vector<TCell> fromCells(joinKey.begin(), joinKey.end());
+             fromCells.resize(KeyColumns.size());
+
+             distribute(TOwnedTableRange(fromCells, /* inclusiveFrom */ true, joinKey, /* inclusiveTo */ false));
+         } else {
+             // full pk, build point
+             distribute(TOwnedTableRange(joinKey));
+         }
+
+         PendingLeftRowsByKey.insert(std::make_pair(std::move(joinKey), TLeftRowInfo{std::move(leftData)}));
+     }
+
+     TReadList requests;
+     requests.reserve(rangesPerShard.size() + pointsPerShard.size());
+
+     for (auto& [shardId, points] : pointsPerShard) {
+         THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead());
+         FillReadRequest(++readId, request, points);
+         requests.emplace_back(shardId, std::move(request));
+
+         for (const auto& point : points) {
+             auto rowIt = PendingLeftRowsByKey.find(point.From);
+             YQL_ENSURE(rowIt != PendingLeftRowsByKey.end());
+             rowIt->second.PendingReads.insert(readId);
+         }
+
+         PendingKeysByReadId.insert({readId, std::move(points)});
+     }
+
+     for (auto& [shardId, ranges] : rangesPerShard) {
+         THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead());
+         FillReadRequest(++readId, request, ranges);
+         requests.emplace_back(shardId, std::move(request));
+
+         for (const auto& range : ranges) {
+             auto rowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(range));
+             YQL_ENSURE(rowIt != PendingLeftRowsByKey.end());
+             rowIt->second.PendingReads.insert(readId);
+         }
+
+         PendingKeysByReadId.insert({readId, std::move(ranges)});
+     }
+
+     return requests;
+ }
+
+ // Queues a shard read result for ReplyResult().
+ // Only successful results that belong to a read still tracked in
+ // PendingKeysByReadId are accepted.
+ void AddResult(TShardReadResult result) final {
+     const auto& record = result.ReadResult->Get()->Record;
+     YQL_ENSURE(record.GetStatus().GetCode() == Ydb::StatusIds::SUCCESS);
+
+     auto it = PendingKeysByReadId.find(record.GetReadId());
+     YQL_ENSURE(it != PendingKeysByReadId.end());
+
+     ReadResults.push_back(std::move(result));
+ }
+
+ // True when no left rows or keys are waiting to be sent, no reads are in flight and no
+ // results are waiting to be replied. PendingLeftRowsByKey is not consulted.
+ bool AllRowsProcessed() final {
+     const bool nothingQueued = UnprocessedRows.empty() && UnprocessedKeys.empty();
+     const bool nothingInFlight = PendingKeysByReadId.empty();
+     const bool nothingToReply = ReadResults.empty();
+
+     return nothingQueued && nothingInFlight && nothingToReply;
+ }
+
+ // Returns the not yet processed keys of read `readId` to UnprocessedKeys and forgets the read.
+ //
+ // Keys with index >= firstUnprocessedQuery are re-queued and `readId` is removed from the
+ // pending reads of their left rows. When `lastProcessedKey` is set, the key at
+ // firstUnprocessedQuery must be a range and only its remainder after lastProcessedKey is
+ // re-queued.
+ //
+ // NOTE(review): the loop below starts after the partially processed range, so `readId` is
+ // not erased from PendingReads of that range's left row, nor from the left rows of keys
+ // with index < firstUnprocessedQuery. Since the PendingKeysByReadId entry is erased here,
+ // nothing visible in this class removes it later - confirm those left rows can still complete.
+ void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) final {
+ auto readIt = PendingKeysByReadId.find(readId);
+ YQL_ENSURE(readIt != PendingKeysByReadId.end());
+ auto& ranges = readIt->second;
+
+ if (lastProcessedKey) {
+ YQL_ENSURE(firstUnprocessedQuery < ranges.size());
+ auto unprocessedRange = ranges[firstUnprocessedQuery];
+ YQL_ENSURE(!unprocessedRange.Point);
+
+ // re-queue only the tail of the range: (lastProcessedKey, To...
+ UnprocessedKeys.emplace_back(*lastProcessedKey, false,
+ unprocessedRange.GetOwnedTo(), unprocessedRange.InclusiveTo);
+ ++firstUnprocessedQuery;
+ }
+
+ for (ui32 i = firstUnprocessedQuery; i < ranges.size(); ++i) {
+ auto& range = ranges[i];
+ auto leftRowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(range));
+ YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
+ // the left row no longer waits for the failed read; BuildRequests will attach a new one
+ leftRowIt->second.PendingReads.erase(readId);
+
+ UnprocessedKeys.emplace_back(std::move(range));
+ }
+
+ PendingKeysByReadId.erase(readIt);
+ }
+
+ // Produces joined rows into `batch` (cleared first) until the queued read results are
+ // exhausted or the next row would not fit into `freeSpace` bytes.
+ //
+ // Phase 1: every right row from the queued results is matched with its left row by join
+ // key and emitted as a (left, right) pair. When a read is finished, its id is removed
+ // from the left rows it served; a left row with no pending reads that has seen a right
+ // row is dropped.
+ // Phase 2 (only if the size limit was not hit): left rows with no pending reads and no
+ // right row are emitted as (left, empty) and dropped.
+ //
+ // Returns the rows/bytes counted in phase 1; rows emitted in phase 2 are not added to the stats.
+ TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
+ TReadResultStats resultStats;
+ bool sizeLimitExceeded = false;
+ batch.clear();
+
+ while (!ReadResults.empty() && !sizeLimitExceeded) {
+ auto& result = ReadResults.front();
+
+ for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) {
+ const auto& row = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow);
+ YQL_ENSURE(row.size() <= ReadColumns.size(), "Result columns mismatch");
+
+ // Extract the join key from the right row. The cell index is the position of the
+ // column in ReadColumns (ordered by name).
+ // NOTE(review): FillReadRequest skips system columns when requesting columns, so this
+ // index presumably matches the shard row only if no system column sorts before the
+ // key column - confirm.
+ std::vector<TCell> joinKeyCells(LookupKeyColumns.size());
+ for (size_t joinKeyIdx = 0; joinKeyIdx < LookupKeyColumns.size(); ++joinKeyIdx) {
+ auto it = ReadColumns.find(LookupKeyColumns[joinKeyIdx]->Name);
+ YQL_ENSURE(it != ReadColumns.end());
+ joinKeyCells[joinKeyIdx] = row[std::distance(ReadColumns.begin(), it)];
+ }
+
+ auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells);
+ YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
+
+ i64 resultRowSize = 0;
+ i64 availableSpace = freeSpace - (i64)resultStats.BytesCount;
+ auto resultRow = TryBuildResultRow(leftRowIt->second, row, resultRowSize, availableSpace, result.ShardId);
+
+ // no value => the row did not fit; it is retried on the next call because
+ // UnprocessedResultRow is not advanced
+ if (!resultRow.HasValue()) {
+ sizeLimitExceeded = true;
+ break;
+ }
+
+ batch.push_back(std::move(resultRow));
+
+ resultStats.RowsCount += 1;
+ resultStats.BytesCount += resultRowSize;
+ }
+
+ if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
+ if (result.ReadResult->Get()->Record.GetFinished()) {
+ auto it = PendingKeysByReadId.find(result.ReadResult->Get()->Record.GetReadId());
+ YQL_ENSURE(it != PendingKeysByReadId.end());
+
+ // the read is complete: detach it from every left row it was serving
+ for (const auto& range : it->second) {
+ auto leftRowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(range));
+ if (leftRowIt != PendingLeftRowsByKey.end()) {
+ leftRowIt->second.PendingReads.erase(result.ReadResult->Get()->Record.GetReadId());
+
+ const bool leftRowCanBeDeleted = leftRowIt->second.PendingReads.empty()
+ && leftRowIt->second.RightRowExist;
+ if (leftRowCanBeDeleted) {
+ PendingLeftRowsByKey.erase(leftRowIt);
+ }
+ }
+ }
+
+ PendingKeysByReadId.erase(it);
+ }
+
+ ReadResults.pop_front();
+ }
+ }
+
+ if (!sizeLimitExceeded) {
+ // emit left rows that finished all their reads without finding a right row
+ for (auto leftRowIt = PendingLeftRowsByKey.begin(); leftRowIt != PendingLeftRowsByKey.end();) {
+ const bool leftRowCanBeSent = leftRowIt->second.PendingReads.empty()
+ && !leftRowIt->second.RightRowExist;
+
+ if (leftRowCanBeSent) {
+ i64 resultRowSize = 0;
+ i64 availableSpace = freeSpace - (i64) resultStats.BytesCount;
+ auto resultRow = TryBuildResultRow(leftRowIt->second, {}, resultRowSize, availableSpace);
+
+ if (!resultRow.HasValue()) {
+ break;
+ }
+
+ batch.push_back(std::move(resultRow));
+ // post-increment: advance before the current element is erased
+ PendingLeftRowsByKey.erase(leftRowIt++);
+ } else {
+ ++leftRowIt;
+ }
+ }
+ }
+
+ return resultStats;
+ }
+
+ // Explicitly drops the containers that hold NUdf::TUnboxedValue left rows.
+ // NOTE(review): presumably done so the values are released at a well-defined point
+ // (e.g. while the owning allocator is still usable) - confirm with the caller's teardown order.
+ ~TKqpJoinRows() {
+ UnprocessedRows.clear();
+ PendingLeftRowsByKey.clear();
+ }
+private:
+ // Bookkeeping for one left row that is waiting for its right-side lookups.
+ struct TLeftRowInfo {
+ TLeftRowInfo(NUdf::TUnboxedValue row) : Row(std::move(row)) {
+ }
+
+ // the left row payload, emitted as the first element of every result pair
+ NUdf::TUnboxedValue Row;
+ // ids of the reads this row still waits for
+ std::unordered_set<ui64> PendingReads;
+ // set once a right row has been passed to TryBuildResultRow for this left row
+ bool RightRowExist = false;
+ };
+
+ // Fills `request` with the read id, the table id, the non-system columns of ReadColumns
+ // (in ReadColumns order) and the keys. `ranges` must be non-empty and homogeneous: either
+ // all points (sent as Keys) or all ranges (sent as Ranges); the kind is decided by the
+ // first element.
+ void FillReadRequest(ui64 readId, THolder<TEvDataShard::TEvRead>& request, const std::vector<TOwnedTableRange>& ranges) {
+     auto& record = request->Record;
+
+     record.SetReadId(readId);
+
+     auto& tableId = *record.MutableTableId();
+     tableId.SetOwnerId(TableId.PathId.OwnerId);
+     tableId.SetTableId(TableId.PathId.LocalPathId);
+     tableId.SetSchemaVersion(TableId.SchemaVersion);
+
+     for (const auto& [name, column] : ReadColumns) {
+         if (IsSystemColumn(name)) {
+             continue;
+         }
+
+         record.AddColumns(column.Id);
+     }
+
+     YQL_ENSURE(!ranges.empty());
+
+     if (ranges.front().Point) {
+         request->Keys.reserve(ranges.size());
+         for (auto& range : ranges) {
+             YQL_ENSURE(range.Point);
+             request->Keys.emplace_back(TSerializedCellVec(range.From));
+         }
+         return;
+     }
+
+     request->Ranges.reserve(ranges.size());
+     for (auto& range : ranges) {
+         YQL_ENSURE(!range.Point);
+
+         const bool toIsPrefix = range.To.size() < KeyColumns.size();
+         if (toIsPrefix) {
+             // Absent cells mean infinity. So in prefix notation `To` should be inclusive.
+             request->Ranges.emplace_back(TSerializedTableRange(range.From, range.InclusiveFrom, range.To, true));
+         } else {
+             request->Ranges.emplace_back(TSerializedTableRange(range));
+         }
+     }
+ }
+
+ // Returns the join-key part of a range: the first LookupKeyColumns.size() cells of
+ // `From`, or `From` itself when it already has exactly that many cells.
+ // The result refers to the cells of `range`.
+ TConstArrayRef<TCell> ExtractKeyPrefix(const TOwnedTableRange& range) {
+     const size_t prefixSize = LookupKeyColumns.size();
+
+     if (range.From.size() != prefixSize) {
+         return range.From.subspan(0, LookupKeyColumns.size());
+     }
+
+     return range.From;
+ }
+
+ // Builds one result pair (left row, right row) for the join.
+ //
+ // leftRowInfo   - left row; RightRowExist is set when `rightRow` is not empty
+ // rightRow      - cells read from the shard, or empty for a left row without a match
+ //                 (the second element is then an empty value)
+ // resultRowSize - in/out: the size of the right row values is added to it
+ // freeSpace     - bytes available; if resultRowSize exceeds it, the row is released via
+ //                 DeleteUnreferenced() before being returned (callers test HasValue())
+ // shardId       - required when a system column has to be filled
+ NUdf::TUnboxedValue TryBuildResultRow(TLeftRowInfo& leftRowInfo, TConstArrayRef<TCell> rightRow, i64& resultRowSize,
+ i64 freeSpace, TMaybe<ui64> shardId = {}) {
+
+ NUdf::TUnboxedValue* resultRowItems = nullptr;
+ auto resultRow = HolderFactory.CreateDirectArrayHolder(2, resultRowItems);
+
+ resultRowItems[0] = leftRowInfo.Row;
+
+ if (!rightRow.empty()) {
+ // NOTE(review): the flag is set even if the row is rejected below for lack of space
+ leftRowInfo.RightRowExist = true;
+ // TODO: get size for left row
+
+ NUdf::TUnboxedValue* rightRowItems = nullptr;
+ resultRowItems[1] = HolderFactory.CreateDirectArrayHolder(Columns.size(), rightRowItems);
+
+ for (size_t colIndex = 0; colIndex < Columns.size(); ++colIndex) {
+ const auto& column = Columns[colIndex];
+ auto it = ReadColumns.find(column.Name);
+ YQL_ENSURE(it != ReadColumns.end());
+
+ if (IsSystemColumn(column.Name)) {
+ YQL_ENSURE(shardId);
+ NMiniKQL::FillSystemColumn(rightRowItems[colIndex], *shardId, column.Id, column.PType);
+ resultRowSize += sizeof(NUdf::TUnboxedValue);
+ } else {
+ // the cell index is the column's position in ReadColumns (ordered by name)
+ rightRowItems[colIndex] = NMiniKQL::GetCellValue(rightRow[std::distance(ReadColumns.begin(), it)],
+ column.PType);
+ resultRowSize += NMiniKQL::GetUnboxedValueSize(rightRowItems[colIndex], column.PType).AllocatedBytes;
+ }
+ }
+ } else {
+ resultRowItems[1] = NUdf::TUnboxedValuePod();
+ }
+
+ if (resultRowSize > freeSpace) {
+ resultRow.DeleteUnreferenced();
+ }
+
+ return resultRow;
+ }
+
+private:
+ // Columns read from the right table (join key + result columns), ordered by name.
+ std::map<std::string, TSysTables::TTableColumnInfo> ReadColumns;
+ // (join key cells, left row) pairs added by AddInputRow and not yet turned into reads.
+ std::deque<std::pair<TOwnedCellVec, NUdf::TUnboxedValue>> UnprocessedRows;
+ // Keys returned by ResetRowsProcessing that have to be read again.
+ std::deque<TOwnedTableRange> UnprocessedKeys;
+ // Keys sent with every read that is still tracked, by read id.
+ std::unordered_map<ui64, std::vector<TOwnedTableRange>> PendingKeysByReadId;
+ // Left rows waiting for their lookups, by join key.
+ absl::flat_hash_map<TOwnedCellVec, TLeftRowInfo, TKeyHash, TKeyEq> PendingLeftRowsByKey;
+ // Shard results accepted by AddResult and not yet fully consumed by ReplyResult.
+ std::deque<TShardReadResult> ReadResults;
+};
+
+// Creates the worker that matches the lookup strategy from `settings`:
+// LOOKUP -> TKqpLookupRows, JOIN -> TKqpJoinRows. Any other strategy yields a null pointer.
+std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
+    const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory) {
+
+    const auto strategy = settings.GetLookupStrategy();
+
+    if (strategy == NKqpProto::EStreamLookupStrategy::LOOKUP) {
+        return std::make_unique<TKqpLookupRows>(std::move(settings), typeEnv, holderFactory);
+    }
+
+    if (strategy == NKqpProto::EStreamLookupStrategy::JOIN) {
+        return std::make_unique<TKqpJoinRows>(std::move(settings), typeEnv, holderFactory);
+    }
+
+    return {};
+}
+
+} // namespace NKqp
+} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
new file mode 100644
index 00000000000..56ac40019df
--- /dev/null
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
@@ -0,0 +1,60 @@
+#pragma once
+
+#include <ydb/core/protos/kqp.pb.h>
+#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/core/scheme/scheme_tabledefs.h>
+#include <ydb/core/tx/datashard/sys_tables.h>
+#include <ydb/core/tx/datashard/datashard.h>
+
+namespace NKikimr {
+namespace NKqp {
+
+// Base class for the stream lookup strategies. It keeps the table/column description taken
+// from the lookup settings; the way input rows become shard reads and shard results become
+// output rows is defined by the derived classes.
+class TKqpStreamLookupWorker {
+public:
+ // (shard id, read request) pairs produced by BuildRequests
+ using TReadList = std::vector<std::pair<ui64, THolder<TEvDataShard::TEvRead>>>;
+ using TPartitionInfo = std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>>;
+
+ // A read result received from a shard together with the consumption progress.
+ struct TShardReadResult {
+ const ui64 ShardId;
+ THolder<TEventHandle<TEvDataShard::TEvReadResult>> ReadResult;
+ // index of the first row of ReadResult that has not been replied yet
+ size_t UnprocessedResultRow = 0;
+ };
+
+ // Rows/bytes accounted by one ReplyResult call.
+ struct TReadResultStats {
+ ui64 RowsCount = 0;
+ ui64 BytesCount = 0;
+ };
+
+public:
+ TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
+ const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory);
+
+ virtual ~TKqpStreamLookupWorker();
+
+ virtual std::string GetTablePath() const;
+ virtual TTableId GetTableId() const;
+ // key column types ordered by KeyOrder
+ virtual std::vector<NScheme::TTypeInfo> GetKeyColumnTypes() const;
+
+ // Accepts one input row to be looked up.
+ virtual void AddInputRow(NUdf::TUnboxedValue inputRow) = 0;
+ // Builds shard read requests for the accepted input; `readId` is advanced for every request.
+ virtual TReadList BuildRequests(const TPartitionInfo& partitioning, ui64& readId) = 0;
+ // Accepts a shard read result.
+ virtual void AddResult(TShardReadResult result) = 0;
+ // Fills `batch` with output rows, limited by `freeSpace` bytes.
+ virtual TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) = 0;
+ virtual bool AllRowsProcessed() = 0;
+ // Re-queues the unprocessed part of read `readId` so that it can be requested again.
+ virtual void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) = 0;
+
+protected:
+ const NMiniKQL::TTypeEnvironment& TypeEnv;
+ const NMiniKQL::THolderFactory& HolderFactory;
+ const TString TablePath;
+ const TTableId TableId;
+ // table key columns by name
+ std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns;
+ // key columns used for the lookup; the pointers refer to entries of KeyColumns
+ std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns;
+ // result columns
+ std::vector<TSysTables::TTableColumnInfo> Columns;
+};
+
+// Creates the worker for the lookup strategy given in `settings`.
+// NOTE(review): callers should presumably check the result for null - confirm against the definition.
+std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
+ const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory);
+
+} // namespace NKqp
+} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/ya.make b/ydb/core/kqp/runtime/ya.make
index 77d8a0aa4f6..3f41b86b275 100644
--- a/ydb/core/kqp/runtime/ya.make
+++ b/ydb/core/kqp/runtime/ya.make
@@ -18,6 +18,8 @@ SRCS(
kqp_stream_lookup_actor.h
kqp_stream_lookup_factory.cpp
kqp_stream_lookup_factory.h
+ kqp_stream_lookup_worker.cpp
+ kqp_stream_lookup_worker.h
kqp_tasks_runner.cpp
kqp_transport.cpp
)
diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
index 97f32649f84..89361b130c3 100644
--- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
+++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp
@@ -84,8 +84,11 @@ void PrepareTables(TSession session) {
Y_UNIT_TEST_SUITE(KqpIndexLookupJoin) {
-void Test(const TString& query, const TString& answer, size_t rightTableReads) {
- TKikimrSettings settings;
+void Test(const TString& query, const TString& answer, size_t rightTableReads, bool useStreamLookup = false) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(useStreamLookup);
+
+ auto settings = TKikimrSettings().SetAppConfig(appConfig);
TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -101,25 +104,27 @@ void Test(const TString& query, const TString& answer, size_t rightTableReads) {
CompareYson(answer, FormatResultSetYson(result.GetResultSet(0)));
auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
- if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2);
+ if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamIdxLookupJoin()) {
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1);
+
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Left");
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 5);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(1).name(), "/Root/Right");
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(1).reads().rows(), rightTableReads);
} else {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3);
- }
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Left");
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 5);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Left");
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 5);
- ui32 index = 1;
- if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) {
UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups
- index = 2;
- }
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/Right");
- UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).reads().rows(), rightTableReads);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(2).table_access().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(2).table_access(0).name(), "/Root/Right");
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(2).table_access(0).reads().rows(), rightTableReads);
+ }
}
Y_UNIT_TEST(MultiJoins) {
@@ -243,6 +248,157 @@ Y_UNIT_TEST(RightSemi) {
])", 3);
}
+Y_UNIT_TEST_TWIN(SimpleInnerJoin, StreamLookup) { // INNER JOIN on l.Fk = r.Key: three matching rows expected and 3 rows read from /Root/Right; StreamLookup is forwarded to Test() as useStreamLookup
+ Test(
+ R"(
+ SELECT l.Key, l.Fk, l.Value, r.Key, r.Value
+ FROM `/Root/Left` AS l
+ INNER JOIN `/Root/Right` AS r
+ ON l.Fk = r.Key
+ ORDER BY l.Key;
+ )",
+ R"([
+ [[1];[101];["Value1"];[101];["Value21"]];
+ [[2];[102];["Value1"];[102];["Value22"]];
+ [[3];[103];["Value2"];[103];["Value23"]]
+ ])", 3, StreamLookup);
+}
+
+Y_UNIT_TEST_TWIN(InnerJoinCustomColumnOrder, StreamLookup) { // same INNER JOIN, but the SELECT list interleaves right and left columns and the result is ordered by r.Key; 3 rows read from /Root/Right
+ Test(
+ R"(
+ SELECT r.Value, l.Key, r.Key, l.Value, l.Fk
+ FROM `/Root/Left` AS l
+ INNER JOIN `/Root/Right` AS r
+ ON l.Fk = r.Key
+ ORDER BY r.Key;
+ )",
+ R"([
+ [["Value21"];[1];[101];["Value1"];[101]];
+ [["Value22"];[2];[102];["Value1"];[102]];
+ [["Value23"];[3];[103];["Value2"];[103]]
+ ])", 3, StreamLookup);
+}
+
+Y_UNIT_TEST_TWIN(InnerJoinOnlyRightColumn, StreamLookup) { // only r.Value is selected; the three matching rows are still expected, with 3 rows read from /Root/Right
+ Test(
+ R"(
+ SELECT r.Value
+ FROM `/Root/Left` AS l
+ INNER JOIN `/Root/Right` AS r
+ ON l.Fk = r.Key
+ ORDER BY r.Value;
+ )",
+ R"([
+ [["Value21"]];
+ [["Value22"]];
+ [["Value23"]]
+ ])", 3, StreamLookup);
+}
+
+Y_UNIT_TEST_TWIN(InnerJoinOnlyLeftColumn, StreamLookup) { // only l.Fk is selected; unmatched left rows (104, 105) must not appear; 3 rows read from /Root/Right
+ Test(
+ R"(
+ SELECT l.Fk
+ FROM `/Root/Left` AS l
+ INNER JOIN `/Root/Right` AS r
+ ON l.Fk = r.Key
+ ORDER BY l.Fk;
+ )",
+ R"([
+ [[101]];
+ [[102]];
+ [[103]]
+ ])", 3, StreamLookup);
+}
+
+Y_UNIT_TEST_TWIN(InnerJoinLeftFilter, StreamLookup) { // a WHERE filter on l.Value leaves a single joined row, and only 1 row is expected to be read from /Root/Right
+ Test(
+ R"(
+ SELECT l.Key, l.Fk, l.Value, r.Key, r.Value
+ FROM `/Root/Left` AS l
+ INNER JOIN `/Root/Right` AS r
+ ON l.Fk = r.Key
+ WHERE l.Value != 'Value1'
+ ORDER BY l.Key;
+ )",
+ R"([
+ [[3];[103];["Value2"];[103];["Value23"]]
+ ])", 1, StreamLookup);
+}
+
+Y_UNIT_TEST_TWIN(SimpleLeftJoin, StreamLookup) { // LEFT JOIN: left rows 4 and 5 have no match, so their right-side columns are null (#); 3 rows read from /Root/Right
+ Test(
+ R"(
+ SELECT l.Key, l.Fk, l.Value, r.Key, r.Value
+ FROM `/Root/Left` AS l
+ LEFT JOIN `/Root/Right` AS r
+ ON l.Fk = r.Key
+ ORDER BY l.Key;
+ )",
+ R"([
+ [[1];[101];["Value1"];[101];["Value21"]];
+ [[2];[102];["Value1"];[102];["Value22"]];
+ [[3];[103];["Value2"];[103];["Value23"]];
+ [[4];[104];["Value2"];#;#];
+ [[5];[105];["Value3"];#;#]
+ ])", 3, StreamLookup);
+}
+
+Y_UNIT_TEST_TWIN(LeftJoinCustomColumnOrder, StreamLookup) { // LEFT JOIN with right and left columns interleaved in the SELECT list; null (#) right-side values must land in the right positions; 3 rows read from /Root/Right
+ Test(
+ R"(
+ SELECT r.Value, l.Key, r.Key, l.Value, l.Fk
+ FROM `/Root/Left` AS l
+ LEFT JOIN `/Root/Right` AS r
+ ON l.Fk = r.Key
+ ORDER BY l.Key;
+ )",
+ R"([
+ [["Value21"];[1];[101];["Value1"];[101]];
+ [["Value22"];[2];[102];["Value1"];[102]];
+ [["Value23"];[3];[103];["Value2"];[103]];
+ [#;[4];#;["Value2"];[104]];
+ [#;[5];#;["Value3"];[105]]
+ ])", 3, StreamLookup);
+}
+
+Y_UNIT_TEST_TWIN(LeftJoinOnlyRightColumn, StreamLookup) { // only r.Value is selected; the two unmatched left rows yield null (#), listed first in the expected output; 3 rows read from /Root/Right
+ Test(
+ R"(
+ SELECT r.Value
+ FROM `/Root/Left` AS l
+ LEFT JOIN `/Root/Right` AS r
+ ON l.Fk = r.Key
+ ORDER BY r.Value;
+ )",
+ R"([
+ [#];
+ [#];
+ [["Value21"]];
+ [["Value22"]];
+ [["Value23"]]
+ ])", 3, StreamLookup);
+}
+
+Y_UNIT_TEST_TWIN(LeftJoinOnlyLeftColumn, StreamLookup) { // only l.Fk is selected; all five left rows are expected, while the expected /Root/Right read count stays 3
+ Test(
+ R"(
+ SELECT l.Fk
+ FROM `/Root/Left` AS l
+ LEFT JOIN `/Root/Right` AS r
+ ON l.Fk = r.Key
+ ORDER BY l.Fk;
+ )",
+ R"([
+ [[101]];
+ [[102]];
+ [[103]];
+ [[104]];
+ [[105]]
+ ])", 3, StreamLookup);
+}
+
void CreateSimpleTableWithKeyType(TSession session, const TString& columnType) {
using namespace fmt::literals;
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 1036cc5be0f..5f15c0dcdf8 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1355,6 +1355,7 @@ message TTableServiceConfig {
reserved 33; // optional bool EnableKqpDataQueryStreamPointLookup = 33 [default = false];
optional bool EnablePublishKqpProxyByRM = 34 [default = true];
optional bool EnableKqpScanQueryStreamIdxLookupJoin = 35 [default = false];
+ optional bool EnableKqpDataQueryStreamIdxLookupJoin = 49 [default = false];
optional bool EnablePredicateExtractForScanQueries = 36 [default = true];
optional bool EnablePredicateExtractForDataQueries = 37 [default = true];
optional bool EnableKqpImmediateEffects = 38 [default = true];
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index bf18c4358f2..222068c7bd8 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -640,6 +640,7 @@ message TKqpStreamLookupSettings {
optional uint64 LockTxId = 5;
optional bool ImmediateTx = 6;
repeated string LookupKeyColumns = 7;
+ optional NKqpProto.EStreamLookupStrategy LookupStrategy = 8;
}
message TKqpSequencerSettings {
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 70415ade762..d1810e9c022 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -253,12 +253,19 @@ message TKqpPhyCnMerge {
repeated TKqpPhySortColumn SortColumns = 1;
}
+enum EStreamLookupStrategy { // used by TKqpPhyCnStreamLookup.LookupStrategy (field 6) and by TKqpStreamLookupSettings.LookupStrategy (field 8) in kqp.proto
+ UNSPECIFIED = 0; // zero value, read back when the field is not set
+ LOOKUP = 1; // presumably a plain key lookup - confirm against the stream lookup worker
+ JOIN = 2; // presumably the stream index lookup join introduced by this commit - confirm
+};
+
message TKqpPhyCnStreamLookup {
TKqpPhyTableId Table = 1;
repeated string KeyColumns = 2;
repeated string Columns = 3;
bytes LookupKeysType = 4;
bytes ResultType = 5;
+ EStreamLookupStrategy LookupStrategy = 6; // field added by this commit; when not set it reads as UNSPECIFIED (0)
}
message TKqpPhyCnSequencer {