aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-05-10 20:27:18 +0300
committervvvv <vvvv@ydb.tech>2023-05-10 20:27:18 +0300
commit753ef423a15108af1961b3f9045752f7bcb44b09 (patch)
tree694268f05a3a4041ab26b7e577ed406dccfdb5d8
parent4b476cc91a385f6a51e0416974bcf1846ba11a82 (diff)
downloadydb-753ef423a15108af1961b3f9045752f7bcb44b09.tar.gz
Initial implementation of lineage scanner (for yqlrun/mrrun/embedded)
Пример запуска %% ~/repo/arcadia/yql/tools/mrrun$./mrrun --format pretty -s --program - --lineage <<< 'use hahn; insert into `tmp/vvvv/test1` select age, 0 as x, age + region as mixed, StablePickle(TableRow()) as pick from `home/yql/tutorial/users`' 2>/dev/null %% результат https://paste.yandex-team.ru/ef1fe012-33de-4fea-8e3e-a4764b89a0a9/text
-rw-r--r--ydb/library/yql/core/facade/yql_facade.cpp105
-rw-r--r--ydb/library/yql/core/facade/yql_facade.h15
-rw-r--r--ydb/library/yql/core/services/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/core/services/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/core/services/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/core/services/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/core/services/yql_lineage.cpp474
-rw-r--r--ydb/library/yql/core/services/yql_lineage.h10
-rw-r--r--ydb/library/yql/core/services/yql_transform_pipeline.cpp17
-rw-r--r--ydb/library/yql/core/services/yql_transform_pipeline.h1
-rw-r--r--ydb/library/yql/core/yql_data_provider.h2
-rw-r--r--ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp10
-rw-r--r--ydb/library/yql/providers/common/provider/yql_data_provider_impl.h2
13 files changed, 638 insertions, 2 deletions
diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp
index 3e4d699722..5a8916df48 100644
--- a/ydb/library/yql/core/facade/yql_facade.cpp
+++ b/ydb/library/yql/core/facade/yql_facade.cpp
@@ -567,6 +567,53 @@ TProgram::TFutureStatus TProgram::DiscoverAsync(const TString& username) {
});
}
+TProgram::TStatus TProgram::Lineage(const TString& username, IOutputStream* traceOut, IOutputStream* exprOut, bool withTypes) {
+ YQL_PROFILE_FUNC(TRACE);
+ auto m = &TProgram::LineageAsync;
+ return SyncExecution(this, m, username, traceOut, exprOut, withTypes);
+}
+
+TProgram::TFutureStatus TProgram::LineageAsync(const TString& username, IOutputStream* traceOut, IOutputStream* exprOut, bool withTypes) {
+ if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
+ return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
+ }
+ TypeCtx_->IsReadOnly = true;
+
+ Y_ENSURE(ExprRoot_, "Program not compiled yet");
+
+ ExprStream_ = exprOut;
+ Transformer_ = TTransformationPipeline(TypeCtx_)
+ .AddServiceTransformers()
+ .AddParametersEvaluation(*FunctionRegistry_)
+ .AddPreTypeAnnotation()
+ .AddExpressionEvaluation(*FunctionRegistry_)
+ .AddIOAnnotation()
+ .AddTypeAnnotation()
+ .AddPostTypeAnnotation()
+ .Add(TExprOutputTransformer::Sync(ExprRoot_, traceOut), "ExprOutput")
+ .AddLineageOptimization(LineageStr_)
+ .Add(TExprOutputTransformer::Sync(ExprRoot_, exprOut, withTypes), "AstOutput")
+ .Build();
+
+ TFuture<void> openSession = OpenSession(username);
+ if (!openSession.Initialized())
+ return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
+
+ SaveExprRoot();
+
+ return openSession.Apply([this](const TFuture<void>& f) {
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
+ try {
+ f.GetValue();
+ } catch (const std::exception& e) {
+ YQL_LOG(ERROR) << "OpenSession error: " << e.what();
+ ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
+ return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
+ }
+ return AsyncTransformWithFallback(false);
+ });
+}
+
TProgram::TStatus TProgram::Validate(const TString& username, IOutputStream* exprOut, bool withTypes) {
YQL_PROFILE_FUNC(TRACE);
auto m = &TProgram::ValidateAsync;
@@ -672,9 +719,7 @@ TProgram::TFutureStatus TProgram::OptimizeAsync(
.AddPostTypeAnnotation()
.Add(TExprOutputTransformer::Sync(ExprRoot_, traceOut), "ExprOutput")
.AddOptimization()
- .Add(CreatePlanInfoTransformer(*TypeCtx_), "PlanInfo")
.Add(TExprOutputTransformer::Sync(ExprRoot_, exprOut, withTypes), "AstOutput")
- .Add(TPlanOutputTransformer::Sync(tracePlan, GetPlanBuilder(), OutputFormat_), "PlanOutput")
.Build();
TFuture<void> openSession = OpenSession(username);
@@ -767,6 +812,58 @@ TProgram::TFutureStatus TProgram::OptimizeAsyncWithConfig(
});
}
+TProgram::TStatus TProgram::LineageWithConfig(
+ const TString& username, const IPipelineConfigurator& pipelineConf)
+{
+ YQL_PROFILE_FUNC(TRACE);
+ auto m = &TProgram::LineageAsyncWithConfig;
+ return SyncExecution(this, m, username, pipelineConf);
+}
+
+TProgram::TFutureStatus TProgram::LineageAsyncWithConfig(
+ const TString& username, const IPipelineConfigurator& pipelineConf)
+{
+ if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
+ return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
+ }
+ TypeCtx_->IsReadOnly = true;
+
+ Y_ENSURE(ExprRoot_, "Program not compiled yet");
+
+ TTransformationPipeline pipeline(TypeCtx_);
+ pipelineConf.AfterCreate(&pipeline);
+ pipeline.AddServiceTransformers();
+ pipeline.AddParametersEvaluation(*FunctionRegistry_);
+ pipeline.AddPreTypeAnnotation();
+ pipeline.AddExpressionEvaluation(*FunctionRegistry_);
+ pipeline.AddIOAnnotation();
+ pipeline.AddTypeAnnotation();
+ pipeline.AddPostTypeAnnotation();
+ pipelineConf.AfterTypeAnnotation(&pipeline);
+
+ pipeline.AddLineageOptimization(LineageStr_);
+
+ Transformer_ = pipeline.Build();
+
+ TFuture<void> openSession = OpenSession(username);
+ if (!openSession.Initialized())
+ return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
+
+ SaveExprRoot();
+
+ return openSession.Apply([this](const TFuture<void>& f) {
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
+ try {
+ f.GetValue();
+ } catch (const std::exception& e) {
+ YQL_LOG(ERROR) << "OpenSession error: " << e.what();
+ ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
+ return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
+ }
+ return AsyncTransformWithFallback(false);
+ });
+}
+
TProgram::TStatus TProgram::Run(
const TString& username,
IOutputStream* traceOut,
@@ -1252,6 +1349,10 @@ TMaybe<TString> TProgram::GetDiscoveredData() {
return out.Str();
}
+TMaybe<TString> TProgram::GetLineage() {
+ return LineageStr_;
+}
+
TProgram::TFutureStatus TProgram::ContinueAsync() {
YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
return AsyncTransformWithFallback(true);
diff --git a/ydb/library/yql/core/facade/yql_facade.h b/ydb/library/yql/core/facade/yql_facade.h
index 8a62b6604f..149c49c1b6 100644
--- a/ydb/library/yql/core/facade/yql_facade.h
+++ b/ydb/library/yql/core/facade/yql_facade.h
@@ -112,6 +112,10 @@ public:
TFutureStatus DiscoverAsync(const TString& username);
+ TStatus Lineage(const TString& username, IOutputStream* traceOut = nullptr, IOutputStream* exprOut = nullptr, bool withTypes = false);
+
+ TFutureStatus LineageAsync(const TString& username, IOutputStream* traceOut = nullptr, IOutputStream* exprOut = nullptr, bool withTypes = false);
+
TStatus Validate(const TString& username, IOutputStream* exprOut = nullptr, bool withTypes = false);
TFutureStatus ValidateAsync(const TString& username, IOutputStream* exprOut = nullptr, bool withTypes = false);
@@ -144,6 +148,14 @@ public:
IOutputStream* exprOut = nullptr,
bool withTypes = false);
+ TStatus LineageWithConfig(
+ const TString& username,
+ const IPipelineConfigurator& pipelineConf);
+
+ TFutureStatus LineageAsyncWithConfig(
+ const TString& username,
+ const IPipelineConfigurator& pipelineConf);
+
TStatus OptimizeWithConfig(
const TString& username,
const IPipelineConfigurator& pipelineConf);
@@ -228,6 +240,8 @@ public:
TMaybe<TString> GetDiscoveredData();
+ TMaybe<TString> GetLineage();
+
TString ResultsAsString() const;
void ConfigureYsonResultFormat(NYson::EYsonFormat format);
@@ -405,6 +419,7 @@ private:
i64 FallbackCounter_ = 0;
const EHiddenMode HiddenMode_ = EHiddenMode::Disable;
THiddenQueryAborter AbortHidden_ = [](){};
+ TMaybe<TString> LineageStr_;
};
} // namspace NYql
diff --git a/ydb/library/yql/core/services/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/core/services/CMakeLists.darwin-x86_64.txt
index f3d30a1dc4..6d3aac4b30 100644
--- a/ydb/library/yql/core/services/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/core/services/CMakeLists.darwin-x86_64.txt
@@ -35,6 +35,7 @@ target_sources(yql-core-services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_eval_expr.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_eval_params.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_out_transformers.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_lineage.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_plan.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_transform_pipeline.cpp
)
diff --git a/ydb/library/yql/core/services/CMakeLists.linux-aarch64.txt b/ydb/library/yql/core/services/CMakeLists.linux-aarch64.txt
index 40977337e7..ea6113708e 100644
--- a/ydb/library/yql/core/services/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/core/services/CMakeLists.linux-aarch64.txt
@@ -36,6 +36,7 @@ target_sources(yql-core-services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_eval_expr.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_eval_params.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_out_transformers.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_lineage.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_plan.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_transform_pipeline.cpp
)
diff --git a/ydb/library/yql/core/services/CMakeLists.linux-x86_64.txt b/ydb/library/yql/core/services/CMakeLists.linux-x86_64.txt
index 40977337e7..ea6113708e 100644
--- a/ydb/library/yql/core/services/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/core/services/CMakeLists.linux-x86_64.txt
@@ -36,6 +36,7 @@ target_sources(yql-core-services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_eval_expr.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_eval_params.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_out_transformers.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_lineage.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_plan.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_transform_pipeline.cpp
)
diff --git a/ydb/library/yql/core/services/CMakeLists.windows-x86_64.txt b/ydb/library/yql/core/services/CMakeLists.windows-x86_64.txt
index f3d30a1dc4..6d3aac4b30 100644
--- a/ydb/library/yql/core/services/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/core/services/CMakeLists.windows-x86_64.txt
@@ -35,6 +35,7 @@ target_sources(yql-core-services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_eval_expr.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_eval_params.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_out_transformers.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_lineage.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_plan.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_transform_pipeline.cpp
)
diff --git a/ydb/library/yql/core/services/yql_lineage.cpp b/ydb/library/yql/core/services/yql_lineage.cpp
new file mode 100644
index 0000000000..fa5d1bd881
--- /dev/null
+++ b/ydb/library/yql/core/services/yql_lineage.cpp
@@ -0,0 +1,474 @@
+#include "yql_lineage.h"
+#include <ydb/library/yql/core/yql_type_annotation.h>
+#include <ydb/library/yql/core/yql_expr_optimize.h>
+#include <ydb/library/yql/core/yql_opt_utils.h>
+#include <ydb/library/yql/core/yql_join.h>
+
+#include <util/system/env.h>
+
+namespace NYql {
+
+namespace {
+
+class TLineageScanner {
+public:
+ TLineageScanner(const TExprNode& root, const TTypeAnnotationContext& ctx)
+ : Root_(root)
+ , Ctx_(ctx)
+ {}
+
+ TString Process() {
+ VisitExpr(Root_, [&](const TExprNode& node) {
+ for (auto& p : Ctx_.DataSources) {
+ if (p->IsRead(node)) {
+ Reads_[&node] = p.Get();
+ }
+ }
+
+ for (auto& p : Ctx_.DataSinks) {
+ if (p->IsWrite(node)) {
+ Writes_[&node] = p.Get();
+ }
+ }
+
+ return true;
+ });
+
+ TStringStream s;
+ NYson::TYsonWriter writer(&s, NYson::EYsonFormat::Text);
+ writer.OnBeginMap();
+ writer.OnKeyedItem("Reads");
+ writer.OnBeginList();
+ for (const auto& r : Reads_) {
+ TVector<TPinInfo> inputs;
+ r.second->GetPlanFormatter().GetInputs(*r.first, inputs);
+ for (const auto& i : inputs) {
+ auto id = ++NextReadId_;
+ ReadIds_[r.first].push_back(id);
+ writer.OnListItem();
+ writer.OnBeginMap();
+ writer.OnKeyedItem("Id");
+ writer.OnInt64Scalar(id);
+ writer.OnKeyedItem("Name");
+ writer.OnStringScalar(i.DisplayName);
+ writer.OnKeyedItem("Schema");
+ WriteSchema(writer, *r.first->GetTypeAnn()->Cast<TTupleExprType>()->GetItems()[1]->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>());
+ writer.OnEndMap();
+ }
+ }
+
+ writer.OnEndList();
+ writer.OnKeyedItem("Writes");
+ writer.OnBeginList();
+ for (const auto& w : Writes_) {
+ auto data = w.first->Child(3);
+ TVector<TPinInfo> outputs;
+ w.second->GetPlanFormatter().GetOutputs(*w.first, outputs);
+ YQL_ENSURE(outputs.size() == 1);
+ auto id = ++NextWriteId_;
+ WriteIds_[w.first] = id;
+ writer.OnListItem();
+ writer.OnBeginMap();
+ writer.OnKeyedItem("Id");
+ writer.OnInt64Scalar(id);
+ writer.OnKeyedItem("Name");
+ writer.OnStringScalar(outputs.front().DisplayName);
+ writer.OnKeyedItem("Schema");
+ WriteSchema(writer, *data->GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>());
+ writer.OnKeyedItem("Lineage");
+ auto lineage = CollectLineage(*data);
+ WriteLineage(writer, *lineage);
+ writer.OnEndMap();
+ }
+
+ writer.OnEndList();
+ writer.OnEndMap();
+ return s.Str();
+ }
+
+private:
+ void WriteSchema(NYson::TYsonWriter& writer, const TStructExprType& structType) {
+ writer.OnBeginMap();
+ for (const auto& i : structType.GetItems()) {
+ if (i->GetName().StartsWith("_yql_sys_")) {
+ continue;
+ }
+
+ writer.OnKeyedItem(i->GetName());
+ writer.OnStringScalar(FormatType(i->GetItemType()));
+ }
+
+ writer.OnEndMap();
+ }
+
+ using TFieldLineage = std::pair<ui32, TString>;
+
+ struct TFieldsLineage {
+ THashSet<TFieldLineage> Items;
+ };
+
+ struct TLineage {
+ // null - can't calculcate
+ TMaybe<THashMap<TString, TFieldsLineage>> Fields;
+ };
+
+ const TLineage* CollectLineage(const TExprNode& node) {
+ if (auto it = Lineages_.find(&node); it != Lineages_.end()) {
+ return &it->second;
+ }
+
+ auto& lineage = Lineages_[&node];
+ if (auto readIt = ReadIds_.find(&node); readIt != ReadIds_.end()) {
+ lineage.Fields.ConstructInPlace();
+ auto type = node.GetTypeAnn()->Cast<TTupleExprType>()->GetItems()[1]->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
+ for (const auto& i : type->GetItems()) {
+ if (i->GetName().StartsWith("_yql_sys_")) {
+ continue;
+ }
+
+ TString fieldName(i->GetName());
+ auto& v = (*lineage.Fields)[fieldName];
+ for (const auto& r : readIt->second) {
+ v.Items.insert({ r, fieldName });
+ }
+ }
+
+ return &lineage;
+ }
+
+ if (node.IsCallable({"Unordered", "Right!", "Skip", "Take", "Sort"})) {
+ lineage = *CollectLineage(node.Head());
+ return &lineage;
+ } else if (node.IsCallable("ExtractMembers")) {
+ HandleExtractMembers(lineage, node);
+ } else if (node.IsCallable({"FlatMap", "OrderedFlatMap"})) {
+ HandleFlatMap(lineage, node);
+ } else if (node.IsCallable("Aggregate")) {
+ HandleAggregate(lineage, node);
+ } else if (node.IsCallable("Extend")) {
+ HandleExtend(lineage, node);
+ } else if (node.IsCallable({"CalcOverWindow","CalcOverSessionWindow","CalcOverWindowGroup"})) {
+ HandleWindow(lineage, node);
+ } else if (node.IsCallable("EquiJoin")) {
+ HandleEquiJoin(lineage, node);
+ }
+
+ return &lineage;
+ }
+
+ void HandleExtractMembers(TLineage& lineage, const TExprNode& node) {
+ auto innerLineage = *CollectLineage(node.Head());
+ if (innerLineage.Fields.Defined()) {
+ lineage.Fields.ConstructInPlace();
+ for (const auto& atom : node.Child(1)->Children()) {
+ TString fieldName(atom->Content());
+ (*lineage.Fields)[fieldName] = (*innerLineage.Fields)[fieldName];
+ }
+ }
+ }
+
+ // returns false if all fields are used
+ bool GetUsedFields(const TExprNode& expr, const TExprNode& arg, THashSet<TString>& fields) {
+ fields.clear();
+ if (!IsDepended(expr, arg)) {
+ return true;
+ }
+
+ TParentsMap parentsMap;
+ GatherParents(expr, parentsMap);
+
+ auto parentsIt = parentsMap.find(&arg);
+ if (parentsIt == parentsMap.end()) {
+ return false;
+ } else {
+ for (const auto& p : parentsIt->second) {
+ if (p->IsCallable("Member")) {
+ fields.insert(TString(p->Tail().Content()));
+ } else {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ void MergeLineageFromUsedFields(const TExprNode& expr, const TExprNode& arg, const TLineage& src,
+ TFieldsLineage& dst) {
+ THashSet<TString> usedFields;
+ auto needAllFields = !GetUsedFields(expr, arg, usedFields);
+ if (needAllFields) {
+ for (const auto& f : *src.Fields) {
+ for (const auto& i: f.second.Items) {
+ dst.Items.insert(i);
+ }
+ }
+ } else {
+ for (const auto& f : usedFields) {
+ for (const auto& i: (*src.Fields).FindPtr(f)->Items) {
+ dst.Items.insert(i);
+ }
+ }
+ }
+ }
+
+ void HandleFlatMap(TLineage& lineage, const TExprNode& node) {
+ auto innerLineage = *CollectLineage(node.Head());
+ if (!innerLineage.Fields.Defined()) {
+ return;
+ }
+
+ const auto& lambda = node.Tail();
+ const auto& arg = lambda.Head().Head();
+ const auto& body = lambda.Tail();
+ const TExprNode* value;
+ if (body.IsCallable("OptionalIf")) {
+ value = &body.Tail();
+ } else if (body.IsCallable("Just")) {
+ value = &body.Head();
+ } else {
+ return;
+ }
+
+ if (value == &arg) {
+ lineage.Fields = *innerLineage.Fields;
+ return;
+ }
+
+ if (!value->IsCallable("AsStruct")) {
+ return;
+ }
+
+ lineage.Fields.ConstructInPlace();
+ for (const auto& child : value->Children()) {
+ TString field(child->Head().Content());
+ auto& res = (*lineage.Fields)[field];
+ const auto& expr = child->Tail();
+ MergeLineageFromUsedFields(expr, arg, innerLineage, res);
+ }
+ }
+
+ void HandleAggregate(TLineage& lineage, const TExprNode& node) {
+ auto innerLineage = *CollectLineage(node.Head());
+ if (!innerLineage.Fields.Defined()) {
+ return;
+ }
+
+ lineage.Fields.ConstructInPlace();
+ for (const auto& key : node.Child(1)->Children()) {
+ TString field(key->Content());
+ (*lineage.Fields)[field] = (*innerLineage.Fields)[field];
+ }
+
+ for (const auto& payload: node.Child(2)->Children()) {
+ TVector<TString> fields;
+ if (payload->Child(0)->IsList()) {
+ for (const auto& child : payload->Child(0)->Children()) {
+ fields.push_back(TString(child->Content()));
+ }
+ } else {
+ fields.push_back(TString(payload->Child(0)->Content()));
+ }
+
+ TFieldsLineage source;
+ if (payload->ChildrenSize() == 3) {
+ // distinct
+ source = (*innerLineage.Fields)[payload->Child(2)->Content()];
+ } else {
+ if (payload->Child(1)->IsCallable("AggregationTraits")) {
+ // merge all used fields from init/update handlers
+ auto initHandler = payload->Child(1)->Child(1);
+ auto updateHandler = payload->Child(1)->Child(2);
+ MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, source);
+ MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, source);
+ } else if (payload->Child(1)->IsCallable("AggApply")) {
+ auto extractHandler = payload->Child(1)->Child(2);
+ MergeLineageFromUsedFields(extractHandler->Tail(), extractHandler->Head().Head(), innerLineage, source);
+ } else {
+ lineage.Fields.Clear();
+ return;
+ }
+ }
+
+ for (const auto& field : fields) {
+ (*lineage.Fields)[field] = source;
+ }
+ }
+ }
+
+ void HandleExtend(TLineage& lineage, const TExprNode& node) {
+ TVector<TLineage> inners;
+ for (const auto& child : node.Children()) {
+ inners.push_back(*CollectLineage(*child));
+ if (!inners.back().Fields.Defined()) {
+ return;
+ }
+ }
+
+ if (inners.empty()) {
+ return;
+ }
+
+ lineage.Fields.ConstructInPlace();
+ for (const auto& x : *inners.front().Fields) {
+ auto& res = (*lineage.Fields)[x.first];
+ for (const auto& i : inners) {
+ for (const auto& x : (*i.Fields).FindPtr(x.first)->Items) {
+ res.Items.insert(x);
+ }
+ }
+ }
+ }
+
+ void HandleWindow(TLineage& lineage, const TExprNode& node) {
+ auto innerLineage = *CollectLineage(node.Head());
+ if (!innerLineage.Fields.Defined()) {
+ return;
+ }
+
+ lineage.Fields = *innerLineage.Fields;
+ TExprNode::TListType frameGroups;
+ if (node.IsCallable("CalcOverWindowGroup")) {
+ for (const auto& g : node.Child(1)->Children()) {
+ frameGroups.push_back(g->Child(2));
+ }
+ } else {
+ frameGroups.push_back(node.Child(3));
+ }
+
+ for (const auto& g : frameGroups) {
+ for (const auto& f : g->Children()) {
+ if (!f->IsCallable("WinOnRows")) {
+ lineage.Fields.Clear();
+ return;
+ }
+
+ for (ui32 i = 1; i < f->ChildrenSize(); ++i) {
+ const auto& list = f->Child(i);
+ auto field = list->Head().Content();
+ auto& res = (*lineage.Fields)[field];
+ if (list->Tail().IsCallable("RowNumber")) {
+ continue;
+ } else if (list->Tail().IsCallable({"Lag","Lead","Rank","DenseRank"})) {
+ const auto& lambda = list->Tail().Child(1);
+ MergeLineageFromUsedFields(lambda->Tail(), lambda->Head().Head(), innerLineage, res);
+ } else if (list->Tail().IsCallable("WindowTraits")) {
+ const auto& initHandler = list->Tail().Child(1);
+ const auto& updateHandler = list->Tail().Child(2);
+ MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, res);
+ MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, res);
+ } else {
+ lineage.Fields.Clear();
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ void HandleEquiJoin(TLineage& lineage, const TExprNode& node) {
+ TVector<TLineage> inners;
+ THashMap<TStringBuf, ui32> inputLabels;
+ for (ui32 i = 0; i < node.ChildrenSize() - 2; ++i) {
+ inners.push_back(*CollectLineage(node.Child(i)->Head()));
+ if (!inners.back().Fields.Defined()) {
+ return;
+ }
+
+ if (node.Child(i)->Tail().IsAtom()) {
+ inputLabels[node.Child(i)->Tail().Content()] = i;
+ } else {
+ for (const auto& label : node.Child(i)->Tail().Children()) {
+ inputLabels[label->Content()] = i;
+ }
+ }
+ }
+
+ THashMap<TStringBuf, TStringBuf> backRename;
+ for (auto setting : node.Tail().Children()) {
+ if (setting->Head().Content() != "rename") {
+ continue;
+ }
+
+ if (setting->Child(2)->Content().Empty()) {
+ continue;
+ }
+
+ backRename[setting->Child(2)->Content()] = setting->Child(1)->Content();
+ }
+
+ lineage.Fields.ConstructInPlace();
+ auto structType = node.GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>();
+ for (const auto& field : structType->GetItems()) {
+ TStringBuf originalName = field->GetName();
+ if (auto it = backRename.find(originalName); it != backRename.end()) {
+ originalName = it->second;
+ }
+
+ TStringBuf table, column;
+ SplitTableName(originalName, table, column);
+ ui32 index = *inputLabels.FindPtr(table);
+ auto& res = (*lineage.Fields)[field->GetName()];
+ for (const auto& i: (*inners[index].Fields).FindPtr(column)->Items) {
+ res.Items.insert(i);
+ }
+ }
+ }
+
+ void WriteLineage(NYson::TYsonWriter& writer, const TLineage& lineage) {
+ if (!lineage.Fields.Defined()) {
+ YQL_ENSURE(!GetEnv("YQL_DETERMINISTIC_MODE"), "Can't calculate lineage");
+ writer.OnEntity();
+ return;
+ }
+
+ writer.OnBeginMap();
+ TVector<TString> fields;
+ for (const auto& f : *lineage.Fields) {
+ fields.push_back(f.first);
+ }
+
+ Sort(fields);
+ for (const auto& f : fields) {
+ writer.OnKeyedItem(f);
+ writer.OnBeginList();
+ TVector<std::pair<ui32, TString>> items;
+ for (const auto& i : lineage.Fields->FindPtr(f)->Items) {
+ items.push_back(i);
+ }
+
+ Sort(items);
+ for (const auto& i: items) {
+ writer.OnListItem();
+ writer.OnBeginMap();
+ writer.OnKeyedItem("Input");
+ writer.OnInt64Scalar(i.first);
+ writer.OnKeyedItem("Field");
+ writer.OnStringScalar(i.second);
+ writer.OnEndMap();
+ }
+ writer.OnEndList();
+ }
+
+ writer.OnEndMap();
+ }
+
+private:
+ const TExprNode& Root_;
+ const TTypeAnnotationContext& Ctx_;
+ TNodeMap<IDataProvider*> Reads_, Writes_;
+ ui32 NextReadId_ = 0;
+ ui32 NextWriteId_ = 0;
+ TNodeMap<TVector<ui32>> ReadIds_;
+ TNodeMap<ui32> WriteIds_;
+ TNodeMap<TLineage> Lineages_;
+};
+
+}
+
+TString CalculateLineage(const TExprNode& root, const TTypeAnnotationContext& ctx) {
+ TLineageScanner scanner(root, ctx);
+ return scanner.Process();
+}
+
+}
diff --git a/ydb/library/yql/core/services/yql_lineage.h b/ydb/library/yql/core/services/yql_lineage.h
new file mode 100644
index 0000000000..8bacffa3b9
--- /dev/null
+++ b/ydb/library/yql/core/services/yql_lineage.h
@@ -0,0 +1,10 @@
+#pragma once
+#include <ydb/library/yql/ast/yql_expr.h>
+
+namespace NYql {
+
+struct TTypeAnnotationContext;
+
+TString CalculateLineage(const TExprNode& root, const TTypeAnnotationContext& ctx);
+
+}
diff --git a/ydb/library/yql/core/services/yql_transform_pipeline.cpp b/ydb/library/yql/core/services/yql_transform_pipeline.cpp
index bc87e1ff56..b4b494de29 100644
--- a/ydb/library/yql/core/services/yql_transform_pipeline.cpp
+++ b/ydb/library/yql/core/services/yql_transform_pipeline.cpp
@@ -1,6 +1,7 @@
#include "yql_transform_pipeline.h"
#include "yql_eval_expr.h"
#include "yql_eval_params.h"
+#include "yql_lineage.h"
#include <ydb/library/yql/core/type_ann/type_ann_core.h>
#include <ydb/library/yql/core/type_ann/type_ann_expr.h>
@@ -179,6 +180,22 @@ TTransformationPipeline& TTransformationPipeline::AddOptimization(bool checkWorl
return *this;
}
+TTransformationPipeline& TTransformationPipeline::AddLineageOptimization(TMaybe<TString>& lineageOut, EYqlIssueCode issueCode) {
+ AddCommonOptimization(issueCode);
+ Transformers_.push_back(TTransformStage(
+ CreateFunctorTransformer(
+ [typeCtx = TypeAnnotationContext_, &lineageOut](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
+ Y_UNUSED(ctx);
+ output = input;
+ lineageOut = CalculateLineage(*input, *typeCtx);
+ return IGraphTransformer::TStatus::Ok;
+ }
+ ),
+ "LineageScanner",
+ issueCode));
+ return *this;
+}
+
TTransformationPipeline& TTransformationPipeline::AddCheckExecution(bool checkWorld, EYqlIssueCode issueCode) {
Transformers_.push_back(TTransformStage(
CreateCheckExecutionTransformer(*TypeAnnotationContext_, checkWorld),
diff --git a/ydb/library/yql/core/services/yql_transform_pipeline.h b/ydb/library/yql/core/services/yql_transform_pipeline.h
index eb3e035c5c..37b52d5e1f 100644
--- a/ydb/library/yql/core/services/yql_transform_pipeline.h
+++ b/ydb/library/yql/core/services/yql_transform_pipeline.h
@@ -34,6 +34,7 @@ public:
TTransformationPipeline& AddCommonOptimization(EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
TTransformationPipeline& AddFinalCommonOptimization(EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
TTransformationPipeline& AddOptimization(bool checkWorld = true, bool withFinalOptimization = true, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
+ TTransformationPipeline& AddLineageOptimization(TMaybe<TString>& lineageOut, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
TTransformationPipeline& AddCheckExecution(bool checkWorld = true, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
TTransformationPipeline& AddRun(TOperationProgressWriter writer, EYqlIssueCode issueCode = TIssuesIds::CORE_EXEC);
diff --git a/ydb/library/yql/core/yql_data_provider.h b/ydb/library/yql/core/yql_data_provider.h
index 8919fc17f5..5455d160de 100644
--- a/ydb/library/yql/core/yql_data_provider.h
+++ b/ydb/library/yql/core/yql_data_provider.h
@@ -137,6 +137,8 @@ public:
// This function is used in core optimizers to check either the node can be used as input multiple times or not
virtual bool IsPersistent(const TExprNode& node) = 0;
+ virtual bool IsRead(const TExprNode& node) = 0;
+ virtual bool IsWrite(const TExprNode& node) = 0;
// Right! or worlds are written to syncList
virtual bool CanBuildResult(const TExprNode& node, TSyncMap& syncList) = 0;
diff --git a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp
index f43c44a7cc..3ec10080f6 100644
--- a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp
+++ b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp
@@ -196,6 +196,16 @@ bool TDataProviderBase::IsPersistent(const TExprNode& node) {
return false;
}
+bool TDataProviderBase::IsRead(const TExprNode& node) {
+ Y_UNUSED(node);
+ return false;
+}
+
+bool TDataProviderBase::IsWrite(const TExprNode& node) {
+ Y_UNUSED(node);
+ return false;
+}
+
bool TDataProviderBase::CanBuildResult(const TExprNode& node, TSyncMap& syncList) {
Y_UNUSED(node);
Y_UNUSED(syncList);
diff --git a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h
index e661c75791..2dce702afb 100644
--- a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h
+++ b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h
@@ -63,6 +63,8 @@ public:
IGraphTransformer& GetPhysicalFinalizingTransformer() override;
IGraphTransformer& GetLoadTableMetadataTransformer() override;
bool IsPersistent(const TExprNode& node) override;
+ bool IsRead(const TExprNode& node) override;
+ bool IsWrite(const TExprNode& node) override;
bool CanBuildResult(const TExprNode& node, TSyncMap& syncList) override;
bool CanPullResult(const TExprNode& node, TSyncMap& syncList, bool& canRef) override;
bool GetExecWorld(const TExprNode::TPtr& node, TExprNode::TPtr& root) override;