diff options
author | YDBot <[email protected]> | 2025-10-23 18:38:13 +0000 |
---|---|---|
committer | YDBot <[email protected]> | 2025-10-23 18:38:13 +0000 |
commit | 564cf7cb2255a107b4f44c18b2a1844041f20b4d (patch) | |
tree | 6a4a60e8bde612dcc3ac7fd93fffbd36351d4e35 /yql/essentials/core | |
parent | 89a17b25091c24744b7ebd0650b5b810457b1145 (diff) | |
parent | a703d86902fd02bd8e373d959b2498c034657449 (diff) |
Diffstat (limited to 'yql/essentials/core')
-rw-r--r-- | yql/essentials/core/facade/yql_facade.cpp | 8 | ||||
-rw-r--r-- | yql/essentials/core/facade/yql_facade.h | 5 | ||||
-rw-r--r-- | yql/essentials/core/issue/protos/issue_id.proto | 1 | ||||
-rw-r--r-- | yql/essentials/core/issue/yql_issue.txt | 4 | ||||
-rw-r--r-- | yql/essentials/core/services/yql_lineage.cpp | 23 | ||||
-rw-r--r-- | yql/essentials/core/services/yql_lineage.h | 3 | ||||
-rw-r--r-- | yql/essentials/core/services/yql_transform_pipeline.cpp | 88 | ||||
-rw-r--r-- | yql/essentials/core/services/yql_transform_pipeline.h | 2 | ||||
-rw-r--r-- | yql/essentials/core/type_ann/type_ann_core.cpp | 171 |
9 files changed, 273 insertions, 32 deletions
diff --git a/yql/essentials/core/facade/yql_facade.cpp b/yql/essentials/core/facade/yql_facade.cpp index dfb44c1af3d..0b6f9eca6b6 100644 --- a/yql/essentials/core/facade/yql_facade.cpp +++ b/yql/essentials/core/facade/yql_facade.cpp @@ -1192,7 +1192,7 @@ TProgram::TFutureStatus TProgram::OptimizeAsync( .AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true) .AddPostTypeAnnotation() .Add(TExprOutputTransformer::Sync(ExprRoot_, traceOut), "ExprOutput") - .AddOptimization() + .AddOptimizationWithLineage(EnableLineage_) .Add(TExprOutputTransformer::Sync(ExprRoot_, exprOut, withTypes), "AstOutput") .Build(); @@ -1262,7 +1262,7 @@ TProgram::TFutureStatus TProgram::OptimizeAsyncWithConfig( pipeline.AddPostTypeAnnotation(); pipelineConf.AfterTypeAnnotation(&pipeline); - pipeline.AddOptimization(); + pipeline.AddOptimizationWithLineage(EnableLineage_); if (EnableRangeComputeFor_) { pipeline.Add(MakeExpandRangeComputeForTransformer(pipeline.GetTypeAnnotationContext()), "ExpandRangeComputeFor", TIssuesIds::CORE_EXEC); @@ -1402,7 +1402,7 @@ TProgram::TFutureStatus TProgram::RunAsync( pipeline.AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true); pipeline.AddPostTypeAnnotation(); pipeline.Add(TExprOutputTransformer::Sync(ExprRoot_, traceOut), "ExprOutput"); - pipeline.AddOptimizationWithLineage(); + pipeline.AddOptimizationWithLineage(EnableLineage_); if (EnableRangeComputeFor_) { pipeline.Add(MakeExpandRangeComputeForTransformer(pipeline.GetTypeAnnotationContext()), "ExpandRangeComputeFor", TIssuesIds::CORE_EXEC); @@ -1480,7 +1480,7 @@ TProgram::TFutureStatus TProgram::RunAsyncWithConfig( pipeline.AddPostTypeAnnotation(); pipelineConf.AfterTypeAnnotation(&pipeline); - pipeline.AddOptimizationWithLineage(); + pipeline.AddOptimizationWithLineage(EnableLineage_); if (EnableRangeComputeFor_) { pipeline.Add(MakeExpandRangeComputeForTransformer(pipeline.GetTypeAnnotationContext()), "ExpandRangeComputeFor", TIssuesIds::CORE_EXEC); diff --git a/yql/essentials/core/facade/yql_facade.h b/yql/essentials/core/facade/yql_facade.h index 018367a3b37..0f6767562ad 100644 --- a/yql/essentials/core/facade/yql_facade.h +++ b/yql/essentials/core/facade/yql_facade.h @@ -353,6 +353,10 @@ public: TString GetSourceCode() const; + void SetEnableLineage() { + EnableLineage_ = true; + } + private: TProgram( const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, @@ -483,6 +487,7 @@ private: TMaybe<TString> GatewaysForMerge_; TIssues FinalIssues_; TMaybe<TIssue> ParametersIssue_; + bool EnableLineage_ = false; }; void UpdateSqlFlagsFromQContext(const TQContext& qContext, THashSet<TString>& flags, TMaybe<TString> gatewaysPatch = {}); diff --git a/yql/essentials/core/issue/protos/issue_id.proto b/yql/essentials/core/issue/protos/issue_id.proto index 90fe70ce034..b9cdc43c866 100644 --- a/yql/essentials/core/issue/protos/issue_id.proto +++ b/yql/essentials/core/issue/protos/issue_id.proto @@ -41,6 +41,7 @@ message TIssuesIds { CORE_DEPRECATED_LANG_VER = 1113; CORE_UNSUPPORTED_LANG_VER = 1114; CORE_SINGULAR_TYPE_ENSURE = 1115; + CORE_SIMPLE_PG = 1116; // core informational CORE_TOP_UNSUPPORTED_BLOCK_TYPES = 1200; diff --git a/yql/essentials/core/issue/yql_issue.txt b/yql/essentials/core/issue/yql_issue.txt index 948c9aa01b7..ee69f1e4bfb 100644 --- a/yql/essentials/core/issue/yql_issue.txt +++ b/yql/essentials/core/issue/yql_issue.txt @@ -696,6 +696,10 @@ ids { severity: S_WARNING } ids { + code: CORE_SIMPLE_PG + severity: S_WARNING +} +ids { code: YT_SECURE_DATA_IN_COMMON_TMP severity: S_WARNING } diff --git a/yql/essentials/core/services/yql_lineage.cpp b/yql/essentials/core/services/yql_lineage.cpp index 57e3e02192f..35721515add 100644 --- a/yql/essentials/core/services/yql_lineage.cpp +++ b/yql/essentials/core/services/yql_lineage.cpp @@ -5,6 +5,7 @@ #include <yql/essentials/core/yql_opt_utils.h> #include <yql/essentials/core/yql_join.h> +#include <library/cpp/yson/node/node_io.h> #include <util/system/env.h> namespace NYql { @@ -966,4 +967,26 @@ TString CalculateLineage(const TExprNode& root, const TTypeAnnotationContext& ct return scanner.Process(); } +TString NormalizeLineage(const TString& lineageStr) { + THashMap<i64, TString> idToPath; + auto lineageNode = NYT::NodeFromYsonString(lineageStr); + auto& readsSection = lineageNode.AsMap()["Reads"]; + for (auto& readNode : readsSection.AsList()) { + auto& readMap = readNode.AsMap(); + idToPath[readMap["Id"].AsInt64()] = readMap["Name"].AsString(); + readMap["Id"] = readMap["Name"]; + } + auto& writesSection = lineageNode.AsMap()["Writes"]; + for (auto& writeNode : writesSection.AsList()) { + auto& writeMap = writeNode.AsMap(); + writeMap["Id"] = writeMap["Name"]; + for (auto& [fieldName, fieldLineage] : writeMap["Lineage"].AsMap()) { + for (auto& inputField : fieldLineage.AsList()) { + inputField["Input"] = idToPath[inputField["Input"].AsInt64()]; + } + } + } + return NYT::NodeToCanonicalYsonString(lineageNode); +} + } // namespace NYql diff --git a/yql/essentials/core/services/yql_lineage.h b/yql/essentials/core/services/yql_lineage.h index c1a71ae185b..2b66c064a4d 100644 --- a/yql/essentials/core/services/yql_lineage.h +++ b/yql/essentials/core/services/yql_lineage.h @@ -8,4 +8,7 @@ struct TExprContext; TString CalculateLineage(const TExprNode& root, const TTypeAnnotationContext& ctx, TExprContext& exprCtx, bool standalone); +// Replace input and output table's IDs with pathes for checking lineage equality +TString NormalizeLineage(const TString& lineageStr); + } // namespace NYql diff --git a/yql/essentials/core/services/yql_transform_pipeline.cpp b/yql/essentials/core/services/yql_transform_pipeline.cpp index 1b2df336500..45c0a5ae6a8 100644 --- a/yql/essentials/core/services/yql_transform_pipeline.cpp +++ b/yql/essentials/core/services/yql_transform_pipeline.cpp @@ -21,6 +21,9 @@ namespace NYql { +const TString LineageComponent = "Lineage"; +const TString LineageResultLabel = "LineageResult"; + TTransformationPipeline::TTransformationPipeline( TIntrusivePtr<TTypeAnnotationContext> ctx, TTypeAnnCallableFactory typeAnnCallableFactory) @@ -167,33 +170,66 @@ TTransformationPipeline& TTransformationPipeline::AddFinalCommonOptimization(EYq return *this; } -TTransformationPipeline& TTransformationPipeline::AddOptimizationWithLineage(bool checkWorld, bool withFinalOptimization, EYqlIssueCode issueCode) { +TTransformationPipeline& TTransformationPipeline::AddOptimizationWithLineage(bool enableLineage, bool checkWorld, bool withFinalOptimization, EYqlIssueCode issueCode) { AddCommonOptimization(false, issueCode); - Transformers_.push_back(TTransformStage( - CreateChoiceGraphTransformer( - [&typesCtx = std::as_const(*TypeAnnotationContext_)](const TExprNode::TPtr&, TExprContext&) { - return typesCtx.EnableLineage; - }, - TTransformStage( - CreateFunctorTransformer( - [typeCtx = TypeAnnotationContext_](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { - output = input; - try { - CalculateLineage(*input, *typeCtx, ctx, false); - } catch (const std::exception& e) { - YQL_LOG(ERROR) << "CalculateLineage error: " << e.what(); - typeCtx->CorrectLineage = false; - } - return IGraphTransformer::TStatus::Ok; - }), - "Lineage", - issueCode), - TTransformStage( - new TNullTransformer(), - "SkipLineage", - issueCode)), - "LineageCalculation", - issueCode)); + if (enableLineage) { + Transformers_.push_back(TTransformStage( + CreateChoiceGraphTransformer( + [&typesCtx = std::as_const(*TypeAnnotationContext_)](const TExprNode::TPtr&, TExprContext&) { + return typesCtx.EnableLineage; + }, + TTransformStage( + CreateSinglePassFunctorTransformer( + [typeCtx = TypeAnnotationContext_](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { + output = input; + TString calculatedLineage, loadedLineage; + if (typeCtx->QContext && typeCtx->QContext.CanRead()) { + auto loaded = typeCtx->QContext.GetReader()->Get({LineageComponent, LineageResultLabel}).GetValueSync(); + if (loaded.Defined()) { + loadedLineage = loaded->Value; + } else { + YQL_LOG(INFO) << "There is no lineage in QStorage, lineage calculation is skipped in replay mode"; + return IGraphTransformer::TStatus::Ok; + } + } + std::exception_ptr lineageError; + try { + calculatedLineage = CalculateLineage(*input, *typeCtx, ctx, false); + } catch (const std::exception& e) { + YQL_LOG(ERROR) << "Lineage calculation error: " << e.what(); + typeCtx->CorrectLineage = false; + lineageError = std::current_exception(); + } + if (!loadedLineage.empty()) { + // if lineage calculation is failed, but loaded lineage exists, rethrow exception for replay mode + if (lineageError) { + std::rethrow_exception(lineageError); + } + if (NormalizeLineage(calculatedLineage) != NormalizeLineage(loadedLineage)) { + YQL_LOG(INFO) << "Lineage in replay is different:" + << "\nCalculated lineage:\n" + << calculatedLineage + << "\nLoaded lineage:\n" + << loadedLineage; + throw yexception() << "Lineage in replay is different"; + } + YQL_LOG(INFO) << "Lineage replay is the same"; + } + if (typeCtx->QContext && typeCtx->QContext.CanWrite() && typeCtx->CorrectLineage) { + typeCtx->QContext.GetWriter()->Put({LineageComponent, LineageResultLabel}, calculatedLineage).GetValueSync(); + YQL_LOG(INFO) << "Lineage is saved to QStorage"; + } + return IGraphTransformer::TStatus::Ok; + }), + "Lineage", + issueCode), + TTransformStage( + new TNullTransformer(), + "SkipLineage", + issueCode)), + "LineageCalculation", + issueCode)); + } AddProviderOptimization(issueCode); if (withFinalOptimization) { AddFinalCommonOptimization(issueCode); diff --git a/yql/essentials/core/services/yql_transform_pipeline.h b/yql/essentials/core/services/yql_transform_pipeline.h index 4cd60c76079..ca1dbfae83d 100644 --- a/yql/essentials/core/services/yql_transform_pipeline.h +++ b/yql/essentials/core/services/yql_transform_pipeline.h @@ -36,7 +36,7 @@ public: TTransformationPipeline& AddFinalCommonOptimization(EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION); TTransformationPipeline& AddOptimization(bool checkWorld = true, bool withFinalOptimization = true, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION); TTransformationPipeline& AddProviderOptimization(EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION); - TTransformationPipeline& AddOptimizationWithLineage(bool checkWorld = true, bool withFinalOptimization = true, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION); + TTransformationPipeline& AddOptimizationWithLineage(bool enableLineage, bool checkWorld = true, bool withFinalOptimization = true, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION); TTransformationPipeline& AddLineageOptimization(TMaybe<TString>& lineageOut, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION); TTransformationPipeline& AddCheckExecution(bool checkWorld = true, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION); TTransformationPipeline& AddRun(TOperationProgressWriter writer, EYqlIssueCode issueCode = TIssuesIds::CORE_EXEC); diff --git a/yql/essentials/core/type_ann/type_ann_core.cpp b/yql/essentials/core/type_ann/type_ann_core.cpp index 8320c08b502..cf8f316fe11 100644 --- a/yql/essentials/core/type_ann/type_ann_core.cpp +++ b/yql/essentials/core/type_ann/type_ann_core.cpp @@ -3298,6 +3298,49 @@ namespace NTypeAnnImpl { return IGraphTransformer::TStatus::Ok; } + IGraphTransformer::TStatus SqlConcatWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { + if (!IsBackwardCompatibleFeatureAvailable(ctx.Types.LangVer, MakeLangVersion(2025, 04), ctx.Types.BackportMode)) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Concat function is not available before version 2025.04")); + return IGraphTransformer::TStatus::Error; + } + + if (!EnsureMinArgsCount(*input, 1, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + bool hasOptionals = false; + bool hasBinary = false; + for (ui32 i = 0; i < input->ChildrenSize(); ++i) { + if (IsNull(*input->Child(i))) { + output = input->ChildPtr(i); + return IGraphTransformer::TStatus::Repeat; + } + + bool isOptional; + const TDataExprType* dataType ; + if (!EnsureDataOrOptionalOfData(*input->Child(i), isOptional, dataType, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (dataType->GetSlot() != EDataSlot::String && dataType->GetSlot() != EDataSlot::Utf8) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(i)->Pos()), + TStringBuilder() << "Expected (optional) String or Utf8, but got: " << *input->Child(i)->GetTypeAnn())); + return IGraphTransformer::TStatus::Error; + } + + hasOptionals = hasOptionals || isOptional; + hasBinary = hasBinary || (dataType->GetSlot() == EDataSlot::String); + } + + const TTypeAnnotationNode* retType = ctx.Expr.MakeType<TDataExprType>(hasBinary ? EDataSlot::String : EDataSlot::Utf8); + if (hasOptionals) { + retType = ctx.Expr.MakeType<TOptionalExprType>(retType); + } + + input->SetTypeAnn(retType); + return IGraphTransformer::TStatus::Ok; + } + IGraphTransformer::TStatus SubstringWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { if (!EnsureArgsCount(*input, 3, ctx.Expr)) { return IGraphTransformer::TStatus::Error; @@ -8681,6 +8724,128 @@ template <NKikimr::NUdf::EDataSlot DataSlot> return IGraphTransformer::TStatus::Ok; } + IGraphTransformer::TStatus BuildSimplePgCall(TPositionHandle pos, TStringBuf name, + const TExprNodeList& args, TExprNode::TPtr& output, TExtContext& ctx) { + if (!IsBackwardCompatibleFeatureAvailable(ctx.Types.LangVer, MakeLangVersion(2025, 04), ctx.Types.BackportMode)) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(pos), "SimplePg functions are not available before version 2025.04")); + return IGraphTransformer::TStatus::Error; + } + + if (name == "now") { + if (args.size() != 0) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(pos), "Expected 0 arguments")); + return IGraphTransformer::TStatus::Error; + } + + // clang-format off + output = ctx.Expr.Builder(pos) + .Callable("Apply") + .Callable(0, "Udf") + .Atom(0, "DateTime2.MakeTimestamp") + .Seal() + .Callable(1, "Apply") + .Callable(0, "Udf") + .Atom(0, "DateTime2.ParseIso8601") + .Seal() + .Callable(1, "FromPg") + .Callable(0, "PgCast") + .Callable(0, "PgCall") + .Atom(0, "now") + .List(1) + .Seal() + .Seal() + .Callable(1, "PgType") + .Atom(0, "text") + .Seal() + .Seal() + .Seal() + .Seal() + .Seal() + .Build(); + // clang-format on + return IGraphTransformer::TStatus::Repeat; + } else if (name == "to_date") { + if (args.size() != 2) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(pos), "Expected 2 arguments")); + return IGraphTransformer::TStatus::Error; + } + + // clang-format off + output = ctx.Expr.Builder(pos) + .Callable("SafeCast") + .Callable(0, "FromPg") + .Callable(0, "PgCast") + .Callable(0, "PgCall") + .Atom(0, "to_date") + .List(1) + .Seal() + .Callable(2, "SafeCast") + .Add(0, args[0]) + .Callable(1, "DataType") + .Atom(0, "Utf8") + .Seal() + .Seal() + .Callable(3, "SafeCast") + .Add(0, args[1]) + .Callable(1, "DataType") + .Atom(0, "Utf8") + .Seal() + .Seal() + .Seal() + .Callable(1, "PgType") + .Atom(0, "text") + .Seal() + .Seal() + .Seal() + .Callable(1, "DataType") + .Atom(0, "Date32") + .Seal() + .Seal() + .Build(); + // clang-format on + return IGraphTransformer::TStatus::Repeat; + } else if (name == "round") { + if (args.size() < 1 || args.size() > 2) { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(pos), "Expected 1 or 2 arguments")); + return IGraphTransformer::TStatus::Error; + } + + // clang-format off + output = ctx.Expr.Builder(pos) + .Callable("FromPg") + .Callable(0, "PgCast") + .Callable(0, "PgCall") + .Atom(0,"round") + .List(1) + .Seal() + .Callable(2, "PgCast") + .Add(0, args[0]) + .Callable(1, "PgType") + .Atom(0, "numeric") + .Seal() + .Seal() + .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& { + if (args.size() == 2) { + parent.Add(3, args[1]); + } + + return parent; + }) + .Seal() + .Callable(1, "PgType") + .Atom(0, "float8") + .Seal() + .Seal() + .Seal() + .Build(); + // clang-format on + return IGraphTransformer::TStatus::Repeat; + } else { + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(pos), TStringBuilder() << "Unknown SimplePg function: " << name)); + return IGraphTransformer::TStatus::Error; + } + } + IGraphTransformer::TStatus SqlCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { if (!EnsureMinArgsCount(*input, 2, ctx.Expr)) { return IGraphTransformer::TStatus::Error; @@ -8698,7 +8863,6 @@ template <NKikimr::NUdf::EDataSlot DataSlot> } auto udfName = input->ChildPtr(0); - if (!EnsureTupleMinSize(*input->Child(1), 1, ctx.Expr) || !EnsureTupleMaxSize(*input->Child(1), 2, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } @@ -8754,6 +8918,10 @@ template <NKikimr::NUdf::EDataSlot DataSlot> options = input->Child(5); } + if (udfName->Content().StartsWith("SimplePg.")) { + return BuildSimplePgCall(input->Pos(), udfName->Content().substr(9), positionalArgs, output, ctx); + } + TExprNode::TPtr udf = ctx.Expr.Builder(input->Pos()) .Callable("Udf") .Add(0, udfName) @@ -13012,6 +13180,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["WithWorld"] = &WithWorldWrapper; Functions["Concat"] = &ConcatWrapper; Functions["AggrConcat"] = &AggrConcatWrapper; + ExtFunctions["SqlConcat"] = &SqlConcatWrapper; ExtFunctions["Substring"] = &SubstringWrapper; ExtFunctions["Find"] = &FindWrapper; ExtFunctions["RFind"] = &FindWrapper; |