about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author vvvv <vvvv@ydb.tech> 2023-08-02 22:29:30 +0300
committer vvvv <vvvv@ydb.tech> 2023-08-02 22:29:30 +0300
commit 057da129d69d924d8188a8d7d9a155baebe43456 (patch)
tree 12daea4ff358b0e95df281635d7cd12b1749411b
parent b1a8535e42e076174d1585dcd2d463748d75a436 (diff)
download ydb-057da129d69d924d8188a8d7d9a155baebe43456.tar.gz
Handle If over structs in lineage
-rw-r--r-- ydb/library/yql/core/services/yql_lineage.cpp 219
-rw-r--r-- ydb/library/yql/core/yql_opt_utils.cpp 20
-rw-r--r-- ydb/library/yql/core/yql_opt_utils.h 1
3 files changed, 159 insertions, 81 deletions
diff --git a/ydb/library/yql/core/services/yql_lineage.cpp b/ydb/library/yql/core/services/yql_lineage.cpp
index c84271ed69..f8d79d60dd 100644
--- a/ydb/library/yql/core/services/yql_lineage.cpp
+++ b/ydb/library/yql/core/services/yql_lineage.cpp
@@ -131,6 +131,15 @@ private:
struct TFieldsLineage {
TFieldLineageSet Items;
TMaybe<THashMap<TString, TFieldLineageSet>> StructItems;
+
+ void MergeFrom(const TFieldsLineage& from) {
+ Items.insert(from.Items.begin(), from.Items.end());
+ if (StructItems && from.StructItems) {
+ for (const auto& i : *from.StructItems) {
+ (*StructItems)[i.first].insert(i.second.begin(), i.second.end());
+ }
+ }
+ }
};
static TFieldLineageSet ReplaceTransforms(const TFieldLineageSet& src, const TString& newTransforms) {
@@ -215,62 +224,84 @@ private:
}
}
- void MergeLineageFromUsedFields(const TExprNode& expr, const TExprNode& arg, const TLineage& src,
- TFieldLineageSet& dst, const TString& newTransforms = "") {
+ TMaybe<TFieldsLineage> ScanExprLineage(const TExprNode& node, const TExprNode& arg, const TLineage& src, const TNodeMap<bool>& argDeps,
+ TNodeMap<TMaybe<TFieldsLineage>>& visited) {
+ if (&node == &arg) {
+ return Nothing();
+ }
- if (!IsDepended(expr, arg)) {
- return;
+ auto [it, inserted] = visited.emplace(&node, Nothing());
+ if (!inserted) {
+ return it->second;
}
- TParentsMap parentsMap;
- GatherParents(expr, parentsMap);
+ if (auto depIt = argDeps.find(&node); depIt != argDeps.end() && !depIt->second) {
+ return it->second = TFieldsLineage();
+ }
- bool needAllFields = false;
- auto parentsIt = parentsMap.find(&arg);
- if (parentsIt == parentsMap.end()) {
- needAllFields = true;
- } else {
- for (const auto& p : parentsIt->second) {
- if (p->IsCallable("Member")) {
- auto name = TString(p->Tail().Content());
- const auto& f = (*src.Fields).FindPtr(name);
- bool addAll = true;
- if (f->StructItems) {
- auto grandParentsIt = parentsMap.find(p);
- if (grandParentsIt != parentsMap.end()) {
- addAll = false;
- for (const auto& q : grandParentsIt->second) {
- if (q->IsCallable("Member")) {
- const auto& s = f->StructItems->FindPtr(q->Tail().Content());
- for (const auto& i : *s) {
- dst.insert(ReplaceTransforms(i, newTransforms));
- }
- } else {
- addAll = true;
- break;
- }
- }
- }
- }
+ if (node.IsCallable("Member")) {
+ if (&node.Head() == &arg) {
+ return it->second = *(*src.Fields).FindPtr(node.Tail().Content());
+ }
- if (addAll) {
- for (const auto& i: (*src.Fields).FindPtr(name)->Items) {
- dst.insert(ReplaceTransforms(i, newTransforms));
- }
- }
- } else {
- needAllFields = true;
- break;
- }
+ auto inner = ScanExprLineage(node.Head(), arg, src, argDeps, visited);
+ if (!inner || !inner->StructItems) {
+ return Nothing();
+ }
+
+ TFieldsLineage result;
+ result.Items = *(*inner->StructItems).FindPtr(node.Tail().Content());
+ return it->second = result;
+ }
+
+ std::vector<TFieldsLineage> results;
+ bool hasStructItems = true;
+ for (ui32 index = 0; index < node.ChildrenSize(); ++index) {
+ if (index == 0 && node.IsCallable("If")) {
+ continue;
+ }
+
+ auto inner = ScanExprLineage(*node.Child(index), arg, src, argDeps, visited);
+ if (!inner) {
+ return Nothing();
}
+
+ hasStructItems = hasStructItems && inner->StructItems.Defined();
+ results.emplace_back(std::move(*inner));
+ }
+
+ TFieldsLineage result;
+ if (hasStructItems) {
+ result.StructItems.ConstructInPlace();
}
- if (needAllFields) {
+ for (const auto& r : results) {
+ result.MergeFrom(r);
+ }
+
+ return it->second = result;
+ }
+
+ void MergeLineageFromUsedFields(const TExprNode& expr, const TExprNode& arg, const TLineage& src,
+ TFieldLineageSet& dst, const TString& newTransforms = "") {
+
+ TNodeMap<bool> argDeps;
+ if (!MarkDepended(expr, arg, argDeps)) {
+ return;
+ }
+
+ TNodeMap<TMaybe<TFieldsLineage>> visited;
+ auto res = ScanExprLineage(expr, arg, src, argDeps, visited);
+ if (!res) {
for (const auto& f : *src.Fields) {
for (const auto& i: f.second.Items) {
dst.insert(ReplaceTransforms(i, newTransforms));
}
}
+ } else {
+ for (const auto& i: res->Items) {
+ dst.insert(ReplaceTransforms(i, newTransforms));
+ }
}
}
@@ -304,6 +335,75 @@ private:
MergeLineageFromUsedFields(expr, arg, src, dst.Items, newTransforms);
}
+ void FillStructLineage(TLineage& lineage, const TExprNode* value, const TExprNode& arg, const TLineage& innerLineage) {
+ if (value->IsCallable("Member") && &value->Head() == &arg) {
+ TString field(value->Tail().Content());
+ auto f = innerLineage.Fields->FindPtr(field);
+ if (f->StructItems) {
+ for (const auto& x : *f->StructItems) {
+ auto& res = (*lineage.Fields)[x.first];
+ res.Items = x.second;
+ }
+
+ return;
+ }
+
+ // fallback
+ }
+
+ if (value->IsCallable("If")) {
+ TLineage left, right;
+ left.Fields.ConstructInPlace();
+ right.Fields.ConstructInPlace();
+ FillStructLineage(left, value->Child(1), arg, innerLineage);
+ FillStructLineage(right, value->Child(2), arg, innerLineage);
+ for (const auto& f : *left.Fields) {
+ auto& res = (*lineage.Fields)[f.first];
+ res.Items.insert(f.second.Items.begin(), f.second.Items.end());
+ }
+
+ for (const auto& f : *right.Fields) {
+ auto& res = (*lineage.Fields)[f.first];
+ res.Items.insert(f.second.Items.begin(), f.second.Items.end());
+ }
+
+ return;
+ }
+
+ if (value->IsCallable("AsStruct")) {
+ for (const auto& child : value->Children()) {
+ TString field(child->Head().Content());
+ auto& res = (*lineage.Fields)[field];
+ const auto& expr = child->Tail();
+ TString newTransforms;
+ auto root = &expr;
+ while (root->IsCallable("Just")) {
+ root = &root->Head();
+ }
+
+ if (root->IsCallable("Member") && &root->Head() == &arg) {
+ newTransforms = "Copy";
+ }
+
+ MergeLineageFromUsedFields(expr, arg, innerLineage, res, true, newTransforms);
+ }
+
+ return;
+ }
+
+ auto structType = value->GetTypeAnn()->Cast<TStructExprType>();
+ TFieldLineageSet allLineage;
+ for (const auto& f : *innerLineage.Fields) {
+ allLineage.insert(f.second.Items.begin(), f.second.Items.end());
+ }
+
+ for (const auto& i : structType->GetItems()) {
+ TString field(i->GetName());
+ auto& res = (*lineage.Fields)[field];
+ res.Items = allLineage;
+ }
+ }
+
void HandleFlatMap(TLineage& lineage, const TExprNode& node) {
auto innerLineage = *CollectLineage(node.Head());
if (!innerLineage.Fields.Defined()) {
@@ -327,41 +427,12 @@ private:
return;
}
- if (value->IsCallable("Member") && &value->Head() == &arg) {
- TString field(value->Tail().Content());
- auto f = innerLineage.Fields->FindPtr(field);
- if (f->StructItems) {
- lineage.Fields.ConstructInPlace();
- for (const auto& x : *f->StructItems) {
- auto& res = (*lineage.Fields)[x.first];
- res.Items = x.second;
- }
-
- return;
- }
- }
-
- if (!value->IsCallable("AsStruct")) {
+ if (value->GetTypeAnn()->GetKind() != ETypeAnnotationKind::Struct) {
return;
}
lineage.Fields.ConstructInPlace();
- for (const auto& child : value->Children()) {
- TString field(child->Head().Content());
- auto& res = (*lineage.Fields)[field];
- const auto& expr = child->Tail();
- TString newTransforms;
- auto root = &expr;
- while (root->IsCallable("Just")) {
- root = &root->Head();
- }
-
- if (root->IsCallable("Member") && &root->Head() == &arg) {
- newTransforms = "Copy";
- }
-
- MergeLineageFromUsedFields(expr, arg, innerLineage, res, true, newTransforms);
- }
+ FillStructLineage(lineage, value, arg, innerLineage);
}
void HandleAggregate(TLineage& lineage, const TExprNode& node) {
diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp
index 20df4f4058..2dfaf1d446 100644
--- a/ydb/library/yql/core/yql_opt_utils.cpp
+++ b/ydb/library/yql/core/yql_opt_utils.cpp
@@ -435,25 +435,31 @@ template TExprNode::TPtr FilterByFields(TPositionHandle position, const TExprNod
template TExprNode::TPtr FilterByFields(TPositionHandle position, const TExprNode::TPtr& input, const TSet<TString>& subsetFields, TExprContext& ctx, bool singleValue);
-bool IsDependedImpl(const TExprNode* from, const TExprNode* to, TNodeSet& visited) {
+bool IsDependedImpl(const TExprNode* from, const TExprNode* to, TNodeMap<bool>& deps) {
if (from == to)
return true;
- if (!visited.emplace(from).second) {
- return false;
+ auto [it, inserted] = deps.emplace(from, false);
+ if (!inserted) {
+ return it->second;
}
for (const auto& child : from->Children()) {
- if (IsDependedImpl(child.Get(), to, visited))
- return true;
+ if (IsDependedImpl(child.Get(), to, deps)) {
+ return it->second = true;
+ }
}
return false;
}
bool IsDepended(const TExprNode& from, const TExprNode& to) {
- TNodeSet visited;
- return IsDependedImpl(&from, &to, visited);
+ TNodeMap<bool> deps;
+ return IsDependedImpl(&from, &to, deps);
+}
+
+bool MarkDepended(const TExprNode& from, const TExprNode& to, TNodeMap<bool>& deps) {
+ return IsDependedImpl(&from, &to, deps);
}
bool IsEmpty(const TExprNode& node, const TTypeAnnotationContext& typeCtx) {
diff --git a/ydb/library/yql/core/yql_opt_utils.h b/ydb/library/yql/core/yql_opt_utils.h
index e6ca839e0d..836780fe81 100644
--- a/ydb/library/yql/core/yql_opt_utils.h
+++ b/ydb/library/yql/core/yql_opt_utils.h
@@ -40,6 +40,7 @@ TExprNode::TPtr FilterByFields(TPositionHandle position, const TExprNode::TPtr&
TExprNode::TPtr AddMembersUsedInside(const TExprNode::TPtr& start, const TExprNode& arg, TExprNode::TPtr&& members, const TParentsMap& parentsMap, TExprContext& ctx);
bool IsDepended(const TExprNode& from, const TExprNode& to);
+bool MarkDepended(const TExprNode& from, const TExprNode& to, TNodeMap<bool>& deps);
bool IsEmpty(const TExprNode& node, const TTypeAnnotationContext& typeCtx);
bool IsEmptyContainer(const TExprNode& node);