diff options
author | udovichenko-r <rvu@ydb.tech> | 2023-01-23 15:47:35 +0300 |
---|---|---|
committer | udovichenko-r <rvu@ydb.tech> | 2023-01-23 15:47:35 +0300 |
commit | 4395852866f7a69b06e307b0f9fcd8ef950c200b (patch) | |
tree | 3a737bd1496b056d2c96b755a587c6441731bbc5 | |
parent | b1be0934e8af6e8b1e54d0e5a600fce3c0472e01 (diff) | |
download | ydb-4395852866f7a69b06e307b0f9fcd8ef950c200b.tar.gz |
[yql] Check yield transparency
16 files changed, 297 insertions, 59 deletions
diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp index 255f53101d7..62167050011 100644 --- a/ydb/library/yql/core/facade/yql_facade.cpp +++ b/ydb/library/yql/core/facade/yql_facade.cpp @@ -933,6 +933,7 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap auto issues = ExprCtx_->IssueManager.GetIssues(); bool hasDqGatewayError = false; bool hasDqGatewayFallbackError = false; + bool hasDqOptimizeError = false; auto checkIssue = [&](const TIssue& issue) { if (issue.GetCode() == TIssuesIds::DQ_GATEWAY_ERROR) { @@ -942,6 +943,9 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap YQL_LOG(DEBUG) << "Gateway Fallback Error " << issue; hasDqGatewayError = true; hasDqGatewayFallbackError = true; + } else if (issue.GetCode() == TIssuesIds::DQ_OPTIMIZE_ERROR) { + YQL_LOG(DEBUG) << "Optimize Error " << issue; + hasDqOptimizeError = true; } }; @@ -952,7 +956,7 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap } }; - std::function<void(const TIssuePtr& issue)> toWarning = [&](const TIssuePtr& issue) { + std::function<void(const TIssuePtr& issue)> toInfo = [&](const TIssuePtr& issue) { if (issue->Severity == TSeverityIds::S_ERROR || issue->Severity == TSeverityIds::S_FATAL || issue->Severity == TSeverityIds::S_WARNING) @@ -960,7 +964,7 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap issue->Severity = TSeverityIds::S_INFO; } for (const auto& subissue : issue->GetSubIssues()) { - toWarning(subissue); + toInfo(subissue); } }; @@ -993,9 +997,9 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap } CleanupLastSession(); - if (hasDqGatewayError) { - TIssue warning("DQ cannot execute the query"); - warning.Severity = TSeverityIds::S_INFO; + if (hasDqGatewayError || hasDqOptimizeError) { + TIssue info("DQ cannot execute the query"); + info.Severity = TSeverityIds::S_INFO; for (auto& issue : issues) { TIssuePtr newIssue = new TIssue(issue); @@ -1006,12 +1010,12 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap newIssue->Severity = TSeverityIds::S_INFO; } for (auto& subissue : newIssue->GetSubIssues()) { - toWarning(subissue); + toInfo(subissue); } - warning.AddSubIssue(newIssue); + info.AddSubIssue(newIssue); } - ExprCtx_->IssueManager.AddIssues({warning}); + ExprCtx_->IssueManager.AddIssues({info}); } FallbackCounter ++; diff --git a/ydb/library/yql/core/issue/protos/issue_id.proto b/ydb/library/yql/core/issue/protos/issue_id.proto index 67a7c6b0b19..19ff20fe84b 100644 --- a/ydb/library/yql/core/issue/protos/issue_id.proto +++ b/ydb/library/yql/core/issue/protos/issue_id.proto @@ -176,6 +176,7 @@ message TIssuesIds { // dq DQ_GATEWAY_ERROR = 6000; DQ_GATEWAY_NEED_FALLBACK_ERROR = 6001; + DQ_OPTIMIZE_ERROR = 6002; // range [200000, 399999) reserved for KiKiMR issue codes, do not use! diff --git a/ydb/library/yql/core/issue/yql_issue.txt b/ydb/library/yql/core/issue/yql_issue.txt index 2dd72cb447e..c6fa93b810c 100644 --- a/ydb/library/yql/core/issue/yql_issue.txt +++ b/ydb/library/yql/core/issue/yql_issue.txt @@ -617,6 +617,10 @@ ids { severity: S_ERROR } ids { + code: DQ_OPTIMIZE_ERROR + severity: S_ERROR +} +ids { code: YQL_DUPLICATE_DECLARE severity: S_WARNING } diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp index 932ac907363..7504f9680bd 100644 --- a/ydb/library/yql/core/yql_expr_type_annotation.cpp +++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp @@ -4765,6 +4765,15 @@ bool IsEmptyList(const TTypeAnnotationNode& type) { return type.GetKind() == ETypeAnnotationKind::EmptyList; } +bool IsFlowOrStream(const TTypeAnnotationNode& type) { + const auto kind = type.GetKind(); + return kind == ETypeAnnotationKind::Stream || kind == ETypeAnnotationKind::Flow; +} + +bool IsFlowOrStream(const TExprNode& node) { + return IsFlowOrStream(*node.GetTypeAnn()); +} + namespace { using TIndentPrinter = std::function<void(TStringBuilder& res, size_t)>; @@ -5249,27 +5258,6 @@ std::optional<ui32> GetFieldPosition(const TStructExprType& structType, const TS return std::nullopt; } -bool IsCallableTypeHasStreams(const TCallableExprType* callableType) { - for (;;) { - if (callableType->GetReturnType()->GetKind() == ETypeAnnotationKind::Stream) { - return true; - } - else { - for (auto& arg: callableType->GetArguments()) { - if (arg.Type->GetKind() == ETypeAnnotationKind::Stream) { - return true; - } - } - } - if (callableType->GetReturnType()->GetKind() == ETypeAnnotationKind::Callable) { - callableType = callableType->GetReturnType()->Cast<TCallableExprType>(); - } else { - break; - } - } - return false; -} - bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, bool& convertToPg, TPositionHandle pos, TExprContext& ctx) { pgType = 0; convertToPg = false; diff --git a/ydb/library/yql/core/yql_expr_type_annotation.h b/ydb/library/yql/core/yql_expr_type_annotation.h index 0d01545625d..9f1bfffc66d 100644 --- a/ydb/library/yql/core/yql_expr_type_annotation.h +++ b/ydb/library/yql/core/yql_expr_type_annotation.h @@ -277,6 +277,8 @@ bool IsNull(const TTypeAnnotationNode& type); bool IsEmptyList(const TExprNode& node); bool IsEmptyList(const TTypeAnnotationNode& type); bool IsInstantEqual(const TTypeAnnotationNode& type); +bool IsFlowOrStream(const TTypeAnnotationNode& type); +bool IsFlowOrStream(const TExprNode& node); TString GetTypeDiff(const TTypeAnnotationNode& left, const TTypeAnnotationNode& right); TString GetTypePrettyDiff(const TTypeAnnotationNode& left, const TTypeAnnotationNode& right); @@ -293,7 +295,6 @@ std::optional<ui32> GetFieldPosition(const TMultiExprType& tupleType, const TStr std::optional<ui32> GetFieldPosition(const TTupleExprType& tupleType, const TStringBuf& field); std::optional<ui32> GetFieldPosition(const TStructExprType& structType, const TStringBuf& field); -bool IsCallableTypeHasStreams(const TCallableExprType* callableType); bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, bool& convertToPg, TPositionHandle pos, TExprContext& ctx); bool HasContextFuncs(const TExprNode& input); diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp index c24cd01ed82..34d68d6edfa 100644 --- a/ydb/library/yql/core/yql_opt_utils.cpp +++ b/ydb/library/yql/core/yql_opt_utils.cpp @@ -1528,7 +1528,7 @@ TVector<TStringBuf> GetCommonKeysFromVariantSelector(const NNodes::TCoLambda& la } bool IsIdentityLambda(const TExprNode& lambda) { - return lambda.IsLambda() && &lambda.Head().Head() == &lambda.Tail(); + return lambda.IsLambda() && lambda.Head().ChildrenSize() == 1 && &lambda.Head().Head() == &lambda.Tail(); } TExprNode::TPtr MakeExpandMap(TPositionHandle pos, const TVector<TString>& columns, const TExprNode::TPtr& input, TExprContext& ctx) { @@ -1576,4 +1576,114 @@ TExprNode::TPtr MakeNarrowMap(TPositionHandle pos, const TVector<TString>& colum .Build(); } +TExprNode::TPtr FindNonYieldTransparentNodeImpl(const TExprNode::TPtr& root, const bool udfSupportsYield, const TNodeSet& flowSources) { + auto depensOnFlow = [&flowSources](const TExprNode::TPtr& node) { + return !!FindNode(node, + [](const TExprNode::TPtr& n) { + return !TCoDependsOn::Match(n.Get()); + }, + [&flowSources](const TExprNode::TPtr& n) { + return flowSources.contains(n.Get()); + } + ); + }; + + auto candidates = FindNodes(root, + [&flowSources](const TExprNode::TPtr& node) { + if (flowSources.contains(node.Get()) || TCoDependsOn::Match(node.Get())) { + return false; + } + if (node->ChildrenSize() > 0 && node->Head().GetTypeAnn()->GetKind() == ETypeAnnotationKind::World) { + return false; + } + return true; + }, + [](const TExprNode::TPtr& node) { + return TCoCollect::Match(node.Get()) + || TCoForwardList::Match(node.Get()) + || TCoApply::Match(node.Get()) + || TCoSwitch::Match(node.Get()) + || node->IsCallable("DqReplicate"); + } + ); + + for (auto candidate: candidates) { + if (TCoCollect::Match(candidate.Get()) || TCoForwardList::Match(candidate.Get())) { + if (depensOnFlow(candidate->HeadPtr())) { + return candidate; + } + } else if (TCoApply::Match(candidate.Get())) { + if (AnyOf(candidate->Children().begin() + 1, candidate->Children().end(), depensOnFlow)) { + if (!IsFlowOrStream(*candidate)) { + while (TCoApply::Match(candidate.Get())) { + candidate = candidate->HeadPtr(); + } + return candidate; + } + if (!udfSupportsYield) { + while (TCoApply::Match(candidate.Get())) { + candidate = candidate->HeadPtr(); + } + if (TCoScriptUdf::Match(candidate.Get())) { + return candidate; + } + } + } + } else if (TCoSwitch::Match(candidate.Get())) { + for (size_t i = 3; i < candidate->ChildrenSize(); i += 2) { + if (auto node = FindNonYieldTransparentNodeImpl(candidate->Child(i)->TailPtr(), udfSupportsYield, TNodeSet{&candidate->Child(i)->Head().Head()})) { + return node; + } + } + } else if (candidate->IsCallable("DqReplicate")) { + for (size_t i = 1; i < candidate->ChildrenSize(); ++i) { + if (auto node = FindNonYieldTransparentNodeImpl(candidate->Child(i)->TailPtr(), udfSupportsYield, TNodeSet{&candidate->Child(i)->Head().Head()})) { + return node; + } + } + } + } + return {}; +} + +TExprNode::TPtr FindNonYieldTransparentNode(const TExprNode::TPtr& root, const TTypeAnnotationContext& typeCtx) { + TNodeSet flowSources; + TExprNode::TPtr from = root; + if (root->IsLambda()) { + if (IsIdentityLambda(*root)) { + return {}; + } + from = root->TailPtr(); + + // Add all flow lambda args + root->Head().ForEachChild([&flowSources](const TExprNode& arg) { + if (IsFlowOrStream(arg)) { + flowSources.insert(&arg); + } + }); + } + + static const THashSet<TStringBuf> WHITE_LIST = {"EmptyIterator"sv, TCoToStream::CallableName(), TCoIterator::CallableName(), + TCoToFlow::CallableName(), TCoApply::CallableName(), TCoNth::CallableName(), TCoMux::CallableName()}; + // Find all other flow sources (readers) + auto sources = FindNodes(from, + [](const TExprNode::TPtr& node) { + return !node->IsCallable(WHITE_LIST) + && node->IsCallable() + && IsFlowOrStream(*node) + && (node->ChildrenSize() == 0 || !IsFlowOrStream(node->Head())); + } + ); + std::for_each(sources.cbegin(), sources.cend(), [&flowSources](const TExprNode::TPtr& node) { flowSources.insert(node.Get()); }); + + if (flowSources.empty()) { + return {}; + } + return FindNonYieldTransparentNodeImpl(from, typeCtx.UdfSupportsYield, flowSources); +} + +bool IsYieldTransparent(const TExprNode::TPtr& root, const TTypeAnnotationContext& typeCtx) { + return !FindNonYieldTransparentNode(root, typeCtx); +} + } diff --git a/ydb/library/yql/core/yql_opt_utils.h b/ydb/library/yql/core/yql_opt_utils.h index e199ba26788..f6ec3cdf973 100644 --- a/ydb/library/yql/core/yql_opt_utils.h +++ b/ydb/library/yql/core/yql_opt_utils.h @@ -122,5 +122,7 @@ bool IsIdentityLambda(const TExprNode& lambda); TExprNode::TPtr MakeExpandMap(TPositionHandle pos, const TVector<TString>& columns, const TExprNode::TPtr& input, TExprContext& ctx); TExprNode::TPtr MakeNarrowMap(TPositionHandle pos, const TVector<TString>& columns, const TExprNode::TPtr& input, TExprContext& ctx); +TExprNode::TPtr FindNonYieldTransparentNode(const TExprNode::TPtr& root, const TTypeAnnotationContext& typeCtx); +bool IsYieldTransparent(const TExprNode::TPtr& root, const TTypeAnnotationContext& typeCtx); } diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp index ffc54481edc..7f157e45fee 100644 --- a/ydb/library/yql/providers/common/provider/yql_provider.cpp +++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp @@ -836,12 +836,12 @@ void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprN } for (size_t i = 1; i < node->ChildrenSize(); ++i) { - if (IsFlowOrStream(node->Child(i))) { + if (IsFlowOrStream(*node->Child(i))) { applyStreamChildren.push_back(node->Child(i)); } else if (node->Child(i)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) { if (node->Child(i)->IsCallable("ForwardList")) { applyStreamChildren.push_back(node->Child(i)->Child(0)); - } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->HeadPtr().Get())) { + } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->Head())) { applyStreamChildren.push_back(node->Child(i)->Child(0)); } } @@ -858,7 +858,7 @@ void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprN writer.OnBeginMap(); writer.OnKeyedItem("Name"); writer.OnStringScalar(node->Content()); - if (TCoFlatMapBase::Match(node) && IsFlowOrStream(node->ChildPtr(1).Get())) { + if (TCoFlatMapBase::Match(node) && IsFlowOrStream(*node->Child(1))) { writer.OnKeyedItem("Children"); writer.OnBeginList(); writer.OnListItem(); @@ -942,12 +942,12 @@ double GetDataReplicationFactor(double factor, const TExprNode* node, const TExp case ETypeAnnotationKind::List: { double applyFactor = 0.0; for (size_t i = 1; i < node->ChildrenSize(); ++i) { - if (IsFlowOrStream(node->Child(i))) { + if (IsFlowOrStream(*node->Child(i))) { applyFactor += GetDataReplicationFactor(factor, node->Child(i), stream, ctx); } else if (node->Child(i)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) { if (node->Child(i)->IsCallable("ForwardList")) { applyFactor += GetDataReplicationFactor(factor, node->Child(i)->Child(0), stream, ctx); - } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->HeadPtr().Get())) { + } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->Head())) { applyFactor += GetDataReplicationFactor(factor, node->Child(i)->Child(0), stream, ctx); } } diff --git a/ydb/library/yql/providers/dq/provider/CMakeLists.darwin.txt b/ydb/library/yql/providers/dq/provider/CMakeLists.darwin.txt index 582b891a7fd..46c0f58d2b3 100644 --- a/ydb/library/yql/providers/dq/provider/CMakeLists.darwin.txt +++ b/ydb/library/yql/providers/dq/provider/CMakeLists.darwin.txt @@ -21,6 +21,7 @@ target_link_libraries(providers-dq-provider PUBLIC public-lib-yson_value cpp-client-ydb_driver library-yql-core + yql-core-issue yql-utils-backtrace yql-dq-transform yql-dq-tasks @@ -51,4 +52,5 @@ target_sources(providers-dq-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp ) diff --git a/ydb/library/yql/providers/dq/provider/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/provider/CMakeLists.linux-aarch64.txt index 99160494bad..610a417dbcb 100644 --- a/ydb/library/yql/providers/dq/provider/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/dq/provider/CMakeLists.linux-aarch64.txt @@ -22,6 +22,7 @@ target_link_libraries(providers-dq-provider PUBLIC public-lib-yson_value cpp-client-ydb_driver library-yql-core + yql-core-issue yql-utils-backtrace yql-dq-transform yql-dq-tasks @@ -52,4 +53,5 @@ target_sources(providers-dq-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp ) diff --git a/ydb/library/yql/providers/dq/provider/CMakeLists.linux.txt b/ydb/library/yql/providers/dq/provider/CMakeLists.linux.txt index 99160494bad..610a417dbcb 100644 --- a/ydb/library/yql/providers/dq/provider/CMakeLists.linux.txt +++ b/ydb/library/yql/providers/dq/provider/CMakeLists.linux.txt @@ -22,6 +22,7 @@ target_link_libraries(providers-dq-provider PUBLIC public-lib-yson_value cpp-client-ydb_driver library-yql-core + yql-core-issue yql-utils-backtrace yql-dq-transform yql-dq-tasks @@ -52,4 +53,5 @@ target_sources(providers-dq-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp ) diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp index 3368912d6ba..bbf4cede26e 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp @@ -1,6 +1,7 @@ #include "yql_dq_datasink_type_ann.h" #include <ydb/library/yql/core/yql_expr_type_annotation.h> +#include <ydb/library/yql/core/issue/yql_issue.h> #include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> #include <ydb/library/yql/dq/type_ann/dq_type_ann.h> #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> @@ -45,12 +46,7 @@ public: private: TStatus AnnotateDqReplicateAlwaysError(const TExprNode::TPtr& input, TExprContext& ctx) { - ctx.AddError( - TIssue( - ctx.GetPosition(input->Pos()), - "Reading multiple times from the same source is not supported") - .SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, - TSeverityIds::S_ERROR)); + ctx.AddError(YqlIssue(ctx.GetPosition(input->Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, "Reading multiple times from the same source is not supported")); return TStatus::Error; } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp index 6cd00b8871f..e353f088549 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp @@ -2,6 +2,7 @@ #include "yql_dq_datasource_constraints.h" #include "yql_dq_datasource_type_ann.h" #include "yql_dq_state.h" +#include "yql_dq_validate.h" #include <ydb/library/yql/providers/common/config/yql_configuration_transformer.h> #include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h> @@ -223,6 +224,10 @@ public: return TDqCnResult::Match(&node) || TDqQuery::Match(&node); } + bool ValidateExecution(const TExprNode& node, TExprContext& ctx) override { + return ValidateDqExecution(node, *State->TypeCtx, ctx); + } + bool CanParse(const TExprNode& node) override { return TypeAnnotationTransformer->CanParse(node); } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp index ee81b95657c..2e60546388e 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp @@ -25,13 +25,11 @@ namespace { const THashSet<TStringBuf> VALID_SOURCES = {DqProviderName, ConfigProviderName, YtProviderName, ClickHouseProviderName, YdbProviderName, S3ProviderName}; const THashSet<TStringBuf> VALID_SINKS = {ResultProviderName, YtProviderName, S3ProviderName}; -const THashSet<TStringBuf> UNSUPPORTED_CALLABLE = { TCoForwardList::CallableName() }; } namespace NDq { bool CheckJoinColumns(const TExprBase& node); - bool CheckJoinLinkSettings(const TExprBase& node); } // namespace NDq class TDqsRecaptureTransformer : public TSyncTransformerBase { @@ -163,14 +161,6 @@ private: Scan(*node.Child(i), ctx, good, dataSize, visited, hasJoin); } } - } else if (node.IsCallable(UNSUPPORTED_CALLABLE)) { - AddInfo(ctx, TStringBuilder() << "unsupported callable '" << node.Content() << "'"); - good = false; - } else if (node.IsCallable(TCoCollect::CallableName())) { - if (ETypeAnnotationKind::List != node.Head().GetTypeAnn()->GetKind()) { - AddInfo(ctx, TStringBuilder() << "unsupported callable '" << node.Content() << "' over stream/flow"); - good = false; - } } else if (auto datasource = TMaybeNode<TCoDataSource>(&node).Category()) { if (!VALID_SOURCES.contains(datasource.Cast().Value())) { AddInfo(ctx, TStringBuilder() << "source '" << datasource.Cast().Value() << "' is not supported by DQ"); @@ -236,13 +226,7 @@ private: } } else if (TCoScriptUdf::Match(&node)) { - if (!State_->TypeCtx->UdfSupportsYield) { - if (IsCallableTypeHasStreams(node.GetTypeAnn()->Cast<TCallableExprType>())) { - AddInfo(ctx, TStringBuilder() << "script udf with streams"); - good = false; - } - } - if (NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(node.Head().Content()))) { + if (good && TCoScriptUdf::Match(&node) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(node.Head().Content()))) { AddInfo(ctx, TStringBuilder() << "system python udf"); good = false; } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp new file mode 100644 index 00000000000..3ada75be3d5 --- /dev/null +++ b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp @@ -0,0 +1,127 @@ +#include "yql_dq_validate.h" + +#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> +#include <ydb/library/yql/core/yql_expr_optimize.h> +#include <ydb/library/yql/core/yql_expr_type_annotation.h> +#include <ydb/library/yql/core/yql_opt_utils.h> +#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> +#include <ydb/library/yql/core/issue/yql_issue.h> +#include <ydb/library/yql/minikql/mkql_program_builder.h> +#include <ydb/library/yql/utils/log/log.h> + +#include <util/string/builder.h> + +namespace NYql { + +using namespace NYql::NNodes; + +namespace { + +bool ValidateDqNode(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx, TNodeSet& visited); + +bool ValidateDqStage(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx, TNodeSet& visited) { + if (!visited.insert(&node).second) { + return true; + } + + bool hasErrors = false; + if (auto bad = FindNonYieldTransparentNode(TDqStageBase(&node).Program().Ptr(), typeCtx)) { + hasErrors = true; + YQL_CLOG(WARN, ProviderDq) << "Cannot execute " << bad->Content() << " over stream/flow inside DQ stage"; + ctx.AddError(YqlIssue(ctx.GetPosition(bad->Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << "Cannot execute " << bad->Content() << " over stream/flow inside DQ stage")); + } + VisitExpr(TDqStageBase(&node).Program().Body().Ptr(), + [](const TExprNode::TPtr& n) { + return !TDqConnection::Match(n.Get()) && !TDqPhyPrecompute::Match(n.Get()); + }, + [&hasErrors, &ctx](const TExprNode::TPtr& n) { + if (TCoScriptUdf::Match(n.Get()) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(n->Head().Content()))) { + YQL_CLOG(WARN, ProviderDq) << "Cannot execute system python udf " << n->Content() << " in DQ"; + ctx.AddError(YqlIssue(ctx.GetPosition(n->Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << "Cannot execute system python udf " << n->Content() << " in DQ")); + hasErrors = true; + } + return !hasErrors; + } + ); + + for (auto n: TDqStageBase(&node).Inputs()) { + hasErrors = !ValidateDqNode(n.Ref(), typeCtx, ctx, visited) || hasErrors; + } + if (auto outs = TDqStageBase(&node).Outputs()) { + for (auto n: outs.Cast()) { + hasErrors = !ValidateDqNode(n.Ref(), typeCtx, ctx, visited) || hasErrors; + } + } + + return !hasErrors; + +} + +bool ValidateDqNode(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx, TNodeSet& visited) { + if (node.GetState() == TExprNode::EState::ExecutionComplete) { + return true; + } + + if (TDqStageBase::Match(&node)) { + // visited will be updated inside ValidateDqStage + return ValidateDqStage(node, typeCtx, ctx, visited); + } + + if (!visited.insert(&node).second) { + return true; + } + + if (TDqCnResult::Match(&node)) { + YQL_CLOG(WARN, ProviderDq) << TDqCnResult::CallableName() << " connection cannot be used inside graph"; + ctx.AddError(YqlIssue(ctx.GetPosition(node.Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << TDqCnResult::CallableName() << " connection cannot be used inside graph")); + return false; + } + if (TDqConnection::Match(&node)) { + return ValidateDqStage(TDqConnection(&node).Output().Stage().Ref(), typeCtx, ctx, visited); + } + if (TDqPhyPrecompute::Match(&node)) { + return ValidateDqNode(TDqPhyPrecompute(&node).Connection().Ref(), typeCtx, ctx, visited); + } + + if (TDqSource::Match(&node) || TDqTransform::Match(&node) || TDqSink::Match(&node)) { + return true; + } + + YQL_CLOG(WARN, ProviderDq) << "Failed to execute callable with name: " << node.Content() << " in DQ"; + ctx.AddError(YqlIssue(ctx.GetPosition(node.Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << "Failed to execute callable with name: " << node.Content() << " in DQ")); + return false; +} + +} + +bool ValidateDqExecution(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx) { + YQL_LOG_CTX_SCOPE(__FUNCTION__); + + TNodeSet dqNodes; + if (TDqCnResult::Match(&node)) { + dqNodes.insert(TDqCnResult(&node).Output().Stage().Raw()); + } else if (TDqQuery::Match(&node)) { + for (auto st: TDqQuery(&node).SinkStages()) { + dqNodes.insert(st.Raw()); + } + } else { + VisitExpr(node, [&dqNodes](const TExprNode& n) { + if (TDqStageBase::Match(&n)) { + dqNodes.insert(&n); + return false; + } else if (TDqConnection::Match(&n)) { + dqNodes.insert(&n); + return false; + } + return true; + }); + } + TNodeSet visited; + bool hasError = false; + for (const auto n: dqNodes) { + hasError = !ValidateDqNode(*n, typeCtx, ctx, visited) || hasError; + } + return !hasError; +} + +} // namespace NYql diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_validate.h b/ydb/library/yql/providers/dq/provider/yql_dq_validate.h new file mode 100644 index 00000000000..087cafd13e0 --- /dev/null +++ b/ydb/library/yql/providers/dq/provider/yql_dq_validate.h @@ -0,0 +1,10 @@ +#pragma once + +#include <ydb/library/yql/ast/yql_expr.h> +#include <ydb/library/yql/core/yql_type_annotation.h> + +namespace NYql { + +bool ValidateDqExecution(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx); + +} // namespace NYql |