summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorudovichenko-r <[email protected]>2023-09-01 14:19:21 +0300
committerudovichenko-r <[email protected]>2023-09-01 14:51:57 +0300
commit44ac4e59f8b4b8420d2ccd14b65e1697155141e7 (patch)
tree1409983607ebd591035e5ecfe19b48282053fc4e
parenta716b1dc563343460f3a4dc0c505f0cf850bafa9 (diff)
[yql] Stable plan after finalyzing opts
YQL-16453
-rw-r--r--ydb/library/yql/core/ut/yql_execution_ut.cpp4
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.cpp32
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.h16
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_physical_finalizing.cpp54
4 files changed, 54 insertions, 52 deletions
diff --git a/ydb/library/yql/core/ut/yql_execution_ut.cpp b/ydb/library/yql/core/ut/yql_execution_ut.cpp
index 3c405952b07..29c0c48bfa7 100644
--- a/ydb/library/yql/core/ut/yql_execution_ut.cpp
+++ b/ydb/library/yql/core/ut/yql_execution_ut.cpp
@@ -616,8 +616,8 @@ namespace NYql {
//~ Cerr << res[2] << Endl;
UNIT_ASSERT_NO_DIFF(
"{\"Write\"=["
- "{\"Ref\"=[{\"Reference\"=[\"yt\";\"plato\";\"tmp/bb686f68-2245bd5f-2318fa4e-1\"];\"Columns\"=[\"bar\"];\"Remove\"=%true}]};"
- "{\"Ref\"=[{\"Reference\"=[\"yt\";\"plato\";\"tmp/7ae6459a-7382d1e7-7935c08e-2\"];\"Columns\"=[\"foo\"];\"Remove\"=%true}]}"
+ "{\"Ref\"=[{\"Reference\"=[\"yt\";\"plato\";\"tmp/7ae6459a-7382d1e7-7935c08e-2\"];\"Columns\"=[\"bar\"];\"Remove\"=%true}]};"
+ "{\"Ref\"=[{\"Reference\"=[\"yt\";\"plato\";\"tmp/bb686f68-2245bd5f-2318fa4e-1\"];\"Columns\"=[\"foo\"];\"Remove\"=%true}]}"
"]}",
res[0]
);
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.cpp
index d9ca1f18a85..ced9714acdf 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.cpp
@@ -630,9 +630,9 @@ TExprNode::TPtr THorizontalJoinOptimizer::HandleList(const TExprNode::TPtr& node
}
const size_t mapInputCount = map.Input().Item(0).Paths().Size();
- if (InputCount + mapInputCount > MaxTables) {
+ if (const auto nextCount = InputCount + mapInputCount; nextCount > MaxTables) {
if (MakeJoinedMap(node->Pos(), ctx)) {
- YQL_CLOG(INFO, ProviderYt) << "HorizontalJoin: split by max input tables: " << (InputCount + mapInputCount);
+ YQL_CLOG(INFO, ProviderYt) << "HorizontalJoin: split by max input tables: " << nextCount;
}
// Reinit outNdx because MakeJoinedMap() clears JoinedMaps
outNdx = JoinedMaps.size();
@@ -1224,8 +1224,7 @@ TExprNode::TPtr THorizontalJoinOptimizer::RebuildList(const TExprNode::TPtr& nod
IGraphTransformer::TStatus TMultiHorizontalJoinOptimizer::Optimize(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) {
THashMap<TGroupKey, TVector<TYtMap>> mapGroups;
- for (auto& x: OpDeps) {
- auto writer = x.first;
+ for (auto writer: OpDepsOrder) {
if (auto maybeMap = TMaybeNode<TYtMap>(writer)) {
auto map = maybeMap.Cast();
if (IsGoodForHorizontalJoin(map)) {
@@ -1261,7 +1260,7 @@ IGraphTransformer::TStatus TMultiHorizontalJoinOptimizer::Optimize(TExprNode::TP
if (good) {
std::set<ui64> readerIds;
- for (auto& reader: x.second) {
+ for (auto& reader: OpDeps.at(writer)) {
readerIds.insert(std::get<0>(reader)->UniqueId());
}
ui32 flags = 0;
@@ -1333,15 +1332,15 @@ IGraphTransformer::TStatus TMultiHorizontalJoinOptimizer::Optimize(TExprNode::TP
bool TMultiHorizontalJoinOptimizer::HandleGroup(const TVector<TYtMap>& maps, TExprContext& ctx) {
for (TYtMap map: maps) {
const size_t mapInputCount = map.Input().Item(0).Paths().Size();
- if (InputCount + mapInputCount > MaxTables) {
+ if (const auto nextCount = InputCount + mapInputCount; nextCount > MaxTables) {
if (MakeJoinedMap(ctx)) {
- YQL_CLOG(INFO, ProviderYt) << "MultiHorizontalJoin: split by max input tables: " << (InputCount + mapInputCount);
+ YQL_CLOG(INFO, ProviderYt) << "MultiHorizontalJoin: split by max input tables: " << nextCount;
}
}
- if (JoinedMaps.size() + 1 > MaxOutTables) {
+ if (const auto nextMapCount = JoinedMaps.size() + 1; nextMapCount > MaxOutTables) {
if (MakeJoinedMap(ctx)) {
- YQL_CLOG(INFO, ProviderYt) << "MultiHorizontalJoin: split by max output tables: " << (JoinedMaps.size() + 1);
+ YQL_CLOG(INFO, ProviderYt) << "MultiHorizontalJoin: split by max output tables: " << nextMapCount;
}
}
@@ -1727,9 +1726,9 @@ bool TOutHorizontalJoinOptimizer::HandleGroup(TPositionHandle pos, const TGroupK
auto itemType = std::get<2>(key)->GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
for (TYtMap map: maps) {
- if (JoinedMaps.size() + 1 > MaxOutTables) {
+ if (const auto nextMapCount = JoinedMaps.size() + 1; nextMapCount > MaxOutTables) {
if (MakeJoinedMap(pos, key, itemType, ctx)) {
- YQL_CLOG(INFO, ProviderYt) << "OutHorizontalJoin: split by max output tables: " << (JoinedMaps.size() + 1);
+ YQL_CLOG(INFO, ProviderYt) << "OutHorizontalJoin: split by max output tables: " << nextMapCount;
}
}
@@ -1804,8 +1803,7 @@ IGraphTransformer::TStatus TOutHorizontalJoinOptimizer::Optimize(TExprNode::TPtr
THashMap<TGroupKey, TVector<TYtMap>> opGroups;
THashMap<TGroupKey, TVector<TYtMap>> tableGroups;
- for (auto& x: OpDeps) {
- auto writer = x.first;
+ for (auto writer: OpDepsOrder) {
if (IsGoodForOutHorizontalJoin(writer)) {
auto map = TYtMap(writer);
auto section = map.Input().Item(0);
@@ -1837,7 +1835,7 @@ IGraphTransformer::TStatus TOutHorizontalJoinOptimizer::Optimize(TExprNode::TPtr
}
opGroups.clear();
- for (auto& reader: x.second) {
+ for (auto& reader: OpDeps.at(writer)) {
if (IsGoodForOutHorizontalJoin(std::get<0>(reader))) {
auto map = TYtMap(std::get<0>(reader));
auto section = map.Input().Item(0);
@@ -1870,9 +1868,6 @@ IGraphTransformer::TStatus TOutHorizontalJoinOptimizer::Optimize(TExprNode::TPtr
continue;
}
- Sort(group.second.begin(), group.second.end(),
- [](const TYtMap& m1, const TYtMap& m2) { return m1.Ref().UniqueId() < m2.Ref().UniqueId(); });
-
if (!HandleGroup(writer->Pos(), group.first, group.second, ctx)) {
return IGraphTransformer::TStatus::Error;
}
@@ -1892,9 +1887,6 @@ IGraphTransformer::TStatus TOutHorizontalJoinOptimizer::Optimize(TExprNode::TPtr
continue;
}
- Sort(group.second.begin(), group.second.end(),
- [](const TYtMap& m1, const TYtMap& m2) { return m1.Ref().UniqueId() < m2.Ref().UniqueId(); });
-
if (!HandleGroup(std::get<2>(group.first)->Pos(), group.first, group.second, ctx)) {
return IGraphTransformer::TStatus::Error;
}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.h b/ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.h
index aad783760bf..e15d4701514 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.h
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_horizontal_join.h
@@ -27,8 +27,9 @@ namespace NYql {
class THorizontalJoinBase {
public:
- THorizontalJoinBase(const TYtState::TPtr& state, const TOpDeps& opDeps, const TNodeSet& hasWorldDeps)
+ THorizontalJoinBase(const TYtState::TPtr& state, const std::vector<const TExprNode*>& opDepsOrder, const TOpDeps& opDeps, const TNodeSet& hasWorldDeps)
: State_(state)
+ , OpDepsOrder(opDepsOrder)
, OpDeps(opDeps)
, HasWorldDeps(hasWorldDeps)
, MaxTables(State_->Configuration->MaxInputTables.Get().GetOrElse(DEFAULT_MAX_INPUT_TABLES))
@@ -58,6 +59,7 @@ protected:
protected:
TYtState::TPtr State_;
+ const std::vector<const TExprNode*>& OpDepsOrder;
const TOpDeps& OpDeps;
const TNodeSet& HasWorldDeps;
@@ -82,8 +84,8 @@ class THorizontalJoinOptimizer: public THorizontalJoinBase {
using TGroupKey = std::tuple<TString, TMaybe<TSampleParams>, const TExprNode*, ui32>;
public:
- THorizontalJoinOptimizer(const TYtState::TPtr& state, const TOpDeps& opDeps, const TNodeSet& hasWorldDeps, TProcessedNodesSet* processedNodes)
- : THorizontalJoinBase(state, opDeps, hasWorldDeps)
+ THorizontalJoinOptimizer(const TYtState::TPtr& state, const std::vector<const TExprNode*>& opDepsOrder, const TOpDeps& opDeps, const TNodeSet& hasWorldDeps, TProcessedNodesSet* processedNodes)
+ : THorizontalJoinBase(state, opDepsOrder, opDeps, hasWorldDeps)
, ProcessedNodes(processedNodes)
{
}
@@ -123,8 +125,8 @@ class TMultiHorizontalJoinOptimizer: public THorizontalJoinBase {
using TGroupKey = std::tuple<TString, std::set<ui64>, TMaybe<TSampleParams>, const TExprNode*, ui32>;
public:
- TMultiHorizontalJoinOptimizer(const TYtState::TPtr& state, const TOpDeps& opDeps, const TNodeSet& hasWorldDeps)
- : THorizontalJoinBase(state, opDeps, hasWorldDeps)
+ TMultiHorizontalJoinOptimizer(const TYtState::TPtr& state, const std::vector<const TExprNode*>& opDepsOrder, const TOpDeps& opDeps, const TNodeSet& hasWorldDeps)
+ : THorizontalJoinBase(state, opDepsOrder, opDeps, hasWorldDeps)
{
}
@@ -149,8 +151,8 @@ class TOutHorizontalJoinOptimizer: public THorizontalJoinBase {
using TGroupKey = std::tuple<TString, const TExprNode*, const TExprNode*, const TExprNode*, const TExprNode*, ui32>;
public:
- TOutHorizontalJoinOptimizer(const TYtState::TPtr& state, const TOpDeps& opDeps, const TNodeSet& hasWorldDeps)
- : THorizontalJoinBase(state, opDeps, hasWorldDeps)
+ TOutHorizontalJoinOptimizer(const TYtState::TPtr& state, const std::vector<const TExprNode*>& opDepsOrder, const TOpDeps& opDeps, const TNodeSet& hasWorldDeps)
+ : THorizontalJoinBase(state, opDepsOrder, opDeps, hasWorldDeps)
{
}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_physical_finalizing.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_physical_finalizing.cpp
index 82651debc81..4fc5753e84e 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_physical_finalizing.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_physical_finalizing.cpp
@@ -54,6 +54,7 @@ public:
output = input;
TOpDeps opDeps;
+ std::vector<const TExprNode*> opDepsOrder;
TNodeSet hasWorldDeps; // Operations, which have world dependencies on them from another nodes
TNodeSet toCombine;
TNodeSet neverCombine;
@@ -62,6 +63,15 @@ public:
bool enableChunkCombining = IsChunkCombiningEnabled();
+ auto storeDep = [&opDeps, &opDepsOrder](const TYtOutput& out, const TExprNode* reader, const TExprNode* sec, const TExprNode* path) {
+ const auto realOp = GetRealOperation(out.Operation()).Raw();
+ auto& res = opDeps[realOp];
+ if (res.empty()) {
+ opDepsOrder.push_back(realOp);
+ }
+ res.emplace_back(reader, sec, out.Raw(), path);
+ };
+
VisitExpr(input, [&](const TExprNode::TPtr& node)->bool {
if (auto maybeOp = TMaybeNode<TYtTransientOpBase>(node)) {
auto op = maybeOp.Cast();
@@ -69,7 +79,7 @@ public:
for (auto path: section.Paths()) {
if (auto maybeOutput = path.Table().Maybe<TYtOutput>()) {
auto out = maybeOutput.Cast();
- opDeps[GetRealOperation(out.Operation()).Raw()].emplace_back(op.Raw(), section.Raw(), out.Raw(), path.Raw());
+ storeDep(out, op.Raw(), section.Raw(), path.Raw());
if (enableChunkCombining) {
CollectForCombine(out, toCombine, neverCombine);
}
@@ -83,7 +93,7 @@ public:
for (auto path: section.Paths()) {
if (auto maybeOutput = path.Table().Maybe<TYtOutput>()) {
auto out = maybeOutput.Cast();
- opDeps[GetRealOperation(out.Operation()).Raw()].emplace_back(read.Raw(), section.Raw(), out.Raw(), path.Raw());
+ storeDep(out, read.Raw(), section.Raw(), path.Raw());
if (enableChunkCombining) {
CollectForCombine(out, toCombine, neverCombine);
}
@@ -94,21 +104,21 @@ public:
else if (auto maybePublish = TMaybeNode<TYtPublish>(node)) {
auto publish = maybePublish.Cast();
for (auto out: publish.Input()) {
- opDeps[GetRealOperation(out.Operation()).Raw()].emplace_back(publish.Raw(), nullptr, out.Raw(), nullptr);
+ storeDep(out, publish.Raw(), nullptr, nullptr);
}
}
else if (auto maybeLength = TMaybeNode<TYtLength>(node)) {
auto length = maybeLength.Cast();
if (auto maybeOutput = length.Input().Maybe<TYtOutput>()) {
auto out = maybeOutput.Cast();
- opDeps[GetRealOperation(out.Operation()).Raw()].emplace_back(length.Raw(), nullptr, out.Raw(), nullptr);
+ storeDep(out, length.Raw(), nullptr, nullptr);
}
}
else if (auto maybeTableContent = TMaybeNode<TYtTableContent>(node)) {
auto tableContent = maybeTableContent.Cast();
if (auto maybeOutput = tableContent.Input().Maybe<TYtOutput>()) {
auto out = maybeOutput.Cast();
- opDeps[GetRealOperation(out.Operation()).Raw()].emplace_back(tableContent.Raw(), nullptr, out.Raw(), nullptr);
+ storeDep(out, tableContent.Raw(), nullptr, nullptr);
if (enableChunkCombining) {
CollectForCombine(out, toCombine, neverCombine);
}
@@ -118,7 +128,7 @@ public:
auto resWrite = maybeResWrite.Cast();
if (auto maybeOutput = resWrite.Data().Maybe<TYtOutput>()) {
auto out = maybeOutput.Cast();
- opDeps[GetRealOperation(out.Operation()).Raw()].emplace_back(resWrite.Raw(), nullptr, out.Raw(), nullptr);
+ storeDep(out, resWrite.Raw(), nullptr, nullptr);
if (enableChunkCombining) {
CollectForCombine(out, toCombine, neverCombine);
}
@@ -128,13 +138,13 @@ public:
auto sqlIn = maybeSqlIn.Cast();
if (auto maybeOutput = sqlIn.Collection().Maybe<TYtOutput>()) {
auto out = maybeOutput.Cast();
- opDeps[GetRealOperation(out.Operation()).Raw()].emplace_back(sqlIn.Raw(), nullptr, out.Raw(), nullptr);
+ storeDep(out, sqlIn.Raw(), nullptr, nullptr);
}
}
else if (auto maybeStatOut = TMaybeNode<TYtStatOut>(node)) {
auto statOut = maybeStatOut.Cast();
auto out = statOut.Input();
- opDeps[GetRealOperation(out.Operation()).Raw()].emplace_back(statOut.Raw(), nullptr, out.Raw(), nullptr);
+ storeDep(out, statOut.Raw(), nullptr, nullptr);
if (enableChunkCombining) {
CollectForCombine(out, toCombine, neverCombine);
}
@@ -155,6 +165,7 @@ public:
return true;
});
+ YQL_ENSURE(opDeps.size() == opDepsOrder.size());
const auto disableOptimizers = State_->Configuration->DisableOptimizers.Get().GetOrElse(TSet<TString>());
@@ -243,7 +254,7 @@ public:
}
if (!disableOptimizers.contains("UnorderedOuts") && ctx.IsConstraintEnabled<TSortedConstraintNode>()) {
- status = OptimizeUnorderedOuts(input, output, opDeps, lefts, ctx);
+ status = OptimizeUnorderedOuts(input, output, opDepsOrder, opDeps, lefts, ctx);
if (status.Level != TStatus::Ok) {
return status;
}
@@ -397,21 +408,21 @@ public:
}
if (!disableOptimizers.contains("HorizontalJoin")) {
- status = THorizontalJoinOptimizer(State_, opDeps, hasWorldDeps, &ProcessedHorizontalJoin).Optimize(output, output, ctx);
+ status = THorizontalJoinOptimizer(State_, opDepsOrder, opDeps, hasWorldDeps, &ProcessedHorizontalJoin).Optimize(output, output, ctx);
if (status.Level != TStatus::Ok) {
return status;
}
}
if (!disableOptimizers.contains("MultiHorizontalJoin")) {
- status = TMultiHorizontalJoinOptimizer(State_, opDeps, hasWorldDeps).Optimize(output, output, ctx);
+ status = TMultiHorizontalJoinOptimizer(State_, opDepsOrder, opDeps, hasWorldDeps).Optimize(output, output, ctx);
if (status.Level != TStatus::Ok) {
return status;
}
}
if (!disableOptimizers.contains("OutHorizontalJoin")) {
- status = TOutHorizontalJoinOptimizer(State_, opDeps, hasWorldDeps).Optimize(output, output, ctx);
+ status = TOutHorizontalJoinOptimizer(State_, opDepsOrder, opDeps, hasWorldDeps).Optimize(output, output, ctx);
if (status.Level != TStatus::Ok) {
return status;
}
@@ -855,12 +866,11 @@ private:
return TStatus::Ok;
}
- TStatus OptimizeUnorderedOuts(TExprNode::TPtr input, TExprNode::TPtr& output, const TOpDeps& opDeps, const TNodeSet& lefts, TExprContext& ctx) {
- TVector<const TOpDeps::value_type*> matchedOps;
- for (auto& x: opDeps) {
- auto writer = x.first;
+ TStatus OptimizeUnorderedOuts(TExprNode::TPtr input, TExprNode::TPtr& output, const std::vector<const TExprNode*>& opDepsOrder, const TOpDeps& opDeps, const TNodeSet& lefts, TExprContext& ctx) {
+ std::vector<const TExprNode*> matchedOps;
+ for (auto writer: opDepsOrder) {
if (!TYtEquiJoin::Match(writer) && !writer->StartsExecution() && (!writer->HasResult() || writer->GetResult().Type() != TExprNode::World)) {
- matchedOps.push_back(&x);
+ matchedOps.push_back(writer);
}
}
@@ -868,15 +878,13 @@ private:
return TStatus::Ok;
}
- Sort(matchedOps, [](const TOpDeps::value_type* m1, const TOpDeps::value_type* m2) { return m1->first->UniqueId() < m2->first->UniqueId(); });
-
TNodeOnNodeOwnedMap replaces;
TNodeOnNodeOwnedMap newOps;
- for (auto x: matchedOps) {
- auto writer = x->first;
+ for (auto writer: matchedOps) {
TDynBitMap orderedOuts;
TDynBitMap unorderedOuts;
- for (auto& item: x->second) {
+ const auto& readers = opDeps.at(writer);
+ for (auto& item: readers) {
auto out = TYtOutput(std::get<2>(item));
if (IsUnorderedOutput(out)) {
unorderedOuts.Set(FromString<size_t>(out.OutIndex().Value()));
@@ -890,7 +898,7 @@ private:
if (newOp) {
newOps[writer] = newOp;
}
- for (auto& item: x->second) {
+ for (auto& item: readers) {
auto out = std::get<2>(item);
replaces[out] = Build<TYtOutput>(ctx, out->Pos())
.Operation(newOp ? newOp : out->ChildPtr(TYtOutput::idx_Operation))