aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpavelvelikhov <pavelvelikhov@yandex-team.com>2023-11-24 23:38:19 +0300
committerpavelvelikhov <pavelvelikhov@yandex-team.com>2023-11-25 00:18:33 +0300
commit248983239004802518684d9216393364e48d413a (patch)
tree72027dbe6a4ee002efe0d0d06f5ed00b6f1f5561
parentc58846d335a3191af4317b3cd973f28ce9e06606 (diff)
downloadydb-248983239004802518684d9216393364e48d413a.tar.gz
Added DP table size cutoff to the CBO
Added DP table size cutoff to the CBO
-rw-r--r--ydb/core/fq/libs/config/protos/common.proto1
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log.cpp3
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_settings.cpp1
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_settings.h2
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp127
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_log.h2
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.h2
7 files changed, 124 insertions, 14 deletions
diff --git a/ydb/core/fq/libs/config/protos/common.proto b/ydb/core/fq/libs/config/protos/common.proto
index 6c0ed75b22..ea6a994b66 100644
--- a/ydb/core/fq/libs/config/protos/common.proto
+++ b/ydb/core/fq/libs/config/protos/common.proto
@@ -27,4 +27,5 @@ message TCommonConfig {
uint64 MaxTasksPerStage = 12;
bool KeepInternalErrors = 13;
bool UseNativeProtocolForClickHouse = 14;
+ uint64 MaxDPccpDPTableSize = 15;
}
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
index 0793a40ecc..971f65bc26 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
@@ -133,7 +133,8 @@ protected:
}
TMaybeNode<TExprBase> OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) {
- TExprBase output = DqOptimizeEquiJoinWithCosts(node, ctx, TypesCtx, Config->HasOptEnableCostBasedOptimization());
+ auto maxDPccpDPTableSize = Config->MaxDPccpDPTableSize.Get().GetOrElse(TDqSettings::TDefault::MaxDPccpDPTableSize);
+ TExprBase output = DqOptimizeEquiJoinWithCosts(node, ctx, TypesCtx, Config->HasOptEnableCostBasedOptimization(), maxDPccpDPTableSize);
DumpAppliedRule("OptimizeEquiJoinWithCosts", node.Ptr(), output.Ptr(), ctx);
return output;
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.cpp b/ydb/core/kqp/provider/yql_kikimr_settings.cpp
index 74e1439c7f..f1f86737b6 100644
--- a/ydb/core/kqp/provider/yql_kikimr_settings.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_settings.cpp
@@ -68,6 +68,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
REGISTER_SETTING(*this, OptUseFinalizeByKey);
REGISTER_SETTING(*this, OptEnableCostBasedOptimization);
+ REGISTER_SETTING(*this, MaxDPccpDPTableSize);
REGISTER_SETTING(*this, MaxTasksPerStage);
diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h
index b03c53df7a..13dbdc4140 100644
--- a/ydb/core/kqp/provider/yql_kikimr_settings.h
+++ b/ydb/core/kqp/provider/yql_kikimr_settings.h
@@ -62,6 +62,8 @@ struct TKikimrSettings {
NCommon::TConfSetting<bool, false> OptEnableOlapProvideComputeSharding;
NCommon::TConfSetting<bool, false> OptUseFinalizeByKey;
NCommon::TConfSetting<bool, false> OptEnableCostBasedOptimization;
+ NCommon::TConfSetting<ui32, false> MaxDPccpDPTableSize;
+
NCommon::TConfSetting<ui32, false> MaxTasksPerStage;
diff --git a/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp b/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp
index fe714028b1..96e8dce4a5 100644
--- a/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp
@@ -195,8 +195,8 @@ struct TJoinOptimizerNode : public IBaseOptimizerNode {
std::set<std::pair<TJoinColumn, TJoinColumn>> JoinConditions;
TString JoinType;
- TJoinOptimizerNode(std::shared_ptr<IBaseOptimizerNode> left, std::shared_ptr<IBaseOptimizerNode> right,
- const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions, const TString joinType) :
+ TJoinOptimizerNode(const std::shared_ptr<IBaseOptimizerNode>& left, const std::shared_ptr<IBaseOptimizerNode>& right,
+ const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions, const TString& joinType) :
IBaseOptimizerNode(JoinNodeType), LeftArg(left), RightArg(right), JoinConditions(joinConditions), JoinType(joinType) {}
virtual ~TJoinOptimizerNode() {}
@@ -463,6 +463,9 @@ public:
// Run DPccp algorithm and produce the join tree in CBO's internal representation
std::shared_ptr<TJoinOptimizerNode> Solve();
+ // Calculate the size of a dynamic programming table with a budget
+ ui32 CountCC(ui32 budget);
+
private:
// Compute the next subset of relations, given by the final bitset
@@ -509,6 +512,9 @@ private:
// Create an exclusion set that contains all the nodes of the bitset that are smaller or equal to
// the provided integer
std::bitset<N> MakeB(const std::bitset<N>&,int);
+
+ // Count the size of the dynamic programming table recursively
+ ui32 CountCCRec(const std::bitset<N>&, const std::bitset<N>&, ui32, ui32);
};
// Print tabs
@@ -789,6 +795,58 @@ template <int N> std::bitset<N> TDPccpSolver<N>::NextBitset(const std::bitset<N>
}
/**
+ * Count the number of items in the DP table of DPcpp
+*/
+template <int N> ui32 TDPccpSolver<N>::CountCC(ui32 budget) {
+ std::bitset<N> allNodes;
+ allNodes.set();
+ ui32 cost = 0;
+
+ for (int i = NNodes - 1; i >= 0; i--) {
+ cost += 1;
+ if (cost > budget) {
+ return cost;
+ }
+ std::bitset<N> S;
+ S.set(i);
+ std::bitset<N> X = MakeB(allNodes,i);
+ cost = CountCCRec(S,X,cost,budget);
+ }
+
+ return cost;
+}
+
+/**
+ * Recursively count the nuber of items in the DP table of DPccp
+*/
+template <int N> ui32 TDPccpSolver<N>::CountCCRec(const std::bitset<N>& S, const std::bitset<N>& X, ui32 cost, ui32 budget) {
+ std::bitset<N> Ns = Neighbors(S, X);
+
+ if (Ns==std::bitset<N>()) {
+ return cost;
+ }
+
+ std::bitset<N> prev;
+ std::bitset<N> next;
+
+ while(true) {
+ next = NextBitset(prev, Ns);
+ cost += 1;
+ if (cost > budget) {
+ return cost;
+ }
+ cost = CountCCRec(S | next, X | Ns, cost, budget);
+ if (next==Ns) {
+ break;
+ }
+ prev = next;
+ }
+
+ return cost;
+}
+
+
+/**
* Build a join tree that will replace the original join tree in equiJoin
* TODO: Add join implementations here
*/
@@ -901,6 +959,11 @@ bool DqCollectJoinRelationsWithStats(
return true;
}
+/**
+ * Convert JoinTuple from AST into an internal representation of a optimizer plan
+ * This procedure also hooks up rels with statistics to the leaf nodes of the plan
+ * Statistics for join nodes are not computed
+*/
std::shared_ptr<TJoinOptimizerNode> ConvertToJoinTree(const TCoEquiJoinTuple& joinTuple,
const TVector<std::shared_ptr<TRelOptimizerNode>>& rels) {
@@ -947,6 +1010,9 @@ std::shared_ptr<TJoinOptimizerNode> ConvertToJoinTree(const TCoEquiJoinTuple& jo
return std::make_shared<TJoinOptimizerNode>(left,right,joinConds,joinTuple.Type().StringValue());
}
+/**
+ * Extract all non orderable joins from a plan is a post-order traversal order
+*/
void ExtractNonOrderables(std::shared_ptr<TJoinOptimizerNode> joinTree,
TVector<std::shared_ptr<TJoinOptimizerNode>>& result) {
@@ -964,6 +1030,11 @@ void ExtractNonOrderables(std::shared_ptr<TJoinOptimizerNode> joinTree,
}
}
+/**
+ * Extract relations and join conditions from an optimizer plan
+ * If we hit a non-orderable join type in recursion, we don't recurse into it and
+ * add it as a relation
+*/
void ExtractRelsAndJoinConditions(const std::shared_ptr<TJoinOptimizerNode>& joinTree,
TVector<std::shared_ptr<IBaseOptimizerNode>> & rels,
std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions) {
@@ -991,8 +1062,25 @@ void ExtractRelsAndJoinConditions(const std::shared_ptr<TJoinOptimizerNode>& joi
}
}
+/**
+ * Recursively computes statistics for a join tree
+*/
+void ComputeStatistics(const std::shared_ptr<TJoinOptimizerNode>& join) {
+ if (join->LeftArg->Kind == EOptimizerNodeKind::JoinNodeType) {
+ ComputeStatistics(static_pointer_cast<TJoinOptimizerNode>(join->LeftArg));
+ }
+ if (join->RightArg->Kind == EOptimizerNodeKind::JoinNodeType) {
+ ComputeStatistics(static_pointer_cast<TJoinOptimizerNode>(join->RightArg));
+ }
+ join->Stats = std::make_shared<TOptimizerStatistics>(ComputeJoinStats(*join->LeftArg->Stats, *join->RightArg->Stats, EJoinImplType::DictJoin));
+}
-std::shared_ptr<TJoinOptimizerNode> OptimizeSubtree(const std::shared_ptr<TJoinOptimizerNode>& joinTree) {
+/**
+ * Optimize a subtree of a plan with DPccp
+ * The root of the subtree that needs to be optimizer needs to be reorderable, otherwise we will
+ * only update the statistics for it and return it unchanged
+*/
+std::shared_ptr<TJoinOptimizerNode> OptimizeSubtree(const std::shared_ptr<TJoinOptimizerNode>& joinTree, ui32 maxDPccpDPTableSize) {
if (!joinTree->Reorderable()) {
joinTree->Stats = std::make_shared<TOptimizerStatistics>(ComputeJoinStats(*joinTree->LeftArg->Stats, *joinTree->RightArg->Stats, EJoinImplType::DictJoin));
return joinTree;
@@ -1008,6 +1096,14 @@ std::shared_ptr<TJoinOptimizerNode> OptimizeSubtree(const std::shared_ptr<TJoinO
joinGraph.AddNode(i, rels[i]->Labels());
}
+ // Check if we have more rels than DPccp can handle (64)
+ // If that's the case - don't optimize the plan and just return it with
+ // computed statistics
+ if (rels.size() >= 64) {
+ ComputeStatistics(joinTree);
+ return joinTree;
+ }
+
for (auto cond : joinConditions ) {
int fromNode = joinGraph.FindNode(cond.first.RelName);
int toNode = joinGraph.FindNode(cond.second.RelName);
@@ -1031,8 +1127,16 @@ std::shared_ptr<TJoinOptimizerNode> OptimizeSubtree(const std::shared_ptr<TJoinO
YQL_CLOG(TRACE, CoreDq) << str.str();
}
- // feed the graph to DPccp algorithm
TDPccpSolver<64> solver(joinGraph,rels);
+
+ // Check that the dynamic table of DPccp is not too big
+ // If it is, just compute the statistics for the join tree and return it
+ if (solver.CountCC(maxDPccpDPTableSize) >= maxDPccpDPTableSize) {
+ ComputeStatistics(joinTree);
+ return joinTree;
+ }
+
+ // feed the graph to DPccp algorithm
std::shared_ptr<TJoinOptimizerNode> result = solver.Solve();
if (NYql::NLog::YqlLogger().NeedToLog(NYql::NLog::EComponent::ProviderKqp, NYql::NLog::ELevel::TRACE)) {
@@ -1049,14 +1153,13 @@ std::shared_ptr<TJoinOptimizerNode> OptimizeSubtree(const std::shared_ptr<TJoinO
* Main routine that checks:
* 1. Do we have an equiJoin
* 2. Is the cost already computed
- * 3. FIX: Are all joins InnerJoins
- * 4. Are all the costs of equiJoin inputs computed?
+ * 3. Are all the costs of equiJoin inputs computed?
*
- * Then it extracts join conditions from the join tree, constructs a join graph and
- * optimizes it with the DPccp algorithm
+ * Then it optimizes the join tree by iterating over all non-orderable nodes and optimizing their children,
+ * and finally optimizes the root of the tree
*/
TExprBase DqOptimizeEquiJoinWithCosts(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx,
- bool ruleEnabled) {
+ bool ruleEnabled, ui32 maxDPccpDPTableSize) {
if (!ruleEnabled) {
return node;
@@ -1100,16 +1203,16 @@ TExprBase DqOptimizeEquiJoinWithCosts(const TExprBase& node, TExprContext& ctx,
// For all non-orderable joins, optimize the children
for( auto join : nonOrderables ) {
if (join->LeftArg->Kind == EOptimizerNodeKind::JoinNodeType) {
- join->LeftArg = OptimizeSubtree(static_pointer_cast<TJoinOptimizerNode>(join->LeftArg));
+ join->LeftArg = OptimizeSubtree(static_pointer_cast<TJoinOptimizerNode>(join->LeftArg), maxDPccpDPTableSize);
}
if (join->RightArg->Kind == EOptimizerNodeKind::JoinNodeType) {
- join->RightArg = OptimizeSubtree(static_pointer_cast<TJoinOptimizerNode>(join->RightArg));
+ join->RightArg = OptimizeSubtree(static_pointer_cast<TJoinOptimizerNode>(join->RightArg), maxDPccpDPTableSize);
}
join->Stats = std::make_shared<TOptimizerStatistics>(ComputeJoinStats(*join->LeftArg->Stats, *join->RightArg->Stats, EJoinImplType::DictJoin));
}
// Optimize the root
- joinTree = OptimizeSubtree(joinTree);
+ joinTree = OptimizeSubtree(joinTree, maxDPccpDPTableSize);
// rewrite the join tree and record the output statistics
TExprBase res = RearrangeEquiJoinTree(ctx, equiJoin, joinTree);
diff --git a/ydb/library/yql/dq/opt/dq_opt_log.h b/ydb/library/yql/dq/opt/dq_opt_log.h
index 22dcd8a9c2..83061e1b3d 100644
--- a/ydb/library/yql/dq/opt/dq_opt_log.h
+++ b/ydb/library/yql/dq/opt/dq_opt_log.h
@@ -19,7 +19,7 @@ NNodes::TExprBase DqRewriteAggregate(NNodes::TExprBase node, TExprContext& ctx,
NNodes::TExprBase DqRewriteTakeSortToTopSort(NNodes::TExprBase node, TExprContext& ctx, const TParentsMap& parents);
-NNodes::TExprBase DqOptimizeEquiJoinWithCosts(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, bool isRuleEnabled);
+NNodes::TExprBase DqOptimizeEquiJoinWithCosts(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, bool isRuleEnabled, ui32 maxDPccpDPTableSize);
NNodes::TExprBase DqOptimizeEquiJoinWithCosts(
const NNodes::TExprBase& node,
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index c1943584ff..cbeeef4f4d 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -56,6 +56,8 @@ struct TDqSettings {
static constexpr bool ExportStats = false;
static constexpr ETaskRunnerStats TaskRunnerStats = ETaskRunnerStats::Basic;
static constexpr ESpillingEngine SpillingEngine = ESpillingEngine::Disable;
+ static constexpr ui32 MaxDPccpDPTableSize = 10000U;
+
};
using TPtr = std::shared_ptr<TDqSettings>;