diff options
author | ulya-sidorina <yulia@ydb.tech> | 2023-10-06 13:11:20 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2023-10-06 13:47:02 +0300 |
commit | fdac813cf5d2394785944db3cd146c863a87bb92 (patch) | |
tree | d112f68851ddae693111bcaf25e4936eff25ebce | |
parent | 7775f59248540408cbd58e95b8b3a285fcf51e2d (diff) | |
download | ydb-fdac813cf5d2394785944db3cd146c863a87bb92.tar.gz |
KIKIMR-16746: implement stream index lookup join
feature(kqp): implement stream index lookup join
30 files changed, 1761 insertions, 329 deletions
diff --git a/ydb/core/kqp/common/kqp_yql.h b/ydb/core/kqp/common/kqp_yql.h index 3549a1a9854..3aab7b5d085 100644 --- a/ydb/core/kqp/common/kqp_yql.h +++ b/ydb/core/kqp/common/kqp_yql.h @@ -45,6 +45,9 @@ struct TKqpPhyTxSettings { constexpr TStringBuf KqpReadRangesSourceName = "KqpReadRangesSource"; +static constexpr std::string_view TKqpStreamLookupStrategyName = "LookupRows"sv; +static constexpr std::string_view TKqpStreamLookupJoinStrategyName = "LookupJoinRows"sv; + struct TKqpReadTableSettings { static constexpr TStringBuf SkipNullKeysSettingName = "SkipNullKeys"; static constexpr TStringBuf ItemsLimitSettingName = "ItemsLimit"; diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 29485c077cb..276c0469573 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -422,6 +422,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf kqpConfig.EnableKqpDataQueryStreamLookup = serviceConfig.GetEnableKqpDataQueryStreamLookup(); kqpConfig.EnableKqpScanQueryStreamLookup = serviceConfig.GetEnableKqpScanQueryStreamLookup(); kqpConfig.EnableKqpScanQueryStreamIdxLookupJoin = serviceConfig.GetEnableKqpScanQueryStreamIdxLookupJoin(); + kqpConfig.EnableKqpDataQueryStreamIdxLookupJoin = serviceConfig.GetEnableKqpDataQueryStreamIdxLookupJoin(); kqpConfig.EnablePredicateExtractForDataQuery = serviceConfig.GetEnablePredicateExtractForDataQueries(); kqpConfig.EnablePredicateExtractForScanQuery = serviceConfig.GetEnablePredicateExtractForScanQueries(); kqpConfig.EnableSequentialReads = serviceConfig.GetEnableSequentialReads(); diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index d0232c0ea66..e8e18bbfdbd 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -50,6 +50,10 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput return WrapKqpEnsure(callable, ctx); } + if (name == "KqpIndexLookupJoin"sv) { + return WrapKqpIndexLookupJoin(callable, ctx); + } + return nullptr; }; } diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 9d76aaf9ef4..a49088a0c92 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -384,6 +384,8 @@ void BuildStreamLookupChannels(TKqpTasksGraph& graph, const TStageInfo& stageInf columnProto->SetTypeId(columnIt->second.Type.GetTypeId()); } + settings->SetLookupStrategy(streamLookup.GetLookupStrategy()); + TTransform streamLookupTransform; streamLookupTransform.Type = "StreamLookupInputTransformer"; streamLookupTransform.InputType = streamLookup.GetLookupKeysType(); diff --git a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json index d208142f542..09413f17157 100644 --- a/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json +++ b/ydb/core/kqp/expr_nodes/kqp_expr_nodes.json @@ -183,6 +183,20 @@ "Match": {"Type": "Callable", "Name": "KqlStreamLookupTable"} }, { + "Name": "TKqlStreamIdxLookupJoin", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "KqlStreamIdxLookupJoin"}, + "Children": [ + {"Index": 0, "Name": "LeftInput", "Type": "TExprBase"}, + {"Index": 1, "Name": "LeftLabel", "Type": "TCoAtom"}, + {"Index": 2, "Name": "RightTable", "Type": "TKqpTable"}, + {"Index": 3, "Name": "RightColumns", "Type": "TCoAtomList"}, + {"Index": 4, "Name": "RightLabel", "Type": "TCoAtom"}, + {"Index": 5, "Name": "JoinType", "Type": "TCoAtom"} + ] + + }, + { "Name": "TKqlSequencer", "Base": "TCallable", "Match": {"Type": "Callable", "Name": "KqlSequencer"}, @@ -416,7 +430,19 @@ "Children": [ {"Index": 1, "Name": "Table", "Type": "TKqpTable"}, {"Index": 2, "Name": "Columns", "Type": "TCoAtomList"}, - {"Index": 3, "Name": "LookupKeysType", "Type": "TExprBase"} + {"Index": 3, "Name": "InputType", "Type": "TExprBase"}, + {"Index": 4, "Name": "LookupStrategy", "Type": "TCoAtom"} + ] + }, + { + "Name": "TKqpIndexLookupJoin", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "KqpIndexLookupJoin"}, + "Children": [ + {"Index": 0, "Name": "Input", "Type": "TExprBase"}, + {"Index": 1, "Name": "JoinType", "Type": "TCoAtom"}, + {"Index": 2, "Name": "LeftLabel", "Type": "TCoAtom"}, + {"Index": 3, "Name": "RightLabel", "Type": "TCoAtom"} ] }, { diff --git a/ydb/core/kqp/host/kqp_type_ann.cpp b/ydb/core/kqp/host/kqp_type_ann.cpp index 2d9174ddb55..57c6677cb07 100644 --- a/ydb/core/kqp/host/kqp_type_ann.cpp +++ b/ydb/core/kqp/host/kqp_type_ann.cpp @@ -1281,6 +1281,96 @@ TStatus AnnotateSequencer(const TExprNode::TPtr& node, TExprContext& ctx, const return TStatus::Ok; } +TStatus AnnotateStreamIdxLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster, + const TKikimrTablesData& tablesData, bool withSystemColumns) +{ + if (!EnsureArgsCount(*node, 6, ctx)) { + return TStatus::Error; + } + + auto leftInputType = node->Child(TKqlStreamIdxLookupJoin::idx_LeftInput)->GetTypeAnn(); + const TTypeAnnotationNode* leftInputItemType; + if (!EnsureNewSeqType<false>(node->Pos(), *leftInputType, ctx, &leftInputItemType)) { + return TStatus::Error; + } + + YQL_ENSURE(leftInputItemType); + if (!EnsureTupleType(node->Pos(), *leftInputItemType, ctx)) { + return TStatus::Error; + } + + if (!EnsureTupleTypeSize(node->Pos(), leftInputItemType, 2, ctx)) { + return TStatus::Error; + } + + auto leftInputTupleType = leftInputItemType->Cast<TTupleExprType>(); + if (!EnsureStructType(node->Pos(), *leftInputTupleType->GetItems()[0], ctx)) { + return TStatus::Error; + } + + if (!EnsureStructType(node->Pos(), *leftInputTupleType->GetItems()[1], ctx)) { + return TStatus::Error; + } + + if (!EnsureAtom(*node->Child(TKqlStreamIdxLookupJoin::idx_LeftLabel), ctx)) { + return TStatus::Error; + } + + TCoAtom leftLabel(node->Child(TKqlStreamIdxLookupJoin::idx_LeftLabel)); + + auto rightTable = ResolveTable(node->Child(TKqlStreamIdxLookupJoin::idx_RightTable), ctx, cluster, tablesData); + if (!rightTable.second) { + return TStatus::Error; + } + + const TStructExprType* inputKeysType = leftInputTupleType->GetItems()[0]->Cast<TStructExprType>(); + for (const auto& inputKey : inputKeysType->GetItems()) { + if (!rightTable.second->GetKeyColumnIndex(TString(inputKey->GetName()))) { + return TStatus::Error; + } + } + + if (!EnsureTupleOfAtoms(*node->Child(TKqlStreamIdxLookupJoin::idx_RightColumns), ctx)) { + return TStatus::Error; + } + + TCoAtomList rightColumns{node->ChildPtr(TKqlStreamIdxLookupJoin::idx_RightColumns)}; + for (const auto& rightColumn : rightColumns) { + if (!rightTable.second->GetColumnType(TString(rightColumn.Value()))) { + return TStatus::Error; + } + } + + auto rightDataType = GetReadTableRowType(ctx, tablesData, cluster, rightTable.first, rightColumns, withSystemColumns); + if (!rightDataType) { + return TStatus::Error; + } + + if (!EnsureAtom(*node->Child(TKqlStreamIdxLookupJoin::idx_RightLabel), ctx)) { + return TStatus::Error; + } + + TCoAtom rightLabel(node->Child(TKqlStreamIdxLookupJoin::idx_RightLabel)); + + const TStructExprType* leftDataType = leftInputTupleType->GetItems()[1]->Cast<TStructExprType>(); + TVector<const TItemExprType*> resultStructItems; + for (const auto& member : leftDataType->GetItems()) { + resultStructItems.emplace_back( + ctx.MakeType<TItemExprType>(TString::Join(leftLabel.Value(), ".", member->GetName()), member->GetItemType()) + ); + } + + for (const auto& member : rightDataType->Cast<TStructExprType>()->GetItems()) { + resultStructItems.emplace_back( + ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", member->GetName()), member->GetItemType()) + ); + } + + auto rowType = ctx.MakeType<TStructExprType>(resultStructItems); + node->SetTypeAnn(ctx.MakeType<TListExprType>(rowType)); + return TStatus::Ok; +} + TStatus AnnotateKqpProgram(const TExprNode::TPtr& node, TExprContext& ctx) { if (!EnsureArgsCount(*node, 2, ctx)) { return TStatus::Error; @@ -1420,7 +1510,7 @@ TStatus AnnotateSequencerConnection(const TExprNode::TPtr& node, TExprContext& c TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext& ctx, const TString& cluster, const TKikimrTablesData& tablesData, bool withSystemColumns) { - if (!EnsureArgsCount(*node, 4, ctx)) { + if (!EnsureArgsCount(*node, 5, ctx)) { return TStatus::Error; } @@ -1429,7 +1519,7 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext } if (!TDqOutput::Match(node->Child(TKqpCnStreamLookup::idx_Output))) { - ctx.AddError(TIssue(ctx.GetPosition(node->Child(TDqCnMerge::idx_Output)->Pos()), + ctx.AddError(TIssue(ctx.GetPosition(node->Child(TKqpCnStreamLookup::idx_Output)->Pos()), TStringBuilder() << "Expected " << TDqOutput::CallableName())); return TStatus::Error; } @@ -1439,40 +1529,167 @@ TStatus AnnotateStreamLookupConnection(const TExprNode::TPtr& node, TExprContext return TStatus::Error; } - auto lookupKeysTypeNode = node->Child(TKqpCnStreamLookup::idx_LookupKeysType); - if (!EnsureType(*lookupKeysTypeNode, ctx)) { + if (!EnsureTupleOfAtoms(*node->Child(TKqpCnStreamLookup::idx_Columns), ctx)) { return TStatus::Error; } - auto lookupKeysType = lookupKeysTypeNode->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); - if (!EnsureListType(node->Pos(), *lookupKeysType, ctx)) { + TCoAtomList columns{node->ChildPtr(TKqpCnStreamLookup::idx_Columns)}; + + if (!EnsureAtom(*node->Child(TKqpCnStreamLookup::idx_LookupStrategy), ctx)) { + return TStatus::Error; + } + + TCoAtom lookupStrategy(node->Child(TKqpCnStreamLookup::idx_LookupStrategy)); + + auto inputTypeNode = node->Child(TKqpCnStreamLookup::idx_InputType); + + if (!EnsureType(*inputTypeNode, ctx)) { return TStatus::Error; } - auto lookupKeyType = lookupKeysType->Cast<TListExprType>()->GetItemType(); - if (!EnsureStructType(node->Pos(), *lookupKeyType, ctx)) { + auto inputType = inputTypeNode->GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + const TTypeAnnotationNode* inputItemType; + if (!EnsureNewSeqType<false>(node->Pos(), *inputType, ctx, &inputItemType)) { return TStatus::Error; } - const auto& lookupKeyColumns = lookupKeyType->Cast<TStructExprType>()->GetItems(); - for (const auto& keyColumn : lookupKeyColumns) { - if (!table.second->GetKeyColumnIndex(TString(keyColumn->GetName()))) { + YQL_ENSURE(inputItemType); + + if (lookupStrategy.Value() == "LookupRows") { + if (!EnsureStructType(node->Pos(), *inputItemType, ctx)) { + return TStatus::Error; + } + + const auto& lookupKeyColumns = inputItemType->Cast<TStructExprType>()->GetItems(); + for (const auto& keyColumn : lookupKeyColumns) { + if (!table.second->GetKeyColumnIndex(TString(keyColumn->GetName()))) { + return TStatus::Error; + } + } + + auto rowType = GetReadTableRowType(ctx, tablesData, cluster, table.first, columns, withSystemColumns); + if (!rowType) { return TStatus::Error; } + + node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType)); + + } else if (lookupStrategy.Value() == "LookupJoinRows") { + if (!EnsureTupleType(node->Pos(), *inputItemType, ctx)) { + return TStatus::Error; + } + + if (!EnsureTupleTypeSize(node->Pos(), inputItemType, 2, ctx)) { + return TStatus::Error; + } + + auto inputTupleType = inputItemType->Cast<TTupleExprType>(); + if (!EnsureStructType(node->Pos(), *inputTupleType->GetItems()[0], ctx)) { + return TStatus::Error; + } + + if (!EnsureStructType(node->Pos(), *inputTupleType->GetItems()[1], ctx)) { + return TStatus::Error; + } + + const TStructExprType* joinKeys = inputTupleType->GetItems()[0]->Cast<TStructExprType>(); + const TStructExprType* leftRowType = inputTupleType->GetItems()[1]->Cast<TStructExprType>(); + + for (const auto& inputKey : joinKeys->GetItems()) { + if (!table.second->GetKeyColumnIndex(TString(inputKey->GetName()))) { + return TStatus::Error; + } + } + + auto rightRowType = GetReadTableRowType(ctx, tablesData, cluster, table.first, columns, withSystemColumns); + if (!rightRowType) { + return TStatus::Error; + } + + TVector<const TTypeAnnotationNode*> outputTypes; + outputTypes.push_back(leftRowType); + outputTypes.push_back(ctx.MakeType<TOptionalExprType>(rightRowType)); + + auto outputItemType = ctx.MakeType<TTupleExprType>(outputTypes); + node->SetTypeAnn(ctx.MakeType<TStreamExprType>(outputItemType)); + + } else { + ctx.AddError(TIssue(ctx.GetPosition(node->Child(TKqpCnStreamLookup::idx_LookupStrategy)->Pos()), + TStringBuilder() << "Unexpected lookup strategy: " << lookupStrategy.Value())); + return TStatus::Error; } - if (!EnsureTupleOfAtoms(*node->Child(TKqpCnStreamLookup::idx_Columns), ctx)) { + return TStatus::Ok; +} + +TStatus AnnotateIndexLookupJoin(const TExprNode::TPtr& node, TExprContext& ctx) { + + if (!EnsureArgsCount(*node, 4, ctx)) { return TStatus::Error; } - TCoAtomList columns{node->ChildPtr(TKqpCnStreamLookup::idx_Columns)}; + auto inputType = node->Child(TKqpIndexLookupJoin::idx_Input)->GetTypeAnn(); + const TTypeAnnotationNode* inputItemType; + if (!EnsureNewSeqType<false>(node->Pos(), *inputType, ctx, &inputItemType)) { + return TStatus::Error; + } - auto rowType = GetReadTableRowType(ctx, tablesData, cluster, table.first, columns, withSystemColumns); - if (!rowType) { + YQL_ENSURE(inputItemType); + if (!EnsureTupleType(node->Pos(), *inputItemType, ctx)) { return TStatus::Error; } - node->SetTypeAnn(ctx.MakeType<TStreamExprType>(rowType)); + if (!EnsureTupleTypeSize(node->Pos(), inputItemType, 2, ctx)) { + return TStatus::Error; + } + + auto inputTupleType = inputItemType->Cast<TTupleExprType>(); + if (!EnsureStructType(node->Pos(), *inputTupleType->GetItems()[0], ctx)) { + return TStatus::Error; + } + + const TStructExprType* leftRowType = inputTupleType->GetItems()[0]->Cast<TStructExprType>(); + + if (!EnsureOptionalType(node->Pos(), *inputTupleType->GetItems()[1], ctx)) { + return TStatus::Error; + } + + auto rightRowType = inputTupleType->GetItems()[1]->Cast<TOptionalExprType>()->GetItemType(); + if (!EnsureStructType(node->Pos(), *rightRowType, ctx)) { + return TStatus::Error; + } + + if (!EnsureAtom(*node->Child(TKqpIndexLookupJoin::idx_JoinType), ctx)) { + return TStatus::Error; + } + + if (!EnsureAtom(*node->Child(TKqpIndexLookupJoin::idx_LeftLabel), ctx)) { + return TStatus::Error; + } + + TCoAtom leftLabel(node->Child(TKqpIndexLookupJoin::idx_LeftLabel)); + + if (!EnsureAtom(*node->Child(TKqpIndexLookupJoin::idx_RightLabel), ctx)) { + return TStatus::Error; + } + + TCoAtom rightLabel(node->Child(TKqpIndexLookupJoin::idx_RightLabel)); + + TVector<const TItemExprType*> resultStructItems; + for (const auto& item : leftRowType->GetItems()) { + resultStructItems.emplace_back( + ctx.MakeType<TItemExprType>(TString::Join(leftLabel.Value(), ".", item->GetName()), item->GetItemType()) + ); + } + + for (const auto& item : rightRowType->Cast<TStructExprType>()->GetItems()) { + resultStructItems.emplace_back( + ctx.MakeType<TItemExprType>(TString::Join(rightLabel.Value(), ".", item->GetName()), item->GetItemType()) + ); + } + + auto outputRowType = ctx.MakeType<TStructExprType>(resultStructItems); + node->SetTypeAnn(ctx.MakeType<TStreamExprType>(outputRowType)); return TStatus::Ok; } @@ -1600,6 +1817,10 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl return AnnotateStreamLookupConnection(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled()); } + if (TKqpIndexLookupJoin::Match(input.Get())) { + return AnnotateIndexLookupJoin(input, ctx); + } + if (TKqpTxResultBinding::Match(input.Get())) { return AnnotateKqpTxResultBinding(input, ctx); } @@ -1628,6 +1849,10 @@ TAutoPtr<IGraphTransformer> CreateKqpTypeAnnotationTransformer(const TString& cl return AnnotateSequencer(input, ctx, cluster, *tablesData); } + if (TKqlStreamIdxLookupJoin::Match(input.Get())) { + return AnnotateStreamIdxLookupJoin(input, ctx, cluster, *tablesData, config->SystemColumnsEnabled()); + } + if (TKqpProgram::Match(input.Get())) { return AnnotateKqpProgram(input, ctx); } diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index 9d849f1f977..f072ae20da9 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -560,7 +560,6 @@ private: TTableRead readInfo; readInfo.Type = EPlanTableReadType::Lookup; - planNode.TypeName = "TableLookup"; TString table(tableLookup.Table().Path().Value()); auto& tableData = SerializerCtx.TablesData->GetTable(SerializerCtx.Cluster, table); planNode.NodeInfo["Table"] = tableData.RelativePath ? *tableData.RelativePath : table; @@ -572,15 +571,25 @@ private: readInfo.Columns.push_back(TString(column.Value())); } - const auto lookupKeysType = tableLookup.LookupKeysType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); - YQL_ENSURE(lookupKeysType); - YQL_ENSURE(lookupKeysType->GetKind() == ETypeAnnotationKind::List); - const auto lookupKeysItemType = lookupKeysType->Cast<TListExprType>()->GetItemType(); - YQL_ENSURE(lookupKeysItemType->GetKind() == ETypeAnnotationKind::Struct); - const auto& lookupKeyColumnsStruct = lookupKeysItemType->Cast<TStructExprType>()->GetItems(); - readInfo.LookupBy.reserve(lookupKeyColumnsStruct.size()); + const auto inputType = tableLookup.InputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + YQL_ENSURE(inputType); + YQL_ENSURE(inputType->GetKind() == ETypeAnnotationKind::List); + const auto inputItemType = inputType->Cast<TListExprType>()->GetItemType(); + + const TStructExprType* lookupKeyColumnsStruct = nullptr; + if (inputItemType->GetKind() == ETypeAnnotationKind::Struct) { + planNode.TypeName = "TableLookup"; + lookupKeyColumnsStruct = inputItemType->Cast<TStructExprType>(); + } else if (inputItemType->GetKind() == ETypeAnnotationKind::Tuple) { + planNode.TypeName = "TableLookupJoin"; + const auto inputTupleType = inputItemType->Cast<TTupleExprType>(); + lookupKeyColumnsStruct = inputTupleType->GetItems()[0]->Cast<TStructExprType>(); + } + + YQL_ENSURE(lookupKeyColumnsStruct); + readInfo.LookupBy.reserve(lookupKeyColumnsStruct->GetItems().size()); auto& lookupKeyColumns = planNode.NodeInfo["LookupKeyColumns"]; - for (const auto keyColumn : lookupKeyColumnsStruct) { + for (const auto keyColumn : lookupKeyColumnsStruct->GetItems()) { lookupKeyColumns.AppendValue(keyColumn->GetName()); readInfo.LookupBy.push_back(TString(keyColumn->GetName())); } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp index d4f994c0b04..e5e5585b6e1 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log_join.cpp @@ -338,6 +338,21 @@ bool IsParameterToListOfStructsRepack(const TExprBase& expr) { #define DBG(...) template<typename ReadType> +TMaybeNode<TExprBase> BuildKqpStreamIndexLookupJoin(const TDqJoin& join, TExprBase leftInput, ReadType rightRead, TExprContext& ctx) { + TString leftLabel = join.LeftLabel().Maybe<TCoAtom>() ? TString(join.LeftLabel().Cast<TCoAtom>().Value()) : ""; + TString rightLabel = join.RightLabel().Maybe<TCoAtom>() ? TString(join.RightLabel().Cast<TCoAtom>().Value()) : ""; + + return Build<TKqlStreamIdxLookupJoin>(ctx, join.Pos()) + .LeftInput(leftInput) + .LeftLabel().Build(leftLabel) + .RightTable(rightRead.Table()) + .RightColumns(rightRead.Columns()) + .RightLabel().Build(rightLabel) + .JoinType(join.JoinType()) + .Done(); +} + +template<typename ReadType> TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext& ctx, const TKqpOptimizeContext& kqpCtx) { static_assert(std::is_same_v<ReadType, TKqlReadTableBase> || std::is_same_v<ReadType, TKqlReadTableRangesBase>, "unsupported read type"); @@ -347,6 +362,7 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext } static THashSet<TStringBuf> supportedJoinKinds = {"Inner", "Left", "LeftOnly", "LeftSemi", "RightSemi"}; + static THashSet<TStringBuf> supportedStreamJoinKinds = {"Inner", "Left"}; if (!supportedJoinKinds.contains(join.JoinType().Value())) { return {}; } @@ -554,9 +570,14 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext return {}; } + const bool useStreamIndexLookupJoin = kqpCtx.IsDataQuery() + && kqpCtx.Config->EnableKqpDataQueryStreamIdxLookupJoin + && supportedStreamJoinKinds.contains(join.JoinType().Value()); + bool needPrecomputeLeft = kqpCtx.IsDataQuery() && !join.LeftInput().Maybe<TCoParameter>() - && !IsParameterToListOfStructsRepack(join.LeftInput()); + && !IsParameterToListOfStructsRepack(join.LeftInput()) + && !useStreamIndexLookupJoin; TExprBase leftData = needPrecomputeLeft ? Build<TDqPrecompute>(ctx, join.Pos()) @@ -604,6 +625,23 @@ TMaybeNode<TExprBase> KqpJoinToIndexLookupImpl(const TDqJoin& join, TExprContext .Done(); } + if (useStreamIndexLookupJoin) { + auto leftInput = Build<TCoMap>(ctx, join.Pos()) + .Input(leftData) + .Lambda() + .Args({leftRowArg}) + .Body<TExprList>() + .Add<TCoAsStruct>() + .Add(lookupMembers) + .Build() + .Add(leftRowArg) + .Build() + .Build() + .Done(); + + return BuildKqpStreamIndexLookupJoin(join, leftInput, rightRead, ctx); + } + auto leftDataDeduplicated = DeduplicateByMembers(leftData, filter, deduplicateLeftColumns, ctx, join.Pos()); auto keysToLookup = Build<TCoMap>(ctx, join.Pos()) .Input(leftDataDeduplicated) diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 1f3a531fc91..77b029b0028 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -34,6 +34,7 @@ public: AddHandler(0, &TKqlReadTableRanges::Match, HNDL(BuildReadTableRangesStage)); AddHandler(0, &TKqlLookupTable::Match, HNDL(BuildLookupTableStage)); AddHandler(0, &TKqlStreamLookupTable::Match, HNDL(BuildStreamLookupTableStages)); + AddHandler(0, &TKqlStreamIdxLookupJoin::Match, HNDL(BuildStreamIdxLookupJoinStages)); AddHandler(0, &TKqlSequencer::Match, HNDL(BuildSequencerStages)); AddHandler(0, [](auto) { return true; }, HNDL(RemoveRedundantSortByPk)); AddHandler(0, &TCoTake::Match, HNDL(ApplyLimitToReadTable)); @@ -150,6 +151,12 @@ protected: return output; } + TMaybeNode<TExprBase> BuildStreamIdxLookupJoinStages(TExprBase node, TExprContext& ctx) { + TExprBase output = KqpBuildStreamIdxLookupJoinStages(node, ctx); + DumpAppliedRule("BuildStreamIdxLookupJoinStages", node.Ptr(), output.Ptr(), ctx); + return output; + } + TMaybeNode<TExprBase> BuildSequencerStages(TExprBase node, TExprContext& ctx) { TExprBase output = KqpBuildSequencerStages(node, ctx); DumpAppliedRule("BuildSequencerStages", node.Ptr(), output.Ptr(), ctx); diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp index 3cf59734a43..22688e7505a 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_build_stage.cpp @@ -613,7 +613,8 @@ NYql::NNodes::TExprBase KqpRewriteLookupTable(NYql::NNodes::TExprBase node, NYql .Build() .Table(lookupTable.Table()) .Columns(lookupTable.Columns()) - .LookupKeysType(ExpandType(node.Pos(), *keysPrecompute.Ref().GetTypeAnn(), ctx)) + .InputType(ExpandType(node.Pos(), *keysPrecompute.Ref().GetTypeAnn(), ctx)) + .LookupStrategy().Build(TKqpStreamLookupStrategyName) .Done(); newInputs.emplace_back(std::move(cnStreamLookup)); @@ -647,7 +648,8 @@ NYql::NNodes::TExprBase KqpRewriteLookupTable(NYql::NNodes::TExprBase node, NYql .Build() .Table(lookupTable.Table()) .Columns(lookupTable.Columns()) - .LookupKeysType(ExpandType(node.Pos(), *lookupKeysList.Ref().GetTypeAnn(), ctx)) + .InputType(ExpandType(node.Pos(), *lookupKeysList.Ref().GetTypeAnn(), ctx)) + .LookupStrategy().Build(TKqpStreamLookupStrategyName) .Done(); newInputs.emplace_back(std::move(cnStreamLookup)); @@ -698,7 +700,8 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase .Build() .Table(lookup.Table()) .Columns(lookup.Columns()) - .LookupKeysType(ExpandType(lookup.Pos(), *lookup.LookupKeys().Ref().GetTypeAnn(), ctx)) + .InputType(ExpandType(lookup.Pos(), *lookup.LookupKeys().Ref().GetTypeAnn(), ctx)) + .LookupStrategy().Build(TKqpStreamLookupStrategyName) .Done(); } else if (lookup.LookupKeys().Maybe<TDqCnUnionAll>()) { @@ -708,7 +711,8 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase .Output(output) .Table(lookup.Table()) .Columns(lookup.Columns()) - .LookupKeysType(ExpandType(lookup.Pos(), *output.Ref().GetTypeAnn(), ctx)) + .InputType(ExpandType(lookup.Pos(), *output.Ref().GetTypeAnn(), ctx)) + .LookupStrategy().Build(TKqpStreamLookupStrategyName) .Done(); } else { return node; @@ -732,4 +736,42 @@ NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase .Build().Done(); } +NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStages(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx) { + if (!node.Maybe<TKqlStreamIdxLookupJoin>()) { + return node; + } + + const auto& idxLookupJoin = node.Cast<TKqlStreamIdxLookupJoin>(); + YQL_ENSURE(idxLookupJoin.LeftInput().Maybe<TDqCnUnionAll>(), "Expected UnionAll as left input"); + + auto output = idxLookupJoin.LeftInput().Cast<TDqCnUnionAll>().Output(); + auto cnStreamIdxLookupJoin = Build<TKqpCnStreamLookup>(ctx, idxLookupJoin.Pos()) + .Output(output) + .Table(idxLookupJoin.RightTable()) + .Columns(idxLookupJoin.RightColumns()) + .InputType(ExpandType(idxLookupJoin.Pos(), *output.Ref().GetTypeAnn(), ctx)) + .LookupStrategy().Build(TKqpStreamLookupJoinStrategyName) + .Done(); + + return Build<TDqCnUnionAll>(ctx, node.Pos()) + .Output() + .Stage<TDqStage>() + .Inputs() + .Add(cnStreamIdxLookupJoin) + .Build() + .Program() + .Args({"stream_lookup_join_output"}) + .Body<TKqpIndexLookupJoin>() + .Input("stream_lookup_join_output") + .JoinType(idxLookupJoin.JoinType()) + .LeftLabel(idxLookupJoin.LeftLabel()) + .RightLabel(idxLookupJoin.RightLabel()) + .Build() + .Build() + .Settings(TDqStageSettings().BuildNode(ctx, node.Pos())) + .Build() + .Index().Build("0") + .Build().Done(); +} + } // namespace NKikimr::NKqp::NOpt diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h index f274ae765a8..64c58a73773 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_rules.h @@ -29,6 +29,8 @@ NYql::NNodes::TExprBase KqpBuildSequencerStages(NYql::NNodes::TExprBase node, NY NYql::NNodes::TExprBase KqpBuildStreamLookupTableStages(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx); +NYql::NNodes::TExprBase KqpBuildStreamIdxLookupJoinStages(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx); + NYql::NNodes::TExprBase KqpRemoveRedundantSortByPk(NYql::NNodes::TExprBase node, NYql::TExprContext& ctx, const TKqpOptimizeContext& kqpCtx); diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index 5887882481f..7ade5af9d84 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -144,6 +144,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi bool EnableKqpScanQueryStreamLookup = false; bool EnableKqpDataQueryStreamLookup = false; bool EnableKqpScanQueryStreamIdxLookupJoin = false; + bool EnableKqpDataQueryStreamIdxLookupJoin = false; bool EnablePredicateExtractForScanQuery = true; bool EnablePredicateExtractForDataQuery = false; bool PredicateExtract20 = false; diff --git a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp index 72a77a87a2b..92fd414df23 100644 --- a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp @@ -425,6 +425,19 @@ TIntrusivePtr<IMkqlCallableCompiler> CreateKqlCompiler(const TKqlCompileContext& return ctx.PgmBuilder().KqpEnsure(value, predicate, issueCode, message); }); + compiler->AddCallable(TKqpIndexLookupJoin::CallableName(), + [&ctx](const TExprNode& node, TMkqlBuildContext& buildCtx) { + TKqpIndexLookupJoin indexLookupJoin(&node); + + const TString joinType(indexLookupJoin.JoinType().Value()); + const TString leftLabel(indexLookupJoin.LeftLabel().Value()); + const TString rightLabel(indexLookupJoin.RightLabel().Value()); + + auto input = MkqlBuildExpr(indexLookupJoin.Input().Ref(), buildCtx); + + return ctx.PgmBuilder().KqpIndexLookupJoin(input, joinType, leftLabel, rightLabel); + }); + return compiler; } diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index c9017e14b82..f26ff59b284 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -81,6 +81,20 @@ NKqpProto::TKqpPhyInternalBinding::EType GetPhyInternalBindingType(const std::st return bindingType; } +NKqpProto::EStreamLookupStrategy GetStreamLookupStrategy(const std::string_view strategy) { + NKqpProto::EStreamLookupStrategy lookupStrategy = NKqpProto::EStreamLookupStrategy::UNSPECIFIED; + + if (strategy == "LookupRows"sv) { + lookupStrategy = NKqpProto::EStreamLookupStrategy::LOOKUP; + } else if (strategy == "LookupJoinRows"sv) { + lookupStrategy = NKqpProto::EStreamLookupStrategy::JOIN; + } + + YQL_ENSURE(lookupStrategy != NKqpProto::EStreamLookupStrategy::UNSPECIFIED, + "Unexpected stream lookup strategy: " << strategy); + return lookupStrategy; +} + void FillTableId(const TKqpTable& table, NKqpProto::TKqpPhyTableId& tableProto) { auto pathId = TKikimrPathId::Parse(table.PathId()); @@ -1088,18 +1102,11 @@ private: FillTablesMap(streamLookup.Table(), streamLookup.Columns(), tablesMap); FillTableId(streamLookup.Table(), *streamLookupProto.MutableTable()); - const auto lookupKeysType = streamLookup.LookupKeysType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); - YQL_ENSURE(lookupKeysType, "Empty stream lookup keys type"); - YQL_ENSURE(lookupKeysType->GetKind() == ETypeAnnotationKind::List, "Unexpected stream lookup keys type"); - const auto lookupKeysItemType = lookupKeysType->Cast<TListExprType>()->GetItemType(); - streamLookupProto.SetLookupKeysType(NMiniKQL::SerializeNode(CompileType(pgmBuilder, *lookupKeysItemType), TypeEnv)); - - YQL_ENSURE(lookupKeysItemType->GetKind() == ETypeAnnotationKind::Struct); - const auto& lookupKeyColumns = lookupKeysItemType->Cast<TStructExprType>()->GetItems(); - for (const auto keyColumn : lookupKeyColumns) { - YQL_ENSURE(tableMeta->Columns.FindPtr(keyColumn->GetName()), "Unknown column: " << keyColumn->GetName()); - streamLookupProto.AddKeyColumns(TString(keyColumn->GetName())); - } + const auto inputType = streamLookup.InputType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(); + YQL_ENSURE(inputType, "Empty stream lookup input type"); + YQL_ENSURE(inputType->GetKind() == ETypeAnnotationKind::List, "Unexpected stream lookup input type"); + const auto inputItemType = inputType->Cast<TListExprType>()->GetItemType(); + streamLookupProto.SetLookupKeysType(NMiniKQL::SerializeNode(CompileType(pgmBuilder, *inputItemType), TypeEnv)); const auto resultType = streamLookup.Ref().GetTypeAnn(); YQL_ENSURE(resultType, "Empty stream lookup result type"); @@ -1107,13 +1114,65 @@ private: const auto resultItemType = resultType->Cast<TStreamExprType>()->GetItemType(); streamLookupProto.SetResultType(NMiniKQL::SerializeNode(CompileType(pgmBuilder, *resultItemType), TypeEnv)); - YQL_ENSURE(resultItemType->GetKind() == ETypeAnnotationKind::Struct); - const auto& resultColumns = resultItemType->Cast<TStructExprType>()->GetItems(); - for (const auto column : resultColumns) { - const auto& systemColumns = GetSystemColumns(); - YQL_ENSURE(tableMeta->Columns.FindPtr(column->GetName()) || systemColumns.find(column->GetName()) != systemColumns.end(), - "Unknown column: " << column->GetName()); - streamLookupProto.AddColumns(TString(column->GetName())); + YQL_ENSURE(streamLookup.LookupStrategy().Maybe<TCoAtom>()); + TString lookupStrategy = streamLookup.LookupStrategy().Maybe<TCoAtom>().Cast().StringValue(); + streamLookupProto.SetLookupStrategy(GetStreamLookupStrategy(lookupStrategy)); + + switch (streamLookupProto.GetLookupStrategy()) { + case NKqpProto::EStreamLookupStrategy::LOOKUP: { + YQL_ENSURE(inputItemType->GetKind() == ETypeAnnotationKind::Struct); + const auto& lookupKeyColumns = inputItemType->Cast<TStructExprType>()->GetItems(); + for (const auto keyColumn : lookupKeyColumns) { + YQL_ENSURE(tableMeta->Columns.FindPtr(keyColumn->GetName()), + "Unknown column: " << keyColumn->GetName()); + streamLookupProto.AddKeyColumns(TString(keyColumn->GetName())); + } + + YQL_ENSURE(resultItemType->GetKind() == ETypeAnnotationKind::Struct); + const auto& resultColumns = resultItemType->Cast<TStructExprType>()->GetItems(); + for (const auto column : resultColumns) { + const auto &systemColumns = GetSystemColumns(); + YQL_ENSURE(tableMeta->Columns.FindPtr(column->GetName()) + || systemColumns.find(column->GetName()) != systemColumns.end(), + "Unknown column: " << column->GetName()); + streamLookupProto.AddColumns(TString(column->GetName())); + } + + break; + } + case NKqpProto::EStreamLookupStrategy::JOIN: { + YQL_ENSURE(inputItemType->GetKind() == ETypeAnnotationKind::Tuple); + const auto inputTupleType = inputItemType->Cast<TTupleExprType>(); + YQL_ENSURE(inputTupleType->GetSize() == 2); + + YQL_ENSURE(inputTupleType->GetItems()[0]->GetKind() == ETypeAnnotationKind::Struct); + const auto& joinKeyColumns = inputTupleType->GetItems()[0]->Cast<TStructExprType>()->GetItems(); + for (const auto keyColumn : joinKeyColumns) { + YQL_ENSURE(tableMeta->Columns.FindPtr(keyColumn->GetName()), + "Unknown column: " << keyColumn->GetName()); + streamLookupProto.AddKeyColumns(TString(keyColumn->GetName())); + } + + YQL_ENSURE(resultItemType->GetKind() == ETypeAnnotationKind::Tuple); + const auto resultTupleType = resultItemType->Cast<TTupleExprType>(); + YQL_ENSURE(resultTupleType->GetSize() == 2); + + YQL_ENSURE(resultTupleType->GetItems()[1]->GetKind() == ETypeAnnotationKind::Optional); + auto rightRowOptionalType = resultTupleType->GetItems()[1]->Cast<TOptionalExprType>()->GetItemType(); + YQL_ENSURE(rightRowOptionalType->GetKind() == ETypeAnnotationKind::Struct); + const auto& rightColumns = rightRowOptionalType->Cast<TStructExprType>()->GetItems(); + for (const auto column : rightColumns) { + const auto& systemColumns = GetSystemColumns(); + YQL_ENSURE(tableMeta->Columns.FindPtr(column->GetName()) + || systemColumns.find(column->GetName()) != systemColumns.end(), + "Unknown column: " << column->GetName()); + streamLookupProto.AddColumns(TString(column->GetName())); + } + + break; + } + default: + YQL_ENSURE(false, "Unexpected lookup strategy for stream lookup: " << lookupStrategy); } return; diff --git a/ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt index 6f6ea6b4954..e8854f07a98 100644 --- a/ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/runtime/CMakeLists.darwin-x86_64.txt @@ -54,6 +54,7 @@ target_sources(core-kqp-runtime PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp ) diff --git a/ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt index 8cd64049312..8a8cb70240a 100644 --- a/ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/runtime/CMakeLists.linux-aarch64.txt @@ -55,6 +55,7 @@ target_sources(core-kqp-runtime PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp ) diff --git a/ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt index 8cd64049312..8a8cb70240a 100644 --- a/ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/runtime/CMakeLists.linux-x86_64.txt @@ -55,6 +55,7 @@ target_sources(core-kqp-runtime PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp ) diff --git a/ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt index 6f6ea6b4954..e8854f07a98 100644 --- a/ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/runtime/CMakeLists.windows-x86_64.txt @@ -54,6 +54,7 @@ target_sources(core-kqp-runtime PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_spilling_file.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_tasks_runner.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/runtime/kqp_transport.cpp ) diff --git a/ydb/core/kqp/runtime/kqp_compute.cpp b/ydb/core/kqp/runtime/kqp_compute.cpp index bb973df45ec..b2ea8ba2cb3 100644 --- a/ydb/core/kqp/runtime/kqp_compute.cpp +++ b/ydb/core/kqp/runtime/kqp_compute.cpp @@ -2,7 +2,7 @@ #include <ydb/library/yql/minikql/computation/mkql_computation_node_codegen.h> #include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> -#include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/minikql/mkql_node_cast.h> #include <ydb/library/yql/minikql/mkql_program_builder.h> #include <ydb/library/yql/public/udf/udf_terminator.h> #include <ydb/library/yql/public/udf/udf_type_builder.h> @@ -83,6 +83,137 @@ private: IComputationNode* const Message; }; +class TKqpIndexLookupJoinWrapper : public TMutableComputationNode<TKqpIndexLookupJoinWrapper> { +public: + class TStreamValue : public TComputationValue<TStreamValue> { + public: + TStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& stream, TComputationContext& ctx, + const TKqpIndexLookupJoinWrapper* self) + : TComputationValue<TStreamValue>(memInfo) + , Stream(std::move(stream)) + , Self(self) + , Ctx(ctx) { + } + + private: + enum class EOutputMode { + OnlyLeftRow, + Both + }; + + NUdf::TUnboxedValue FillResultItems(NUdf::TUnboxedValue leftRow, NUdf::TUnboxedValue rightRow, EOutputMode mode) { + auto resultRowSize = (mode == EOutputMode::OnlyLeftRow) ? Self->LeftColumnsCount + : Self->LeftColumnsCount + Self->RightColumnsCount; + auto resultRow = Self->ResultRowCache.NewArray(Ctx, resultRowSize, ResultItems); + + size_t resIdx = 0; + + if (mode == EOutputMode::OnlyLeftRow || mode == EOutputMode::Both) { + for (size_t i = 0; i < Self->LeftColumnsCount; ++i) { + ResultItems[resIdx++] = std::move(leftRow.GetElement(i)); + } + } + + if (mode == EOutputMode::Both) { + if (rightRow.HasValue()) { + for (size_t i = 0; i < Self->RightColumnsCount; ++i) { + ResultItems[resIdx++] = std::move(rightRow.GetElement(i)); + } + } else { + for (size_t i = 0; i < Self->RightColumnsCount; ++i) { + ResultItems[resIdx++] = NUdf::TUnboxedValuePod(); + } + } + } + + return resultRow; + } + + bool TryBuildResultRow(NUdf::TUnboxedValue inputRow, NUdf::TUnboxedValue& result) { + auto leftRow = inputRow.GetElement(0); + auto rightRow = inputRow.GetElement(1); + + bool ok = true; + switch (Self->JoinType) { + case EJoinKind::Inner: { + if (!rightRow.HasValue()) { + ok = false; + break; + } + + result = FillResultItems(std::move(leftRow), std::move(rightRow), EOutputMode::Both); + break; + } + case EJoinKind::Left: { + result = FillResultItems(std::move(leftRow), std::move(rightRow), EOutputMode::Both); + break; + } + case EJoinKind::LeftOnly: { + if (rightRow.HasValue()) { + ok = false; + break; + } + + result = FillResultItems(std::move(leftRow), std::move(rightRow), EOutputMode::OnlyLeftRow); + break; + } + default: + MKQL_ENSURE(false, "Unsupported join kind"); + } + + return ok; + } + + NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& result) override { + NUdf::TUnboxedValue row; + NUdf::EFetchStatus status = Stream.Fetch(row); + + while (status == NUdf::EFetchStatus::Ok) { + if (TryBuildResultRow(std::move(row), result)) { + break; + } + + status = Stream.Fetch(row); + } + + return status; + } + + private: + NUdf::TUnboxedValue Stream; + const TKqpIndexLookupJoinWrapper* Self; + TComputationContext& Ctx; + NUdf::TUnboxedValue* ResultItems = nullptr; + }; + +public: + TKqpIndexLookupJoinWrapper(TComputationMutables& mutables, IComputationNode* inputNode, + EJoinKind joinType, ui64 leftColumnsCount, ui64 rightColumnsCount) + : TMutableComputationNode<TKqpIndexLookupJoinWrapper>(mutables) + , InputNode(inputNode) + , JoinType(joinType) + , LeftColumnsCount(leftColumnsCount) + , RightColumnsCount(rightColumnsCount) + , ResultRowCache(mutables) { + } + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + return ctx.HolderFactory.Create<TStreamValue>(InputNode->GetValue(ctx), ctx, this); + } + +private: + void RegisterDependencies() const final { + this->DependsOn(InputNode); + } + +private: + IComputationNode* InputNode; + const EJoinKind JoinType; + const ui64 LeftColumnsCount; + const ui64 RightColumnsCount; + const TContainerCacheOnContext ResultRowCache; +}; + } // namespace IComputationNode* WrapKqpEnsure(TCallable& callable, const TComputationNodeFactoryContext& ctx) { @@ -99,5 +230,16 @@ IComputationNode* WrapKqpEnsure(TCallable& callable, const TComputationNodeFacto return new TKqpEnsureWrapper(ctx.Mutables, value, predicate, issueCode, message); } +IComputationNode* WrapKqpIndexLookupJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args"); + + auto inputNode = LocateNode(ctx.NodeLocator, callable, 0); + ui32 joinKind = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui32>(); + ui64 leftColumnsCount = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().Get<ui64>(); + ui64 rightColumnsCount = AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().Get<ui64>(); + + return new TKqpIndexLookupJoinWrapper(ctx.Mutables, inputNode, GetJoinKind(joinKind), leftColumnsCount, rightColumnsCount); +} + } // namespace NMiniKQL } // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_compute.h b/ydb/core/kqp/runtime/kqp_compute.h index b3cfc21d2e6..9065325a276 100644 --- a/ydb/core/kqp/runtime/kqp_compute.h +++ b/ydb/core/kqp/runtime/kqp_compute.h @@ -48,6 +48,7 @@ private: }; IComputationNode* WrapKqpEnsure(TCallable& callable, const TComputationNodeFactoryContext& ctx); +IComputationNode* WrapKqpIndexLookupJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx); } // namespace NMiniKQL } // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_program_builder.cpp b/ydb/core/kqp/runtime/kqp_program_builder.cpp index 2cf24451483..43fa17bfb3f 100644 --- a/ydb/core/kqp/runtime/kqp_program_builder.cpp +++ b/ydb/core/kqp/runtime/kqp_program_builder.cpp @@ -143,6 +143,22 @@ TType* MakeBlockType(TProgramBuilder& builder, TStructType* rowType) { return builder.NewBlockType(builder.NewTupleType(tupleItems), TBlockType::EShape::Many); } +EJoinKind GetIndexLookupJoinKind(const TString& joinKind) { + if (joinKind == "Inner") { + return EJoinKind::Inner; + } else if (joinKind == "Left") { + return EJoinKind::Left; + } else if (joinKind == "LeftOnly") { + return EJoinKind::LeftOnly; + } else if (joinKind == "RightSemi") { + return EJoinKind::RightSemi; + } else if (joinKind == "LeftSemi") { + return EJoinKind::LeftSemi; + } else { + MKQL_ENSURE_S(false, "Unexpected join kind: " << joinKind); + } +} + } // namespace TKqpProgramBuilder::TKqpProgramBuilder(const TTypeEnvironment& env, const IFunctionRegistry& functionRegistry) @@ -314,5 +330,38 @@ TRuntimeNode TKqpProgramBuilder::KqpEnsure(TRuntimeNode value, TRuntimeNode pred return TRuntimeNode(callableBuilder.Build(), false); } +TRuntimeNode TKqpProgramBuilder::KqpIndexLookupJoin(const TRuntimeNode& input, const TString& joinType, + const TString& leftLabel, const TString& rightLabel) { + + auto inputRowItems = AS_TYPE(TTupleType, AS_TYPE(TStreamType, input.GetStaticType())->GetItemType()); + MKQL_ENSURE(inputRowItems->GetElementsCount() == 2, "Expected 2 elements"); + + auto leftRowType = AS_TYPE(TStructType, inputRowItems->GetElementType(0)); + auto rightRowType = AS_TYPE(TStructType, AS_TYPE(TOptionalType, inputRowItems->GetElementType(1))->GetItemType()); + + TStructTypeBuilder rowTypeBuilder(GetTypeEnvironment()); + + for (ui32 i = 0; i < leftRowType->GetMembersCount(); ++i) { + TString newMemberName = leftLabel.empty() ? TString(leftRowType->GetMemberName(i)) + : TString::Join(leftLabel, ".", leftRowType->GetMemberName(i)); + rowTypeBuilder.Add(newMemberName, leftRowType->GetMemberType(i)); + } + + for (ui32 i = 0; i < rightRowType->GetMembersCount(); ++i) { + TString newMemberName = rightLabel.empty() ? TString(rightRowType->GetMemberName(i)) + : TString::Join(rightLabel, ".", rightRowType->GetMemberName(i)); + rowTypeBuilder.Add(newMemberName, rightRowType->GetMemberType(i)); + } + + auto returnType = NewStreamType(rowTypeBuilder.Build()); + + TCallableBuilder callableBuilder(Env, __func__, returnType); + callableBuilder.Add(input); + callableBuilder.Add(NewDataLiteral<ui32>((ui32)GetIndexLookupJoinKind(joinType))); + callableBuilder.Add(NewDataLiteral<ui64>(leftRowType->GetMembersCount())); + callableBuilder.Add(NewDataLiteral<ui64>(rightRowType->GetMembersCount())); + return TRuntimeNode(callableBuilder.Build(), false); +} + } // namespace NMiniKQL } // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_program_builder.h b/ydb/core/kqp/runtime/kqp_program_builder.h index a26938b8db3..3c4825001f0 100644 --- a/ydb/core/kqp/runtime/kqp_program_builder.h +++ b/ydb/core/kqp/runtime/kqp_program_builder.h @@ -69,6 +69,8 @@ public: TRuntimeNode KqpEffects(const TArrayRef<const TRuntimeNode>& effects); TRuntimeNode KqpEnsure(TRuntimeNode value, TRuntimeNode predicate, TRuntimeNode issueCode, TRuntimeNode message); + + TRuntimeNode KqpIndexLookupJoin(const TRuntimeNode& input, const TString& joinType, const TString& leftLabel, const TString& rightLabel); }; } // namespace NMiniKQL diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index f35e58bda98..e343ce9ef0d 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -7,13 +7,12 @@ #include <ydb/core/engine/minikql/minikql_engine_host.h> #include <ydb/core/kqp/common/kqp_resolve.h> #include <ydb/core/kqp/gateway/kqp_gateway.h> -#include <ydb/core/protos/kqp.pb.h> #include <ydb/core/protos/kqp_stats.pb.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> -#include <ydb/core/tx/datashard/datashard.h> #include <ydb/core/kqp/common/kqp_event_ids.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/core/kqp/runtime/kqp_scan_data.h> +#include <ydb/core/kqp/runtime/kqp_stream_lookup_worker.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h> namespace NKikimr { @@ -31,50 +30,24 @@ public: std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters> counters) : LogPrefix(TStringBuilder() << "StreamLookupActor, inputIndex: " << inputIndex << ", CA Id " << computeActorId) - , InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv) - , HolderFactory(holderFactory), Alloc(alloc), TablePath(settings.GetTable().GetPath()) - , TableId(MakeTableId(settings.GetTable())) + , InputIndex(inputIndex) + , Input(input) + , ComputeActorId(computeActorId) + , TypeEnv(typeEnv) + , Alloc(alloc) , Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId()) , LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>()) , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) + , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), typeEnv, holderFactory)) , Counters(counters) { - KeyColumns.reserve(settings.GetKeyColumns().size()); - i32 keyOrder = 0; - for (const auto& keyColumn : settings.GetKeyColumns()) { - KeyColumns.emplace( - keyColumn.GetName(), - TSysTables::TTableColumnInfo{ - keyColumn.GetName(), - keyColumn.GetId(), - NScheme::TTypeInfo{static_cast<NScheme::TTypeId>(keyColumn.GetTypeId())}, - "", - keyOrder++ - } - ); - } - - LookupKeyColumns.reserve(KeyColumns.size()); - for (const auto& lookupKeyColumn : settings.GetLookupKeyColumns()) { - auto columnIt = KeyColumns.find(lookupKeyColumn); - YQL_ENSURE(columnIt != KeyColumns.end()); - LookupKeyColumns.push_back(&columnIt->second); - } - - Columns.reserve(settings.GetColumns().size()); - for (const auto& column : settings.GetColumns()) { - Columns.emplace_back(TSysTables::TTableColumnInfo{ - column.GetName(), - column.GetId(), - NScheme::TTypeInfo{static_cast<NScheme::TTypeId>(column.GetTypeId())} - }); - } }; virtual ~TKqpStreamLookupActor() { - if (Input.HasValue() && Alloc) { + if (Alloc) { TGuard<NMiniKQL::TScopedAlloc> allocGuard(*Alloc); Input.Clear(); + StreamLookupWorker.reset(); } } @@ -94,14 +67,14 @@ public: if (last) { NYql::NDqProto::TDqTableStats* tableStats = nullptr; for (auto& table : *stats->MutableTables()) { - if (table.GetTablePath() == TablePath) { + if (table.GetTablePath() == StreamLookupWorker->GetTablePath()) { tableStats = &table; } } if (!tableStats) { tableStats = stats->AddTables(); - tableStats->SetTablePath(TablePath); + tableStats->SetTablePath(StreamLookupWorker->GetTablePath()); } // TODO: use evread statistics after KIKIMR-16924 @@ -135,14 +108,12 @@ private: } struct TReadState { - TReadState(ui64 id, ui64 shardId, std::vector<TOwnedTableRange>&& keys) + TReadState(ui64 id, ui64 shardId) : Id(id) , ShardId(shardId) - , Keys(std::move(keys)) , State(EReadState::Initial) {} void SetFinished() { - Keys.clear(); State = EReadState::Finished; } @@ -152,8 +123,9 @@ private: const ui64 Id; const ui64 ShardId; - std::vector<TOwnedTableRange> Keys; EReadState State; + TMaybe<TOwnedCellVec> LastProcessedKey; + ui32 FirstUnprocessedQuery = 0; }; struct TShardState { @@ -161,12 +133,6 @@ private: std::vector<TReadState*> Reads; }; - struct TResult { - const ui64 ShardId; - THolder<TEventHandle<TEvDataShard::TEvReadResult>> ReadResult; - size_t UnprocessedResultRow = 0; - }; - struct TEvPrivate { enum EEv { EvRetryReadTimeout = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), @@ -191,6 +157,7 @@ private: { auto alloc = BindAllocator(); Input.Clear(); + StreamLookupWorker.reset(); for (auto& [id, state] : Reads) { Counters->SentIteratorCancels->Inc(); auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); @@ -204,24 +171,26 @@ private: } i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final { - i64 totalDataSize = 0; - YQL_ENSURE(!batch.IsWide(), "Wide stream is not supported"); - totalDataSize = PackResults(batch, freeSpace); - auto status = FetchLookupKeys(); + auto replyResultStats = StreamLookupWorker->ReplyResult(batch, freeSpace); + ReadRowsCount += replyResultStats.RowsCount; + ReadBytesCount += replyResultStats.BytesCount; + + auto status = FetchInputRows(); if (Partitioning) { - ProcessLookupKeys(); + ProcessInputRows(); } finished = (status == NUdf::EFetchStatus::Finish) - && UnprocessedKeys.empty() && AllReadsFinished() - && Results.empty(); + && StreamLookupWorker->AllRowsProcessed(); + + CA_LOG_D("Returned " << replyResultStats.BytesCount << " bytes, " << replyResultStats.RowsCount + << " rows, finished: " << finished); - CA_LOG_D("Returned " << totalDataSize << " bytes, finished: " << finished); - return totalDataSize; + return replyResultStats.BytesCount; } TMaybe<google::protobuf::Any> ExtraData() override { @@ -257,23 +226,23 @@ private: } void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) { - CA_LOG_D("TEvResolveKeySetResult was received for table: " << TablePath); + CA_LOG_D("TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath()); if (ev->Get()->Request->ErrorCount > 0) { - return RuntimeError(TStringBuilder() << "Failed to get partitioning for table: " << TableId, - NYql::NDqProto::StatusIds::SCHEME_ERROR); + return RuntimeError(TStringBuilder() << "Failed to get partitioning for table: " + << StreamLookupWorker->GetTablePath(), NYql::NDqProto::StatusIds::SCHEME_ERROR); } auto& resultSet = ev->Get()->Request->ResultSet; YQL_ENSURE(resultSet.size() == 1, "Expected one result for range [NULL, +inf)"); Partitioning = resultSet[0].KeyDescription->Partitioning; - ProcessLookupKeys(); + ProcessInputRows(); } void Handle(TEvDataShard::TEvReadResult::TPtr& ev) { const auto& record = ev->Get()->Record; - CA_LOG_D("TEvReadResult was received for table: " << TablePath << + CA_LOG_D("TEvReadResult was received for table: " << StreamLookupWorker->GetTablePath() << ", readId: " << record.GetReadId() << ", finished: " << record.GetFinished()); auto readIt = Reads.find(record.GetReadId()); @@ -303,13 +272,7 @@ private: case Ydb::StatusIds::NOT_FOUND: case Ydb::StatusIds::OVERLOADED: case Ydb::StatusIds::INTERNAL_ERROR: { - TMaybe<NKikimrTxDataShard::TReadContinuationToken> continuationToken; - if (record.HasContinuationToken()) { - bool parseResult = continuationToken->ParseFromString(record.GetContinuationToken()); - YQL_ENSURE(parseResult, "Failed to parse continuation token"); - } - - return RetryTableRead(read, continuationToken); + return RetryTableRead(read); } default: { NYql::TIssues issues; @@ -321,6 +284,17 @@ private: if (record.GetFinished()) { read.SetFinished(); } else { + YQL_ENSURE(record.HasContinuationToken(), "Successful TEvReadResult should contain continuation token"); + NKikimrTxDataShard::TReadContinuationToken continuationToken; + bool parseResult = continuationToken.ParseFromString(record.GetContinuationToken()); + YQL_ENSURE(parseResult, "Failed to parse continuation token"); + read.FirstUnprocessedQuery = continuationToken.GetFirstUnprocessedQuery(); + + if (continuationToken.HasLastProcessedKey()) { + TSerializedCellVec lastKey(continuationToken.GetLastProcessedKey()); + read.LastProcessedKey = TOwnedCellVec(lastKey.GetCells()); + } + Counters->SentIteratorAcks->Inc(); THolder<TEvDataShard::TEvReadAck> request(new TEvDataShard::TEvReadAck()); request->Record.SetReadId(record.GetReadId()); @@ -333,7 +307,9 @@ private: CA_LOG_D("TEvReadAck was sent to shard: " << read.ShardId); } - Results.emplace_back(TResult{read.ShardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())}); + StreamLookupWorker->AddResult(TKqpStreamLookupWorker::TShardReadResult{ + read.ShardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release()) + }); Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } @@ -347,10 +323,7 @@ private: for (auto* read : shardIt->second.Reads) { if (read->State == EReadState::Running) { Counters->IteratorDeliveryProblems->Inc(); - for (auto& key : read->Keys) { - UnprocessedKeys.emplace_back(std::move(key)); - } - + StreamLookupWorker->ResetRowsProcessing(read->Id, read->FirstUnprocessedQuery, read->LastProcessedKey); read->SetFinished(); } } @@ -359,170 +332,47 @@ private: } void Handle(TEvPrivate::TEvSchemeCacheRequestTimeout::TPtr&) { - CA_LOG_D("TEvSchemeCacheRequestTimeout was received, shards for table " << TablePath + CA_LOG_D("TEvSchemeCacheRequestTimeout was received, shards for table " << StreamLookupWorker->GetTablePath() << " was resolved: " << !!Partitioning); if (!Partitioning) { - RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << TableId + RuntimeError(TStringBuilder() << "Failed to resolve shards for table: " << StreamLookupWorker->GetTablePath() << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT); } } - ui64 PackResults(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) { - i64 totalSize = 0; - bool sizeLimitExceeded = false; - batch.clear(); - - while (!Results.empty() && !sizeLimitExceeded) { - auto& result = Results.front(); - for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) { - const auto& resultRow = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow); - YQL_ENSURE(resultRow.size() <= Columns.size(), "Result columns mismatch"); - - NUdf::TUnboxedValue* rowItems = nullptr; - auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems); - - i64 rowSize = 0; - for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) { - const auto& column = Columns[colIndex]; - if (IsSystemColumn(column.Name)) { - NMiniKQL::FillSystemColumn(rowItems[colIndex], result.ShardId, column.Id, column.PType); - rowSize += sizeof(NUdf::TUnboxedValue); - } else { - YQL_ENSURE(resultColIndex < resultRow.size()); - rowItems[colIndex] = NMiniKQL::GetCellValue(resultRow[resultColIndex], column.PType); - rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes; - ++resultColIndex; - } - } - - if (totalSize + rowSize > freeSpace) { - row.DeleteUnreferenced(); - sizeLimitExceeded = true; - break; - } - - batch.push_back(std::move(row)); - ++ReadRowsCount; - ReadBytesCount += rowSize; - totalSize += rowSize; - } - - if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) { - Results.pop_front(); - } - } - - CA_LOG_D("Total batch size: " << totalSize << ", size limit exceeded: " << sizeLimitExceeded); - return totalSize; - } - - NUdf::EFetchStatus FetchLookupKeys() { - YQL_ENSURE(LookupKeyColumns.size() <= KeyColumns.size()); + NUdf::EFetchStatus FetchInputRows() { + auto guard = BindAllocator(); NUdf::EFetchStatus status; - NUdf::TUnboxedValue key; - while ((status = Input.Fetch(key)) == NUdf::EFetchStatus::Ok) { - std::vector<TCell> keyCells(LookupKeyColumns.size()); - for (size_t colId = 0; colId < LookupKeyColumns.size(); ++colId) { - const auto* lookupKeyColumn = LookupKeyColumns[colId]; - YQL_ENSURE(lookupKeyColumn->KeyOrder < static_cast<i64>(keyCells.size())); - keyCells[lookupKeyColumn->KeyOrder] = MakeCell(lookupKeyColumn->PType, - key.GetElement(colId), TypeEnv, /* copy */ true); - } - - UnprocessedKeys.emplace_back(std::move(keyCells)); + NUdf::TUnboxedValue row; + while ((status = Input.Fetch(row)) == NUdf::EFetchStatus::Ok) { + StreamLookupWorker->AddInputRow(std::move(row)); } return status; } - void ProcessLookupKeys() { + void ProcessInputRows() { YQL_ENSURE(Partitioning, "Table partitioning should be initialized before lookup keys processing"); - std::unordered_map<ui64, std::vector<TOwnedTableRange>> shardKeys; - for (; !UnprocessedKeys.empty(); UnprocessedKeys.pop_front()) { - const auto& key = UnprocessedKeys.front(); - YQL_ENSURE(key.Point); - - std::vector<ui64> shardIds; - if (LookupKeyColumns.size() < KeyColumns.size()) { - /* build range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) */ - std::vector<TCell> fromCells(KeyColumns.size()); - fromCells.insert(fromCells.begin(), key.From.begin(), key.From.end()); - std::vector<TCell> toCells(key.From.begin(), key.From.end()); - - shardIds = GetRangePartitioning(TOwnedTableRange{std::move(fromCells), /* inclusiveFrom */ true, - std::move(toCells), /* inclusiveTo */ false}); - } else { - shardIds = GetRangePartitioning(key); - } + auto guard = BindAllocator(); - for (auto shardId : shardIds) { - shardKeys[shardId].emplace_back(std::move(key)); - } - } - - for (auto& [shardId, keys] : shardKeys) { - StartTableRead(shardId, std::move(keys)); + auto requests = StreamLookupWorker->BuildRequests(Partitioning, ReadId); + for (auto& [shardId, request] : requests) { + StartTableRead(shardId, std::move(request)); } } - std::vector<ui64> GetRangePartitioning(const TOwnedTableRange& range) { - YQL_ENSURE(Partitioning); - - std::vector<NScheme::TTypeInfo> keyColumnTypes(KeyColumns.size()); - for (const auto& [_, columnInfo] : KeyColumns) { - YQL_ENSURE(columnInfo.KeyOrder < static_cast<i64>(keyColumnTypes.size())); - keyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType; - } - - auto it = LowerBound(Partitioning->begin(), Partitioning->end(), /* value */ true, - [&](const auto& partition, bool) { - const int result = CompareBorders<true, false>( - partition.Range->EndKeyPrefix.GetCells(), range.From, - partition.Range->IsInclusive || partition.Range->IsPoint, - range.InclusiveFrom || range.Point, keyColumnTypes - ); - - return (result < 0); - } - ); - - YQL_ENSURE(it != Partitioning->end()); - - std::vector<ui64> rangePartitions; - for (; it != Partitioning->end(); ++it) { - rangePartitions.push_back(it->ShardId); - - if (range.Point) { - break; - } - - auto cmp = CompareBorders<true, true>( - it->Range->EndKeyPrefix.GetCells(), range.To, - it->Range->IsInclusive || it->Range->IsPoint, - range.InclusiveTo || range.Point, keyColumnTypes - ); - - if (cmp >= 0) { - break; - } - } - - return rangePartitions; - } - - TReadState& StartTableRead(ui64 shardId, std::vector<TOwnedTableRange>&& keys) { - const auto readId = GetNextReadId(); - TReadState read(readId, shardId, std::move(keys)); - - CA_LOG_D("Start reading of table: " << TablePath << ", readId: " << readId << ", shardId: " << shardId); - + TReadState& StartTableRead(ui64 shardId, THolder<TEvDataShard::TEvRead> request) { Counters->CreatedIterators->Inc(); - THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead()); auto& record = request->Record; + CA_LOG_D("Start reading of table: " << StreamLookupWorker->GetTablePath() << ", readId: " << record.GetReadId() + << ", shardId: " << shardId); + + TReadState read(record.GetReadId(), shardId); + YQL_ENSURE(Snapshot.IsValid(), "Invalid snapshot value"); record.MutableSnapshot()->SetStep(Snapshot.Step); record.MutableSnapshot()->SetTxId(Snapshot.TxId); @@ -531,48 +381,27 @@ private: record.SetLockTxId(*LockTxId); } - record.SetReadId(read.Id); record.SetMaxRows(Max<ui16>()); record.SetMaxBytes(5_MB); record.SetResultFormat(NKikimrTxDataShard::EScanDataFormat::CELLVEC); - record.MutableTableId()->SetOwnerId(TableId.PathId.OwnerId); - record.MutableTableId()->SetTableId(TableId.PathId.LocalPathId); - record.MutableTableId()->SetSchemaVersion(TableId.SchemaVersion); - - for (const auto& column : Columns) { - if (!IsSystemColumn(column.Name)) { - record.AddColumns(column.Id); - } - } - - for (auto& key : read.Keys) { - YQL_ENSURE(key.Point); - request->Keys.emplace_back(key.From); - } - Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), shardId, true), IEventHandle::FlagTrackDelivery); read.State = EReadState::Running; - const auto [readIt, succeeded] = Reads.insert({readId, std::move(read)}); + const auto [readIt, succeeded] = Reads.insert({read.Id, std::move(read)}); YQL_ENSURE(succeeded); ReadsPerShard[shardId].Reads.push_back(&readIt->second); return readIt->second; } - void RetryTableRead(TReadState& failedRead, TMaybe<NKikimrTxDataShard::TReadContinuationToken>& token) { - CA_LOG_D("Retry reading of table: " << TablePath << ", readId: " << failedRead.Id + void RetryTableRead(TReadState& failedRead) { + CA_LOG_D("Retry reading of table: " << StreamLookupWorker->GetTablePath() << ", readId: " << failedRead.Id << ", shardId: " << failedRead.ShardId); - size_t firstUnprocessedQuery = token ? token->GetFirstUnprocessedQuery() : 0; - YQL_ENSURE(firstUnprocessedQuery <= failedRead.Keys.size()); - for (ui64 idx = firstUnprocessedQuery; idx < failedRead.Keys.size(); ++idx) { - UnprocessedKeys.emplace_back(std::move(failedRead.Keys[idx])); - } - + StreamLookupWorker->ResetRowsProcessing(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey); failedRead.SetFinished(); auto& shardState = ReadsPerShard[failedRead.ShardId]; @@ -586,26 +415,23 @@ private: } void ResolveTableShards() { - CA_LOG_D("Resolve shards for table: " << TablePath); + CA_LOG_D("Resolve shards for table: " << StreamLookupWorker->GetTablePath()); Partitioning.reset(); auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>(); - TVector<TCell> minusInf(KeyColumns.size()); + auto keyColumnTypes = StreamLookupWorker->GetKeyColumnTypes(); + + TVector<TCell> minusInf(keyColumnTypes.size()); TVector<TCell> plusInf; TTableRange range(minusInf, true, plusInf, true, false); - std::vector<NScheme::TTypeInfo> keyColumnTypes(KeyColumns.size()); - for (const auto& [_, columnInfo] : KeyColumns) { - keyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType; - } - - request->ResultSet.emplace_back(MakeHolder<TKeyDesc>(TableId, range, TKeyDesc::ERowOperation::Read, + request->ResultSet.emplace_back(MakeHolder<TKeyDesc>(StreamLookupWorker->GetTableId(), range, TKeyDesc::ERowOperation::Read, keyColumnTypes, TVector<TKeyDesc::TColumnOp>{})); Counters->IteratorsShardResolve->Inc(); - Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {})); + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {})); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request)); SchemeCacheRequestTimeoutTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), SchemeCacheRequestTimeout, @@ -622,10 +448,6 @@ private: return true; } - ui64 GetNextReadId() { - return ++ReadId; - } - TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator() { return TypeEnv.BindAllocator(); } @@ -647,32 +469,24 @@ private: NUdf::TUnboxedValue Input; const NActors::TActorId ComputeActorId; const NMiniKQL::TTypeEnvironment& TypeEnv; - const NMiniKQL::THolderFactory& HolderFactory; std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; - const TString TablePath; - const TTableId TableId; IKqpGateway::TKqpSnapshot Snapshot; const TMaybe<ui64> LockTxId; - std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns; - std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns; - std::vector<TSysTables::TTableColumnInfo> Columns; - std::deque<TResult> Results; std::unordered_map<ui64, TReadState> Reads; std::unordered_map<ui64, TShardState> ReadsPerShard; std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning; - std::deque<TOwnedTableRange> UnprocessedKeys; const TDuration SchemeCacheRequestTimeout; NActors::TActorId SchemeCacheRequestTimeoutTimer; TVector<NKikimrTxDataShard::TLock> Locks; TVector<NKikimrTxDataShard::TLock> BrokenLocks; + std::unique_ptr<TKqpStreamLookupWorker> StreamLookupWorker; + ui64 ReadId = 0; // stats ui64 ReadRowsCount = 0; ui64 ReadBytesCount = 0; TIntrusivePtr<TKqpCounters> Counters; - - ui64 ReadId = 0; }; } // namespace diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp new file mode 100644 index 00000000000..62613dc376c --- /dev/null +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -0,0 +1,761 @@ +#include "kqp_stream_lookup_worker.h" + +#include <ydb/core/kqp/common/kqp_resolve.h> +#include <ydb/core/kqp/runtime/kqp_scan_data.h> +#include <ydb/core/tx/datashard/range_ops.h> + +namespace NKikimr { +namespace NKqp { + +namespace { +std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpStreamLookupWorker::TPartitionInfo& partitionInfo, + const std::vector<NScheme::TTypeInfo>& keyColumnTypes, const TOwnedTableRange& range) { + + YQL_ENSURE(partitionInfo); + + std::vector<TCell> minusInf(keyColumnTypes.size()); + + std::vector<std::pair<ui64, TOwnedTableRange>> rangePartition; + for (size_t idx = 0; idx < partitionInfo->size(); ++idx) { + TTableRange partitionRange{ + idx == 0 ? minusInf : (*partitionInfo)[idx - 1].Range->EndKeyPrefix.GetCells(), + idx == 0 ? true : !(*partitionInfo)[idx - 1].Range->IsInclusive, + (*partitionInfo)[idx].Range->EndKeyPrefix.GetCells(), + (*partitionInfo)[idx].Range->IsInclusive + }; + + if (range.Point) { + int intersection = ComparePointAndRange( + range.From, + partitionRange, + keyColumnTypes, + keyColumnTypes); + + if (intersection == 0) { + rangePartition.emplace_back((*partitionInfo)[idx].ShardId, range); + } else if (intersection < 0) { + break; + } + } else { + int intersection = CompareRanges(range, partitionRange, keyColumnTypes); + + if (intersection == 0) { + auto rangeIntersection = Intersect(keyColumnTypes, range, partitionRange); + rangePartition.emplace_back((*partitionInfo)[idx].ShardId, rangeIntersection); + } else if (intersection < 0) { + break; + } + } + } + + return rangePartition; +} + +struct THashableKey { + TConstArrayRef<TCell> Cells; + + template <typename H> + friend H AbslHashValue(H h, const THashableKey& key) { + h = H::combine(std::move(h), key.Cells.size()); + for (const TCell& cell : key.Cells) { + h = H::combine(std::move(h), cell.IsNull()); + if (!cell.IsNull()) { + h = H::combine(std::move(h), cell.Size()); + h = H::combine_contiguous(std::move(h), cell.Data(), cell.Size()); + } + } + return h; + } +}; + +struct TKeyHash { + using is_transparent = void; + + bool operator()(TConstArrayRef<TCell> key) const { + return absl::Hash<THashableKey>()(THashableKey{ key }); + } +}; + +struct TKeyEq { + using is_transparent = void; + + bool operator()(TConstArrayRef<TCell> a, TConstArrayRef<TCell> b) const { + if (a.size() != b.size()) { + return false; + } + + const TCell* pa = a.data(); + const TCell* pb = b.data(); + if (pa == pb) { + return true; + } + + size_t left = a.size(); + while (left > 0) { + if (pa->IsNull()) { + if (!pb->IsNull()) { + return false; + } + } else { + if (pb->IsNull()) { + return false; + } + if (pa->Size() != pb->Size()) { + return false; + } + if (pa->Size() > 0 && ::memcmp(pa->Data(), pb->Data(), pa->Size()) != 0) { + return false; + } + } + ++pa; + ++pb; + --left; + } + + return true; + } +}; +} // !namespace + +TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, + const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory) + : TypeEnv(typeEnv) + , HolderFactory(holderFactory) + , TablePath(settings.GetTable().GetPath()) + , TableId(MakeTableId(settings.GetTable())) { + + KeyColumns.reserve(settings.GetKeyColumns().size()); + i32 keyOrder = 0; + for (const auto& keyColumn : settings.GetKeyColumns()) { + KeyColumns.emplace( + keyColumn.GetName(), + TSysTables::TTableColumnInfo{ + keyColumn.GetName(), + keyColumn.GetId(), + NScheme::TTypeInfo{static_cast<NScheme::TTypeId>(keyColumn.GetTypeId())}, + "", + keyOrder++ + } + ); + } + + LookupKeyColumns.reserve(settings.GetLookupKeyColumns().size()); + for (const auto& lookupKey : settings.GetLookupKeyColumns()) { + auto columnIt = KeyColumns.find(lookupKey); + YQL_ENSURE(columnIt != KeyColumns.end()); + LookupKeyColumns.push_back(&columnIt->second); + } + + Columns.reserve(settings.GetColumns().size()); + for (const auto& column : settings.GetColumns()) { + Columns.emplace_back(TSysTables::TTableColumnInfo{ + column.GetName(), + column.GetId(), + NScheme::TTypeInfo{static_cast<NScheme::TTypeId>(column.GetTypeId())} + }); + } +} + +TKqpStreamLookupWorker::~TKqpStreamLookupWorker() { +} + +std::string TKqpStreamLookupWorker::GetTablePath() const { + return TablePath; +} + +TTableId TKqpStreamLookupWorker::GetTableId() const { + return TableId; +} + +std::vector<NScheme::TTypeInfo> TKqpStreamLookupWorker::GetKeyColumnTypes() const { + std::vector<NScheme::TTypeInfo> keyColumnTypes(KeyColumns.size()); + for (const auto& [_, columnInfo] : KeyColumns) { + YQL_ENSURE(columnInfo.KeyOrder < static_cast<i64>(keyColumnTypes.size())); + keyColumnTypes[columnInfo.KeyOrder] = columnInfo.PType; + } + + return keyColumnTypes; +} + +class TKqpLookupRows : public TKqpStreamLookupWorker { +public: + TKqpLookupRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv, + const NMiniKQL::THolderFactory& holderFactory) : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory) { + } + + virtual ~TKqpLookupRows() {} + + void AddInputRow(NUdf::TUnboxedValue inputRow) final { + std::vector<TCell> keyCells(LookupKeyColumns.size()); + for (size_t colId = 0; colId < LookupKeyColumns.size(); ++colId) { + const auto* lookupKeyColumn = LookupKeyColumns[colId]; + YQL_ENSURE(lookupKeyColumn->KeyOrder < static_cast<i64>(keyCells.size())); + keyCells[lookupKeyColumn->KeyOrder] = MakeCell(lookupKeyColumn->PType, + inputRow.GetElement(colId), TypeEnv, /* copy */ true); + } + + if (keyCells.size() < KeyColumns.size()) { + // build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) + std::vector<TCell> fromCells(KeyColumns.size()); + for (size_t i = 0; i < keyCells.size(); ++i) { + fromCells[i] = keyCells[i]; + } + + bool fromInclusive = true; + bool toInclusive = false; + + UnprocessedKeys.emplace_back(fromCells, fromInclusive, keyCells, toInclusive); + } else { + // full pk, build point + UnprocessedKeys.emplace_back(std::move(keyCells)); + } + } + + TReadList BuildRequests(const TPartitionInfo& partitioning, ui64& readId) final { + YQL_ENSURE(partitioning); + + std::unordered_map<ui64, std::vector<TOwnedTableRange>> rangesPerShard; + std::unordered_map<ui64, std::vector<TOwnedTableRange>> pointsPerShard; + + while (!UnprocessedKeys.empty()) { + auto range = std::move(UnprocessedKeys.front()); + UnprocessedKeys.pop_front(); + + auto partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), range); + for (auto [shardId, range] : partitions) { + if (range.Point) { + pointsPerShard[shardId].push_back(std::move(range)); + } else { + rangesPerShard[shardId].push_back(std::move(range)); + } + } + } + + std::vector<std::pair<ui64, THolder<TEvDataShard::TEvRead>>> readRequests; + readRequests.reserve(rangesPerShard.size() + pointsPerShard.size()); + + for (auto& [shardId, points] : pointsPerShard) { + THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead()); + FillReadRequest(++readId, request, points); + readRequests.emplace_back(shardId, std::move(request)); + PendingKeysByReadId.insert({readId, std::move(points)}); + } + + for (auto& [shardId, ranges] : rangesPerShard) { + THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead()); + FillReadRequest(++readId, request, ranges); + readRequests.emplace_back(shardId, std::move(request)); + PendingKeysByReadId.insert({readId, std::move(ranges)}); + } + + return readRequests; + } + + void AddResult(TShardReadResult result) final { + const auto& record = result.ReadResult->Get()->Record; + YQL_ENSURE(record.GetStatus().GetCode() == Ydb::StatusIds::SUCCESS); + + auto it = PendingKeysByReadId.find(record.GetReadId()); + YQL_ENSURE(it != PendingKeysByReadId.end()); + + ReadResults.emplace_back(std::move(result)); + } + + TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final { + TReadResultStats resultStats; + bool sizeLimitExceeded = false; + batch.clear(); + + while (!ReadResults.empty() && !sizeLimitExceeded) { + auto& result = ReadResults.front(); + for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) { + const auto& resultRow = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow); + YQL_ENSURE(resultRow.size() <= Columns.size(), "Result columns mismatch"); + + NUdf::TUnboxedValue* rowItems = nullptr; + auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems); + + i64 rowSize = 0; + for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) { + const auto& column = Columns[colIndex]; + if (IsSystemColumn(column.Name)) { + NMiniKQL::FillSystemColumn(rowItems[colIndex], result.ShardId, column.Id, column.PType); + rowSize += sizeof(NUdf::TUnboxedValue); + } else { + YQL_ENSURE(resultColIndex < resultRow.size()); + rowItems[colIndex] = NMiniKQL::GetCellValue(resultRow[resultColIndex], column.PType); + rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes; + ++resultColIndex; + } + } + + if (rowSize > freeSpace - (i64)resultStats.BytesCount) { + row.DeleteUnreferenced(); + sizeLimitExceeded = true; + break; + } + + batch.push_back(std::move(row)); + + resultStats.RowsCount += 1; + resultStats.BytesCount += rowSize; + } + + if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) { + if (result.ReadResult->Get()->Record.GetFinished()) { + // delete finished read + auto it = PendingKeysByReadId.find(result.ReadResult->Get()->Record.GetReadId()); + PendingKeysByReadId.erase(it); + } + + ReadResults.pop_front(); + } + } + + return resultStats; + } + + bool AllRowsProcessed() final { + return UnprocessedKeys.empty() + && PendingKeysByReadId.empty() + && ReadResults.empty(); + } + + void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) final { + auto it = PendingKeysByReadId.find(readId); + YQL_ENSURE(it != PendingKeysByReadId.end()); + + if (lastProcessedKey) { + YQL_ENSURE(firstUnprocessedQuery < it->second.size()); + auto unprocessedRange = it->second[firstUnprocessedQuery]; + YQL_ENSURE(!unprocessedRange.Point); + + UnprocessedKeys.emplace_back(*lastProcessedKey, false, + unprocessedRange.GetOwnedTo(), unprocessedRange.InclusiveTo); + ++firstUnprocessedQuery; + } + + for (ui32 keyIdx = firstUnprocessedQuery; keyIdx < it->second.size(); ++keyIdx) { + UnprocessedKeys.emplace_back(std::move(it->second[keyIdx])); + } + + PendingKeysByReadId.erase(it); + } + +private: + void FillReadRequest(ui64 readId, THolder<TEvDataShard::TEvRead>& request, const std::vector<TOwnedTableRange>& ranges) { + auto& record = request->Record; + + record.SetReadId(readId); + + record.MutableTableId()->SetOwnerId(TableId.PathId.OwnerId); + record.MutableTableId()->SetTableId(TableId.PathId.LocalPathId); + record.MutableTableId()->SetSchemaVersion(TableId.SchemaVersion); + + for (const auto& column : Columns) { + if (!IsSystemColumn(column.Name)) { + record.AddColumns(column.Id); + } + } + + YQL_ENSURE(!ranges.empty()); + if (ranges.front().Point) { + request->Keys.reserve(ranges.size()); + for (auto& range : ranges) { + YQL_ENSURE(range.Point); + request->Keys.emplace_back(TSerializedCellVec(range.From)); + } + } else { + request->Ranges.reserve(ranges.size()); + for (auto& range : ranges) { + YQL_ENSURE(!range.Point); + + if (range.To.size() < KeyColumns.size()) { + // absent cells mean infinity => in prefix notation `To` should be inclusive + request->Ranges.emplace_back(TSerializedTableRange(range.From, range.InclusiveFrom, range.To, true)); + } else { + request->Ranges.emplace_back(TSerializedTableRange(range)); + } + } + } + } + +private: + std::deque<TOwnedTableRange> UnprocessedKeys; + std::unordered_map<ui64, std::vector<TOwnedTableRange>> PendingKeysByReadId; + std::deque<TShardReadResult> ReadResults; +}; + +class TKqpJoinRows : public TKqpStreamLookupWorker { +public: + TKqpJoinRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv, + const NMiniKQL::THolderFactory& holderFactory) : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory) { + + // read columns should contain join key and result columns + for (auto joinKey : LookupKeyColumns) { + ReadColumns.emplace(joinKey->Name, *joinKey); + } + + for (auto column : Columns) { + ReadColumns.emplace(column.Name, column); + } + } + + void AddInputRow(NUdf::TUnboxedValue inputRow) final { + auto joinKey = inputRow.GetElement(0); + std::vector<TCell> joinKeyCells(LookupKeyColumns.size()); + for (size_t colId = 0; colId < LookupKeyColumns.size(); ++colId) { + const auto* joinKeyColumn = LookupKeyColumns[colId]; + YQL_ENSURE(joinKeyColumn->KeyOrder < static_cast<i64>(joinKeyCells.size())); + joinKeyCells[joinKeyColumn->KeyOrder] = MakeCell(joinKeyColumn->PType, + joinKey.GetElement(colId), TypeEnv, true); + } + + UnprocessedRows.emplace_back(std::make_pair(TOwnedCellVec(joinKeyCells), std::move(inputRow.GetElement(1)))); + } + + TReadList BuildRequests(const TPartitionInfo& partitioning, ui64& readId) final { + YQL_ENSURE(partitioning); + + std::unordered_map<ui64, std::vector<TOwnedTableRange>> rangesPerShard; + std::unordered_map<ui64, std::vector<TOwnedTableRange>> pointsPerShard; + + while (!UnprocessedKeys.empty()) { + auto range = std::move(UnprocessedKeys.front()); + UnprocessedKeys.pop_front(); + + auto partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), range); + for (auto [shardId, range] : partitions) { + if (range.Point) { + pointsPerShard[shardId].push_back(std::move(range)); + } else { + rangesPerShard[shardId].push_back(std::move(range)); + } + } + } + + while (!UnprocessedRows.empty()) { + auto [joinKey, leftData] = UnprocessedRows.front(); + + if (PendingLeftRowsByKey.contains(joinKey)) { + // TODO: skip key duplicate + break; + } + + UnprocessedRows.pop_front(); + + std::vector<std::pair<ui64, TOwnedTableRange>> partitions; + if (joinKey.size() < KeyColumns.size()) { + // build prefix range [[key_prefix, NULL, ..., NULL], [key_prefix, +inf, ..., +inf]) + std::vector<TCell> fromCells(KeyColumns.size()); + fromCells.insert(fromCells.begin(), joinKey.begin(), joinKey.end()); + bool fromInclusive = true; + bool toInclusive = false; + + partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), + TOwnedTableRange(fromCells, fromInclusive, joinKey, toInclusive) + ); + } else { + // full pk, build point + partitions = GetRangePartitioning(partitioning, GetKeyColumnTypes(), TOwnedTableRange(joinKey)); + } + + for (auto [shardId, range] : partitions) { + if (range.Point) { + pointsPerShard[shardId].push_back(std::move(range)); + } else { + rangesPerShard[shardId].push_back(std::move(range)); + } + } + + PendingLeftRowsByKey.insert(std::make_pair(std::move(joinKey), TLeftRowInfo{std::move(leftData)})); + } + + std::vector<std::pair<ui64, THolder<TEvDataShard::TEvRead>>> requests; + requests.reserve(rangesPerShard.size() + pointsPerShard.size()); + + for (auto& [shardId, points] : pointsPerShard) { + THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead()); + FillReadRequest(++readId, request, points); + requests.emplace_back(shardId, std::move(request)); + + for (const auto& point : points) { + auto rowIt = PendingLeftRowsByKey.find(point.From); + YQL_ENSURE(rowIt != PendingLeftRowsByKey.end()); + rowIt->second.PendingReads.insert(readId); + } + + PendingKeysByReadId.insert({readId, std::move(points)}); + } + + for (auto& [shardId, ranges] : rangesPerShard) { + THolder<TEvDataShard::TEvRead> request(new TEvDataShard::TEvRead()); + FillReadRequest(++readId, request, ranges); + requests.emplace_back(shardId, std::move(request)); + + for (const auto& range : ranges) { + auto rowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(range)); + YQL_ENSURE(rowIt != PendingLeftRowsByKey.end()); + rowIt->second.PendingReads.insert(readId); + } + + PendingKeysByReadId.insert({readId, std::move(ranges)}); + } + + return requests; + } + + void AddResult(TShardReadResult result) final { + const auto& record = result.ReadResult->Get()->Record; + YQL_ENSURE(record.GetStatus().GetCode() == Ydb::StatusIds::SUCCESS); + + auto it = PendingKeysByReadId.find(record.GetReadId()); + YQL_ENSURE(it != PendingKeysByReadId.end()); + + ReadResults.emplace_back(std::move(result)); + } + + bool AllRowsProcessed() final { + return UnprocessedRows.empty() + && UnprocessedKeys.empty() + && PendingKeysByReadId.empty() + && ReadResults.empty(); + } + + void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) final { + auto readIt = PendingKeysByReadId.find(readId); + YQL_ENSURE(readIt != PendingKeysByReadId.end()); + auto& ranges = readIt->second; + + if (lastProcessedKey) { + YQL_ENSURE(firstUnprocessedQuery < ranges.size()); + auto unprocessedRange = ranges[firstUnprocessedQuery]; + YQL_ENSURE(!unprocessedRange.Point); + + UnprocessedKeys.emplace_back(*lastProcessedKey, false, + unprocessedRange.GetOwnedTo(), unprocessedRange.InclusiveTo); + ++firstUnprocessedQuery; + } + + for (ui32 i = firstUnprocessedQuery; i < ranges.size(); ++i) { + auto& range = ranges[i]; + auto leftRowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(range)); + YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end()); + leftRowIt->second.PendingReads.erase(readId); + + UnprocessedKeys.emplace_back(std::move(range)); + } + + PendingKeysByReadId.erase(readIt); + } + + TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final { + TReadResultStats resultStats; + bool sizeLimitExceeded = false; + batch.clear(); + + while (!ReadResults.empty() && !sizeLimitExceeded) { + auto& result = ReadResults.front(); + + for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) { + const auto& row = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow); + YQL_ENSURE(row.size() <= ReadColumns.size(), "Result columns mismatch"); + + std::vector<TCell> joinKeyCells(LookupKeyColumns.size()); + for (size_t joinKeyIdx = 0; joinKeyIdx < LookupKeyColumns.size(); ++joinKeyIdx) { + auto it = ReadColumns.find(LookupKeyColumns[joinKeyIdx]->Name); + YQL_ENSURE(it != ReadColumns.end()); + joinKeyCells[joinKeyIdx] = row[std::distance(ReadColumns.begin(), it)]; + } + + auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells); + YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end()); + + i64 resultRowSize = 0; + i64 availableSpace = freeSpace - (i64)resultStats.BytesCount; + auto resultRow = TryBuildResultRow(leftRowIt->second, row, resultRowSize, availableSpace, result.ShardId); + + if (!resultRow.HasValue()) { + sizeLimitExceeded = true; + break; + } + + batch.push_back(std::move(resultRow)); + + resultStats.RowsCount += 1; + resultStats.BytesCount += resultRowSize; + } + + if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) { + if (result.ReadResult->Get()->Record.GetFinished()) { + auto it = PendingKeysByReadId.find(result.ReadResult->Get()->Record.GetReadId()); + YQL_ENSURE(it != PendingKeysByReadId.end()); + + for (const auto& range : it->second) { + auto leftRowIt = PendingLeftRowsByKey.find(ExtractKeyPrefix(range)); + if (leftRowIt != PendingLeftRowsByKey.end()) { + leftRowIt->second.PendingReads.erase(result.ReadResult->Get()->Record.GetReadId()); + + const bool leftRowCanBeDeleted = leftRowIt->second.PendingReads.empty() + && leftRowIt->second.RightRowExist; + if (leftRowCanBeDeleted) { + PendingLeftRowsByKey.erase(leftRowIt); + } + } + } + + PendingKeysByReadId.erase(it); + } + + ReadResults.pop_front(); + } + } + + if (!sizeLimitExceeded) { + for (auto leftRowIt = PendingLeftRowsByKey.begin(); leftRowIt != PendingLeftRowsByKey.end();) { + const bool leftRowCanBeSent = leftRowIt->second.PendingReads.empty() + && !leftRowIt->second.RightRowExist; + + if (leftRowCanBeSent) { + i64 resultRowSize = 0; + i64 availableSpace = freeSpace - (i64) resultStats.BytesCount; + auto resultRow = TryBuildResultRow(leftRowIt->second, {}, resultRowSize, availableSpace); + + if (!resultRow.HasValue()) { + break; + } + + batch.push_back(std::move(resultRow)); + PendingLeftRowsByKey.erase(leftRowIt++); + } else { + ++leftRowIt; + } + } + } + + return resultStats; + } + + ~TKqpJoinRows() { + UnprocessedRows.clear(); + PendingLeftRowsByKey.clear(); + } +private: + struct TLeftRowInfo { + TLeftRowInfo(NUdf::TUnboxedValue row) : Row(std::move(row)) { + } + + NUdf::TUnboxedValue Row; + std::unordered_set<ui64> PendingReads; + bool RightRowExist = false; + }; + + void FillReadRequest(ui64 readId, THolder<TEvDataShard::TEvRead>& request, const std::vector<TOwnedTableRange>& ranges) { + auto& record = request->Record; + + record.SetReadId(readId); + + record.MutableTableId()->SetOwnerId(TableId.PathId.OwnerId); + record.MutableTableId()->SetTableId(TableId.PathId.LocalPathId); + record.MutableTableId()->SetSchemaVersion(TableId.SchemaVersion); + + for (const auto& [name, column] : ReadColumns) { + if (!IsSystemColumn(name)) { + record.AddColumns(column.Id); + } + } + + YQL_ENSURE(!ranges.empty()); + if (ranges.front().Point) { + request->Keys.reserve(ranges.size()); + for (auto& range : ranges) { + YQL_ENSURE(range.Point); + request->Keys.emplace_back(TSerializedCellVec(range.From)); + } + } else { + request->Ranges.reserve(ranges.size()); + for (auto& range : ranges) { + YQL_ENSURE(!range.Point); + if (range.To.size() < KeyColumns.size()) { + // Absent cells mean infinity. So in prefix notation `To` should be inclusive. + request->Ranges.emplace_back(TSerializedTableRange(range.From, range.InclusiveFrom, range.To, true)); + } else { + request->Ranges.emplace_back(TSerializedTableRange(range)); + } + } + } + } + + TConstArrayRef<TCell> ExtractKeyPrefix(const TOwnedTableRange& range) { + if (range.From.size() == LookupKeyColumns.size()) { + return range.From; + } + + return range.From.subspan(0, LookupKeyColumns.size()); + } + + NUdf::TUnboxedValue TryBuildResultRow(TLeftRowInfo& leftRowInfo, TConstArrayRef<TCell> rightRow, i64& resultRowSize, + i64 freeSpace, TMaybe<ui64> shardId = {}) { + + NUdf::TUnboxedValue* resultRowItems = nullptr; + auto resultRow = HolderFactory.CreateDirectArrayHolder(2, resultRowItems); + + resultRowItems[0] = leftRowInfo.Row; + + if (!rightRow.empty()) { + leftRowInfo.RightRowExist = true; + // TODO: get size for left row + + NUdf::TUnboxedValue* rightRowItems = nullptr; + resultRowItems[1] = HolderFactory.CreateDirectArrayHolder(Columns.size(), rightRowItems); + + for (size_t colIndex = 0; colIndex < Columns.size(); ++colIndex) { + const auto& column = Columns[colIndex]; + auto it = ReadColumns.find(column.Name); + YQL_ENSURE(it != ReadColumns.end()); + + if (IsSystemColumn(column.Name)) { + YQL_ENSURE(shardId); + NMiniKQL::FillSystemColumn(rightRowItems[colIndex], *shardId, column.Id, column.PType); + resultRowSize += sizeof(NUdf::TUnboxedValue); + } else { + rightRowItems[colIndex] = NMiniKQL::GetCellValue(rightRow[std::distance(ReadColumns.begin(), it)], + column.PType); + resultRowSize += NMiniKQL::GetUnboxedValueSize(rightRowItems[colIndex], column.PType).AllocatedBytes; + } + } + } else { + resultRowItems[1] = NUdf::TUnboxedValuePod(); + } + + if (resultRowSize > freeSpace) { + resultRow.DeleteUnreferenced(); + } + + return resultRow; + } + +private: + std::map<std::string, TSysTables::TTableColumnInfo> ReadColumns; + std::deque<std::pair<TOwnedCellVec, NUdf::TUnboxedValue>> UnprocessedRows; + std::deque<TOwnedTableRange> UnprocessedKeys; + std::unordered_map<ui64, std::vector<TOwnedTableRange>> PendingKeysByReadId; + absl::flat_hash_map<TOwnedCellVec, TLeftRowInfo, TKeyHash, TKeyEq> PendingLeftRowsByKey; + std::deque<TShardReadResult> ReadResults; +}; + +std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, + const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory) { + + switch (settings.GetLookupStrategy()) { + case NKqpProto::EStreamLookupStrategy::LOOKUP: + return std::make_unique<TKqpLookupRows>(std::move(settings), typeEnv, holderFactory); + case NKqpProto::EStreamLookupStrategy::JOIN: + return std::make_unique<TKqpJoinRows>(std::move(settings), typeEnv, holderFactory); + default: + return {}; + } +} + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h new file mode 100644 index 00000000000..56ac40019df --- /dev/null +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h @@ -0,0 +1,60 @@ +#pragma once + +#include <ydb/core/protos/kqp.pb.h> +#include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/core/scheme/scheme_tabledefs.h> +#include <ydb/core/tx/datashard/sys_tables.h> +#include <ydb/core/tx/datashard/datashard.h> + +namespace NKikimr { +namespace NKqp { + +class TKqpStreamLookupWorker { +public: + using TReadList = std::vector<std::pair<ui64, THolder<TEvDataShard::TEvRead>>>; + using TPartitionInfo = std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>>; + + struct TShardReadResult { + const ui64 ShardId; + THolder<TEventHandle<TEvDataShard::TEvReadResult>> ReadResult; + size_t UnprocessedResultRow = 0; + }; + + struct TReadResultStats { + ui64 RowsCount = 0; + ui64 BytesCount = 0; + }; + +public: + TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, + const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory); + + virtual ~TKqpStreamLookupWorker(); + + virtual std::string GetTablePath() const; + virtual TTableId GetTableId() const; + virtual std::vector<NScheme::TTypeInfo> GetKeyColumnTypes() const; + + virtual void AddInputRow(NUdf::TUnboxedValue inputRow) = 0; + virtual TReadList BuildRequests(const TPartitionInfo& partitioning, ui64& readId) = 0; + virtual void AddResult(TShardReadResult result) = 0; + virtual TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) = 0; + virtual bool AllRowsProcessed() = 0; + virtual void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) = 0; + +protected: + const NMiniKQL::TTypeEnvironment& TypeEnv; + const NMiniKQL::THolderFactory& HolderFactory; + const TString TablePath; + const TTableId TableId; + std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns; + std::vector<TSysTables::TTableColumnInfo*> LookupKeyColumns; + std::vector<TSysTables::TTableColumnInfo> Columns; +}; + +std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, + const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory); + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/runtime/ya.make b/ydb/core/kqp/runtime/ya.make index 77d8a0aa4f6..3f41b86b275 100644 --- a/ydb/core/kqp/runtime/ya.make +++ b/ydb/core/kqp/runtime/ya.make @@ -18,6 +18,8 @@ SRCS( kqp_stream_lookup_actor.h kqp_stream_lookup_factory.cpp kqp_stream_lookup_factory.h + kqp_stream_lookup_worker.cpp + kqp_stream_lookup_worker.h kqp_tasks_runner.cpp kqp_transport.cpp ) diff --git a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp index 97f32649f84..89361b130c3 100644 --- a/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_index_lookup_join_ut.cpp @@ -84,8 +84,11 @@ void PrepareTables(TSession session) { Y_UNIT_TEST_SUITE(KqpIndexLookupJoin) { -void Test(const TString& query, const TString& answer, size_t rightTableReads) { - TKikimrSettings settings; +void Test(const TString& query, const TString& answer, size_t rightTableReads, bool useStreamLookup = false) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(useStreamLookup); + + auto settings = TKikimrSettings().SetAppConfig(appConfig); TKikimrRunner kikimr(settings); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); @@ -101,25 +104,27 @@ void Test(const TString& query, const TString& answer, size_t rightTableReads) { CompareYson(answer, FormatResultSetYson(result.GetResultSet(0))); auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); + if (settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamIdxLookupJoin()) { + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 1); + + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Left"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 5); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(1).name(), "/Root/Right"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(1).reads().rows(), rightTableReads); } else { UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 3); - } - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Left"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 5); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).name(), "/Root/Left"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(0).table_access(0).reads().rows(), 5); - ui32 index = 1; - if (!settings.AppConfig.GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { UNIT_ASSERT(stats.query_phases(1).table_access().empty()); // keys extraction for lookups - index = 2; - } - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).name(), "/Root/Right"); - UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(index).table_access(0).reads().rows(), rightTableReads); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(2).table_access().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(2).table_access(0).name(), "/Root/Right"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(2).table_access(0).reads().rows(), rightTableReads); + } } Y_UNIT_TEST(MultiJoins) { @@ -243,6 +248,157 @@ Y_UNIT_TEST(RightSemi) { ])", 3); } +Y_UNIT_TEST_TWIN(SimpleInnerJoin, StreamLookup) { + Test( + R"( + SELECT l.Key, l.Fk, l.Value, r.Key, r.Value + FROM `/Root/Left` AS l + INNER JOIN `/Root/Right` AS r + ON l.Fk = r.Key + ORDER BY l.Key; + )", + R"([ + [[1];[101];["Value1"];[101];["Value21"]]; + [[2];[102];["Value1"];[102];["Value22"]]; + [[3];[103];["Value2"];[103];["Value23"]] + ])", 3, StreamLookup); +} + +Y_UNIT_TEST_TWIN(InnerJoinCustomColumnOrder, StreamLookup) { + Test( + R"( + SELECT r.Value, l.Key, r.Key, l.Value, l.Fk + FROM `/Root/Left` AS l + INNER JOIN `/Root/Right` AS r + ON l.Fk = r.Key + ORDER BY r.Key; + )", + R"([ + [["Value21"];[1];[101];["Value1"];[101]]; + [["Value22"];[2];[102];["Value1"];[102]]; + [["Value23"];[3];[103];["Value2"];[103]] + ])", 3, StreamLookup); +} + +Y_UNIT_TEST_TWIN(InnerJoinOnlyRightColumn, StreamLookup) { + Test( + R"( + SELECT r.Value + FROM `/Root/Left` AS l + INNER JOIN `/Root/Right` AS r + ON l.Fk = r.Key + ORDER BY r.Value; + )", + R"([ + [["Value21"]]; + [["Value22"]]; + [["Value23"]] + ])", 3, StreamLookup); +} + +Y_UNIT_TEST_TWIN(InnerJoinOnlyLeftColumn, StreamLookup) { + Test( + R"( + SELECT l.Fk + FROM `/Root/Left` AS l + INNER JOIN `/Root/Right` AS r + ON l.Fk = r.Key + ORDER BY l.Fk; + )", + R"([ + [[101]]; + [[102]]; + [[103]] + ])", 3, StreamLookup); +} + +Y_UNIT_TEST_TWIN(InnerJoinLeftFilter, StreamLookup) { + Test( + R"( + SELECT l.Key, l.Fk, l.Value, r.Key, r.Value + FROM `/Root/Left` AS l + INNER JOIN `/Root/Right` AS r + ON l.Fk = r.Key + WHERE l.Value != 'Value1' + ORDER BY l.Key; + )", + R"([ + [[3];[103];["Value2"];[103];["Value23"]] + ])", 1, StreamLookup); +} + +Y_UNIT_TEST_TWIN(SimpleLeftJoin, StreamLookup) { + Test( + R"( + SELECT l.Key, l.Fk, l.Value, r.Key, r.Value + FROM `/Root/Left` AS l + LEFT JOIN `/Root/Right` AS r + ON l.Fk = r.Key + ORDER BY l.Key; + )", + R"([ + [[1];[101];["Value1"];[101];["Value21"]]; + [[2];[102];["Value1"];[102];["Value22"]]; + [[3];[103];["Value2"];[103];["Value23"]]; + [[4];[104];["Value2"];#;#]; + [[5];[105];["Value3"];#;#] + ])", 3, StreamLookup); +} + +Y_UNIT_TEST_TWIN(LeftJoinCustomColumnOrder, StreamLookup) { + Test( + R"( + SELECT r.Value, l.Key, r.Key, l.Value, l.Fk + FROM `/Root/Left` AS l + LEFT JOIN `/Root/Right` AS r + ON l.Fk = r.Key + ORDER BY l.Key; + )", + R"([ + [["Value21"];[1];[101];["Value1"];[101]]; + [["Value22"];[2];[102];["Value1"];[102]]; + [["Value23"];[3];[103];["Value2"];[103]]; + [#;[4];#;["Value2"];[104]]; + [#;[5];#;["Value3"];[105]] + ])", 3, StreamLookup); +} + +Y_UNIT_TEST_TWIN(LeftJoinOnlyRightColumn, StreamLookup) { + Test( + R"( + SELECT r.Value + FROM `/Root/Left` AS l + LEFT JOIN `/Root/Right` AS r + ON l.Fk = r.Key + ORDER BY r.Value; + )", + R"([ + [#]; + [#]; + [["Value21"]]; + [["Value22"]]; + [["Value23"]] + ])", 3, StreamLookup); +} + +Y_UNIT_TEST_TWIN(LeftJoinOnlyLeftColumn, StreamLookup) { + Test( + R"( + SELECT l.Fk + FROM `/Root/Left` AS l + LEFT JOIN `/Root/Right` AS r + ON l.Fk = r.Key + ORDER BY l.Fk; + )", + R"([ + [[101]]; + [[102]]; + [[103]]; + [[104]]; + [[105]] + ])", 3, StreamLookup); +} + void CreateSimpleTableWithKeyType(TSession session, const TString& columnType) { using namespace fmt::literals; diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 1036cc5be0f..5f15c0dcdf8 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1355,6 +1355,7 @@ message TTableServiceConfig { reserved 33; // optional bool EnableKqpDataQueryStreamPointLookup = 33 [default = false]; optional bool EnablePublishKqpProxyByRM = 34 [default = true]; optional bool EnableKqpScanQueryStreamIdxLookupJoin = 35 [default = false]; + optional bool EnableKqpDataQueryStreamIdxLookupJoin = 49 [default = false]; optional bool EnablePredicateExtractForScanQueries = 36 [default = true]; optional bool EnablePredicateExtractForDataQueries = 37 [default = true]; optional bool EnableKqpImmediateEffects = 38 [default = true]; diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index bf18c4358f2..222068c7bd8 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -640,6 +640,7 @@ message TKqpStreamLookupSettings { optional uint64 LockTxId = 5; optional bool ImmediateTx = 6; repeated string LookupKeyColumns = 7; + optional NKqpProto.EStreamLookupStrategy LookupStrategy = 8; } message TKqpSequencerSettings { diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 70415ade762..d1810e9c022 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -253,12 +253,19 @@ message TKqpPhyCnMerge { repeated TKqpPhySortColumn SortColumns = 1; } +enum EStreamLookupStrategy { + UNSPECIFIED = 0; + LOOKUP = 1; + JOIN = 2; +}; + message TKqpPhyCnStreamLookup { TKqpPhyTableId Table = 1; repeated string KeyColumns = 2; repeated string Columns = 3; bytes LookupKeysType = 4; bytes ResultType = 5; + EStreamLookupStrategy LookupStrategy = 6; } message TKqpPhyCnSequencer { |