aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpilik <96681992+pashandor789@users.noreply.github.com>2024-04-17 16:17:44 +0300
committerGitHub <noreply@github.com>2024-04-17 16:17:44 +0300
commitb2c3e7dc88170527cbfaef31f71518c7ee7c4445 (patch)
tree5b3c68d82a37ae1c56822a0fcc05a802ee5038e8
parent62d05d286a413aa242aa3ed83365943de8c3906f (diff)
downloadydb-b2c3e7dc88170527cbfaef31f71518c7ee7c4445.tar.gz
[DPHyp] Impl. added (#3763)
Co-authored-by: Pavel Ivanov <pudge1000-7@mr-nvme-testing-11.search.yandex.net>
-rw-r--r--.github/config/muted_ya.txt3
-rw-r--r--ydb/library/yql/core/cbo/cbo_optimizer_new.cpp2
-rw-r--r--ydb/library/yql/dq/opt/bitset.h76
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_conflict_rules_collector.cpp86
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_conflict_rules_collector.h148
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_dphyp_solver.h480
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp1316
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_join_cost_based.h29
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_join_hypergraph.h190
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_join_tree_node.cpp44
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_join_tree_node.h80
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_make_join_hypergraph.h98
-rw-r--r--ydb/library/yql/dq/opt/ya.make4
13 files changed, 1357 insertions, 1199 deletions
diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt
index fd595c8922..6fd1dafaf5 100644
--- a/.github/config/muted_ya.txt
+++ b/.github/config/muted_ya.txt
@@ -34,7 +34,10 @@ ydb/core/viewer/ut Viewer.TabletMerging
ydb/core/viewer/ut Viewer.TabletMergingPacked
ydb/library/actors/http/ut HttpProxy.TooLongHeader
ydb/library/actors/http/ut sole*
+ydb/library/yql/dq/opt/ut DQCBO.JoinSearch3Rels
ydb/library/yql/providers/generic/connector/tests* *
+ydb/library/yql/providers/yt/provider/ut TYqlCBO.NonReordable
+ydb/library/yql/tests/sql/dq_file/part17 *dq-join_cbo_native_3_tables--*
ydb/public/lib/ydb_cli/topic/ut TTopicReaderTests.TestRun_ReadOneMessage
ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/with_offset_ranges_mode_ut RetryPolicy.RetryWithBatching
ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/with_offset_ranges_mode_ut RetryPolicy.TWriteSession_TestBrokenPolicy
diff --git a/ydb/library/yql/core/cbo/cbo_optimizer_new.cpp b/ydb/library/yql/core/cbo/cbo_optimizer_new.cpp
index d6187d619d..aa010f02d8 100644
--- a/ydb/library/yql/core/cbo/cbo_optimizer_new.cpp
+++ b/ydb/library/yql/core/cbo/cbo_optimizer_new.cpp
@@ -70,7 +70,7 @@ TJoinOptimizerNode::TJoinOptimizerNode(const std::shared_ptr<IBaseOptimizerNode>
JoinConditions(joinConditions),
JoinType(joinType),
JoinAlgo(joinAlgo) {
- IsReorderable = (JoinType==EJoinKind::InnerJoin) && (nonReorderable==false);
+ IsReorderable = !nonReorderable;
for (auto [l,r] : joinConditions ) {
LeftJoinKeys.push_back(l.AttributeName);
RightJoinKeys.push_back(r.AttributeName);
diff --git a/ydb/library/yql/dq/opt/bitset.h b/ydb/library/yql/dq/opt/bitset.h
new file mode 100644
index 0000000000..9edad03adc
--- /dev/null
+++ b/ydb/library/yql/dq/opt/bitset.h
@@ -0,0 +1,76 @@
+#pragma once
+
+#include <stdlib.h>
+
+/*
+ * This header contains helper functions for working with bitsets.
+ * They are templated by TNodeSet, which is a std::bitset<>.
+ * We use the template for efficiency: for 64 bit nodesets we implement a faster next subset functionality
+ */
+
+namespace NYql::NDq {
+
+/* Returns true when lhs and rhs have at least one set bit in common. */
+template <typename TNodeSet>
+inline bool Overlaps(const TNodeSet& lhs, const TNodeSet& rhs) {
+    // any() inspects the intersection directly; comparing against the literal 0
+    // only compiled through an implicit integer -> bitset conversion.
+    return (lhs & rhs).any();
+}
+
+/* Returns true when every bit set in lhs is also set in rhs. */
+template <typename TNodeSet>
+inline bool IsSubset(const TNodeSet& lhs, const TNodeSet& rhs) {
+    const TNodeSet common = lhs & rhs;
+    return common == lhs;
+}
+
+/* Returns true when exactly one bit of nodeSet is set. */
+template <typename TNodeSet>
+inline bool HasSingleBit(TNodeSet nodeSet) {
+    const auto setBits = nodeSet.count();
+    return setBits == 1;
+}
+
+/*
+ * Returns the index of the lowest set bit of nodeSet.
+ * For an empty set Y_ASSERT(false) is hit and nodeSet.size() is returned.
+ */
+template <typename TNodeSet>
+inline size_t GetLowestSetBit(TNodeSet nodeSet) {
+    const size_t bitCount = nodeSet.size();
+
+    size_t pos = 0;
+    while (pos < bitCount && !nodeSet[pos]) {
+        ++pos;
+    }
+
+    if (pos == bitCount) {
+        Y_ASSERT(false);
+    }
+
+    return pos;
+}
+
+/* Iterates over the indices of the set bits in the TNodeSet, lowest index first. */
+template <typename TNodeSet>
+class TSetBitsIt {
+public:
+    TSetBitsIt(TNodeSet nodeSet)
+        : NodeSet_(nodeSet)
+        , Size_(nodeSet.size())
+        , BitId_(0)
+    {
+        // Position the iterator on the first set bit (or at Size_ if there is none).
+        SkipUnsetBits();
+    }
+
+    // True while there is another set bit to hand out. Pure query, hence const.
+    bool HasNext() const {
+        return BitId_ < Size_;
+    }
+
+    // Returns the index of the current set bit and advances to the following one.
+    // Callers are expected to check HasNext() first: past the end the returned
+    // value is >= the size of the set.
+    size_t Next() {
+        size_t bitId = BitId_++;
+        SkipUnsetBits();
+
+        return bitId;
+    }
+
+private:
+    // Moves BitId_ forward until it points at a set bit or reaches Size_.
+    void SkipUnsetBits() {
+        while (BitId_ < Size_ && !NodeSet_[BitId_]) {
+            ++BitId_;
+        }
+    }
+
+private:
+    TNodeSet NodeSet_;
+    size_t Size_;
+    size_t BitId_;
+};
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/opt/dq_opt_conflict_rules_collector.cpp b/ydb/library/yql/dq/opt/dq_opt_conflict_rules_collector.cpp
new file mode 100644
index 0000000000..b58b8ba130
--- /dev/null
+++ b/ydb/library/yql/dq/opt/dq_opt_conflict_rules_collector.cpp
@@ -0,0 +1,86 @@
+#include "dq_opt_conflict_rules_collector.h"
+#include <util/generic/hash_set.h>
+
+namespace NYql::NDq {
+
+/*
+ * Maps a join kind onto the kind that is used in its place when consulting the
+ * ASSOC, RASSCOM, LASSCOM tables, which keeps those tables simpler:
+ * Exclusion is looked up as InnerJoin, LeftOnly as LeftJoin, everything else as itself.
+ */
+EJoinKind GetEquivalentJoinByAlgebraicProperties(EJoinKind joinKind) {
+    if (joinKind == EJoinKind::Exclusion) {
+        return EJoinKind::InnerJoin;
+    }
+
+    if (joinKind == EJoinKind::LeftOnly) {
+        return EJoinKind::LeftJoin;
+    }
+
+    return joinKind;
+}
+
+/*
+ * Returns true for join kinds that are treated as commutative:
+ * InnerJoin, OuterJoin, Cross and the kinds that map onto them via
+ * GetEquivalentJoinByAlgebraicProperties.
+ */
+bool OperatorIsCommutative(EJoinKind joinKind) {
+    // Every path through the switch returns, so nothing may follow it;
+    // the former trailing Y_UNREACHABLE() was dead code.
+    switch (GetEquivalentJoinByAlgebraicProperties(joinKind)) {
+        case EJoinKind::InnerJoin:
+        case EJoinKind::OuterJoin:
+        case EJoinKind::Cross:
+            return true;
+        default:
+            return false;
+    }
+}
+
+/*
+ * Returns true when the pair (lhs, rhs) is listed in the ASSOC table.
+ * Both kinds are first normalized with GetEquivalentJoinByAlgebraicProperties.
+ */
+bool OperatorsAreAssociative(EJoinKind lhs, EJoinKind rhs) {
+    lhs = GetEquivalentJoinByAlgebraicProperties(lhs);
+    rhs = GetEquivalentJoinByAlgebraicProperties(rhs);
+
+    // The table is const: lookups can never insert into it by accident.
+    static const THashMap<EJoinKind, THashSet<EJoinKind>> ASSOC_TABLE = {
+        {EJoinKind::Cross, {EJoinKind::Cross, EJoinKind::InnerJoin, EJoinKind::LeftSemi, EJoinKind::LeftJoin}},
+        {EJoinKind::InnerJoin, {EJoinKind::Cross, EJoinKind::InnerJoin, EJoinKind::LeftSemi, EJoinKind::LeftJoin}},
+        {EJoinKind::LeftJoin, {EJoinKind::LeftJoin}},
+        {EJoinKind::OuterJoin, {EJoinKind::LeftJoin, EJoinKind::OuterJoin}}
+    };
+
+    // Single lookup of the row; a missing row means "not associative".
+    const auto row = ASSOC_TABLE.find(lhs);
+    return row != ASSOC_TABLE.end() && row->second.contains(rhs);
+}
+
+/*
+ * Returns true when the pair (lhs, rhs) is listed in the LASSCOM table.
+ * Both kinds are first normalized with GetEquivalentJoinByAlgebraicProperties.
+ */
+bool OperatorsAreLeftAsscom(EJoinKind lhs, EJoinKind rhs) {
+    lhs = GetEquivalentJoinByAlgebraicProperties(lhs);
+    rhs = GetEquivalentJoinByAlgebraicProperties(rhs);
+
+    // The table is const: lookups can never insert into it by accident.
+    static const THashMap<EJoinKind, THashSet<EJoinKind>> LASSCOM_TABLE = {
+        {EJoinKind::Cross, {EJoinKind::Cross, EJoinKind::InnerJoin, EJoinKind::LeftSemi, EJoinKind::LeftJoin}},
+        {EJoinKind::InnerJoin, {EJoinKind::Cross, EJoinKind::InnerJoin, EJoinKind::LeftSemi, EJoinKind::LeftJoin}},
+        {EJoinKind::LeftSemi, {EJoinKind::Cross, EJoinKind::InnerJoin, EJoinKind::LeftSemi, EJoinKind::LeftJoin}},
+        {EJoinKind::LeftJoin, {EJoinKind::Cross, EJoinKind::InnerJoin, EJoinKind::LeftSemi, EJoinKind::LeftJoin, EJoinKind::OuterJoin}},
+        {EJoinKind::OuterJoin, {EJoinKind::LeftJoin, EJoinKind::OuterJoin}}
+    };
+
+    // Single lookup of the row; a missing row means "not left-asscom".
+    const auto row = LASSCOM_TABLE.find(lhs);
+    return row != LASSCOM_TABLE.end() && row->second.contains(rhs);
+}
+
+/*
+ * Returns true when the pair (lhs, rhs) is listed in the RASSCOM table.
+ * Both kinds are first normalized with GetEquivalentJoinByAlgebraicProperties.
+ */
+bool OperatorsAreRightAsscom(EJoinKind lhs, EJoinKind rhs) {
+    lhs = GetEquivalentJoinByAlgebraicProperties(lhs);
+    rhs = GetEquivalentJoinByAlgebraicProperties(rhs);
+
+    // The table is const: lookups can never insert into it by accident.
+    static const THashMap<EJoinKind, THashSet<EJoinKind>> RASSCOM_TABLE = {
+        {EJoinKind::Cross, {EJoinKind::Cross, EJoinKind::InnerJoin}},
+        {EJoinKind::InnerJoin, {EJoinKind::Cross, EJoinKind::InnerJoin}},
+        {EJoinKind::OuterJoin, {EJoinKind::OuterJoin}}
+    };
+
+    // Single lookup of the row; a missing row means "not right-asscom".
+    const auto row = RASSCOM_TABLE.find(lhs);
+    return row != RASSCOM_TABLE.end() && row->second.contains(rhs);
+}
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/opt/dq_opt_conflict_rules_collector.h b/ydb/library/yql/dq/opt/dq_opt_conflict_rules_collector.h
new file mode 100644
index 0000000000..9cbe532e92
--- /dev/null
+++ b/ydb/library/yql/dq/opt/dq_opt_conflict_rules_collector.h
@@ -0,0 +1,148 @@
+#pragma once
+
+#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h>
+
+/*
+ * This header contains an algorithm for resolving join conflicts with TConflictRulesCollector class
+ * and ConvertConflictRulesIntoTES function, which are used to construct the hypergraph.
+ */
+
+namespace NYql::NDq {
+
+bool OperatorIsCommutative(EJoinKind);
+
+bool OperatorsAreAssociative(EJoinKind, EJoinKind);
+
+/* (e1 o12 e2) o13 e3 == (e1 o13 e3) o12 e2 */
+bool OperatorsAreLeftAsscom(EJoinKind, EJoinKind);
+
+/* e1 o13 (e2 o23 e3) == e2 o23 (e1 o13 e3) */
+bool OperatorsAreRightAsscom(EJoinKind, EJoinKind);
+
+/*
+ * A single conflict rule "RuleActivationNodes -> RequiredNodes".
+ * ConvertConflictRulesIntoTES adds RequiredNodes to the TES as soon as
+ * the TES overlaps RuleActivationNodes.
+ */
+template <typename TNodeSet>
+struct TConflictRule {
+ TConflictRule(const TNodeSet& ruleActivationNodes, const TNodeSet& requiredNodes)
+ : RuleActivationNodes(ruleActivationNodes)
+ , RequiredNodes(requiredNodes)
+ {}
+
+ // Nodes whose presence in the TES triggers the rule.
+ TNodeSet RuleActivationNodes;
+ // Nodes that are merged into the TES once the rule is triggered.
+ TNodeSet RequiredNodes;
+};
+
+/*
+ * Finds and collects the conflicts between the root of a join subtree and the joins below it.
+ * Both sides of the root are traversed and the algebraic join properties
+ * (ASSOC, LASSCOM, RASSCOM) are checked for every join that is found there.
+ * The algorithm is called "CD-C"; details are described in the white paper
+ * "On the Correct and Complete Enumeration of the Core Search Space", section "5.4 Approach CD-C".
+ */
+template<typename TNodeSet>
+class TConflictRulesCollector {
+public:
+    TConflictRulesCollector(
+        std::shared_ptr<TJoinOptimizerNode> root,
+        std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, TNodeSet>& subtreeNodes
+    )
+        : Root_(root)
+        , ConflictRules_({})
+        , SubtreeNodes_(subtreeNodes)
+    {}
+
+    // Walks the left subtree, then the right subtree of the root and returns the
+    // gathered rules. The rules are moved out of the collector.
+    TVector<TConflictRule<TNodeSet>> CollectConflicts() {
+        VisitJoinTree(Root_->LeftArg, [this](const std::shared_ptr<TJoinOptimizerNode>& child) {
+            AddLeftSideConflicts(child);
+        });
+        VisitJoinTree(Root_->RightArg, [this](const std::shared_ptr<TJoinOptimizerNode>& child) {
+            AddRightSideConflicts(child);
+        });
+
+        return std::move(ConflictRules_);
+    }
+
+private:
+    // True when either the root or the child join is marked as non-reorderable.
+    bool ReorderingIsForbidden(const std::shared_ptr<TJoinOptimizerNode>& child) const {
+        return !Root_->IsReorderable || !child->IsReorderable;
+    }
+
+    // Appends the rule "nodes under activation -> nodes under required".
+    void AddRule(
+        const std::shared_ptr<IBaseOptimizerNode>& activation,
+        const std::shared_ptr<IBaseOptimizerNode>& required
+    ) {
+        ConflictRules_.emplace_back(SubtreeNodes_[activation], SubtreeNodes_[required]);
+    }
+
+    // Rules for a join located in the left subtree of the root.
+    void AddLeftSideConflicts(const std::shared_ptr<TJoinOptimizerNode>& child) {
+        if (!OperatorsAreAssociative(child->JoinType, Root_->JoinType) || ReorderingIsForbidden(child)) {
+            AddRule(child->RightArg, child->LeftArg);
+        }
+
+        if (!OperatorsAreLeftAsscom(child->JoinType, Root_->JoinType) || ReorderingIsForbidden(child)) {
+            AddRule(child->LeftArg, child->RightArg);
+        }
+    }
+
+    // Rules for a join located in the right subtree of the root.
+    void AddRightSideConflicts(const std::shared_ptr<TJoinOptimizerNode>& child) {
+        if (!OperatorsAreAssociative(Root_->JoinType, child->JoinType) || ReorderingIsForbidden(child)) {
+            AddRule(child->LeftArg, child->RightArg);
+        }
+
+        if (!OperatorsAreRightAsscom(Root_->JoinType, child->JoinType) || ReorderingIsForbidden(child)) {
+            AddRule(child->RightArg, child->LeftArg);
+        }
+    }
+
+private:
+    // Post-order traversal: both children of a join are visited before the join itself.
+    // Relation (leaf) nodes are skipped.
+    template <typename TFunction>
+    void VisitJoinTree(const std::shared_ptr<IBaseOptimizerNode>& child, TFunction visitor) {
+        if (child->Kind != EOptimizerNodeKind::RelNodeType) {
+            auto joinNode = std::static_pointer_cast<TJoinOptimizerNode>(child);
+
+            VisitJoinTree(joinNode->LeftArg, visitor);
+            VisitJoinTree(joinNode->RightArg, visitor);
+
+            visitor(joinNode);
+        }
+    }
+
+private:
+    std::shared_ptr<TJoinOptimizerNode> Root_;
+    TVector<TConflictRule<TNodeSet>> ConflictRules_;
+    std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, TNodeSet>& SubtreeNodes_;
+};
+
+/*
+ * Converts conflict rules into the TES.
+ * The TES (Total Eligibility Set) captures reordering constraints and represents
+ * the set of tables that must be present before the join expression can be evaluated.
+ * It is initialized with the SES (Syntactic Eligibility Set) - the tables used by the condition.
+ *
+ * Note: conflictRules is modified - rules whose required nodes are already
+ * covered by the TES are erased from it.
+ */
+template <typename TNodeSet>
+TNodeSet ConvertConflictRulesIntoTES(const TNodeSet& SES, TVector<TConflictRule<TNodeSet>>& conflictRules) {
+    TNodeSet TES = SES;
+
+    for (;;) {
+        const TNodeSet before = TES;
+
+        // Fire every rule that is activated by the current TES.
+        for (const auto& rule : conflictRules) {
+            if (Overlaps(rule.RuleActivationNodes, TES)) {
+                TES |= rule.RequiredNodes;
+            }
+        }
+
+        // Drop the rules that cannot add anything to the TES anymore.
+        EraseIf(
+            conflictRules,
+            [&TES](const TConflictRule<TNodeSet>& rule) { return IsSubset(rule.RequiredNodes, TES); }
+        );
+
+        const bool unchanged = (TES == before);
+        if (unchanged || conflictRules.empty()) {
+            return TES;
+        }
+    }
+}
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/opt/dq_opt_dphyp_solver.h b/ydb/library/yql/dq/opt/dq_opt_dphyp_solver.h
new file mode 100644
index 0000000000..1c35a8f49c
--- /dev/null
+++ b/ydb/library/yql/dq/opt/dq_opt_dphyp_solver.h
@@ -0,0 +1,480 @@
+#pragma once
+
+#include "dq_opt_join_hypergraph.h"
+#include "dq_opt_join_tree_node.h"
+#include "bitset.h"
+
+namespace NYql::NDq {
+
+#ifdef DEBUG
+ // Hash functor for std::pair: XOR of the std::hash values of both members.
+ struct pair_hash {
+     template <class T1, class T2>
+     std::size_t operator () (const std::pair<T1,T2> &p) const {
+         // Deliberately simple combiner; something like boost.hash_combine
+         // would distribute better.
+         const std::size_t firstHash = std::hash<T1>{}(p.first);
+         const std::size_t secondHash = std::hash<T2>{}(p.second);
+
+         return firstHash ^ secondHash;
+     }
+ };
+#endif
+
+/*
+ * DPHyp (Dynamic Programming with Hypergraph) is a graph-aware
+ * join enumeration algorithm that only considers CSGs (Connected Sub-Graphs) of
+ * the join graph and computes CMPs (Complement pairs) that are also connected
+ * subgraphs of the join graph. It enumerates CSGs in the order, such that subsets
+ * are enumerated first and no duplicates are ever enumerated. Then, for each emitted
+ * CSG it computes the complements with the same conditions - they must already be
+ * present in the dynamic programming table and no pair should be enumerated twice.
+ * Details are described in the white paper - "Dynamic Programming Strikes Back".
+ *
+ * This class is templated by std::bitset with the largest number of joins we can process
+ * or std::bitset<64>, which has a more efficient implementation of enumerating subsets of set.
+ */
+template <typename TNodeSet>
+class TDPHypSolver {
+public:
+    TDPHypSolver(
+        TJoinHypergraph<TNodeSet>& graph,
+        IProviderContext& ctx
+    )
+        : Graph_(graph)
+        , NNodes_(graph.GetNodes().size())
+        , Pctx_(ctx)
+    {}
+
+    // Run DPHyp algorithm and produce the join tree in CBO's internal representation
+    std::shared_ptr<TJoinOptimizerNodeInternal> Solve();
+
+    // Calculate the size of a dynamic programming table with a budget
+    ui32 CountCC(ui32 budget);
+
+private:
+    void EnumerateCsgRec(const TNodeSet& s1, const TNodeSet& x);
+
+    void EmitCsg(const TNodeSet& s1);
+
+    void EnumerateCmpRec(const TNodeSet& s1, const TNodeSet& s2, const TNodeSet& x);
+
+    // TEdge is a dependent type name, so it is spelled with `typename`;
+    // omitting it is only accepted under the C++20 relaxation.
+    void EmitCsgCmp(const TNodeSet& s1, const TNodeSet& s2, const typename TJoinHypergraph<TNodeSet>::TEdge* csgCmpEdge);
+
+private:
+    // Create an exclusion set that contains all the nodes of the graph that are smaller or equal to
+    // the smallest node in the provided bitset
+    inline TNodeSet MakeBiMin(const TNodeSet& s);
+
+    // Create an exclusion set that contains all the nodes of the bitset that are smaller or equal to
+    // the provided integer
+    inline TNodeSet MakeB(const TNodeSet& s, size_t v);
+
+    // Compute the neighbors of a set of nodes, excluding the nodes in exclusion set
+    TNodeSet Neighs(TNodeSet s, TNodeSet x);
+
+    // Compute the next subset of relations, given by the final bitset
+    TNodeSet NextBitset(const TNodeSet& current, const TNodeSet& final);
+
+    // Iterate over all join algorithms and pick the best join that is applicable.
+    // Also considers commuting joins
+    std::shared_ptr<TJoinOptimizerNodeInternal> PickBestJoin(
+        const std::shared_ptr<IBaseOptimizerNode>& left,
+        const std::shared_ptr<IBaseOptimizerNode>& right,
+        EJoinKind joinKind,
+        bool isCommutative,
+        const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions,
+        const std::set<std::pair<TJoinColumn, TJoinColumn>>& reversedJoinConditions,
+        const TVector<TString>& leftJoinKeys,
+        const TVector<TString>& rightJoinKeys,
+        IProviderContext& ctx
+    );
+
+    // Count the size of the dynamic programming table recursively
+    ui32 CountCCRec(const TNodeSet&, const TNodeSet&, ui32, ui32);
+
+private:
+    TJoinHypergraph<TNodeSet>& Graph_;
+    size_t NNodes_;
+    IProviderContext& Pctx_; // Provider specific contexts?
+    // FIXME: This is a temporary structure that needs to be extended to multiple providers.
+    #ifdef DEBUG
+    // Remembers every emitted (S1, S2) pair so that duplicates can be detected.
+    THashMap<std::pair<TNodeSet, TNodeSet>, bool, pair_hash> CheckTable_;
+    #endif
+private:
+    // Best plan found so far for every node set that has been joined.
+    THashMap<TNodeSet, std::shared_ptr<IBaseOptimizerNode>, std::hash<TNodeSet>> DpTable_;
+};
+
+/*
+ * Count the number of items in the DP table of DPHyp.
+ * Counting is abandoned as soon as the running count exceeds the budget;
+ * the count reached at that point is returned.
+ */
+template <typename TNodeSet> ui32 TDPHypSolver<TNodeSet>::CountCC(ui32 budget) {
+    TNodeSet everyNode;
+    everyNode.set();
+
+    ui32 total = 0;
+    int nodeId = NNodes_ - 1;
+
+    // Start nodes are processed from the highest index down to 0.
+    while (nodeId >= 0) {
+        total += 1;
+        if (total > budget) {
+            return total;
+        }
+
+        TNodeSet start;
+        start[nodeId] = 1;
+        total = CountCCRec(start, MakeB(everyNode, nodeId), total, budget);
+
+        --nodeId;
+    }
+
+    return total;
+}
+
+/**
+ * Recursively count the number of items in the DP table.
+ * Every non-empty subset of the neighborhood of s (outside of x) costs one unit
+ * and is then expanded recursively. Returns early once cost exceeds budget.
+*/
+template <typename TNodeSet> ui32 TDPHypSolver<TNodeSet>::CountCCRec(const TNodeSet& s, const TNodeSet& x, ui32 cost, ui32 budget) {
+    const TNodeSet neighs = Neighs(s, x);
+    if (neighs.none()) {
+        return cost;
+    }
+
+    const TNodeSet excluded = x | neighs;
+
+    TNodeSet subset{};
+    do {
+        subset = NextBitset(subset, neighs);
+
+        ++cost;
+        if (cost > budget) {
+            return cost;
+        }
+
+        cost = CountCCRec(s | subset, excluded, cost, budget);
+    } while (subset != neighs);
+
+    return cost;
+}
+
+/*
+ * Computes the neighborhood of s with the nodes of x removed.
+ * Simple neighbors of every node of s are taken as is. A complex edge contributes
+ * only the lowest node of its right side, and only when its left side lies inside s
+ * and its right side touches neither x, nor s, nor the neighbors gathered so far.
+ */
+template<typename TNodeSet> TNodeSet TDPHypSolver<TNodeSet>::Neighs(TNodeSet s, TNodeSet x) {
+    auto& nodes = Graph_.GetNodes();
+
+    TNodeSet result{};
+
+    for (TSetBitsIt<TNodeSet> it(s); it.HasNext();) {
+        const auto& node = nodes[it.Next()];
+
+        result |= node.SimpleNeighborhood;
+
+        for (const auto& edgeId : node.ComplexEdgesId) {
+            auto& edge = Graph_.GetEdge(edgeId);
+
+            if (!IsSubset(edge.Left, s)) {
+                continue;
+            }
+
+            const bool rightSideIsFree =
+                !Overlaps(edge.Right, x) &&
+                !Overlaps(edge.Right, s) &&
+                !Overlaps(edge.Right, result);
+
+            if (rightSideIsFree) {
+                result[GetLowestSetBit(edge.Right)] = 1;
+            }
+        }
+    }
+
+    return result & ~x;
+}
+
+/*
+ * Fast path for 64-bit node sets: returns the subset of `final` that follows `prev`.
+ * All bits outside of `final` are filled with ones, so that the +1 carries straight
+ * through them and only the bits of `final` are effectively incremented.
+ */
+template<>
+inline std::bitset<64> TDPHypSolver<std::bitset<64>>::NextBitset(const std::bitset<64>& prev, const std::bitset<64>& final) {
+    // to_ullong() is guaranteed to hold all 64 bits. to_ulong() throws
+    // std::overflow_error where unsigned long is 32 bits wide, which ~final
+    // triggers almost always because it sets the high bits.
+    return std::bitset<64>((prev | ~final).to_ullong() + 1) & final;
+}
+
+/*
+ * Generic version: returns the subset of `final` that follows `prev`, i.e. performs
+ * a binary increment of `prev` that only touches the positions set in `final`
+ * (among the first NNodes_ positions). Once `prev` equals `final`, `final` is returned.
+ */
+template<typename TNodeSet> TNodeSet TDPHypSolver<TNodeSet>::NextBitset(const TNodeSet& prev, const TNodeSet& final) {
+    if (prev == final) {
+        return final;
+    }
+
+    TNodeSet next = prev;
+
+    for (size_t pos = 0; pos < NNodes_; ++pos) {
+        if (!final[pos]) {
+            continue;
+        }
+
+        if (next[pos]) {
+            // 1 + carry = 0, the carry moves on to the next position of final
+            next[pos] = 0;
+        } else {
+            // 0 + carry = 1, the carry is consumed and the increment is done
+            next[pos] = 1;
+            break;
+        }
+    }
+
+    return next;
+}
+
+/*
+ * Runs the enumeration: seeds the DP table with the single relations, emits and
+ * expands the connected subgraphs starting from every node (highest index first)
+ * and returns the plan stored for the set of all nodes.
+ */
+template<typename TNodeSet> std::shared_ptr<TJoinOptimizerNodeInternal> TDPHypSolver<TNodeSet>::Solve() {
+    auto& nodes = Graph_.GetNodes();
+
+    Y_ASSERT(nodes.size() == NNodes_);
+
+    const auto singleton = [](int nodeId) {
+        TNodeSet s{};
+        s[nodeId] = 1;
+        return s;
+    };
+
+    for (int i = NNodes_ - 1; i >= 0; --i) {
+        DpTable_[singleton(i)] = nodes[i].RelationOptimizerNode;
+    }
+
+    for (int i = NNodes_ - 1; i >= 0; --i) {
+        const TNodeSet start = singleton(i);
+        EmitCsg(start);
+        EnumerateCsgRec(start, MakeBiMin(start));
+    }
+
+    TNodeSet everyNode{};
+    for (size_t i = 0; i < NNodes_; ++i) {
+        everyNode[i] = 1;
+    }
+
+    Y_ASSERT(DpTable_.contains(everyNode));
+
+    return std::static_pointer_cast<TJoinOptimizerNodeInternal>(DpTable_[everyNode]);
+}
+
+/*
+ * Enumerates connected subgraphs.
+ * First it emits the CSGs that are created by adding subsets of the neighbors of S1 to S1
+ * (only those that are already present in the DP table).
+ * Then it recurses on S1 extended with every such subset, with the neighbors excluded.
+ */
+template <typename TNodeSet> void TDPHypSolver<TNodeSet>::EnumerateCsgRec(const TNodeSet& s1, const TNodeSet& x) {
+    const TNodeSet neighs = Neighs(s1, x);
+    if (neighs.none()) {
+        return;
+    }
+
+    TNodeSet subset{};
+    do {
+        subset = NextBitset(subset, neighs);
+
+        const TNodeSet candidate = s1 | subset;
+        if (DpTable_.contains(candidate)) {
+            EmitCsg(candidate);
+        }
+    } while (subset != neighs);
+
+    const TNodeSet excluded = x | neighs;
+
+    subset.reset();
+    do {
+        subset = NextBitset(subset, neighs);
+        EnumerateCsgRec(s1 | subset, excluded);
+    } while (subset != neighs);
+}
+
+/*
+ * EmitCsg emits Connected SubGraphs.
+ * It iterates through the neighbors of the initial set S1 (highest index first) and emits
+ * the pairs (S1, S2), where S2 is a single neighbor of S1 connected to it by an edge.
+ * Then it recursively enumerates the complement pairs that grow from S2.
+ */
+template <typename TNodeSet> void TDPHypSolver<TNodeSet>::EmitCsg(const TNodeSet& s1) {
+    const TNodeSet x = s1 | MakeBiMin(s1);
+    const TNodeSet neighs = Neighs(s1, x);
+
+    if (neighs.none()) {
+        return;
+    }
+
+    int nodeId = NNodes_ - 1;
+    for (; nodeId >= 0; --nodeId) {
+        if (!neighs[nodeId]) {
+            continue;
+        }
+
+        TNodeSet s2{};
+        s2[nodeId] = 1;
+
+        if (auto* edge = Graph_.FindEdgeBetween(s1, s2)) {
+            EmitCsgCmp(s1, s2, edge);
+        }
+
+        const TNodeSet excluded = x | MakeB(neighs, GetLowestSetBit(s2));
+        EnumerateCmpRec(s1, s2, excluded);
+    }
+}
+
+/*
+ * Enumerates complement pairs.
+ * First it emits the pairs (S1, S2+next), where S2+next are the sets obtained by adding
+ * subsets of S2's neighbors to S2 - provided that the set is already in the DP table
+ * and an edge connects it to S1.
+ * Then it recurses into the pairs (S1, S2+next) with the neighbors excluded.
+ */
+template <typename TNodeSet> void TDPHypSolver<TNodeSet>::EnumerateCmpRec(const TNodeSet& s1, const TNodeSet& s2, const TNodeSet& x) {
+    const TNodeSet neighs = Neighs(s2, x);
+    if (neighs.none()) {
+        return;
+    }
+
+    TNodeSet subset{};
+    do {
+        subset = NextBitset(subset, neighs);
+
+        const TNodeSet complement = s2 | subset;
+        if (DpTable_.contains(complement)) {
+            if (auto* edge = Graph_.FindEdgeBetween(s1, complement)) {
+                EmitCsgCmp(s1, complement, edge);
+            }
+        }
+    } while (subset != neighs);
+
+    const TNodeSet excluded = x | neighs;
+
+    subset.reset();
+    do {
+        subset = NextBitset(subset, neighs);
+        EnumerateCmpRec(s1, s2 | subset, excluded);
+    } while (subset != neighs);
+}
+
+/*
+ * Returns the set of all positions up to and including the lowest set bit of s
+ * (searched among the first NNodes_ positions). Empty when s has no such bit.
+ */
+template <typename TNodeSet> TNodeSet TDPHypSolver<TNodeSet>::MakeBiMin(const TNodeSet& s) {
+    TNodeSet res{};
+
+    size_t lowest = 0;
+    while (lowest < NNodes_ && !s[lowest]) {
+        ++lowest;
+    }
+
+    if (lowest < NNodes_) {
+        for (size_t j = 0; j <= lowest; ++j) {
+            res[j] = 1;
+        }
+    }
+
+    return res;
+}
+
+/*
+ * Returns the bits of s whose position is not greater than v
+ * (only the first NNodes_ positions are considered).
+ */
+template <typename TNodeSet> TNodeSet TDPHypSolver<TNodeSet>::MakeB(const TNodeSet& s, size_t v) {
+    TNodeSet res{};
+
+    for (size_t pos = 0; pos < NNodes_ && pos <= v; ++pos) {
+        if (s[pos]) {
+            res[pos] = 1;
+        }
+    }
+
+    return res;
+}
+
+/*
+ * Tries every join algorithm from AllJoinAlgos in the given order and, for commutative
+ * joins, also with the inputs swapped. Keeps the applicable variant with the lowest
+ * cost (the first one found wins on ties) and builds the internal join node for it.
+ * Fails with "No join was chosen!" when no variant is applicable.
+ */
+template <typename TNodeSet> std::shared_ptr<TJoinOptimizerNodeInternal> TDPHypSolver<TNodeSet>::PickBestJoin(
+    const std::shared_ptr<IBaseOptimizerNode>& left,
+    const std::shared_ptr<IBaseOptimizerNode>& right,
+    EJoinKind joinKind,
+    bool isCommutative,
+    const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions,
+    const std::set<std::pair<TJoinColumn, TJoinColumn>>& reversedJoinConditions,
+    const TVector<TString>& leftJoinKeys,
+    const TVector<TString>& rightJoinKeys,
+    IProviderContext& ctx
+) {
+    double bestCost = std::numeric_limits<double>::infinity();
+    EJoinAlgoType bestAlgo{};
+    bool bestJoinIsReversed = false;
+
+    // Evaluates one (input order, algorithm) variant and records it when it is cheaper.
+    const auto consider = [&](
+        const std::shared_ptr<IBaseOptimizerNode>& lhs,
+        const std::shared_ptr<IBaseOptimizerNode>& rhs,
+        const std::set<std::pair<TJoinColumn, TJoinColumn>>& conditions,
+        const TVector<TString>& lhsKeys,
+        const TVector<TString>& rhsKeys,
+        EJoinAlgoType algo,
+        bool reversed
+    ) {
+        if (!ctx.IsJoinApplicable(lhs, rhs, conditions, lhsKeys, rhsKeys, algo)) {
+            return;
+        }
+
+        auto cost = ComputeJoinStats(*lhs->Stats, *rhs->Stats, lhsKeys, rhsKeys, algo, ctx).Cost;
+        if (cost < bestCost) {
+            bestCost = cost;
+            bestAlgo = algo;
+            bestJoinIsReversed = reversed;
+        }
+    };
+
+    for (auto joinAlgo : AllJoinAlgos) {
+        consider(left, right, joinConditions, leftJoinKeys, rightJoinKeys, joinAlgo, false);
+
+        if (isCommutative) {
+            consider(right, left, reversedJoinConditions, rightJoinKeys, leftJoinKeys, joinAlgo, true);
+        }
+    }
+
+    Y_ENSURE(bestCost != std::numeric_limits<double>::infinity(), "No join was chosen!");
+
+    return bestJoinIsReversed
+        ? MakeJoinInternal(right, left, reversedJoinConditions, rightJoinKeys, leftJoinKeys, joinKind, bestAlgo, ctx)
+        : MakeJoinInternal(left, right, joinConditions, leftJoinKeys, rightJoinKeys, joinKind, bestAlgo, ctx);
+}
+
+/*
+ * Emit a single CSG + CMP pair.
+ * Builds the best join of the plans stored for S1 and S2 along the given edge and
+ * stores it in the DP table for S1|S2 when there is no plan for that set yet or the
+ * new plan is cheaper. Both S1 and S2 must already be present in the DP table.
+ */
+template<typename TNodeSet> void TDPHypSolver<TNodeSet>::EmitCsgCmp(const TNodeSet& s1, const TNodeSet& s2, const typename TJoinHypergraph<TNodeSet>::TEdge* csgCmpEdge) {
+    // Here we actually build the join and compare the
+    // new plan to what's in the dpTable, if it is there
+
+    Y_ENSURE(DpTable_.contains(s1), "DP Table does not contain S1");
+    Y_ENSURE(DpTable_.contains(s2), "DP Table does not contain S2");
+
+    const auto* reversedEdge = &Graph_.GetEdge(csgCmpEdge->ReversedEdgeId);
+    auto leftNodes = DpTable_[s1];
+    auto rightNodes = DpTable_[s2];
+
+    // A reversed edge describes the join with its sides exchanged: use its
+    // counterpart as the primary edge and swap the inputs accordingly.
+    if (csgCmpEdge->IsReversed) {
+        std::swap(csgCmpEdge, reversedEdge);
+        std::swap(leftNodes, rightNodes);
+    }
+
+    auto bestJoin = PickBestJoin(
+        leftNodes,
+        rightNodes,
+        csgCmpEdge->JoinKind,
+        csgCmpEdge->IsCommutative,
+        csgCmpEdge->JoinConditions,
+        reversedEdge->JoinConditions,
+        csgCmpEdge->LeftJoinKeys,
+        csgCmpEdge->RightJoinKeys,
+        Pctx_
+    );
+
+    // One lookup decides between "first plan for this set" and "cheaper plan".
+    const TNodeSet joined = s1 | s2;
+    auto known = DpTable_.find(joined);
+    if (known == DpTable_.end()) {
+        DpTable_[joined] = bestJoin;
+    } else if (bestJoin->Stats->Cost < known->second->Stats->Cost) {
+        known->second = bestJoin;
+    }
+
+    #ifdef DEBUG
+    // Every (S1, S2) pair must be emitted at most once.
+    auto pair = std::make_pair(s1, s2);
+    Y_ENSURE (!CheckTable_.contains(pair), "Check table already contains pair S1|S2");
+    CheckTable_[pair] = true;
+    #endif
+}
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp b/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp
index e0871a7a7c..809c88611f 100644
--- a/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp
@@ -1,117 +1,89 @@
-#include "dq_opt_join.h"
-#include "dq_opt_phy.h"
+#include "dq_opt_join_cost_based.h"
+#include "dq_opt_dphyp_solver.h"
+#include "dq_opt_make_join_hypergraph.h"
-#include <ydb/library/yql/core/yql_join.h>
-#include <ydb/library/yql/core/yql_opt_utils.h>
-#include <ydb/library/yql/dq/type_ann/dq_type_ann.h>
+#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
+#include <ydb/library/yql/dq/opt/dq_opt.h>
#include <ydb/library/yql/utils/log/log.h>
-#include <ydb/library/yql/providers/common/provider/yql_provider.h>
-#include <ydb/library/yql/core/yql_type_helpers.h>
-#include <ydb/library/yql/core/yql_statistics.h>
-#include <ydb/library/yql/core/yql_cost_function.h>
-
-#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h> //new interface
-
-#include <library/cpp/disjoint_sets/disjoint_sets.h>
-
-
-#include <bitset>
-#include <set>
-#include <unordered_map>
-#include <unordered_set>
-#include <vector>
-#include <queue>
-#include <memory>
-#include <sstream>
namespace NYql::NDq {
-
using namespace NYql::NNodes;
-/**
- * Edge structure records an edge in a Join graph.
- * - from is the integer id of the source vertex of the graph
- * - to is the integer id of the target vertex of the graph
- * - joinConditions records the set of join conditions of this edge
-*/
-struct TEdge {
- int From;
- int To;
- mutable std::set<std::pair<TJoinColumn, TJoinColumn>> JoinConditions;
- mutable TVector<TString> LeftJoinKeys;
- mutable TVector<TString> RightJoinKeys;
-
- TEdge(): From(-1), To(-1) {}
-
- TEdge(int f, int t): From(f), To(t) {}
-
- TEdge(int f, int t, std::pair<TJoinColumn, TJoinColumn> cond): From(f), To(t) {
- JoinConditions.insert(cond);
- BuildCondVectors();
+/*
+ * Collects EquiJoin inputs with statistics for cost based optimization
+ */
+bool DqCollectJoinRelationsWithStats(
+ TVector<std::shared_ptr<TRelOptimizerNode>>& rels,
+ TTypeAnnotationContext& typesCtx,
+ const TCoEquiJoin& equiJoin,
+ const TProviderCollectFunction& collector
+) {
+ if (equiJoin.ArgCount() < 3) {
+ return false;
}
- TEdge(int f, int t, std::set<std::pair<TJoinColumn, TJoinColumn>> conds): From(f), To(t),
- JoinConditions(conds) {
- BuildCondVectors();
- }
-
- void BuildCondVectors() {
- LeftJoinKeys.clear();
- RightJoinKeys.clear();
-
- for (auto [left, right] : JoinConditions) {
- auto leftKey = left.AttributeName;
- auto rightKey = right.AttributeName;
-
- for (size_t i = leftKey.size() - 1; i>0; i--) {
- if (leftKey[i]=='.') {
- leftKey = leftKey.substr(i+1);
- break;
- }
- }
+ for (size_t i = 0; i < equiJoin.ArgCount() - 2; ++i) {
+ auto input = equiJoin.Arg(i).Cast<TCoEquiJoinInput>();
+ auto joinArg = input.List();
- for (size_t i = rightKey.size() - 1; i>0; i--) {
- if (rightKey[i]=='.') {
- rightKey = rightKey.substr(i+1);
- break;
- }
- }
+ auto maybeStat = typesCtx.StatisticsMap.find(joinArg.Raw());
- LeftJoinKeys.emplace_back(leftKey);
- RightJoinKeys.emplace_back(rightKey);
+ if (maybeStat == typesCtx.StatisticsMap.end()) {
+ YQL_CLOG(TRACE, CoreDq) << "Didn't find statistics for scope " << input.Scope().Cast<TCoAtom>().StringValue() << "\n";
+ return false;
}
- }
-
- bool operator==(const TEdge& other) const
- {
- return From==other.From && To==other.To;
- }
- struct HashFunction
- {
- size_t operator()(const TEdge& e) const
- {
- return e.From + e.To;
+ auto scope = input.Scope();
+ if (!scope.Maybe<TCoAtom>()){
+ return false;
}
- };
- static const struct TEdge ErrorEdge;
-};
+ TStringBuf label = scope.Cast<TCoAtom>();
+ auto stats = maybeStat->second;
+ collector(rels, label, joinArg.Ptr(), stats);
+ }
+ return true;
+}
/**
- * Fetch join conditions from the equi-join tree
+ * Convert JoinTuple from AST into an internal representation of a optimizer plan
+ * This procedure also hooks up rels with statistics to the leaf nodes of the plan
+ * Statistics for join nodes are not computed
*/
-void ComputeJoinConditions(const TCoEquiJoinTuple& joinTuple,
- std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions) {
+std::shared_ptr<TJoinOptimizerNode> ConvertToJoinTree(
+ const TCoEquiJoinTuple& joinTuple,
+ const TVector<std::shared_ptr<TRelOptimizerNode>>& rels
+) {
+
+ std::shared_ptr<IBaseOptimizerNode> left;
+ std::shared_ptr<IBaseOptimizerNode> right;
+
+
if (joinTuple.LeftScope().Maybe<TCoEquiJoinTuple>()) {
- ComputeJoinConditions(joinTuple.LeftScope().Cast<TCoEquiJoinTuple>(), joinConditions);
+ left = ConvertToJoinTree(joinTuple.LeftScope().Cast<TCoEquiJoinTuple>(), rels);
+ }
+ else {
+ auto scope = joinTuple.LeftScope().Cast<TCoAtom>().StringValue();
+ auto it = find_if(rels.begin(), rels.end(), [scope] (const std::shared_ptr<TRelOptimizerNode>& n) {
+ return scope == n->Label;
+ } );
+ left = *it;
}
if (joinTuple.RightScope().Maybe<TCoEquiJoinTuple>()) {
- ComputeJoinConditions(joinTuple.RightScope().Cast<TCoEquiJoinTuple>(), joinConditions);
+ right = ConvertToJoinTree(joinTuple.RightScope().Cast<TCoEquiJoinTuple>(), rels);
+ }
+ else {
+ auto scope = joinTuple.RightScope().Cast<TCoAtom>().StringValue();
+ auto it = find_if(rels.begin(), rels.end(), [scope] (const std::shared_ptr<TRelOptimizerNode>& n) {
+ return scope == n->Label;
+ } );
+ right = *it;
}
+ std::set<std::pair<TJoinColumn, TJoinColumn>> joinConds;
+
size_t joinKeysCount = joinTuple.LeftKeys().Size() / 2;
for (size_t i = 0; i < joinKeysCount; ++i) {
size_t keyIndex = i * 2;
@@ -121,865 +93,13 @@ void ComputeJoinConditions(const TCoEquiJoinTuple& joinTuple,
auto rightScope = joinTuple.RightKeys().Item(keyIndex).StringValue();
auto rightColumn = joinTuple.RightKeys().Item(keyIndex + 1).StringValue();
- joinConditions.insert( std::make_pair( TJoinColumn(leftScope, leftColumn),
+ joinConds.insert( std::make_pair( TJoinColumn(leftScope, leftColumn),
TJoinColumn(rightScope, rightColumn)));
}
-}
-
-/**
- * Internal Join nodes are used inside the CBO. They don't own join condition data structures
- * and therefore avoid copying them during generation of candidate plans.
- *
- * These datastructures are owned by the query graph, so it is important to keep the graph around
- * while internal nodes are being used.
- *
- * After join enumeration, internal nodes need to be converted to regular nodes, that own the data
- * structures
-*/
-struct TJoinOptimizerNodeInternal : public IBaseOptimizerNode {
- std::shared_ptr<IBaseOptimizerNode> LeftArg;
- std::shared_ptr<IBaseOptimizerNode> RightArg;
- const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& JoinConditions;
- const TVector<TString>& LeftJoinKeys;
- const TVector<TString>& RightJoinKeys;
- EJoinKind JoinType;
- EJoinAlgoType JoinAlgo;
- bool IsReorderable;
-
- TJoinOptimizerNodeInternal(const std::shared_ptr<IBaseOptimizerNode>& left,
- const std::shared_ptr<IBaseOptimizerNode>& right,
- const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& joinConditions,
- const TVector<TString>& leftJoinKeys,
- const TVector<TString>& rightJoinKeys,
- const EJoinKind joinType,
- const EJoinAlgoType joinAlgo,
- bool nonReorderable=false);
-
- virtual ~TJoinOptimizerNodeInternal() {}
- virtual TVector<TString> Labels();
- virtual void Print(std::stringstream& stream, int ntabs=0);
-};
-
-TJoinOptimizerNodeInternal::TJoinOptimizerNodeInternal(const std::shared_ptr<IBaseOptimizerNode>& left, const std::shared_ptr<IBaseOptimizerNode>& right,
- const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions, const TVector<TString>& leftJoinKeys,
- const TVector<TString>& rightJoinKeys, const EJoinKind joinType, const EJoinAlgoType joinAlgo, bool nonReorderable) :
- IBaseOptimizerNode(JoinNodeType),
- LeftArg(left),
- RightArg(right),
- JoinConditions(joinConditions),
- LeftJoinKeys(leftJoinKeys),
- RightJoinKeys(rightJoinKeys),
- JoinType(joinType),
- JoinAlgo(joinAlgo) {
- IsReorderable = (JoinType==EJoinKind::InnerJoin) && (nonReorderable==false);
- }
-
-TVector<TString> TJoinOptimizerNodeInternal::Labels() {
- auto res = LeftArg->Labels();
- auto rightLabels = RightArg->Labels();
- res.insert(res.begin(),rightLabels.begin(),rightLabels.end());
- return res;
-}
-
-/**
- * Convert a tree of internal optimizer nodes to external nodes that own the data structures.
- *
- * The internal node tree can have references to external nodes (since some subtrees are optimized
- * separately if the plan contains non-orderable joins). So we check the instances and if we encounter
- * an external node, we return the whole subtree unchanged.
-*/
-std::shared_ptr<TJoinOptimizerNode> ConvertFromInternal(const std::shared_ptr<IBaseOptimizerNode> internal) {
- Y_ENSURE(internal->Kind == EOptimizerNodeKind::JoinNodeType);
-
- if (dynamic_cast<TJoinOptimizerNode*>(internal.get()) != nullptr) {
- return std::static_pointer_cast<TJoinOptimizerNode>(internal);
- }
-
- auto join = std::static_pointer_cast<TJoinOptimizerNodeInternal>(internal);
-
- auto left = join->LeftArg;
- auto right = join->RightArg;
-
- if (left->Kind == EOptimizerNodeKind::JoinNodeType) {
- left = ConvertFromInternal(left);
- }
- if (right->Kind == EOptimizerNodeKind::JoinNodeType) {
- right = ConvertFromInternal(right);
- }
-
- auto newJoin = std::make_shared<TJoinOptimizerNode>(left, right, join->JoinConditions, join->JoinType, join->JoinAlgo, !join->IsReorderable);
- newJoin->Stats = join->Stats;
- return newJoin;
-}
-
-void TJoinOptimizerNodeInternal::Print(std::stringstream& stream, int ntabs) {
- for (int i = 0; i < ntabs; i++){
- stream << "\t";
- }
-
- stream << "Join: (" << JoinType << "," << ToString(JoinAlgo) << ") ";
- for (auto c : JoinConditions){
- stream << c.first.RelName << "." << c.first.AttributeName
- << "=" << c.second.RelName << "."
- << c.second.AttributeName << ", ";
- }
- stream << "\n";
-
- for (int i = 0; i < ntabs; i++){
- stream << "\t";
- }
-
- if (Stats) {
- stream << *Stats << "\n";
- }
-
- LeftArg->Print(stream, ntabs+1);
- RightArg->Print(stream, ntabs+1);
-}
-
-/**
- * Create a new external join node and compute its statistics and cost
-*/
-std::shared_ptr<TJoinOptimizerNode> MakeJoin(std::shared_ptr<IBaseOptimizerNode> left,
- std::shared_ptr<IBaseOptimizerNode> right,
- const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions,
- const TVector<TString>& leftJoinKeys,
- const TVector<TString>& rightJoinKeys,
- EJoinKind joinKind,
- EJoinAlgoType joinAlgo,
- bool nonReorderable,
- IProviderContext& ctx) {
-
- auto res = std::make_shared<TJoinOptimizerNode>(left, right, joinConditions, joinKind, joinAlgo, nonReorderable);
- res->Stats = std::make_shared<TOptimizerStatistics>( ComputeJoinStats(*left->Stats, *right->Stats, leftJoinKeys, rightJoinKeys, joinAlgo, ctx));
- return res;
-}
-
-/**
- * Create a new internal join node and compute its statistics and cost
-*/
-std::shared_ptr<TJoinOptimizerNodeInternal> MakeJoinInternal(std::shared_ptr<IBaseOptimizerNode> left,
- std::shared_ptr<IBaseOptimizerNode> right,
- const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions,
- const TVector<TString>& leftJoinKeys,
- const TVector<TString>& rightJoinKeys,
- EJoinKind joinKind,
- EJoinAlgoType joinAlgo,
- IProviderContext& ctx) {
-
- auto res = std::make_shared<TJoinOptimizerNodeInternal>(left, right, joinConditions, leftJoinKeys, rightJoinKeys, joinKind, joinAlgo);
- res->Stats = std::make_shared<TOptimizerStatistics>( ComputeJoinStats(*left->Stats, *right->Stats, leftJoinKeys, rightJoinKeys, joinAlgo, ctx));
- return res;
-}
-
-/**
- * Iterate over all join algorithms and pick the best join that is applicable.
- * Also considers commuting joins
-*/
-std::shared_ptr<TJoinOptimizerNodeInternal> PickBestJoin(std::shared_ptr<IBaseOptimizerNode> left,
- std::shared_ptr<IBaseOptimizerNode> right,
- const std::set<std::pair<TJoinColumn, TJoinColumn>>& leftJoinConditions,
- const std::set<std::pair<TJoinColumn, TJoinColumn>>& rightJoinConditions,
- const TVector<TString>& leftJoinKeys,
- const TVector<TString>& rightJoinKeys,
- IProviderContext& ctx) {
-
- double bestCost;
- EJoinAlgoType bestAlgo;
- bool bestJoinLeftRight = true;
- bool bestJoinValid = false;
-
- for ( auto joinAlgo : AllJoinAlgos ) {
- if (ctx.IsJoinApplicable(left, right, leftJoinConditions, leftJoinKeys, rightJoinKeys, joinAlgo)){
- auto cost = ComputeJoinStats(*left->Stats, *right->Stats, leftJoinKeys, rightJoinKeys, joinAlgo, ctx).Cost;
- if (bestJoinValid){
- if (cost < bestCost) {
- bestCost = cost;
- bestAlgo = joinAlgo;
- bestJoinLeftRight = true;
- }
- } else {
- bestCost = cost;
- bestAlgo = joinAlgo;
- bestJoinLeftRight = true;
- bestJoinValid = true;
- }
- }
-
- if (ctx.IsJoinApplicable(right, left, rightJoinConditions, rightJoinKeys, leftJoinKeys, joinAlgo)){
- auto cost = ComputeJoinStats(*right->Stats, *left->Stats, rightJoinKeys, leftJoinKeys, joinAlgo, ctx).Cost;
- if (bestJoinValid){
- if (cost < bestCost) {
- bestCost = cost;
- bestAlgo = joinAlgo;
- bestJoinLeftRight = false;
- }
- } else {
- bestCost = cost;
- bestAlgo = joinAlgo;
- bestJoinLeftRight = false;
- bestJoinValid = true;
- }
- }
- }
-
- Y_ENSURE(bestJoinValid,"No join was chosen!");
-
- if (bestJoinLeftRight) {
- return MakeJoinInternal(left, right, leftJoinConditions, leftJoinKeys, rightJoinKeys, EJoinKind::InnerJoin, bestAlgo, ctx);
- } else {
- return MakeJoinInternal(right, left, rightJoinConditions, rightJoinKeys, leftJoinKeys, EJoinKind::InnerJoin, bestAlgo, ctx);
- }
-}
-
-/**
- * Iterate over all join algorithms and pick the best join that is applicable
-*/
-std::shared_ptr<TJoinOptimizerNode> PickBestNonReorderabeJoin(const std::shared_ptr<TJoinOptimizerNode>& node,
- IProviderContext& ctx) {
-
- EJoinAlgoType bestJoinAlgo;
- bool bestJoinValid = false;
- double bestJoinCost;
- const auto& left = node->LeftArg;
- const auto& right = node->RightArg;
- const auto& joinConditions = node->JoinConditions;
- const auto& leftJoinKeys = node->LeftJoinKeys;
- const auto& rightJoinKeys = node->RightJoinKeys;
-
- for ( auto joinAlgo : AllJoinAlgos ) {
- if (ctx.IsJoinApplicable(left, right, joinConditions, leftJoinKeys, rightJoinKeys, joinAlgo)){
- auto cost = ComputeJoinStats(*right->Stats, *left->Stats, rightJoinKeys, leftJoinKeys, joinAlgo, ctx).Cost;
- if (bestJoinValid) {
- if (cost < bestJoinCost) {
- bestJoinAlgo = joinAlgo;
- bestJoinCost = cost;
- }
- } else {
- bestJoinAlgo = joinAlgo;
- bestJoinCost = cost;
- bestJoinValid = true;
- }
- }
- }
-
- Y_ENSURE(bestJoinValid,"No join was chosen!");
- node->Stats = std::make_shared<TOptimizerStatistics>(ComputeJoinStats(*left->Stats, *right->Stats, leftJoinKeys, rightJoinKeys, bestJoinAlgo, ctx));
- node->JoinAlgo = bestJoinAlgo;
- return node;
-}
-
-struct pair_hash {
- template <class T1, class T2>
- std::size_t operator () (const std::pair<T1,T2> &p) const {
- auto h1 = std::hash<T1>{}(p.first);
- auto h2 = std::hash<T2>{}(p.second);
-
- // Mainly for demonstration purposes, i.e. works but is overly simple
- // In the real world, use sth. like boost.hash_combine
- return h1 ^ h2;
- }
-};
-
-/**
- * Graph is a data structure for the join graph
- * It is an undirected graph, with two edges per connection (from,to) and (to,from)
- * It needs to be constructed with addNode and addEdge methods, since its
- * keeping various indexes updated.
- * The graph also needs to be reordered with the breadth-first search method
-*/
-template <int N>
-struct TGraph {
- // set of edges of the graph
- std::unordered_set<TEdge,TEdge::HashFunction> Edges;
-
- // neightborgh index
- TVector<std::bitset<N>> EdgeIdx;
-
- // number of nodes in a graph
- int NNodes;
-
- // mapping from rel label to node in the graph
- THashMap<TString,int> ScopeMapping;
-
- // mapping from node in the graph to rel label
- TVector<TString> RevScopeMapping;
-
- // Empty graph constructor intializes indexes to size N
- TGraph() : EdgeIdx(N), RevScopeMapping(N) {}
-
- // Add a node to a graph with a rel label
- void AddNode(int nodeId, TString scope){
- NNodes = nodeId + 1;
- ScopeMapping[scope] = nodeId;
- RevScopeMapping[nodeId] = scope;
- }
-
- // Add a node to a graph with a vector of rel label
- void AddNode(int nodeId, const TVector<TString>& scopes){
- NNodes = nodeId + 1;
- TString revScope;
- for (auto s: scopes ) {
- ScopeMapping[s] = nodeId;
- revScope += s + ",";
- }
- RevScopeMapping[nodeId] = revScope;
- }
-
-
- // Add an edge to the graph, if the edge is already in the graph
- // (we check both directions), no action is taken. Otherwise we
- // insert two edges, the forward edge with original joinConditions
- // and a reverse edge with swapped joinConditions
- void AddEdge(TEdge e){
- if (Edges.contains(e) || Edges.contains(TEdge(e.To, e.From))) {
- return;
- }
-
- Edges.insert(e);
- std::set<std::pair<TJoinColumn, TJoinColumn>> swappedSet;
- for (auto c : e.JoinConditions){
- swappedSet.insert(std::make_pair(c.second, c.first));
- }
- Edges.insert(TEdge(e.To,e.From,swappedSet));
-
- EdgeIdx[e.From].set(e.To);
- EdgeIdx[e.To].set(e.From);
- }
-
- // Find a node by the rel scope
- int FindNode(TString scope){
- return ScopeMapping[scope];
- }
-
- // Return a bitset of node's neighbors
- inline std::bitset<N> FindNeighbors(int fromVertex)
- {
- return EdgeIdx[fromVertex];
- }
-
- // Find an edge that connects two subsets of graph's nodes
- // We are guaranteed to find a match
- const TEdge& FindCrossingEdge(const std::bitset<N>& S1, const std::bitset<N>& S2) {
- for(int i = 0; i < NNodes; i++){
- if (!S1[i]) {
- continue;
- }
- for (int j = 0; j < NNodes; j++) {
- if (!S2[j]) {
- continue;
- }
- if (EdgeIdx[i].test(j)) {
- auto it = Edges.find(TEdge(i, j));
- Y_DEBUG_ABORT_UNLESS(it != Edges.end());
- return *it;
- }
- }
- }
- Y_ENSURE(false,"Connecting edge not found!");
- return TEdge::ErrorEdge;
- }
-
- /**
- * Create a union-set from the join conditions to record the equivalences.
- * Then use the equivalence set to compute transitive closure of the graph.
- * Transitive closure means that if we have an edge from (1,2) with join
- * condition R.A = S.A and we have an edge from (2,3) with join condition
- * S.A = T.A, we will find out that the join conditions form an equivalence set
- * and add an edge (1,3) with join condition R.A = T.A.
- */
- void ComputeTransitiveClosure(const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions) {
- std::set<TJoinColumn> columnSet;
- for (auto [ leftCondition, rightCondition ] : joinConditions) {
- columnSet.insert(leftCondition);
- columnSet.insert(rightCondition);
- }
- std::vector<TJoinColumn> columns;
- for (auto c : columnSet ) {
- columns.push_back(c);
- }
-
- THashMap<TJoinColumn, int, TJoinColumn::HashFunction> indexMapping;
- for (size_t i=0; i<columns.size(); i++) {
- indexMapping[columns[i]] = i;
- }
-
- TDisjointSets ds = TDisjointSets( columns.size() );
- for (auto [ leftCondition, rightCondition ] : joinConditions ) {
- int leftIndex = indexMapping[leftCondition];
- int rightIndex = indexMapping[rightCondition];
- ds.UnionSets(leftIndex,rightIndex);
- }
-
- for (size_t i = 0; i < columns.size(); i++) {
- for (size_t j = 0; j < i; j++) {
- if (ds.CanonicSetElement(i) == ds.CanonicSetElement(j)) {
- TJoinColumn left = columns[i];
- TJoinColumn right = columns[j];
- int leftNodeId = ScopeMapping[left.RelName];
- int rightNodeId = ScopeMapping[right.RelName];
-
- auto maybeEdge1 = Edges.find({leftNodeId, rightNodeId});
- auto maybeEdge2 = Edges.find({rightNodeId, leftNodeId});
-
- if (maybeEdge1 == Edges.end() && maybeEdge2 == Edges.end()) {
- AddEdge(TEdge(leftNodeId,rightNodeId,std::make_pair(left, right)));
- } else {
- Y_ABORT_UNLESS(maybeEdge1 != Edges.end() && maybeEdge2 != Edges.end());
- auto edge1 = *maybeEdge1;
- auto edge2 = *maybeEdge2;
-
- edge1.JoinConditions.emplace(left, right);
- edge2.JoinConditions.emplace(right, left);
- edge1.BuildCondVectors();
- edge2.BuildCondVectors();
-
- Edges.erase(maybeEdge1);
- Edges.erase(maybeEdge2);
-
- Edges.emplace(edge1);
- Edges.emplace(edge2);
- }
- }
- }
- }
- }
-
- /**
- * Print the graph
- */
- void PrintGraph(std::stringstream& stream) {
- stream << "Join Graph:\n";
- stream << "nNodes: " << NNodes << ", nEdges: " << Edges.size() << "\n";
-
- for(int i = 0; i < NNodes; i++) {
- stream << "Node:" << i << "," << RevScopeMapping[i] << "\n";
- }
- for (const TEdge& e: Edges ) {
- stream << "Edge: " << e.From << " -> " << e.To << "\n";
- for (auto p : e.JoinConditions) {
- stream << p.first.RelName << "."
- << p.first.AttributeName << "="
- << p.second.RelName << "."
- << p.second.AttributeName << "\n";
- }
- for (auto l : e.LeftJoinKeys) {
- stream << l << ",";
- }
- stream << "=";
- for (auto r : e.RightJoinKeys) {
- stream << r << ",";
- }
- stream << "\n";
- }
- }
-};
-
-/**
- * DPcpp (Dynamic Programming with connected complement pairs) is a graph-aware
- * join eumeration algorithm that only considers CSGs (Connected Sub-Graphs) of
- * the join graph and computes CMPs (Complement pairs) that are also connected
- * subgraphs of the join graph. It enumerates CSGs in the order, such that subsets
- * are enumerated first and no duplicates are ever enumerated. Then, for each emitted
- * CSG it computes the complements with the same conditions - they much already be
- * present in the dynamic programming table and no pair should be enumerated twice.
- *
- * The DPccp solver is templated by the largest number of joins we can process, this
- * is in turn used by bitsets that represent sets of relations.
-*/
-template <int N>
-class TDPccpSolver {
-public:
-
- // Construct the DPccp solver based on the join graph and data about input relations
- TDPccpSolver(TGraph<N>& g, TVector<std::shared_ptr<IBaseOptimizerNode>> rels, IProviderContext& ctx):
- Graph(g), Rels(rels), Pctx(ctx) {
- NNodes = g.NNodes;
- }
-
- // Run DPccp algorithm and produce the join tree in CBO's internal representation
- std::shared_ptr<TJoinOptimizerNodeInternal> Solve();
-
- // Calculate the size of a dynamic programming table with a budget
- ui32 CountCC(ui32 budget);
-
-private:
-
- // Compute the next subset of relations, given by the final bitset
- std::bitset<N> NextBitset(const std::bitset<N>& current, const std::bitset<N>& final);
-
- // Print the set of relations in a bitset
- void PrintBitset(std::stringstream& stream, const std::bitset<N>& s, std::string name, int ntabs=0);
-
- // Dynamic programming table that records optimal join subtrees
- THashMap<std::bitset<N>, std::shared_ptr<IBaseOptimizerNode>, std::hash<std::bitset<N>>> DpTable;
-
- // REMOVE: Sanity check table that tracks that we don't consider the same pair twice
- THashMap<std::pair<std::bitset<N>, std::bitset<N>>, bool, pair_hash> CheckTable;
-
- // number of nodes in a graph
- int NNodes;
-
- // Join graph
- TGraph<N>& Graph;
-
- // List of input relations to DPccp
- TVector<std::shared_ptr<IBaseOptimizerNode>> Rels;
-
- // Provider specific contexts?
- // FIXME: This is a temporary structure that needs to be extended to multiple providers
- IProviderContext& Pctx;
-
- // Emit connected subgraph
- void EmitCsg(const std::bitset<N>&, int=0);
-
- // Enumerate subgraphs recursively
- void EnumerateCsgRec(const std::bitset<N>&, const std::bitset<N>&,int=0);
-
- // Emit the final pair of CSG and CMP - compute the join and record it in the
- // DP table
- void EmitCsgCmp(const std::bitset<N>&, const std::bitset<N>&,int=0);
-
- // Enumerate complement pairs recursively
- void EnumerateCmpRec(const std::bitset<N>&, const std::bitset<N>&, const std::bitset<N>&,int=0);
-
- // Compute the neighbors of a set of nodes, excluding the nodes in exclusion set
- std::bitset<N> Neighbors(const std::bitset<N>&, const std::bitset<N>&);
-
- // Create an exclusion set that contains all the nodes of the graph that are smaller or equal to
- // the smallest node in the provided bitset
- std::bitset<N> MakeBiMin(const std::bitset<N>&);
-
- // Create an exclusion set that contains all the nodes of the bitset that are smaller or equal to
- // the provided integer
- std::bitset<N> MakeB(const std::bitset<N>&,int);
-
- // Count the size of the dynamic programming table recursively
- ui32 CountCCRec(const std::bitset<N>&, const std::bitset<N>&, ui32, ui32);
-};
-
-// Print tabs
-void PrintTabs(std::stringstream& stream, int ntabs) {
-
- for (int i = 0; i < ntabs; i++)
- stream << "\t";
-}
-
-// Print a set of nodes in the graph given by this bitset
-template <int N> void TDPccpSolver<N>::PrintBitset(std::stringstream& stream,
- const std::bitset<N>& s, std::string name, int ntabs) {
-
- PrintTabs(stream, ntabs);
-
- stream << name << ": " << "{";
- for (int i = 0; i < NNodes; i++)
- if (s[i])
- stream << i << ",";
-
- stream <<"}\n";
-}
-
-// Compute neighbors of a set of nodes S, exclusing the exclusion set X
-template<int N> std::bitset<N> TDPccpSolver<N>::Neighbors(const std::bitset<N>& S, const std::bitset<N>& X) {
-
- std::bitset<N> res;
-
- for (int i = 0; i < Graph.NNodes; i++) {
- if (S[i]) {
- std::bitset<N> n = Graph.FindNeighbors(i);
- res = res | n;
- }
- }
-
- res = res & ~ X;
- return res;
-}
-
-// Run the entire DPccp algorithm and compute the optimal join tree
-template<int N> std::shared_ptr<TJoinOptimizerNodeInternal> TDPccpSolver<N>::Solve()
-{
- // Process singleton sets
- for (int i = NNodes-1; i >= 0; i--) {
- std::bitset<N> s;
- s.set(i);
- DpTable[s] = Rels[i];
- }
-
- // Expand singleton sets
- for (int i = NNodes-1; i >= 0; i--) {
- std::bitset<N> s;
- s.set(i);
- EmitCsg(s);
- EnumerateCsgRec(s, MakeBiMin(s));
- }
-
- // Return the entry of the dpTable that corresponds to the full
- // set of nodes in the graph
- std::bitset<N> V;
- for (int i = 0; i < NNodes; i++) {
- V.set(i);
- }
-
- Y_ENSURE(DpTable.contains(V), "Final relset not in dptable");
- return std::static_pointer_cast<TJoinOptimizerNodeInternal>(DpTable[V]);
-}
-
-/**
- * EmitCsg emits Connected SubGraphs
- * First it iterates through neighbors of the initial set S and emits pairs
- * (S,S2), where S2 is the neighbor of S. Then it recursively emits complement pairs
-*/
- template <int N> void TDPccpSolver<N>::EmitCsg(const std::bitset<N>& S, int ntabs) {
- std::bitset<N> X = S | MakeBiMin(S);
- std::bitset<N> Ns = Neighbors(S, X);
-
- if (Ns==std::bitset<N>()) {
- return;
- }
-
- for (int i = NNodes - 1; i >= 0; i--) {
- if (Ns[i]) {
- std::bitset<N> S2;
- S2.set(i);
- EmitCsgCmp(S, S2, ntabs+1);
- EnumerateCmpRec(S, S2, X | MakeB(Ns, i), ntabs+1);
- }
- }
- }
-
- /**
- * Enumerates connected subgraphs
- * First it emits CSGs that are created by adding neighbors of S to S
- * Then it recurses on the S fused with its neighbors.
- */
- template <int N> void TDPccpSolver<N>::EnumerateCsgRec(const std::bitset<N>& S, const std::bitset<N>& X, int ntabs) {
-
- std::bitset<N> Ns = Neighbors(S, X);
-
- if (Ns == std::bitset<N>()) {
- return;
- }
-
- std::bitset<N> prev;
- std::bitset<N> next;
-
- while(true) {
- next = NextBitset(prev, Ns);
- EmitCsg(S | next );
- if (next == Ns) {
- break;
- }
- prev = next;
- }
-
- prev.reset();
- while(true) {
- next = NextBitset(prev, Ns);
- EnumerateCsgRec(S | next, X | Ns , ntabs+1);
- if (next==Ns) {
- break;
- }
- prev = next;
- }
- }
-
-/***
- * Enumerates complement pairs
- * First it emits the pairs (S1,S2+next) where S2+next is the set of relation sets
- * that are obtained by adding S2's neighbors to itself
- * Then it recusrses into pairs (S1,S2+next)
-*/
- template <int N> void TDPccpSolver<N>::EnumerateCmpRec(const std::bitset<N>& S1,
- const std::bitset<N>& S2, const std::bitset<N>& X, int ntabs) {
-
- std::bitset<N> Ns = Neighbors(S2, X);
-
- if (Ns==std::bitset<N>()) {
- return;
- }
-
- std::bitset<N> prev;
- std::bitset<N> next;
-
- while(true) {
- next = NextBitset(prev, Ns);
- EmitCsgCmp(S1, S2 | next, ntabs+1);
- if (next==Ns) {
- break;
- }
- prev = next;
- }
-
- prev.reset();
- while(true) {
- next = NextBitset(prev, Ns);
- EnumerateCmpRec(S1, S2 | next, X | Ns, ntabs+1);
- if (next==Ns) {
- break;
- }
- prev = next;
- }
- }
-
-/**
- * Emit a single CSG + CMP pair
-*/
-template <int N> void TDPccpSolver<N>::EmitCsgCmp(const std::bitset<N>& S1, const std::bitset<N>& S2, int ntabs) {
-
- Y_UNUSED(ntabs);
- // Here we actually build the join and choose and compare the
- // new plan to what's in the dpTable, if it there
-
- Y_ENSURE(DpTable.contains(S1),"DP Table does not contain S1");
- Y_ENSURE(DpTable.contains(S2),"DP Table does not conaint S2");
-
- std::bitset<N> joined = S1 | S2;
-
- const TEdge& e1 = Graph.FindCrossingEdge(S1, S2);
- const TEdge& e2 = Graph.FindCrossingEdge(S2, S1);
- auto bestJoin = PickBestJoin(DpTable[S1], DpTable[S2], e1.JoinConditions, e2.JoinConditions, e1.LeftJoinKeys, e1.RightJoinKeys, Pctx);
-
- if (! DpTable.contains(joined)) {
- DpTable[joined] = bestJoin;
- } else {
- if (bestJoin->Stats->Cost < DpTable[joined]->Stats->Cost) {
- DpTable[joined] = bestJoin;
- }
- }
- /*
- * This is a sanity check that slows down the optimizer
- *
-
- auto pair = std::make_pair(S1, S2);
- Y_ENSURE (!CheckTable.contains(pair), "Check table already contains pair S1|S2");
-
- CheckTable[ std::pair<std::bitset<N>,std::bitset<N>>(S1, S2) ] = true;
- */
-}
-
-/**
- * Create an exclusion set that contains all the nodes of the graph that are smaller or equal to
- * the smallest node in the provided bitset
-*/
-template <int N> std::bitset<N> TDPccpSolver<N>::MakeBiMin(const std::bitset<N>& S) {
- std::bitset<N> res;
-
- for (int i = 0; i < NNodes; i++) {
- if (S[i]) {
- for (int j = 0; j <= i; j++) {
- res.set(j);
- }
- break;
- }
- }
- return res;
-}
-
-/**
- * Create an exclusion set that contains all the nodes of the bitset that are smaller or equal to
- * the provided integer
-*/
-template <int N> std::bitset<N> TDPccpSolver<N>::MakeB(const std::bitset<N>& S, int x) {
- std::bitset<N> res;
-
- for (int i = 0; i < NNodes; i++) {
- if (S[i] && i <= x) {
- res.set(i);
- }
- }
-
- return res;
-}
-
-/**
- * Compute the next subset of relations, given by the final bitset
-*/
-template <int N> std::bitset<N> TDPccpSolver<N>::NextBitset(const std::bitset<N>& prev, const std::bitset<N>& final) {
- if (prev==final)
- return final;
-
- std::bitset<N> res = prev;
-
- bool carry = true;
- for (int i = 0; i < NNodes; i++)
- {
- if (!carry) {
- break;
- }
-
- if (!final[i]) {
- continue;
- }
-
- if (res[i]==1 && carry) {
- res.reset(i);
- } else if (res[i]==0 && carry)
- {
- res.set(i);
- carry = false;
- }
- }
-
- return res;
-
- // TODO: We can optimize this with a few long integer operations,
- // but it will only work for 64 bit bitsets
- // return std::bitset<N>((prev | ~final).to_ulong() + 1) & final;
-}
-
-/**
- * Count the number of items in the DP table of DPcpp
-*/
-template <int N> ui32 TDPccpSolver<N>::CountCC(ui32 budget) {
- std::bitset<N> allNodes;
- allNodes.set();
- ui32 cost = 0;
-
- for (int i = NNodes - 1; i >= 0; i--) {
- cost += 1;
- if (cost > budget) {
- return cost;
- }
- std::bitset<N> S;
- S.set(i);
- std::bitset<N> X = MakeB(allNodes,i);
- cost = CountCCRec(S,X,cost,budget);
- }
-
- return cost;
-}
-
-/**
- * Recursively count the nuber of items in the DP table of DPccp
-*/
-template <int N> ui32 TDPccpSolver<N>::CountCCRec(const std::bitset<N>& S, const std::bitset<N>& X, ui32 cost, ui32 budget) {
- std::bitset<N> Ns = Neighbors(S, X);
-
- if (Ns==std::bitset<N>()) {
- return cost;
- }
-
- std::bitset<N> prev;
- std::bitset<N> next;
-
- while(true) {
- next = NextBitset(prev, Ns);
- cost += 1;
- if (cost > budget) {
- return cost;
- }
- cost = CountCCRec(S | next, X | Ns, cost, budget);
- if (next==Ns) {
- break;
- }
- prev = next;
- }
-
- return cost;
+ return std::make_shared<TJoinOptimizerNode>(left, right, joinConds, ConvertToJoinKind(joinTuple.Type().StringValue()), EJoinAlgoType::Undefined);
}
-
/**
* Build a join tree that will replace the original join tree in equiJoin
* TODO: Add join implementations here
@@ -1066,149 +186,9 @@ TExprBase RearrangeEquiJoinTree(TExprContext& ctx, const TCoEquiJoin& equiJoin,
.Done();
}
-/**
- * Collects EquiJoin inputs with statistics for cost based optimization
-*/
-bool DqCollectJoinRelationsWithStats(
- TVector<std::shared_ptr<TRelOptimizerNode>>& rels,
- TTypeAnnotationContext& typesCtx,
- const TCoEquiJoin& equiJoin,
- const std::function<void(TVector<std::shared_ptr<TRelOptimizerNode>>&, TStringBuf, const TExprNode::TPtr, const std::shared_ptr<TOptimizerStatistics>&)>& collector)
-{
- if (equiJoin.ArgCount() < 3) {
- return false;
- }
-
- for (size_t i = 0; i < equiJoin.ArgCount() - 2; ++i) {
- auto input = equiJoin.Arg(i).Cast<TCoEquiJoinInput>();
- auto joinArg = input.List();
-
- auto maybeStat = typesCtx.StatisticsMap.find(joinArg.Raw());
-
- if (maybeStat == typesCtx.StatisticsMap.end()) {
- YQL_CLOG(TRACE, CoreDq) << "Didn't find statistics for scope " << input.Scope().Cast<TCoAtom>().StringValue() << "\n";
- return false;
- }
-
- auto scope = input.Scope();
- if (!scope.Maybe<TCoAtom>()){
- return false;
- }
-
- TStringBuf label = scope.Cast<TCoAtom>();
- auto stats = maybeStat->second;
- collector(rels, label, joinArg.Ptr(), stats);
- }
- return true;
-}
-
-/**
- * Convert JoinTuple from AST into an internal representation of a optimizer plan
- * This procedure also hooks up rels with statistics to the leaf nodes of the plan
- * Statistics for join nodes are not computed
-*/
-std::shared_ptr<TJoinOptimizerNode> ConvertToJoinTree(const TCoEquiJoinTuple& joinTuple,
- const TVector<std::shared_ptr<TRelOptimizerNode>>& rels) {
-
- std::shared_ptr<IBaseOptimizerNode> left;
- std::shared_ptr<IBaseOptimizerNode> right;
-
-
- if (joinTuple.LeftScope().Maybe<TCoEquiJoinTuple>()) {
- left = ConvertToJoinTree(joinTuple.LeftScope().Cast<TCoEquiJoinTuple>(), rels);
- }
- else {
- auto scope = joinTuple.LeftScope().Cast<TCoAtom>().StringValue();
- auto it = find_if(rels.begin(), rels.end(), [scope] (const std::shared_ptr<TRelOptimizerNode>& n) {
- return scope == n->Label;
- } );
- left = *it;
- }
-
- if (joinTuple.RightScope().Maybe<TCoEquiJoinTuple>()) {
- right = ConvertToJoinTree(joinTuple.RightScope().Cast<TCoEquiJoinTuple>(), rels);
- }
- else {
- auto scope = joinTuple.RightScope().Cast<TCoAtom>().StringValue();
- auto it = find_if(rels.begin(), rels.end(), [scope] (const std::shared_ptr<TRelOptimizerNode>& n) {
- return scope == n->Label;
- } );
- right = *it;
- }
-
- std::set<std::pair<TJoinColumn, TJoinColumn>> joinConds;
-
- size_t joinKeysCount = joinTuple.LeftKeys().Size() / 2;
- for (size_t i = 0; i < joinKeysCount; ++i) {
- size_t keyIndex = i * 2;
-
- auto leftScope = joinTuple.LeftKeys().Item(keyIndex).StringValue();
- auto leftColumn = joinTuple.LeftKeys().Item(keyIndex + 1).StringValue();
- auto rightScope = joinTuple.RightKeys().Item(keyIndex).StringValue();
- auto rightColumn = joinTuple.RightKeys().Item(keyIndex + 1).StringValue();
-
- joinConds.insert( std::make_pair( TJoinColumn(leftScope, leftColumn),
- TJoinColumn(rightScope, rightColumn)));
- }
-
- return std::make_shared<TJoinOptimizerNode>(left, right, joinConds, ConvertToJoinKind(joinTuple.Type().StringValue()), EJoinAlgoType::Undefined);
-}
-
-/**
- * Extract all non orderable joins from a plan is a post-order traversal order
-*/
-void ExtractNonOrderables(std::shared_ptr<TJoinOptimizerNode> joinTree,
- TVector<std::shared_ptr<TJoinOptimizerNode>>& result) {
-
- if (joinTree->LeftArg->Kind == EOptimizerNodeKind::JoinNodeType) {
- auto left = static_pointer_cast<TJoinOptimizerNode>(joinTree->LeftArg);
- ExtractNonOrderables(left, result);
- }
- if (joinTree->RightArg->Kind == EOptimizerNodeKind::JoinNodeType) {
- auto right = static_pointer_cast<TJoinOptimizerNode>(joinTree->RightArg);
- ExtractNonOrderables(right, result);
- }
- if (!joinTree->IsReorderable)
- {
- result.emplace_back(joinTree);
- }
-}
-
-/**
- * Extract relations and join conditions from an optimizer plan
- * If we hit a non-orderable join type in recursion, we don't recurse into it and
- * add it as a relation
-*/
-void ExtractRelsAndJoinConditions(const std::shared_ptr<TJoinOptimizerNode>& joinTree,
- TVector<std::shared_ptr<IBaseOptimizerNode>> & rels,
- std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions) {
- if (!joinTree->IsReorderable){
- rels.emplace_back( joinTree );
- return;
- }
-
- for (auto c : joinTree->JoinConditions) {
- joinConditions.insert(c);
- }
-
- if (joinTree->LeftArg->Kind == EOptimizerNodeKind::JoinNodeType) {
- ExtractRelsAndJoinConditions(static_pointer_cast<TJoinOptimizerNode>(joinTree->LeftArg), rels, joinConditions);
- }
- else {
- rels.emplace_back(joinTree->LeftArg);
- }
-
- if (joinTree->RightArg->Kind == EOptimizerNodeKind::JoinNodeType) {
- ExtractRelsAndJoinConditions(static_pointer_cast<TJoinOptimizerNode>(joinTree->RightArg), rels, joinConditions);
- }
- else {
- rels.emplace_back(joinTree->RightArg);
- }
- }
-
-/**
+/*
* Recursively computes statistics for a join tree
-*/
+ */
void ComputeStatistics(const std::shared_ptr<TJoinOptimizerNode>& join, IProviderContext& ctx) {
if (join->LeftArg->Kind == EOptimizerNodeKind::JoinNodeType) {
ComputeStatistics(static_pointer_cast<TJoinOptimizerNode>(join->LeftArg), ctx);
@@ -1216,128 +196,74 @@ void ComputeStatistics(const std::shared_ptr<TJoinOptimizerNode>& join, IProvide
if (join->RightArg->Kind == EOptimizerNodeKind::JoinNodeType) {
ComputeStatistics(static_pointer_cast<TJoinOptimizerNode>(join->RightArg), ctx);
}
- join->Stats = std::make_shared<TOptimizerStatistics>(ComputeJoinStats(*join->LeftArg->Stats, *join->RightArg->Stats,
- join->LeftJoinKeys, join->RightJoinKeys, EJoinAlgoType::GraceJoin, ctx));
+ join->Stats = std::make_shared<TOptimizerStatistics>(
+ ComputeJoinStats(
+ *join->LeftArg->Stats,
+ *join->RightArg->Stats,
+ join->LeftJoinKeys,
+ join->RightJoinKeys,
+ EJoinAlgoType::GraceJoin,
+ ctx
+ )
+ );
}
-/**
- * Optimize a subtree of a plan with DPccp
- * The root of the subtree that needs to be optimizer needs to be reorderable, otherwise we will
- * only update the statistics for it and return it unchanged
-*/
-std::shared_ptr<TJoinOptimizerNode> OptimizeSubtree(const std::shared_ptr<TJoinOptimizerNode>& joinTree, ui32 maxDPccpDPTableSize, IProviderContext& ctx) {
- if (!joinTree->IsReorderable) {
- return PickBestNonReorderabeJoin(joinTree, ctx);
- }
-
- TVector<std::shared_ptr<IBaseOptimizerNode>> rels;
- std::set<std::pair<TJoinColumn, TJoinColumn>> joinConditions;
- ExtractRelsAndJoinConditions(joinTree, rels, joinConditions);
-
- TGraph<128> joinGraph;
-
- for (size_t i = 0; i < rels.size(); i++) {
- joinGraph.AddNode(i, rels[i]->Labels());
- }
-
- // Check if we have more rels than DPccp can handle (128)
- // If that's the case - don't optimize the plan and just return it with
- // computed statistics
- if (rels.size() >= 128) {
- ComputeStatistics(joinTree, ctx);
- YQL_CLOG(TRACE, CoreDq) << "Too many rels";
- return joinTree;
- }
-
- for (auto cond : joinConditions ) {
- int fromNode = joinGraph.FindNode(cond.first.RelName);
- int toNode = joinGraph.FindNode(cond.second.RelName);
- joinGraph.AddEdge(TEdge(fromNode, toNode, cond));
- }
-
- if (NYql::NLog::YqlLogger().NeedToLog(NYql::NLog::EComponent::CoreDq, NYql::NLog::ELevel::TRACE)) {
- std::stringstream str;
- str << "Initial join graph:\n";
- joinGraph.PrintGraph(str);
- YQL_CLOG(TRACE, CoreDq) << str.str();
- }
+class TOptimizerNativeNew: public IOptimizerNew {
+public:
+ TOptimizerNativeNew(IProviderContext& ctx, ui32 maxDPhypDPTableSize)
+ : IOptimizerNew(ctx)
+ , MaxDPhypTableSize_(maxDPhypDPTableSize)
+ {}
- // make a transitive closure of the graph and reorder the graph via BFS
- joinGraph.ComputeTransitiveClosure(joinConditions);
+ /*
+ * Reorders the joins of `joinTree` with the DPHyp solver.
+ * The node set width is chosen by the number of labels in the tree: up to 64 labels use
+ * the 64-bit node set, 65..128 labels use the 128-bit one.
+ * Larger trees are not reordered: their statistics are recomputed and the input tree is returned.
+ */
+ std::shared_ptr<TJoinOptimizerNode> JoinSearch(const std::shared_ptr<TJoinOptimizerNode>& joinTree) override {
+ auto relsCount = joinTree->Labels().size();
- if (NYql::NLog::YqlLogger().NeedToLog(NYql::NLog::EComponent::CoreDq, NYql::NLog::ELevel::TRACE)) {
- std::stringstream str;
- str << "Join graph after transitive closure:\n";
- joinGraph.PrintGraph(str);
- YQL_CLOG(TRACE, CoreDq) << str.str();
- }
+ if (relsCount <= 64) { // The algorithm is more efficient.
+ return JoinSearchImpl<TNodeSet64>(joinTree);
+ }
- TDPccpSolver<128> solver(joinGraph, rels, ctx);
+ if (relsCount <= 128) {
+ // The reordered tree must be returned; otherwise the fallback below hands back the unoptimized input.
+ return JoinSearchImpl<TNodeSet128>(joinTree);
+ }
- // Check that the dynamic table of DPccp is not too big
- // If it is, just compute the statistics for the join tree and return it
- if (solver.CountCC(maxDPccpDPTableSize) >= maxDPccpDPTableSize) {
- ComputeStatistics(joinTree, ctx);
+ // More than 128 labels do not fit into the available node sets.
+ ComputeStatistics(joinTree, this->Pctx);
return joinTree;
}
- // feed the graph to DPccp algorithm
- auto result = solver.Solve();
-
- if (NYql::NLog::YqlLogger().NeedToLog(NYql::NLog::EComponent::CoreDq, NYql::NLog::ELevel::TRACE)) {
- std::stringstream str;
- str << "Join tree after cost based optimization:\n";
- result->Print(str);
- YQL_CLOG(TRACE, CoreDq) << str.str();
- }
-
- return ConvertFromInternal(result);
-}
-
-class TOptimizerNativeNew: public IOptimizerNew {
-public:
- TOptimizerNativeNew(IProviderContext& ctx, const ui32 maxDPccpDPTableSize)
- : IOptimizerNew(ctx), MaxDPccpDPTableSize(maxDPccpDPTableSize) { }
-
- std::shared_ptr<TJoinOptimizerNode> JoinSearch(const std::shared_ptr<TJoinOptimizerNode>& joinTree) override {
+private:
+ using TNodeSet64 = std::bitset<64>;
+ using TNodeSet128 = std::bitset<128>;
- // Traverse the join tree and generate a list of non-orderable joins in a post-order
- TVector<std::shared_ptr<TJoinOptimizerNode>> nonOrderables;
- ExtractNonOrderables(joinTree, nonOrderables);
+ /*
+ * Runs the DPHyp solver over `joinTree`, using TNodeSet as the bitset type for sets of relations.
+ * When solver.CountCC(MaxDPhypTableSize_) reaches MaxDPhypTableSize_, the tree is not reordered:
+ * its statistics are recomputed and the input tree is returned unchanged.
+ * Otherwise the solver result is converted from internal nodes and returned.
+ */
+ template <typename TNodeSet>
+ std::shared_ptr<TJoinOptimizerNode> JoinSearchImpl(const std::shared_ptr<TJoinOptimizerNode>& joinTree) {
+ TJoinHypergraph<TNodeSet> hypergraph = MakeJoinHypergraph<TNodeSet>(joinTree);
+ TDPHypSolver<TNodeSet> solver(hypergraph, this->Pctx);
- // For all non-orderable joins, optimize the children
- for( auto join : nonOrderables ) {
- if (join->LeftArg->Kind == EOptimizerNodeKind::JoinNodeType) {
- join->LeftArg = OptimizeSubtree(static_pointer_cast<TJoinOptimizerNode>(join->LeftArg), MaxDPccpDPTableSize, Pctx);
- }
- if (join->RightArg->Kind == EOptimizerNodeKind::JoinNodeType) {
- join->RightArg = OptimizeSubtree(static_pointer_cast<TJoinOptimizerNode>(join->RightArg), MaxDPccpDPTableSize, Pctx);
- }
- join->Stats = std::make_shared<TOptimizerStatistics>(ComputeJoinStats(*join->LeftArg->Stats, *join->RightArg->Stats,
- join->LeftJoinKeys, join->RightJoinKeys, EJoinAlgoType::GraceJoin, Pctx));
+ // NOTE(review): CountCC presumably estimates the DP table size and stops counting at its argument - confirm in the solver.
+ if (solver.CountCC(MaxDPhypTableSize_) >= MaxDPhypTableSize_) {
+ ComputeStatistics(joinTree, this->Pctx);
+ return joinTree;
}
- // Optimize the root
- return OptimizeSubtree(joinTree, MaxDPccpDPTableSize, Pctx);
+ auto bestJoinOrder = solver.Solve();
+ return ConvertFromInternal(bestJoinOrder);
}
-
- const ui32 MaxDPccpDPTableSize;
+private:
+ ui32 MaxDPhypTableSize_;
};
-/**
- * Main routine that checks:
- * 1. Do we have an equiJoin
- * 2. Is the cost already computed
- * 3. Are all the costs of equiJoin inputs computed?
- *
- * Then it optimizes the join tree by iterating over all non-orderable nodes and optimizing their children,
- * and finally optimizes the root of the tree
-*/
-TExprBase DqOptimizeEquiJoinWithCosts(const TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx,
- ui32 optLevel, IOptimizerNew& opt,
- const std::function<void(TVector<std::shared_ptr<TRelOptimizerNode>>&, TStringBuf, const TExprNode::TPtr, const std::shared_ptr<TOptimizerStatistics>&)>& providerCollect) {
+/*
+ * Creates a native (DPHyp based) optimizer with `new` and returns it as a raw IOptimizerNew pointer.
+ * `maxDPhypDPTableSize` is forwarded to the optimizer as its DP table size limit.
+ */
+IOptimizerNew* MakeNativeOptimizerNew(IProviderContext& ctx, const ui32 maxDPhypDPTableSize) {
+    IOptimizerNew* nativeOptimizer = new TOptimizerNativeNew(ctx, maxDPhypDPTableSize);
+    return nativeOptimizer;
+}
- if (optLevel==0) {
+TExprBase DqOptimizeEquiJoinWithCosts(
+ const TExprBase& node,
+ TExprContext& ctx,
+ TTypeAnnotationContext& typesCtx,
+ ui32 optLevel,
+ IOptimizerNew& opt,
+ const TProviderCollectFunction& providerCollect
+) {
+ if (optLevel == 0) {
return node;
}
@@ -1348,7 +274,6 @@ TExprBase DqOptimizeEquiJoinWithCosts(const TExprBase& node, TExprContext& ctx,
YQL_ENSURE(equiJoin.ArgCount() >= 4);
if (typesCtx.StatisticsMap.contains(equiJoin.Raw())) {
-
return node;
}
@@ -1369,7 +294,7 @@ TExprBase DqOptimizeEquiJoinWithCosts(const TExprBase& node, TExprContext& ctx,
auto joinTuple = equiJoin.Arg(equiJoin.ArgCount() - 2).Cast<TCoEquiJoinTuple>();
// Generate an initial tree
- auto joinTree = ConvertToJoinTree(joinTuple,rels);
+ auto joinTree = ConvertToJoinTree(joinTuple, rels);
if (NYql::NLog::YqlLogger().NeedToLog(NYql::NLog::EComponent::ProviderKqp, NYql::NLog::ELevel::TRACE)) {
std::stringstream str;
@@ -1382,12 +307,9 @@ TExprBase DqOptimizeEquiJoinWithCosts(const TExprBase& node, TExprContext& ctx,
// rewrite the join tree and record the output statistics
TExprBase res = RearrangeEquiJoinTree(ctx, equiJoin, joinTree);
- typesCtx.StatisticsMap[ res.Raw() ] = joinTree->Stats;
+ typesCtx.StatisticsMap[res.Raw()] = joinTree->Stats;
return res;
-}
-IOptimizerNew* MakeNativeOptimizerNew(IProviderContext& ctx, const ui32 maxDPccpDPTableSize) {
- return new TOptimizerNativeNew(ctx, maxDPccpDPTableSize);
}
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/opt/dq_opt_join_cost_based.h b/ydb/library/yql/dq/opt/dq_opt_join_cost_based.h
new file mode 100644
index 0000000000..ff6225b3d9
--- /dev/null
+++ b/ydb/library/yql/dq/opt/dq_opt_join_cost_based.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h>
+#include <ydb/library/yql/core/expr_nodes_gen/yql_expr_nodes_gen.h>
+#include <ydb/library/yql/core/yql_type_annotation.h>
+
+namespace NYql::NDq {
+
+/*
+ * Callback type accepted by DqOptimizeEquiJoinWithCosts as `providerCollect`.
+ * It receives a mutable vector of relation optimizer nodes, a string label, an expression node
+ * and optimizer statistics, and returns nothing.
+ * NOTE(review): presumably it appends the relations of one equi-join input to the vector - confirm against the providers.
+ */
+using TProviderCollectFunction =
+ std::function<void(TVector<std::shared_ptr<TRelOptimizerNode>>&, TStringBuf, const TExprNode::TPtr, const std::shared_ptr<TOptimizerStatistics>&)>;
+
+/*
+ * Main routine that checks:
+ * 1. Do we have an equiJoin
+ * 2. Is the cost already computed
+ * 3. Are all the costs of equiJoin inputs computed?
+ *
+ * Then it optimizes the join tree.
+*/
+NYql::NNodes::TExprBase DqOptimizeEquiJoinWithCosts(
+ const NYql::NNodes::TExprBase& node,
+ TExprContext& ctx,
+ TTypeAnnotationContext& typesCtx,
+ ui32 optLevel,
+ IOptimizerNew& opt,
+ const TProviderCollectFunction& providerCollect
+);
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/opt/dq_opt_join_hypergraph.h b/ydb/library/yql/dq/opt/dq_opt_join_hypergraph.h
new file mode 100644
index 0000000000..7abd379038
--- /dev/null
+++ b/ydb/library/yql/dq/opt/dq_opt_join_hypergraph.h
@@ -0,0 +1,190 @@
+#pragma once
+
+#include <vector>
+#include <util/string/printf.h>
+#include "bitset.h"
+
+#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h>
+#include <ydb/library/yql/core/yql_cost_function.h>
+
+namespace NYql::NDq {
+
+/*
+ * JoinHypergraph - a graph, whose edge connects two sets of nodes.
+ * It represents relation between tables and ordering constraints.
+ * Graph is undirected, so it stores each edge twice (original and reversed) for DPHyp algorithm.
+ */
+template <typename TNodeSet>
+class TJoinHypergraph {
+public:
+    /*
+     * Hyperedge: connects the node set Left with the node set Right and carries
+     * the join kind together with the join conditions.
+     */
+    struct TEdge {
+        TEdge(
+            const TNodeSet& left,
+            const TNodeSet& right,
+            EJoinKind joinKind,
+            bool isCommutative,
+            const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions
+        )
+            : Left(left)
+            , Right(right)
+            , JoinKind(joinKind)
+            , IsCommutative(isCommutative)
+            , JoinConditions(joinConditions)
+            , IsReversed(false)
+        {
+            BuildCondVectors();
+        }
+
+        /* An edge is simple when HasSingleBit holds for both of its sides. */
+        inline bool IsSimpleEdge() {
+            return HasSingleBit(Left) && HasSingleBit(Right);
+        }
+
+        TNodeSet Left;
+        TNodeSet Right;
+        EJoinKind JoinKind;
+        bool IsCommutative;
+        std::set<std::pair<TJoinColumn, TJoinColumn>> JoinConditions;
+        TVector<TString> LeftJoinKeys;
+        TVector<TString> RightJoinKeys;
+
+        // JoinKind may not be commutative, so we need to know which edge is original and which is reversed.
+        bool IsReversed;
+        int64_t ReversedEdgeId = -1;
+
+        /*
+         * Rebuilds LeftJoinKeys and RightJoinKeys from JoinConditions.
+         * Every key is the attribute name with everything up to its last '.' removed.
+         * A '.' at index 0 is not treated as a separator, so ".key" stays ".key".
+         */
+        void BuildCondVectors() {
+            // Walks from the end with a one-based index so that an empty name never underflows size_t.
+            const auto stripQualifier = [](auto name) {
+                for (size_t i = name.size(); i > 1; --i) {
+                    if (name[i - 1] == '.') {
+                        name = name.substr(i);
+                        break;
+                    }
+                }
+                return name;
+            };
+
+            LeftJoinKeys.clear();
+            RightJoinKeys.clear();
+            LeftJoinKeys.reserve(JoinConditions.size());
+            RightJoinKeys.reserve(JoinConditions.size());
+
+            for (const auto& [left, right] : JoinConditions) {
+                LeftJoinKeys.emplace_back(stripQualifier(left.AttributeName));
+                RightJoinKeys.emplace_back(stripQualifier(right.AttributeName));
+            }
+        }
+    };
+
+    /* Graph node: one relation plus the edges attached to it. */
+    struct TNode {
+        // Union of the right sides of all simple edges that start in this node.
+        TNodeSet SimpleNeighborhood;
+        // Ids of the non-simple edges whose left side contains this node.
+        TVector<size_t> ComplexEdgesId;
+        std::shared_ptr<IBaseOptimizerNode> RelationOptimizerNode;
+    };
+
+public:
+    /* Adds a node to the hypergraph and returns its id (the index of the node in GetNodes()). */
+    size_t AddNode(const std::shared_ptr<IBaseOptimizerNode>& relationNode) {
+        size_t nodeId = Nodes_.size();
+        NodeIdByRelationOptimizerNode_.insert({relationNode, nodeId});
+
+        Nodes_.push_back({});
+        Nodes_.back().RelationOptimizerNode = relationNode;
+
+        return nodeId;
+    }
+
+    /*
+     * Adds an edge, and its reversed version.
+     * The original edge gets id N and the reversed one id N + 1; each stores the id of the other
+     * in ReversedEdgeId. The reversed edge has its sides, join conditions and join keys swapped.
+     */
+    void AddEdge(TEdge edge) {
+        size_t edgeId = Edges_.size();
+        size_t reversedEdgeId = edgeId + 1;
+        edge.ReversedEdgeId = reversedEdgeId;
+
+        // `edge` is still needed below, so the original is stored as a copy.
+        AddEdgeImpl(edge);
+
+        std::set<std::pair<TJoinColumn, TJoinColumn>> reversedJoinConditions;
+        for (const auto& [lhs, rhs]: edge.JoinConditions) {
+            reversedJoinConditions.insert({rhs, lhs});
+        }
+
+        TEdge reversedEdge = std::move(edge);
+        std::swap(reversedEdge.Left, reversedEdge.Right);
+        // The keys were built for the original orientation; swap them with the sides so that
+        // LeftJoinKeys keeps describing the Left side of the reversed edge.
+        std::swap(reversedEdge.LeftJoinKeys, reversedEdge.RightJoinKeys);
+        reversedEdge.JoinConditions = std::move(reversedJoinConditions);
+        reversedEdge.IsReversed = true;
+        reversedEdge.ReversedEdgeId = edgeId;
+
+        AddEdgeImpl(std::move(reversedEdge));
+    }
+
+    /*
+     * Returns the set of graph nodes that belong to the subtree rooted at `subtreeRoot`
+     * and whose first label occurs in `relationNames`.
+     */
+    TNodeSet GetNodesByRelNamesInSubtree(const std::shared_ptr<IBaseOptimizerNode>& subtreeRoot, const TVector<TString>& relationNames) {
+        if (subtreeRoot->Kind == RelNodeType) {
+            TString relationName = subtreeRoot->Labels()[0];
+
+            TNodeSet nodeSet{};
+            if (std::find(relationNames.begin(), relationNames.end(), relationName) != relationNames.end()) {
+                nodeSet[NodeIdByRelationOptimizerNode_[subtreeRoot]] = 1;
+            }
+            return nodeSet;
+        }
+
+        auto joinNode = std::static_pointer_cast<TJoinOptimizerNode>(subtreeRoot);
+
+        auto leftNodeSet = GetNodesByRelNamesInSubtree(joinNode->LeftArg, relationNames);
+        auto rightNodeSet = GetNodesByRelNamesInSubtree(joinNode->RightArg, relationNames);
+
+        return leftNodeSet | rightNodeSet;
+    }
+
+    /* Returns the edge with the given id; the id must be in range. */
+    TEdge& GetEdge(size_t edgeId) {
+        Y_ASSERT(edgeId < Edges_.size());
+        return Edges_[edgeId];
+    }
+
+    inline TVector<TNode>& GetNodes() {
+        return Nodes_;
+    }
+
+    /*
+     * Returns the first stored edge whose Left is a subset of `lhs` and does not overlap `rhs`,
+     * and whose Right is a subset of `rhs` and does not overlap `lhs`; nullptr if there is none.
+     * The pointer refers to an element of the edge vector.
+     */
+    const TEdge* FindEdgeBetween(const TNodeSet& lhs, const TNodeSet& rhs) {
+        for (const auto& edge: Edges_) {
+            if (
+                IsSubset(edge.Left, lhs) &&
+                !Overlaps(edge.Left, rhs) &&
+                IsSubset(edge.Right, rhs) &&
+                !Overlaps(edge.Right, lhs)
+            ) {
+                return &edge;
+            }
+        }
+
+        return nullptr;
+    }
+
+private:
+    /* Stores the edge and attaches it to the nodes of its left side. */
+    void AddEdgeImpl(TEdge edge) {
+        const size_t edgeId = Edges_.size();
+        Edges_.push_back(std::move(edge));
+        TEdge& storedEdge = Edges_.back();
+
+        if (storedEdge.IsSimpleEdge()) {
+            Nodes_[GetLowestSetBit(storedEdge.Left)].SimpleNeighborhood |= storedEdge.Right;
+            return;
+        }
+
+        auto setBitsIt = TSetBitsIt(storedEdge.Left);
+        while (setBitsIt.HasNext()) {
+            Nodes_[setBitsIt.Next()].ComplexEdgesId.push_back(edgeId);
+        }
+    }
+
+private:
+    std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, size_t> NodeIdByRelationOptimizerNode_;
+
+    TVector<TNode> Nodes_;
+    TVector<TEdge> Edges_;
+};
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/opt/dq_opt_join_tree_node.cpp b/ydb/library/yql/dq/opt/dq_opt_join_tree_node.cpp
new file mode 100644
index 0000000000..1c4db89bb6
--- /dev/null
+++ b/ydb/library/yql/dq/opt/dq_opt_join_tree_node.cpp
@@ -0,0 +1,44 @@
+#include "dq_opt_join_tree_node.h"
+
+namespace NYql::NDq {
+
+/*
+ * Builds an internal join node over `left` and `right` and attaches freshly computed
+ * join statistics to it.
+ */
+std::shared_ptr<TJoinOptimizerNodeInternal> MakeJoinInternal(
+    std::shared_ptr<IBaseOptimizerNode> left,
+    std::shared_ptr<IBaseOptimizerNode> right,
+    const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions,
+    const TVector<TString>& leftJoinKeys,
+    const TVector<TString>& rightJoinKeys,
+    EJoinKind joinKind,
+    EJoinAlgoType joinAlgo,
+    IProviderContext& ctx
+) {
+    auto joinNode = std::make_shared<TJoinOptimizerNodeInternal>(
+        left,
+        right,
+        joinConditions,
+        leftJoinKeys,
+        rightJoinKeys,
+        joinKind,
+        joinAlgo
+    );
+
+    // Statistics are derived from the statistics of both inputs, the join keys and the algorithm.
+    joinNode->Stats = std::make_shared<TOptimizerStatistics>(
+        ComputeJoinStats(
+            *left->Stats,
+            *right->Stats,
+            leftJoinKeys,
+            rightJoinKeys,
+            joinAlgo,
+            ctx
+        )
+    );
+
+    return joinNode;
+}
+
+/*
+ * Converts a tree of internal join nodes into regular TJoinOptimizerNode nodes.
+ * A node that already is a TJoinOptimizerNode is returned as is, together with its subtree.
+ * Children that are join nodes are converted recursively; other children are reused.
+ */
+std::shared_ptr<TJoinOptimizerNode> ConvertFromInternal(const std::shared_ptr<IBaseOptimizerNode> internal) {
+    Y_ENSURE(internal->Kind == EOptimizerNodeKind::JoinNodeType);
+
+    // Already an external node: nothing to convert.
+    if (auto externalJoin = std::dynamic_pointer_cast<TJoinOptimizerNode>(internal)) {
+        return externalJoin;
+    }
+
+    const auto internalJoin = std::static_pointer_cast<TJoinOptimizerNodeInternal>(internal);
+
+    std::shared_ptr<IBaseOptimizerNode> convertedLeft = internalJoin->LeftArg;
+    if (convertedLeft->Kind == EOptimizerNodeKind::JoinNodeType) {
+        convertedLeft = ConvertFromInternal(convertedLeft);
+    }
+
+    std::shared_ptr<IBaseOptimizerNode> convertedRight = internalJoin->RightArg;
+    if (convertedRight->Kind == EOptimizerNodeKind::JoinNodeType) {
+        convertedRight = ConvertFromInternal(convertedRight);
+    }
+
+    auto externalJoin = std::make_shared<TJoinOptimizerNode>(
+        convertedLeft,
+        convertedRight,
+        internalJoin->JoinConditions,
+        internalJoin->JoinType,
+        internalJoin->JoinAlgo
+    );
+    externalJoin->Stats = internalJoin->Stats;
+    return externalJoin;
+}
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/opt/dq_opt_join_tree_node.h b/ydb/library/yql/dq/opt/dq_opt_join_tree_node.h
new file mode 100644
index 0000000000..37a3075175
--- /dev/null
+++ b/ydb/library/yql/dq/opt/dq_opt_join_tree_node.h
@@ -0,0 +1,80 @@
+#pragma once
+
+#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h>
+
+namespace NYql::NDq {
+
+/**
+ * Internal Join nodes are used inside the CBO. They don't own join condition data structures
+ * and therefore avoid copying them during generation of candidate plans.
+ *
+ * These datastructures are owned by the query graph, so it is important to keep the graph around
+ * while internal nodes are being used.
+ *
+ * After join enumeration, internal nodes need to be converted to regular nodes, that own the data
+ * structures
+*/
+struct TJoinOptimizerNodeInternal : public IBaseOptimizerNode {
+ // Stores `left` and `right` as shared pointers. `joinConditions`, `leftJoinKeys` and
+ // `rightJoinKeys` are bound to reference members, not copied, so the referenced
+ // containers must outlive this node.
+ TJoinOptimizerNodeInternal(
+ const std::shared_ptr<IBaseOptimizerNode>& left,
+ const std::shared_ptr<IBaseOptimizerNode>& right,
+ const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions,
+ const TVector<TString>& leftJoinKeys,
+ const TVector<TString>& rightJoinKeys,
+ const EJoinKind joinType,
+ const EJoinAlgoType joinAlgo
+ )
+ : IBaseOptimizerNode(JoinNodeType)
+ , LeftArg(left)
+ , RightArg(right)
+ , JoinConditions(joinConditions)
+ , LeftJoinKeys(leftJoinKeys)
+ , RightJoinKeys(rightJoinKeys)
+ , JoinType(joinType)
+ , JoinAlgo(joinAlgo)
+ {}
+
+ virtual ~TJoinOptimizerNodeInternal() = default;
+ // Returns the labels of both children: the right child's labels come first,
+ // followed by the left child's labels.
+ virtual TVector<TString> Labels() {
+ auto res = LeftArg->Labels();
+ auto rightLabels = RightArg->Labels();
+ res.insert(res.begin(),rightLabels.begin(),rightLabels.end());
+ return res;
+ }
+
+ // Printing is a no-op for internal nodes: both arguments are ignored.
+ virtual void Print(std::stringstream&, int) {
+ }
+
+ std::shared_ptr<IBaseOptimizerNode> LeftArg;
+ std::shared_ptr<IBaseOptimizerNode> RightArg;
+ // Non-owning references to data held elsewhere; see the lifetime note on the constructor.
+ const std::set<std::pair<NDq::TJoinColumn, NDq::TJoinColumn>>& JoinConditions;
+ const TVector<TString>& LeftJoinKeys;
+ const TVector<TString>& RightJoinKeys;
+ EJoinKind JoinType;
+ EJoinAlgoType JoinAlgo;
+};
+
+/**
+ * Create a new internal join node and compute its statistics and cost
+*/
+std::shared_ptr<TJoinOptimizerNodeInternal> MakeJoinInternal(
+ std::shared_ptr<IBaseOptimizerNode> left,
+ std::shared_ptr<IBaseOptimizerNode> right,
+ const std::set<std::pair<TJoinColumn, TJoinColumn>>& joinConditions,
+ const TVector<TString>& leftJoinKeys,
+ const TVector<TString>& rightJoinKeys,
+ EJoinKind joinKind,
+ EJoinAlgoType joinAlgo,
+ IProviderContext& ctx
+);
+
+/**
+ * Convert a tree of internal optimizer nodes to external nodes that own the data structures.
+ *
+ * The internal node tree can have references to external nodes (since some subtrees are optimized
+ * separately if the plan contains non-orderable joins). So we check the instances and if we encounter
+ * an external node, we return the whole subtree unchanged.
+*/
+std::shared_ptr<TJoinOptimizerNode> ConvertFromInternal(const std::shared_ptr<IBaseOptimizerNode> internal);
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/opt/dq_opt_make_join_hypergraph.h b/ydb/library/yql/dq/opt/dq_opt_make_join_hypergraph.h
new file mode 100644
index 0000000000..0f7f386e65
--- /dev/null
+++ b/ydb/library/yql/dq/opt/dq_opt_make_join_hypergraph.h
@@ -0,0 +1,98 @@
+#pragma once
+
+#include "dq_opt_join_hypergraph.h"
+#include "dq_opt_conflict_rules_collector.h"
+
+#include <ydb/library/yql/core/cbo/cbo_optimizer_new.h>
+
+#include <memory.h>
+
+/*
+ * This header contains MakeJoinHypergraph function to construct the hypergraph from inner optimizer nodes.
+ * Pipeline works as follows:
+ * 1) MakeJoinHypergraph calls MakeJoinHypergraphRec recursively.
+ * 2) MakeJoinHypergraphRec calls MakeHyperedge for each join node.
+ * 3) MakeHyperedge finds conflicts with TConflictRulesCollector and collects them into TES.
+ * If join has conflicts or complex predicate, then MakeHyperedge will create a complex edge.
+ */
+
+namespace NYql::NDq {
+
+/*
+ * Collects the relation names referenced by the join conditions of `joinNode`.
+ * For every condition the RelName of its first column is appended, then the RelName of its
+ * second column. Names are not deduplicated.
+ */
+inline TVector<TString> GetConditionUsedRelationNames(const std::shared_ptr<TJoinOptimizerNode>& joinNode) {
+    TVector<TString> res;
+    // Every condition contributes two names, one per side.
+    res.reserve(2 * joinNode->JoinConditions.size());
+
+    for (const auto& [lhsTable, rhsTable]: joinNode->JoinConditions) {
+        res.push_back(lhsTable.RelName);
+        res.push_back(rhsTable.RelName);
+    }
+
+    return res;
+}
+
+/*
+ * Builds the hyperedge that represents `joinNode`.
+ *
+ * joinNode          - join to convert; its conflicts are collected with TConflictRulesCollector.
+ * conditionUsedRels - nodes of the relations used by the join conditions.
+ * subtreeNodes      - node set covered by every subtree; entries for both children are read.
+ *
+ * The edge sides are the parts of TES that lie in the left and in the right subtree.
+ * The edge is commutative only if the join kind is commutative and the join is reorderable.
+ */
+template <typename TNodeSet>
+typename TJoinHypergraph<TNodeSet>::TEdge MakeHyperedge(
+    const std::shared_ptr<TJoinOptimizerNode>& joinNode,
+    const TNodeSet& conditionUsedRels,
+    std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, TNodeSet>& subtreeNodes
+) {
+    auto conflictRulesCollector = TConflictRulesCollector<TNodeSet>(joinNode, subtreeNodes);
+    auto conflictRules = conflictRulesCollector.CollectConflicts();
+
+    TNodeSet TES = ConvertConflictRulesIntoTES(conditionUsedRels, conflictRules);
+
+    // References into std::unordered_map stay valid when further elements are inserted.
+    const TNodeSet& leftSubtree = subtreeNodes[joinNode->LeftArg];
+    const TNodeSet& rightSubtree = subtreeNodes[joinNode->RightArg];
+
+    /* For CROSS Join and degenerate predicates (if subtree tables and joinCondition tables do not intersect) */
+    if (!Overlaps(TES, leftSubtree)) {
+        TES |= leftSubtree;
+        TES = ConvertConflictRulesIntoTES(TES, conflictRules);
+    }
+
+    if (!Overlaps(TES, rightSubtree)) {
+        TES |= rightSubtree;
+        TES = ConvertConflictRulesIntoTES(TES, conflictRules);
+    }
+
+    TNodeSet left = TES & leftSubtree;
+    TNodeSet right = TES & rightSubtree;
+
+    const bool isCommutative = OperatorIsCommutative(joinNode->JoinType) && joinNode->IsReorderable;
+    return typename TJoinHypergraph<TNodeSet>::TEdge(left, right, joinNode->JoinType, isCommutative, joinNode->JoinConditions);
+}
+
+/*
+ * Post-order traversal of the join tree that fills `graph` and `subtreeNodes`.
+ * A relation node becomes a graph node and is recorded as a one-element node set.
+ * A join node is processed after both children: the union of the children's node sets is recorded
+ * for it, and one hyperedge (plus its reversed twin, added by AddEdge) is created.
+ */
+template<typename TNodeSet>
+void MakeJoinHypergraphRec(
+    TJoinHypergraph<TNodeSet>& graph,
+    const std::shared_ptr<IBaseOptimizerNode>& joinTree,
+    std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, TNodeSet>& subtreeNodes
+) {
+    if (joinTree->Kind == RelNodeType) {
+        const size_t relationId = graph.AddNode(joinTree);
+        TNodeSet singleRelation{};
+        singleRelation[relationId] = 1;
+        subtreeNodes[joinTree] = singleRelation;
+        return;
+    }
+
+    const auto join = std::static_pointer_cast<TJoinOptimizerNode>(joinTree);
+    MakeJoinHypergraphRec(graph, join->LeftArg, subtreeNodes);
+    MakeJoinHypergraphRec(graph, join->RightArg, subtreeNodes);
+
+    const TNodeSet coveredNodes = subtreeNodes[join->LeftArg] | subtreeNodes[join->RightArg];
+    subtreeNodes[joinTree] = coveredNodes;
+
+    const TVector<TString> usedRelationNames = GetConditionUsedRelationNames(join);
+    const TNodeSet conditionUsedRels = graph.GetNodesByRelNamesInSubtree(joinTree, usedRelationNames);
+
+    graph.AddEdge(MakeHyperedge<TNodeSet>(join, conditionUsedRels, subtreeNodes));
+}
+
+/*
+ * Entry point: builds the join hypergraph for the whole tree rooted at `joinTree`.
+ * The per-subtree node sets are only needed during construction and are dropped afterwards.
+ */
+template <typename TNodeSet>
+TJoinHypergraph<TNodeSet> MakeJoinHypergraph(
+    const std::shared_ptr<IBaseOptimizerNode>& joinTree
+) {
+    std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, TNodeSet> nodesBySubtree{};
+    TJoinHypergraph<TNodeSet> hypergraph{};
+
+    MakeJoinHypergraphRec(hypergraph, joinTree, nodesBySubtree);
+
+    return hypergraph;
+}
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/opt/ya.make b/ydb/library/yql/dq/opt/ya.make
index 999fbec7dc..0e939390ec 100644
--- a/ydb/library/yql/dq/opt/ya.make
+++ b/ydb/library/yql/dq/opt/ya.make
@@ -14,7 +14,10 @@ PEERDIR(
SRCS(
dq_opt.cpp
dq_opt_build.cpp
+ dq_opt_conflict_rules_collector.cpp
dq_opt_join.cpp
+ dq_opt_join_cost_based.cpp
+ dq_opt_join_tree_node.cpp
dq_opt_hopping.cpp
dq_opt_log.cpp
dq_opt_peephole.cpp
@@ -22,7 +25,6 @@ SRCS(
dq_opt_phy.cpp
dq_opt_stat.cpp
dq_opt_stat_transformer_base.cpp
- dq_opt_join_cost_based.cpp
dq_opt_predicate_selectivity.cpp
)