diff options
author | udovichenko-r <[email protected]> | 2023-09-01 14:19:21 +0300 |
---|---|---|
committer | udovichenko-r <[email protected]> | 2023-09-01 14:51:57 +0300 |
commit | 44ac4e59f8b4b8420d2ccd14b65e1697155141e7 (patch) | |
tree | 1409983607ebd591035e5ecfe19b48282053fc4e | |
parent | a716b1dc563343460f3a4dc0c505f0cf850bafa9 (diff) |
[yql] Stable plan after finalizing opts
YQL-16453
4 files changed, 54 insertions, 52 deletions
diff --git a/ydb/library/yql/core/ut/yql_execution_ut.cpp b/ydb/library/yql/core/ut/yql_execution_ut.cpp index 3c405952b07..29c0c48bfa7 100644 --- a/ydb/library/yql/core/ut/yql_execution_ut.cpp +++ b/ydb/library/yql/core/ut/yql_execution_ut.cpp @@ -616,8 +616,8 @@ namespace NYql { //~ Cerr << res[2] << Endl; UNIT_ASSERT_NO_DIFF( "{\"Write\"=[" - "{\"Ref\"=[{\"Reference\"=[\"yt\";\"plato\";\"tmp/bb686f68-2245bd5f-2318fa4e-1\"];\"Columns\"=[\"bar\"];\"Remove\"=%true}]};" - "{\"Ref\"=[{\"Reference\"=[\"yt\";\"plato\";\"tmp/7ae6459a-7382d1e7-7935c08e-2\"];\"Columns\"=[\"foo\"];\"Remove\"=%true}]}" + "{\"Ref\"=[{\"Reference\"=[\"yt\";\"plato\";\"tmp/7ae6459a-7382d1e7-7935c08e-2\"];\"Columns\"=[\"bar\"];\"Remove\"=%true}]};" + "{\"Ref\"=[{\"Reference\"=[\"yt\";\"plato\";\"tmp/bb686f68-2245bd5f-2318fa4e-1\"];\"Columns\"=[\"foo\"];\"Remove\"=%true}]}" "]}", res[0] ); diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.cpp index d9ca1f18a85..ced9714acdf 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.cpp @@ -630,9 +630,9 @@ TExprNode::TPtr THorizontalJoinOptimizer::HandleList(const TExprNode::TPtr& node } const size_t mapInputCount = map.Input().Item(0).Paths().Size(); - if (InputCount + mapInputCount > MaxTables) { + if (const auto nextCount = InputCount + mapInputCount; nextCount > MaxTables) { if (MakeJoinedMap(node->Pos(), ctx)) { - YQL_CLOG(INFO, ProviderYt) << "HorizontalJoin: split by max input tables: " << (InputCount + mapInputCount); + YQL_CLOG(INFO, ProviderYt) << "HorizontalJoin: split by max input tables: " << nextCount; } // Reinit outNdx because MakeJoinedMap() clears JoinedMaps outNdx = JoinedMaps.size(); @@ -1224,8 +1224,7 @@ TExprNode::TPtr THorizontalJoinOptimizer::RebuildList(const TExprNode::TPtr& nod IGraphTransformer::TStatus 
TMultiHorizontalJoinOptimizer::Optimize(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) { THashMap<TGroupKey, TVector<TYtMap>> mapGroups; - for (auto& x: OpDeps) { - auto writer = x.first; + for (auto writer: OpDepsOrder) { if (auto maybeMap = TMaybeNode<TYtMap>(writer)) { auto map = maybeMap.Cast(); if (IsGoodForHorizontalJoin(map)) { @@ -1261,7 +1260,7 @@ IGraphTransformer::TStatus TMultiHorizontalJoinOptimizer::Optimize(TExprNode::TP if (good) { std::set<ui64> readerIds; - for (auto& reader: x.second) { + for (auto& reader: OpDeps.at(writer)) { readerIds.insert(std::get<0>(reader)->UniqueId()); } ui32 flags = 0; @@ -1333,15 +1332,15 @@ IGraphTransformer::TStatus TMultiHorizontalJoinOptimizer::Optimize(TExprNode::TP bool TMultiHorizontalJoinOptimizer::HandleGroup(const TVector<TYtMap>& maps, TExprContext& ctx) { for (TYtMap map: maps) { const size_t mapInputCount = map.Input().Item(0).Paths().Size(); - if (InputCount + mapInputCount > MaxTables) { + if (const auto nextCount = InputCount + mapInputCount; nextCount > MaxTables) { if (MakeJoinedMap(ctx)) { - YQL_CLOG(INFO, ProviderYt) << "MultiHorizontalJoin: split by max input tables: " << (InputCount + mapInputCount); + YQL_CLOG(INFO, ProviderYt) << "MultiHorizontalJoin: split by max input tables: " << nextCount; } } - if (JoinedMaps.size() + 1 > MaxOutTables) { + if (const auto nextMapCount = JoinedMaps.size() + 1; nextMapCount > MaxOutTables) { if (MakeJoinedMap(ctx)) { - YQL_CLOG(INFO, ProviderYt) << "MultiHorizontalJoin: split by max output tables: " << (JoinedMaps.size() + 1); + YQL_CLOG(INFO, ProviderYt) << "MultiHorizontalJoin: split by max output tables: " << nextMapCount; } } @@ -1727,9 +1726,9 @@ bool TOutHorizontalJoinOptimizer::HandleGroup(TPositionHandle pos, const TGroupK auto itemType = std::get<2>(key)->GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); for (TYtMap map: maps) { - if (JoinedMaps.size() + 1 > MaxOutTables) { + if (const auto nextMapCount = 
JoinedMaps.size() + 1; nextMapCount > MaxOutTables) { if (MakeJoinedMap(pos, key, itemType, ctx)) { - YQL_CLOG(INFO, ProviderYt) << "OutHorizontalJoin: split by max output tables: " << (JoinedMaps.size() + 1); + YQL_CLOG(INFO, ProviderYt) << "OutHorizontalJoin: split by max output tables: " << nextMapCount; } } @@ -1804,8 +1803,7 @@ IGraphTransformer::TStatus TOutHorizontalJoinOptimizer::Optimize(TExprNode::TPtr THashMap<TGroupKey, TVector<TYtMap>> opGroups; THashMap<TGroupKey, TVector<TYtMap>> tableGroups; - for (auto& x: OpDeps) { - auto writer = x.first; + for (auto writer: OpDepsOrder) { if (IsGoodForOutHorizontalJoin(writer)) { auto map = TYtMap(writer); auto section = map.Input().Item(0); @@ -1837,7 +1835,7 @@ IGraphTransformer::TStatus TOutHorizontalJoinOptimizer::Optimize(TExprNode::TPtr } opGroups.clear(); - for (auto& reader: x.second) { + for (auto& reader: OpDeps.at(writer)) { if (IsGoodForOutHorizontalJoin(std::get<0>(reader))) { auto map = TYtMap(std::get<0>(reader)); auto section = map.Input().Item(0); @@ -1870,9 +1868,6 @@ IGraphTransformer::TStatus TOutHorizontalJoinOptimizer::Optimize(TExprNode::TPtr continue; } - Sort(group.second.begin(), group.second.end(), - [](const TYtMap& m1, const TYtMap& m2) { return m1.Ref().UniqueId() < m2.Ref().UniqueId(); }); - if (!HandleGroup(writer->Pos(), group.first, group.second, ctx)) { return IGraphTransformer::TStatus::Error; } @@ -1892,9 +1887,6 @@ IGraphTransformer::TStatus TOutHorizontalJoinOptimizer::Optimize(TExprNode::TPtr continue; } - Sort(group.second.begin(), group.second.end(), - [](const TYtMap& m1, const TYtMap& m2) { return m1.Ref().UniqueId() < m2.Ref().UniqueId(); }); - if (!HandleGroup(std::get<2>(group.first)->Pos(), group.first, group.second, ctx)) { return IGraphTransformer::TStatus::Error; } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.h b/ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.h index aad783760bf..e15d4701514 100644 --- 
a/ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.h @@ -27,8 +27,9 @@ namespace NYql { class THorizontalJoinBase { public: - THorizontalJoinBase(const TYtState::TPtr& state, const TOpDeps& opDeps, const TNodeSet& hasWorldDeps) + THorizontalJoinBase(const TYtState::TPtr& state, const std::vector<const TExprNode*>& opDepsOrder, const TOpDeps& opDeps, const TNodeSet& hasWorldDeps) : State_(state) + , OpDepsOrder(opDepsOrder) , OpDeps(opDeps) , HasWorldDeps(hasWorldDeps) , MaxTables(State_->Configuration->MaxInputTables.Get().GetOrElse(DEFAULT_MAX_INPUT_TABLES)) @@ -58,6 +59,7 @@ protected: protected: TYtState::TPtr State_; + const std::vector<const TExprNode*>& OpDepsOrder; const TOpDeps& OpDeps; const TNodeSet& HasWorldDeps; @@ -82,8 +84,8 @@ class THorizontalJoinOptimizer: public THorizontalJoinBase { using TGroupKey = std::tuple<TString, TMaybe<TSampleParams>, const TExprNode*, ui32>; public: - THorizontalJoinOptimizer(const TYtState::TPtr& state, const TOpDeps& opDeps, const TNodeSet& hasWorldDeps, TProcessedNodesSet* processedNodes) - : THorizontalJoinBase(state, opDeps, hasWorldDeps) + THorizontalJoinOptimizer(const TYtState::TPtr& state, const std::vector<const TExprNode*>& opDepsOrder, const TOpDeps& opDeps, const TNodeSet& hasWorldDeps, TProcessedNodesSet* processedNodes) + : THorizontalJoinBase(state, opDepsOrder, opDeps, hasWorldDeps) , ProcessedNodes(processedNodes) { } @@ -123,8 +125,8 @@ class TMultiHorizontalJoinOptimizer: public THorizontalJoinBase { using TGroupKey = std::tuple<TString, std::set<ui64>, TMaybe<TSampleParams>, const TExprNode*, ui32>; public: - TMultiHorizontalJoinOptimizer(const TYtState::TPtr& state, const TOpDeps& opDeps, const TNodeSet& hasWorldDeps) - : THorizontalJoinBase(state, opDeps, hasWorldDeps) + TMultiHorizontalJoinOptimizer(const TYtState::TPtr& state, const std::vector<const TExprNode*>& opDepsOrder, const TOpDeps& opDeps, const TNodeSet& 
hasWorldDeps) + : THorizontalJoinBase(state, opDepsOrder, opDeps, hasWorldDeps) { } @@ -149,8 +151,8 @@ class TOutHorizontalJoinOptimizer: public THorizontalJoinBase { using TGroupKey = std::tuple<TString, const TExprNode*, const TExprNode*, const TExprNode*, const TExprNode*, ui32>; public: - TOutHorizontalJoinOptimizer(const TYtState::TPtr& state, const TOpDeps& opDeps, const TNodeSet& hasWorldDeps) - : THorizontalJoinBase(state, opDeps, hasWorldDeps) + TOutHorizontalJoinOptimizer(const TYtState::TPtr& state, const std::vector<const TExprNode*>& opDepsOrder, const TOpDeps& opDeps, const TNodeSet& hasWorldDeps) + : THorizontalJoinBase(state, opDepsOrder, opDeps, hasWorldDeps) { } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_physical_finalizing.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_physical_finalizing.cpp index 82651debc81..4fc5753e84e 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_physical_finalizing.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_physical_finalizing.cpp @@ -54,6 +54,7 @@ public: output = input; TOpDeps opDeps; + std::vector<const TExprNode*> opDepsOrder; TNodeSet hasWorldDeps; // Operations, which have world dependencies on them from another nodes TNodeSet toCombine; TNodeSet neverCombine; @@ -62,6 +63,15 @@ public: bool enableChunkCombining = IsChunkCombiningEnabled(); + auto storeDep = [&opDeps, &opDepsOrder](const TYtOutput& out, const TExprNode* reader, const TExprNode* sec, const TExprNode* path) { + const auto realOp = GetRealOperation(out.Operation()).Raw(); + auto& res = opDeps[realOp]; + if (res.empty()) { + opDepsOrder.push_back(realOp); + } + res.emplace_back(reader, sec, out.Raw(), path); + }; + VisitExpr(input, [&](const TExprNode::TPtr& node)->bool { if (auto maybeOp = TMaybeNode<TYtTransientOpBase>(node)) { auto op = maybeOp.Cast(); @@ -69,7 +79,7 @@ public: for (auto path: section.Paths()) { if (auto maybeOutput = path.Table().Maybe<TYtOutput>()) { auto out = maybeOutput.Cast(); - 
opDeps[GetRealOperation(out.Operation()).Raw()].emplace_back(op.Raw(), section.Raw(), out.Raw(), path.Raw()); + storeDep(out, op.Raw(), section.Raw(), path.Raw()); if (enableChunkCombining) { CollectForCombine(out, toCombine, neverCombine); } @@ -83,7 +93,7 @@ public: for (auto path: section.Paths()) { if (auto maybeOutput = path.Table().Maybe<TYtOutput>()) { auto out = maybeOutput.Cast(); - opDeps[GetRealOperation(out.Operation()).Raw()].emplace_back(read.Raw(), section.Raw(), out.Raw(), path.Raw()); + storeDep(out, read.Raw(), section.Raw(), path.Raw()); if (enableChunkCombining) { CollectForCombine(out, toCombine, neverCombine); } @@ -94,21 +104,21 @@ public: else if (auto maybePublish = TMaybeNode<TYtPublish>(node)) { auto publish = maybePublish.Cast(); for (auto out: publish.Input()) { - opDeps[GetRealOperation(out.Operation()).Raw()].emplace_back(publish.Raw(), nullptr, out.Raw(), nullptr); + storeDep(out, publish.Raw(), nullptr, nullptr); } } else if (auto maybeLength = TMaybeNode<TYtLength>(node)) { auto length = maybeLength.Cast(); if (auto maybeOutput = length.Input().Maybe<TYtOutput>()) { auto out = maybeOutput.Cast(); - opDeps[GetRealOperation(out.Operation()).Raw()].emplace_back(length.Raw(), nullptr, out.Raw(), nullptr); + storeDep(out, length.Raw(), nullptr, nullptr); } } else if (auto maybeTableContent = TMaybeNode<TYtTableContent>(node)) { auto tableContent = maybeTableContent.Cast(); if (auto maybeOutput = tableContent.Input().Maybe<TYtOutput>()) { auto out = maybeOutput.Cast(); - opDeps[GetRealOperation(out.Operation()).Raw()].emplace_back(tableContent.Raw(), nullptr, out.Raw(), nullptr); + storeDep(out, tableContent.Raw(), nullptr, nullptr); if (enableChunkCombining) { CollectForCombine(out, toCombine, neverCombine); } @@ -118,7 +128,7 @@ public: auto resWrite = maybeResWrite.Cast(); if (auto maybeOutput = resWrite.Data().Maybe<TYtOutput>()) { auto out = maybeOutput.Cast(); - 
opDeps[GetRealOperation(out.Operation()).Raw()].emplace_back(resWrite.Raw(), nullptr, out.Raw(), nullptr); + storeDep(out, resWrite.Raw(), nullptr, nullptr); if (enableChunkCombining) { CollectForCombine(out, toCombine, neverCombine); } @@ -128,13 +138,13 @@ public: auto sqlIn = maybeSqlIn.Cast(); if (auto maybeOutput = sqlIn.Collection().Maybe<TYtOutput>()) { auto out = maybeOutput.Cast(); - opDeps[GetRealOperation(out.Operation()).Raw()].emplace_back(sqlIn.Raw(), nullptr, out.Raw(), nullptr); + storeDep(out, sqlIn.Raw(), nullptr, nullptr); } } else if (auto maybeStatOut = TMaybeNode<TYtStatOut>(node)) { auto statOut = maybeStatOut.Cast(); auto out = statOut.Input(); - opDeps[GetRealOperation(out.Operation()).Raw()].emplace_back(statOut.Raw(), nullptr, out.Raw(), nullptr); + storeDep(out, statOut.Raw(), nullptr, nullptr); if (enableChunkCombining) { CollectForCombine(out, toCombine, neverCombine); } @@ -155,6 +165,7 @@ public: return true; }); + YQL_ENSURE(opDeps.size() == opDepsOrder.size()); const auto disableOptimizers = State_->Configuration->DisableOptimizers.Get().GetOrElse(TSet<TString>()); @@ -243,7 +254,7 @@ public: } if (!disableOptimizers.contains("UnorderedOuts") && ctx.IsConstraintEnabled<TSortedConstraintNode>()) { - status = OptimizeUnorderedOuts(input, output, opDeps, lefts, ctx); + status = OptimizeUnorderedOuts(input, output, opDepsOrder, opDeps, lefts, ctx); if (status.Level != TStatus::Ok) { return status; } @@ -397,21 +408,21 @@ public: } if (!disableOptimizers.contains("HorizontalJoin")) { - status = THorizontalJoinOptimizer(State_, opDeps, hasWorldDeps, &ProcessedHorizontalJoin).Optimize(output, output, ctx); + status = THorizontalJoinOptimizer(State_, opDepsOrder, opDeps, hasWorldDeps, &ProcessedHorizontalJoin).Optimize(output, output, ctx); if (status.Level != TStatus::Ok) { return status; } } if (!disableOptimizers.contains("MultiHorizontalJoin")) { - status = TMultiHorizontalJoinOptimizer(State_, opDeps, hasWorldDeps).Optimize(output, 
output, ctx); + status = TMultiHorizontalJoinOptimizer(State_, opDepsOrder, opDeps, hasWorldDeps).Optimize(output, output, ctx); if (status.Level != TStatus::Ok) { return status; } } if (!disableOptimizers.contains("OutHorizontalJoin")) { - status = TOutHorizontalJoinOptimizer(State_, opDeps, hasWorldDeps).Optimize(output, output, ctx); + status = TOutHorizontalJoinOptimizer(State_, opDepsOrder, opDeps, hasWorldDeps).Optimize(output, output, ctx); if (status.Level != TStatus::Ok) { return status; } @@ -855,12 +866,11 @@ private: return TStatus::Ok; } - TStatus OptimizeUnorderedOuts(TExprNode::TPtr input, TExprNode::TPtr& output, const TOpDeps& opDeps, const TNodeSet& lefts, TExprContext& ctx) { - TVector<const TOpDeps::value_type*> matchedOps; - for (auto& x: opDeps) { - auto writer = x.first; + TStatus OptimizeUnorderedOuts(TExprNode::TPtr input, TExprNode::TPtr& output, const std::vector<const TExprNode*>& opDepsOrder, const TOpDeps& opDeps, const TNodeSet& lefts, TExprContext& ctx) { + std::vector<const TExprNode*> matchedOps; + for (auto writer: opDepsOrder) { if (!TYtEquiJoin::Match(writer) && !writer->StartsExecution() && (!writer->HasResult() || writer->GetResult().Type() != TExprNode::World)) { - matchedOps.push_back(&x); + matchedOps.push_back(writer); } } @@ -868,15 +878,13 @@ private: return TStatus::Ok; } - Sort(matchedOps, [](const TOpDeps::value_type* m1, const TOpDeps::value_type* m2) { return m1->first->UniqueId() < m2->first->UniqueId(); }); - TNodeOnNodeOwnedMap replaces; TNodeOnNodeOwnedMap newOps; - for (auto x: matchedOps) { - auto writer = x->first; + for (auto writer: matchedOps) { TDynBitMap orderedOuts; TDynBitMap unorderedOuts; - for (auto& item: x->second) { + const auto& readers = opDeps.at(writer); + for (auto& item: readers) { auto out = TYtOutput(std::get<2>(item)); if (IsUnorderedOutput(out)) { unorderedOuts.Set(FromString<size_t>(out.OutIndex().Value())); @@ -890,7 +898,7 @@ private: if (newOp) { newOps[writer] = newOp; } - for 
(auto& item: x->second) { + for (auto& item: readers) { auto out = std::get<2>(item); replaces[out] = Build<TYtOutput>(ctx, out->Pos()) .Operation(newOp ? newOp : out->ChildPtr(TYtOutput::idx_Operation)) |