summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--yql/essentials/core/dq_integration/yql_dq_integration.h1
-rw-r--r--yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp3
-rw-r--r--yql/essentials/providers/common/dq/yql_dq_integration_impl.h1
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp10
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp4
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_provider.cpp2
-rw-r--r--yt/yql/providers/yt/provider/yql_yt_provider.h1
7 files changed, 21 insertions, 1 deletions
diff --git a/yql/essentials/core/dq_integration/yql_dq_integration.h b/yql/essentials/core/dq_integration/yql_dq_integration.h
index d1aaa655a64..1ac9427f656 100644
--- a/yql/essentials/core/dq_integration/yql_dq_integration.h
+++ b/yql/essentials/core/dq_integration/yql_dq_integration.h
@@ -90,6 +90,7 @@ public:
virtual bool FillSinkPlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) = 0;
// Called to configure DQ peephole
virtual void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& params, TTransformationPipeline* pipeline) = 0;
+ virtual void NotifyDqTimeout() = 0;
};
std::unordered_set<IDqIntegration*> GetUniqueIntegrations(const TTypeAnnotationContext& typesCtx);
diff --git a/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp b/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp
index 1209b53c77b..4782a79d579 100644
--- a/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp
+++ b/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp
@@ -92,4 +92,7 @@ bool TDqIntegrationBase::FillSinkPlanProperties(const NNodes::TExprBase&, TMap<T
void TDqIntegrationBase::ConfigurePeepholePipeline(bool, const THashMap<TString, TString>&, TTransformationPipeline*) {
}
+void TDqIntegrationBase::NotifyDqTimeout() {
+}
+
} // namespace NYql
diff --git a/yql/essentials/providers/common/dq/yql_dq_integration_impl.h b/yql/essentials/providers/common/dq/yql_dq_integration_impl.h
index 4624caede41..1fbcb33d775 100644
--- a/yql/essentials/providers/common/dq/yql_dq_integration_impl.h
+++ b/yql/essentials/providers/common/dq/yql_dq_integration_impl.h
@@ -28,6 +28,7 @@ public:
bool FillSourcePlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override;
bool FillSinkPlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override;
void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& params, TTransformationPipeline* pipeline) override;
+ void NotifyDqTimeout() override;
protected:
bool CanBlockReadTypes(const TStructExprType* node);
diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
index 26ffebf27e9..0c8c9fdf6c9 100644
--- a/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_dq_hybrid.cpp
@@ -83,6 +83,16 @@ private:
return false;
}
+ if (State_->IsDqTimeout) {
+ PushSkipStat("DqTimeout", nodeName);
+ return false;
+ }
+
+ if (State_->OnlyNativeExecution) {
+ PushSkipStat("OnlyNativeExecution", nodeName);
+ return false;
+ }
+
if (operation.Output().Size() != 1U) {
PushSkipStat("MultipleOutputs", nodeName);
return false;
diff --git a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
index d45ff380a76..b8cfc3c0872 100644
--- a/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_dq_integration.cpp
@@ -930,6 +930,10 @@ public:
return "DQ cannot execute the query. Cause: " + message;
}
+ virtual void NotifyDqTimeout() override {
+ State_->IsDqTimeout = true;
+ }
+
private:
TYtState* State_;
};
diff --git a/yt/yql/providers/yt/provider/yql_yt_provider.cpp b/yt/yql/providers/yt/provider/yql_yt_provider.cpp
index 44c70d483af..74a63aab6aa 100644
--- a/yt/yql/providers/yt/provider/yql_yt_provider.cpp
+++ b/yt/yql/providers/yt/provider/yql_yt_provider.cpp
@@ -541,7 +541,7 @@ bool TYtState::IsHybridEnabled() const {
}
bool TYtState::IsHybridEnabledForCluster(const std::string_view& cluster) const {
- return !OnlyNativeExecution && Configuration->_EnableDq.Get(TString(cluster)).GetOrElse(true);
+ return Configuration->_EnableDq.Get(TString(cluster)).GetOrElse(true);
}
bool TYtState::HybridTakesTooLong() const {
diff --git a/yt/yql/providers/yt/provider/yql_yt_provider.h b/yt/yql/providers/yt/provider/yql_yt_provider.h
index e79c41cb135..0c804087229 100644
--- a/yt/yql/providers/yt/provider/yql_yt_provider.h
+++ b/yt/yql/providers/yt/provider/yql_yt_provider.h
@@ -123,6 +123,7 @@ struct TYtState : public TThrRefBase {
i32 FlowDependsOnId = 0;
IOptimizerFactory::TPtr OptimizerFactory_;
IDqHelper::TPtr DqHelper;
+ bool IsDqTimeout = false;
private:
std::unordered_map<ui64, TYtVersionedConfiguration::TState> ConfigurationEvalStates_;
std::unordered_map<ui64, ui32> EpochEvalStates_;