diff options
| author | maxkovalev <[email protected]> | 2024-12-10 11:59:38 +0300 |
|---|---|---|
| committer | maxkovalev <[email protected]> | 2024-12-10 12:33:22 +0300 |
| commit | e6f05fc9701dd0f6a909e33be132d4f1f280e72f (patch) | |
| tree | 6a594178f2bfdd6f228f34e231a3c1144f04d0ab | |
| parent | dc3e62419c27e433c71ecc7918926f79b0d492ac (diff) | |
YQL-19285: Skip Hibrid when Dq has timeout
commit_hash:3e4fdb7e4afeb5f524578778330d927d8f165b1a
7 files changed, 21 insertions, 1 deletions
diff --git a/yql/essentials/core/dq_integration/yql_dq_integration.h b/yql/essentials/core/dq_integration/yql_dq_integration.h index d1aaa655a64..1ac9427f656 100644 --- a/yql/essentials/core/dq_integration/yql_dq_integration.h +++ b/yql/essentials/core/dq_integration/yql_dq_integration.h @@ -90,6 +90,7 @@ public: virtual bool FillSinkPlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) = 0; // Called to configure DQ peephole virtual void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& params, TTransformationPipeline* pipeline) = 0; + virtual void NotifyDqTimeout() = 0; }; std::unordered_set<IDqIntegration*> GetUniqueIntegrations(const TTypeAnnotationContext& typesCtx); diff --git a/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp b/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp index 1209b53c77b..4782a79d579 100644 --- a/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp +++ b/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp @@ -92,4 +92,7 @@ bool TDqIntegrationBase::FillSinkPlanProperties(const NNodes::TExprBase&, TMap<T void TDqIntegrationBase::ConfigurePeepholePipeline(bool, const THashMap<TString, TString>&, TTransformationPipeline*) { } +void TDqIntegrationBase::NotifyDqTimeout() { +} + } // namespace NYql diff --git a/yql/essentials/providers/common/dq/yql_dq_integration_impl.h b/yql/essentials/providers/common/dq/yql_dq_integration_impl.h index 4624caede41..1fbcb33d775 100644 --- a/yql/essentials/providers/common/dq/yql_dq_integration_impl.h +++ b/yql/essentials/providers/common/dq/yql_dq_integration_impl.h @@ -28,6 +28,7 @@ public: bool FillSourcePlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override; bool FillSinkPlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override; void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& params, TTransformationPipeline* pipeline) override; + void NotifyDqTimeout() override; protected: bool CanBlockReadTypes(const TStructExprType* node); diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp index 26ffebf27e9..0c8c9fdf6c9 100644 --- a/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp @@ -83,6 +83,16 @@ private: return false; } + if (State_->IsDqTimeout) { + PushSkipStat("DqTimeout", nodeName); + return false; + } + + if (State_->OnlyNativeExecution) { + PushSkipStat("OnlyNativeExecution", nodeName); + return false; + } + if (operation.Output().Size() != 1U) { PushSkipStat("MultipleOutputs", nodeName); return false; diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp index d45ff380a76..b8cfc3c0872 100644 --- a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -930,6 +930,10 @@ public: return "DQ cannot execute the query. Cause: " + message; } + virtual void NotifyDqTimeout() override { + State_->IsDqTimeout = true; + } + private: TYtState* State_; }; diff --git a/yt/yql/providers/yt/provider/yql_yt_provider.cpp b/yt/yql/providers/yt/provider/yql_yt_provider.cpp index 44c70d483af..74a63aab6aa 100644 --- a/yt/yql/providers/yt/provider/yql_yt_provider.cpp +++ b/yt/yql/providers/yt/provider/yql_yt_provider.cpp @@ -541,7 +541,7 @@ bool TYtState::IsHybridEnabled() const { } bool TYtState::IsHybridEnabledForCluster(const std::string_view& cluster) const { - return !OnlyNativeExecution && Configuration->_EnableDq.Get(TString(cluster)).GetOrElse(true); + return Configuration->_EnableDq.Get(TString(cluster)).GetOrElse(true); } bool TYtState::HybridTakesTooLong() const { diff --git a/yt/yql/providers/yt/provider/yql_yt_provider.h b/yt/yql/providers/yt/provider/yql_yt_provider.h index e79c41cb135..0c804087229 100644 --- a/yt/yql/providers/yt/provider/yql_yt_provider.h +++ b/yt/yql/providers/yt/provider/yql_yt_provider.h @@ -123,6 +123,7 @@ struct TYtState : public TThrRefBase { i32 FlowDependsOnId = 0; IOptimizerFactory::TPtr OptimizerFactory_; IDqHelper::TPtr DqHelper; + bool IsDqTimeout = false; private: std::unordered_map<ui64, TYtVersionedConfiguration::TState> ConfigurationEvalStates_; std::unordered_map<ui64, ui32> EpochEvalStates_; |
