diff options
author | pavelvelikhov <pavelvelikhov@yandex-team.com> | 2023-11-24 23:38:19 +0300 |
---|---|---|
committer | pavelvelikhov <pavelvelikhov@yandex-team.com> | 2023-11-25 00:18:33 +0300 |
commit | 248983239004802518684d9216393364e48d413a (patch) | |
tree | 72027dbe6a4ee002efe0d0d06f5ed00b6f1f5561 | |
parent | c58846d335a3191af4317b3cd973f28ce9e06606 (diff) | |
download | ydb-248983239004802518684d9216393364e48d413a.tar.gz |
Added DP table size cutoff to the CBO
Added DP table size cutoff to the CBO
-rw-r--r-- | ydb/core/fq/libs/config/protos/common.proto | 1 | ||||
-rw-r--r-- | ydb/core/kqp/opt/logical/kqp_opt_log.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_settings.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_settings.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp | 127 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_log.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/common/yql_dq_settings.h | 2 |
7 files changed, 124 insertions, 14 deletions
diff --git a/ydb/core/fq/libs/config/protos/common.proto b/ydb/core/fq/libs/config/protos/common.proto index 6c0ed75b22..ea6a994b66 100644 --- a/ydb/core/fq/libs/config/protos/common.proto +++ b/ydb/core/fq/libs/config/protos/common.proto @@ -27,4 +27,5 @@ message TCommonConfig { uint64 MaxTasksPerStage = 12; bool KeepInternalErrors = 13; bool UseNativeProtocolForClickHouse = 14; + uint64 MaxDPccpDPTableSize = 15; } diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp index 0793a40ecc..971f65bc26 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp @@ -133,7 +133,8 @@ protected: } TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) { - TExprBase output = DqOptimizeEquiJoinWithCosts(node, ctx, TypesCtx, Config->HasOptEnableCostBasedOptimization()); + auto maxDPccpDPTableSize = Config->MaxDPccpDPTableSize.Get().GetOrElse(TDqSettings::TDefault::MaxDPccpDPTableSize); + TExprBase output = DqOptimizeEquiJoinWithCosts(node, ctx, TypesCtx, Config->HasOptEnableCostBasedOptimization(), maxDPccpDPTableSize); DumpAppliedRule("OptimizeEquiJoinWithCosts", node.Ptr(), output.Ptr(), ctx); return output; } diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.cpp b/ydb/core/kqp/provider/yql_kikimr_settings.cpp index 74e1439c7f..f1f86737b6 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_settings.cpp @@ -68,6 +68,7 @@ TKikimrConfiguration::TKikimrConfiguration() { REGISTER_SETTING(*this, OptUseFinalizeByKey); REGISTER_SETTING(*this, OptEnableCostBasedOptimization); + REGISTER_SETTING(*this, MaxDPccpDPTableSize); REGISTER_SETTING(*this, MaxTasksPerStage); diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index b03c53df7a..13dbdc4140 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -62,6 +62,8 @@ struct 
TKikimrSettings { NCommon::TConfSetting<bool, false> OptEnableOlapProvideComputeSharding; NCommon::TConfSetting<bool, false> OptUseFinalizeByKey; NCommon::TConfSetting<bool, false> OptEnableCostBasedOptimization; + NCommon::TConfSetting<ui32, false> MaxDPccpDPTableSize; + NCommon::TConfSetting<ui32, false> MaxTasksPerStage; diff --git a/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp b/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp index fe714028b1..96e8dce4a5 100644 --- a/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp @@ -195,8 +195,8 @@ struct TJoinOptimizerNode : public IBaseOptimizerNode { std::set<std::pair<TJoinColumn, TJoinColumn>> JoinConditions; TString JoinType; - TJoinOptimizerNode(std::shared_ptr<IBaseOptimizerNode> left, std::shared_ptr<IBaseOptimizerNode> right, - const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions, const TString joinType) : + TJoinOptimizerNode(const std::shared_ptr<IBaseOptimizerNode>& left, const std::shared_ptr<IBaseOptimizerNode>& right, + const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions, const TString& joinType) : IBaseOptimizerNode(JoinNodeType), LeftArg(left), RightArg(right), JoinConditions(joinConditions), JoinType(joinType) {} virtual ~TJoinOptimizerNode() {} @@ -463,6 +463,9 @@ public: // Run DPccp algorithm and produce the join tree in CBO's internal representation std::shared_ptr<TJoinOptimizerNode> Solve(); + // Calculate the size of a dynamic programming table with a budget + ui32 CountCC(ui32 budget); + private: // Compute the next subset of relations, given by the final bitset @@ -509,6 +512,9 @@ private: // Create an exclusion set that contains all the nodes of the bitset that are smaller or equal to // the provided integer std::bitset<N> MakeB(const std::bitset<N>&,int); + + // Count the size of the dynamic programming table recursively + ui32 CountCCRec(const std::bitset<N>&, const std::bitset<N>&, ui32, ui32); }; // 
Print tabs @@ -789,6 +795,58 @@ template <int N> std::bitset<N> TDPccpSolver<N>::NextBitset(const std::bitset<N> } /** + * Count the number of items in the DP table of DPccp +*/ +template <int N> ui32 TDPccpSolver<N>::CountCC(ui32 budget) { + std::bitset<N> allNodes; + allNodes.set(); + ui32 cost = 0; + + for (int i = NNodes - 1; i >= 0; i--) { + cost += 1; + if (cost > budget) { + return cost; + } + std::bitset<N> S; + S.set(i); + std::bitset<N> X = MakeB(allNodes,i); + cost = CountCCRec(S,X,cost,budget); + } + + return cost; +} + +/** + * Recursively count the number of items in the DP table of DPccp +*/ +template <int N> ui32 TDPccpSolver<N>::CountCCRec(const std::bitset<N>& S, const std::bitset<N>& X, ui32 cost, ui32 budget) { + std::bitset<N> Ns = Neighbors(S, X); + + if (Ns==std::bitset<N>()) { + return cost; + } + + std::bitset<N> prev; + std::bitset<N> next; + + while(true) { + next = NextBitset(prev, Ns); + cost += 1; + if (cost > budget) { + return cost; + } + cost = CountCCRec(S | next, X | Ns, cost, budget); + if (next==Ns) { + break; + } + prev = next; + } + + return cost; +} + + +/** * Build a join tree that will replace the original join tree in equiJoin * TODO: Add join implementations here */ @@ -901,6 +959,11 @@ bool DqCollectJoinRelationsWithStats( return true; } +/** + * Convert JoinTuple from AST into an internal representation of an optimizer plan + * This procedure also hooks up rels with statistics to the leaf nodes of the plan + * Statistics for join nodes are not computed +*/ std::shared_ptr<TJoinOptimizerNode> ConvertToJoinTree(const TCoEquiJoinTuple& joinTuple, const TVector<std::shared_ptr<TRelOptimizerNode>>& rels) { @@ -947,6 +1010,9 @@ std::shared_ptr<TJoinOptimizerNode> ConvertToJoinTree(const TCoEquiJoinTuple& jo return std::make_shared<TJoinOptimizerNode>(left,right,joinConds,joinTuple.Type().StringValue()); } +/** + * Extract all non-orderable joins from a plan in a post-order traversal order +*/ void
ExtractNonOrderables(std::shared_ptr<TJoinOptimizerNode> joinTree, TVector<std::shared_ptr<TJoinOptimizerNode>>& result) { @@ -964,6 +1030,11 @@ void ExtractNonOrderables(std::shared_ptr<TJoinOptimizerNode> joinTree, } } +/** + * Extract relations and join conditions from an optimizer plan + * If we hit a non-orderable join type in recursion, we don't recurse into it and + * add it as a relation +*/ void ExtractRelsAndJoinConditions(const std::shared_ptr<TJoinOptimizerNode>& joinTree, TVector<std::shared_ptr<IBaseOptimizerNode>> & rels, std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions) { @@ -991,8 +1062,25 @@ void ExtractRelsAndJoinConditions(const std::shared_ptr<TJoinOptimizerNode>& joi } } +/** + * Recursively computes statistics for a join tree +*/ +void ComputeStatistics(const std::shared_ptr<TJoinOptimizerNode>& join) { + if (join->LeftArg->Kind == EOptimizerNodeKind::JoinNodeType) { + ComputeStatistics(static_pointer_cast<TJoinOptimizerNode>(join->LeftArg)); + } + if (join->RightArg->Kind == EOptimizerNodeKind::JoinNodeType) { + ComputeStatistics(static_pointer_cast<TJoinOptimizerNode>(join->RightArg)); + } + join->Stats = std::make_shared<TOptimizerStatistics>(ComputeJoinStats(*join->LeftArg->Stats, *join->RightArg->Stats, EJoinImplType::DictJoin)); +} -std::shared_ptr<TJoinOptimizerNode> OptimizeSubtree(const std::shared_ptr<TJoinOptimizerNode>& joinTree) { +/** + * Optimize a subtree of a plan with DPccp + * The root of the subtree that needs to be optimized needs to be reorderable, otherwise we will + * only update the statistics for it and return it unchanged +*/ +std::shared_ptr<TJoinOptimizerNode> OptimizeSubtree(const std::shared_ptr<TJoinOptimizerNode>& joinTree, ui32 maxDPccpDPTableSize) { if (!joinTree->Reorderable()) { joinTree->Stats = std::make_shared<TOptimizerStatistics>(ComputeJoinStats(*joinTree->LeftArg->Stats, *joinTree->RightArg->Stats, EJoinImplType::DictJoin)); return joinTree; @@ -1008,6 +1096,14 @@
std::shared_ptr<TJoinOptimizerNode> OptimizeSubtree(const std::shared_ptr<TJoinO joinGraph.AddNode(i, rels[i]->Labels()); } + // Check if we have more rels than DPccp can handle (64) + // If that's the case - don't optimize the plan and just return it with + // computed statistics + if (rels.size() >= 64) { + ComputeStatistics(joinTree); + return joinTree; + } + for (auto cond : joinConditions ) { int fromNode = joinGraph.FindNode(cond.first.RelName); int toNode = joinGraph.FindNode(cond.second.RelName); @@ -1031,8 +1127,16 @@ std::shared_ptr<TJoinOptimizerNode> OptimizeSubtree(const std::shared_ptr<TJoinO YQL_CLOG(TRACE, CoreDq) << str.str(); } - // feed the graph to DPccp algorithm TDPccpSolver<64> solver(joinGraph,rels); + + // Check that the dynamic table of DPccp is not too big + // If it is, just compute the statistics for the join tree and return it + if (solver.CountCC(maxDPccpDPTableSize) >= maxDPccpDPTableSize) { + ComputeStatistics(joinTree); + return joinTree; + } + + // feed the graph to DPccp algorithm std::shared_ptr<TJoinOptimizerNode> result = solver.Solve(); if (NYql::NLog::YqlLogger().NeedToLog(NYql::NLog::EComponent::ProviderKqp, NYql::NLog::ELevel::TRACE)) { @@ -1049,14 +1153,13 @@ std::shared_ptr<TJoinOptimizerNode> OptimizeSubtree(const std::shared_ptr<TJoinO * Main routine that checks: * 1. Do we have an equiJoin * 2. Is the cost already computed - * 3. FIX: Are all joins InnerJoins - * 4. Are all the costs of equiJoin inputs computed? + * 3. Are all the costs of equiJoin inputs computed? 
* - * Then it extracts join conditions from the join tree, constructs a join graph and - * optimizes it with the DPccp algorithm + * Then it optimizes the join tree by iterating over all non-orderable nodes and optimizing their children, + * and finally optimizes the root of the tree */ TExprBase DqOptimizeEquiJoinWithCosts(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, - bool ruleEnabled) { + bool ruleEnabled, ui32 maxDPccpDPTableSize) { if (!ruleEnabled) { return node; @@ -1100,16 +1203,16 @@ TExprBase DqOptimizeEquiJoinWithCosts(const TExprBase& node, TExprContext& ctx, // For all non-orderable joins, optimize the children for( auto join : nonOrderables ) { if (join->LeftArg->Kind == EOptimizerNodeKind::JoinNodeType) { - join->LeftArg = OptimizeSubtree(static_pointer_cast<TJoinOptimizerNode>(join->LeftArg)); + join->LeftArg = OptimizeSubtree(static_pointer_cast<TJoinOptimizerNode>(join->LeftArg), maxDPccpDPTableSize); } if (join->RightArg->Kind == EOptimizerNodeKind::JoinNodeType) { - join->RightArg = OptimizeSubtree(static_pointer_cast<TJoinOptimizerNode>(join->RightArg)); + join->RightArg = OptimizeSubtree(static_pointer_cast<TJoinOptimizerNode>(join->RightArg), maxDPccpDPTableSize); } join->Stats = std::make_shared<TOptimizerStatistics>(ComputeJoinStats(*join->LeftArg->Stats, *join->RightArg->Stats, EJoinImplType::DictJoin)); } // Optimize the root - joinTree = OptimizeSubtree(joinTree); + joinTree = OptimizeSubtree(joinTree, maxDPccpDPTableSize); // rewrite the join tree and record the output statistics TExprBase res = RearrangeEquiJoinTree(ctx, equiJoin, joinTree); diff --git a/ydb/library/yql/dq/opt/dq_opt_log.h b/ydb/library/yql/dq/opt/dq_opt_log.h index 22dcd8a9c2..83061e1b3d 100644 --- a/ydb/library/yql/dq/opt/dq_opt_log.h +++ b/ydb/library/yql/dq/opt/dq_opt_log.h @@ -19,7 +19,7 @@ NNodes::TExprBase DqRewriteAggregate(NNodes::TExprBase node, TExprContext& ctx, NNodes::TExprBase DqRewriteTakeSortToTopSort(NNodes::TExprBase 
node, TExprContext& ctx, const TParentsMap& parents); -NNodes::TExprBase DqOptimizeEquiJoinWithCosts(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, bool isRuleEnabled); +NNodes::TExprBase DqOptimizeEquiJoinWithCosts(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, bool isRuleEnabled, ui32 maxDPccpDPTableSize); NNodes::TExprBase DqOptimizeEquiJoinWithCosts( const NNodes::TExprBase& node, diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index c1943584ff..cbeeef4f4d 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -56,6 +56,8 @@ struct TDqSettings { static constexpr bool ExportStats = false; static constexpr ETaskRunnerStats TaskRunnerStats = ETaskRunnerStats::Basic; static constexpr ESpillingEngine SpillingEngine = ESpillingEngine::Disable; + static constexpr ui32 MaxDPccpDPTableSize = 10000U; + }; using TPtr = std::shared_ptr<TDqSettings>; |