diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2023-08-08 19:08:49 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2023-08-08 19:57:14 +0300 |
commit | 3b1425ae1bdde07577f85e623296e01229b65dc3 (patch) | |
tree | 2d08c316a8150491e2457697634b35271c045131 | |
parent | 3875d4ea30a3307f4ce47d4a01c131f00c100468 (diff) | |
download | ydb-3b1425ae1bdde07577f85e623296e01229b65dc3.tar.gz |
YQL-16247 Limit for time spent in hybrid.
4 files changed, 24 insertions, 4 deletions
diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.h b/ydb/library/yql/providers/yt/common/yql_yt_settings.h index 2b0fd6f6f38..a2beea2464b 100644 --- a/ydb/library/yql/providers/yt/common/yql_yt_settings.h +++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.h @@ -238,6 +238,7 @@ struct TYtSettings { NCommon::TConfSetting<ui16, false> WideFlowLimit; NCommon::TConfSetting<bool, false> UseSystemColumns; NCommon::TConfSetting<bool, false> HybridDqExecution; + NCommon::TConfSetting<TDuration, false> HybridDqTimeSpentLimit; NCommon::TConfSetting<NSize::TSize, false> HybridDqDataSizeLimitForOrdered; NCommon::TConfSetting<NSize::TSize, false> HybridDqDataSizeLimitForUnordered; NCommon::TConfSetting<bool, false> HybridDqExecutionFallback; diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp index 327b066d47e..a794c783fa6 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp @@ -549,14 +549,27 @@ private: return MakeTableForDqWrite(input, ctx); } else if (const auto& result = input->GetResult(); result.IsAtom()) { + const auto publicId = State_->Types->TranslateOperationId(input->UniqueId()); if (result.IsAtom("")) { // Second iteration: do the actual write. + if (publicId) { + if (State_->HybridInFlightOprations.empty()) + State_->HybridStartTime = NMonotonic::TMonotonic::Now(); + YQL_ENSURE(State_->HybridInFlightOprations.emplace(*publicId).second, "Operation " << *publicId << " already started."); + } return RunDqWrite(input, ctx, tmpTable); - } else if(result.IsAtom("FallbackOnError")) { - return SyncOk(); } else { + if (publicId) { + YQL_ENSURE(State_->HybridInFlightOprations.erase(*publicId), "Operation " << *publicId << " not found."); + if (State_->HybridInFlightOprations.empty()) + State_->TimeSpentInHybrid += NMonotonic::TMonotonic::Now() - State_->HybridStartTime; + } + + if (result.IsAtom("FallbackOnError")) + return SyncOk(); + + YQL_ENSURE(result.IsAtom("DQ_completed"), "Unexpected result atom: " << result.Content()); // Third iteration: collect temporary table statistics. - Y_ENSURE(result.IsAtom("DQ_completed"), "Unexpected result atom: " << result.Content()); return CollectDqWrittenTableStats(input, ctx); } } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp index 81116065b5b..52ce13a53a8 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp @@ -509,7 +509,9 @@ bool TYtState::IsHybridEnabled() const { } bool TYtState::IsHybridEnabledForCluster(const std::string_view& cluster) const { - return Configuration->_EnableDq.Get(TString(cluster)).GetOrElse(true); + return Configuration->_EnableDq.Get(TString(cluster)).GetOrElse(true) + && TimeSpentInHybrid + (HybridInFlightOprations.empty() ? TDuration::Zero() : NMonotonic::TMonotonic::Now() - HybridStartTime) + < Configuration->HybridDqTimeSpentLimit.Get().GetOrElse(TDuration::Minutes(20)); } } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider.h b/ydb/library/yql/providers/yt/provider/yql_yt_provider.h index 24d76a4fe7c..94e55d44c7c 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_provider.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider.h @@ -11,6 +11,7 @@ #include <ydb/library/yql/core/yql_execution.h> #include <ydb/library/yql/ast/yql_constraint.h> +#include <library/cpp/time_provider/monotonic.h> #include <library/cpp/yson/writer.h> #include <util/generic/string.h> @@ -98,6 +99,9 @@ struct TYtState : public TThrRefBase { THolder<IDqIntegration> DqIntegration_; ui32 NextEpochId = 1; bool OnlyNativeExecution = false; + TDuration TimeSpentInHybrid; + NMonotonic::TMonotonic HybridStartTime; + std::unordered_set<ui32> HybridInFlightOprations; private: std::unordered_map<ui64, TYtVersionedConfiguration::TState> ConfigurationEvalStates_; std::unordered_map<ui64, ui32> EpochEvalStates_; |