diff options
author | vvvv <vvvv@ydb.tech> | 2023-05-10 20:27:18 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-05-10 20:27:18 +0300 |
commit | 753ef423a15108af1961b3f9045752f7bcb44b09 (patch) | |
tree | 694268f05a3a4041ab26b7e577ed406dccfdb5d8 | |
parent | 4b476cc91a385f6a51e0416974bcf1846ba11a82 (diff) | |
download | ydb-753ef423a15108af1961b3f9045752f7bcb44b09.tar.gz |
Initial implementation of lineage scanner (for yqlrun/mrrun/embedded)
Example invocation (Пример запуска):
%%
~/repo/arcadia/yql/tools/mrrun$./mrrun --format pretty -s --program - --lineage <<< 'use hahn; insert into `tmp/vvvv/test1` select age, 0 as x, age + region as mixed, StablePickle(TableRow()) as pick from `home/yql/tutorial/users`' 2>/dev/null
%%
Result (результат):
https://paste.yandex-team.ru/ef1fe012-33de-4fea-8e3e-a4764b89a0a9/text
13 files changed, 638 insertions, 2 deletions
diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp index 3e4d699722..5a8916df48 100644 --- a/ydb/library/yql/core/facade/yql_facade.cpp +++ b/ydb/library/yql/core/facade/yql_facade.cpp @@ -567,6 +567,53 @@ TProgram::TFutureStatus TProgram::DiscoverAsync(const TString& username) { }); } +TProgram::TStatus TProgram::Lineage(const TString& username, IOutputStream* traceOut, IOutputStream* exprOut, bool withTypes) { + YQL_PROFILE_FUNC(TRACE); + auto m = &TProgram::LineageAsync; + return SyncExecution(this, m, username, traceOut, exprOut, withTypes); +} + +TProgram::TFutureStatus TProgram::LineageAsync(const TString& username, IOutputStream* traceOut, IOutputStream* exprOut, bool withTypes) { + if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) { + return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error); + } + TypeCtx_->IsReadOnly = true; + + Y_ENSURE(ExprRoot_, "Program not compiled yet"); + + ExprStream_ = exprOut; + Transformer_ = TTransformationPipeline(TypeCtx_) + .AddServiceTransformers() + .AddParametersEvaluation(*FunctionRegistry_) + .AddPreTypeAnnotation() + .AddExpressionEvaluation(*FunctionRegistry_) + .AddIOAnnotation() + .AddTypeAnnotation() + .AddPostTypeAnnotation() + .Add(TExprOutputTransformer::Sync(ExprRoot_, traceOut), "ExprOutput") + .AddLineageOptimization(LineageStr_) + .Add(TExprOutputTransformer::Sync(ExprRoot_, exprOut, withTypes), "AstOutput") + .Build(); + + TFuture<void> openSession = OpenSession(username); + if (!openSession.Initialized()) + return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error); + + SaveExprRoot(); + + return openSession.Apply([this](const TFuture<void>& f) { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId()); + try { + f.GetValue(); + } catch (const std::exception& e) { + YQL_LOG(ERROR) << "OpenSession error: " << e.what(); + ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e)); + return 
NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error); + } + return AsyncTransformWithFallback(false); + }); +} + TProgram::TStatus TProgram::Validate(const TString& username, IOutputStream* exprOut, bool withTypes) { YQL_PROFILE_FUNC(TRACE); auto m = &TProgram::ValidateAsync; @@ -672,9 +719,7 @@ TProgram::TFutureStatus TProgram::OptimizeAsync( .AddPostTypeAnnotation() .Add(TExprOutputTransformer::Sync(ExprRoot_, traceOut), "ExprOutput") .AddOptimization() - .Add(CreatePlanInfoTransformer(*TypeCtx_), "PlanInfo") .Add(TExprOutputTransformer::Sync(ExprRoot_, exprOut, withTypes), "AstOutput") - .Add(TPlanOutputTransformer::Sync(tracePlan, GetPlanBuilder(), OutputFormat_), "PlanOutput") .Build(); TFuture<void> openSession = OpenSession(username); @@ -767,6 +812,58 @@ TProgram::TFutureStatus TProgram::OptimizeAsyncWithConfig( }); } +TProgram::TStatus TProgram::LineageWithConfig( + const TString& username, const IPipelineConfigurator& pipelineConf) +{ + YQL_PROFILE_FUNC(TRACE); + auto m = &TProgram::LineageAsyncWithConfig; + return SyncExecution(this, m, username, pipelineConf); +} + +TProgram::TFutureStatus TProgram::LineageAsyncWithConfig( + const TString& username, const IPipelineConfigurator& pipelineConf) +{ + if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) { + return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error); + } + TypeCtx_->IsReadOnly = true; + + Y_ENSURE(ExprRoot_, "Program not compiled yet"); + + TTransformationPipeline pipeline(TypeCtx_); + pipelineConf.AfterCreate(&pipeline); + pipeline.AddServiceTransformers(); + pipeline.AddParametersEvaluation(*FunctionRegistry_); + pipeline.AddPreTypeAnnotation(); + pipeline.AddExpressionEvaluation(*FunctionRegistry_); + pipeline.AddIOAnnotation(); + pipeline.AddTypeAnnotation(); + pipeline.AddPostTypeAnnotation(); + pipelineConf.AfterTypeAnnotation(&pipeline); + + pipeline.AddLineageOptimization(LineageStr_); + + Transformer_ = 
pipeline.Build(); + + TFuture<void> openSession = OpenSession(username); + if (!openSession.Initialized()) + return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error); + + SaveExprRoot(); + + return openSession.Apply([this](const TFuture<void>& f) { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId()); + try { + f.GetValue(); + } catch (const std::exception& e) { + YQL_LOG(ERROR) << "OpenSession error: " << e.what(); + ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e)); + return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error); + } + return AsyncTransformWithFallback(false); + }); +} + TProgram::TStatus TProgram::Run( const TString& username, IOutputStream* traceOut, @@ -1252,6 +1349,10 @@ TMaybe<TString> TProgram::GetDiscoveredData() { return out.Str(); } +TMaybe<TString> TProgram::GetLineage() { + return LineageStr_; +} + TProgram::TFutureStatus TProgram::ContinueAsync() { YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId()); return AsyncTransformWithFallback(true); diff --git a/ydb/library/yql/core/facade/yql_facade.h b/ydb/library/yql/core/facade/yql_facade.h index 8a62b6604f..149c49c1b6 100644 --- a/ydb/library/yql/core/facade/yql_facade.h +++ b/ydb/library/yql/core/facade/yql_facade.h @@ -112,6 +112,10 @@ public: TFutureStatus DiscoverAsync(const TString& username); + TStatus Lineage(const TString& username, IOutputStream* traceOut = nullptr, IOutputStream* exprOut = nullptr, bool withTypes = false); + + TFutureStatus LineageAsync(const TString& username, IOutputStream* traceOut = nullptr, IOutputStream* exprOut = nullptr, bool withTypes = false); + TStatus Validate(const TString& username, IOutputStream* exprOut = nullptr, bool withTypes = false); TFutureStatus ValidateAsync(const TString& username, IOutputStream* exprOut = nullptr, bool withTypes = false); @@ -144,6 +148,14 @@ public: IOutputStream* exprOut = nullptr, bool withTypes = false); + TStatus LineageWithConfig( + const TString& username, + const IPipelineConfigurator& 
pipelineConf); + + TFutureStatus LineageAsyncWithConfig( + const TString& username, + const IPipelineConfigurator& pipelineConf); + TStatus OptimizeWithConfig( const TString& username, const IPipelineConfigurator& pipelineConf); @@ -228,6 +240,8 @@ public: TMaybe<TString> GetDiscoveredData(); + TMaybe<TString> GetLineage(); + TString ResultsAsString() const; void ConfigureYsonResultFormat(NYson::EYsonFormat format); @@ -405,6 +419,7 @@ private: i64 FallbackCounter_ = 0; const EHiddenMode HiddenMode_ = EHiddenMode::Disable; THiddenQueryAborter AbortHidden_ = [](){}; + TMaybe<TString> LineageStr_; }; } // namspace NYql diff --git a/ydb/library/yql/core/services/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/core/services/CMakeLists.darwin-x86_64.txt index f3d30a1dc4..6d3aac4b30 100644 --- a/ydb/library/yql/core/services/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/core/services/CMakeLists.darwin-x86_64.txt @@ -35,6 +35,7 @@ target_sources(yql-core-services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_eval_expr.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_eval_params.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_out_transformers.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_lineage.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_plan.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_transform_pipeline.cpp ) diff --git a/ydb/library/yql/core/services/CMakeLists.linux-aarch64.txt b/ydb/library/yql/core/services/CMakeLists.linux-aarch64.txt index 40977337e7..ea6113708e 100644 --- a/ydb/library/yql/core/services/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/core/services/CMakeLists.linux-aarch64.txt @@ -36,6 +36,7 @@ target_sources(yql-core-services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_eval_expr.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_eval_params.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_out_transformers.cpp + 
${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_lineage.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_plan.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_transform_pipeline.cpp ) diff --git a/ydb/library/yql/core/services/CMakeLists.linux-x86_64.txt b/ydb/library/yql/core/services/CMakeLists.linux-x86_64.txt index 40977337e7..ea6113708e 100644 --- a/ydb/library/yql/core/services/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/core/services/CMakeLists.linux-x86_64.txt @@ -36,6 +36,7 @@ target_sources(yql-core-services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_eval_expr.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_eval_params.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_out_transformers.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_lineage.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_plan.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_transform_pipeline.cpp ) diff --git a/ydb/library/yql/core/services/CMakeLists.windows-x86_64.txt b/ydb/library/yql/core/services/CMakeLists.windows-x86_64.txt index f3d30a1dc4..6d3aac4b30 100644 --- a/ydb/library/yql/core/services/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/core/services/CMakeLists.windows-x86_64.txt @@ -35,6 +35,7 @@ target_sources(yql-core-services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_eval_expr.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_eval_params.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_out_transformers.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_lineage.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_plan.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/core/services/yql_transform_pipeline.cpp ) diff --git a/ydb/library/yql/core/services/yql_lineage.cpp b/ydb/library/yql/core/services/yql_lineage.cpp new file mode 100644 index 0000000000..fa5d1bd881 --- /dev/null +++ b/ydb/library/yql/core/services/yql_lineage.cpp @@ 
-0,0 +1,474 @@ +#include "yql_lineage.h" +#include <ydb/library/yql/core/yql_type_annotation.h> +#include <ydb/library/yql/core/yql_expr_optimize.h> +#include <ydb/library/yql/core/yql_opt_utils.h> +#include <ydb/library/yql/core/yql_join.h> + +#include <util/system/env.h> + +namespace NYql { + +namespace { + +class TLineageScanner { +public: + TLineageScanner(const TExprNode& root, const TTypeAnnotationContext& ctx) + : Root_(root) + , Ctx_(ctx) + {} + + TString Process() { + VisitExpr(Root_, [&](const TExprNode& node) { + for (auto& p : Ctx_.DataSources) { + if (p->IsRead(node)) { + Reads_[&node] = p.Get(); + } + } + + for (auto& p : Ctx_.DataSinks) { + if (p->IsWrite(node)) { + Writes_[&node] = p.Get(); + } + } + + return true; + }); + + TStringStream s; + NYson::TYsonWriter writer(&s, NYson::EYsonFormat::Text); + writer.OnBeginMap(); + writer.OnKeyedItem("Reads"); + writer.OnBeginList(); + for (const auto& r : Reads_) { + TVector<TPinInfo> inputs; + r.second->GetPlanFormatter().GetInputs(*r.first, inputs); + for (const auto& i : inputs) { + auto id = ++NextReadId_; + ReadIds_[r.first].push_back(id); + writer.OnListItem(); + writer.OnBeginMap(); + writer.OnKeyedItem("Id"); + writer.OnInt64Scalar(id); + writer.OnKeyedItem("Name"); + writer.OnStringScalar(i.DisplayName); + writer.OnKeyedItem("Schema"); + WriteSchema(writer, *r.first->GetTypeAnn()->Cast<TTupleExprType>()->GetItems()[1]->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>()); + writer.OnEndMap(); + } + } + + writer.OnEndList(); + writer.OnKeyedItem("Writes"); + writer.OnBeginList(); + for (const auto& w : Writes_) { + auto data = w.first->Child(3); + TVector<TPinInfo> outputs; + w.second->GetPlanFormatter().GetOutputs(*w.first, outputs); + YQL_ENSURE(outputs.size() == 1); + auto id = ++NextWriteId_; + WriteIds_[w.first] = id; + writer.OnListItem(); + writer.OnBeginMap(); + writer.OnKeyedItem("Id"); + writer.OnInt64Scalar(id); + writer.OnKeyedItem("Name"); + 
writer.OnStringScalar(outputs.front().DisplayName); + writer.OnKeyedItem("Schema"); + WriteSchema(writer, *data->GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>()); + writer.OnKeyedItem("Lineage"); + auto lineage = CollectLineage(*data); + WriteLineage(writer, *lineage); + writer.OnEndMap(); + } + + writer.OnEndList(); + writer.OnEndMap(); + return s.Str(); + } + +private: + void WriteSchema(NYson::TYsonWriter& writer, const TStructExprType& structType) { + writer.OnBeginMap(); + for (const auto& i : structType.GetItems()) { + if (i->GetName().StartsWith("_yql_sys_")) { + continue; + } + + writer.OnKeyedItem(i->GetName()); + writer.OnStringScalar(FormatType(i->GetItemType())); + } + + writer.OnEndMap(); + } + + using TFieldLineage = std::pair<ui32, TString>; + + struct TFieldsLineage { + THashSet<TFieldLineage> Items; + }; + + struct TLineage { + // null - can't calculcate + TMaybe<THashMap<TString, TFieldsLineage>> Fields; + }; + + const TLineage* CollectLineage(const TExprNode& node) { + if (auto it = Lineages_.find(&node); it != Lineages_.end()) { + return &it->second; + } + + auto& lineage = Lineages_[&node]; + if (auto readIt = ReadIds_.find(&node); readIt != ReadIds_.end()) { + lineage.Fields.ConstructInPlace(); + auto type = node.GetTypeAnn()->Cast<TTupleExprType>()->GetItems()[1]->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); + for (const auto& i : type->GetItems()) { + if (i->GetName().StartsWith("_yql_sys_")) { + continue; + } + + TString fieldName(i->GetName()); + auto& v = (*lineage.Fields)[fieldName]; + for (const auto& r : readIt->second) { + v.Items.insert({ r, fieldName }); + } + } + + return &lineage; + } + + if (node.IsCallable({"Unordered", "Right!", "Skip", "Take", "Sort"})) { + lineage = *CollectLineage(node.Head()); + return &lineage; + } else if (node.IsCallable("ExtractMembers")) { + HandleExtractMembers(lineage, node); + } else if (node.IsCallable({"FlatMap", "OrderedFlatMap"})) { + 
HandleFlatMap(lineage, node); + } else if (node.IsCallable("Aggregate")) { + HandleAggregate(lineage, node); + } else if (node.IsCallable("Extend")) { + HandleExtend(lineage, node); + } else if (node.IsCallable({"CalcOverWindow","CalcOverSessionWindow","CalcOverWindowGroup"})) { + HandleWindow(lineage, node); + } else if (node.IsCallable("EquiJoin")) { + HandleEquiJoin(lineage, node); + } + + return &lineage; + } + + void HandleExtractMembers(TLineage& lineage, const TExprNode& node) { + auto innerLineage = *CollectLineage(node.Head()); + if (innerLineage.Fields.Defined()) { + lineage.Fields.ConstructInPlace(); + for (const auto& atom : node.Child(1)->Children()) { + TString fieldName(atom->Content()); + (*lineage.Fields)[fieldName] = (*innerLineage.Fields)[fieldName]; + } + } + } + + // returns false if all fields are used + bool GetUsedFields(const TExprNode& expr, const TExprNode& arg, THashSet<TString>& fields) { + fields.clear(); + if (!IsDepended(expr, arg)) { + return true; + } + + TParentsMap parentsMap; + GatherParents(expr, parentsMap); + + auto parentsIt = parentsMap.find(&arg); + if (parentsIt == parentsMap.end()) { + return false; + } else { + for (const auto& p : parentsIt->second) { + if (p->IsCallable("Member")) { + fields.insert(TString(p->Tail().Content())); + } else { + return false; + } + } + } + + return true; + } + + void MergeLineageFromUsedFields(const TExprNode& expr, const TExprNode& arg, const TLineage& src, + TFieldsLineage& dst) { + THashSet<TString> usedFields; + auto needAllFields = !GetUsedFields(expr, arg, usedFields); + if (needAllFields) { + for (const auto& f : *src.Fields) { + for (const auto& i: f.second.Items) { + dst.Items.insert(i); + } + } + } else { + for (const auto& f : usedFields) { + for (const auto& i: (*src.Fields).FindPtr(f)->Items) { + dst.Items.insert(i); + } + } + } + } + + void HandleFlatMap(TLineage& lineage, const TExprNode& node) { + auto innerLineage = *CollectLineage(node.Head()); + if 
(!innerLineage.Fields.Defined()) { + return; + } + + const auto& lambda = node.Tail(); + const auto& arg = lambda.Head().Head(); + const auto& body = lambda.Tail(); + const TExprNode* value; + if (body.IsCallable("OptionalIf")) { + value = &body.Tail(); + } else if (body.IsCallable("Just")) { + value = &body.Head(); + } else { + return; + } + + if (value == &arg) { + lineage.Fields = *innerLineage.Fields; + return; + } + + if (!value->IsCallable("AsStruct")) { + return; + } + + lineage.Fields.ConstructInPlace(); + for (const auto& child : value->Children()) { + TString field(child->Head().Content()); + auto& res = (*lineage.Fields)[field]; + const auto& expr = child->Tail(); + MergeLineageFromUsedFields(expr, arg, innerLineage, res); + } + } + + void HandleAggregate(TLineage& lineage, const TExprNode& node) { + auto innerLineage = *CollectLineage(node.Head()); + if (!innerLineage.Fields.Defined()) { + return; + } + + lineage.Fields.ConstructInPlace(); + for (const auto& key : node.Child(1)->Children()) { + TString field(key->Content()); + (*lineage.Fields)[field] = (*innerLineage.Fields)[field]; + } + + for (const auto& payload: node.Child(2)->Children()) { + TVector<TString> fields; + if (payload->Child(0)->IsList()) { + for (const auto& child : payload->Child(0)->Children()) { + fields.push_back(TString(child->Content())); + } + } else { + fields.push_back(TString(payload->Child(0)->Content())); + } + + TFieldsLineage source; + if (payload->ChildrenSize() == 3) { + // distinct + source = (*innerLineage.Fields)[payload->Child(2)->Content()]; + } else { + if (payload->Child(1)->IsCallable("AggregationTraits")) { + // merge all used fields from init/update handlers + auto initHandler = payload->Child(1)->Child(1); + auto updateHandler = payload->Child(1)->Child(2); + MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, source); + MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, 
source); + } else if (payload->Child(1)->IsCallable("AggApply")) { + auto extractHandler = payload->Child(1)->Child(2); + MergeLineageFromUsedFields(extractHandler->Tail(), extractHandler->Head().Head(), innerLineage, source); + } else { + lineage.Fields.Clear(); + return; + } + } + + for (const auto& field : fields) { + (*lineage.Fields)[field] = source; + } + } + } + + void HandleExtend(TLineage& lineage, const TExprNode& node) { + TVector<TLineage> inners; + for (const auto& child : node.Children()) { + inners.push_back(*CollectLineage(*child)); + if (!inners.back().Fields.Defined()) { + return; + } + } + + if (inners.empty()) { + return; + } + + lineage.Fields.ConstructInPlace(); + for (const auto& x : *inners.front().Fields) { + auto& res = (*lineage.Fields)[x.first]; + for (const auto& i : inners) { + for (const auto& x : (*i.Fields).FindPtr(x.first)->Items) { + res.Items.insert(x); + } + } + } + } + + void HandleWindow(TLineage& lineage, const TExprNode& node) { + auto innerLineage = *CollectLineage(node.Head()); + if (!innerLineage.Fields.Defined()) { + return; + } + + lineage.Fields = *innerLineage.Fields; + TExprNode::TListType frameGroups; + if (node.IsCallable("CalcOverWindowGroup")) { + for (const auto& g : node.Child(1)->Children()) { + frameGroups.push_back(g->Child(2)); + } + } else { + frameGroups.push_back(node.Child(3)); + } + + for (const auto& g : frameGroups) { + for (const auto& f : g->Children()) { + if (!f->IsCallable("WinOnRows")) { + lineage.Fields.Clear(); + return; + } + + for (ui32 i = 1; i < f->ChildrenSize(); ++i) { + const auto& list = f->Child(i); + auto field = list->Head().Content(); + auto& res = (*lineage.Fields)[field]; + if (list->Tail().IsCallable("RowNumber")) { + continue; + } else if (list->Tail().IsCallable({"Lag","Lead","Rank","DenseRank"})) { + const auto& lambda = list->Tail().Child(1); + MergeLineageFromUsedFields(lambda->Tail(), lambda->Head().Head(), innerLineage, res); + } else if 
(list->Tail().IsCallable("WindowTraits")) { + const auto& initHandler = list->Tail().Child(1); + const auto& updateHandler = list->Tail().Child(2); + MergeLineageFromUsedFields(initHandler->Tail(), initHandler->Head().Head(), innerLineage, res); + MergeLineageFromUsedFields(updateHandler->Tail(), updateHandler->Head().Head(), innerLineage, res); + } else { + lineage.Fields.Clear(); + return; + } + } + } + } + } + + void HandleEquiJoin(TLineage& lineage, const TExprNode& node) { + TVector<TLineage> inners; + THashMap<TStringBuf, ui32> inputLabels; + for (ui32 i = 0; i < node.ChildrenSize() - 2; ++i) { + inners.push_back(*CollectLineage(node.Child(i)->Head())); + if (!inners.back().Fields.Defined()) { + return; + } + + if (node.Child(i)->Tail().IsAtom()) { + inputLabels[node.Child(i)->Tail().Content()] = i; + } else { + for (const auto& label : node.Child(i)->Tail().Children()) { + inputLabels[label->Content()] = i; + } + } + } + + THashMap<TStringBuf, TStringBuf> backRename; + for (auto setting : node.Tail().Children()) { + if (setting->Head().Content() != "rename") { + continue; + } + + if (setting->Child(2)->Content().Empty()) { + continue; + } + + backRename[setting->Child(2)->Content()] = setting->Child(1)->Content(); + } + + lineage.Fields.ConstructInPlace(); + auto structType = node.GetTypeAnn()->Cast<TListExprType>()->GetItemType()->Cast<TStructExprType>(); + for (const auto& field : structType->GetItems()) { + TStringBuf originalName = field->GetName(); + if (auto it = backRename.find(originalName); it != backRename.end()) { + originalName = it->second; + } + + TStringBuf table, column; + SplitTableName(originalName, table, column); + ui32 index = *inputLabels.FindPtr(table); + auto& res = (*lineage.Fields)[field->GetName()]; + for (const auto& i: (*inners[index].Fields).FindPtr(column)->Items) { + res.Items.insert(i); + } + } + } + + void WriteLineage(NYson::TYsonWriter& writer, const TLineage& lineage) { + if (!lineage.Fields.Defined()) { + 
YQL_ENSURE(!GetEnv("YQL_DETERMINISTIC_MODE"), "Can't calculate lineage"); + writer.OnEntity(); + return; + } + + writer.OnBeginMap(); + TVector<TString> fields; + for (const auto& f : *lineage.Fields) { + fields.push_back(f.first); + } + + Sort(fields); + for (const auto& f : fields) { + writer.OnKeyedItem(f); + writer.OnBeginList(); + TVector<std::pair<ui32, TString>> items; + for (const auto& i : lineage.Fields->FindPtr(f)->Items) { + items.push_back(i); + } + + Sort(items); + for (const auto& i: items) { + writer.OnListItem(); + writer.OnBeginMap(); + writer.OnKeyedItem("Input"); + writer.OnInt64Scalar(i.first); + writer.OnKeyedItem("Field"); + writer.OnStringScalar(i.second); + writer.OnEndMap(); + } + writer.OnEndList(); + } + + writer.OnEndMap(); + } + +private: + const TExprNode& Root_; + const TTypeAnnotationContext& Ctx_; + TNodeMap<IDataProvider*> Reads_, Writes_; + ui32 NextReadId_ = 0; + ui32 NextWriteId_ = 0; + TNodeMap<TVector<ui32>> ReadIds_; + TNodeMap<ui32> WriteIds_; + TNodeMap<TLineage> Lineages_; +}; + +} + +TString CalculateLineage(const TExprNode& root, const TTypeAnnotationContext& ctx) { + TLineageScanner scanner(root, ctx); + return scanner.Process(); +} + +} diff --git a/ydb/library/yql/core/services/yql_lineage.h b/ydb/library/yql/core/services/yql_lineage.h new file mode 100644 index 0000000000..8bacffa3b9 --- /dev/null +++ b/ydb/library/yql/core/services/yql_lineage.h @@ -0,0 +1,10 @@ +#pragma once +#include <ydb/library/yql/ast/yql_expr.h> + +namespace NYql { + +struct TTypeAnnotationContext; + +TString CalculateLineage(const TExprNode& root, const TTypeAnnotationContext& ctx); + +} diff --git a/ydb/library/yql/core/services/yql_transform_pipeline.cpp b/ydb/library/yql/core/services/yql_transform_pipeline.cpp index bc87e1ff56..b4b494de29 100644 --- a/ydb/library/yql/core/services/yql_transform_pipeline.cpp +++ b/ydb/library/yql/core/services/yql_transform_pipeline.cpp @@ -1,6 +1,7 @@ #include "yql_transform_pipeline.h" #include 
"yql_eval_expr.h" #include "yql_eval_params.h" +#include "yql_lineage.h" #include <ydb/library/yql/core/type_ann/type_ann_core.h> #include <ydb/library/yql/core/type_ann/type_ann_expr.h> @@ -179,6 +180,22 @@ TTransformationPipeline& TTransformationPipeline::AddOptimization(bool checkWorl return *this; } +TTransformationPipeline& TTransformationPipeline::AddLineageOptimization(TMaybe<TString>& lineageOut, EYqlIssueCode issueCode) { + AddCommonOptimization(issueCode); + Transformers_.push_back(TTransformStage( + CreateFunctorTransformer( + [typeCtx = TypeAnnotationContext_, &lineageOut](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { + Y_UNUSED(ctx); + output = input; + lineageOut = CalculateLineage(*input, *typeCtx); + return IGraphTransformer::TStatus::Ok; + } + ), + "LineageScanner", + issueCode)); + return *this; +} + TTransformationPipeline& TTransformationPipeline::AddCheckExecution(bool checkWorld, EYqlIssueCode issueCode) { Transformers_.push_back(TTransformStage( CreateCheckExecutionTransformer(*TypeAnnotationContext_, checkWorld), diff --git a/ydb/library/yql/core/services/yql_transform_pipeline.h b/ydb/library/yql/core/services/yql_transform_pipeline.h index eb3e035c5c..37b52d5e1f 100644 --- a/ydb/library/yql/core/services/yql_transform_pipeline.h +++ b/ydb/library/yql/core/services/yql_transform_pipeline.h @@ -34,6 +34,7 @@ public: TTransformationPipeline& AddCommonOptimization(EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION); TTransformationPipeline& AddFinalCommonOptimization(EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION); TTransformationPipeline& AddOptimization(bool checkWorld = true, bool withFinalOptimization = true, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION); + TTransformationPipeline& AddLineageOptimization(TMaybe<TString>& lineageOut, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION); TTransformationPipeline& AddCheckExecution(bool checkWorld = true, EYqlIssueCode issueCode = 
TIssuesIds::CORE_OPTIMIZATION); TTransformationPipeline& AddRun(TOperationProgressWriter writer, EYqlIssueCode issueCode = TIssuesIds::CORE_EXEC); diff --git a/ydb/library/yql/core/yql_data_provider.h b/ydb/library/yql/core/yql_data_provider.h index 8919fc17f5..5455d160de 100644 --- a/ydb/library/yql/core/yql_data_provider.h +++ b/ydb/library/yql/core/yql_data_provider.h @@ -137,6 +137,8 @@ public: // This function is used in core optimizers to check either the node can be used as input multiple times or not virtual bool IsPersistent(const TExprNode& node) = 0; + virtual bool IsRead(const TExprNode& node) = 0; + virtual bool IsWrite(const TExprNode& node) = 0; // Right! or worlds are written to syncList virtual bool CanBuildResult(const TExprNode& node, TSyncMap& syncList) = 0; diff --git a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp index f43c44a7cc..3ec10080f6 100644 --- a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp +++ b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.cpp @@ -196,6 +196,16 @@ bool TDataProviderBase::IsPersistent(const TExprNode& node) { return false; } +bool TDataProviderBase::IsRead(const TExprNode& node) { + Y_UNUSED(node); + return false; +} + +bool TDataProviderBase::IsWrite(const TExprNode& node) { + Y_UNUSED(node); + return false; +} + bool TDataProviderBase::CanBuildResult(const TExprNode& node, TSyncMap& syncList) { Y_UNUSED(node); Y_UNUSED(syncList); diff --git a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h index e661c75791..2dce702afb 100644 --- a/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h +++ b/ydb/library/yql/providers/common/provider/yql_data_provider_impl.h @@ -63,6 +63,8 @@ public: IGraphTransformer& GetPhysicalFinalizingTransformer() override; IGraphTransformer& 
GetLoadTableMetadataTransformer() override; bool IsPersistent(const TExprNode& node) override; + bool IsRead(const TExprNode& node) override; + bool IsWrite(const TExprNode& node) override; bool CanBuildResult(const TExprNode& node, TSyncMap& syncList) override; bool CanPullResult(const TExprNode& node, TSyncMap& syncList, bool& canRef) override; bool GetExecWorld(const TExprNode::TPtr& node, TExprNode::TPtr& root) override; |