diff options
author | pilik <96681992+pashandor789@users.noreply.github.com> | 2024-04-17 16:17:44 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-17 16:17:44 +0300 |
commit | b2c3e7dc88170527cbfaef31f71518c7ee7c4445 (patch) | |
tree | 5b3c68d82a37ae1c56822a0fcc05a802ee5038e8 | |
parent | 62d05d286a413aa242aa3ed83365943de8c3906f (diff) | |
download | ydb-b2c3e7dc88170527cbfaef31f71518c7ee7c4445.tar.gz |
[DPHyp] Impl. added (#3763)
Co-authored-by: Pavel Ivanov <pudge1000-7@mr-nvme-testing-11.search.yandex.net>
-rw-r--r-- | .github/config/muted_ya.txt | 3 | ||||
-rw-r--r-- | ydb/library/yql/core/cbo/cbo_optimizer_new.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/bitset.h | 76 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_conflict_rules_collector.cpp | 86 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_conflict_rules_collector.h | 148 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_dphyp_solver.h | 480 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp | 1316 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_join_cost_based.h | 29 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_join_hypergraph.h | 190 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_join_tree_node.cpp | 44 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_join_tree_node.h | 80 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_make_join_hypergraph.h | 98 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/ya.make | 4 |
13 files changed, 1357 insertions, 1199 deletions
diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index fd595c8922..6fd1dafaf5 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -34,7 +34,10 @@ ydb/core/viewer/ut Viewer.TabletMerging ydb/core/viewer/ut Viewer.TabletMergingPacked ydb/library/actors/http/ut HttpProxy.TooLongHeader ydb/library/actors/http/ut sole* +ydb/library/yql/dq/opt/ut DQCBO.JoinSearch3Rels ydb/library/yql/providers/generic/connector/tests* * +ydb/library/yql/providers/yt/provider/ut TYqlCBO.NonReordable +ydb/library/yql/tests/sql/dq_file/part17 *dq-join_cbo_native_3_tables--* ydb/public/lib/ydb_cli/topic/ut TTopicReaderTests.TestRun_ReadOneMessage ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/with_offset_ranges_mode_ut RetryPolicy.RetryWithBatching ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/with_offset_ranges_mode_ut RetryPolicy.TWriteSession_TestBrokenPolicy diff --git a/ydb/library/yql/core/cbo/cbo_optimizer_new.cpp b/ydb/library/yql/core/cbo/cbo_optimizer_new.cpp index d6187d619d..aa010f02d8 100644 --- a/ydb/library/yql/core/cbo/cbo_optimizer_new.cpp +++ b/ydb/library/yql/core/cbo/cbo_optimizer_new.cpp @@ -70,7 +70,7 @@ TJoinOptimizerNode::TJoinOptimizerNode(const std::shared_ptr<IBaseOptimizerNode> JoinConditions(joinConditions), JoinType(joinType), JoinAlgo(joinAlgo) { - IsReorderable = (JoinType==EJoinKind::InnerJoin) && (nonReorderable==false); + IsReorderable = !nonReorderable; for (auto [l,r] : joinConditions ) { LeftJoinKeys.push_back(l.AttributeName); RightJoinKeys.push_back(r.AttributeName); diff --git a/ydb/library/yql/dq/opt/bitset.h b/ydb/library/yql/dq/opt/bitset.h new file mode 100644 index 0000000000..9edad03adc --- /dev/null +++ b/ydb/library/yql/dq/opt/bitset.h @@ -0,0 +1,76 @@ +#pragma once + +#include <stdlib.h> + +/* + * This header contains helper functions for working with bitsets. + * They are templated by TNodeSet, which is a std::bitset<>. 
+ * We use the template for efficiency: for 64 bit nodesets we implement a faster next subset functionality + */ + +namespace NYql::NDq { + +template <typename TNodeSet> +inline bool Overlaps(const TNodeSet& lhs, const TNodeSet& rhs) { + return (lhs & rhs) != 0; +} + +template <typename TNodeSet> +inline bool IsSubset(const TNodeSet& lhs, const TNodeSet& rhs) { + return (lhs & rhs) == lhs; +} + +template <typename TNodeSet> +inline bool HasSingleBit(TNodeSet nodeSet) { + return nodeSet.count() == 1; +} + +template <typename TNodeSet> +inline size_t GetLowestSetBit(TNodeSet nodeSet) { + for (size_t i = 0; i < nodeSet.size(); ++i) { + if (nodeSet[i]) { + return i; + } + } + + Y_ASSERT(false); + return nodeSet.size(); +} + +/* Iterates over the indices of the set bits in the TNodeSet. */ +template <typename TNodeSet> +class TSetBitsIt { +public: + TSetBitsIt(TNodeSet nodeSet) + : NodeSet_(nodeSet) + , Size_(nodeSet.size()) + , BitId_(0) + { + SkipUnsetBits(); + } + + bool HasNext() { + return BitId_ < Size_; + } + + size_t Next() { + size_t bitId = BitId_++; + SkipUnsetBits(); + + return bitId; + } + +private: + void SkipUnsetBits() { + while (BitId_ < Size_ && !NodeSet_[BitId_]) { + ++BitId_; + } + } + +private: + TNodeSet NodeSet_; + size_t Size_; + size_t BitId_; +}; + +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/dq_opt_conflict_rules_collector.cpp b/ydb/library/yql/dq/opt/dq_opt_conflict_rules_collector.cpp new file mode 100644 index 0000000000..b58b8ba130 --- /dev/null +++ b/ydb/library/yql/dq/opt/dq_opt_conflict_rules_collector.cpp @@ -0,0 +1,86 @@ +#include "dq_opt_conflict_rules_collector.h" +#include <util/generic/hash_set.h> + +namespace NYql::NDq { + +/* To make ASSOC, RASSCOM, LASSCOM tables simpler */ +EJoinKind GetEquivalentJoinByAlgebraicProperties(EJoinKind joinKind) { + switch (joinKind) { + case EJoinKind::Exclusion: + return EJoinKind::InnerJoin; + case EJoinKind::LeftOnly: + return EJoinKind::LeftJoin; + default: + return 
joinKind; + } +} + +bool OperatorIsCommutative(EJoinKind joinKind) { + joinKind = GetEquivalentJoinByAlgebraicProperties(joinKind); + switch (joinKind) { + case EJoinKind::InnerJoin: + case EJoinKind::OuterJoin: + case EJoinKind::Cross: + return true; + default: + return false; + } + + Y_UNREACHABLE(); +} + +bool OperatorsAreAssociative(EJoinKind lhs, EJoinKind rhs) { + lhs = GetEquivalentJoinByAlgebraicProperties(lhs); + rhs = GetEquivalentJoinByAlgebraicProperties(rhs); + + static THashMap<EJoinKind, THashSet<EJoinKind>> ASSOC_TABLE = { + {EJoinKind::Cross, {EJoinKind::Cross, EJoinKind::InnerJoin, EJoinKind::LeftSemi, EJoinKind::LeftJoin}}, + {EJoinKind::InnerJoin, {EJoinKind::Cross, EJoinKind::InnerJoin, EJoinKind::LeftSemi, EJoinKind::LeftJoin}}, + {EJoinKind::LeftJoin, {EJoinKind::LeftJoin}}, + {EJoinKind::OuterJoin, {EJoinKind::LeftJoin, EJoinKind::OuterJoin}} + }; + + if (!(ASSOC_TABLE.contains(lhs))) { + return false; + } + + return ASSOC_TABLE[lhs].contains(rhs); +} + +bool OperatorsAreLeftAsscom(EJoinKind lhs, EJoinKind rhs) { + lhs = GetEquivalentJoinByAlgebraicProperties(lhs); + rhs = GetEquivalentJoinByAlgebraicProperties(rhs); + + static THashMap<EJoinKind, THashSet<EJoinKind>> LASSCOM_TABLE = { + {EJoinKind::Cross, {EJoinKind::Cross, EJoinKind::InnerJoin, EJoinKind::LeftSemi, EJoinKind::LeftJoin}}, + {EJoinKind::InnerJoin, {EJoinKind::Cross, EJoinKind::InnerJoin, EJoinKind::LeftSemi, EJoinKind::LeftJoin}}, + {EJoinKind::LeftSemi, {EJoinKind::Cross, EJoinKind::InnerJoin, EJoinKind::LeftSemi, EJoinKind::LeftJoin}}, + {EJoinKind::LeftJoin, {EJoinKind::Cross, EJoinKind::InnerJoin, EJoinKind::LeftSemi, EJoinKind::LeftJoin, EJoinKind::OuterJoin}}, + {EJoinKind::OuterJoin, {EJoinKind::LeftJoin, EJoinKind::OuterJoin}} + }; + + if (!(LASSCOM_TABLE.contains(lhs))) { + return false; + } + + return LASSCOM_TABLE[lhs].contains(rhs); +} + +bool OperatorsAreRightAsscom(EJoinKind lhs, EJoinKind rhs) { + lhs = GetEquivalentJoinByAlgebraicProperties(lhs); + rhs = 
+ GetEquivalentJoinByAlgebraicProperties(rhs); + + static THashMap<EJoinKind, THashSet<EJoinKind>> RASSCOM_TABLE = { + {EJoinKind::Cross, {EJoinKind::Cross, EJoinKind::InnerJoin}}, + {EJoinKind::InnerJoin, {EJoinKind::Cross, EJoinKind::InnerJoin}}, + {EJoinKind::OuterJoin, {EJoinKind::OuterJoin}} + }; + + if (!(RASSCOM_TABLE.contains(lhs))) { + return false; + } + + return RASSCOM_TABLE[lhs].contains(rhs); +} + +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/dq_opt_conflict_rules_collector.h b/ydb/library/yql/dq/opt/dq_opt_conflict_rules_collector.h new file mode 100644 index 0000000000..9cbe532e92 --- /dev/null +++ b/ydb/library/yql/dq/opt/dq_opt_conflict_rules_collector.h @@ -0,0 +1,148 @@ +#pragma once + +#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h> + +/* + * This header contains an algorithm for resolving join conflicts with TConflictRulesCollector class + * and ConvertConflictRulesIntoTES function, which are used to construct the hypergraph. + */ + +namespace NYql::NDq { + +bool OperatorIsCommutative(EJoinKind); + +bool OperatorsAreAssociative(EJoinKind, EJoinKind); + +/* (e1 o12 e2) o13 e3 == (e1 o13 e3) o12 e2 */ +bool OperatorsAreLeftAsscom(EJoinKind, EJoinKind); + +/* e1 o13 (e2 o23 e3) == e2 o23 (e1 o13 e3) */ +bool OperatorsAreRightAsscom(EJoinKind, EJoinKind); + +template <typename TNodeSet> +struct TConflictRule { + TConflictRule(const TNodeSet& ruleActivationNodes, const TNodeSet& requiredNodes) + : RuleActivationNodes(ruleActivationNodes) + , RequiredNodes(requiredNodes) + {} + + TNodeSet RuleActivationNodes; + TNodeSet RequiredNodes; +}; + +/* + * This class finds and collects conflicts between the root of a subtree and its nodes. + * It traverses both sides of root and checks algebraic join properties (ASSOC, LASSCOM, RASSCOM). + * The algorithm is called "CD-C", and details are described in the paper + * "On the Correct and Complete Enumeration of the Core Search Space" in section "5.4 Approach CD-C". 
+ */ +template<typename TNodeSet> +class TConflictRulesCollector { +public: + TConflictRulesCollector( + std::shared_ptr<TJoinOptimizerNode> root, + std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, TNodeSet>& subtreeNodes + ) + : Root_(root) + , ConflictRules_({}) + , SubtreeNodes_(subtreeNodes) + {} + + TVector<TConflictRule<TNodeSet>> CollectConflicts() { + VisitJoinTree(Root_->LeftArg, GetLeftConflictsVisitor()); + VisitJoinTree(Root_->RightArg, GetRightConflictsVisitor()); + return std::move(ConflictRules_); + } + +private: + auto GetLeftConflictsVisitor() { + auto visitor = [this](const std::shared_ptr<TJoinOptimizerNode>& child) { + if (!OperatorsAreAssociative(child->JoinType, Root_->JoinType) || !Root_->IsReorderable || !child->IsReorderable) { + ConflictRules_.emplace_back( + SubtreeNodes_[child->RightArg], + SubtreeNodes_[child->LeftArg] + ); + } + + if (!OperatorsAreLeftAsscom(child->JoinType, Root_->JoinType) || !Root_->IsReorderable || !child->IsReorderable) { + ConflictRules_.emplace_back( + SubtreeNodes_[child->LeftArg], + SubtreeNodes_[child->RightArg] + ); + } + }; + + return visitor; + } + + auto GetRightConflictsVisitor() { + auto visitor = [this](const std::shared_ptr<TJoinOptimizerNode>& child) { + if (!OperatorsAreAssociative(Root_->JoinType, child->JoinType) || !Root_->IsReorderable || !child->IsReorderable) { + ConflictRules_.emplace_back( + SubtreeNodes_[child->LeftArg], + SubtreeNodes_[child->RightArg] + ); + } + + if (!OperatorsAreRightAsscom(Root_->JoinType, child->JoinType) || !Root_->IsReorderable || !child->IsReorderable) { + ConflictRules_.emplace_back( + SubtreeNodes_[child->RightArg], + SubtreeNodes_[child->LeftArg] + ); + } + }; + + return visitor; + } + +private: + template <typename TFunction> + void VisitJoinTree(const std::shared_ptr<IBaseOptimizerNode>& child, TFunction visitor) { + if (child->Kind == EOptimizerNodeKind::RelNodeType) { + return; + } + + auto childJoinNode = 
+ std::static_pointer_cast<TJoinOptimizerNode>(child); + VisitJoinTree(childJoinNode->LeftArg, visitor); + VisitJoinTree(childJoinNode->RightArg, visitor); + + visitor(childJoinNode); + } + +private: + std::shared_ptr<TJoinOptimizerNode> Root_; + TVector<TConflictRule<TNodeSet>> ConflictRules_; + std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, TNodeSet>& SubtreeNodes_; +}; + +/* + * This function converts conflict rules into TES. + * TES (Total Eligibility Set) captures reordering constraints and represents + * the set of tables that must be present before the join expression can be evaluated. + * It is initialized with SES (Syntactic Eligibility Set) - the tables used in the join condition. + */ +template <typename TNodeSet> +TNodeSet ConvertConflictRulesIntoTES(const TNodeSet& SES, TVector<TConflictRule<TNodeSet>>& conflictRules) { + auto TES = SES; + + while (true) { + auto prevTES = TES; + + for (const auto& conflictRule: conflictRules) { + if (Overlaps(conflictRule.RuleActivationNodes, TES)) { + TES |= conflictRule.RequiredNodes; + } + } + + EraseIf( + conflictRules, + [&](const TConflictRule<TNodeSet>& conflictRule){ return IsSubset(conflictRule.RequiredNodes, TES); } + ); + + if (TES == prevTES || conflictRules.empty()) { + return TES; + } + } +} + +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/dq_opt_dphyp_solver.h b/ydb/library/yql/dq/opt/dq_opt_dphyp_solver.h new file mode 100644 index 0000000000..1c35a8f49c --- /dev/null +++ b/ydb/library/yql/dq/opt/dq_opt_dphyp_solver.h @@ -0,0 +1,480 @@ +#pragma once + +#include "dq_opt_join_hypergraph.h" +#include "dq_opt_join_tree_node.h" +#include "bitset.h" + +namespace NYql::NDq { + +#ifdef DEBUG + struct pair_hash { + template <class T1, class T2> + std::size_t operator () (const std::pair<T1,T2> &p) const { + auto h1 = std::hash<T1>{}(p.first); + auto h2 = std::hash<T2>{}(p.second); + + // Mainly for demonstration purposes, i.e. works but is overly simple + // In the real world, use sth. 
+ like boost.hash_combine + return h1 ^ h2; + } + }; +#endif + +/* + * DPHyp (Dynamic Programming with Hypergraph) is a graph-aware + * join enumeration algorithm that only considers CSGs (Connected Sub-Graphs) of + * the join graph and computes CMPs (Complement pairs) that are also connected + * subgraphs of the join graph. It enumerates CSGs in an order such that subsets + * are enumerated first and no duplicates are ever enumerated. Then, for each emitted + * CSG it computes the complements with the same conditions - they must already be + * present in the dynamic programming table and no pair should be enumerated twice. + * Details are described in the paper "Dynamic Programming Strikes Back". + * + * This class is templated by std::bitset with the largest number of joins we can process + * or std::bitset<64>, which has a more efficient implementation of enumerating subsets of a set. + */ +template <typename TNodeSet> +class TDPHypSolver { +public: + TDPHypSolver( + TJoinHypergraph<TNodeSet>& graph, + IProviderContext& ctx + ) + : Graph_(graph) + , NNodes_(graph.GetNodes().size()) + , Pctx_(ctx) + {} + + // Run DPHyp algorithm and produce the join tree in CBO's internal representation + std::shared_ptr<TJoinOptimizerNodeInternal> Solve(); + + // Calculate the size of a dynamic programming table with a budget + ui32 CountCC(ui32 budget); + +private: + void EnumerateCsgRec(const TNodeSet& s1, const TNodeSet& x); + + void EmitCsg(const TNodeSet& s1); + + void EnumerateCmpRec(const TNodeSet& s1, const TNodeSet& s2, const TNodeSet& x); + + void EmitCsgCmp(const TNodeSet& s1, const TNodeSet& s2, const TJoinHypergraph<TNodeSet>::TEdge* csgCmpEdge); + +private: + // Create an exclusion set that contains all the nodes of the graph that are smaller or equal to + // the smallest node in the provided bitset + inline TNodeSet MakeBiMin(const TNodeSet& s); + + // Create an exclusion set that contains all the nodes of the bitset that are smaller or equal to + // the 
provided integer + inline TNodeSet MakeB(const TNodeSet& s, size_t v); + + // Compute the neighbors of a set of nodes, excluding the nodes in exclusion set + TNodeSet Neighs(TNodeSet s, TNodeSet x); + + // Compute the next subset of relations, given by the final bitset + TNodeSet NextBitset(const TNodeSet& current, const TNodeSet& final); + + // Iterate over all join algorithms and pick the best join that is applicable. + // Also considers commuting joins + std::shared_ptr<TJoinOptimizerNodeInternal> PickBestJoin( + const std::shared_ptr<IBaseOptimizerNode>& left, + const std::shared_ptr<IBaseOptimizerNode>& right, + EJoinKind joinKind, + bool isCommutative, + const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions, + const std::set<std::pair<TJoinColumn, TJoinColumn>>& reversedJoinConditions, + const TVector<TString>& leftJoinKeys, + const TVector<TString>& rightJoinKeys, + IProviderContext& ctx + ); + + // Count the size of the dynamic programming table recursively + ui32 CountCCRec(const TNodeSet&, const TNodeSet&, ui32, ui32); + +private: + TJoinHypergraph<TNodeSet>& Graph_; + size_t NNodes_; + IProviderContext& Pctx_; // Provider specific contexts? + // FIXME: This is a temporary structure that needs to be extended to multiple providers. 
+ #ifdef DEBUG + THashMap<std::pair<TNodeSet, TNodeSet>, bool, pair_hash> CheckTable_; + #endif +private: + THashMap<TNodeSet, std::shared_ptr<IBaseOptimizerNode>, std::hash<TNodeSet>> DpTable_; +}; + +/* + * Count the number of items in the DP table of DPHyp + */ +template <typename TNodeSet> ui32 TDPHypSolver<TNodeSet>::CountCC(ui32 budget) { + TNodeSet allNodes; + allNodes.set(); + ui32 cost = 0; + + for (int i = NNodes_ - 1; i >= 0; --i) { + ++cost; + if (cost > budget) { + return cost; + } + TNodeSet s; + s[i] = 1; + TNodeSet x = MakeB(allNodes,i); + cost = CountCCRec(s, x, cost, budget); + } + + return cost; +} + +/** + * Recursively count the number of items in the DP table of DPccp +*/ +template <typename TNodeSet> ui32 TDPHypSolver<TNodeSet>::CountCCRec(const TNodeSet& s, const TNodeSet& x, ui32 cost, ui32 budget) { + TNodeSet neighs = Neighs(s, x); + + if (neighs == TNodeSet{}) { + return cost; + } + + TNodeSet prev; + TNodeSet next; + + while(true) { + next = NextBitset(prev, neighs); + cost += 1; + if (cost > budget) { + return cost; + } + cost = CountCCRec(s | next, x | neighs, cost, budget); + if (next == neighs) { + break; + } + prev = next; + } + + return cost; +} + +template<typename TNodeSet> TNodeSet TDPHypSolver<TNodeSet>::Neighs(TNodeSet s, TNodeSet x) { + TNodeSet neighs{}; + + auto& nodes = Graph_.GetNodes(); + + TSetBitsIt<TNodeSet> setBitsIt(s); + while (setBitsIt.HasNext()) { + size_t nodeId = setBitsIt.Next(); + + neighs |= nodes[nodeId].SimpleNeighborhood; + + for (const auto& edgeId: nodes[nodeId].ComplexEdgesId) { + auto& edge = Graph_.GetEdge(edgeId); + if ( + IsSubset(edge.Left, s) && + !Overlaps(edge.Right, x) && + !Overlaps(edge.Right, s) && + !Overlaps(edge.Right, neighs) + ) { + neighs[GetLowestSetBit(edge.Right)] = 1; + } + } + } + + neighs &= ~x; + return neighs; +} + +template<> +inline std::bitset<64> TDPHypSolver<std::bitset<64>>::NextBitset(const std::bitset<64>& prev, const std::bitset<64>& final) { + return 
std::bitset<64>((prev | ~final).to_ulong() + 1) & final; +} + +template<typename TNodeSet> TNodeSet TDPHypSolver<TNodeSet>::NextBitset(const TNodeSet& prev, const TNodeSet& final) { + if (prev == final) { + return final; + } + + TNodeSet res = prev; + + bool carry = true; + for (size_t i = 0; i < NNodes_; i++) + { + if (!carry) { + break; + } + + if (!final[i]) { + continue; + } + + if (res[i] == 1 && carry) { + res[i] = 0; + } else if (res[i] == 0 && carry) { + res[i] = 1; + carry = false; + } + } + + return res; +} + +template<typename TNodeSet> std::shared_ptr<TJoinOptimizerNodeInternal> TDPHypSolver<TNodeSet>::Solve() { + auto& nodes = Graph_.GetNodes(); + + Y_ASSERT(nodes.size() == NNodes_); + + for (int i = NNodes_ - 1; i >= 0; --i) { + TNodeSet s{}; + s[i] = 1; + DpTable_[s] = nodes[i].RelationOptimizerNode; + } + + for (int i = NNodes_ - 1; i >= 0; --i) { + TNodeSet s{}; + s[i] = 1; + EmitCsg(s); + auto bi = MakeBiMin(s); + EnumerateCsgRec(s, bi); + } + + TNodeSet allNodes{}; + for (size_t i = 0; i < NNodes_; ++i) { + allNodes[i] = 1; + } + + Y_ASSERT(DpTable_.contains(allNodes)); + + return std::static_pointer_cast<TJoinOptimizerNodeInternal>(DpTable_[allNodes]); +} + +/* + * Enumerates connected subgraphs + * First it emits CSGs that are created by adding neighbors of S to S + * Then it recurses on the S fused with its neighbors. 
+ */ +template <typename TNodeSet> void TDPHypSolver<TNodeSet>::EnumerateCsgRec(const TNodeSet& s1, const TNodeSet& x) { + TNodeSet neighs = Neighs(s1, x); + + if (neighs == TNodeSet{}) { + return; + } + + TNodeSet prev{}; + TNodeSet next{}; + + while (true) { + next = NextBitset(prev, neighs); + + if (DpTable_.contains(s1 | next)) { + EmitCsg(s1 | next); + } + + if (next == neighs) { + break; + } + prev = next; + } + + prev.reset(); + while (true) { + next = NextBitset(prev, neighs); + + EnumerateCsgRec(s1 | next, x | neighs); + + if (next == neighs) { + break; + + } + + prev = next; + } +} + +/* + * EmitCsg emits Connected SubGraphs + * First it iterates through neighbors of the initial set S and emits pairs + * (S,S2), where S2 is the neighbor of S. Then it recursively emits complement pairs + */ +template <typename TNodeSet> void TDPHypSolver<TNodeSet>::EmitCsg(const TNodeSet& s1) { + TNodeSet x = s1 | MakeBiMin(s1); + TNodeSet neighs = Neighs(s1, x); + + if (neighs == TNodeSet{}) { + return; + } + + for (int i = NNodes_ - 1; i >= 0; i--) { + if (neighs[i]) { + TNodeSet s2{}; + s2[i] = 1; + + if (auto* edge = Graph_.FindEdgeBetween(s1, s2)) { + EmitCsgCmp(s1, s2, edge); + } + + EnumerateCmpRec(s1, s2, x | MakeB(neighs, GetLowestSetBit(s2))); + } + } +} + +/* + * Enumerates complement pairs + * First it emits the pairs (S1,S2+next) where S2+next is the set of relation sets + * that are obtained by adding S2's neighbors to itself + * Then it recurses into pairs (S1,S2+next) + */ +template <typename TNodeSet> void TDPHypSolver<TNodeSet>::EnumerateCmpRec(const TNodeSet& s1, const TNodeSet& s2, const TNodeSet& x) { + TNodeSet neighs = Neighs(s2, x); + + if (neighs == TNodeSet{}) { + return; + } + + TNodeSet prev{}; + TNodeSet next{}; + + while (true) { + next = NextBitset(prev, neighs); + + if (DpTable_.contains(s2 | next)) { + if (auto* edge = Graph_.FindEdgeBetween(s1, s2 | next)) { + EmitCsgCmp(s1, s2 | next, edge); + } + } + + if (next == neighs) { + break; + } 
+ + prev = next; + } + + prev.reset(); + while (true) { + next = NextBitset(prev, neighs); + + EnumerateCmpRec(s1, s2 | next, x | neighs); + + if (next == neighs) { + break; + } + + prev = next; + } +} + +template <typename TNodeSet> TNodeSet TDPHypSolver<TNodeSet>::MakeBiMin(const TNodeSet& s) { + TNodeSet res{}; + + for (size_t i = 0; i < NNodes_; i++) { + if (s[i]) { + for (size_t j = 0; j <= i; j++) { + res[j] = 1; + } + break; + } + } + return res; +} + +template <typename TNodeSet> TNodeSet TDPHypSolver<TNodeSet>::MakeB(const TNodeSet& s, size_t v) { + TNodeSet res{}; + + for (size_t i = 0; i < NNodes_; i++) { + if (s[i] && i <= v) { + res[i] = 1; + } + } + + return res; +} + +template <typename TNodeSet> std::shared_ptr<TJoinOptimizerNodeInternal> TDPHypSolver<TNodeSet>::PickBestJoin( + const std::shared_ptr<IBaseOptimizerNode>& left, + const std::shared_ptr<IBaseOptimizerNode>& right, + EJoinKind joinKind, + bool isCommutative, + const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions, + const std::set<std::pair<TJoinColumn, TJoinColumn>>& reversedJoinConditions, + const TVector<TString>& leftJoinKeys, + const TVector<TString>& rightJoinKeys, + IProviderContext& ctx +) { + double bestCost = std::numeric_limits<double>::infinity(); + EJoinAlgoType bestAlgo{}; + bool bestJoinIsReversed = false; + + for (auto joinAlgo : AllJoinAlgos) { + if (ctx.IsJoinApplicable(left, right, joinConditions, leftJoinKeys, rightJoinKeys, joinAlgo)){ + auto cost = ComputeJoinStats(*left->Stats, *right->Stats, leftJoinKeys, rightJoinKeys, joinAlgo, ctx).Cost; + if (cost < bestCost) { + bestCost = cost; + bestAlgo = joinAlgo; + bestJoinIsReversed = false; + } + } + + if (isCommutative) { + if (ctx.IsJoinApplicable(right, left, reversedJoinConditions, rightJoinKeys, leftJoinKeys, joinAlgo)){ + auto cost = ComputeJoinStats(*right->Stats, *left->Stats, rightJoinKeys, leftJoinKeys, joinAlgo, ctx).Cost; + if (cost < bestCost) { + bestCost = cost; + bestAlgo = joinAlgo; + 
bestJoinIsReversed = true; + } + } + } + } + + Y_ENSURE(bestCost != std::numeric_limits<double>::infinity(), "No join was chosen!"); + + if (bestJoinIsReversed) { + return MakeJoinInternal(right, left, reversedJoinConditions, rightJoinKeys, leftJoinKeys, joinKind, bestAlgo, ctx); + } + + return MakeJoinInternal(left, right, joinConditions, leftJoinKeys, rightJoinKeys, joinKind, bestAlgo, ctx); +} + +/* + * Emit a single CSG + CMP pair + */ +template<typename TNodeSet> void TDPHypSolver<TNodeSet>::EmitCsgCmp(const TNodeSet& s1, const TNodeSet& s2, const TJoinHypergraph<TNodeSet>::TEdge* csgCmpEdge) { + // Here we actually build the join and choose and compare the + // new plan to what's in the dpTable, if it there + + Y_ENSURE(DpTable_.contains(s1), "DP Table does not contain S1"); + Y_ENSURE(DpTable_.contains(s2), "DP Table does not conaint S2"); + + const auto* reversedEdge = &Graph_.GetEdge(csgCmpEdge->ReversedEdgeId); + auto leftNodes = DpTable_[s1]; + auto rightNodes = DpTable_[s2]; + + if (csgCmpEdge->IsReversed) { + std::swap(csgCmpEdge, reversedEdge); + std::swap(leftNodes, rightNodes); + } + + auto bestJoin = PickBestJoin( + leftNodes, + rightNodes, + csgCmpEdge->JoinKind, + csgCmpEdge->IsCommutative, + csgCmpEdge->JoinConditions, + reversedEdge->JoinConditions, + csgCmpEdge->LeftJoinKeys, + csgCmpEdge->RightJoinKeys, + Pctx_ + ); + + TNodeSet joined = s1 | s2; + if (!DpTable_.contains(joined) || bestJoin->Stats->Cost < DpTable_[joined]->Stats->Cost) { + DpTable_[joined] = bestJoin; + } + + #ifdef DEBUG + auto pair = std::make_pair(s1, s2); + Y_ENSURE (!CheckTable_.contains(pair), "Check table already contains pair S1|S2"); + CheckTable_[ std::pair<TNodeSet,TNodeSet>(s1, s2) ] = true; + #endif +} + +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp b/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp index e0871a7a7c..809c88611f 100644 --- a/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp +++ 
b/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp @@ -1,117 +1,89 @@ -#include "dq_opt_join.h" -#include "dq_opt_phy.h" +#include "dq_opt_join_cost_based.h" +#include "dq_opt_dphyp_solver.h" +#include "dq_opt_make_join_hypergraph.h" -#include <ydb/library/yql/core/yql_join.h> -#include <ydb/library/yql/core/yql_opt_utils.h> -#include <ydb/library/yql/dq/type_ann/dq_type_ann.h> +#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> +#include <ydb/library/yql/dq/opt/dq_opt.h> #include <ydb/library/yql/utils/log/log.h> -#include <ydb/library/yql/providers/common/provider/yql_provider.h> -#include <ydb/library/yql/core/yql_type_helpers.h> -#include <ydb/library/yql/core/yql_statistics.h> -#include <ydb/library/yql/core/yql_cost_function.h> - -#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h> //new interface - -#include <library/cpp/disjoint_sets/disjoint_sets.h> - - -#include <bitset> -#include <set> -#include <unordered_map> -#include <unordered_set> -#include <vector> -#include <queue> -#include <memory> -#include <sstream> namespace NYql::NDq { - using namespace NYql::NNodes; -/** - * Edge structure records an edge in a Join graph. 
- * - from is the integer id of the source vertex of the graph - * - to is the integer id of the target vertex of the graph - * - joinConditions records the set of join conditions of this edge -*/ -struct TEdge { - int From; - int To; - mutable std::set<std::pair<TJoinColumn, TJoinColumn>> JoinConditions; - mutable TVector<TString> LeftJoinKeys; - mutable TVector<TString> RightJoinKeys; - - TEdge(): From(-1), To(-1) {} - - TEdge(int f, int t): From(f), To(t) {} - - TEdge(int f, int t, std::pair<TJoinColumn, TJoinColumn> cond): From(f), To(t) { - JoinConditions.insert(cond); - BuildCondVectors(); +/* + * Collects EquiJoin inputs with statistics for cost based optimization + */ +bool DqCollectJoinRelationsWithStats( + TVector<std::shared_ptr<TRelOptimizerNode>>& rels, + TTypeAnnotationContext& typesCtx, + const TCoEquiJoin& equiJoin, + const TProviderCollectFunction& collector +) { + if (equiJoin.ArgCount() < 3) { + return false; } - TEdge(int f, int t, std::set<std::pair<TJoinColumn, TJoinColumn>> conds): From(f), To(t), - JoinConditions(conds) { - BuildCondVectors(); - } - - void BuildCondVectors() { - LeftJoinKeys.clear(); - RightJoinKeys.clear(); - - for (auto [left, right] : JoinConditions) { - auto leftKey = left.AttributeName; - auto rightKey = right.AttributeName; - - for (size_t i = leftKey.size() - 1; i>0; i--) { - if (leftKey[i]=='.') { - leftKey = leftKey.substr(i+1); - break; - } - } + for (size_t i = 0; i < equiJoin.ArgCount() - 2; ++i) { + auto input = equiJoin.Arg(i).Cast<TCoEquiJoinInput>(); + auto joinArg = input.List(); - for (size_t i = rightKey.size() - 1; i>0; i--) { - if (rightKey[i]=='.') { - rightKey = rightKey.substr(i+1); - break; - } - } + auto maybeStat = typesCtx.StatisticsMap.find(joinArg.Raw()); - LeftJoinKeys.emplace_back(leftKey); - RightJoinKeys.emplace_back(rightKey); + if (maybeStat == typesCtx.StatisticsMap.end()) { + YQL_CLOG(TRACE, CoreDq) << "Didn't find statistics for scope " << input.Scope().Cast<TCoAtom>().StringValue() << 
"\n"; + return false; } - } - - bool operator==(const TEdge& other) const - { - return From==other.From && To==other.To; - } - struct HashFunction - { - size_t operator()(const TEdge& e) const - { - return e.From + e.To; + auto scope = input.Scope(); + if (!scope.Maybe<TCoAtom>()){ + return false; } - }; - static const struct TEdge ErrorEdge; -}; + TStringBuf label = scope.Cast<TCoAtom>(); + auto stats = maybeStat->second; + collector(rels, label, joinArg.Ptr(), stats); + } + return true; +} /** - * Fetch join conditions from the equi-join tree + * Convert JoinTuple from AST into an internal representation of a optimizer plan + * This procedure also hooks up rels with statistics to the leaf nodes of the plan + * Statistics for join nodes are not computed */ -void ComputeJoinConditions(const TCoEquiJoinTuple& joinTuple, - std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions) { +std::shared_ptr<TJoinOptimizerNode> ConvertToJoinTree( + const TCoEquiJoinTuple& joinTuple, + const TVector<std::shared_ptr<TRelOptimizerNode>>& rels +) { + + std::shared_ptr<IBaseOptimizerNode> left; + std::shared_ptr<IBaseOptimizerNode> right; + + if (joinTuple.LeftScope().Maybe<TCoEquiJoinTuple>()) { - ComputeJoinConditions(joinTuple.LeftScope().Cast<TCoEquiJoinTuple>(), joinConditions); + left = ConvertToJoinTree(joinTuple.LeftScope().Cast<TCoEquiJoinTuple>(), rels); + } + else { + auto scope = joinTuple.LeftScope().Cast<TCoAtom>().StringValue(); + auto it = find_if(rels.begin(), rels.end(), [scope] (const std::shared_ptr<TRelOptimizerNode>& n) { + return scope == n->Label; + } ); + left = *it; } if (joinTuple.RightScope().Maybe<TCoEquiJoinTuple>()) { - ComputeJoinConditions(joinTuple.RightScope().Cast<TCoEquiJoinTuple>(), joinConditions); + right = ConvertToJoinTree(joinTuple.RightScope().Cast<TCoEquiJoinTuple>(), rels); + } + else { + auto scope = joinTuple.RightScope().Cast<TCoAtom>().StringValue(); + auto it = find_if(rels.begin(), rels.end(), [scope] (const 
std::shared_ptr<TRelOptimizerNode>& n) { + return scope == n->Label; + } ); + right = *it; } + std::set<std::pair<TJoinColumn, TJoinColumn>> joinConds; + size_t joinKeysCount = joinTuple.LeftKeys().Size() / 2; for (size_t i = 0; i < joinKeysCount; ++i) { size_t keyIndex = i * 2; @@ -121,865 +93,13 @@ void ComputeJoinConditions(const TCoEquiJoinTuple& joinTuple, auto rightScope = joinTuple.RightKeys().Item(keyIndex).StringValue(); auto rightColumn = joinTuple.RightKeys().Item(keyIndex + 1).StringValue(); - joinConditions.insert( std::make_pair( TJoinColumn(leftScope, leftColumn), + joinConds.insert( std::make_pair( TJoinColumn(leftScope, leftColumn), TJoinColumn(rightScope, rightColumn))); } -} - -/** - * Internal Join nodes are used inside the CBO. They don't own join condition data structures - * and therefore avoid copying them during generation of candidate plans. - * - * These datastructures are owned by the query graph, so it is important to keep the graph around - * while internal nodes are being used. 
- * - * After join enumeration, internal nodes need to be converted to regular nodes, that own the data - * structures -*/ -struct TJoinOptimizerNodeInternal : public IBaseOptimizerNode { - std::shared_ptr<IBaseOptimizerNode> LeftArg; - std::shared_ptr<IBaseOptimizerNode> RightArg; - const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& JoinConditions; - const TVector<TString>& LeftJoinKeys; - const TVector<TString>& RightJoinKeys; - EJoinKind JoinType; - EJoinAlgoType JoinAlgo; - bool IsReorderable; - - TJoinOptimizerNodeInternal(const std::shared_ptr<IBaseOptimizerNode>& left, - const std::shared_ptr<IBaseOptimizerNode>& right, - const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions, - const TVector<TString>& leftJoinKeys, - const TVector<TString>& rightJoinKeys, - const EJoinKind joinType, - const EJoinAlgoType joinAlgo, - bool nonReorderable=false); - - virtual ~TJoinOptimizerNodeInternal() {} - virtual TVector<TString> Labels(); - virtual void Print(std::stringstream& stream, int ntabs=0); -}; - -TJoinOptimizerNodeInternal::TJoinOptimizerNodeInternal(const std::shared_ptr<IBaseOptimizerNode>& left, const std::shared_ptr<IBaseOptimizerNode>& right, - const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions, const TVector<TString>& leftJoinKeys, - const TVector<TString>& rightJoinKeys, const EJoinKind joinType, const EJoinAlgoType joinAlgo, bool nonReorderable) : - IBaseOptimizerNode(JoinNodeType), - LeftArg(left), - RightArg(right), - JoinConditions(joinConditions), - LeftJoinKeys(leftJoinKeys), - RightJoinKeys(rightJoinKeys), - JoinType(joinType), - JoinAlgo(joinAlgo) { - IsReorderable = (JoinType==EJoinKind::InnerJoin) && (nonReorderable==false); - } - -TVector<TString> TJoinOptimizerNodeInternal::Labels() { - auto res = LeftArg->Labels(); - auto rightLabels = RightArg->Labels(); - res.insert(res.begin(),rightLabels.begin(),rightLabels.end()); - return res; -} - -/** - * Convert a tree of internal optimizer nodes to 
external nodes that own the data structures. - * - * The internal node tree can have references to external nodes (since some subtrees are optimized - * separately if the plan contains non-orderable joins). So we check the instances and if we encounter - * an external node, we return the whole subtree unchanged. -*/ -std::shared_ptr<TJoinOptimizerNode> ConvertFromInternal(const std::shared_ptr<IBaseOptimizerNode> internal) { - Y_ENSURE(internal->Kind == EOptimizerNodeKind::JoinNodeType); - - if (dynamic_cast<TJoinOptimizerNode*>(internal.get()) != nullptr) { - return std::static_pointer_cast<TJoinOptimizerNode>(internal); - } - - auto join = std::static_pointer_cast<TJoinOptimizerNodeInternal>(internal); - - auto left = join->LeftArg; - auto right = join->RightArg; - - if (left->Kind == EOptimizerNodeKind::JoinNodeType) { - left = ConvertFromInternal(left); - } - if (right->Kind == EOptimizerNodeKind::JoinNodeType) { - right = ConvertFromInternal(right); - } - - auto newJoin = std::make_shared<TJoinOptimizerNode>(left, right, join->JoinConditions, join->JoinType, join->JoinAlgo, !join->IsReorderable); - newJoin->Stats = join->Stats; - return newJoin; -} - -void TJoinOptimizerNodeInternal::Print(std::stringstream& stream, int ntabs) { - for (int i = 0; i < ntabs; i++){ - stream << "\t"; - } - - stream << "Join: (" << JoinType << "," << ToString(JoinAlgo) << ") "; - for (auto c : JoinConditions){ - stream << c.first.RelName << "." << c.first.AttributeName - << "=" << c.second.RelName << "." 
- << c.second.AttributeName << ", "; - } - stream << "\n"; - - for (int i = 0; i < ntabs; i++){ - stream << "\t"; - } - - if (Stats) { - stream << *Stats << "\n"; - } - - LeftArg->Print(stream, ntabs+1); - RightArg->Print(stream, ntabs+1); -} - -/** - * Create a new external join node and compute its statistics and cost -*/ -std::shared_ptr<TJoinOptimizerNode> MakeJoin(std::shared_ptr<IBaseOptimizerNode> left, - std::shared_ptr<IBaseOptimizerNode> right, - const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions, - const TVector<TString>& leftJoinKeys, - const TVector<TString>& rightJoinKeys, - EJoinKind joinKind, - EJoinAlgoType joinAlgo, - bool nonReorderable, - IProviderContext& ctx) { - - auto res = std::make_shared<TJoinOptimizerNode>(left, right, joinConditions, joinKind, joinAlgo, nonReorderable); - res->Stats = std::make_shared<TOptimizerStatistics>( ComputeJoinStats(*left->Stats, *right->Stats, leftJoinKeys, rightJoinKeys, joinAlgo, ctx)); - return res; -} - -/** - * Create a new internal join node and compute its statistics and cost -*/ -std::shared_ptr<TJoinOptimizerNodeInternal> MakeJoinInternal(std::shared_ptr<IBaseOptimizerNode> left, - std::shared_ptr<IBaseOptimizerNode> right, - const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions, - const TVector<TString>& leftJoinKeys, - const TVector<TString>& rightJoinKeys, - EJoinKind joinKind, - EJoinAlgoType joinAlgo, - IProviderContext& ctx) { - - auto res = std::make_shared<TJoinOptimizerNodeInternal>(left, right, joinConditions, leftJoinKeys, rightJoinKeys, joinKind, joinAlgo); - res->Stats = std::make_shared<TOptimizerStatistics>( ComputeJoinStats(*left->Stats, *right->Stats, leftJoinKeys, rightJoinKeys, joinAlgo, ctx)); - return res; -} - -/** - * Iterate over all join algorithms and pick the best join that is applicable. 
- * Also considers commuting joins -*/ -std::shared_ptr<TJoinOptimizerNodeInternal> PickBestJoin(std::shared_ptr<IBaseOptimizerNode> left, - std::shared_ptr<IBaseOptimizerNode> right, - const std::set<std::pair<TJoinColumn, TJoinColumn>>& leftJoinConditions, - const std::set<std::pair<TJoinColumn, TJoinColumn>>& rightJoinConditions, - const TVector<TString>& leftJoinKeys, - const TVector<TString>& rightJoinKeys, - IProviderContext& ctx) { - - double bestCost; - EJoinAlgoType bestAlgo; - bool bestJoinLeftRight = true; - bool bestJoinValid = false; - - for ( auto joinAlgo : AllJoinAlgos ) { - if (ctx.IsJoinApplicable(left, right, leftJoinConditions, leftJoinKeys, rightJoinKeys, joinAlgo)){ - auto cost = ComputeJoinStats(*left->Stats, *right->Stats, leftJoinKeys, rightJoinKeys, joinAlgo, ctx).Cost; - if (bestJoinValid){ - if (cost < bestCost) { - bestCost = cost; - bestAlgo = joinAlgo; - bestJoinLeftRight = true; - } - } else { - bestCost = cost; - bestAlgo = joinAlgo; - bestJoinLeftRight = true; - bestJoinValid = true; - } - } - - if (ctx.IsJoinApplicable(right, left, rightJoinConditions, rightJoinKeys, leftJoinKeys, joinAlgo)){ - auto cost = ComputeJoinStats(*right->Stats, *left->Stats, rightJoinKeys, leftJoinKeys, joinAlgo, ctx).Cost; - if (bestJoinValid){ - if (cost < bestCost) { - bestCost = cost; - bestAlgo = joinAlgo; - bestJoinLeftRight = false; - } - } else { - bestCost = cost; - bestAlgo = joinAlgo; - bestJoinLeftRight = false; - bestJoinValid = true; - } - } - } - - Y_ENSURE(bestJoinValid,"No join was chosen!"); - - if (bestJoinLeftRight) { - return MakeJoinInternal(left, right, leftJoinConditions, leftJoinKeys, rightJoinKeys, EJoinKind::InnerJoin, bestAlgo, ctx); - } else { - return MakeJoinInternal(right, left, rightJoinConditions, rightJoinKeys, leftJoinKeys, EJoinKind::InnerJoin, bestAlgo, ctx); - } -} - -/** - * Iterate over all join algorithms and pick the best join that is applicable -*/ -std::shared_ptr<TJoinOptimizerNode> 
PickBestNonReorderabeJoin(const std::shared_ptr<TJoinOptimizerNode>& node, - IProviderContext& ctx) { - - EJoinAlgoType bestJoinAlgo; - bool bestJoinValid = false; - double bestJoinCost; - const auto& left = node->LeftArg; - const auto& right = node->RightArg; - const auto& joinConditions = node->JoinConditions; - const auto& leftJoinKeys = node->LeftJoinKeys; - const auto& rightJoinKeys = node->RightJoinKeys; - - for ( auto joinAlgo : AllJoinAlgos ) { - if (ctx.IsJoinApplicable(left, right, joinConditions, leftJoinKeys, rightJoinKeys, joinAlgo)){ - auto cost = ComputeJoinStats(*right->Stats, *left->Stats, rightJoinKeys, leftJoinKeys, joinAlgo, ctx).Cost; - if (bestJoinValid) { - if (cost < bestJoinCost) { - bestJoinAlgo = joinAlgo; - bestJoinCost = cost; - } - } else { - bestJoinAlgo = joinAlgo; - bestJoinCost = cost; - bestJoinValid = true; - } - } - } - - Y_ENSURE(bestJoinValid,"No join was chosen!"); - node->Stats = std::make_shared<TOptimizerStatistics>(ComputeJoinStats(*left->Stats, *right->Stats, leftJoinKeys, rightJoinKeys, bestJoinAlgo, ctx)); - node->JoinAlgo = bestJoinAlgo; - return node; -} - -struct pair_hash { - template <class T1, class T2> - std::size_t operator () (const std::pair<T1,T2> &p) const { - auto h1 = std::hash<T1>{}(p.first); - auto h2 = std::hash<T2>{}(p.second); - - // Mainly for demonstration purposes, i.e. works but is overly simple - // In the real world, use sth. like boost.hash_combine - return h1 ^ h2; - } -}; - -/** - * Graph is a data structure for the join graph - * It is an undirected graph, with two edges per connection (from,to) and (to,from) - * It needs to be constructed with addNode and addEdge methods, since its - * keeping various indexes updated. 
- * The graph also needs to be reordered with the breadth-first search method -*/ -template <int N> -struct TGraph { - // set of edges of the graph - std::unordered_set<TEdge,TEdge::HashFunction> Edges; - - // neightborgh index - TVector<std::bitset<N>> EdgeIdx; - - // number of nodes in a graph - int NNodes; - - // mapping from rel label to node in the graph - THashMap<TString,int> ScopeMapping; - - // mapping from node in the graph to rel label - TVector<TString> RevScopeMapping; - - // Empty graph constructor intializes indexes to size N - TGraph() : EdgeIdx(N), RevScopeMapping(N) {} - - // Add a node to a graph with a rel label - void AddNode(int nodeId, TString scope){ - NNodes = nodeId + 1; - ScopeMapping[scope] = nodeId; - RevScopeMapping[nodeId] = scope; - } - - // Add a node to a graph with a vector of rel label - void AddNode(int nodeId, const TVector<TString>& scopes){ - NNodes = nodeId + 1; - TString revScope; - for (auto s: scopes ) { - ScopeMapping[s] = nodeId; - revScope += s + ","; - } - RevScopeMapping[nodeId] = revScope; - } - - - // Add an edge to the graph, if the edge is already in the graph - // (we check both directions), no action is taken. 
Otherwise we - // insert two edges, the forward edge with original joinConditions - // and a reverse edge with swapped joinConditions - void AddEdge(TEdge e){ - if (Edges.contains(e) || Edges.contains(TEdge(e.To, e.From))) { - return; - } - - Edges.insert(e); - std::set<std::pair<TJoinColumn, TJoinColumn>> swappedSet; - for (auto c : e.JoinConditions){ - swappedSet.insert(std::make_pair(c.second, c.first)); - } - Edges.insert(TEdge(e.To,e.From,swappedSet)); - - EdgeIdx[e.From].set(e.To); - EdgeIdx[e.To].set(e.From); - } - - // Find a node by the rel scope - int FindNode(TString scope){ - return ScopeMapping[scope]; - } - - // Return a bitset of node's neighbors - inline std::bitset<N> FindNeighbors(int fromVertex) - { - return EdgeIdx[fromVertex]; - } - - // Find an edge that connects two subsets of graph's nodes - // We are guaranteed to find a match - const TEdge& FindCrossingEdge(const std::bitset<N>& S1, const std::bitset<N>& S2) { - for(int i = 0; i < NNodes; i++){ - if (!S1[i]) { - continue; - } - for (int j = 0; j < NNodes; j++) { - if (!S2[j]) { - continue; - } - if (EdgeIdx[i].test(j)) { - auto it = Edges.find(TEdge(i, j)); - Y_DEBUG_ABORT_UNLESS(it != Edges.end()); - return *it; - } - } - } - Y_ENSURE(false,"Connecting edge not found!"); - return TEdge::ErrorEdge; - } - - /** - * Create a union-set from the join conditions to record the equivalences. - * Then use the equivalence set to compute transitive closure of the graph. - * Transitive closure means that if we have an edge from (1,2) with join - * condition R.A = S.A and we have an edge from (2,3) with join condition - * S.A = T.A, we will find out that the join conditions form an equivalence set - * and add an edge (1,3) with join condition R.A = T.A. 
- */ - void ComputeTransitiveClosure(const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions) { - std::set<TJoinColumn> columnSet; - for (auto [ leftCondition, rightCondition ] : joinConditions) { - columnSet.insert(leftCondition); - columnSet.insert(rightCondition); - } - std::vector<TJoinColumn> columns; - for (auto c : columnSet ) { - columns.push_back(c); - } - - THashMap<TJoinColumn, int, TJoinColumn::HashFunction> indexMapping; - for (size_t i=0; i<columns.size(); i++) { - indexMapping[columns[i]] = i; - } - - TDisjointSets ds = TDisjointSets( columns.size() ); - for (auto [ leftCondition, rightCondition ] : joinConditions ) { - int leftIndex = indexMapping[leftCondition]; - int rightIndex = indexMapping[rightCondition]; - ds.UnionSets(leftIndex,rightIndex); - } - - for (size_t i = 0; i < columns.size(); i++) { - for (size_t j = 0; j < i; j++) { - if (ds.CanonicSetElement(i) == ds.CanonicSetElement(j)) { - TJoinColumn left = columns[i]; - TJoinColumn right = columns[j]; - int leftNodeId = ScopeMapping[left.RelName]; - int rightNodeId = ScopeMapping[right.RelName]; - - auto maybeEdge1 = Edges.find({leftNodeId, rightNodeId}); - auto maybeEdge2 = Edges.find({rightNodeId, leftNodeId}); - - if (maybeEdge1 == Edges.end() && maybeEdge2 == Edges.end()) { - AddEdge(TEdge(leftNodeId,rightNodeId,std::make_pair(left, right))); - } else { - Y_ABORT_UNLESS(maybeEdge1 != Edges.end() && maybeEdge2 != Edges.end()); - auto edge1 = *maybeEdge1; - auto edge2 = *maybeEdge2; - - edge1.JoinConditions.emplace(left, right); - edge2.JoinConditions.emplace(right, left); - edge1.BuildCondVectors(); - edge2.BuildCondVectors(); - - Edges.erase(maybeEdge1); - Edges.erase(maybeEdge2); - - Edges.emplace(edge1); - Edges.emplace(edge2); - } - } - } - } - } - - /** - * Print the graph - */ - void PrintGraph(std::stringstream& stream) { - stream << "Join Graph:\n"; - stream << "nNodes: " << NNodes << ", nEdges: " << Edges.size() << "\n"; - - for(int i = 0; i < NNodes; i++) { - stream 
<< "Node:" << i << "," << RevScopeMapping[i] << "\n"; - } - for (const TEdge& e: Edges ) { - stream << "Edge: " << e.From << " -> " << e.To << "\n"; - for (auto p : e.JoinConditions) { - stream << p.first.RelName << "." - << p.first.AttributeName << "=" - << p.second.RelName << "." - << p.second.AttributeName << "\n"; - } - for (auto l : e.LeftJoinKeys) { - stream << l << ","; - } - stream << "="; - for (auto r : e.RightJoinKeys) { - stream << r << ","; - } - stream << "\n"; - } - } -}; - -/** - * DPcpp (Dynamic Programming with connected complement pairs) is a graph-aware - * join eumeration algorithm that only considers CSGs (Connected Sub-Graphs) of - * the join graph and computes CMPs (Complement pairs) that are also connected - * subgraphs of the join graph. It enumerates CSGs in the order, such that subsets - * are enumerated first and no duplicates are ever enumerated. Then, for each emitted - * CSG it computes the complements with the same conditions - they much already be - * present in the dynamic programming table and no pair should be enumerated twice. - * - * The DPccp solver is templated by the largest number of joins we can process, this - * is in turn used by bitsets that represent sets of relations. 
-*/ -template <int N> -class TDPccpSolver { -public: - - // Construct the DPccp solver based on the join graph and data about input relations - TDPccpSolver(TGraph<N>& g, TVector<std::shared_ptr<IBaseOptimizerNode>> rels, IProviderContext& ctx): - Graph(g), Rels(rels), Pctx(ctx) { - NNodes = g.NNodes; - } - - // Run DPccp algorithm and produce the join tree in CBO's internal representation - std::shared_ptr<TJoinOptimizerNodeInternal> Solve(); - - // Calculate the size of a dynamic programming table with a budget - ui32 CountCC(ui32 budget); - -private: - - // Compute the next subset of relations, given by the final bitset - std::bitset<N> NextBitset(const std::bitset<N>& current, const std::bitset<N>& final); - - // Print the set of relations in a bitset - void PrintBitset(std::stringstream& stream, const std::bitset<N>& s, std::string name, int ntabs=0); - - // Dynamic programming table that records optimal join subtrees - THashMap<std::bitset<N>, std::shared_ptr<IBaseOptimizerNode>, std::hash<std::bitset<N>>> DpTable; - - // REMOVE: Sanity check table that tracks that we don't consider the same pair twice - THashMap<std::pair<std::bitset<N>, std::bitset<N>>, bool, pair_hash> CheckTable; - - // number of nodes in a graph - int NNodes; - - // Join graph - TGraph<N>& Graph; - - // List of input relations to DPccp - TVector<std::shared_ptr<IBaseOptimizerNode>> Rels; - - // Provider specific contexts? 
- // FIXME: This is a temporary structure that needs to be extended to multiple providers - IProviderContext& Pctx; - - // Emit connected subgraph - void EmitCsg(const std::bitset<N>&, int=0); - - // Enumerate subgraphs recursively - void EnumerateCsgRec(const std::bitset<N>&, const std::bitset<N>&,int=0); - - // Emit the final pair of CSG and CMP - compute the join and record it in the - // DP table - void EmitCsgCmp(const std::bitset<N>&, const std::bitset<N>&,int=0); - - // Enumerate complement pairs recursively - void EnumerateCmpRec(const std::bitset<N>&, const std::bitset<N>&, const std::bitset<N>&,int=0); - - // Compute the neighbors of a set of nodes, excluding the nodes in exclusion set - std::bitset<N> Neighbors(const std::bitset<N>&, const std::bitset<N>&); - - // Create an exclusion set that contains all the nodes of the graph that are smaller or equal to - // the smallest node in the provided bitset - std::bitset<N> MakeBiMin(const std::bitset<N>&); - - // Create an exclusion set that contains all the nodes of the bitset that are smaller or equal to - // the provided integer - std::bitset<N> MakeB(const std::bitset<N>&,int); - - // Count the size of the dynamic programming table recursively - ui32 CountCCRec(const std::bitset<N>&, const std::bitset<N>&, ui32, ui32); -}; - -// Print tabs -void PrintTabs(std::stringstream& stream, int ntabs) { - - for (int i = 0; i < ntabs; i++) - stream << "\t"; -} - -// Print a set of nodes in the graph given by this bitset -template <int N> void TDPccpSolver<N>::PrintBitset(std::stringstream& stream, - const std::bitset<N>& s, std::string name, int ntabs) { - - PrintTabs(stream, ntabs); - - stream << name << ": " << "{"; - for (int i = 0; i < NNodes; i++) - if (s[i]) - stream << i << ","; - - stream <<"}\n"; -} - -// Compute neighbors of a set of nodes S, exclusing the exclusion set X -template<int N> std::bitset<N> TDPccpSolver<N>::Neighbors(const std::bitset<N>& S, const std::bitset<N>& X) { - - std::bitset<N> res; 
- - for (int i = 0; i < Graph.NNodes; i++) { - if (S[i]) { - std::bitset<N> n = Graph.FindNeighbors(i); - res = res | n; - } - } - - res = res & ~ X; - return res; -} - -// Run the entire DPccp algorithm and compute the optimal join tree -template<int N> std::shared_ptr<TJoinOptimizerNodeInternal> TDPccpSolver<N>::Solve() -{ - // Process singleton sets - for (int i = NNodes-1; i >= 0; i--) { - std::bitset<N> s; - s.set(i); - DpTable[s] = Rels[i]; - } - - // Expand singleton sets - for (int i = NNodes-1; i >= 0; i--) { - std::bitset<N> s; - s.set(i); - EmitCsg(s); - EnumerateCsgRec(s, MakeBiMin(s)); - } - - // Return the entry of the dpTable that corresponds to the full - // set of nodes in the graph - std::bitset<N> V; - for (int i = 0; i < NNodes; i++) { - V.set(i); - } - - Y_ENSURE(DpTable.contains(V), "Final relset not in dptable"); - return std::static_pointer_cast<TJoinOptimizerNodeInternal>(DpTable[V]); -} - -/** - * EmitCsg emits Connected SubGraphs - * First it iterates through neighbors of the initial set S and emits pairs - * (S,S2), where S2 is the neighbor of S. Then it recursively emits complement pairs -*/ - template <int N> void TDPccpSolver<N>::EmitCsg(const std::bitset<N>& S, int ntabs) { - std::bitset<N> X = S | MakeBiMin(S); - std::bitset<N> Ns = Neighbors(S, X); - - if (Ns==std::bitset<N>()) { - return; - } - - for (int i = NNodes - 1; i >= 0; i--) { - if (Ns[i]) { - std::bitset<N> S2; - S2.set(i); - EmitCsgCmp(S, S2, ntabs+1); - EnumerateCmpRec(S, S2, X | MakeB(Ns, i), ntabs+1); - } - } - } - - /** - * Enumerates connected subgraphs - * First it emits CSGs that are created by adding neighbors of S to S - * Then it recurses on the S fused with its neighbors. 
- */ - template <int N> void TDPccpSolver<N>::EnumerateCsgRec(const std::bitset<N>& S, const std::bitset<N>& X, int ntabs) { - - std::bitset<N> Ns = Neighbors(S, X); - - if (Ns == std::bitset<N>()) { - return; - } - - std::bitset<N> prev; - std::bitset<N> next; - - while(true) { - next = NextBitset(prev, Ns); - EmitCsg(S | next ); - if (next == Ns) { - break; - } - prev = next; - } - - prev.reset(); - while(true) { - next = NextBitset(prev, Ns); - EnumerateCsgRec(S | next, X | Ns , ntabs+1); - if (next==Ns) { - break; - } - prev = next; - } - } - -/*** - * Enumerates complement pairs - * First it emits the pairs (S1,S2+next) where S2+next is the set of relation sets - * that are obtained by adding S2's neighbors to itself - * Then it recusrses into pairs (S1,S2+next) -*/ - template <int N> void TDPccpSolver<N>::EnumerateCmpRec(const std::bitset<N>& S1, - const std::bitset<N>& S2, const std::bitset<N>& X, int ntabs) { - - std::bitset<N> Ns = Neighbors(S2, X); - - if (Ns==std::bitset<N>()) { - return; - } - - std::bitset<N> prev; - std::bitset<N> next; - - while(true) { - next = NextBitset(prev, Ns); - EmitCsgCmp(S1, S2 | next, ntabs+1); - if (next==Ns) { - break; - } - prev = next; - } - - prev.reset(); - while(true) { - next = NextBitset(prev, Ns); - EnumerateCmpRec(S1, S2 | next, X | Ns, ntabs+1); - if (next==Ns) { - break; - } - prev = next; - } - } - -/** - * Emit a single CSG + CMP pair -*/ -template <int N> void TDPccpSolver<N>::EmitCsgCmp(const std::bitset<N>& S1, const std::bitset<N>& S2, int ntabs) { - - Y_UNUSED(ntabs); - // Here we actually build the join and choose and compare the - // new plan to what's in the dpTable, if it there - - Y_ENSURE(DpTable.contains(S1),"DP Table does not contain S1"); - Y_ENSURE(DpTable.contains(S2),"DP Table does not conaint S2"); - - std::bitset<N> joined = S1 | S2; - - const TEdge& e1 = Graph.FindCrossingEdge(S1, S2); - const TEdge& e2 = Graph.FindCrossingEdge(S2, S1); - auto bestJoin = PickBestJoin(DpTable[S1], 
DpTable[S2], e1.JoinConditions, e2.JoinConditions, e1.LeftJoinKeys, e1.RightJoinKeys, Pctx); - - if (! DpTable.contains(joined)) { - DpTable[joined] = bestJoin; - } else { - if (bestJoin->Stats->Cost < DpTable[joined]->Stats->Cost) { - DpTable[joined] = bestJoin; - } - } - /* - * This is a sanity check that slows down the optimizer - * - - auto pair = std::make_pair(S1, S2); - Y_ENSURE (!CheckTable.contains(pair), "Check table already contains pair S1|S2"); - - CheckTable[ std::pair<std::bitset<N>,std::bitset<N>>(S1, S2) ] = true; - */ -} - -/** - * Create an exclusion set that contains all the nodes of the graph that are smaller or equal to - * the smallest node in the provided bitset -*/ -template <int N> std::bitset<N> TDPccpSolver<N>::MakeBiMin(const std::bitset<N>& S) { - std::bitset<N> res; - - for (int i = 0; i < NNodes; i++) { - if (S[i]) { - for (int j = 0; j <= i; j++) { - res.set(j); - } - break; - } - } - return res; -} - -/** - * Create an exclusion set that contains all the nodes of the bitset that are smaller or equal to - * the provided integer -*/ -template <int N> std::bitset<N> TDPccpSolver<N>::MakeB(const std::bitset<N>& S, int x) { - std::bitset<N> res; - - for (int i = 0; i < NNodes; i++) { - if (S[i] && i <= x) { - res.set(i); - } - } - - return res; -} - -/** - * Compute the next subset of relations, given by the final bitset -*/ -template <int N> std::bitset<N> TDPccpSolver<N>::NextBitset(const std::bitset<N>& prev, const std::bitset<N>& final) { - if (prev==final) - return final; - - std::bitset<N> res = prev; - - bool carry = true; - for (int i = 0; i < NNodes; i++) - { - if (!carry) { - break; - } - - if (!final[i]) { - continue; - } - - if (res[i]==1 && carry) { - res.reset(i); - } else if (res[i]==0 && carry) - { - res.set(i); - carry = false; - } - } - - return res; - - // TODO: We can optimize this with a few long integer operations, - // but it will only work for 64 bit bitsets - // return std::bitset<N>((prev | ~final).to_ulong() + 
1) & final; -} - -/** - * Count the number of items in the DP table of DPcpp -*/ -template <int N> ui32 TDPccpSolver<N>::CountCC(ui32 budget) { - std::bitset<N> allNodes; - allNodes.set(); - ui32 cost = 0; - - for (int i = NNodes - 1; i >= 0; i--) { - cost += 1; - if (cost > budget) { - return cost; - } - std::bitset<N> S; - S.set(i); - std::bitset<N> X = MakeB(allNodes,i); - cost = CountCCRec(S,X,cost,budget); - } - - return cost; -} - -/** - * Recursively count the nuber of items in the DP table of DPccp -*/ -template <int N> ui32 TDPccpSolver<N>::CountCCRec(const std::bitset<N>& S, const std::bitset<N>& X, ui32 cost, ui32 budget) { - std::bitset<N> Ns = Neighbors(S, X); - - if (Ns==std::bitset<N>()) { - return cost; - } - - std::bitset<N> prev; - std::bitset<N> next; - - while(true) { - next = NextBitset(prev, Ns); - cost += 1; - if (cost > budget) { - return cost; - } - cost = CountCCRec(S | next, X | Ns, cost, budget); - if (next==Ns) { - break; - } - prev = next; - } - - return cost; + return std::make_shared<TJoinOptimizerNode>(left, right, joinConds, ConvertToJoinKind(joinTuple.Type().StringValue()), EJoinAlgoType::Undefined); } - /** * Build a join tree that will replace the original join tree in equiJoin * TODO: Add join implementations here @@ -1066,149 +186,9 @@ TExprBase RearrangeEquiJoinTree(TExprContext& ctx, const TCoEquiJoin& equiJoin, .Done(); } -/** - * Collects EquiJoin inputs with statistics for cost based optimization -*/ -bool DqCollectJoinRelationsWithStats( - TVector<std::shared_ptr<TRelOptimizerNode>>& rels, - TTypeAnnotationContext& typesCtx, - const TCoEquiJoin& equiJoin, - const std::function<void(TVector<std::shared_ptr<TRelOptimizerNode>>&, TStringBuf, const TExprNode::TPtr, const std::shared_ptr<TOptimizerStatistics>&)>& collector) -{ - if (equiJoin.ArgCount() < 3) { - return false; - } - - for (size_t i = 0; i < equiJoin.ArgCount() - 2; ++i) { - auto input = equiJoin.Arg(i).Cast<TCoEquiJoinInput>(); - auto joinArg = input.List(); - 
- auto maybeStat = typesCtx.StatisticsMap.find(joinArg.Raw()); - - if (maybeStat == typesCtx.StatisticsMap.end()) { - YQL_CLOG(TRACE, CoreDq) << "Didn't find statistics for scope " << input.Scope().Cast<TCoAtom>().StringValue() << "\n"; - return false; - } - - auto scope = input.Scope(); - if (!scope.Maybe<TCoAtom>()){ - return false; - } - - TStringBuf label = scope.Cast<TCoAtom>(); - auto stats = maybeStat->second; - collector(rels, label, joinArg.Ptr(), stats); - } - return true; -} - -/** - * Convert JoinTuple from AST into an internal representation of a optimizer plan - * This procedure also hooks up rels with statistics to the leaf nodes of the plan - * Statistics for join nodes are not computed -*/ -std::shared_ptr<TJoinOptimizerNode> ConvertToJoinTree(const TCoEquiJoinTuple& joinTuple, - const TVector<std::shared_ptr<TRelOptimizerNode>>& rels) { - - std::shared_ptr<IBaseOptimizerNode> left; - std::shared_ptr<IBaseOptimizerNode> right; - - - if (joinTuple.LeftScope().Maybe<TCoEquiJoinTuple>()) { - left = ConvertToJoinTree(joinTuple.LeftScope().Cast<TCoEquiJoinTuple>(), rels); - } - else { - auto scope = joinTuple.LeftScope().Cast<TCoAtom>().StringValue(); - auto it = find_if(rels.begin(), rels.end(), [scope] (const std::shared_ptr<TRelOptimizerNode>& n) { - return scope == n->Label; - } ); - left = *it; - } - - if (joinTuple.RightScope().Maybe<TCoEquiJoinTuple>()) { - right = ConvertToJoinTree(joinTuple.RightScope().Cast<TCoEquiJoinTuple>(), rels); - } - else { - auto scope = joinTuple.RightScope().Cast<TCoAtom>().StringValue(); - auto it = find_if(rels.begin(), rels.end(), [scope] (const std::shared_ptr<TRelOptimizerNode>& n) { - return scope == n->Label; - } ); - right = *it; - } - - std::set<std::pair<TJoinColumn, TJoinColumn>> joinConds; - - size_t joinKeysCount = joinTuple.LeftKeys().Size() / 2; - for (size_t i = 0; i < joinKeysCount; ++i) { - size_t keyIndex = i * 2; - - auto leftScope = joinTuple.LeftKeys().Item(keyIndex).StringValue(); - auto 
leftColumn = joinTuple.LeftKeys().Item(keyIndex + 1).StringValue(); - auto rightScope = joinTuple.RightKeys().Item(keyIndex).StringValue(); - auto rightColumn = joinTuple.RightKeys().Item(keyIndex + 1).StringValue(); - - joinConds.insert( std::make_pair( TJoinColumn(leftScope, leftColumn), - TJoinColumn(rightScope, rightColumn))); - } - - return std::make_shared<TJoinOptimizerNode>(left, right, joinConds, ConvertToJoinKind(joinTuple.Type().StringValue()), EJoinAlgoType::Undefined); -} - -/** - * Extract all non orderable joins from a plan is a post-order traversal order -*/ -void ExtractNonOrderables(std::shared_ptr<TJoinOptimizerNode> joinTree, - TVector<std::shared_ptr<TJoinOptimizerNode>>& result) { - - if (joinTree->LeftArg->Kind == EOptimizerNodeKind::JoinNodeType) { - auto left = static_pointer_cast<TJoinOptimizerNode>(joinTree->LeftArg); - ExtractNonOrderables(left, result); - } - if (joinTree->RightArg->Kind == EOptimizerNodeKind::JoinNodeType) { - auto right = static_pointer_cast<TJoinOptimizerNode>(joinTree->RightArg); - ExtractNonOrderables(right, result); - } - if (!joinTree->IsReorderable) - { - result.emplace_back(joinTree); - } -} - -/** - * Extract relations and join conditions from an optimizer plan - * If we hit a non-orderable join type in recursion, we don't recurse into it and - * add it as a relation -*/ -void ExtractRelsAndJoinConditions(const std::shared_ptr<TJoinOptimizerNode>& joinTree, - TVector<std::shared_ptr<IBaseOptimizerNode>> & rels, - std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions) { - if (!joinTree->IsReorderable){ - rels.emplace_back( joinTree ); - return; - } - - for (auto c : joinTree->JoinConditions) { - joinConditions.insert(c); - } - - if (joinTree->LeftArg->Kind == EOptimizerNodeKind::JoinNodeType) { - ExtractRelsAndJoinConditions(static_pointer_cast<TJoinOptimizerNode>(joinTree->LeftArg), rels, joinConditions); - } - else { - rels.emplace_back(joinTree->LeftArg); - } - - if (joinTree->RightArg->Kind == 
EOptimizerNodeKind::JoinNodeType) { - ExtractRelsAndJoinConditions(static_pointer_cast<TJoinOptimizerNode>(joinTree->RightArg), rels, joinConditions); - } - else { - rels.emplace_back(joinTree->RightArg); - } - } - -/** +/* * Recursively computes statistics for a join tree -*/ + */ void ComputeStatistics(const std::shared_ptr<TJoinOptimizerNode>& join, IProviderContext& ctx) { if (join->LeftArg->Kind == EOptimizerNodeKind::JoinNodeType) { ComputeStatistics(static_pointer_cast<TJoinOptimizerNode>(join->LeftArg), ctx); @@ -1216,128 +196,74 @@ void ComputeStatistics(const std::shared_ptr<TJoinOptimizerNode>& join, IProvide if (join->RightArg->Kind == EOptimizerNodeKind::JoinNodeType) { ComputeStatistics(static_pointer_cast<TJoinOptimizerNode>(join->RightArg), ctx); } - join->Stats = std::make_shared<TOptimizerStatistics>(ComputeJoinStats(*join->LeftArg->Stats, *join->RightArg->Stats, - join->LeftJoinKeys, join->RightJoinKeys, EJoinAlgoType::GraceJoin, ctx)); + join->Stats = std::make_shared<TOptimizerStatistics>( + ComputeJoinStats( + *join->LeftArg->Stats, + *join->RightArg->Stats, + join->LeftJoinKeys, + join->RightJoinKeys, + EJoinAlgoType::GraceJoin, + ctx + ) + ); } -/** - * Optimize a subtree of a plan with DPccp - * The root of the subtree that needs to be optimizer needs to be reorderable, otherwise we will - * only update the statistics for it and return it unchanged -*/ -std::shared_ptr<TJoinOptimizerNode> OptimizeSubtree(const std::shared_ptr<TJoinOptimizerNode>& joinTree, ui32 maxDPccpDPTableSize, IProviderContext& ctx) { - if (!joinTree->IsReorderable) { - return PickBestNonReorderabeJoin(joinTree, ctx); - } - - TVector<std::shared_ptr<IBaseOptimizerNode>> rels; - std::set<std::pair<TJoinColumn, TJoinColumn>> joinConditions; - ExtractRelsAndJoinConditions(joinTree, rels, joinConditions); - - TGraph<128> joinGraph; - - for (size_t i = 0; i < rels.size(); i++) { - joinGraph.AddNode(i, rels[i]->Labels()); - } - - // Check if we have more rels than DPccp can 
handle (128) - // If that's the case - don't optimize the plan and just return it with - // computed statistics - if (rels.size() >= 128) { - ComputeStatistics(joinTree, ctx); - YQL_CLOG(TRACE, CoreDq) << "Too many rels"; - return joinTree; - } - - for (auto cond : joinConditions ) { - int fromNode = joinGraph.FindNode(cond.first.RelName); - int toNode = joinGraph.FindNode(cond.second.RelName); - joinGraph.AddEdge(TEdge(fromNode, toNode, cond)); - } - - if (NYql::NLog::YqlLogger().NeedToLog(NYql::NLog::EComponent::CoreDq, NYql::NLog::ELevel::TRACE)) { - std::stringstream str; - str << "Initial join graph:\n"; - joinGraph.PrintGraph(str); - YQL_CLOG(TRACE, CoreDq) << str.str(); - } +class TOptimizerNativeNew: public IOptimizerNew { +public: + TOptimizerNativeNew(IProviderContext& ctx, ui32 maxDPhypDPTableSize) + : IOptimizerNew(ctx) + , MaxDPhypTableSize_(maxDPhypDPTableSize) + {} - // make a transitive closure of the graph and reorder the graph via BFS - joinGraph.ComputeTransitiveClosure(joinConditions); + std::shared_ptr<TJoinOptimizerNode> JoinSearch(const std::shared_ptr<TJoinOptimizerNode>& joinTree) override { + auto relsCount = joinTree->Labels().size(); - if (NYql::NLog::YqlLogger().NeedToLog(NYql::NLog::EComponent::CoreDq, NYql::NLog::ELevel::TRACE)) { - std::stringstream str; - str << "Join graph after transitive closure:\n"; - joinGraph.PrintGraph(str); - YQL_CLOG(TRACE, CoreDq) << str.str(); - } + if (relsCount <= 64) { // The algorithm is more efficient. 
+ return JoinSearchImpl<TNodeSet64>(joinTree); + } - TDPccpSolver<128> solver(joinGraph, rels, ctx); + if (64 < relsCount && relsCount <= 128) { + JoinSearchImpl<TNodeSet128>(joinTree); + } - // Check that the dynamic table of DPccp is not too big - // If it is, just compute the statistics for the join tree and return it - if (solver.CountCC(maxDPccpDPTableSize) >= maxDPccpDPTableSize) { - ComputeStatistics(joinTree, ctx); + ComputeStatistics(joinTree, this->Pctx); return joinTree; } - // feed the graph to DPccp algorithm - auto result = solver.Solve(); - - if (NYql::NLog::YqlLogger().NeedToLog(NYql::NLog::EComponent::CoreDq, NYql::NLog::ELevel::TRACE)) { - std::stringstream str; - str << "Join tree after cost based optimization:\n"; - result->Print(str); - YQL_CLOG(TRACE, CoreDq) << str.str(); - } - - return ConvertFromInternal(result); -} - -class TOptimizerNativeNew: public IOptimizerNew { -public: - TOptimizerNativeNew(IProviderContext& ctx, const ui32 maxDPccpDPTableSize) - : IOptimizerNew(ctx), MaxDPccpDPTableSize(maxDPccpDPTableSize) { } - - std::shared_ptr<TJoinOptimizerNode> JoinSearch(const std::shared_ptr<TJoinOptimizerNode>& joinTree) override { +private: + using TNodeSet64 = std::bitset<64>; + using TNodeSet128 = std::bitset<128>; - // Traverse the join tree and generate a list of non-orderable joins in a post-order - TVector<std::shared_ptr<TJoinOptimizerNode>> nonOrderables; - ExtractNonOrderables(joinTree, nonOrderables); + template <typename TNodeSet> + std::shared_ptr<TJoinOptimizerNode> JoinSearchImpl(const std::shared_ptr<TJoinOptimizerNode>& joinTree) { + TJoinHypergraph<TNodeSet> hypergraph = MakeJoinHypergraph<TNodeSet>(joinTree); + TDPHypSolver<TNodeSet> solver(hypergraph, this->Pctx); - // For all non-orderable joins, optimize the children - for( auto join : nonOrderables ) { - if (join->LeftArg->Kind == EOptimizerNodeKind::JoinNodeType) { - join->LeftArg = OptimizeSubtree(static_pointer_cast<TJoinOptimizerNode>(join->LeftArg), 
MaxDPccpDPTableSize, Pctx); - } - if (join->RightArg->Kind == EOptimizerNodeKind::JoinNodeType) { - join->RightArg = OptimizeSubtree(static_pointer_cast<TJoinOptimizerNode>(join->RightArg), MaxDPccpDPTableSize, Pctx); - } - join->Stats = std::make_shared<TOptimizerStatistics>(ComputeJoinStats(*join->LeftArg->Stats, *join->RightArg->Stats, - join->LeftJoinKeys, join->RightJoinKeys, EJoinAlgoType::GraceJoin, Pctx)); + if (solver.CountCC(MaxDPhypTableSize_) >= MaxDPhypTableSize_) { + ComputeStatistics(joinTree, this->Pctx); + return joinTree; } - // Optimize the root - return OptimizeSubtree(joinTree, MaxDPccpDPTableSize, Pctx); + auto bestJoinOrder = solver.Solve(); + return ConvertFromInternal(bestJoinOrder); } - - const ui32 MaxDPccpDPTableSize; +private: + ui32 MaxDPhypTableSize_; }; -/** - * Main routine that checks: - * 1. Do we have an equiJoin - * 2. Is the cost already computed - * 3. Are all the costs of equiJoin inputs computed? - * - * Then it optimizes the join tree by iterating over all non-orderable nodes and optimizing their children, - * and finally optimizes the root of the tree -*/ -TExprBase DqOptimizeEquiJoinWithCosts(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx, - ui32 optLevel, IOptimizerNew& opt, - const std::function<void(TVector<std::shared_ptr<TRelOptimizerNode>>&, TStringBuf, const TExprNode::TPtr, const std::shared_ptr<TOptimizerStatistics>&)>& providerCollect) { +IOptimizerNew* MakeNativeOptimizerNew(IProviderContext& ctx, const ui32 maxDPhypDPTableSize) { + return new TOptimizerNativeNew(ctx, maxDPhypDPTableSize); +} - if (optLevel==0) { +TExprBase DqOptimizeEquiJoinWithCosts( + const TExprBase& node, + TExprContext& ctx, + TTypeAnnotationContext& typesCtx, + ui32 optLevel, + IOptimizerNew& opt, + const TProviderCollectFunction& providerCollect +) { + if (optLevel == 0) { return node; } @@ -1348,7 +274,6 @@ TExprBase DqOptimizeEquiJoinWithCosts(const TExprBase& node, TExprContext& ctx, 
YQL_ENSURE(equiJoin.ArgCount() >= 4); if (typesCtx.StatisticsMap.contains(equiJoin.Raw())) { - return node; } @@ -1369,7 +294,7 @@ TExprBase DqOptimizeEquiJoinWithCosts(const TExprBase& node, TExprContext& ctx, auto joinTuple = equiJoin.Arg(equiJoin.ArgCount() - 2).Cast<TCoEquiJoinTuple>(); // Generate an initial tree - auto joinTree = ConvertToJoinTree(joinTuple,rels); + auto joinTree = ConvertToJoinTree(joinTuple, rels); if (NYql::NLog::YqlLogger().NeedToLog(NYql::NLog::EComponent::ProviderKqp, NYql::NLog::ELevel::TRACE)) { std::stringstream str; @@ -1382,12 +307,9 @@ TExprBase DqOptimizeEquiJoinWithCosts(const TExprBase& node, TExprContext& ctx, // rewrite the join tree and record the output statistics TExprBase res = RearrangeEquiJoinTree(ctx, equiJoin, joinTree); - typesCtx.StatisticsMap[ res.Raw() ] = joinTree->Stats; + typesCtx.StatisticsMap[res.Raw()] = joinTree->Stats; return res; -} -IOptimizerNew* MakeNativeOptimizerNew(IProviderContext& ctx, const ui32 maxDPccpDPTableSize) { - return new TOptimizerNativeNew(ctx, maxDPccpDPTableSize); } } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/dq_opt_join_cost_based.h b/ydb/library/yql/dq/opt/dq_opt_join_cost_based.h new file mode 100644 index 0000000000..ff6225b3d9 --- /dev/null +++ b/ydb/library/yql/dq/opt/dq_opt_join_cost_based.h @@ -0,0 +1,29 @@ +#pragma once + +#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h> +#include <ydb/library/yql/core/expr_nodes_gen/yql_expr_nodes_gen.h> +#include <ydb/library/yql/core/yql_type_annotation.h> + +namespace NYql::NDq { + +using TProviderCollectFunction = + std::function<void(TVector<std::shared_ptr<TRelOptimizerNode>>&, TStringBuf, const TExprNode::TPtr, const std::shared_ptr<TOptimizerStatistics>&)>; + +/* + * Main routine that checks: + * 1. Do we have an equiJoin + * 2. Is the cost already computed + * 3. Are all the costs of equiJoin inputs computed? + * + * Then it optimizes the join tree. 
+*/ +NYql::NNodes::TExprBase DqOptimizeEquiJoinWithCosts( + const NYql::NNodes::TExprBase& node, + TExprContext& ctx, + TTypeAnnotationContext& typesCtx, + ui32 optLevel, + IOptimizerNew& opt, + const TProviderCollectFunction& providerCollect +); + +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/dq_opt_join_hypergraph.h b/ydb/library/yql/dq/opt/dq_opt_join_hypergraph.h new file mode 100644 index 0000000000..7abd379038 --- /dev/null +++ b/ydb/library/yql/dq/opt/dq_opt_join_hypergraph.h @@ -0,0 +1,190 @@ +#pragma once + +#include <vector> +#include <util/string/printf.h> +#include "bitset.h" + +#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h> +#include <ydb/library/yql/core/yql_cost_function.h> + +namespace NYql::NDq { + +/* + * JoinHypergraph - a graph, whose edge connects two sets of nodes. + * It represents relation between tables and ordering constraints. + * Graph is undirected, so it stores each edge twice (original and reversed) for DPHyp algorithm. + */ +template <typename TNodeSet> +class TJoinHypergraph { +public: + struct TEdge { + TEdge( + const TNodeSet& left, + const TNodeSet& right, + EJoinKind joinKind, + bool isCommutative, + const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions + ) + : Left(left) + , Right(right) + , JoinKind(joinKind) + , IsCommutative(isCommutative) + , JoinConditions(joinConditions) + , IsReversed(false) + { + BuildCondVectors(); + } + + inline bool IsSimpleEdge() { + return HasSingleBit(Left) && HasSingleBit(Right); + } + + TNodeSet Left; + TNodeSet Right; + EJoinKind JoinKind; + bool IsCommutative; + std::set<std::pair<TJoinColumn, TJoinColumn>> JoinConditions; + TVector<TString> LeftJoinKeys; + TVector<TString> RightJoinKeys; + + // JoinKind may not be commutative, so we need to know which edge is original and which is reversed. 
+ bool IsReversed; + int64_t ReversedEdgeId = -1; + + void BuildCondVectors() { + LeftJoinKeys.clear(); + RightJoinKeys.clear(); + + for (auto [left, right] : JoinConditions) { + auto leftKey = left.AttributeName; + auto rightKey = right.AttributeName; + + for (size_t i = leftKey.size() - 1; i > 0; --i) { + if (leftKey[i] == '.') { + leftKey = leftKey.substr(i + 1); + break; + } + } + + for (size_t i = rightKey.size() - 1; i > 0; --i) { + if (rightKey[i] == '.') { + rightKey = rightKey.substr(i + 1); + break; + } + } + + LeftJoinKeys.emplace_back(leftKey); + RightJoinKeys.emplace_back(rightKey); + } + } + }; + + struct TNode { + TNodeSet SimpleNeighborhood; + TVector<size_t> ComplexEdgesId; + std::shared_ptr<IBaseOptimizerNode> RelationOptimizerNode; + }; + +public: + /* Add node to the hypergraph and returns its id */ + size_t AddNode(const std::shared_ptr<IBaseOptimizerNode>& relationNode) { + size_t nodeId = Nodes_.size(); + NodeIdByRelationOptimizerNode_.insert({relationNode, nodeId}); + + Nodes_.push_back({}); + Nodes_.back().RelationOptimizerNode = relationNode; + + return nodeId; + } + + /* Adds an edge, and its reversed version. 
*/ + void AddEdge(TEdge edge) { + size_t edgeId = Edges_.size(); + size_t reversedEdgeId = edgeId + 1; + edge.ReversedEdgeId = reversedEdgeId; + + AddEdgeImpl(edge); + + std::set<std::pair<TJoinColumn, TJoinColumn>> reversedJoinConditions; + for (const auto& [lhs, rhs]: edge.JoinConditions) { + reversedJoinConditions.insert({rhs, lhs}); + } + + TEdge reversedEdge = std::move(edge); + std::swap(reversedEdge.Left, reversedEdge.Right); + reversedEdge.JoinConditions = std::move(reversedJoinConditions); + reversedEdge.IsReversed = true; + reversedEdge.ReversedEdgeId = edgeId; + + AddEdgeImpl(reversedEdge); + } + + TNodeSet GetNodesByRelNamesInSubtree(const std::shared_ptr<IBaseOptimizerNode>& subtreeRoot, const TVector<TString>& relationNames) { + if (subtreeRoot->Kind == RelNodeType) { + TString relationName = subtreeRoot->Labels()[0]; + + TNodeSet nodeSet{}; + if (std::find(relationNames.begin(), relationNames.end(), relationName) != relationNames.end()) { + nodeSet[NodeIdByRelationOptimizerNode_[subtreeRoot]] = 1; + } + return nodeSet; + } + + auto joinNode = std::static_pointer_cast<TJoinOptimizerNode>(subtreeRoot); + + auto leftNodeSet = GetNodesByRelNamesInSubtree(joinNode->LeftArg, relationNames); + auto rightNodeSet = GetNodesByRelNamesInSubtree(joinNode->RightArg, relationNames); + + TNodeSet nodeSet = leftNodeSet | rightNodeSet; + + return nodeSet; + } + + TEdge& GetEdge(size_t edgeId) { + Y_ASSERT(edgeId < Edges_.size()); + return Edges_[edgeId]; + } + + inline TVector<TNode>& GetNodes() { + return Nodes_; + } + + const TEdge* FindEdgeBetween(const TNodeSet& lhs, const TNodeSet& rhs) { + for (const auto& edge: Edges_) { + if ( + IsSubset(edge.Left, lhs) && + !Overlaps(edge.Left, rhs) && + IsSubset(edge.Right, rhs) && + !Overlaps(edge.Right, lhs) + ) { + return &edge; + } + } + + return nullptr; + } + +private: + /* Attach edges to nodes */ + void AddEdgeImpl(TEdge edge) { + Edges_.push_back(edge); + + if (edge.IsSimpleEdge()) { + 
Nodes_[GetLowestSetBit(edge.Left)].SimpleNeighborhood |= edge.Right; + return; + } + + auto setBitsIt = TSetBitsIt(edge.Left); + while (setBitsIt.HasNext()) { + Nodes_[setBitsIt.Next()].ComplexEdgesId.push_back(Edges_.size() - 1); + } + } + +private: + std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, size_t> NodeIdByRelationOptimizerNode_; + + TVector<TNode> Nodes_; + TVector<TEdge> Edges_; +}; + +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/dq_opt_join_tree_node.cpp b/ydb/library/yql/dq/opt/dq_opt_join_tree_node.cpp new file mode 100644 index 0000000000..1c4db89bb6 --- /dev/null +++ b/ydb/library/yql/dq/opt/dq_opt_join_tree_node.cpp @@ -0,0 +1,44 @@ +#include "dq_opt_join_tree_node.h" + +namespace NYql::NDq { + +std::shared_ptr<TJoinOptimizerNodeInternal> MakeJoinInternal( + std::shared_ptr<IBaseOptimizerNode> left, + std::shared_ptr<IBaseOptimizerNode> right, + const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions, + const TVector<TString>& leftJoinKeys, + const TVector<TString>& rightJoinKeys, + EJoinKind joinKind, + EJoinAlgoType joinAlgo, + IProviderContext& ctx) { + + auto res = std::make_shared<TJoinOptimizerNodeInternal>(left, right, joinConditions, leftJoinKeys, rightJoinKeys, joinKind, joinAlgo); + res->Stats = std::make_shared<TOptimizerStatistics>(ComputeJoinStats(*left->Stats, *right->Stats, leftJoinKeys, rightJoinKeys, joinAlgo, ctx)); + return res; +} + +std::shared_ptr<TJoinOptimizerNode> ConvertFromInternal(const std::shared_ptr<IBaseOptimizerNode> internal) { + Y_ENSURE(internal->Kind == EOptimizerNodeKind::JoinNodeType); + + if (dynamic_cast<TJoinOptimizerNode*>(internal.get()) != nullptr) { + return std::static_pointer_cast<TJoinOptimizerNode>(internal); + } + + auto join = std::static_pointer_cast<TJoinOptimizerNodeInternal>(internal); + + auto left = join->LeftArg; + auto right = join->RightArg; + + if (left->Kind == EOptimizerNodeKind::JoinNodeType) { + left = ConvertFromInternal(left); + } + if 
(right->Kind == EOptimizerNodeKind::JoinNodeType) { + right = ConvertFromInternal(right); + } + + auto newJoin = std::make_shared<TJoinOptimizerNode>(left, right, join->JoinConditions, join->JoinType, join->JoinAlgo); + newJoin->Stats = join->Stats; + return newJoin; +} + +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/dq_opt_join_tree_node.h b/ydb/library/yql/dq/opt/dq_opt_join_tree_node.h new file mode 100644 index 0000000000..37a3075175 --- /dev/null +++ b/ydb/library/yql/dq/opt/dq_opt_join_tree_node.h @@ -0,0 +1,80 @@ +#pragma once + +#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h> + +namespace NYql::NDq { + +/** + * Internal Join nodes are used inside the CBO. They don't own join condition data structures + * and therefore avoid copying them during generation of candidate plans. + * + * These datastructures are owned by the query graph, so it is important to keep the graph around + * while internal nodes are being used. + * + * After join enumeration, internal nodes need to be converted to regular nodes, that own the data + * structures +*/ +struct TJoinOptimizerNodeInternal : public IBaseOptimizerNode { + TJoinOptimizerNodeInternal( + const std::shared_ptr<IBaseOptimizerNode>& left, + const std::shared_ptr<IBaseOptimizerNode>& right, + const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions, + const TVector<TString>& leftJoinKeys, + const TVector<TString>& rightJoinKeys, + const EJoinKind joinType, + const EJoinAlgoType joinAlgo + ) + : IBaseOptimizerNode(JoinNodeType) + , LeftArg(left) + , RightArg(right) + , JoinConditions(joinConditions) + , LeftJoinKeys(leftJoinKeys) + , RightJoinKeys(rightJoinKeys) + , JoinType(joinType) + , JoinAlgo(joinAlgo) + {} + + virtual ~TJoinOptimizerNodeInternal() = default; + virtual TVector<TString> Labels() { + auto res = LeftArg->Labels(); + auto rightLabels = RightArg->Labels(); + res.insert(res.begin(),rightLabels.begin(),rightLabels.end()); + return res; + } + + virtual void 
Print(std::stringstream&, int) { + } + + std::shared_ptr<IBaseOptimizerNode> LeftArg; + std::shared_ptr<IBaseOptimizerNode> RightArg; + const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& JoinConditions; + const TVector<TString>& LeftJoinKeys; + const TVector<TString>& RightJoinKeys; + EJoinKind JoinType; + EJoinAlgoType JoinAlgo; +}; + +/** + * Create a new internal join node and compute its statistics and cost +*/ +std::shared_ptr<TJoinOptimizerNodeInternal> MakeJoinInternal( + std::shared_ptr<IBaseOptimizerNode> left, + std::shared_ptr<IBaseOptimizerNode> right, + const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions, + const TVector<TString>& leftJoinKeys, + const TVector<TString>& rightJoinKeys, + EJoinKind joinKind, + EJoinAlgoType joinAlgo, + IProviderContext& ctx +); + +/** + * Convert a tree of internal optimizer nodes to external nodes that own the data structures. + * + * The internal node tree can have references to external nodes (since some subtrees are optimized + * separately if the plan contains non-orderable joins). So we check the instances and if we encounter + * an external node, we return the whole subtree unchanged. +*/ +std::shared_ptr<TJoinOptimizerNode> ConvertFromInternal(const std::shared_ptr<IBaseOptimizerNode> internal); + +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/dq_opt_make_join_hypergraph.h b/ydb/library/yql/dq/opt/dq_opt_make_join_hypergraph.h new file mode 100644 index 0000000000..0f7f386e65 --- /dev/null +++ b/ydb/library/yql/dq/opt/dq_opt_make_join_hypergraph.h @@ -0,0 +1,98 @@ +#pragma once + +#include "dq_opt_join_hypergraph.h" +#include "dq_opt_conflict_rules_collector.h" + +#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h> + +#include <memory.h> + +/* + * This header contains MakeJoinHypergraph function to construct the hypergraph from inner optimizer nodes. + * Pipeline works as follows: + * 1) MakeJoinHypergraph calls MakeJoinHypergraphRec recursively. 
+ * 2) MakeJoinHypergraphRec calls MakeHyperedge for each join node. + * 3) MakeHyperedge finds conflicts with TConflictRulesCollector and collect them into TES. + * If join has conflicts or complex predicate, then MakeHyperedge will create a complex edge. + */ + +namespace NYql::NDq { + +inline TVector<TString> GetConditionUsedRelationNames(const std::shared_ptr<TJoinOptimizerNode>& joinNode) { + TVector<TString> res; + res.reserve(joinNode->JoinConditions.size()); + + for (const auto& [lhsTable, rhsTable]: joinNode->JoinConditions) { + res.push_back(lhsTable.RelName); + res.push_back(rhsTable.RelName); + } + + return res; +} + +template <typename TNodeSet> +TJoinHypergraph<TNodeSet>::TEdge MakeHyperedge( + const std::shared_ptr<TJoinOptimizerNode>& joinNode, + const TNodeSet& conditionUsedRels, + std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, TNodeSet>& subtreeNodes +) { + auto conflictRulesCollector = TConflictRulesCollector<TNodeSet>(joinNode, subtreeNodes); + auto conflictRules = conflictRulesCollector.CollectConflicts(); + + TNodeSet TES = ConvertConflictRulesIntoTES(conditionUsedRels, conflictRules); + + + /* For CROSS Join and degenerate predicates (if subtree tables and joinCondition tables do not intersect) */ + if (!Overlaps(TES, subtreeNodes[joinNode->LeftArg])) { + TES |= subtreeNodes[joinNode->LeftArg]; + TES = ConvertConflictRulesIntoTES(TES, conflictRules); + } + + if (!Overlaps(TES, subtreeNodes[joinNode->RightArg])) { + TES |= subtreeNodes[joinNode->RightArg]; + TES = ConvertConflictRulesIntoTES(TES, conflictRules); + } + + TNodeSet left = TES & subtreeNodes[joinNode->LeftArg]; + TNodeSet right = TES & subtreeNodes[joinNode->RightArg]; + + return typename TJoinHypergraph<TNodeSet>::TEdge(left, right, joinNode->JoinType, OperatorIsCommutative(joinNode->JoinType) && joinNode->IsReorderable, joinNode->JoinConditions); +} + +template<typename TNodeSet> +void MakeJoinHypergraphRec( + TJoinHypergraph<TNodeSet>& graph, + const 
std::shared_ptr<IBaseOptimizerNode>& joinTree, + std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, TNodeSet>& subtreeNodes +) { + if (joinTree->Kind == RelNodeType) { + size_t nodeId = graph.AddNode(joinTree); + TNodeSet node{}; + node[nodeId] = 1; + subtreeNodes[joinTree] = node; + return; + } + + auto joinNode = std::static_pointer_cast<TJoinOptimizerNode>(joinTree); + MakeJoinHypergraphRec(graph, joinNode->LeftArg, subtreeNodes); + MakeJoinHypergraphRec(graph, joinNode->RightArg, subtreeNodes); + + subtreeNodes[joinTree] = subtreeNodes[joinNode->LeftArg] | subtreeNodes[joinNode->RightArg]; + + TNodeSet conditionUsedRels{}; + conditionUsedRels = graph.GetNodesByRelNamesInSubtree(joinTree, GetConditionUsedRelationNames(joinNode)); + + graph.AddEdge(MakeHyperedge<TNodeSet>(joinNode, conditionUsedRels, subtreeNodes)); +} + +template <typename TNodeSet> +TJoinHypergraph<TNodeSet> MakeJoinHypergraph( + const std::shared_ptr<IBaseOptimizerNode>& joinTree +) { + TJoinHypergraph<TNodeSet> graph{}; + std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, TNodeSet> subtreeNodes{}; + MakeJoinHypergraphRec(graph, joinTree, subtreeNodes); + return graph; +} + +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/opt/ya.make b/ydb/library/yql/dq/opt/ya.make index 999fbec7dc..0e939390ec 100644 --- a/ydb/library/yql/dq/opt/ya.make +++ b/ydb/library/yql/dq/opt/ya.make @@ -14,7 +14,10 @@ PEERDIR( SRCS( dq_opt.cpp dq_opt_build.cpp + dq_opt_conflict_rules_collector.cpp dq_opt_join.cpp + dq_opt_join_cost_based.cpp + dq_opt_join_tree_node.cpp dq_opt_hopping.cpp dq_opt_log.cpp dq_opt_peephole.cpp @@ -22,7 +25,6 @@ SRCS( dq_opt_phy.cpp dq_opt_stat.cpp dq_opt_stat_transformer_base.cpp - dq_opt_join_cost_based.cpp dq_opt_predicate_selectivity.cpp ) |