diff options
7 files changed, 21 insertions, 1 deletions
diff --git a/yql/essentials/core/dq_integration/yql_dq_integration.h b/yql/essentials/core/dq_integration/yql_dq_integration.h index d1aaa655a64..1ac9427f656 100644 --- a/yql/essentials/core/dq_integration/yql_dq_integration.h +++ b/yql/essentials/core/dq_integration/yql_dq_integration.h @@ -90,6 +90,7 @@ public: virtual bool FillSinkPlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) = 0; // Called to configure DQ peephole virtual void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& params, TTransformationPipeline* pipeline) = 0; + virtual void NotifyDqTimeout() = 0; }; std::unordered_set<IDqIntegration*> GetUniqueIntegrations(const TTypeAnnotationContext& typesCtx); diff --git a/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp b/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp index 1209b53c77b..4782a79d579 100644 --- a/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp +++ b/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp @@ -92,4 +92,7 @@ bool TDqIntegrationBase::FillSinkPlanProperties(const NNodes::TExprBase&, TMap<T void TDqIntegrationBase::ConfigurePeepholePipeline(bool, const THashMap<TString, TString>&, TTransformationPipeline*) { } +void TDqIntegrationBase::NotifyDqTimeout() { +} + } // namespace NYql diff --git a/yql/essentials/providers/common/dq/yql_dq_integration_impl.h b/yql/essentials/providers/common/dq/yql_dq_integration_impl.h index 4624caede41..1fbcb33d775 100644 --- a/yql/essentials/providers/common/dq/yql_dq_integration_impl.h +++ b/yql/essentials/providers/common/dq/yql_dq_integration_impl.h @@ -28,6 +28,7 @@ public: bool FillSourcePlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override; bool FillSinkPlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override; void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& params, TTransformationPipeline* pipeline) override; + void NotifyDqTimeout() override; protected: bool CanBlockReadTypes(const TStructExprType* node); diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp index 26ffebf27e9..0c8c9fdf6c9 100644 --- a/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp @@ -83,6 +83,16 @@ private: return false; } + if (State_->IsDqTimeout) { + PushSkipStat("DqTimeout", nodeName); + return false; + } + + if (State_->OnlyNativeExecution) { + PushSkipStat("OnlyNativeExecution", nodeName); + return false; + } + if (operation.Output().Size() != 1U) { PushSkipStat("MultipleOutputs", nodeName); return false; diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp index d45ff380a76..b8cfc3c0872 100644 --- a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -930,6 +930,10 @@ public: return "DQ cannot execute the query. Cause: " + message; } + virtual void NotifyDqTimeout() override { + State_->IsDqTimeout = true; + } + private: TYtState* State_; }; diff --git a/yt/yql/providers/yt/provider/yql_yt_provider.cpp b/yt/yql/providers/yt/provider/yql_yt_provider.cpp index 44c70d483af..74a63aab6aa 100644 --- a/yt/yql/providers/yt/provider/yql_yt_provider.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_provider.cpp @@ -541,7 +541,7 @@ bool TYtState::IsHybridEnabled() const { } bool TYtState::IsHybridEnabledForCluster(const std::string_view& cluster) const { - return !OnlyNativeExecution && Configuration->_EnableDq.Get(TString(cluster)).GetOrElse(true); + return Configuration->_EnableDq.Get(TString(cluster)).GetOrElse(true); } bool TYtState::HybridTakesTooLong() const { diff --git a/yt/yql/providers/yt/provider/yql_yt_provider.h b/yt/yql/providers/yt/provider/yql_yt_provider.h index e79c41cb135..0c804087229 100644 --- a/yt/yql/providers/yt/provider/yql_yt_provider.h +++ b/yt/yql/providers/yt/provider/yql_yt_provider.h @@ -123,6 +123,7 @@ struct TYtState : public TThrRefBase { i32 FlowDependsOnId = 0; IOptimizerFactory::TPtr OptimizerFactory_; IDqHelper::TPtr DqHelper; + bool IsDqTimeout = false; private: std::unordered_map<ui64, TYtVersionedConfiguration::TState> ConfigurationEvalStates_; std::unordered_map<ui64, ui32> EpochEvalStates_; |
