diff options
author | aakulaga <aakulaga@ydb.tech> | 2022-10-04 12:19:30 +0300 |
---|---|---|
committer | aakulaga <aakulaga@ydb.tech> | 2022-10-04 12:19:30 +0300 |
commit | 35bbcdd0f9d21e73beee28cb7fdd49d1109c2734 (patch) | |
tree | fcc0900a44de9b6b659efa54eba30172215dbbdc | |
parent | a04bcbd6d1f7cef1565bd1a474f3860e8f2238a0 (diff) | |
download | ydb-35bbcdd0f9d21e73beee28cb7fdd49d1109c2734.tar.gz |
Grace Join optimizer
Grace Join optimizer added
-rw-r--r-- | ydb/library/yql/core/expr_nodes/yql_expr_nodes.json | 15 | ||||
-rw-r--r-- | ydb/library/yql/core/type_ann/type_ann_join.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json | 5 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_join.cpp | 249 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 7 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.h | 6 | ||||
-rw-r--r-- | ydb/library/yql/dq/type_ann/dq_type_ann.cpp | 11 | ||||
-rw-r--r-- | ydb/library/yql/dq/type_ann/dq_type_ann.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp | 22 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp | 4 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp | 132 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/common/yql_dq_settings.cpp | 1 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/common/yql_dq_settings.h | 3 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/opt/physical_optimize.cpp | 3 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp | 1 |
15 files changed, 430 insertions, 32 deletions
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json index dc77e6f1275..5c5625782d9 100644 --- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json +++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json @@ -1439,6 +1439,21 @@ ] }, { + "Name": "TCoGraceJoinCore", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "GraceJoinCore"}, + "Children": [ + {"Index": 0, "Name": "LeftInput", "Type": "TExprBase", "Optional": true}, + {"Index": 1, "Name": "RightInput", "Type": "TExprBase" , "Optional": true}, + {"Index": 2, "Name": "JoinKind", "Type": "TCoAtom" , "Optional": true}, + {"Index": 3, "Name": "LeftKeysColumns", "Type": "TCoAtomList" , "Optional": true}, + {"Index": 4, "Name": "RightKeysColumns", "Type": "TCoAtomList", "Optional": true}, + {"Index": 5, "Name": "LeftRenames", "Type": "TCoAtomList", "Optional": true}, + {"Index": 6, "Name": "RightRenames", "Type": "TCoAtomList", "Optional": true} + ] + }, + + { "Name": "TCoJoinDict", "Base": "TCallable", "Match": {"Type": "Callable", "Name": "JoinDict"}, diff --git a/ydb/library/yql/core/type_ann/type_ann_join.cpp b/ydb/library/yql/core/type_ann/type_ann_join.cpp index cd7c8f54ec1..4d71278884b 100644 --- a/ydb/library/yql/core/type_ann/type_ann_join.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_join.cpp @@ -653,7 +653,7 @@ namespace NTypeAnnImpl { } const TTypeAnnotationNode* leftItemType = nullptr; - if (!EnsureNewSeqType<true>(input->Head(), ctx.Expr, &leftItemType)) { + if (!EnsureNewSeqType<true>(*input->Child(0), ctx.Expr, &leftItemType)) { return IGraphTransformer::TStatus::Error; } diff --git a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json index 11e2d47da40..6fd47f46b1b 100644 --- a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json +++ b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json @@ -47,6 +47,11 @@ "Match": {"Type": "Callable", "Name": "DqPhyMapJoin"} }, { + "Name": "TDqPhyGraceJoin", + "Base": "TDqJoinBase", + "Match": {"Type": "Callable", "Name": "GraceJoinCore"} + }, + { "Name": "TDqPhyCrossJoin", "Base": "TDqJoinBase", "Match": {"Type": "Callable", "Name": "DqPhyCrossJoin"} diff --git a/ydb/library/yql/dq/opt/dq_opt_join.cpp b/ydb/library/yql/dq/opt/dq_opt_join.cpp index 7ce13c9b83d..5fa805102dc 100644 --- a/ydb/library/yql/dq/opt/dq_opt_join.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_join.cpp @@ -5,6 +5,7 @@ #include <ydb/library/yql/dq/type_ann/dq_type_ann.h> #include <ydb/library/yql/utils/log/log.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> +#include <ydb/library/yql/core/yql_type_helpers.h> namespace NYql::NDq { @@ -271,6 +272,7 @@ std::pair<TVector<TCoAtom>, TVector<TCoAtom>> GetJoinKeys(const TDqJoin& join, T return std::make_pair(std::move(leftJoinKeys), std::move(rightJoinKeys)); } + TDqPhyMapJoin DqMakePhyMapJoin(const TDqJoin& join, const TExprBase& leftInput, const TExprBase& rightInput, TExprContext& ctx) { @@ -690,6 +692,7 @@ TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& return newConnection.Cast(); } + TExprBase DqBuildJoinDict(const TDqJoin& join, TExprContext& ctx) { auto joinType = join.JoinType().Value(); @@ -840,4 +843,250 @@ TExprBase DqBuildJoinDict(const TDqJoin& join, TExprContext& ctx) { return join; } +TExprBase DqBuildGraceJoin(const TDqJoin& join, TExprContext& ctx) { + + static const std::set<std::string_view> supportedTypes = { + "Inner"sv, + "Left"sv, + "Cross"sv, + "LeftOnly"sv, + "LeftSemi"sv, + "Right"sv, + "RightOnly"sv, + "RightSemi"sv, + "Full"sv, + "Exclusion"sv + }; + + auto joinType = join.JoinType().Value(); + + if (!supportedTypes.contains(joinType)) { + return join; + } + + auto buildShuffle = [&ctx, &join](const TExprBase& input, const TVector<TCoAtom>& keys) { + auto stage = Build<TDqStage>(ctx, join.Pos()) + .Inputs() + .Add(input) + .Build() + .Program() + .Args({"stream"}) + .Body("stream") + .Build() + .Settings(TDqStageSettings().BuildNode(ctx, join.Pos())) + .Done(); + + return Build<TDqCnHashShuffle>(ctx, join.Pos()) + .Output() + .Stage(stage) + .Index().Build("0") + .Build() + .KeyColumns() + .Add(keys) + .Build() + .Done(); + }; + + TMaybeNode<TDqStage> joinStage; + + auto leftCn = join.LeftInput().Cast<TDqCnUnionAll>(); + auto rightCn = join.RightInput().Cast<TDqCnUnionAll>(); + + const TStructExprType* leftStructType = nullptr; + auto leftSeqType = GetSequenceItemType(leftCn, false, ctx); + leftStructType = leftSeqType->Cast<TStructExprType>(); + + const TStructExprType* rightStructType = nullptr; + auto rightSeqType = GetSequenceItemType(rightCn, false, ctx); + rightStructType = rightSeqType->Cast<TStructExprType>(); + + + const TVector<const TItemExprType*> & leftItems = leftStructType->GetItems(); + const TVector<const TItemExprType*> & rightItems = rightStructType->GetItems(); + + std::map<TStringBuf, ui32> leftNames; + for (ui32 i = 0; i < leftItems.size(); i++) { + auto v = leftItems[i]; + leftNames.emplace(v->GetName(), i); + } + + std::map<TStringBuf, ui32> rightNames; + for (ui32 i = 0; i < rightItems.size(); i++) { + auto v = rightItems[i]; + rightNames.emplace(v->GetName(), i); + } + + auto [leftJoinKeys, rightJoinKeys] = GetJoinKeys(join, ctx); + + auto leftJoinKeysVar = leftJoinKeys; + auto rightJoinKeysVar = rightJoinKeys; + + auto rightShuffle = buildShuffle(rightCn, rightJoinKeys); + auto leftShuffle = buildShuffle(leftCn, leftJoinKeys); + + TCoArgument leftInputArg{ctx.NewArgument(join.Pos(), "_dq_join_left")}; + TCoArgument rightInputArg{ctx.NewArgument(join.Pos(), "_dq_join_right")}; + + auto leftWideFlow = ctx.Builder(join.Pos()) + .Callable("ExpandMap") + .Add(0, leftInputArg.Ptr()) + .Lambda(1) + .Param("item") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + for (ui32 i = 0U; i < leftItems.size(); ++i) { + parent.Callable(i, "Member") + .Arg(0, "item") + .Atom(1, leftItems[i]->GetName()) + .Seal(); + } + return parent; + }) + .Seal() + .Seal() + .Build(); + + auto rightWideFlow = ctx.Builder(join.Pos()) + .Callable("ExpandMap") + .Add(0, rightInputArg.Ptr()) + .Lambda(1) + .Param("item") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + for (ui32 i = 0U; i < rightItems.size(); ++i) { + parent.Callable(i, "Member") + .Arg(0, "item") + .Atom(1, rightItems[i]->GetName()) + .Seal(); + } + return parent; + }) + .Seal() + .Seal() + .Build(); + + auto graceJoin = ctx.Builder(join.Pos()) + .Callable("GraceJoinCore") + .Add(0, leftWideFlow) + .Add(1, rightWideFlow) + .Atom(2, joinType) + .List(3) + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + for (ui32 i = 0U; i < leftJoinKeysVar.size(); ++i) { + parent.Atom(i, std::to_string(leftNames[leftJoinKeysVar[i]]) ); + } + return parent; + }) + .Seal() + .List(4) + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + for (ui32 i = 0U; i < rightJoinKeysVar.size(); ++i) { + parent.Atom(i, std::to_string(rightNames[rightJoinKeysVar[i]]) ); + } + return parent; + }) + .Seal() + .List(5) + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + for (ui32 i = 0U; i < leftNames.size(); ++i) { + parent.Atom(2*i, std::to_string(i)); + parent.Atom(2*i + 1, std::to_string(i)); + } + return parent; + }) + .Seal() + .List(6) + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + ui32 colIndexStart = leftNames.size(); + for (ui32 i = 0U; i < rightNames.size(); ++i) { + parent.Atom(2*i, std::to_string(i) ); + parent.Atom(2*i + 1, std::to_string(colIndexStart + i) ); + } + return parent; + }) + .Seal() + .Seal() + .Build(); + + TStringBuf leftTableName, rightTableName; + + if ( join.LeftLabel().Ref().IsAtom() ) { + leftTableName = join.LeftLabel().Cast<TCoAtom>().Value(); + } + + if ( join.RightLabel().Ref().IsAtom() ) { + rightTableName = join.RightLabel().Cast<TCoAtom>().Value(); + } + + + TVector<TString> fullColNames; + + + for (const auto & v: leftNames ) { + auto name = FullColumnName(leftTableName, v.first); + if ( leftTableName.size() > 0) { + fullColNames.emplace_back(name); + } else { + fullColNames.emplace_back(v.first); + } + } + + for (const auto & v: rightNames ) { + auto name = FullColumnName(rightTableName, v.first); + if ( rightTableName.size() > 0) { + fullColNames.emplace_back(name); + } else { + fullColNames.emplace_back(v.first); + } + } + + + auto narrowMapJoin = ctx.Builder(join.Pos()) + .Callable("NarrowMap") + .Add(0, graceJoin) + .Lambda(1) + .Params("fields", fullColNames.size()) + .Callable("AsStruct") + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + ui32 i = 0U; + for (const auto& colName : fullColNames) { + parent.List(i) + .Atom(0, colName) + .Arg(1, "fields", i) + .Seal(); + ++i; + } + return parent; + }) + .Seal() + .Seal() + .Seal() + .Build(); + + + joinStage = Build<TDqStage>(ctx, join.Pos()) + .Inputs() + .Add(leftShuffle) + .Add(rightShuffle) + .Build() + .Program() + .Args({leftInputArg, rightInputArg}) + .Body(narrowMapJoin) + .Build() + .Settings(TDqStageSettings().BuildNode(ctx, join.Pos())) + .Done(); + + + + if (joinStage) { + return Build<TDqCnUnionAll>(ctx, join.Pos()) + .Output() + .Stage(joinStage.Cast()) + .Index().Build("0") + .Build() + .Done(); + } + + return join; +} + + } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index d457c0780fa..ee6a8108649 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -2055,8 +2055,9 @@ TMaybeNode<TDqJoin> DqFlipJoin(const TDqJoin& join, TExprContext& ctx) { .Done(); } + TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationContext& optCtx, - const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage) + const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, bool useGraceJoin) { if (!node.Maybe<TDqJoin>()) { return node; @@ -2078,6 +2079,10 @@ TExprBase DqBuildJoin(const TExprBase& node, TExprContext& ctx, IOptimizationCon auto joinType = join.JoinType().Value(); + if (useGraceJoin) { + return DqBuildGraceJoin(join, ctx); + } + if (joinType == "Full"sv || joinType == "Exclusion"sv) { return DqBuildJoinDict(join, ctx); } diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.h b/ydb/library/yql/dq/opt/dq_opt_phy.h index 87b8f5a7041..d955441fa20 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.h +++ b/ydb/library/yql/dq/opt/dq_opt_phy.h @@ -75,7 +75,11 @@ NNodes::TExprBase DqRewriteLeftPureJoin(const NNodes::TExprBase node, TExprConte NNodes::TExprBase DqBuildPhyJoin(const NNodes::TDqJoin& join, bool pushLeftStage, TExprContext& ctx, IOptimizationContext& optCtx); -NNodes::TExprBase DqBuildJoin(const NNodes::TExprBase& node, TExprContext& ctx, IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage); +NNodes::TExprBase DqBuildJoin(const NNodes::TExprBase& node, TExprContext& ctx, + IOptimizationContext& optCtx, const TParentsMap& parentsMap, bool allowStageMultiUsage, bool pushLeftStage, bool useGraceJoin = false); + +NNodes::TExprBase DqBuildGraceJoin(const NNodes::TDqJoin& join, TExprContext& ctx); + bool DqValidateJoinInputs( const NNodes::TExprBase& left, const NNodes::TExprBase& right, const TParentsMap& parentsMap, diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp index 7d8428140cb..f533dbd948c 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp @@ -722,6 +722,17 @@ TStatus AnnotateDqMapOrDictJoin(const TExprNode::TPtr& input, TExprContext& ctx) return TStatus::Ok; } +TStatus AnnotateDqGraceJoin(const TExprNode::TPtr& input, TExprContext& ctx) { + auto resultRowType = GetDqJoinResultType<true>(input, true, ctx); + if (!resultRowType) { + return TStatus::Error; + } + + input->SetTypeAnn(ctx.MakeType<TFlowExprType>(resultRowType)); + return TStatus::Ok; +} + + TStatus AnnotateDqCrossJoin(const TExprNode::TPtr& input, TExprContext& ctx) { auto resultRowType = GetDqJoinResultType<false>(input, true, ctx); if (!resultRowType) { diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.h b/ydb/library/yql/dq/type_ann/dq_type_ann.h index afcc29eb88e..92269430375 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.h +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.h @@ -18,6 +18,7 @@ IGraphTransformer::TStatus AnnotateDqConnection(const TExprNode::TPtr& input, TE IGraphTransformer::TStatus AnnotateDqCnMerge(const TExprNode::TPtr& input, TExprContext& ctx); IGraphTransformer::TStatus AnnotateDqJoin(const TExprNode::TPtr& input, TExprContext& ctx); IGraphTransformer::TStatus AnnotateDqMapOrDictJoin(const TExprNode::TPtr& input, TExprContext& ctx); +IGraphTransformer::TStatus AnnotateDqGraceJoin(const TExprNode::TPtr& input, TExprContext& ctx); IGraphTransformer::TStatus AnnotateDqCrossJoin(const TExprNode::TPtr& input, TExprContext& ctx); IGraphTransformer::TStatus AnnotateDqSource(const TExprNode::TPtr& input, TExprContext& ctx); IGraphTransformer::TStatus AnnotateDqSink(const TExprNode::TPtr& input, TExprContext& ctx); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index 24fedca4b22..27c64571cb2 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -387,7 +387,9 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con DataIntColumnsNum = TotalIntColumnsNum - KeyIntColumnsNum; DataStrColumnsNum = TotalStrColumnsNum - KeyStrColumnsNum; - TupleIntVals.resize(2 * totalIntColumnsNum ); + NullsBitmapSize = (nColumns / (8 * sizeof(ui64)) + 1) ; + + TupleIntVals.resize(2 * totalIntColumnsNum + NullsBitmapSize); TupleStrings.resize(totalStrColumnsNum); TupleStrSizes.resize(totalStrColumnsNum); @@ -399,10 +401,10 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con { if (a.IsKeyColumn && !b.IsKeyColumn) return true; if (a.Bytes > b.Bytes) return true; - if (a.ColumnIdx > b.ColumnIdx ) return true; + if (a.ColumnIdx < b.ColumnIdx ) return true; return false; }); - + Offsets.resize(nColumns); PackedIdx.resize(nColumns); TupleHolder.resize(nColumns); @@ -410,7 +412,6 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con std::transform(TupleHolder.begin(), TupleHolder.end(), std::back_inserter(TuplePtrs), [](NUdf::TUnboxedValue& v) { return std::addressof(v); }); - NullsBitmapSize = (nColumns / (8 * sizeof(ui64)) + 1) ; ui32 currIntOffset = NullsBitmapSize * sizeof(ui64) ; ui32 currStrOffset = 0; ui32 currIdx = 0; @@ -618,7 +619,7 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { - while (!*JoinCompleted) { + while (!*JoinCompleted) { const NKikimr::NMiniKQL::EFetchResult resultLeft = FlowLeft->FetchValues(ctx, LeftPacker->TuplePtrs.data()); const NKikimr::NMiniKQL::EFetchResult resultRight = FlowRight->FetchValues(ctx, RightPacker->TuplePtrs.data()); @@ -653,14 +654,21 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox auto &valsLeft = LeftPacker->TupleHolder; auto &valsRight = RightPacker->TupleHolder; + for (ui32 i = 0; i < LeftRenames.size() / 2; i++) { - *output[LeftRenames[2 * i + 1]] = valsLeft[LeftRenames[2 * i]]; + auto & valPtr = output[LeftRenames[2 * i + 1]]; + if ( valPtr ) { + *valPtr = valsLeft[LeftRenames[2 * i]]; + } } for (ui32 i = 0; i < RightRenames.size() / 2; i++) { - *output[RightRenames[2 * i + 1]] = valsRight[RightRenames[2 * i]]; + auto & valPtr = output[RightRenames[2 * i + 1]]; + if ( valPtr ) { + *valPtr = valsRight[RightRenames[2 * i]]; + } } return EFetchResult::One; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp index 76cd026c517..8909a2c7016 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp @@ -367,7 +367,9 @@ inline void TTable::GetTupleData(ui32 bucketNum, ui32 tupleId, TupleData & td) { char *strPtr = nullptr; if(NumberOfKeyStringColumns != 0) { keyStringsOffset = tb.StringsOffsets[stringsOffsetsIdx] + HeaderSize; - strPtr = reinterpret_cast<char *>(tb.KeyIntVals[keyStringsOffset]); + + strPtr = reinterpret_cast<char *>(tb.KeyIntVals.data() + keyStringsOffset); + for (ui64 i = 0; i < NumberOfKeyStringColumns; ++i) { td.StrColumns[i] = strPtr; diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp index 2d42af3b417..99622ac38e1 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp @@ -25,7 +25,7 @@ constexpr bool IsVerbose = false; Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinMemTest) { Y_UNIT_TEST(TestMem1) { - + const ui64 TupleSize = 1024; const ui64 NBuckets = 128; const ui64 NTuples = 10000; @@ -162,14 +162,14 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinMemTest) { CTEST << Endl; UNIT_ASSERT(true); - + } } Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { - Y_UNIT_TEST(TestImp1) { + Y_UNIT_TEST_LLVM(TestImp1) { ui64 tuple[11] = {0,1,2,3,4,5,6,7,8,9,10}; ui32 strSizes[2] = {4, 4}; @@ -179,7 +179,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { (char *)"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"}; ui32 bigStrSize[2] = {151, 151}; - + GraceJoin::TTable bigTable(1,1,1,1); GraceJoin::TTable smallTable(1,1,1,1); @@ -189,7 +189,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { const ui64 TupleSize = 1024; - ui64 bigTuple[TupleSize]; + ui64 bigTuple[TupleSize]; for (ui64 i = 0; i < TupleSize; i++) { bigTuple[i] = std::rand() / ( RAND_MAX / 10000 ); @@ -197,13 +197,13 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { ui64 milliseconds = 0; - + const ui64 BigTableTuples = 60000; const ui64 SmallTableTuples = 15000; const ui64 BigTupleSize = 32; - std::chrono::steady_clock::time_point begin03 = std::chrono::steady_clock::now(); + std::chrono::steady_clock::time_point begin03 = std::chrono::steady_clock::now(); for ( ui64 i = 0; i < BigTableTuples; i++) { @@ -229,7 +229,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { bigTable.Clear(); smallTable.Clear(); - begin03 = std::chrono::steady_clock::now(); + begin03 = std::chrono::steady_clock::now(); for ( ui64 i = 0; i < BigTableTuples; i++) { @@ -251,7 +251,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { CTEST << "Time for hash = " << milliseconds << "[ms]" << Endl; CTEST << "Adding tuples speed: " << (BigTupleSize * (BigTableTuples + SmallTableTuples) * 1000) / ( milliseconds * 1024 * 1024) << "MB/sec" << Endl; CTEST << Endl; - + std::vector<ui64> vals1, vals2; @@ -274,7 +274,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { ui64 numBigTuples = 0; bigTable.ResetIterator(); - std::chrono::steady_clock::time_point begin04 = std::chrono::steady_clock::now(); + std::chrono::steady_clock::time_point begin04 = std::chrono::steady_clock::now(); while(bigTable.NextTuple(td1)) { numBigTuples++; } @@ -287,7 +287,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { numBigTuples = 0; bigTable.ResetIterator(); - std::chrono::steady_clock::time_point begin041 = std::chrono::steady_clock::now(); + std::chrono::steady_clock::time_point begin041 = std::chrono::steady_clock::now(); while(bigTable.NextTuple(td2)) { numBigTuples++; } @@ -298,7 +298,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { CTEST << Endl; - std::chrono::steady_clock::time_point begin05 = std::chrono::steady_clock::now(); + std::chrono::steady_clock::time_point begin05 = std::chrono::steady_clock::now(); joinTable.Join(bigTable,smallTable); @@ -310,7 +310,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { ui64 numJoinedTuples = 0; - std::chrono::steady_clock::time_point begin042 = std::chrono::steady_clock::now(); + std::chrono::steady_clock::time_point begin042 = std::chrono::steady_clock::now(); while(joinTable.NextJoinedData(td1, td2)) { numJoinedTuples++; } @@ -331,8 +331,11 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinImpTest) { } } + + Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { Y_UNIT_TEST_LLVM(TestInner1) { + for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -395,10 +398,81 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } + + } + + + Y_UNIT_TEST_LLVM(TestInnerStringKey1) { + + for (ui32 pass = 0; pass < 1; ++pass) { + TSetup<LLVM> setup; + TProgramBuilder& pb = *setup.PgmBuilder; + + const auto key1 = pb.NewDataLiteral<NUdf::EDataSlot::String>("1"); + const auto key2 = pb.NewDataLiteral<NUdf::EDataSlot::String>("2"); + const auto key3 = pb.NewDataLiteral<NUdf::EDataSlot::String>("4"); + const auto key4 = pb.NewDataLiteral<NUdf::EDataSlot::String>("4"); + const auto payload1 = pb.NewDataLiteral<NUdf::EDataSlot::String>("A"); + const auto payload2 = pb.NewDataLiteral<NUdf::EDataSlot::String>("B"); + const auto payload3 = pb.NewDataLiteral<NUdf::EDataSlot::String>("C"); + const auto payload4 = pb.NewDataLiteral<NUdf::EDataSlot::String>("X"); + const auto payload5 = pb.NewDataLiteral<NUdf::EDataSlot::String>("Y"); + const auto payload6 = pb.NewDataLiteral<NUdf::EDataSlot::String>("Z"); + + const auto tupleType = pb.NewTupleType({ + pb.NewDataType(NUdf::TDataType<char*>::Id), + pb.NewDataType(NUdf::TDataType<char*>::Id) + }); + + const auto list1 = pb.NewList(tupleType, { + pb.NewTuple({key1, payload1}), + pb.NewTuple({key2, payload2}), + pb.NewTuple({key3, payload3}) + }); + + const auto list2 = pb.NewList(tupleType, { + pb.NewTuple({key2, payload4}), + pb.NewTuple({key3, payload5}), + pb.NewTuple({key4, payload6}) + }); + + + const auto resultType = pb.NewFlowType(pb.NewTupleType({ + pb.NewDataType(NUdf::TDataType<char*>::Id), + pb.NewDataType(NUdf::TDataType<char*>::Id) + })); + + const auto pgmReturn = pb.Collect(pb.NarrowMap(pb.GraceJoin( + pb.ExpandMap(pb.ToFlow(list1), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }), + pb.ExpandMap(pb.ToFlow(list2), [&](TRuntimeNode item) -> TRuntimeNode::TList { return {pb.Nth(item, 0U), pb.Nth(item, 1U)}; }), + EJoinKind::Inner, {0U}, {0U}, {1U, 0U}, {1U, 1U}, resultType), + [&](TRuntimeNode::TList items) -> TRuntimeNode { return pb.NewTuple(items); }) + ); + + const auto graph = setup.BuildGraph(pgmReturn); + + const auto iterator = graph->GetValue().GetListIterator(); + + NUdf::TUnboxedValue tuple; + + UNIT_ASSERT(iterator.Next(tuple)); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "B"); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "X"); + UNIT_ASSERT(iterator.Next(tuple)); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C"); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Y"); + UNIT_ASSERT(iterator.Next(tuple)); + UNIT_ASSERT(!iterator.Next(tuple)); + + } + } + + Y_UNIT_TEST_LLVM(TMiniKQLGraceJoinTestInnerMulti1) { + for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -466,6 +540,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestLeft1) { + for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -537,6 +612,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestLeftMulti1) { + for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -609,6 +685,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestLeftSemi1) { + for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -671,6 +748,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestLeftOnly1) { + for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -728,6 +806,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestLeftSemiWithNullKey1) { + for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -792,6 +871,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestLeftOnlyWithNullKey1) { + for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -856,6 +936,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestRight1) { + for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -926,7 +1007,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } } - Y_UNIT_TEST_LLVM(TestRightOnly1) { + + Y_UNIT_TEST_LLVM(TestRightOnly1) { + for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -982,7 +1065,10 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } } + + Y_UNIT_TEST_LLVM(TestRightSemi1) { + for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -1044,7 +1130,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } } + Y_UNIT_TEST_LLVM(TestRightMulti1) { + for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -1116,7 +1204,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } } - Y_UNIT_TEST_LLVM(TestRightSemiWithNullKey1) { + + Y_UNIT_TEST_LLVM(TestRightSemiWithNullKey1) { + for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -1181,6 +1271,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestRightOnlyWithNullKey1) { + for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -1245,6 +1336,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } Y_UNIT_TEST_LLVM(TestFull1) { + for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -1309,9 +1401,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "C"); UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Y"); UNIT_ASSERT(iterator.Next(tuple)); - UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A"); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A"); UNIT_ASSERT(!tuple.GetElement(1)); - UNIT_ASSERT(iterator.Next(tuple)); + UNIT_ASSERT(iterator.Next(tuple)); UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Z"); UNIT_ASSERT(!tuple.GetElement(0)); UNIT_ASSERT(!iterator.Next(tuple)); @@ -1321,6 +1413,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { Y_UNIT_TEST_LLVM(TestExclusion1) { + for (ui32 pass = 0; pass < 1; ++pass) { TSetup<LLVM> setup; TProgramBuilder& pb = *setup.PgmBuilder; @@ -1373,9 +1466,9 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { UNIT_ASSERT(iterator.Next(tuple)); - UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A"); + UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(0), "A"); UNIT_ASSERT(!tuple.GetElement(1)); - UNIT_ASSERT(iterator.Next(tuple)); + UNIT_ASSERT(iterator.Next(tuple)); UNBOXED_VALUE_STR_EQUAL(tuple.GetElement(1), "Z"); UNIT_ASSERT(!tuple.GetElement(0)); UNIT_ASSERT(!iterator.Next(tuple)); @@ -1385,6 +1478,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLGraceJoinTest) { } + } } diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp index 110b78eb312..716b7273cd7 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp @@ -47,6 +47,7 @@ TDqConfiguration::TDqConfiguration() { REGISTER_SETTING(*this, _FallbackOnRuntimeErrors); REGISTER_SETTING(*this, WorkerFilter); REGISTER_SETTING(*this, _EnablePrecompute); + REGISTER_SETTING(*this, EnableGraceJoin); REGISTER_SETTING(*this, EnableDqReplicate); REGISTER_SETTING(*this, WatermarksMode); REGISTER_SETTING(*this, WatermarksGranularityMs); diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index dd4b1b35077..5d0fc8aa524 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -71,7 +71,7 @@ struct TDqSettings { NCommon::TConfSetting<TString, false> _FallbackOnRuntimeErrors; NCommon::TConfSetting<bool, false> _EnablePrecompute; NCommon::TConfSetting<bool, false> EnableDqReplicate; - NCommon::TConfSetting<bool, false> _EnableGraceJoin; + NCommon::TConfSetting<bool, false> EnableGraceJoin; NCommon::TConfSetting<TString, false> WatermarksMode; NCommon::TConfSetting<ui64, false> WatermarksGranularityMs; @@ -114,6 +114,7 @@ struct TDqSettings { SAVE_SETTING(WorkerFilter); SAVE_SETTING(ComputeActorType); SAVE_SETTING(WatermarksMode); + SAVE_SETTING(EnableGraceJoin); SAVE_SETTING(WatermarksGranularityMs); #undef SAVE_SETTING diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index ddd45515bb4..8ededee8059 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -301,7 +301,8 @@ protected: TMaybeNode<TExprBase> BuildJoin(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) { auto join = node.Cast<TDqJoin>(); const TParentsMap* parentsMap = getParents(); - return DqBuildJoin(join, ctx, optCtx, *parentsMap, IsGlobal, /* pushLeftStage = */ false /* TODO */); + bool enableGraceJoin = Config->EnableGraceJoin.Get().GetOrElse(false); + return DqBuildJoin(join, ctx, optCtx, *parentsMap, IsGlobal, /* pushLeftStage = */ false /* TODO */, enableGraceJoin); } TMaybeNode<TExprBase> BuildHasItems(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx) { diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp index f6aa1df5420..ef5458527f6 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp @@ -33,6 +33,7 @@ public: } AddHandler({TDqJoin::CallableName()}, Hndl(&NDq::AnnotateDqJoin)); AddHandler({TDqPhyMapJoin::CallableName()}, Hndl(&NDq::AnnotateDqMapOrDictJoin)); + AddHandler({TDqPhyGraceJoin::CallableName()}, Hndl(&NDq::AnnotateDqGraceJoin)); AddHandler({TDqPhyCrossJoin::CallableName()}, Hndl(&NDq::AnnotateDqCrossJoin)); AddHandler({TDqPhyJoinDict::CallableName()}, Hndl(&NDq::AnnotateDqMapOrDictJoin)); AddHandler({TDqSink::CallableName()}, Hndl(&NDq::AnnotateDqSink)); |