summaryrefslogtreecommitdiffstats
path: root/yql/essentials/core
diff options
context:
space:
mode:
authorYDBot <[email protected]>2025-10-23 18:38:13 +0000
committerYDBot <[email protected]>2025-10-23 18:38:13 +0000
commit564cf7cb2255a107b4f44c18b2a1844041f20b4d (patch)
tree6a4a60e8bde612dcc3ac7fd93fffbd36351d4e35 /yql/essentials/core
parent89a17b25091c24744b7ebd0650b5b810457b1145 (diff)
parenta703d86902fd02bd8e373d959b2498c034657449 (diff)
Merge pull request #27203 from ydb-platform/merge-rightlib-251021-0051HEADmain
Diffstat (limited to 'yql/essentials/core')
-rw-r--r--yql/essentials/core/facade/yql_facade.cpp8
-rw-r--r--yql/essentials/core/facade/yql_facade.h5
-rw-r--r--yql/essentials/core/issue/protos/issue_id.proto1
-rw-r--r--yql/essentials/core/issue/yql_issue.txt4
-rw-r--r--yql/essentials/core/services/yql_lineage.cpp23
-rw-r--r--yql/essentials/core/services/yql_lineage.h3
-rw-r--r--yql/essentials/core/services/yql_transform_pipeline.cpp88
-rw-r--r--yql/essentials/core/services/yql_transform_pipeline.h2
-rw-r--r--yql/essentials/core/type_ann/type_ann_core.cpp171
9 files changed, 273 insertions, 32 deletions
diff --git a/yql/essentials/core/facade/yql_facade.cpp b/yql/essentials/core/facade/yql_facade.cpp
index dfb44c1af3d..0b6f9eca6b6 100644
--- a/yql/essentials/core/facade/yql_facade.cpp
+++ b/yql/essentials/core/facade/yql_facade.cpp
@@ -1192,7 +1192,7 @@ TProgram::TFutureStatus TProgram::OptimizeAsync(
.AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true)
.AddPostTypeAnnotation()
.Add(TExprOutputTransformer::Sync(ExprRoot_, traceOut), "ExprOutput")
- .AddOptimization()
+ .AddOptimizationWithLineage(EnableLineage_)
.Add(TExprOutputTransformer::Sync(ExprRoot_, exprOut, withTypes), "AstOutput")
.Build();
@@ -1262,7 +1262,7 @@ TProgram::TFutureStatus TProgram::OptimizeAsyncWithConfig(
pipeline.AddPostTypeAnnotation();
pipelineConf.AfterTypeAnnotation(&pipeline);
- pipeline.AddOptimization();
+ pipeline.AddOptimizationWithLineage(EnableLineage_);
if (EnableRangeComputeFor_) {
pipeline.Add(MakeExpandRangeComputeForTransformer(pipeline.GetTypeAnnotationContext()),
"ExpandRangeComputeFor", TIssuesIds::CORE_EXEC);
@@ -1402,7 +1402,7 @@ TProgram::TFutureStatus TProgram::RunAsync(
pipeline.AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true);
pipeline.AddPostTypeAnnotation();
pipeline.Add(TExprOutputTransformer::Sync(ExprRoot_, traceOut), "ExprOutput");
- pipeline.AddOptimizationWithLineage();
+ pipeline.AddOptimizationWithLineage(EnableLineage_);
if (EnableRangeComputeFor_) {
pipeline.Add(MakeExpandRangeComputeForTransformer(pipeline.GetTypeAnnotationContext()),
"ExpandRangeComputeFor", TIssuesIds::CORE_EXEC);
@@ -1480,7 +1480,7 @@ TProgram::TFutureStatus TProgram::RunAsyncWithConfig(
pipeline.AddPostTypeAnnotation();
pipelineConf.AfterTypeAnnotation(&pipeline);
- pipeline.AddOptimizationWithLineage();
+ pipeline.AddOptimizationWithLineage(EnableLineage_);
if (EnableRangeComputeFor_) {
pipeline.Add(MakeExpandRangeComputeForTransformer(pipeline.GetTypeAnnotationContext()),
"ExpandRangeComputeFor", TIssuesIds::CORE_EXEC);
diff --git a/yql/essentials/core/facade/yql_facade.h b/yql/essentials/core/facade/yql_facade.h
index 018367a3b37..0f6767562ad 100644
--- a/yql/essentials/core/facade/yql_facade.h
+++ b/yql/essentials/core/facade/yql_facade.h
@@ -353,6 +353,10 @@ public:
TString GetSourceCode() const;
+ void SetEnableLineage() {
+ EnableLineage_ = true;
+ }
+
private:
TProgram(
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
@@ -483,6 +487,7 @@ private:
TMaybe<TString> GatewaysForMerge_;
TIssues FinalIssues_;
TMaybe<TIssue> ParametersIssue_;
+ bool EnableLineage_ = false;
};
void UpdateSqlFlagsFromQContext(const TQContext& qContext, THashSet<TString>& flags, TMaybe<TString> gatewaysPatch = {});
diff --git a/yql/essentials/core/issue/protos/issue_id.proto b/yql/essentials/core/issue/protos/issue_id.proto
index 90fe70ce034..b9cdc43c866 100644
--- a/yql/essentials/core/issue/protos/issue_id.proto
+++ b/yql/essentials/core/issue/protos/issue_id.proto
@@ -41,6 +41,7 @@ message TIssuesIds {
CORE_DEPRECATED_LANG_VER = 1113;
CORE_UNSUPPORTED_LANG_VER = 1114;
CORE_SINGULAR_TYPE_ENSURE = 1115;
+ CORE_SIMPLE_PG = 1116;
// core informational
CORE_TOP_UNSUPPORTED_BLOCK_TYPES = 1200;
diff --git a/yql/essentials/core/issue/yql_issue.txt b/yql/essentials/core/issue/yql_issue.txt
index 948c9aa01b7..ee69f1e4bfb 100644
--- a/yql/essentials/core/issue/yql_issue.txt
+++ b/yql/essentials/core/issue/yql_issue.txt
@@ -696,6 +696,10 @@ ids {
severity: S_WARNING
}
ids {
+ code: CORE_SIMPLE_PG
+ severity: S_WARNING
+}
+ids {
code: YT_SECURE_DATA_IN_COMMON_TMP
severity: S_WARNING
}
diff --git a/yql/essentials/core/services/yql_lineage.cpp b/yql/essentials/core/services/yql_lineage.cpp
index 57e3e02192f..35721515add 100644
--- a/yql/essentials/core/services/yql_lineage.cpp
+++ b/yql/essentials/core/services/yql_lineage.cpp
@@ -5,6 +5,7 @@
#include <yql/essentials/core/yql_opt_utils.h>
#include <yql/essentials/core/yql_join.h>
+#include <library/cpp/yson/node/node_io.h>
#include <util/system/env.h>
namespace NYql {
@@ -966,4 +967,26 @@ TString CalculateLineage(const TExprNode& root, const TTypeAnnotationContext& ct
return scanner.Process();
}
+TString NormalizeLineage(const TString& lineageStr) {
+ THashMap<i64, TString> idToPath;
+ auto lineageNode = NYT::NodeFromYsonString(lineageStr);
+ auto& readsSection = lineageNode.AsMap()["Reads"];
+ for (auto& readNode : readsSection.AsList()) {
+ auto& readMap = readNode.AsMap();
+ idToPath[readMap["Id"].AsInt64()] = readMap["Name"].AsString();
+ readMap["Id"] = readMap["Name"];
+ }
+ auto& writesSection = lineageNode.AsMap()["Writes"];
+ for (auto& writeNode : writesSection.AsList()) {
+ auto& writeMap = writeNode.AsMap();
+ writeMap["Id"] = writeMap["Name"];
+ for (auto& [fieldName, fieldLineage] : writeMap["Lineage"].AsMap()) {
+ for (auto& inputField : fieldLineage.AsList()) {
+ inputField["Input"] = idToPath[inputField["Input"].AsInt64()];
+ }
+ }
+ }
+ return NYT::NodeToCanonicalYsonString(lineageNode);
+}
+
} // namespace NYql
diff --git a/yql/essentials/core/services/yql_lineage.h b/yql/essentials/core/services/yql_lineage.h
index c1a71ae185b..2b66c064a4d 100644
--- a/yql/essentials/core/services/yql_lineage.h
+++ b/yql/essentials/core/services/yql_lineage.h
@@ -8,4 +8,7 @@ struct TExprContext;
TString CalculateLineage(const TExprNode& root, const TTypeAnnotationContext& ctx, TExprContext& exprCtx, bool standalone);
+// Replace input and output table's IDs with pathes for checking lineage equality
+TString NormalizeLineage(const TString& lineageStr);
+
} // namespace NYql
diff --git a/yql/essentials/core/services/yql_transform_pipeline.cpp b/yql/essentials/core/services/yql_transform_pipeline.cpp
index 1b2df336500..45c0a5ae6a8 100644
--- a/yql/essentials/core/services/yql_transform_pipeline.cpp
+++ b/yql/essentials/core/services/yql_transform_pipeline.cpp
@@ -21,6 +21,9 @@
namespace NYql {
+const TString LineageComponent = "Lineage";
+const TString LineageResultLabel = "LineageResult";
+
TTransformationPipeline::TTransformationPipeline(
TIntrusivePtr<TTypeAnnotationContext> ctx,
TTypeAnnCallableFactory typeAnnCallableFactory)
@@ -167,33 +170,66 @@ TTransformationPipeline& TTransformationPipeline::AddFinalCommonOptimization(EYq
return *this;
}
-TTransformationPipeline& TTransformationPipeline::AddOptimizationWithLineage(bool checkWorld, bool withFinalOptimization, EYqlIssueCode issueCode) {
+TTransformationPipeline& TTransformationPipeline::AddOptimizationWithLineage(bool enableLineage, bool checkWorld, bool withFinalOptimization, EYqlIssueCode issueCode) {
AddCommonOptimization(false, issueCode);
- Transformers_.push_back(TTransformStage(
- CreateChoiceGraphTransformer(
- [&typesCtx = std::as_const(*TypeAnnotationContext_)](const TExprNode::TPtr&, TExprContext&) {
- return typesCtx.EnableLineage;
- },
- TTransformStage(
- CreateFunctorTransformer(
- [typeCtx = TypeAnnotationContext_](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
- output = input;
- try {
- CalculateLineage(*input, *typeCtx, ctx, false);
- } catch (const std::exception& e) {
- YQL_LOG(ERROR) << "CalculateLineage error: " << e.what();
- typeCtx->CorrectLineage = false;
- }
- return IGraphTransformer::TStatus::Ok;
- }),
- "Lineage",
- issueCode),
- TTransformStage(
- new TNullTransformer(),
- "SkipLineage",
- issueCode)),
- "LineageCalculation",
- issueCode));
+ if (enableLineage) {
+ Transformers_.push_back(TTransformStage(
+ CreateChoiceGraphTransformer(
+ [&typesCtx = std::as_const(*TypeAnnotationContext_)](const TExprNode::TPtr&, TExprContext&) {
+ return typesCtx.EnableLineage;
+ },
+ TTransformStage(
+ CreateSinglePassFunctorTransformer(
+ [typeCtx = TypeAnnotationContext_](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
+ output = input;
+ TString calculatedLineage, loadedLineage;
+ if (typeCtx->QContext && typeCtx->QContext.CanRead()) {
+ auto loaded = typeCtx->QContext.GetReader()->Get({LineageComponent, LineageResultLabel}).GetValueSync();
+ if (loaded.Defined()) {
+ loadedLineage = loaded->Value;
+ } else {
+ YQL_LOG(INFO) << "There is no lineage in QStorage, lineage calculation is skipped in replay mode";
+ return IGraphTransformer::TStatus::Ok;
+ }
+ }
+ std::exception_ptr lineageError;
+ try {
+ calculatedLineage = CalculateLineage(*input, *typeCtx, ctx, false);
+ } catch (const std::exception& e) {
+ YQL_LOG(ERROR) << "Lineage calculation error: " << e.what();
+ typeCtx->CorrectLineage = false;
+ lineageError = std::current_exception();
+ }
+ if (!loadedLineage.empty()) {
+ // if lineage calculation is failed, but loaded lineage exists, rethrow exception for replay mode
+ if (lineageError) {
+ std::rethrow_exception(lineageError);
+ }
+ if (NormalizeLineage(calculatedLineage) != NormalizeLineage(loadedLineage)) {
+ YQL_LOG(INFO) << "Lineage in replay is different:"
+ << "\nCalculated lineage:\n"
+ << calculatedLineage
+ << "\nLoaded lineage:\n"
+ << loadedLineage;
+ throw yexception() << "Lineage in replay is different";
+ }
+ YQL_LOG(INFO) << "Lineage replay is the same";
+ }
+ if (typeCtx->QContext && typeCtx->QContext.CanWrite() && typeCtx->CorrectLineage) {
+ typeCtx->QContext.GetWriter()->Put({LineageComponent, LineageResultLabel}, calculatedLineage).GetValueSync();
+ YQL_LOG(INFO) << "Lineage is saved to QStorage";
+ }
+ return IGraphTransformer::TStatus::Ok;
+ }),
+ "Lineage",
+ issueCode),
+ TTransformStage(
+ new TNullTransformer(),
+ "SkipLineage",
+ issueCode)),
+ "LineageCalculation",
+ issueCode));
+ }
AddProviderOptimization(issueCode);
if (withFinalOptimization) {
AddFinalCommonOptimization(issueCode);
diff --git a/yql/essentials/core/services/yql_transform_pipeline.h b/yql/essentials/core/services/yql_transform_pipeline.h
index 4cd60c76079..ca1dbfae83d 100644
--- a/yql/essentials/core/services/yql_transform_pipeline.h
+++ b/yql/essentials/core/services/yql_transform_pipeline.h
@@ -36,7 +36,7 @@ public:
TTransformationPipeline& AddFinalCommonOptimization(EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
TTransformationPipeline& AddOptimization(bool checkWorld = true, bool withFinalOptimization = true, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
TTransformationPipeline& AddProviderOptimization(EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
- TTransformationPipeline& AddOptimizationWithLineage(bool checkWorld = true, bool withFinalOptimization = true, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
+ TTransformationPipeline& AddOptimizationWithLineage(bool enableLineage, bool checkWorld = true, bool withFinalOptimization = true, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
TTransformationPipeline& AddLineageOptimization(TMaybe<TString>& lineageOut, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
TTransformationPipeline& AddCheckExecution(bool checkWorld = true, EYqlIssueCode issueCode = TIssuesIds::CORE_OPTIMIZATION);
TTransformationPipeline& AddRun(TOperationProgressWriter writer, EYqlIssueCode issueCode = TIssuesIds::CORE_EXEC);
diff --git a/yql/essentials/core/type_ann/type_ann_core.cpp b/yql/essentials/core/type_ann/type_ann_core.cpp
index 8320c08b502..cf8f316fe11 100644
--- a/yql/essentials/core/type_ann/type_ann_core.cpp
+++ b/yql/essentials/core/type_ann/type_ann_core.cpp
@@ -3298,6 +3298,49 @@ namespace NTypeAnnImpl {
return IGraphTransformer::TStatus::Ok;
}
+ IGraphTransformer::TStatus SqlConcatWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
+ if (!IsBackwardCompatibleFeatureAvailable(ctx.Types.LangVer, MakeLangVersion(2025, 04), ctx.Types.BackportMode)) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), "Concat function is not available before version 2025.04"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (!EnsureMinArgsCount(*input, 1, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ bool hasOptionals = false;
+ bool hasBinary = false;
+ for (ui32 i = 0; i < input->ChildrenSize(); ++i) {
+ if (IsNull(*input->Child(i))) {
+ output = input->ChildPtr(i);
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ bool isOptional;
+ const TDataExprType* dataType ;
+ if (!EnsureDataOrOptionalOfData(*input->Child(i), isOptional, dataType, ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (dataType->GetSlot() != EDataSlot::String && dataType->GetSlot() != EDataSlot::Utf8) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Child(i)->Pos()),
+ TStringBuilder() << "Expected (optional) String or Utf8, but got: " << *input->Child(i)->GetTypeAnn()));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ hasOptionals = hasOptionals || isOptional;
+ hasBinary = hasBinary || (dataType->GetSlot() == EDataSlot::String);
+ }
+
+ const TTypeAnnotationNode* retType = ctx.Expr.MakeType<TDataExprType>(hasBinary ? EDataSlot::String : EDataSlot::Utf8);
+ if (hasOptionals) {
+ retType = ctx.Expr.MakeType<TOptionalExprType>(retType);
+ }
+
+ input->SetTypeAnn(retType);
+ return IGraphTransformer::TStatus::Ok;
+ }
+
IGraphTransformer::TStatus SubstringWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
if (!EnsureArgsCount(*input, 3, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
@@ -8681,6 +8724,128 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
return IGraphTransformer::TStatus::Ok;
}
+ IGraphTransformer::TStatus BuildSimplePgCall(TPositionHandle pos, TStringBuf name,
+ const TExprNodeList& args, TExprNode::TPtr& output, TExtContext& ctx) {
+ if (!IsBackwardCompatibleFeatureAvailable(ctx.Types.LangVer, MakeLangVersion(2025, 04), ctx.Types.BackportMode)) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(pos), "SimplePg functions are not available before version 2025.04"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ if (name == "now") {
+ if (args.size() != 0) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(pos), "Expected 0 arguments"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ // clang-format off
+ output = ctx.Expr.Builder(pos)
+ .Callable("Apply")
+ .Callable(0, "Udf")
+ .Atom(0, "DateTime2.MakeTimestamp")
+ .Seal()
+ .Callable(1, "Apply")
+ .Callable(0, "Udf")
+ .Atom(0, "DateTime2.ParseIso8601")
+ .Seal()
+ .Callable(1, "FromPg")
+ .Callable(0, "PgCast")
+ .Callable(0, "PgCall")
+ .Atom(0, "now")
+ .List(1)
+ .Seal()
+ .Seal()
+ .Callable(1, "PgType")
+ .Atom(0, "text")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ // clang-format on
+ return IGraphTransformer::TStatus::Repeat;
+ } else if (name == "to_date") {
+ if (args.size() != 2) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(pos), "Expected 2 arguments"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ // clang-format off
+ output = ctx.Expr.Builder(pos)
+ .Callable("SafeCast")
+ .Callable(0, "FromPg")
+ .Callable(0, "PgCast")
+ .Callable(0, "PgCall")
+ .Atom(0, "to_date")
+ .List(1)
+ .Seal()
+ .Callable(2, "SafeCast")
+ .Add(0, args[0])
+ .Callable(1, "DataType")
+ .Atom(0, "Utf8")
+ .Seal()
+ .Seal()
+ .Callable(3, "SafeCast")
+ .Add(0, args[1])
+ .Callable(1, "DataType")
+ .Atom(0, "Utf8")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Callable(1, "PgType")
+ .Atom(0, "text")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Callable(1, "DataType")
+ .Atom(0, "Date32")
+ .Seal()
+ .Seal()
+ .Build();
+ // clang-format on
+ return IGraphTransformer::TStatus::Repeat;
+ } else if (name == "round") {
+ if (args.size() < 1 || args.size() > 2) {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(pos), "Expected 1 or 2 arguments"));
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ // clang-format off
+ output = ctx.Expr.Builder(pos)
+ .Callable("FromPg")
+ .Callable(0, "PgCast")
+ .Callable(0, "PgCall")
+ .Atom(0,"round")
+ .List(1)
+ .Seal()
+ .Callable(2, "PgCast")
+ .Add(0, args[0])
+ .Callable(1, "PgType")
+ .Atom(0, "numeric")
+ .Seal()
+ .Seal()
+ .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
+ if (args.size() == 2) {
+ parent.Add(3, args[1]);
+ }
+
+ return parent;
+ })
+ .Seal()
+ .Callable(1, "PgType")
+ .Atom(0, "float8")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+ // clang-format on
+ return IGraphTransformer::TStatus::Repeat;
+ } else {
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(pos), TStringBuilder() << "Unknown SimplePg function: " << name));
+ return IGraphTransformer::TStatus::Error;
+ }
+ }
+
IGraphTransformer::TStatus SqlCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
if (!EnsureMinArgsCount(*input, 2, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
@@ -8698,7 +8863,6 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
}
auto udfName = input->ChildPtr(0);
-
if (!EnsureTupleMinSize(*input->Child(1), 1, ctx.Expr) || !EnsureTupleMaxSize(*input->Child(1), 2, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
@@ -8754,6 +8918,10 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
options = input->Child(5);
}
+ if (udfName->Content().StartsWith("SimplePg.")) {
+ return BuildSimplePgCall(input->Pos(), udfName->Content().substr(9), positionalArgs, output, ctx);
+ }
+
TExprNode::TPtr udf = ctx.Expr.Builder(input->Pos())
.Callable("Udf")
.Add(0, udfName)
@@ -13012,6 +13180,7 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["WithWorld"] = &WithWorldWrapper;
Functions["Concat"] = &ConcatWrapper;
Functions["AggrConcat"] = &AggrConcatWrapper;
+ ExtFunctions["SqlConcat"] = &SqlConcatWrapper;
ExtFunctions["Substring"] = &SubstringWrapper;
ExtFunctions["Find"] = &FindWrapper;
ExtFunctions["RFind"] = &FindWrapper;