aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2022-10-04 12:30:30 +0300
committerspuchin <spuchin@ydb.tech>2022-10-04 12:30:30 +0300
commit11734939277c0f1cca0aa21c2e5d28dd7a4077c6 (patch)
treef3cedd80bf19dba9f569b1f75fae8ca8e28046ef
parent35bbcdd0f9d21e73beee28cb7fdd49d1109c2734 (diff)
downloadydb-11734939277c0f1cca0aa21c2e5d28dd7a4077c6.tar.gz
Allow multi-consumed stage as stage input, handle lambda args changes in replace target expressions. ()
-rw-r--r--ydb/core/kqp/ut/kqp_ne_ut.cpp29
-rw-r--r--ydb/library/yql/ast/yql_expr.cpp39
-rw-r--r--ydb/library/yql/dq/opt/dq_opt_phy.cpp2
3 files changed, 58 insertions, 12 deletions
diff --git a/ydb/core/kqp/ut/kqp_ne_ut.cpp b/ydb/core/kqp/ut/kqp_ne_ut.cpp
index 498cefb4b88..f05e8ace315 100644
--- a/ydb/core/kqp/ut/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_ne_ut.cpp
@@ -3571,6 +3571,35 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
UNIT_ASSERT(streamLookup.IsDefined());
}
}
+
+ Y_UNIT_TEST(FlatmapLambdaMutiusedConnections) {
+ auto settings = TKikimrSettings()
+ .SetEnablePredicateExtractForDataQueries(false);
+ TKikimrRunner kikimr{settings};
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+ PRAGMA kikimr.UseNewEngine = "true";
+
+ $values = SELECT Value2 AS Value FROM TwoShard;
+
+ $values_filtered = SELECT * FROM $values WHERE Value < 5;
+
+ SELECT Key FROM `/Root/EightShard`
+ WHERE Data IN $values_filtered OR Data = 0
+ ORDER BY Key;
+
+ SELECT * FROM $values
+ ORDER BY Value;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([[[101u]];[[202u]];[[303u]];[[401u]];[[502u]];[[603u]];[[701u]];[[802u]]])",
+ FormatResultSetYson(result.GetResultSet(0)));
+ CompareYson(R"([[[-1]];[[-1]];[[0]];[[0]];[[1]];[[1]]])",
+ FormatResultSetYson(result.GetResultSet(1)));
+ }
}
} // namespace NKikimr::NKqp
diff --git a/ydb/library/yql/ast/yql_expr.cpp b/ydb/library/yql/ast/yql_expr.cpp
index d3c077308f5..793af107734 100644
--- a/ydb/library/yql/ast/yql_expr.cpp
+++ b/ydb/library/yql/ast/yql_expr.cpp
@@ -2249,17 +2249,32 @@ EChangeState GetChanges(TExprNode* start, const TNodeOnNodeOwnedMap& replaces, c
}
template<bool KeepTypeAnns>
-TExprNode::TPtr DoReplace(const TExprNode::TPtr& start, const TNodeOnNodeOwnedMap& replaces, const TNodeMap<TNodeOnNodeOwnedMap>& localReplaces,
- TNodeMap<EChangeState>& changes, TNodeOnNodeOwnedMap& processed, TExprContext& ctx) {
-
+TExprNode::TPtr DoReplace(const TExprNode::TPtr& start, const TNodeOnNodeOwnedMap& replaces,
+ const TNodeOnNodeOwnedMap& argReplaces, const TNodeMap<TNodeOnNodeOwnedMap>& localReplaces,
+ TNodeMap<EChangeState>& changes, TNodeOnNodeOwnedMap& processed, TExprContext& ctx)
+{
auto& target = processed[start.Get()];
if (target) {
return target;
}
+ TMaybe<TExprNode::TPtr> replace;
const auto it = replaces.find(start.Get());
if (it != replaces.end()) {
- return target = it->second ? it->second : start;
+ replace = it->second;
+ }
+ const auto argIt = argReplaces.find(start.Get());
+ if (argIt != argReplaces.end()) {
+ YQL_ENSURE(!replace.Defined());
+ replace = argIt->second;
+ }
+
+ if (replace.Defined()) {
+ if (*replace) {
+ return target = ctx.ReplaceNodes(std::move(*replace), argReplaces);
+ }
+
+ return target = start;
}
if (start->ChildrenSize() != 0) {
@@ -2268,26 +2283,27 @@ TExprNode::TPtr DoReplace(const TExprNode::TPtr& start, const TNodeOnNodeOwnedMa
const bool isChanged = (changeIt->second & EChangeState::Changed) != 0;
if (isChanged) {
if (start->Type() == TExprNode::Lambda) {
- TNodeOnNodeOwnedMap newReplaces = replaces;
+ TNodeOnNodeOwnedMap newArgReplaces = argReplaces;
const auto locIt = localReplaces.find(start.Get());
YQL_ENSURE(locIt != localReplaces.end(), "Missing local changes");
for (auto& r: locIt->second) {
- newReplaces[r.first] = r.second;
+ newArgReplaces[r.first] = r.second;
}
const auto& args = start->Head();
TExprNode::TListType newArgsList;
newArgsList.reserve(args.ChildrenSize());
args.ForEachChild([&](const TExprNode& arg) {
- const auto argIt = newReplaces.find(&arg);
- YQL_ENSURE(argIt != newReplaces.end(), "Missing argument");
+ const auto argIt = newArgReplaces.find(&arg);
+ YQL_ENSURE(argIt != newArgReplaces.end(), "Missing argument");
processed.emplace(&arg, argIt->second);
newArgsList.emplace_back(argIt->second);
});
auto newBody = GetLambdaBody(*start);
std::for_each(newBody.begin(), newBody.end(), [&](TExprNode::TPtr& node) {
- node = DoReplace<KeepTypeAnns>(node, newReplaces, localReplaces, changes, processed, ctx);
+ node = DoReplace<KeepTypeAnns>(node, replaces, newArgReplaces, localReplaces,
+ changes, processed, ctx);
});
auto newArgs = ctx.NewArguments(start->Pos(), std::move(newArgsList));
if constexpr (KeepTypeAnns)
@@ -2301,7 +2317,8 @@ TExprNode::TPtr DoReplace(const TExprNode::TPtr& start, const TNodeOnNodeOwnedMa
TExprNode::TListType newChildren;
newChildren.reserve(start->ChildrenSize());
for (const auto& child : start->Children()) {
- auto newChild = DoReplace<KeepTypeAnns>(child, replaces, localReplaces, changes, processed, ctx);
+ auto newChild = DoReplace<KeepTypeAnns>(child, replaces, argReplaces, localReplaces,
+ changes, processed, ctx);
if (newChild != child)
replaced = true;
@@ -2386,7 +2403,7 @@ TExprNode::TPtr ReplaceNodesImpl(TExprNode::TPtr&& start, const TNodeOnNodeOwned
}
}
- auto ret = DoReplace<KeepTypeAnns>(start, replaces, localReplaces, changes, processed, ctx);
+ auto ret = DoReplace<KeepTypeAnns>(start, replaces, {}, localReplaces, changes, processed, ctx);
if (InternalDebug) {
Cerr << "After\n" << ret->Dump() << "\n";
EnsureNoBadReplaces(*ret, replaces);
diff --git a/ydb/library/yql/dq/opt/dq_opt_phy.cpp b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
index ee6a8108649..a6d1c0885da 100644
--- a/ydb/library/yql/dq/opt/dq_opt_phy.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt_phy.cpp
@@ -216,7 +216,7 @@ TMaybeNode<TDqStage> DqPushFlatMapInnerConnectionsToStageInput(TCoFlatMapBase& f
return {};
}
- if (!IsSingleConsumerConnection(TDqConnection(cn), parentsMap, false)) {
+ if (!IsSingleConsumerConnection(TDqConnection(cn), parentsMap, true)) {
return {};
}