about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: vvvv <vvvv@ydb.tech> 2023-06-20 14:14:56 +0300
committer: vvvv <vvvv@ydb.tech> 2023-06-20 14:14:56 +0300
commit: 5e86de883f882a9b9af25a9f211ae75e55db0702 (patch)
tree: 1e8d52ba8f0aec805e682b27b2ff95bb7e4e42b2
parent: c110663d4cf5a26ba89c524eab4a44503f177819 (diff)
download: ydb-5e86de883f882a9b9af25a9f211ae75e55db0702.tar.gz
Detect copy transforms during lineage
logos/libs/autodoc/mt сломаны намеренно, после обсуждения с заказчиком
-rw-r--r-- ydb/library/yql/core/services/yql_lineage.cpp 72
1 file changed, 61 insertions, 11 deletions
diff --git a/ydb/library/yql/core/services/yql_lineage.cpp b/ydb/library/yql/core/services/yql_lineage.cpp
index 4088f7c628..af173e74cc 100644
--- a/ydb/library/yql/core/services/yql_lineage.cpp
+++ b/ydb/library/yql/core/services/yql_lineage.cpp
@@ -101,12 +101,45 @@ private:
writer.OnEndMap();
}
- using TFieldLineage = std::pair<ui32, TString>;
+ struct TFieldLineage {
+ ui32 InputIndex;
+ TString Field;
+ TString Transforms;
+
+ struct THash {
+ std::size_t operator()(const TFieldLineage& x) const noexcept {
+ return CombineHashes(
+ CombineHashes(std::hash<ui32>()(x.InputIndex), std::hash<TString>()(x.Field)),
+ std::hash<TString>()(x.Transforms));
+ }
+ };
+
+ bool operator==(const TFieldLineage& rhs) const {
+ return std::tie(InputIndex, Field, Transforms) == std::tie(rhs.InputIndex, rhs.Field, rhs.Transforms);
+ }
+
+ bool operator<(const TFieldLineage& rhs) const {
+ return std::tie(InputIndex, Field, Transforms) < std::tie(rhs.InputIndex, rhs.Field, rhs.Transforms);
+ }
+ };
+
+ static TFieldLineage ReplaceTransforms(const TFieldLineage& src, const TString& newTransforms) {
+ return { src.InputIndex, src.Field, (src.Transforms == "Copy" && newTransforms == "Copy") ? newTransforms : "" };
+ }
struct TFieldsLineage {
- THashSet<TFieldLineage> Items;
+ THashSet<TFieldLineage, TFieldLineage::THash> Items;
};
+ static TFieldsLineage ReplaceTransforms(const TFieldsLineage& src, const TString& newTransforms) {
+ TFieldsLineage ret;
+ for (const auto& i : src.Items) {
+ ret.Items.insert(ReplaceTransforms(i, newTransforms));
+ }
+
+ return ret;
+ }
+
struct TLineage {
// null - can't calculcate
TMaybe<THashMap<TString, TFieldsLineage>> Fields;
@@ -129,7 +162,7 @@ private:
TString fieldName(i->GetName());
auto& v = (*lineage.Fields)[fieldName];
for (const auto& r : readIt->second) {
- v.Items.insert({ r, fieldName });
+ v.Items.insert({ r, fieldName, "Copy" });
}
}
@@ -194,19 +227,19 @@ private:
}
void MergeLineageFromUsedFields(const TExprNode& expr, const TExprNode& arg, const TLineage& src,
- TFieldsLineage& dst) {
+ TFieldsLineage& dst, const TString& newTransforms = "") {
THashSet<TString> usedFields;
auto needAllFields = !GetUsedFields(expr, arg, usedFields);
if (needAllFields) {
for (const auto& f : *src.Fields) {
for (const auto& i: f.second.Items) {
- dst.Items.insert(i);
+ dst.Items.insert(ReplaceTransforms(i, newTransforms));
}
}
} else {
for (const auto& f : usedFields) {
for (const auto& i: (*src.Fields).FindPtr(f)->Items) {
- dst.Items.insert(i);
+ dst.Items.insert(ReplaceTransforms(i, newTransforms));
}
}
}
@@ -244,7 +277,17 @@ private:
TString field(child->Head().Content());
auto& res = (*lineage.Fields)[field];
const auto& expr = child->Tail();
- MergeLineageFromUsedFields(expr, arg, innerLineage, res);
+ TString newTransforms;
+ auto root = &expr;
+ while (root->IsCallable("Just")) {
+ root = &root->Head();
+ }
+
+ if (root->IsCallable("Member") && &root->Head() == &arg) {
+ newTransforms = "Copy";
+ }
+
+ MergeLineageFromUsedFields(expr, arg, innerLineage, res, newTransforms);
}
}
@@ -273,7 +316,7 @@ private:
TFieldsLineage source;
if (payload->ChildrenSize() == 3) {
// distinct
- source = (*innerLineage.Fields)[payload->Child(2)->Content()];
+ source = ReplaceTransforms((*innerLineage.Fields)[payload->Child(2)->Content()], "");
} else {
if (payload->Child(1)->IsCallable("AggregationTraits")) {
// merge all used fields from init/update handlers
@@ -432,7 +475,7 @@ private:
for (const auto& f : fields) {
writer.OnKeyedItem(f);
writer.OnBeginList();
- TVector<std::pair<ui32, TString>> items;
+ TVector<TFieldLineage> items;
for (const auto& i : lineage.Fields->FindPtr(f)->Items) {
items.push_back(i);
}
@@ -442,9 +485,16 @@ private:
writer.OnListItem();
writer.OnBeginMap();
writer.OnKeyedItem("Input");
- writer.OnInt64Scalar(i.first);
+ writer.OnInt64Scalar(i.InputIndex);
writer.OnKeyedItem("Field");
- writer.OnStringScalar(i.second);
+ writer.OnStringScalar(i.Field);
+ writer.OnKeyedItem("Transforms");
+ const auto& transforms = i.Transforms;
+ if (transforms.empty()) {
+ writer.OnEntity();
+ } else {
+ writer.OnStringScalar(transforms);
+ }
writer.OnEndMap();
}
writer.OnEndList();