about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author udovichenko-r <rvu@ydb.tech> 2023-01-23 15:47:35 +0300
committer udovichenko-r <rvu@ydb.tech> 2023-01-23 15:47:35 +0300
commit 4395852866f7a69b06e307b0f9fcd8ef950c200b (patch)
tree 3a737bd1496b056d2c96b755a587c6441731bbc5
parent b1be0934e8af6e8b1e54d0e5a600fce3c0472e01 (diff)
download ydb-4395852866f7a69b06e307b0f9fcd8ef950c200b.tar.gz
[yql] Check yield transparency
-rw-r--r-- ydb/library/yql/core/facade/yql_facade.cpp 20
-rw-r--r-- ydb/library/yql/core/issue/protos/issue_id.proto 1
-rw-r--r-- ydb/library/yql/core/issue/yql_issue.txt 4
-rw-r--r-- ydb/library/yql/core/yql_expr_type_annotation.cpp 30
-rw-r--r-- ydb/library/yql/core/yql_expr_type_annotation.h 3
-rw-r--r-- ydb/library/yql/core/yql_opt_utils.cpp 112
-rw-r--r-- ydb/library/yql/core/yql_opt_utils.h 2
-rw-r--r-- ydb/library/yql/providers/common/provider/yql_provider.cpp 10
-rw-r--r-- ydb/library/yql/providers/dq/provider/CMakeLists.darwin.txt 2
-rw-r--r-- ydb/library/yql/providers/dq/provider/CMakeLists.linux-aarch64.txt 2
-rw-r--r-- ydb/library/yql/providers/dq/provider/CMakeLists.linux.txt 2
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp 8
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp 5
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp 18
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp 127
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_validate.h 10
16 files changed, 297 insertions, 59 deletions
diff --git a/ydb/library/yql/core/facade/yql_facade.cpp b/ydb/library/yql/core/facade/yql_facade.cpp
index 255f53101d7..62167050011 100644
--- a/ydb/library/yql/core/facade/yql_facade.cpp
+++ b/ydb/library/yql/core/facade/yql_facade.cpp
@@ -933,6 +933,7 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap
auto issues = ExprCtx_->IssueManager.GetIssues();
bool hasDqGatewayError = false;
bool hasDqGatewayFallbackError = false;
+ bool hasDqOptimizeError = false;
auto checkIssue = [&](const TIssue& issue) {
if (issue.GetCode() == TIssuesIds::DQ_GATEWAY_ERROR) {
@@ -942,6 +943,9 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap
YQL_LOG(DEBUG) << "Gateway Fallback Error " << issue;
hasDqGatewayError = true;
hasDqGatewayFallbackError = true;
+ } else if (issue.GetCode() == TIssuesIds::DQ_OPTIMIZE_ERROR) {
+ YQL_LOG(DEBUG) << "Optimize Error " << issue;
+ hasDqOptimizeError = true;
}
};
@@ -952,7 +956,7 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap
}
};
- std::function<void(const TIssuePtr& issue)> toWarning = [&](const TIssuePtr& issue) {
+ std::function<void(const TIssuePtr& issue)> toInfo = [&](const TIssuePtr& issue) {
if (issue->Severity == TSeverityIds::S_ERROR
|| issue->Severity == TSeverityIds::S_FATAL
|| issue->Severity == TSeverityIds::S_WARNING)
@@ -960,7 +964,7 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap
issue->Severity = TSeverityIds::S_INFO;
}
for (const auto& subissue : issue->GetSubIssues()) {
- toWarning(subissue);
+ toInfo(subissue);
}
};
@@ -993,9 +997,9 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap
}
CleanupLastSession();
- if (hasDqGatewayError) {
- TIssue warning("DQ cannot execute the query");
- warning.Severity = TSeverityIds::S_INFO;
+ if (hasDqGatewayError || hasDqOptimizeError) {
+ TIssue info("DQ cannot execute the query");
+ info.Severity = TSeverityIds::S_INFO;
for (auto& issue : issues) {
TIssuePtr newIssue = new TIssue(issue);
@@ -1006,12 +1010,12 @@ TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool ap
newIssue->Severity = TSeverityIds::S_INFO;
}
for (auto& subissue : newIssue->GetSubIssues()) {
- toWarning(subissue);
+ toInfo(subissue);
}
- warning.AddSubIssue(newIssue);
+ info.AddSubIssue(newIssue);
}
- ExprCtx_->IssueManager.AddIssues({warning});
+ ExprCtx_->IssueManager.AddIssues({info});
}
FallbackCounter ++;
diff --git a/ydb/library/yql/core/issue/protos/issue_id.proto b/ydb/library/yql/core/issue/protos/issue_id.proto
index 67a7c6b0b19..19ff20fe84b 100644
--- a/ydb/library/yql/core/issue/protos/issue_id.proto
+++ b/ydb/library/yql/core/issue/protos/issue_id.proto
@@ -176,6 +176,7 @@ message TIssuesIds {
// dq
DQ_GATEWAY_ERROR = 6000;
DQ_GATEWAY_NEED_FALLBACK_ERROR = 6001;
+ DQ_OPTIMIZE_ERROR = 6002;
// range [200000, 399999) reserved for KiKiMR issue codes, do not use!
diff --git a/ydb/library/yql/core/issue/yql_issue.txt b/ydb/library/yql/core/issue/yql_issue.txt
index 2dd72cb447e..c6fa93b810c 100644
--- a/ydb/library/yql/core/issue/yql_issue.txt
+++ b/ydb/library/yql/core/issue/yql_issue.txt
@@ -617,6 +617,10 @@ ids {
severity: S_ERROR
}
ids {
+ code: DQ_OPTIMIZE_ERROR
+ severity: S_ERROR
+}
+ids {
code: YQL_DUPLICATE_DECLARE
severity: S_WARNING
}
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.cpp b/ydb/library/yql/core/yql_expr_type_annotation.cpp
index 932ac907363..7504f9680bd 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.cpp
+++ b/ydb/library/yql/core/yql_expr_type_annotation.cpp
@@ -4765,6 +4765,15 @@ bool IsEmptyList(const TTypeAnnotationNode& type) {
return type.GetKind() == ETypeAnnotationKind::EmptyList;
}
+bool IsFlowOrStream(const TTypeAnnotationNode& type) {
+ const auto kind = type.GetKind();
+ return kind == ETypeAnnotationKind::Stream || kind == ETypeAnnotationKind::Flow;
+}
+
+bool IsFlowOrStream(const TExprNode& node) {
+ return IsFlowOrStream(*node.GetTypeAnn());
+}
+
namespace {
using TIndentPrinter = std::function<void(TStringBuilder& res, size_t)>;
@@ -5249,27 +5258,6 @@ std::optional<ui32> GetFieldPosition(const TStructExprType& structType, const TS
return std::nullopt;
}
-bool IsCallableTypeHasStreams(const TCallableExprType* callableType) {
- for (;;) {
- if (callableType->GetReturnType()->GetKind() == ETypeAnnotationKind::Stream) {
- return true;
- }
- else {
- for (auto& arg: callableType->GetArguments()) {
- if (arg.Type->GetKind() == ETypeAnnotationKind::Stream) {
- return true;
- }
- }
- }
- if (callableType->GetReturnType()->GetKind() == ETypeAnnotationKind::Callable) {
- callableType = callableType->GetReturnType()->Cast<TCallableExprType>();
- } else {
- break;
- }
- }
- return false;
-}
-
bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, bool& convertToPg, TPositionHandle pos, TExprContext& ctx) {
pgType = 0;
convertToPg = false;
diff --git a/ydb/library/yql/core/yql_expr_type_annotation.h b/ydb/library/yql/core/yql_expr_type_annotation.h
index 0d01545625d..9f1bfffc66d 100644
--- a/ydb/library/yql/core/yql_expr_type_annotation.h
+++ b/ydb/library/yql/core/yql_expr_type_annotation.h
@@ -277,6 +277,8 @@ bool IsNull(const TTypeAnnotationNode& type);
bool IsEmptyList(const TExprNode& node);
bool IsEmptyList(const TTypeAnnotationNode& type);
bool IsInstantEqual(const TTypeAnnotationNode& type);
+bool IsFlowOrStream(const TTypeAnnotationNode& type);
+bool IsFlowOrStream(const TExprNode& node);
TString GetTypeDiff(const TTypeAnnotationNode& left, const TTypeAnnotationNode& right);
TString GetTypePrettyDiff(const TTypeAnnotationNode& left, const TTypeAnnotationNode& right);
@@ -293,7 +295,6 @@ std::optional<ui32> GetFieldPosition(const TMultiExprType& tupleType, const TStr
std::optional<ui32> GetFieldPosition(const TTupleExprType& tupleType, const TStringBuf& field);
std::optional<ui32> GetFieldPosition(const TStructExprType& structType, const TStringBuf& field);
-bool IsCallableTypeHasStreams(const TCallableExprType* callableType);
bool ExtractPgType(const TTypeAnnotationNode* type, ui32& pgType, bool& convertToPg, TPositionHandle pos, TExprContext& ctx);
bool HasContextFuncs(const TExprNode& input);
diff --git a/ydb/library/yql/core/yql_opt_utils.cpp b/ydb/library/yql/core/yql_opt_utils.cpp
index c24cd01ed82..34d68d6edfa 100644
--- a/ydb/library/yql/core/yql_opt_utils.cpp
+++ b/ydb/library/yql/core/yql_opt_utils.cpp
@@ -1528,7 +1528,7 @@ TVector<TStringBuf> GetCommonKeysFromVariantSelector(const NNodes::TCoLambda& la
}
bool IsIdentityLambda(const TExprNode& lambda) {
- return lambda.IsLambda() && &lambda.Head().Head() == &lambda.Tail();
+ return lambda.IsLambda() && lambda.Head().ChildrenSize() == 1 && &lambda.Head().Head() == &lambda.Tail();
}
TExprNode::TPtr MakeExpandMap(TPositionHandle pos, const TVector<TString>& columns, const TExprNode::TPtr& input, TExprContext& ctx) {
@@ -1576,4 +1576,114 @@ TExprNode::TPtr MakeNarrowMap(TPositionHandle pos, const TVector<TString>& colum
.Build();
}
+TExprNode::TPtr FindNonYieldTransparentNodeImpl(const TExprNode::TPtr& root, const bool udfSupportsYield, const TNodeSet& flowSources) {
+ auto depensOnFlow = [&flowSources](const TExprNode::TPtr& node) {
+ return !!FindNode(node,
+ [](const TExprNode::TPtr& n) {
+ return !TCoDependsOn::Match(n.Get());
+ },
+ [&flowSources](const TExprNode::TPtr& n) {
+ return flowSources.contains(n.Get());
+ }
+ );
+ };
+
+ auto candidates = FindNodes(root,
+ [&flowSources](const TExprNode::TPtr& node) {
+ if (flowSources.contains(node.Get()) || TCoDependsOn::Match(node.Get())) {
+ return false;
+ }
+ if (node->ChildrenSize() > 0 && node->Head().GetTypeAnn()->GetKind() == ETypeAnnotationKind::World) {
+ return false;
+ }
+ return true;
+ },
+ [](const TExprNode::TPtr& node) {
+ return TCoCollect::Match(node.Get())
+ || TCoForwardList::Match(node.Get())
+ || TCoApply::Match(node.Get())
+ || TCoSwitch::Match(node.Get())
+ || node->IsCallable("DqReplicate");
+ }
+ );
+
+ for (auto candidate: candidates) {
+ if (TCoCollect::Match(candidate.Get()) || TCoForwardList::Match(candidate.Get())) {
+ if (depensOnFlow(candidate->HeadPtr())) {
+ return candidate;
+ }
+ } else if (TCoApply::Match(candidate.Get())) {
+ if (AnyOf(candidate->Children().begin() + 1, candidate->Children().end(), depensOnFlow)) {
+ if (!IsFlowOrStream(*candidate)) {
+ while (TCoApply::Match(candidate.Get())) {
+ candidate = candidate->HeadPtr();
+ }
+ return candidate;
+ }
+ if (!udfSupportsYield) {
+ while (TCoApply::Match(candidate.Get())) {
+ candidate = candidate->HeadPtr();
+ }
+ if (TCoScriptUdf::Match(candidate.Get())) {
+ return candidate;
+ }
+ }
+ }
+ } else if (TCoSwitch::Match(candidate.Get())) {
+ for (size_t i = 3; i < candidate->ChildrenSize(); i += 2) {
+ if (auto node = FindNonYieldTransparentNodeImpl(candidate->Child(i)->TailPtr(), udfSupportsYield, TNodeSet{&candidate->Child(i)->Head().Head()})) {
+ return node;
+ }
+ }
+ } else if (candidate->IsCallable("DqReplicate")) {
+ for (size_t i = 1; i < candidate->ChildrenSize(); ++i) {
+ if (auto node = FindNonYieldTransparentNodeImpl(candidate->Child(i)->TailPtr(), udfSupportsYield, TNodeSet{&candidate->Child(i)->Head().Head()})) {
+ return node;
+ }
+ }
+ }
+ }
+ return {};
+}
+
+TExprNode::TPtr FindNonYieldTransparentNode(const TExprNode::TPtr& root, const TTypeAnnotationContext& typeCtx) {
+ TNodeSet flowSources;
+ TExprNode::TPtr from = root;
+ if (root->IsLambda()) {
+ if (IsIdentityLambda(*root)) {
+ return {};
+ }
+ from = root->TailPtr();
+
+ // Add all flow lambda args
+ root->Head().ForEachChild([&flowSources](const TExprNode& arg) {
+ if (IsFlowOrStream(arg)) {
+ flowSources.insert(&arg);
+ }
+ });
+ }
+
+ static const THashSet<TStringBuf> WHITE_LIST = {"EmptyIterator"sv, TCoToStream::CallableName(), TCoIterator::CallableName(),
+ TCoToFlow::CallableName(), TCoApply::CallableName(), TCoNth::CallableName(), TCoMux::CallableName()};
+ // Find all other flow sources (readers)
+ auto sources = FindNodes(from,
+ [](const TExprNode::TPtr& node) {
+ return !node->IsCallable(WHITE_LIST)
+ && node->IsCallable()
+ && IsFlowOrStream(*node)
+ && (node->ChildrenSize() == 0 || !IsFlowOrStream(node->Head()));
+ }
+ );
+ std::for_each(sources.cbegin(), sources.cend(), [&flowSources](const TExprNode::TPtr& node) { flowSources.insert(node.Get()); });
+
+ if (flowSources.empty()) {
+ return {};
+ }
+ return FindNonYieldTransparentNodeImpl(from, typeCtx.UdfSupportsYield, flowSources);
+}
+
+bool IsYieldTransparent(const TExprNode::TPtr& root, const TTypeAnnotationContext& typeCtx) {
+ return !FindNonYieldTransparentNode(root, typeCtx);
+}
+
}
diff --git a/ydb/library/yql/core/yql_opt_utils.h b/ydb/library/yql/core/yql_opt_utils.h
index e199ba26788..f6ec3cdf973 100644
--- a/ydb/library/yql/core/yql_opt_utils.h
+++ b/ydb/library/yql/core/yql_opt_utils.h
@@ -122,5 +122,7 @@ bool IsIdentityLambda(const TExprNode& lambda);
TExprNode::TPtr MakeExpandMap(TPositionHandle pos, const TVector<TString>& columns, const TExprNode::TPtr& input, TExprContext& ctx);
TExprNode::TPtr MakeNarrowMap(TPositionHandle pos, const TVector<TString>& columns, const TExprNode::TPtr& input, TExprContext& ctx);
+TExprNode::TPtr FindNonYieldTransparentNode(const TExprNode::TPtr& root, const TTypeAnnotationContext& typeCtx);
+bool IsYieldTransparent(const TExprNode::TPtr& root, const TTypeAnnotationContext& typeCtx);
}
diff --git a/ydb/library/yql/providers/common/provider/yql_provider.cpp b/ydb/library/yql/providers/common/provider/yql_provider.cpp
index ffc54481edc..7f157e45fee 100644
--- a/ydb/library/yql/providers/common/provider/yql_provider.cpp
+++ b/ydb/library/yql/providers/common/provider/yql_provider.cpp
@@ -836,12 +836,12 @@ void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprN
}
for (size_t i = 1; i < node->ChildrenSize(); ++i) {
- if (IsFlowOrStream(node->Child(i))) {
+ if (IsFlowOrStream(*node->Child(i))) {
applyStreamChildren.push_back(node->Child(i));
} else if (node->Child(i)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) {
if (node->Child(i)->IsCallable("ForwardList")) {
applyStreamChildren.push_back(node->Child(i)->Child(0));
- } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->HeadPtr().Get())) {
+ } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->Head())) {
applyStreamChildren.push_back(node->Child(i)->Child(0));
}
}
@@ -858,7 +858,7 @@ void WriteStream(NYson::TYsonWriter& writer, const TExprNode* node, const TExprN
writer.OnBeginMap();
writer.OnKeyedItem("Name");
writer.OnStringScalar(node->Content());
- if (TCoFlatMapBase::Match(node) && IsFlowOrStream(node->ChildPtr(1).Get())) {
+ if (TCoFlatMapBase::Match(node) && IsFlowOrStream(*node->Child(1))) {
writer.OnKeyedItem("Children");
writer.OnBeginList();
writer.OnListItem();
@@ -942,12 +942,12 @@ double GetDataReplicationFactor(double factor, const TExprNode* node, const TExp
case ETypeAnnotationKind::List: {
double applyFactor = 0.0;
for (size_t i = 1; i < node->ChildrenSize(); ++i) {
- if (IsFlowOrStream(node->Child(i))) {
+ if (IsFlowOrStream(*node->Child(i))) {
applyFactor += GetDataReplicationFactor(factor, node->Child(i), stream, ctx);
} else if (node->Child(i)->GetTypeAnn()->GetKind() == ETypeAnnotationKind::List) {
if (node->Child(i)->IsCallable("ForwardList")) {
applyFactor += GetDataReplicationFactor(factor, node->Child(i)->Child(0), stream, ctx);
- } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->HeadPtr().Get())) {
+ } else if (node->Child(i)->IsCallable("Collect") && IsFlowOrStream(node->Child(i)->Head())) {
applyFactor += GetDataReplicationFactor(factor, node->Child(i)->Child(0), stream, ctx);
}
}
diff --git a/ydb/library/yql/providers/dq/provider/CMakeLists.darwin.txt b/ydb/library/yql/providers/dq/provider/CMakeLists.darwin.txt
index 582b891a7fd..46c0f58d2b3 100644
--- a/ydb/library/yql/providers/dq/provider/CMakeLists.darwin.txt
+++ b/ydb/library/yql/providers/dq/provider/CMakeLists.darwin.txt
@@ -21,6 +21,7 @@ target_link_libraries(providers-dq-provider PUBLIC
public-lib-yson_value
cpp-client-ydb_driver
library-yql-core
+ yql-core-issue
yql-utils-backtrace
yql-dq-transform
yql-dq-tasks
@@ -51,4 +52,5 @@ target_sources(providers-dq-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp
)
diff --git a/ydb/library/yql/providers/dq/provider/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/dq/provider/CMakeLists.linux-aarch64.txt
index 99160494bad..610a417dbcb 100644
--- a/ydb/library/yql/providers/dq/provider/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/dq/provider/CMakeLists.linux-aarch64.txt
@@ -22,6 +22,7 @@ target_link_libraries(providers-dq-provider PUBLIC
public-lib-yson_value
cpp-client-ydb_driver
library-yql-core
+ yql-core-issue
yql-utils-backtrace
yql-dq-transform
yql-dq-tasks
@@ -52,4 +53,5 @@ target_sources(providers-dq-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp
)
diff --git a/ydb/library/yql/providers/dq/provider/CMakeLists.linux.txt b/ydb/library/yql/providers/dq/provider/CMakeLists.linux.txt
index 99160494bad..610a417dbcb 100644
--- a/ydb/library/yql/providers/dq/provider/CMakeLists.linux.txt
+++ b/ydb/library/yql/providers/dq/provider/CMakeLists.linux.txt
@@ -22,6 +22,7 @@ target_link_libraries(providers-dq-provider PUBLIC
public-lib-yson_value
cpp-client-ydb_driver
library-yql-core
+ yql-core-issue
yql-utils-backtrace
yql-dq-transform
yql-dq-tasks
@@ -52,4 +53,5 @@ target_sources(providers-dq-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasink.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp
)
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
index 3368912d6ba..bbf4cede26e 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_type_ann.cpp
@@ -1,6 +1,7 @@
#include "yql_dq_datasink_type_ann.h"
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
+#include <ydb/library/yql/core/issue/yql_issue.h>
#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
#include <ydb/library/yql/dq/type_ann/dq_type_ann.h>
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
@@ -45,12 +46,7 @@ public:
private:
TStatus AnnotateDqReplicateAlwaysError(const TExprNode::TPtr& input, TExprContext& ctx) {
- ctx.AddError(
- TIssue(
- ctx.GetPosition(input->Pos()),
- "Reading multiple times from the same source is not supported")
- .SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR,
- TSeverityIds::S_ERROR));
+ ctx.AddError(YqlIssue(ctx.GetPosition(input->Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, "Reading multiple times from the same source is not supported"));
return TStatus::Error;
}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
index 6cd00b8871f..e353f088549 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
@@ -2,6 +2,7 @@
#include "yql_dq_datasource_constraints.h"
#include "yql_dq_datasource_type_ann.h"
#include "yql_dq_state.h"
+#include "yql_dq_validate.h"
#include <ydb/library/yql/providers/common/config/yql_configuration_transformer.h>
#include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h>
@@ -223,6 +224,10 @@ public:
return TDqCnResult::Match(&node) || TDqQuery::Match(&node);
}
+ bool ValidateExecution(const TExprNode& node, TExprContext& ctx) override {
+ return ValidateDqExecution(node, *State->TypeCtx, ctx);
+ }
+
bool CanParse(const TExprNode& node) override {
return TypeAnnotationTransformer->CanParse(node);
}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
index ee81b95657c..2e60546388e 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
@@ -25,13 +25,11 @@ namespace {
const THashSet<TStringBuf> VALID_SOURCES = {DqProviderName, ConfigProviderName, YtProviderName, ClickHouseProviderName, YdbProviderName, S3ProviderName};
const THashSet<TStringBuf> VALID_SINKS = {ResultProviderName, YtProviderName, S3ProviderName};
-const THashSet<TStringBuf> UNSUPPORTED_CALLABLE = { TCoForwardList::CallableName() };
}
namespace NDq {
bool CheckJoinColumns(const TExprBase& node);
- bool CheckJoinLinkSettings(const TExprBase& node);
} // namespace NDq
class TDqsRecaptureTransformer : public TSyncTransformerBase {
@@ -163,14 +161,6 @@ private:
Scan(*node.Child(i), ctx, good, dataSize, visited, hasJoin);
}
}
- } else if (node.IsCallable(UNSUPPORTED_CALLABLE)) {
- AddInfo(ctx, TStringBuilder() << "unsupported callable '" << node.Content() << "'");
- good = false;
- } else if (node.IsCallable(TCoCollect::CallableName())) {
- if (ETypeAnnotationKind::List != node.Head().GetTypeAnn()->GetKind()) {
- AddInfo(ctx, TStringBuilder() << "unsupported callable '" << node.Content() << "' over stream/flow");
- good = false;
- }
} else if (auto datasource = TMaybeNode<TCoDataSource>(&node).Category()) {
if (!VALID_SOURCES.contains(datasource.Cast().Value())) {
AddInfo(ctx, TStringBuilder() << "source '" << datasource.Cast().Value() << "' is not supported by DQ");
@@ -236,13 +226,7 @@ private:
}
}
else if (TCoScriptUdf::Match(&node)) {
- if (!State_->TypeCtx->UdfSupportsYield) {
- if (IsCallableTypeHasStreams(node.GetTypeAnn()->Cast<TCallableExprType>())) {
- AddInfo(ctx, TStringBuilder() << "script udf with streams");
- good = false;
- }
- }
- if (NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(node.Head().Content()))) {
+ if (good && TCoScriptUdf::Match(&node) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(node.Head().Content()))) {
AddInfo(ctx, TStringBuilder() << "system python udf");
good = false;
}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp
new file mode 100644
index 00000000000..3ada75be3d5
--- /dev/null
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp
@@ -0,0 +1,127 @@
+#include "yql_dq_validate.h"
+
+#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
+#include <ydb/library/yql/core/yql_expr_optimize.h>
+#include <ydb/library/yql/core/yql_expr_type_annotation.h>
+#include <ydb/library/yql/core/yql_opt_utils.h>
+#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
+#include <ydb/library/yql/core/issue/yql_issue.h>
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
+#include <ydb/library/yql/utils/log/log.h>
+
+#include <util/string/builder.h>
+
+namespace NYql {
+
+using namespace NYql::NNodes;
+
+namespace {
+
+bool ValidateDqNode(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx, TNodeSet& visited);
+
+bool ValidateDqStage(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx, TNodeSet& visited) {
+ if (!visited.insert(&node).second) {
+ return true;
+ }
+
+ bool hasErrors = false;
+ if (auto bad = FindNonYieldTransparentNode(TDqStageBase(&node).Program().Ptr(), typeCtx)) {
+ hasErrors = true;
+ YQL_CLOG(WARN, ProviderDq) << "Cannot execute " << bad->Content() << " over stream/flow inside DQ stage";
+ ctx.AddError(YqlIssue(ctx.GetPosition(bad->Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << "Cannot execute " << bad->Content() << " over stream/flow inside DQ stage"));
+ }
+ VisitExpr(TDqStageBase(&node).Program().Body().Ptr(),
+ [](const TExprNode::TPtr& n) {
+ return !TDqConnection::Match(n.Get()) && !TDqPhyPrecompute::Match(n.Get());
+ },
+ [&hasErrors, &ctx](const TExprNode::TPtr& n) {
+ if (TCoScriptUdf::Match(n.Get()) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(n->Head().Content()))) {
+ YQL_CLOG(WARN, ProviderDq) << "Cannot execute system python udf " << n->Content() << " in DQ";
+ ctx.AddError(YqlIssue(ctx.GetPosition(n->Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << "Cannot execute system python udf " << n->Content() << " in DQ"));
+ hasErrors = true;
+ }
+ return !hasErrors;
+ }
+ );
+
+ for (auto n: TDqStageBase(&node).Inputs()) {
+ hasErrors = !ValidateDqNode(n.Ref(), typeCtx, ctx, visited) || hasErrors;
+ }
+ if (auto outs = TDqStageBase(&node).Outputs()) {
+ for (auto n: outs.Cast()) {
+ hasErrors = !ValidateDqNode(n.Ref(), typeCtx, ctx, visited) || hasErrors;
+ }
+ }
+
+ return !hasErrors;
+
+}
+
+bool ValidateDqNode(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx, TNodeSet& visited) {
+ if (node.GetState() == TExprNode::EState::ExecutionComplete) {
+ return true;
+ }
+
+ if (TDqStageBase::Match(&node)) {
+ // visited will be updated inside ValidateDqStage
+ return ValidateDqStage(node, typeCtx, ctx, visited);
+ }
+
+ if (!visited.insert(&node).second) {
+ return true;
+ }
+
+ if (TDqCnResult::Match(&node)) {
+ YQL_CLOG(WARN, ProviderDq) << TDqCnResult::CallableName() << " connection cannot be used inside graph";
+ ctx.AddError(YqlIssue(ctx.GetPosition(node.Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << TDqCnResult::CallableName() << " connection cannot be used inside graph"));
+ return false;
+ }
+ if (TDqConnection::Match(&node)) {
+ return ValidateDqStage(TDqConnection(&node).Output().Stage().Ref(), typeCtx, ctx, visited);
+ }
+ if (TDqPhyPrecompute::Match(&node)) {
+ return ValidateDqNode(TDqPhyPrecompute(&node).Connection().Ref(), typeCtx, ctx, visited);
+ }
+
+ if (TDqSource::Match(&node) || TDqTransform::Match(&node) || TDqSink::Match(&node)) {
+ return true;
+ }
+
+ YQL_CLOG(WARN, ProviderDq) << "Failed to execute callable with name: " << node.Content() << " in DQ";
+ ctx.AddError(YqlIssue(ctx.GetPosition(node.Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << "Failed to execute callable with name: " << node.Content() << " in DQ"));
+ return false;
+}
+
+}
+
+bool ValidateDqExecution(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx) {
+ YQL_LOG_CTX_SCOPE(__FUNCTION__);
+
+ TNodeSet dqNodes;
+ if (TDqCnResult::Match(&node)) {
+ dqNodes.insert(TDqCnResult(&node).Output().Stage().Raw());
+ } else if (TDqQuery::Match(&node)) {
+ for (auto st: TDqQuery(&node).SinkStages()) {
+ dqNodes.insert(st.Raw());
+ }
+ } else {
+ VisitExpr(node, [&dqNodes](const TExprNode& n) {
+ if (TDqStageBase::Match(&n)) {
+ dqNodes.insert(&n);
+ return false;
+ } else if (TDqConnection::Match(&n)) {
+ dqNodes.insert(&n);
+ return false;
+ }
+ return true;
+ });
+ }
+ TNodeSet visited;
+ bool hasError = false;
+ for (const auto n: dqNodes) {
+ hasError = !ValidateDqNode(*n, typeCtx, ctx, visited) || hasError;
+ }
+ return !hasError;
+}
+
+} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_validate.h b/ydb/library/yql/providers/dq/provider/yql_dq_validate.h
new file mode 100644
index 00000000000..087cafd13e0
--- /dev/null
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_validate.h
@@ -0,0 +1,10 @@
+#pragma once
+
+#include <ydb/library/yql/ast/yql_expr.h>
+#include <ydb/library/yql/core/yql_type_annotation.h>
+
+namespace NYql {
+
+bool ValidateDqExecution(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx);
+
+} // namespace NYql