diff options
author | spuchin <spuchin@ydb.tech> | 2022-10-04 12:30:30 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2022-10-04 12:30:30 +0300 |
commit | 11734939277c0f1cca0aa21c2e5d28dd7a4077c6 (patch) | |
tree | f3cedd80bf19dba9f569b1f75fae8ca8e28046ef | |
parent | 35bbcdd0f9d21e73beee28cb7fdd49d1109c2734 (diff) | |
download | ydb-11734939277c0f1cca0aa21c2e5d28dd7a4077c6.tar.gz |
Allow multi-consumed stage as stage input, handle lambda args changes in replace target expressions. ()
-rw-r--r-- | ydb/core/kqp/ut/kqp_ne_ut.cpp | 29 | ||||
-rw-r--r-- | ydb/library/yql/ast/yql_expr.cpp | 39 | ||||
-rw-r--r-- | ydb/library/yql/dq/opt/dq_opt_phy.cpp | 2 |
3 files changed, 58 insertions, 12 deletions
diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp index 498cefb4b88..f05e8ace315 100644 --- a/ydb/core/kqp/ut/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp @@ -3571,6 +3571,35 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { UNIT_ASSERT(streamLookup.IsDefined()); } } + + Y_UNIT_TEST(FlatmapLambdaMutiusedConnections) { + auto settings = TKikimrSettings() + .SetEnablePredicateExtractForDataQueries(false); + TKikimrRunner kikimr{settings}; + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + PRAGMA kikimr.UseNewEngine = "true"; + + $values = SELECT Value2 AS Value FROM TwoShard; + + $values_filtered = SELECT * FROM $values WHERE Value < 5; + + SELECT Key FROM `/Root/EightShard` + WHERE Data IN $values_filtered OR Data = 0 + ORDER BY Key; + + SELECT * FROM $values + ORDER BY Value; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[[101u]];[[202u]];[[303u]];[[401u]];[[502u]];[[603u]];[[701u]];[[802u]]])", + FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[[-1]];[[-1]];[[0]];[[0]];[[1]];[[1]]])", + FormatResultSetYson(result.GetResultSet(1))); + } } } // namespace NKikimr::NKqp diff --git a/ydb/library/yql/ast/yql_expr.cpp b/ydb/library/yql/ast/yql_expr.cpp index d3c077308f5..793af107734 100644 --- a/ydb/library/yql/ast/yql_expr.cpp +++ b/ydb/library/yql/ast/yql_expr.cpp @@ -2249,17 +2249,32 @@ EChangeState GetChanges(TExprNode* start, const TNodeOnNodeOwnedMap& replaces, c } template<bool KeepTypeAnns> -TExprNode::TPtr DoReplace(const TExprNode::TPtr& start, const TNodeOnNodeOwnedMap& replaces, const TNodeMap<TNodeOnNodeOwnedMap>& localReplaces, - TNodeMap<EChangeState>& changes, TNodeOnNodeOwnedMap& processed, TExprContext& ctx) { - +TExprNode::TPtr DoReplace(const TExprNode::TPtr& start, const TNodeOnNodeOwnedMap& replaces, + const TNodeOnNodeOwnedMap& argReplaces, const TNodeMap<TNodeOnNodeOwnedMap>& localReplaces, + TNodeMap<EChangeState>& changes, TNodeOnNodeOwnedMap& processed, TExprContext& ctx) +{ auto& target = processed[start.Get()]; if (target) { return target; } + TMaybe<TExprNode::TPtr> replace; const auto it = replaces.find(start.Get()); if (it != replaces.end()) { - return target = it->second ? it->second : start; + replace = it->second; + } + const auto argIt = argReplaces.find(start.Get()); + if (argIt != argReplaces.end()) { + YQL_ENSURE(!replace.Defined()); + replace = argIt->second; + } + + if (replace.Defined()) { + if (*replace) { + return target = ctx.ReplaceNodes(std::move(*replace), argReplaces); + } + + return target = start; } if (start->ChildrenSize() != 0) { @@ -2268,26 +2283,27 @@ TExprNode::TPtr DoReplace(const TExprNode::TPtr& start, const TNodeOnNodeOwnedMa const bool isChanged = (changeIt->second & EChangeState::Changed) != 0; if (isChanged) { if (start->Type() == TExprNode::Lambda) { - TNodeOnNodeOwnedMap newReplaces = replaces; + TNodeOnNodeOwnedMap newArgReplaces = argReplaces; const auto locIt = localReplaces.find(start.Get()); YQL_ENSURE(locIt != localReplaces.end(), "Missing local changes"); for (auto& r: locIt->second) { - newReplaces[r.first] = r.second; + newArgReplaces[r.first] = r.second; } const auto& args = start->Head(); TExprNode::TListType newArgsList; newArgsList.reserve(args.ChildrenSize()); args.ForEachChild([&](const TExprNode& arg) { - const auto argIt = newReplaces.find(&arg); - YQL_ENSURE(argIt != newReplaces.end(), "Missing argument"); + const auto argIt = newArgReplaces.find(&arg); + YQL_ENSURE(argIt != newArgReplaces.end(), "Missing argument"); processed.emplace(&arg, argIt->second); newArgsList.emplace_back(argIt->second); }); auto newBody = GetLambdaBody(*start); std::for_each(newBody.begin(), newBody.end(), [&](TExprNode::TPtr& node) { - node = DoReplace<KeepTypeAnns>(node, newReplaces, localReplaces, changes, processed, ctx); + node = DoReplace<KeepTypeAnns>(node, replaces, newArgReplaces, localReplaces, + changes, processed, ctx); }); auto newArgs = ctx.NewArguments(start->Pos(), std::move(newArgsList)); if constexpr (KeepTypeAnns) @@ -2301,7 +2317,8 @@ TExprNode::TPtr DoReplace(const TExprNode::TPtr& start, const TNodeOnNodeOwnedMa TExprNode::TListType newChildren; newChildren.reserve(start->ChildrenSize()); for (const auto& child : start->Children()) { - auto newChild = DoReplace<KeepTypeAnns>(child, replaces, localReplaces, changes, processed, ctx); + auto newChild = DoReplace<KeepTypeAnns>(child, replaces, argReplaces, localReplaces, + changes, processed, ctx); if (newChild != child) replaced = true; @@ -2386,7 +2403,7 @@ TExprNode::TPtr ReplaceNodesImpl(TExprNode::TPtr&& start, const TNodeOnNodeOwned } } - auto ret = DoReplace<KeepTypeAnns>(start, replaces, localReplaces, changes, processed, ctx); + auto ret = DoReplace<KeepTypeAnns>(start, replaces, {}, localReplaces, changes, processed, ctx); if (InternalDebug) { Cerr << "After\n" << ret->Dump() << "\n"; EnsureNoBadReplaces(*ret, replaces); diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp index ee6a8108649..a6d1c0885da 100644 --- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp @@ -216,7 +216,7 @@ TMaybeNode<TDqStage> DqPushFlatMapInnerConnectionsToStageInput(TCoFlatMapBase& f return {}; } - if (!IsSingleConsumerConnection(TDqConnection(cn), parentsMap, false)) { + if (!IsSingleConsumerConnection(TDqConnection(cn), parentsMap, true)) { return {}; } |