diff options
author | mrlolthe1st <mrlolthe1st@yandex-team.com> | 2023-05-19 13:14:00 +0300 |
---|---|---|
committer | mrlolthe1st <mrlolthe1st@yandex-team.com> | 2023-05-19 13:14:00 +0300 |
commit | 240660ff0b3a62970a9f037077b8fc947b963354 (patch) | |
tree | 1db5eb5a934cb9fb5dace4b1c924323454e30729 | |
parent | 1c61d3c39a24c9392a406352c70c70a8c2c8bd79 (diff) | |
download | ydb-240660ff0b3a62970a9f037077b8fc947b963354.tar.gz |
YQL-18505: Estimate read size after optimizers
YQL-18505: Estimate read size after optimizers
15 files changed, 318 insertions, 286 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp index 2675c401c6f..8c0d6a0dc11 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp @@ -323,7 +323,9 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T } for (const auto& dataSource : types.DataSources) { - if (auto* dqIntegration = dataSource->GetDqIntegration(); dqIntegration && dqIntegration->CanRead(TDqSettings::TDefault::DataSizePerJob, TDqSettings::TDefault::MaxTasksPerStage, *node.Ptr(), ctx)) { + if (auto* dqIntegration = dataSource->GetDqIntegration(); dqIntegration + && dqIntegration->CanRead(*node.Ptr(), ctx) + && dqIntegration->EstimateReadSize(TDqSettings::TDefault::DataSizePerJob, TDqSettings::TDefault::MaxTasksPerStage, *node.Ptr(), ctx)) { txRes.Ops.insert(node.Raw()); for (size_t i = 0, childrenSize = node.Raw()->ChildrenSize(); i < childrenSize; ++i) { if (TExprNode::TPtr child = node.Raw()->ChildPtr(i)) { diff --git a/ydb/library/yql/core/yql_graph_transformer.cpp b/ydb/library/yql/core/yql_graph_transformer.cpp index 0ef4af98226..886fddbfd15 100644 --- a/ydb/library/yql/core/yql_graph_transformer.cpp +++ b/ydb/library/yql/core/yql_graph_transformer.cpp @@ -10,24 +10,24 @@ namespace { class TCompositeGraphTransformer : public TGraphTransformerBase { public: TCompositeGraphTransformer(const TVector<TTransformStage>& stages, bool useIssueScopes, bool doCheckArguments) - : Stages(stages) - , UseIssueScopes(useIssueScopes) - , DoCheckArguments(doCheckArguments) + : Stages_(stages) + , UseIssueScopes_(useIssueScopes) + , DoCheckArguments_(doCheckArguments) { - if (UseIssueScopes) { - for (const auto& stage : Stages) { + if (UseIssueScopes_) { + for (const auto& stage : Stages_) { YQL_ENSURE(!stage.Name.empty()); } } } void Rewind() override { - for (auto& stage : Stages) { + for (auto& stage : Stages_) { stage.GetTransformer().Rewind(); } - Index = 0; - CheckArgumentsCount = 0; + Index_ = 0; + CheckArgumentsCount_ = 0; } TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override { @@ -40,41 +40,41 @@ public: } #endif - if (Index >= Stages.size()) { + if (Index_ >= Stages_.size()) { return TStatus::Ok; } auto status = WithScope(ctx, [&]() { - return Stages[Index].GetTransformer().Transform(input, output, ctx); + return Stages_[Index_].GetTransformer().Transform(input, output, ctx); }); #ifndef NDEBUG - if (DoCheckArguments && output && output != input) { + if (DoCheckArguments_ && output && output != input) { try { CheckArguments(*output); - ++CheckArgumentsCount; + ++CheckArgumentsCount_; } catch (yexception& e) { - e << "at CheckArguments() pass #" << CheckArgumentsCount - << ", stage '" << Stages[Index].Name << "'"; + e << "at CheckArguments() pass #" << CheckArgumentsCount_ + << ", stage '" << Stages_[Index_].Name << "'"; throw; } } #else - Y_UNUSED(DoCheckArguments); - Y_UNUSED(CheckArgumentsCount); + Y_UNUSED(DoCheckArguments_); + Y_UNUSED(CheckArgumentsCount_); #endif status = HandleStatus(status); return status; } NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) override { - YQL_ENSURE(Index < Stages.size()); - return Stages[Index].GetTransformer().GetAsyncFuture(input); + YQL_ENSURE(Index_ < Stages_.size()); + return Stages_[Index_].GetTransformer().GetAsyncFuture(input); } TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override { - YQL_ENSURE(Index < Stages.size()); + YQL_ENSURE(Index_ < Stages_.size()); auto status = WithScope(ctx, [&]() { - return Stages[Index].GetTransformer().ApplyAsyncChanges(input, output, ctx); + return Stages_[Index_].GetTransformer().ApplyAsyncChanges(input, output, ctx); }); status = HandleStatus(status); @@ -82,18 +82,18 @@ public: } TStatistics GetStatistics() const final { - if (Statistics.Stages.empty()) { - Statistics.Stages.resize(Stages.size()); + if (Statistics_.Stages.empty()) { + Statistics_.Stages.resize(Stages_.size()); } - YQL_ENSURE(Stages.size() == Statistics.Stages.size()); - for (size_t i = 0; i < Stages.size(); ++i) { - auto& stagePair = Statistics.Stages[i]; - stagePair.first = Stages[i].Name; - stagePair.second = Stages[i].GetTransformer().GetStatistics(); + YQL_ENSURE(Stages_.size() == Statistics_.Stages.size()); + for (size_t i = 0; i < Stages_.size(); ++i) { + auto& stagePair = Statistics_.Stages[i]; + stagePair.first = Stages_[i].Name; + stagePair.second = Stages_[i].GetTransformer().GetStatistics(); } - return Statistics; + return Statistics_; } private: @@ -104,11 +104,11 @@ private: if (status.HasRestart) { // ignore Async status in this case - Index = 0; + Index_ = 0; status = IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true); } else if (status.Level == IGraphTransformer::TStatus::Ok) { status = IGraphTransformer::TStatus::Repeat; - ++Index; + ++Index_; } return status; @@ -116,10 +116,10 @@ private: template <typename TFunc> TStatus WithScope(TExprContext& ctx, TFunc func) { - if (UseIssueScopes) { + if (UseIssueScopes_) { TIssueScopeGuard guard(ctx.IssueManager, [&]() { - const auto scopeIssueCode = Stages[Index].IssueCode; - const auto scopeIssueMessage = Stages[Index].IssueMessage; + const auto scopeIssueCode = Stages_[Index_].IssueCode; + const auto scopeIssueMessage = Stages_[Index_].IssueMessage; auto issue = MakeIntrusive<TIssue>(TPosition(), scopeIssueMessage ? scopeIssueMessage : IssueCodeToString(scopeIssueCode)); issue->SetCode(scopeIssueCode, GetSeverity(scopeIssueCode)); @@ -133,11 +133,11 @@ private: } protected: - TVector<TTransformStage> Stages; - const bool UseIssueScopes; - const bool DoCheckArguments; - size_t Index = 0; - ui64 CheckArgumentsCount = 0; + TVector<TTransformStage> Stages_; + const bool UseIssueScopes_; + const bool DoCheckArguments_; + size_t Index_ = 0; + ui64 CheckArgumentsCount_ = 0; }; void AddTooManyTransformationsError(TPositionHandle pos, const TStringBuf& where, TExprContext& ctx) { @@ -173,7 +173,7 @@ public: private: void Rewind() override { - Condition.Clear(); + Condition_.Clear(); TCompositeGraphTransformer::Rewind(); } @@ -184,17 +184,17 @@ private: if (status.HasRestart) { // ignore Async status in this case - Index = 0; + Index_ = 0; status = IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true); } else if (status.Level == IGraphTransformer::TStatus::Ok) { status = IGraphTransformer::TStatus::Repeat; - YQL_ENSURE(!Condition.Empty(), "Condition must be set"); - if (Index == 0 && *Condition) { - Index = 1; // left - } else if (Index == 0) { - Index = 2; // right + YQL_ENSURE(!Condition_.Empty(), "Condition must be set"); + if (Index_ == 0 && *Condition_) { + Index_ = 1; // left + } else if (Index_ == 0) { + Index_ = 2; // right } else { - Index = 3; // end + Index_ = 3; // end } } @@ -205,8 +205,8 @@ private: { auto transformer = CreateFunctorTransformer([this, condition](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { output = input; - if (Condition.Empty()) { - Condition = condition(input, ctx); + if (Condition_.Empty()) { + Condition_ = condition(input, ctx); } return TStatus::Ok; }); @@ -214,7 +214,7 @@ private: return TTransformStage(transformer, "Condition", TIssuesIds::DEFAULT_ERROR); } - TMaybe<bool> Condition; + TMaybe<bool> Condition_; }; } // namespace diff --git a/ydb/library/yql/core/yql_graph_transformer.h b/ydb/library/yql/core/yql_graph_transformer.h index 4512ff4e3f9..26e508fc369 100644 --- a/ydb/library/yql/core/yql_graph_transformer.h +++ b/ydb/library/yql/core/yql_graph_transformer.h @@ -110,71 +110,71 @@ private: class TTransformScope { public: TTransformScope(TStatistics& statistics, const TExprContext* exprCtx) - : Statistics(statistics) - , ExprCtx(exprCtx) - , TransformStart(TInstant::Now()) - , ExprNodesSize(exprCtx ? exprCtx->ExprNodes.size() : 0) - , TypeNodesSize(exprCtx ? exprCtx->TypeNodes.size() : 0) - , ConstraintNodesSize(exprCtx ? exprCtx->ConstraintNodes.size() : 0) + : Statistics_(statistics) + , ExprCtx_(exprCtx) + , TransformStart_(TInstant::Now()) + , ExprNodesSize_(exprCtx ? exprCtx->ExprNodes.size() : 0) + , TypeNodesSize_(exprCtx ? exprCtx->TypeNodes.size() : 0) + , ConstraintNodesSize_(exprCtx ? exprCtx->ConstraintNodes.size() : 0) { } ~TTransformScope() { - Statistics.TransformDuration += TInstant::Now() - TransformStart; - if (ExprCtx) { - Statistics.NewExprNodes += ExprCtx->ExprNodes.size() - ExprNodesSize; - Statistics.NewTypeNodes += ExprCtx->TypeNodes.size() - TypeNodesSize; - Statistics.NewConstraintNodes += ExprCtx->ConstraintNodes.size() - ConstraintNodesSize; + Statistics_.TransformDuration += TInstant::Now() - TransformStart_; + if (ExprCtx_) { + Statistics_.NewExprNodes += ExprCtx_->ExprNodes.size() - ExprNodesSize_; + Statistics_.NewTypeNodes += ExprCtx_->TypeNodes.size() - TypeNodesSize_; + Statistics_.NewConstraintNodes += ExprCtx_->ConstraintNodes.size() - ConstraintNodesSize_; } } TStatus HandleStatus(const TStatus& status) { if (status == TStatus::Repeat) { - Statistics.Repeats++; + Statistics_.Repeats++; } if (status.HasRestart) { - Statistics.Restarts++; + Statistics_.Restarts++; } return status; } private: - TStatistics& Statistics; - const TExprContext* ExprCtx; - TInstant TransformStart; - i64 ExprNodesSize; - i64 TypeNodesSize; - i64 ConstraintNodesSize; + TStatistics& Statistics_; + const TExprContext* ExprCtx_; + TInstant TransformStart_; + i64 ExprNodesSize_; + i64 TypeNodesSize_; + i64 ConstraintNodesSize_; }; public: TGraphTransformerBase() - : Statistics(TStatistics::Zero()) - , AsyncStart() {} + : Statistics_(TStatistics::Zero()) + , AsyncStart_() {} TStatus Transform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { - TTransformScope scope(Statistics, &ctx); + TTransformScope scope(Statistics_, &ctx); return scope.HandleStatus(DoTransform(input, output, ctx)); } NThreading::TFuture<void> GetAsyncFuture(const TExprNode& input) final { - TTransformScope scope(Statistics, nullptr); - AsyncStart = TInstant::Now(); + TTransformScope scope(Statistics_, nullptr); + AsyncStart_ = TInstant::Now(); return DoGetAsyncFuture(input); } TStatus ApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { - TTransformScope scope(Statistics, &ctx); - Statistics.WaitDuration += TInstant::Now() - AsyncStart; + TTransformScope scope(Statistics_, &ctx); + Statistics_.WaitDuration += TInstant::Now() - AsyncStart_; return scope.HandleStatus(DoApplyAsyncChanges(input, output, ctx)); } - virtual TStatistics GetStatistics() const override { return Statistics; } + virtual TStatistics GetStatistics() const override { return Statistics_; } public: virtual TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) = 0; @@ -182,10 +182,10 @@ public: virtual TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) = 0; protected: - mutable TStatistics Statistics; + mutable TStatistics Statistics_; private: - TInstant AsyncStart; + TInstant AsyncStart_; }; struct TTransformStage { @@ -197,24 +197,24 @@ struct TTransformStage { : Name(name) , IssueCode(issueCode) , IssueMessage(issueMessage) - , RawTransformer(transformer.Get()) - , Transformer(transformer) + , RawTransformer_(transformer.Get()) + , Transformer_(transformer) {} TTransformStage(IGraphTransformer& transformer, const TString& name, EYqlIssueCode issueCode, const TString& issueMessage = {}) : Name(name) , IssueCode(issueCode) , IssueMessage(issueMessage) - , RawTransformer(&transformer) + , RawTransformer_(&transformer) {} IGraphTransformer& GetTransformer() const { - return *RawTransformer; + return *RawTransformer_; } private: - IGraphTransformer* const RawTransformer; - const TAutoPtr<IGraphTransformer> Transformer; + IGraphTransformer* const RawTransformer_; + const TAutoPtr<IGraphTransformer> Transformer_; }; TAutoPtr<IGraphTransformer> CreateCompositeGraphTransformer(const TVector<TTransformStage>& stages, bool useIssueScopes); @@ -264,10 +264,10 @@ template <typename TFunctor> class TFunctorTransformer final: public TSyncTransformerBase { public: TFunctorTransformer(TFunctor functor) - : Functor(std::move(functor)) {} + : Functor_(std::move(functor)) {} TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { - TStatus status = Functor(input, output, ctx); + TStatus status = Functor_(input, output, ctx); YQL_ENSURE(status.Level != IGraphTransformer::TStatus::Async); return status; @@ -277,7 +277,7 @@ public: } private: - TFunctor Functor; + TFunctor Functor_; }; template <typename TFunctor> @@ -295,34 +295,34 @@ public: TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { auto pair = static_cast<TDerived*>(this)->CallbackTransform(input, output, ctx); if (pair.first == TStatus::Async) { - YQL_ENSURE(Callbacks.emplace(input.Get(), pair.second).second); + YQL_ENSURE(Callbacks_.emplace(input.Get(), pair.second).second); } return pair.first; } NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final { - const auto it = Callbacks.find(&input); - YQL_ENSURE(it != Callbacks.cend()); + const auto it = Callbacks_.find(&input); + YQL_ENSURE(it != Callbacks_.cend()); return it->second.IgnoreResult(); } TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { - const auto it = Callbacks.find(input.Get()); - YQL_ENSURE(it != Callbacks.cend()); + const auto it = Callbacks_.find(input.Get()); + YQL_ENSURE(it != Callbacks_.cend()); auto& future = it->second; YQL_ENSURE(future.HasValue()); const auto status = future.GetValue()(input, output, ctx); - Callbacks.erase(it); + Callbacks_.erase(it); return status; } void Rewind() override { - Callbacks.clear(); + Callbacks_.clear(); } private: - TNodeMap<TAsyncTransformCallbackFuture> Callbacks; + TNodeMap<TAsyncTransformCallbackFuture> Callbacks_; }; template <bool AlwaysRaiseIssues = true, typename TFuture, typename TCallback> diff --git a/ydb/library/yql/dq/integration/yql_dq_integration.h b/ydb/library/yql/dq/integration/yql_dq_integration.h index 447af086e68..04f31368b7b 100644 --- a/ydb/library/yql/dq/integration/yql_dq_integration.h +++ b/ydb/library/yql/dq/integration/yql_dq_integration.h @@ -30,7 +30,8 @@ public: virtual ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node, TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) = 0; virtual bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues = false) { Y_UNUSED(skipIssues); Y_UNUSED(node); Y_UNUSED(ctx); return true; } - virtual TMaybe<ui64> CanRead(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& read, TExprContext& ctx, bool skipIssues = true) = 0; + virtual bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues = true) = 0; + virtual TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& node, TExprContext& ctx) = 0; virtual TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) = 0; virtual TMaybe<bool> CanWrite(const TDqSettings& config, const TExprNode& write, TExprContext& ctx) = 0; virtual void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) = 0; diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp index 7def8d8c9cd..8a44248f694 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_dq_integration.cpp @@ -22,7 +22,11 @@ public: : State_(state) {} - TMaybe<ui64> CanRead(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext&, bool) override { + bool CanRead(const TExprNode& read, TExprContext&, bool) override { + return TClReadTable::Match(&read); + } + + TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext&) override { if (TClReadTable::Match(&read)) { return 0ul; // TODO: return real size } diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp index 09f8e1951df..4f9da234bda 100644 --- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp +++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp @@ -2,80 +2,51 @@ namespace NYql { -ui64 TDqIntegrationBase::Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node, - TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) { - Y_UNUSED(config); - Y_UNUSED(maxPartitions); - Y_UNUSED(node); - Y_UNUSED(partitions); - Y_UNUSED(clusterName); - Y_UNUSED(ctx); - Y_UNUSED(canFallback); +ui64 TDqIntegrationBase::Partition(const TDqSettings&, size_t, const TExprNode&, + TVector<TString>&, TString*, TExprContext&, bool) { return 0; } -TMaybe<ui64> TDqIntegrationBase::CanRead(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext& ctx, bool skipIssues) { - Y_UNUSED(read); - Y_UNUSED(ctx); - Y_UNUSED(skipIssues); - return Nothing(); +bool TDqIntegrationBase::CanRead(const TExprNode&, TExprContext&, bool) { + return false; } -TExprNode::TPtr TDqIntegrationBase::WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) { - Y_UNUSED(read); - Y_UNUSED(ctx); +TMaybe<ui64> TDqIntegrationBase::EstimateReadSize(ui64, ui32, const TExprNode &, TExprContext&) { + return Nothing(); +} +TExprNode::TPtr TDqIntegrationBase::WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext&) { return read; } -TMaybe<bool> TDqIntegrationBase::CanWrite(const TDqSettings&, const TExprNode& write, TExprContext& ctx) { - Y_UNUSED(write); - Y_UNUSED(ctx); +TMaybe<bool> TDqIntegrationBase::CanWrite(const TDqSettings&, const TExprNode&, TExprContext&) { return Nothing(); } -void TDqIntegrationBase::RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) { - Y_UNUSED(compiler); +void TDqIntegrationBase::RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase&) { } bool TDqIntegrationBase::CanFallback() { return false; } -void TDqIntegrationBase::FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) { - Y_UNUSED(node); - Y_UNUSED(settings); - Y_UNUSED(sourceType); +void TDqIntegrationBase::FillSourceSettings(const TExprNode&, ::google::protobuf::Any&, TString&) { } -void TDqIntegrationBase::FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) { - Y_UNUSED(node); - Y_UNUSED(settings); - Y_UNUSED(sinkType); +void TDqIntegrationBase::FillSinkSettings(const TExprNode&, ::google::protobuf::Any&, TString&) { } -void TDqIntegrationBase::FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) { - Y_UNUSED(node); - Y_UNUSED(settings); +void TDqIntegrationBase::FillTransformSettings(const TExprNode&, ::google::protobuf::Any&) { } -void TDqIntegrationBase::Annotate(const TExprNode& node, THashMap<TString, TString>& params) { - Y_UNUSED(node); - Y_UNUSED(params); +void TDqIntegrationBase::Annotate(const TExprNode&, THashMap<TString, TString>&) { } -bool TDqIntegrationBase::PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams) { - Y_UNUSED(root); - Y_UNUSED(ctx); - Y_UNUSED(params); - Y_UNUSED(secureParams); +bool TDqIntegrationBase::PrepareFullResultTableParams(const TExprNode&, TExprContext&, THashMap<TString, TString>&, THashMap<TString, TString>&) { return false; } -void TDqIntegrationBase::WriteFullResultTableRef(NYson::TYsonWriter& writer, const TVector<TString>& columns, const THashMap<TString, TString>& graphParams) { - Y_UNUSED(writer); - Y_UNUSED(columns); - Y_UNUSED(graphParams); +void TDqIntegrationBase::WriteFullResultTableRef(NYson::TYsonWriter&, const TVector<TString>&, const THashMap<TString, TString>&) { } } // namespace NYql diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h index 8e56a9db395..de26d816714 100644 --- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h +++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h @@ -8,7 +8,8 @@ class TDqIntegrationBase: public IDqIntegration { public: ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node, TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) override; - TMaybe<ui64> CanRead(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& read, TExprContext& ctx, bool skipIssues) override; + bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues) override; + TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TExprNode& node, TExprContext& ctx) override; TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) override; void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override; TMaybe<bool> CanWrite(const TDqSettings& config, const TExprNode& write, TExprContext& ctx) override; diff --git a/ydb/library/yql/providers/common/transform/yql_optimize.cpp b/ydb/library/yql/providers/common/transform/yql_optimize.cpp index ef29baa8ffe..8e69e71bccb 100644 --- a/ydb/library/yql/providers/common/transform/yql_optimize.cpp +++ b/ydb/library/yql/providers/common/transform/yql_optimize.cpp @@ -14,18 +14,18 @@ using namespace NNodes; class TOptimizeTransformerBase::TIgnoreOptimizationContext: public IOptimizationContext { public: TIgnoreOptimizationContext(TOptimizeTransformerBase::TGetParents getParents) - : GetParents(std::move(getParents)) + : GetParents_(std::move(getParents)) { } virtual ~TIgnoreOptimizationContext() = default; void RemapNode(const TExprNode& src, const TExprNode::TPtr&) final { - const TParentsMap* parentsMap = GetParents(); + const TParentsMap* parentsMap = GetParents_(); auto parentsIt = parentsMap->find(&src); YQL_ENSURE(parentsIt != parentsMap->cend()); YQL_ENSURE(parentsIt->second.size() == 1, "Bad usage of local optimizer. Try to switch to global mode"); } private: - TOptimizeTransformerBase::TGetParents GetParents; + TOptimizeTransformerBase::TGetParents GetParents_; }; class TOptimizeTransformerBase::TRemapOptimizationContext: public IOptimizationContext { diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp index e353f088549..21c0eb85d8a 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource.cpp @@ -41,13 +41,13 @@ namespace { class TDqDataProviderSource: public TDataProviderBase { public: TDqDataProviderSource(const TDqState::TPtr& state, TExecTransformerFactory execTransformerFactory) - : State(state) - , ConfigurationTransformer([this]() { - return MakeHolder<NCommon::TProviderConfigurationTransformer>(State->Settings, *State->TypeCtx, TString{DqProviderName}); + : State_(state) + , ConfigurationTransformer_([this]() { + return MakeHolder<NCommon::TProviderConfigurationTransformer>(State_->Settings, *State_->TypeCtx, TString{DqProviderName}); }) - , ExecTransformer([this, execTransformerFactory] () { return THolder<IGraphTransformer>(execTransformerFactory(State)); }) - , TypeAnnotationTransformer([] () { return CreateDqsDataSourceTypeAnnotationTransformer(); }) - , ConstraintsTransformer([] () { return CreateDqDataSourceConstraintTransformer(); }) + , ExecTransformer_([this, execTransformerFactory] () { return THolder<IGraphTransformer>(execTransformerFactory(State_)); }) + , TypeAnnotationTransformer_([] () { return CreateDqsDataSourceTypeAnnotationTransformer(); }) + , ConstraintsTransformer_([] () { return CreateDqDataSourceConstraintTransformer(); }) { } TStringBuf GetName() const override { @@ -56,16 +56,16 @@ public: IGraphTransformer& GetTypeAnnotationTransformer(bool instantOnly) override { Y_UNUSED(instantOnly); - return *TypeAnnotationTransformer; + return *TypeAnnotationTransformer_; } IGraphTransformer& GetConstraintTransformer(bool instantOnly, bool subGraph) override { Y_UNUSED(instantOnly && subGraph); - return *ConstraintsTransformer; + return *ConstraintsTransformer_; } IGraphTransformer& GetConfigurationTransformer() override { - return *ConfigurationTransformer; + return *ConfigurationTransformer_; } bool CanBuildResult(const TExprNode& node, TSyncMap& syncList) override { @@ -174,9 +174,9 @@ public: return false; } - canRef = State->Settings->EnableFullResultWrite.Get().GetOrElse(false); + canRef = State_->Settings->EnableFullResultWrite.Get().GetOrElse(false); if (canRef) { - if (auto fullResultTableProvider = State->TypeCtx->DataSinkMap.Value(State->TypeCtx->FullResultDataSink, nullptr)) { + if (auto fullResultTableProvider = State_->TypeCtx->DataSinkMap.Value(State_->TypeCtx->FullResultDataSink, nullptr)) { canRef = !!fullResultTableProvider->GetDqIntegration(); } else { canRef = false; @@ -225,31 +225,31 @@ public: } bool ValidateExecution(const TExprNode& node, TExprContext& ctx) override { - return ValidateDqExecution(node, *State->TypeCtx, ctx); + return ValidateDqExecution(node, *State_->TypeCtx, ctx, State_); } bool CanParse(const TExprNode& node) override { - return TypeAnnotationTransformer->CanParse(node); + return TypeAnnotationTransformer_->CanParse(node); } IGraphTransformer& GetCallableExecutionTransformer() override { - return *ExecTransformer; + return *ExecTransformer_; } void Reset() final { - if (ExecTransformer) { - ExecTransformer->Rewind(); - TypeAnnotationTransformer->Rewind(); - ConstraintsTransformer->Rewind(); + if (ExecTransformer_) { + ExecTransformer_->Rewind(); + TypeAnnotationTransformer_->Rewind(); + ConstraintsTransformer_->Rewind(); } } private: - const TDqState::TPtr State; - TLazyInitHolder<IGraphTransformer> ConfigurationTransformer; - TLazyInitHolder<IGraphTransformer> ExecTransformer; - TLazyInitHolder<TVisitorTransformerBase> TypeAnnotationTransformer; - TLazyInitHolder<IGraphTransformer> ConstraintsTransformer; + const TDqState::TPtr State_; + TLazyInitHolder<IGraphTransformer> ConfigurationTransformer_; + TLazyInitHolder<IGraphTransformer> ExecTransformer_; + TLazyInitHolder<TVisitorTransformerBase> TypeAnnotationTransformer_; + TLazyInitHolder<IGraphTransformer> ConstraintsTransformer_; }; } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp index e60af247c41..e568efb4167 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp @@ -78,17 +78,9 @@ public: Statistics_["DqAnalyzerOn"]++; - ui64 dataSize = 0; bool good = true; - bool hasJoin = false; TNodeSet visited; - Scan(*input, ctx, good, dataSize, visited, hasJoin); - - if (good && hasJoin && dataSize > State_->Settings->MaxDataSizePerQuery.Get().GetOrElse(10_GB)) { - Statistics_["DqAnalyzerBigJoin"]++; - AddInfo(ctx, TStringBuilder() << "too big join input: " << dataSize); - good = false; - } + Scan(*input, ctx, good, visited); if (good) { Statistics_["DqAnalyzerOk"]++; @@ -99,7 +91,6 @@ public: if (!good) { YQL_CLOG(DEBUG, ProviderDq) << "abort hidden"; State_->AbortHidden(); - YQL_CLOG(DEBUG, ProviderDq) << "good: " << good << " hasJoin: " << hasJoin << " dataSize: " << dataSize; return TStatus::Ok; } } @@ -128,21 +119,17 @@ private: ctx.IssueManager.RaiseIssue(info); } - void Scan(const TExprNode& node, TExprContext& ctx, bool& good, ui64& dataSize, TNodeSet& visited, bool& hasJoin) const { + void Scan(const TExprNode& node, TExprContext& ctx, bool& good, TNodeSet& visited) const { if (!visited.insert(&node).second) { return; } TExprBase expr(&node); - if (TMaybeNode<TCoEquiJoin>(&node)) { - hasJoin = true; - } - if (TCoCommit::Match(&node)) { for (size_t i = 0; i != node.ChildrenSize() && good; ++i) { if (i != TCoCommit::idx_DataSink) { - Scan(*node.Child(i), ctx, good, dataSize, visited, hasJoin); + Scan(*node.Child(i), ctx, good, visited); } } } else if (auto datasource = TMaybeNode<TCoDataSource>(&node).Category()) { @@ -165,11 +152,10 @@ private: YQL_ENSURE(datasource); auto dqIntegration = (*datasource)->GetDqIntegration(); if (dqIntegration) { - TMaybe<ui64> size; - bool pragmas = true; - if ((pragmas = dqIntegration->CheckPragmas(node, ctx, false)) && (size = dqIntegration->CanRead(State_->Settings->DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob), State_->Settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage), node, ctx, /*skipIssues = */ false))) { - dataSize += *size; - } else { + bool pragmas = dqIntegration->CheckPragmas(node, ctx, false); + bool canRead = pragmas && dqIntegration->CanRead(node, ctx, /*skipIssues = */ false); + + if (!pragmas || !canRead) { good = false; if (!pragmas) { State_->TypeCtx->PureResultDataSource.clear(); @@ -184,7 +170,7 @@ private: } if (good) { - Scan(node.Head(), ctx,good, dataSize, visited, hasJoin); + Scan(node.Head(), ctx,good, visited); } } else if (node.GetTypeAnn()->GetKind() == ETypeAnnotationKind::World && !TCoCommit::Match(&node) @@ -205,7 +191,7 @@ private: } if (good) { for (size_t i = 0; i != node.ChildrenSize() && good; ++i) { - Scan(*node.Child(i), ctx, good, dataSize, visited, hasJoin); + Scan(*node.Child(i), ctx, good, visited); } } } @@ -216,13 +202,13 @@ private: } if (good) { for (size_t i = 0; i != node.ChildrenSize() && good; ++i) { - Scan(*node.Child(i), ctx, good, dataSize, visited, hasJoin); + Scan(*node.Child(i), ctx, good, visited); } } } else { for (size_t i = 0; i != node.ChildrenSize() && good; ++i) { - Scan(*node.Child(i), ctx, good, dataSize, visited, hasJoin); + Scan(*node.Child(i), ctx, good, visited); } } } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp index 376a527e4bf..49f8a5fe79e 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp @@ -1,6 +1,8 @@ #include "yql_dq_validate.h" +#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> +#include <ydb/library/yql/dq/integration/yql_dq_integration.h> #include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/core/yql_expr_type_annotation.h> @@ -15,116 +17,166 @@ namespace NYql { using namespace NYql::NNodes; - namespace { - -bool ValidateDqNode(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx, TNodeSet& visited); - -bool ValidateDqStage(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx, TNodeSet& visited) { - if (!visited.insert(&node).second) { - return true; +class TDqExecutionValidator { +private: + static void ReportError(TExprContext& ctx, const TExprNode& where, const TString& err) { + YQL_CLOG(WARN, ProviderDq) << "Falling back from DQ: " << err; + ctx.AddError(YqlIssue(ctx.GetPosition(where.Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, err)); } - bool hasErrors = false; - if (auto bad = FindNonYieldTransparentNode(TDqStageBase(&node).Program().Ptr(), typeCtx)) { - hasErrors = true; - YQL_CLOG(WARN, ProviderDq) << "Cannot execute " << bad->Content() << " over stream/flow inside DQ stage"; - ctx.AddError(YqlIssue(ctx.GetPosition(bad->Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << "Cannot execute " << bad->Content() << " over stream/flow inside DQ stage")); - } - VisitExpr(TDqStageBase(&node).Program().Body().Ptr(), - [](const TExprNode::TPtr& n) { - return !TDqConnection::Match(n.Get()) && !TDqPhyPrecompute::Match(n.Get()) && !TDqReadWrapBase::Match(n.Get()); - }, - [&hasErrors, &ctx](const TExprNode::TPtr& n) { - if (TCoScriptUdf::Match(n.Get()) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(n->Head().Content()))) { - YQL_CLOG(WARN, ProviderDq) << "Cannot execute system python udf " << n->Content() << " in DQ"; - ctx.AddError(YqlIssue(ctx.GetPosition(n->Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << "Cannot execute system python udf " << n->Content() << " in DQ")); - hasErrors = true; - } - return !hasErrors; + bool ValidateDqStage(const TExprNode& node) { + if (!Visited_.insert(&node).second) { + return true; } - ); - for (auto n: TDqStageBase(&node).Inputs()) { - hasErrors = !ValidateDqNode(n.Ref(), typeCtx, ctx, visited) || hasErrors; - } - if (auto outs = TDqStageBase(&node).Outputs()) { - for (auto n: outs.Cast()) { - hasErrors = !ValidateDqNode(n.Ref(), typeCtx, ctx, visited) || hasErrors; + bool hasErrors = false; + if (auto bad = FindNonYieldTransparentNode(TDqStageBase(&node).Program().Ptr(), TypeCtx_)) { + hasErrors = true; + ReportError(Ctx_, *bad, TStringBuilder() << "Cannot execute " << bad->Content() << " over stream/flow inside DQ stage"); } - } - return !hasErrors; + VisitExpr(TDqStageBase(&node).Program().Body().Ptr(), + [](const TExprNode::TPtr& n) { + return !TDqConnection::Match(n.Get()) && !TDqPhyPrecompute::Match(n.Get()) && !TDqReadWrapBase::Match(n.Get()); + }, + [&hasErrors, &ctx = Ctx_, &dataSize = DataSize_, &typeCtx = TypeCtx_, &state = State_](const TExprNode::TPtr& n) { + if (TCoScriptUdf::Match(n.Get()) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(n->Head().Content()))) { + ReportError(ctx, *n, TStringBuilder() << "Cannot execute system python udf " << n->Content() << " in DQ"); + hasErrors = true; + } + + if (!typeCtx.ForceDq && TDqReadWrapBase::Match(n.Get())) { + auto readNode = n->Child(0); + auto dataSourceName = readNode->Child(1)->Child(0)->Content(); + if (dataSourceName != DqProviderName) { + auto datasource = typeCtx.DataSourceMap.FindPtr(dataSourceName); + YQL_ENSURE(datasource); + auto dqIntegration = (*datasource)->GetDqIntegration(); + YQL_ENSURE(dqIntegration); + if (dqIntegration) { + TMaybe<ui64> size; + hasErrors |= !(size = dqIntegration->EstimateReadSize(state->Settings->DataSizePerJob.Get().GetOrElse(TDqSettings::TDefault::DataSizePerJob), state->Settings->MaxTasksPerStage.Get().GetOrElse(TDqSettings::TDefault::MaxTasksPerStage), *readNode, ctx)); + if (size) { + dataSize += *size; + } + } + } + } + return !hasErrors; + } + ); -} + for (auto n: TDqStageBase(&node).Inputs()) { + hasErrors |= !ValidateDqNode(n.Ref()); + } -bool ValidateDqNode(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx, TNodeSet& visited) { - if (node.GetState() == TExprNode::EState::ExecutionComplete) { - return true; - } + if (auto outs = TDqStageBase(&node).Outputs()) { + for (auto n: outs.Cast()) { + hasErrors |= !ValidateDqNode(n.Ref()); + } + } - if (TDqStageBase::Match(&node)) { - // visited will be updated inside ValidateDqStage - return ValidateDqStage(node, typeCtx, ctx, visited); - } + return !hasErrors; - if (!visited.insert(&node).second) { - return true; } - if (TDqCnResult::Match(&node)) { - YQL_CLOG(WARN, ProviderDq) << TDqCnResult::CallableName() << " connection cannot be used inside graph"; - ctx.AddError(YqlIssue(ctx.GetPosition(node.Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << TDqCnResult::CallableName() << " connection cannot be used inside graph")); - return false; - } - if (TDqConnection::Match(&node)) { - return ValidateDqStage(TDqConnection(&node).Output().Stage().Ref(), typeCtx, ctx, visited); - } - if (TDqPhyPrecompute::Match(&node)) { - return ValidateDqNode(TDqPhyPrecompute(&node).Connection().Ref(), typeCtx, ctx, visited); - } + bool ValidateDqNode(const TExprNode& node) { + if (node.GetState() == TExprNode::EState::ExecutionComplete) { + return true; + } - if (TDqSource::Match(&node) || TDqTransform::Match(&node) || TDqSink::Match(&node)) { - return true; - } + if (TDqStageBase::Match(&node)) { + // visited will be updated inside ValidateDqStage + return ValidateDqStage(node); + } - YQL_CLOG(WARN, ProviderDq) << "Failed to execute callable with name: " << node.Content() << " in DQ"; - ctx.AddError(YqlIssue(ctx.GetPosition(node.Pos()), TIssuesIds::DQ_OPTIMIZE_ERROR, TStringBuilder() << "Failed to execute callable with name: " << node.Content() << " in DQ")); - return false; -} + if (!Visited_.insert(&node).second) { + return true; + } -} + if (TDqCnResult::Match(&node)) { + ReportError(Ctx_, node, TStringBuilder() << TDqCnResult::CallableName() << " connection cannot be used inside graph"); + return false; + } -bool ValidateDqExecution(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx) { - YQL_LOG_CTX_SCOPE(__FUNCTION__); + if (TDqConnection::Match(&node)) { + return ValidateDqStage(TDqConnection(&node).Output().Stage().Ref()); + } + if (TDqPhyPrecompute::Match(&node)) { + return ValidateDqNode(TDqPhyPrecompute(&node).Connection().Ref()); + } + + if (TDqSource::Match(&node) || TDqTransform::Match(&node) || TDqSink::Match(&node)) { + return true; + } + + ReportError(Ctx_, node, TStringBuilder() << "Failed to execute callable with name: " << node.Content() << " in DQ"); + return false; + } - TNodeSet dqNodes; - if (TDqCnResult::Match(&node)) { - dqNodes.insert(TDqCnResult(&node).Output().Stage().Raw()); - } else if (TDqQuery::Match(&node)) { - for (auto st: TDqQuery(&node).SinkStages()) { - dqNodes.insert(st.Raw()); +public: + TDqExecutionValidator(const TTypeAnnotationContext& typeCtx, TExprContext& ctx, const TDqState::TPtr state) : TypeCtx_(typeCtx), Ctx_(ctx), State_(state) {} + + bool ValidateDqExecution(const TExprNode& node) { + YQL_LOG_CTX_SCOPE(__FUNCTION__); + + TNodeSet dqNodes; + + bool hasJoin = false; + if (TDqCnResult::Match(&node)) { + dqNodes.insert(TDqCnResult(&node).Output().Stage().Raw()); + } else if (TDqQuery::Match(&node)) { + for (auto st: TDqQuery(&node).SinkStages()) { + dqNodes.insert(st.Raw()); + } + } else { + VisitExpr(node, [&dqNodes](const TExprNode& n) { + if (TDqStageBase::Match(&n)) { + dqNodes.insert(&n); + return false; + } else if (TDqConnection::Match(&n)) { + dqNodes.insert(&n); + return false; + } else if (TDqReadWrapBase::Match(&n)) { + return false; + } + return true; + }); } - } else { - VisitExpr(node, [&dqNodes](const TExprNode& n) { - if (TDqStageBase::Match(&n)) { - dqNodes.insert(&n); - return false; - } else if (TDqConnection::Match(&n)) { - dqNodes.insert(&n); - return false; - } else if (TDqReadWrapBase::Match(&n)) { - return false; + + VisitExpr(node, [&hasJoin](const TExprNode& n) { + if (TMaybeNode<TDqPhyMapJoin>(&n)) { + hasJoin = true; } return true; }); + + bool hasError = false; + + for (const auto n: dqNodes) { + hasError |= !ValidateDqNode(*n); + } + + if (!hasError && hasJoin && DataSize_ > State_->Settings->MaxDataSizePerQuery.Get().GetOrElse(10_GB)) { + ReportError(Ctx_, node, TStringBuilder() << "too big join input: " << DataSize_); + return false; + } + return !hasError; } - TNodeSet visited; - bool hasError = false; - for (const auto n: dqNodes) { - hasError = !ValidateDqNode(*n, typeCtx, ctx, visited) || hasError; - } - return !hasError; +private: + + const TTypeAnnotationContext& TypeCtx_; + TExprContext& Ctx_; + TNodeSet Visited_; + size_t DataSize_ = 0; + const TDqState::TPtr State_; + +}; +} + +bool ValidateDqExecution(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx, const TDqState::TPtr state) { + return TDqExecutionValidator(typeCtx, ctx, state).ValidateDqExecution(node); } } // namespace NYql diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_validate.h b/ydb/library/yql/providers/dq/provider/yql_dq_validate.h index 087cafd13e0..8afc615b350 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_validate.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_validate.h @@ -1,10 +1,12 @@ #pragma once +#include "yql_dq_state.h" + #include <ydb/library/yql/ast/yql_expr.h> #include <ydb/library/yql/core/yql_type_annotation.h> namespace NYql { -bool ValidateDqExecution(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx); +bool ValidateDqExecution(const TExprNode& node, const TTypeAnnotationContext& typeCtx, TExprContext& ctx, const TDqState::TPtr state); } // namespace NYql diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index e438503cdf2..5cf8f9d5b37 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -119,7 +119,11 @@ public: return 0; } - TMaybe<ui64> CanRead(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext&, bool) override { + bool CanRead(const TExprNode& read, TExprContext&, bool) override { + return TS3ReadObject::Match(&read); + } + + TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext&) override { if (TS3ReadObject::Match(&read)) { return 0ul; // TODO: return real size } diff --git a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp index cdb4ffd2eec..33be2f8fdde 100644 --- a/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp +++ b/ydb/library/yql/providers/solomon/provider/yql_solomon_dq_integration.cpp @@ -74,10 +74,15 @@ public: { } - TMaybe<ui64> CanRead(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode&, TExprContext&, bool) override { + bool CanRead(const TExprNode&, TExprContext&, bool) override { YQL_ENSURE(false, "Unimplemented"); } + TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode&, TExprContext&) override { + YQL_ENSURE(false, "Unimplemented"); + } + + TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr&, TExprContext&) override { YQL_ENSURE(false, "Unimplemented"); } diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp index 804d75b0645..6e887fb99c2 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_dq_integration.cpp @@ -69,7 +69,11 @@ public: return 0; } - TMaybe<ui64> CanRead(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext&, bool ) override { + bool CanRead(const TExprNode& read, TExprContext&, bool ) override { + return TYdbReadTable::Match(&read); + } + + TMaybe<ui64> EstimateReadSize(ui64 /*dataSizePerJob*/, ui32 /*maxTasksPerStage*/, const TExprNode& read, TExprContext& ) override { if (TYdbReadTable::Match(&read)) { return 0ul; // TODO: return real size } |