diff options
author | vvvv <vvvv@ydb.tech> | 2023-06-20 14:14:56 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-06-20 14:14:56 +0300 |
commit | 5e86de883f882a9b9af25a9f211ae75e55db0702 (patch) | |
tree | 1e8d52ba8f0aec805e682b27b2ff95bb7e4e42b2 | |
parent | c110663d4cf5a26ba89c524eab4a44503f177819 (diff) | |
download | ydb-5e86de883f882a9b9af25a9f211ae75e55db0702.tar.gz |
Detect copy transforms during lineage
logos/libs/autodoc/mt сломаны намеренно, после обсуждения с заказчиком
-rw-r--r-- | ydb/library/yql/core/services/yql_lineage.cpp | 72 |
1 files changed, 61 insertions, 11 deletions
diff --git a/ydb/library/yql/core/services/yql_lineage.cpp b/ydb/library/yql/core/services/yql_lineage.cpp index 4088f7c628..af173e74cc 100644 --- a/ydb/library/yql/core/services/yql_lineage.cpp +++ b/ydb/library/yql/core/services/yql_lineage.cpp @@ -101,12 +101,45 @@ private: writer.OnEndMap(); } - using TFieldLineage = std::pair<ui32, TString>; + struct TFieldLineage { + ui32 InputIndex; + TString Field; + TString Transforms; + + struct THash { + std::size_t operator()(const TFieldLineage& x) const noexcept { + return CombineHashes( + CombineHashes(std::hash<ui32>()(x.InputIndex), std::hash<TString>()(x.Field)), + std::hash<TString>()(x.Transforms)); + } + }; + + bool operator==(const TFieldLineage& rhs) const { + return std::tie(InputIndex, Field, Transforms) == std::tie(rhs.InputIndex, rhs.Field, rhs.Transforms); + } + + bool operator<(const TFieldLineage& rhs) const { + return std::tie(InputIndex, Field, Transforms) < std::tie(rhs.InputIndex, rhs.Field, rhs.Transforms); + } + }; + + static TFieldLineage ReplaceTransforms(const TFieldLineage& src, const TString& newTransforms) { + return { src.InputIndex, src.Field, (src.Transforms == "Copy" && newTransforms == "Copy") ? newTransforms : "" }; + } struct TFieldsLineage { - THashSet<TFieldLineage> Items; + THashSet<TFieldLineage, TFieldLineage::THash> Items; }; + static TFieldsLineage ReplaceTransforms(const TFieldsLineage& src, const TString& newTransforms) { + TFieldsLineage ret; + for (const auto& i : src.Items) { + ret.Items.insert(ReplaceTransforms(i, newTransforms)); + } + + return ret; + } + struct TLineage { // null - can't calculcate TMaybe<THashMap<TString, TFieldsLineage>> Fields; @@ -129,7 +162,7 @@ private: TString fieldName(i->GetName()); auto& v = (*lineage.Fields)[fieldName]; for (const auto& r : readIt->second) { - v.Items.insert({ r, fieldName }); + v.Items.insert({ r, fieldName, "Copy" }); } } @@ -194,19 +227,19 @@ private: } void MergeLineageFromUsedFields(const TExprNode& expr, const TExprNode& arg, const TLineage& src, - TFieldsLineage& dst) { + TFieldsLineage& dst, const TString& newTransforms = "") { THashSet<TString> usedFields; auto needAllFields = !GetUsedFields(expr, arg, usedFields); if (needAllFields) { for (const auto& f : *src.Fields) { for (const auto& i: f.second.Items) { - dst.Items.insert(i); + dst.Items.insert(ReplaceTransforms(i, newTransforms)); } } } else { for (const auto& f : usedFields) { for (const auto& i: (*src.Fields).FindPtr(f)->Items) { - dst.Items.insert(i); + dst.Items.insert(ReplaceTransforms(i, newTransforms)); } } } @@ -244,7 +277,17 @@ private: TString field(child->Head().Content()); auto& res = (*lineage.Fields)[field]; const auto& expr = child->Tail(); - MergeLineageFromUsedFields(expr, arg, innerLineage, res); + TString newTransforms; + auto root = &expr; + while (root->IsCallable("Just")) { + root = &root->Head(); + } + + if (root->IsCallable("Member") && &root->Head() == &arg) { + newTransforms = "Copy"; + } + + MergeLineageFromUsedFields(expr, arg, innerLineage, res, newTransforms); } } @@ -273,7 +316,7 @@ private: TFieldsLineage source; if (payload->ChildrenSize() == 3) { // distinct - source = (*innerLineage.Fields)[payload->Child(2)->Content()]; + source = ReplaceTransforms((*innerLineage.Fields)[payload->Child(2)->Content()], ""); } else { if (payload->Child(1)->IsCallable("AggregationTraits")) { // merge all used fields from init/update handlers @@ -432,7 +475,7 @@ private: for (const auto& f : fields) { writer.OnKeyedItem(f); writer.OnBeginList(); - TVector<std::pair<ui32, TString>> items; + TVector<TFieldLineage> items; for (const auto& i : lineage.Fields->FindPtr(f)->Items) { items.push_back(i); } @@ -442,9 +485,16 @@ private: writer.OnListItem(); writer.OnBeginMap(); writer.OnKeyedItem("Input"); - writer.OnInt64Scalar(i.first); + writer.OnInt64Scalar(i.InputIndex); writer.OnKeyedItem("Field"); - writer.OnStringScalar(i.second); + writer.OnStringScalar(i.Field); + writer.OnKeyedItem("Transforms"); + const auto& transforms = i.Transforms; + if (transforms.empty()) { + writer.OnEntity(); + } else { + writer.OnStringScalar(transforms); + } writer.OnEndMap(); } writer.OnEndList(); |