about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author mrlolthe1st <mrlolthe1st@yandex-team.com> 2023-05-19 13:14:00 +0300
committer mrlolthe1st <mrlolthe1st@yandex-team.com> 2023-05-19 13:14:00 +0300
commit 240660ff0b3a62970a9f037077b8fc947b963354 (patch)
tree 1db5eb5a934cb9fb5dace4b1c924323454e30729
parent 1c61d3c39a24c9392a406352c70c70a8c2c8bd79 (diff)
download ydb-240660ff0b3a62970a9f037077b8fc947b963354.tar.gz
YQL-18505: Estimate read size after optimizers
YQL-18505: Estimate read size after optimizers
-rw-r--r-- ydb/core/kqp/provider/yql_kikimr_opt_build.cpp 4
-rw-r--r-- ydb/library/yql/core/yql_graph_transformer.cpp 98
-rw-r--r-- ydb/library/yql/core/yql_graph_transformer.h 92
-rw-r--r-- ydb/library/yql/dq/integration/yql_dq_integration.h 3
-rw-r--r-- ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp 6
-rw-r--r-- ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp 61
-rw-r--r-- ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h 3
-rw-r--r-- ydb/library/yql/providers/common/transform/yql_optimize.cpp 6
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp 46
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp 36
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp 226
-rw-r--r-- ydb/library/yql/providers/dq/provider/yql_dq_validate.h 4
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp 6
-rw-r--r-- ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp 7
-rw-r--r-- ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp 6
15 files changed, 318 insertions, 286 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
index 2675c401c6f..8c0d6a0dc11 100644
--- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp
@@ -323,7 +323,9 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T
}
for (const auto& dataSource : types.DataSources) {
- if (auto* dqIntegration = dataSource->GetDqIntegration(); dqIntegration && dqIntegration->CanRead(TDqSettings::TDefault::DataSizePerJob, TDqSettings::TDefault::MaxTasksPerStage, *node.Ptr(), ctx)) {
+ if (auto* dqIntegration = dataSource->GetDqIntegration(); dqIntegration
+ && dqIntegration->CanRead(*node.Ptr(), ctx)
+ && dqIntegration->EstimateReadSize(TDqSettings::TDefault::DataSizePerJob, TDqSettings::TDefault::MaxTasksPerStage, *node.Ptr(), ctx)) {
txRes.Ops.insert(node.Raw());
for (size_t i = 0, childrenSize = node.Raw()->ChildrenSize(); i < childrenSize; ++i) {
if (TExprNode::TPtr child = node.Raw()->ChildPtr(i)) {
diff --git a/ydb/library/yql/core/yql_graph_transformer.cpp b/ydb/library/yql/core/yql_graph_transformer.cpp
index 0ef4af98226..886fddbfd15 100644
--- a/ydb/library/yql/core/yql_graph_transformer.cpp
+++ b/ydb/library/yql/core/yql_graph_transformer.cpp
@@ -10,24 +10,24 @@ namespace {
class TCompositeGraphTransformer : public TGraphTransformerBase {
public:
TCompositeGraphTransformer(const TVector<TTransformStage>& stages, bool useIssueScopes, bool doCheckArguments)
- : Stages(stages)
- , UseIssueScopes(useIssueScopes)
- , DoCheckArguments(doCheckArguments)
+ : Stages_(stages)
+ , UseIssueScopes_(useIssueScopes)
+ , DoCheckArguments_(doCheckArguments)
{
- if (UseIssueScopes) {
- for (const auto& stage : Stages) {
+ if (UseIssueScopes_) {
+ for (const auto& stage : Stages_) {
YQL_ENSURE(!stage.Name.empty());
}
}
}
void Rewind() override {
- for (auto& stage : Stages) {
+ for (auto& stage : Stages_) {
stage.GetTransformer().Rewind();
}
- Index = 0;
- CheckArgumentsCount = 0;
+ Index_ = 0;
+ CheckArgumentsCount_ = 0;
}
TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
@@ -40,41 +40,41 @@ public:
}
#endif
- if (Index >= Stages.size()) {
+ if (Index_ >= Stages_.size()) {
return TStatus::Ok;
}
auto status = WithScope(ctx, [&]() {
- return Stages[Index].GetTransformer().Transform(input, output, ctx);
+ return Stages_[Index_].GetTransformer().Transform(input, output, ctx);
});
#ifndef NDEBUG
- if (DoCheckArguments && output && output != input) {
+ if (DoCheckArguments_ && output && output != input) {
try {
CheckArguments(*output);
- ++CheckArgumentsCount;
+ ++CheckArgumentsCount_;
} catch (yexception& e) {
- e << "at CheckArguments() pass #" << CheckArgumentsCount
- << ", stage '" << Stages[Index].Name << "'";
+ e << "at CheckArguments() pass #" << CheckArgumentsCount_
+ << ", stage '" << Stages_[Index_].Name << "'";
throw;
}
}
#else
- Y_UNUSED(DoCheckArguments);
- Y_UNUSED(CheckArgumentsCount);
+ Y_UNUSED(DoCheckArguments_);
+ Y_UNUSED(CheckArgumentsCount_);
#endif
status = HandleStatus(status);
return status;
}
NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) override {
- YQL_ENSURE(Index < Stages.size());
- return Stages[Index].GetTransformer().GetAsyncFuture(input);
+ YQL_ENSURE(Index_ < Stages_.size());
+ return Stages_[Index_].GetTransformer().GetAsyncFuture(input);
}
TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
- YQL_ENSURE(Index < Stages.size());
+ YQL_ENSURE(Index_ < Stages_.size());
auto status = WithScope(ctx, [&]() {
- return Stages[Index].GetTransformer().ApplyAsyncChanges(input, output, ctx);
+ return Stages_[Index_].GetTransformer().ApplyAsyncChanges(input, output, ctx);
});
status = HandleStatus(status);
@@ -82,18 +82,18 @@ public:
}
TStatistics GetStatistics() const final {
- if (Statistics.Stages.empty()) {
- Statistics.Stages.resize(Stages.size());
+ if (Statistics_.Stages.empty()) {
+ Statistics_.Stages.resize(Stages_.size());
}
- YQL_ENSURE(Stages.size() == Statistics.Stages.size());
- for (size_t i = 0; i < Stages.size(); ++i) {
- auto& stagePair = Statistics.Stages[i];
- stagePair.first = Stages[i].Name;
- stagePair.second = Stages[i].GetTransformer().GetStatistics();
+ YQL_ENSURE(Stages_.size() == Statistics_.Stages.size());
+ for (size_t i = 0; i < Stages_.size(); ++i) {
+ auto& stagePair = Statistics_.Stages[i];
+ stagePair.first = Stages_[i].Name;
+ stagePair.second = Stages_[i].GetTransformer().GetStatistics();
}
- return Statistics;
+ return Statistics_;
}
private:
@@ -104,11 +104,11 @@ private:
if (status.HasRestart) {
// ignore Async status in this case
- Index = 0;
+ Index_ = 0;
status = IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true);
} else if (status.Level == IGraphTransformer::TStatus::Ok) {
status = IGraphTransformer::TStatus::Repeat;
- ++Index;
+ ++Index_;
}
return status;
@@ -116,10 +116,10 @@ private:
template <typename TFunc>
TStatus WithScope(TExprContext& ctx, TFunc func) {
- if (UseIssueScopes) {
+ if (UseIssueScopes_) {
TIssueScopeGuard guard(ctx.IssueManager, [&]() {
- const auto scopeIssueCode = Stages[Index].IssueCode;
- const auto scopeIssueMessage = Stages[Index].IssueMessage;
+ const auto scopeIssueCode = Stages_[Index_].IssueCode;
+ const auto scopeIssueMessage = Stages_[Index_].IssueMessage;
auto issue = MakeIntrusive<TIssue>(TPosition(), scopeIssueMessage ? scopeIssueMessage : IssueCodeToString(scopeIssueCode));
issue->SetCode(scopeIssueCode, GetSeverity(scopeIssueCode));
@@ -133,11 +133,11 @@ private:
}
protected:
- TVector<TTransformStage> Stages;
- const bool UseIssueScopes;
- const bool DoCheckArguments;
- size_t Index = 0;
- ui64 CheckArgumentsCount = 0;
+ TVector<TTransformStage> Stages_;
+ const bool UseIssueScopes_;
+ const bool DoCheckArguments_;
+ size_t Index_ = 0;
+ ui64 CheckArgumentsCount_ = 0;
};
void AddTooManyTransformationsError(TPositionHandle pos, const TStringBuf& where, TExprContext& ctx) {
@@ -173,7 +173,7 @@ public:
private:
void Rewind() override {
- Condition.Clear();
+ Condition_.Clear();
TCompositeGraphTransformer::Rewind();
}
@@ -184,17 +184,17 @@ private:
if (status.HasRestart) {
// ignore Async status in this case
- Index = 0;
+ Index_ = 0;
status = IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true);
} else if (status.Level == IGraphTransformer::TStatus::Ok) {
status = IGraphTransformer::TStatus::Repeat;
- YQL_ENSURE(!Condition.Empty(), "Condition must be set");
- if (Index == 0 && *Condition) {
- Index = 1; // left
- } else if (Index == 0) {
- Index = 2; // right
+ YQL_ENSURE(!Condition_.Empty(), "Condition must be set");
+ if (Index_ == 0 && *Condition_) {
+ Index_ = 1; // left
+ } else if (Index_ == 0) {
+ Index_ = 2; // right
} else {
- Index = 3; // end
+ Index_ = 3; // end
}
}
@@ -205,8 +205,8 @@ private:
{
auto transformer = CreateFunctorTransformer([this, condition](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
output = input;
- if (Condition.Empty()) {
- Condition = condition(input, ctx);
+ if (Condition_.Empty()) {
+ Condition_ = condition(input, ctx);
}
return TStatus::Ok;
});
@@ -214,7 +214,7 @@ private:
return TTransformStage(transformer, "Condition", TIssuesIds::DEFAULT_ERROR);
}
- TMaybe<bool> Condition;
+ TMaybe<bool> Condition_;
};
} // namespace
diff --git a/ydb/library/yql/core/yql_graph_transformer.h b/ydb/library/yql/core/yql_graph_transformer.h
index 4512ff4e3f9..26e508fc369 100644
--- a/ydb/library/yql/core/yql_graph_transformer.h
+++ b/ydb/library/yql/core/yql_graph_transformer.h
@@ -110,71 +110,71 @@ private:
class TTransformScope {
public:
TTransformScope(TStatistics& statistics, const TExprContext* exprCtx)
- : Statistics(statistics)
- , ExprCtx(exprCtx)
- , TransformStart(TInstant::Now())
- , ExprNodesSize(exprCtx ? exprCtx->ExprNodes.size() : 0)
- , TypeNodesSize(exprCtx ? exprCtx->TypeNodes.size() : 0)
- , ConstraintNodesSize(exprCtx ? exprCtx->ConstraintNodes.size() : 0)
+ : Statistics_(statistics)
+ , ExprCtx_(exprCtx)
+ , TransformStart_(TInstant::Now())
+ , ExprNodesSize_(exprCtx ? exprCtx->ExprNodes.size() : 0)
+ , TypeNodesSize_(exprCtx ? exprCtx->TypeNodes.size() : 0)
+ , ConstraintNodesSize_(exprCtx ? exprCtx->ConstraintNodes.size() : 0)
{
}
~TTransformScope() {
- Statistics.TransformDuration += TInstant::Now() - TransformStart;
- if (ExprCtx) {
- Statistics.NewExprNodes += ExprCtx->ExprNodes.size() - ExprNodesSize;
- Statistics.NewTypeNodes += ExprCtx->TypeNodes.size() - TypeNodesSize;
- Statistics.NewConstraintNodes += ExprCtx->ConstraintNodes.size() - ConstraintNodesSize;
+ Statistics_.TransformDuration += TInstant::Now() - TransformStart_;
+ if (ExprCtx_) {
+ Statistics_.NewExprNodes += ExprCtx_->ExprNodes.size() - ExprNodesSize_;
+ Statistics_.NewTypeNodes += ExprCtx_->TypeNodes.size() - TypeNodesSize_;
+ Statistics_.NewConstraintNodes += ExprCtx_->ConstraintNodes.size() - ConstraintNodesSize_;
}
}
TStatus HandleStatus(const TStatus& status) {
if (status == TStatus::Repeat) {
- Statistics.Repeats++;
+ Statistics_.Repeats++;
}
if (status.HasRestart) {
- Statistics.Restarts++;
+ Statistics_.Restarts++;
}
return status;
}
private:
- TStatistics& Statistics;
- const TExprContext* ExprCtx;
- TInstant TransformStart;
- i64 ExprNodesSize;
- i64 TypeNodesSize;
- i64 ConstraintNodesSize;
+ TStatistics& Statistics_;
+ const TExprContext* ExprCtx_;
+ TInstant TransformStart_;
+ i64 ExprNodesSize_;
+ i64 TypeNodesSize_;
+ i64 ConstraintNodesSize_;
};
public:
TGraphTransformerBase()
- : Statistics(TStatistics::Zero())
- , AsyncStart() {}
+ : Statistics_(TStatistics::Zero())
+ , AsyncStart_() {}
TStatus Transform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
- TTransformScope scope(Statistics, &ctx);
+ TTransformScope scope(Statistics_, &ctx);
return scope.HandleStatus(DoTransform(input, output, ctx));
}
NThreading::TFuture<void> GetAsyncFuture(const TExprNode& input) final {
- TTransformScope scope(Statistics, nullptr);
- AsyncStart = TInstant::Now();
+ TTransformScope scope(Statistics_, nullptr);
+ AsyncStart_ = TInstant::Now();
return DoGetAsyncFuture(input);
}
TStatus ApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
- TTransformScope scope(Statistics, &ctx);
- Statistics.WaitDuration += TInstant::Now() - AsyncStart;
+ TTransformScope scope(Statistics_, &ctx);
+ Statistics_.WaitDuration += TInstant::Now() - AsyncStart_;
return scope.HandleStatus(DoApplyAsyncChanges(input, output, ctx));
}
- virtual TStatistics GetStatistics() const override { return Statistics; }
+ virtual TStatistics GetStatistics() const override { return Statistics_; }
public:
virtual TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) = 0;
@@ -182,10 +182,10 @@ public:
virtual TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) = 0;
protected:
- mutable TStatistics Statistics;
+ mutable TStatistics Statistics_;
private:
- TInstant AsyncStart;
+ TInstant AsyncStart_;
};
struct TTransformStage {
@@ -197,24 +197,24 @@ struct TTransformStage {
: Name(name)
, IssueCode(issueCode)
, IssueMessage(issueMessage)
- , RawTransformer(transformer.Get())
- , Transformer(transformer)
+ , RawTransformer_(transformer.Get())
+ , Transformer_(transformer)
{}
TTransformStage(IGraphTransformer& transformer, const TString& name, EYqlIssueCode issueCode, const TString& issueMessage = {})
: Name(name)
, IssueCode(issueCode)
, IssueMessage(issueMessage)
- , RawTransformer(&transformer)
+ , RawTransformer_(&transformer)
{}
IGraphTransformer& GetTransformer() const
{
- return *RawTransformer;
+ return *RawTransformer_;
}
private:
- IGraphTransformer* const RawTransformer;
- const TAutoPtr<IGraphTransformer> Transformer;
+ IGraphTransformer* const RawTransformer_;
+ const TAutoPtr<IGraphTransformer> Transformer_;
};
TAutoPtr<IGraphTransformer> CreateCompositeGraphTransformer(const TVector<TTransformStage>& stages, bool useIssueScopes);
@@ -264,10 +264,10 @@ template <typename TFunctor>
class TFunctorTransformer final: public TSyncTransformerBase {
public:
TFunctorTransformer(TFunctor functor)
- : Functor(std::move(functor)) {}
+ : Functor_(std::move(functor)) {}
TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
- TStatus status = Functor(input, output, ctx);
+ TStatus status = Functor_(input, output, ctx);
YQL_ENSURE(status.Level != IGraphTransformer::TStatus::Async);
return status;
@@ -277,7 +277,7 @@ public:
}
private:
- TFunctor Functor;
+ TFunctor Functor_;
};
template <typename TFunctor>
@@ -295,34 +295,34 @@ public:
TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
auto pair = static_cast<TDerived*>(this)->CallbackTransform(input, output, ctx);
if (pair.first == TStatus::Async) {
- YQL_ENSURE(Callbacks.emplace(input.Get(), pair.second).second);
+ YQL_ENSURE(Callbacks_.emplace(input.Get(), pair.second).second);
}
return pair.first;
}
NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
- const auto it = Callbacks.find(&input);
- YQL_ENSURE(it != Callbacks.cend());
+ const auto it = Callbacks_.find(&input);
+ YQL_ENSURE(it != Callbacks_.cend());
return it->second.IgnoreResult();
}
TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
- const auto it = Callbacks.find(input.Get());
- YQL_ENSURE(it != Callbacks.cend());
+ const auto it = Callbacks_.find(input.Get());
+ YQL_ENSURE(it != Callbacks_.cend());
auto& future = it->second;
YQL_ENSURE(future.HasValue());
const auto status = future.GetValue()(input, output, ctx);
- Callbacks.erase(it);
+ Callbacks_.erase(it);
return status;
}
void Rewind() override {
- Callbacks.clear();
+ Callbacks_.clear();
}
private:
- TNodeMap<TAsyncTransformCallbackFuture> Callbacks;
+ TNodeMap<TAsyncTransformCallbackFuture> Callbacks_;
};
template <bool AlwaysRaiseIssues = true, typename TFuture, typename TCallback>
diff --git a/ydb/library/yql/dq/integration/yql_dq_integration.h b/ydb/library/yql/dq/integration/yql_dq_integration.h
index 447af086e68..04f31368b7b 100644
--- a/ydb/library/yql/dq/integration/yql_dq_integration.h
+++ b/ydb/library/yql/dq/integration/yql_dq_integration.h
@@ -30,7 +30,8 @@ public:
virtual ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node,
TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) = 0;
virtual bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues = false) { Y_UNUSED(skipIssues); Y_UNUSED(node); Y_UNUSED(ctx); return true; }
- virtual TMaybe<ui64> CanRead(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& read, TExprContext& ctx, bool skipIssues = true) = 0;
+ virtual bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues = true) = 0;
+ virtual TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& node, TExprContext& ctx) = 0;
virtual TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) = 0;
virtual TMaybe<bool> CanWrite(const TDqSettings& config, const TExprNode& write, TExprContext& ctx) = 0;
virtual void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) = 0;
diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp
index 7def8d8c9cd..8a44248f694 100644
--- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp
+++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp
@@ -22,7 +22,11 @@ public:
: State_(state)
{}
- TMaybe<ui64> CanRead(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext&, bool) override {
+ bool CanRead(const TExprNode& read, TExprContext&, bool) override {
+ return TClReadTable::Match(&read);
+ }
+
+ TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext&) override {
if (TClReadTable::Match(&read)) {
return 0ul; // TODO: return real size
}
diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
index 09f8e1951df..4f9da234bda 100644
--- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
+++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp
@@ -2,80 +2,51 @@
namespace NYql {
-ui64 TDqIntegrationBase::Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node,
- TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) {
- Y_UNUSED(config);
- Y_UNUSED(maxPartitions);
- Y_UNUSED(node);
- Y_UNUSED(partitions);
- Y_UNUSED(clusterName);
- Y_UNUSED(ctx);
- Y_UNUSED(canFallback);
+ui64 TDqIntegrationBase::Partition(const TDqSettings&, size_t, const TExprNode&,
+ TVector<TString>&, TString*, TExprContext&, bool) {
return 0;
}
-TMaybe<ui64> TDqIntegrationBase::CanRead(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext& ctx, bool skipIssues) {
- Y_UNUSED(read);
- Y_UNUSED(ctx);
- Y_UNUSED(skipIssues);
- return Nothing();
+bool TDqIntegrationBase::CanRead(const TExprNode&, TExprContext&, bool) {
+ return false;
}
-TExprNode::TPtr TDqIntegrationBase::WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) {
- Y_UNUSED(read);
- Y_UNUSED(ctx);
+TMaybe<ui64> TDqIntegrationBase::EstimateReadSize(ui64, ui32, const TExprNode &, TExprContext&) {
+ return Nothing();
+}
+TExprNode::TPtr TDqIntegrationBase::WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext&) {
return read;
}
-TMaybe<bool> TDqIntegrationBase::CanWrite(const TDqSettings&, const TExprNode& write, TExprContext& ctx) {
- Y_UNUSED(write);
- Y_UNUSED(ctx);
+TMaybe<bool> TDqIntegrationBase::CanWrite(const TDqSettings&, const TExprNode&, TExprContext&) {
return Nothing();
}
-void TDqIntegrationBase::RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) {
- Y_UNUSED(compiler);
+void TDqIntegrationBase::RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase&) {
}
bool TDqIntegrationBase::CanFallback() {
return false;
}
-void TDqIntegrationBase::FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) {
- Y_UNUSED(node);
- Y_UNUSED(settings);
- Y_UNUSED(sourceType);
+void TDqIntegrationBase::FillSourceSettings(const TExprNode&, ::google::protobuf::Any&, TString&) {
}
-void TDqIntegrationBase::FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) {
- Y_UNUSED(node);
- Y_UNUSED(settings);
- Y_UNUSED(sinkType);
+void TDqIntegrationBase::FillSinkSettings(const TExprNode&, ::google::protobuf::Any&, TString&) {
}
-void TDqIntegrationBase::FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) {
- Y_UNUSED(node);
- Y_UNUSED(settings);
+void TDqIntegrationBase::FillTransformSettings(const TExprNode&, ::google::protobuf::Any&) {
}
-void TDqIntegrationBase::Annotate(const TExprNode& node, THashMap<TString, TString>& params) {
- Y_UNUSED(node);
- Y_UNUSED(params);
+void TDqIntegrationBase::Annotate(const TExprNode&, THashMap<TString, TString>&) {
}
-bool TDqIntegrationBase::PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams) {
- Y_UNUSED(root);
- Y_UNUSED(ctx);
- Y_UNUSED(params);
- Y_UNUSED(secureParams);
+bool TDqIntegrationBase::PrepareFullResultTableParams(const TExprNode&, TExprContext&, THashMap<TString, TString>&, THashMap<TString, TString>&) {
return false;
}
-void TDqIntegrationBase::WriteFullResultTableRef(NYson::TYsonWriter& writer, const TVector<TString>& columns, const THashMap<TString, TString>& graphParams) {
- Y_UNUSED(writer);
- Y_UNUSED(columns);
- Y_UNUSED(graphParams);
+void TDqIntegrationBase::WriteFullResultTableRef(NYson::TYsonWriter&, const TVector<TString>&, const THashMap<TString, TString>&) {
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h
index 8e56a9db395..de26d816714 100644
--- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h
+++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h
@@ -8,7 +8,8 @@ class TDqIntegrationBase: public IDqIntegration {
public:
ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node,
TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) override;
- TMaybe<ui64> CanRead(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& read, TExprContext& ctx, bool skipIssues) override;
+ bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues) override;
+ TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& node, TExprContext& ctx) override;
TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) override;
void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override;
TMaybe<bool> CanWrite(const TDqSettings& config, const TExprNode& write, TExprContext& ctx) override;
diff --git a/ydb/library/yql/providers/common/transform/yql_optimize.cpp b/ydb/library/yql/providers/common/transform/yql_optimize.cpp
index ef29baa8ffe..8e69e71bccb 100644
--- a/ydb/library/yql/providers/common/transform/yql_optimize.cpp
+++ b/ydb/library/yql/providers/common/transform/yql_optimize.cpp
@@ -14,18 +14,18 @@ using namespace NNodes;
class TOptimizeTransformerBase::TIgnoreOptimizationContext: public IOptimizationContext {
public:
TIgnoreOptimizationContext(TOptimizeTransformerBase::TGetParents getParents)
- : GetParents(std::move(getParents))
+ : GetParents_(std::move(getParents))
{
}
virtual ~TIgnoreOptimizationContext() = default;
void RemapNode(const TExprNode& src, const TExprNode::TPtr&) final {
- const TParentsMap* parentsMap = GetParents();
+ const TParentsMap* parentsMap = GetParents_();
auto parentsIt = parentsMap->find(&src);
YQL_ENSURE(parentsIt != parentsMap->cend());
YQL_ENSURE(parentsIt->second.size() == 1, "Bad usage of local optimizer. Try to switch to global mode");
}
private:
- TOptimizeTransformerBase::TGetParents GetParents;
+ TOptimizeTransformerBase::TGetParents GetParents_;
};
class TOptimizeTransformerBase::TRemapOptimizationContext: public IOptimizationContext {
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
index e353f088549..21c0eb85d8a 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp
@@ -41,13 +41,13 @@ namespace {
class TDqDataProviderSource: public TDataProviderBase {
public:
TDqDataProviderSource(const TDqState::TPtr& state, TExecTransformerFactory execTransformerFactory)
- : State(state)
- , ConfigurationTransformer([this]() {
- return MakeHolder<NCommon::TProviderConfigurationTransformer>(State->Settings, *State->TypeCtx, TString{DqProviderName});
+ : State_(state)
+ , ConfigurationTransformer_([this]() {
+ return MakeHolder<NCommon::TProviderConfigurationTransformer>(State_->Settings, *State_->TypeCtx, TString{DqProviderName});
})
- , ExecTransformer([this, execTransformerFactory] () { return THolder<IGraphTransformer>(execTransformerFactory(State)); })
- , TypeAnnotationTransformer([] () { return CreateDqsDataSourceTypeAnnotationTransformer(); })
- , ConstraintsTransformer([] () { return CreateDqDataSourceConstraintTransformer(); })
+ , ExecTransformer_([this, execTransformerFactory] () { return THolder<IGraphTransformer>(execTransformerFactory(State_)); })
+ , TypeAnnotationTransformer_([] () { return CreateDqsDataSourceTypeAnnotationTransformer(); })
+ , ConstraintsTransformer_([] () { return CreateDqDataSourceConstraintTransformer(); })
{ }
TStringBuf GetName() const override {
@@ -56,16 +56,16 @@ public:
IGraphTransformer& GetTypeAnnotationTransformer(bool instantOnly) override {
Y_UNUSED(instantOnly);
- return *TypeAnnotationTransformer;
+ return *TypeAnnotationTransformer_;
}
IGraphTransformer& GetConstraintTransformer(bool instantOnly, bool subGraph) override {
Y_UNUSED(instantOnly && subGraph);
- return *ConstraintsTransformer;
+ return *ConstraintsTransformer_;
}
IGraphTransformer& GetConfigurationTransformer() override {
- return *ConfigurationTransformer;
+ return *ConfigurationTransformer_;
}
bool CanBuildResult(const TExprNode& node, TSyncMap& syncList) override {
@@ -174,9 +174,9 @@ public:
return false;
}
- canRef = State->Settings->EnableFullResultWrite.Get().GetOrElse(false);
+ canRef = State_->Settings->EnableFullResultWrite.Get().GetOrElse(false);
if (canRef) {
- if (auto fullResultTableProvider = State->TypeCtx->DataSinkMap.Value(State->TypeCtx->FullResultDataSink, nullptr)) {
+ if (auto fullResultTableProvider = State_->TypeCtx->DataSinkMap.Value(State_->TypeCtx->FullResultDataSink, nullptr)) {
canRef = !!fullResultTableProvider->GetDqIntegration();
} else {
canRef = false;
@@ -225,31 +225,31 @@ public:
}
bool ValidateExecution(const TExprNode& node, TExprContext& ctx) override {
- return ValidateDqExecution(node, *State->TypeCtx, ctx);
+ return ValidateDqExecution(node, *State_->TypeCtx, ctx, State_);
}
bool CanParse(const TExprNode& node) override {
- return TypeAnnotationTransformer->CanParse(node);
+ return TypeAnnotationTransformer_->CanParse(node);
}
IGraphTransformer& GetCallableExecutionTransformer() override {
- return *ExecTransformer;
+ return *ExecTransformer_;
}
void Reset() final {
- if (ExecTransformer) {
- ExecTransformer->Rewind();
- TypeAnnotationTransformer->Rewind();
- ConstraintsTransformer->Rewind();
+ if (ExecTransformer_) {
+ ExecTransformer_->Rewind();
+ TypeAnnotationTransformer_->Rewind();
+ ConstraintsTransformer_->Rewind();
}
}
private:
- const TDqState::TPtr State;
- TLazyInitHolder<IGraphTransformer> ConfigurationTransformer;
- TLazyInitHolder<IGraphTransformer> ExecTransformer;
- TLazyInitHolder<TVisitorTransformerBase> TypeAnnotationTransformer;
- TLazyInitHolder<IGraphTransformer> ConstraintsTransformer;
+ const TDqState::TPtr State_;
+ TLazyInitHolder<IGraphTransformer> ConfigurationTransformer_;
+ TLazyInitHolder<IGraphTransformer> ExecTransformer_;
+ TLazyInitHolder<TVisitorTransformerBase> TypeAnnotationTransformer_;
+ TLazyInitHolder<IGraphTransformer> ConstraintsTransformer_;
};
}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
index e60af247c41..e568efb4167 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp
@@ -78,17 +78,9 @@ public:
Statistics_["DqAnalyzerOn"]++;
- ui64 dataSize = 0;
bool good = true;
- bool hasJoin = false;
TNodeSet visited;
- Scan(*input, ctx, good, dataSize, visited, hasJoin);
-
- if (good && hasJoin && dataSize > State_->Settings->MaxDataSizePerQuery.Get().GetOrElse(10_GB)) {
- Statistics_["DqAnalyzerBigJoin"]++;
- AddInfo(ctx, TStringBuilder() << "too big join input: " << dataSize);
- good = false;
- }
+ Scan(*input, ctx, good, visited);
if (good) {
Statistics_["DqAnalyzerOk"]++;
@@ -99,7 +91,6 @@ public:
if (!good) {
YQL_CLOG(DEBUG, ProviderDq) << "abort hidden";
State_->AbortHidden();
- YQL_CLOG(DEBUG, ProviderDq) << "good: " << good << " hasJoin: " << hasJoin << " dataSize: " << dataSize;
return TStatus::Ok;
}
}
@@ -128,21 +119,17 @@ private:
ctx.IssueManager.RaiseIssue(info);
}
- void Scan(const TExprNode& node, TExprContext& ctx, bool& good, ui64& dataSize, TNodeSet& visited, bool& hasJoin) const {
+ void Scan(const TExprNode& node, TExprContext& ctx, bool& good, TNodeSet& visited) const {
if (!visited.insert(&node).second) {
return;
}
TExprBase expr(&node);
- if (TMaybeNode<TCoEquiJoin>(&node)) {
- hasJoin = true;
- }
-
if (TCoCommit::Match(&node)) {
for (size_t i = 0; i != node.ChildrenSize() && good; ++i) {
if (i != TCoCommit::idx_DataSink) {
- Scan(*node.Child(i), ctx, good, dataSize, visited, hasJoin);
+ Scan(*node.Child(i), ctx, good, visited);
}
}
} else if (auto datasource = TMaybeNode<TCoDataSource>(&node).Category()) {
@@ -165,11 +152,10 @@ private:
YQL_ENSURE(datasource);
auto dqIntegration = (*datasource)->GetDqIntegration();
if (dqIntegration) {
- TMaybe<ui64> size;
- bool pragmas = true;
- if ((pragmas = dqIntegration->CheckPragmas(node, ctx, false)) && (size = dqIntegration->CanRead(State_->Settings->DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob), State_->Settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage), node, ctx, /*skipIssues = */ false))) {
- dataSize += *size;
- } else {
+ bool pragmas = dqIntegration->CheckPragmas(node, ctx, false);
+ bool canRead = pragmas && dqIntegration->CanRead(node, ctx, /*skipIssues = */ false);
+
+ if (!pragmas || !canRead) {
good = false;
if (!pragmas) {
State_->TypeCtx->PureResultDataSource.clear();
@@ -184,7 +170,7 @@ private:
}
if (good) {
- Scan(node.Head(), ctx,good, dataSize, visited, hasJoin);
+ Scan(node.Head(), ctx,good, visited);
}
} else if (node.GetTypeAnn()->GetKind() == ETypeAnnotationKind::World
&& !TCoCommit::Match(&node)
@@ -205,7 +191,7 @@ private:
}
if (good) {
for (size_t i = 0; i != node.ChildrenSize() && good; ++i) {
- Scan(*node.Child(i), ctx, good, dataSize, visited, hasJoin);
+ Scan(*node.Child(i), ctx, good, visited);
}
}
}
@@ -216,13 +202,13 @@ private:
}
if (good) {
for (size_t i = 0; i != node.ChildrenSize() && good; ++i) {
- Scan(*node.Child(i), ctx, good, dataSize, visited, hasJoin);
+ Scan(*node.Child(i), ctx, good, visited);
}
}
}
else {
for (size_t i = 0; i != node.ChildrenSize() && good; ++i) {
- Scan(*node.Child(i), ctx, good, dataSize, visited, hasJoin);
+ Scan(*node.Child(i), ctx, good, visited);
}
}
}
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp
index 376a527e4bf..49f8a5fe79e 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp
@@ -1,6 +1,8 @@
#include "yql_dq_validate.h"
+#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
+#include <ydb/library/yql/dq/integration/yql_dq_integration.h>
#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
@@ -15,116 +17,166 @@
namespace NYql {
using namespace NYql::NNodes;
-
namespace {
-
-bool ValidateDqNode(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx, TNodeSet& visited);
-
-bool ValidateDqStage(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx, TNodeSet& visited) {
- if (!visited.insert(&node).second) {
- return true;
+class TDqExecutionValidator {
+private:
+ static void ReportError(TExprContext& ctx, const TExprNode& where, const TString& err) {
+ YQL_CLOG(WARN, ProviderDq) << "Falling back from DQ: " << err;
+ ctx.AddError(YqlIssue(ctx.GetPosition(where.Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, err));
}
- bool hasErrors = false;
- if (auto bad = FindNonYieldTransparentNode(TDqStageBase(&node).Program().Ptr(), typeCtx)) {
- hasErrors = true;
- YQL_CLOG(WARN, ProviderDq) << "Cannot execute " << bad->Content() << " over stream/flow inside DQ stage";
- ctx.AddError(YqlIssue(ctx.GetPosition(bad->Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << "Cannot execute " << bad->Content() << " over stream/flow inside DQ stage"));
- }
- VisitExpr(TDqStageBase(&node).Program().Body().Ptr(),
- [](const TExprNode::TPtr& n) {
- return !TDqConnection::Match(n.Get()) && !TDqPhyPrecompute::Match(n.Get()) && !TDqReadWrapBase::Match(n.Get());
- },
- [&hasErrors, &ctx](const TExprNode::TPtr& n) {
- if (TCoScriptUdf::Match(n.Get()) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(n->Head().Content()))) {
- YQL_CLOG(WARN, ProviderDq) << "Cannot execute system python udf " << n->Content() << " in DQ";
- ctx.AddError(YqlIssue(ctx.GetPosition(n->Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << "Cannot execute system python udf " << n->Content() << " in DQ"));
- hasErrors = true;
- }
- return !hasErrors;
+ bool ValidateDqStage(const TExprNode& node) {
+ if (!Visited_.insert(&node).second) {
+ return true;
}
- );
- for (auto n: TDqStageBase(&node).Inputs()) {
- hasErrors = !ValidateDqNode(n.Ref(), typeCtx, ctx, visited) || hasErrors;
- }
- if (auto outs = TDqStageBase(&node).Outputs()) {
- for (auto n: outs.Cast()) {
- hasErrors = !ValidateDqNode(n.Ref(), typeCtx, ctx, visited) || hasErrors;
+ bool hasErrors = false;
+ if (auto bad = FindNonYieldTransparentNode(TDqStageBase(&node).Program().Ptr(), TypeCtx_)) {
+ hasErrors = true;
+ ReportError(Ctx_, *bad, TStringBuilder() << "Cannot execute " << bad->Content() << " over stream/flow inside DQ stage");
}
- }
- return !hasErrors;
+ VisitExpr(TDqStageBase(&node).Program().Body().Ptr(),
+ [](const TExprNode::TPtr& n) {
+ return !TDqConnection::Match(n.Get()) && !TDqPhyPrecompute::Match(n.Get()) && !TDqReadWrapBase::Match(n.Get());
+ },
+ [&hasErrors, &ctx = Ctx_, &dataSize = DataSize_, &typeCtx = TypeCtx_, &state = State_](const TExprNode::TPtr& n) {
+ if (TCoScriptUdf::Match(n.Get()) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(n->Head().Content()))) {
+ ReportError(ctx, *n, TStringBuilder() << "Cannot execute system python udf " << n->Content() << " in DQ");
+ hasErrors = true;
+ }
+
+ if (!typeCtx.ForceDq && TDqReadWrapBase::Match(n.Get())) {
+ auto readNode = n->Child(0);
+ auto dataSourceName = readNode->Child(1)->Child(0)->Content();
+ if (dataSourceName != DqProviderName) {
+ auto datasource = typeCtx.DataSourceMap.FindPtr(dataSourceName);
+ YQL_ENSURE(datasource);
+ auto dqIntegration = (*datasource)->GetDqIntegration();
+ YQL_ENSURE(dqIntegration);
+ if (dqIntegration) {
+ TMaybe<ui64> size;
+ hasErrors |= !(size = dqIntegration->EstimateReadSize(state->Settings->DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob), state->Settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage), *readNode, ctx));
+ if (size) {
+ dataSize += *size;
+ }
+ }
+ }
+ }
+ return !hasErrors;
+ }
+ );
-}
+ for (auto n: TDqStageBase(&node).Inputs()) {
+ hasErrors |= !ValidateDqNode(n.Ref());
+ }
-bool ValidateDqNode(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx, TNodeSet& visited) {
- if (node.GetState() == TExprNode::EState::ExecutionComplete) {
- return true;
- }
+ if (auto outs = TDqStageBase(&node).Outputs()) {
+ for (auto n: outs.Cast()) {
+ hasErrors |= !ValidateDqNode(n.Ref());
+ }
+ }
- if (TDqStageBase::Match(&node)) {
- // visited will be updated inside ValidateDqStage
- return ValidateDqStage(node, typeCtx, ctx, visited);
- }
+ return !hasErrors;
- if (!visited.insert(&node).second) {
- return true;
}
- if (TDqCnResult::Match(&node)) {
- YQL_CLOG(WARN, ProviderDq) << TDqCnResult::CallableName() << " connection cannot be used inside graph";
- ctx.AddError(YqlIssue(ctx.GetPosition(node.Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << TDqCnResult::CallableName() << " connection cannot be used inside graph"));
- return false;
- }
- if (TDqConnection::Match(&node)) {
- return ValidateDqStage(TDqConnection(&node).Output().Stage().Ref(), typeCtx, ctx, visited);
- }
- if (TDqPhyPrecompute::Match(&node)) {
- return ValidateDqNode(TDqPhyPrecompute(&node).Connection().Ref(), typeCtx, ctx, visited);
- }
+ bool ValidateDqNode(const TExprNode& node) {
+ if (node.GetState() == TExprNode::EState::ExecutionComplete) {
+ return true;
+ }
- if (TDqSource::Match(&node) || TDqTransform::Match(&node) || TDqSink::Match(&node)) {
- return true;
- }
+ if (TDqStageBase::Match(&node)) {
+ // visited will be updated inside ValidateDqStage
+ return ValidateDqStage(node);
+ }
- YQL_CLOG(WARN, ProviderDq) << "Failed to execute callable with name: " << node.Content() << " in DQ";
- ctx.AddError(YqlIssue(ctx.GetPosition(node.Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << "Failed to execute callable with name: " << node.Content() << " in DQ"));
- return false;
-}
+ if (!Visited_.insert(&node).second) {
+ return true;
+ }
-}
+ if (TDqCnResult::Match(&node)) {
+ ReportError(Ctx_, node, TStringBuilder() << TDqCnResult::CallableName() << " connection cannot be used inside graph");
+ return false;
+ }
-bool ValidateDqExecution(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx) {
- YQL_LOG_CTX_SCOPE(__FUNCTION__);
+ if (TDqConnection::Match(&node)) {
+ return ValidateDqStage(TDqConnection(&node).Output().Stage().Ref());
+ }
+ if (TDqPhyPrecompute::Match(&node)) {
+ return ValidateDqNode(TDqPhyPrecompute(&node).Connection().Ref());
+ }
+
+ if (TDqSource::Match(&node) || TDqTransform::Match(&node) || TDqSink::Match(&node)) {
+ return true;
+ }
+
+ ReportError(Ctx_, node, TStringBuilder() << "Failed to execute callable with name: " << node.Content() << " in DQ");
+ return false;
+ }
- TNodeSet dqNodes;
- if (TDqCnResult::Match(&node)) {
- dqNodes.insert(TDqCnResult(&node).Output().Stage().Raw());
- } else if (TDqQuery::Match(&node)) {
- for (auto st: TDqQuery(&node).SinkStages()) {
- dqNodes.insert(st.Raw());
+public:
+ TDqExecutionValidator(const TTypeAnnotationContext& typeCtx, TExprContext& ctx, const TDqState::TPtr state) : TypeCtx_(typeCtx), Ctx_(ctx), State_(state) {}
+
+ bool ValidateDqExecution(const TExprNode& node) {
+ YQL_LOG_CTX_SCOPE(__FUNCTION__);
+
+ TNodeSet dqNodes;
+
+ bool hasJoin = false;
+ if (TDqCnResult::Match(&node)) {
+ dqNodes.insert(TDqCnResult(&node).Output().Stage().Raw());
+ } else if (TDqQuery::Match(&node)) {
+ for (auto st: TDqQuery(&node).SinkStages()) {
+ dqNodes.insert(st.Raw());
+ }
+ } else {
+ VisitExpr(node, [&dqNodes](const TExprNode& n) {
+ if (TDqStageBase::Match(&n)) {
+ dqNodes.insert(&n);
+ return false;
+ } else if (TDqConnection::Match(&n)) {
+ dqNodes.insert(&n);
+ return false;
+ } else if (TDqReadWrapBase::Match(&n)) {
+ return false;
+ }
+ return true;
+ });
}
- } else {
- VisitExpr(node, [&dqNodes](const TExprNode& n) {
- if (TDqStageBase::Match(&n)) {
- dqNodes.insert(&n);
- return false;
- } else if (TDqConnection::Match(&n)) {
- dqNodes.insert(&n);
- return false;
- } else if (TDqReadWrapBase::Match(&n)) {
- return false;
+
+ VisitExpr(node, [&hasJoin](const TExprNode& n) {
+ if (TMaybeNode<TDqPhyMapJoin>(&n)) {
+ hasJoin = true;
}
return true;
});
+
+ bool hasError = false;
+
+ for (const auto n: dqNodes) {
+ hasError |= !ValidateDqNode(*n);
+ }
+
+ if (!hasError && hasJoin && DataSize_ > State_->Settings->MaxDataSizePerQuery.Get().GetOrElse(10_GB)) {
+ ReportError(Ctx_, node, TStringBuilder() << "too big join input: " << DataSize_);
+ return false;
+ }
+ return !hasError;
}
- TNodeSet visited;
- bool hasError = false;
- for (const auto n: dqNodes) {
- hasError = !ValidateDqNode(*n, typeCtx, ctx, visited) || hasError;
- }
- return !hasError;
+private:
+
+ const TTypeAnnotationContext& TypeCtx_;
+ TExprContext& Ctx_;
+ TNodeSet Visited_;
+ size_t DataSize_ = 0;
+ const TDqState::TPtr State_;
+
+};
+}
+
+bool ValidateDqExecution(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx, const TDqState::TPtr state) {
+ return TDqExecutionValidator(typeCtx, ctx, state).ValidateDqExecution(node);
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_validate.h b/ydb/library/yql/providers/dq/provider/yql_dq_validate.h
index 087cafd13e0..8afc615b350 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_validate.h
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_validate.h
@@ -1,10 +1,12 @@
#pragma once
+#include "yql_dq_state.h"
+
#include <ydb/library/yql/ast/yql_expr.h>
#include <ydb/library/yql/core/yql_type_annotation.h>
namespace NYql {
-bool ValidateDqExecution(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx);
+bool ValidateDqExecution(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx, const TDqState::TPtr state);
} // namespace NYql
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index e438503cdf2..5cf8f9d5b37 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -119,7 +119,11 @@ public:
return 0;
}
- TMaybe<ui64> CanRead(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext&, bool) override {
+ bool CanRead(const TExprNode& read, TExprContext&, bool) override {
+ return TS3ReadObject::Match(&read);
+ }
+
+ TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext&) override {
if (TS3ReadObject::Match(&read)) {
return 0ul; // TODO: return real size
}
diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
index cdb4ffd2eec..33be2f8fdde 100644
--- a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
+++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp
@@ -74,10 +74,15 @@ public:
{
}
- TMaybe<ui64> CanRead(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode&, TExprContext&, bool) override {
+ bool CanRead(const TExprNode&, TExprContext&, bool) override {
YQL_ENSURE(false, "Unimplemented");
}
+ TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode&, TExprContext&) override {
+ YQL_ENSURE(false, "Unimplemented");
+ }
+
+
TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr&, TExprContext&) override {
YQL_ENSURE(false, "Unimplemented");
}
diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp
index 804d75b0645..6e887fb99c2 100644
--- a/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp
+++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp
@@ -69,7 +69,11 @@ public:
return 0;
}
- TMaybe<ui64> CanRead(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext&, bool ) override {
+ bool CanRead(const TExprNode& read, TExprContext&, bool ) override {
+ return TYdbReadTable::Match(&read);
+ }
+
+ TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext& ) override {
if (TYdbReadTable::Match(&read)) {
return 0ul; // TODO: return real size
}