aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2023-08-08 19:08:49 +0300
committera-romanov <Anton.Romanov@ydb.tech>2023-08-08 19:57:14 +0300
commit3b1425ae1bdde07577f85e623296e01229b65dc3 (patch)
tree2d08c316a8150491e2457697634b35271c045131
parent3875d4ea30a3307f4ce47d4a01c131f00c100468 (diff)
downloadydb-3b1425ae1bdde07577f85e623296e01229b65dc3.tar.gz
YQL-16247 Limit for time spent in hybrid.
-rw-r--r--ydb/library/yql/providers/yt/common/yql_yt_settings.h1
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp19
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp4
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_provider.h4
4 files changed, 24 insertions, 4 deletions
diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.h b/ydb/library/yql/providers/yt/common/yql_yt_settings.h
index 2b0fd6f6f38..a2beea2464b 100644
--- a/ydb/library/yql/providers/yt/common/yql_yt_settings.h
+++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.h
@@ -238,6 +238,7 @@ struct TYtSettings {
NCommon::TConfSetting<ui16, false> WideFlowLimit;
NCommon::TConfSetting<bool, false> UseSystemColumns;
NCommon::TConfSetting<bool, false> HybridDqExecution;
+ NCommon::TConfSetting<TDuration, false> HybridDqTimeSpentLimit;
NCommon::TConfSetting<NSize::TSize, false> HybridDqDataSizeLimitForOrdered;
NCommon::TConfSetting<NSize::TSize, false> HybridDqDataSizeLimitForUnordered;
NCommon::TConfSetting<bool, false> HybridDqExecutionFallback;
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
index 327b066d47e..a794c783fa6 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_datasink_exec.cpp
@@ -549,14 +549,27 @@ private:
return MakeTableForDqWrite(input, ctx);
}
else if (const auto& result = input->GetResult(); result.IsAtom()) {
+ const auto publicId = State_->Types->TranslateOperationId(input->UniqueId());
if (result.IsAtom("")) {
// Second iteration: do the actual write.
+ if (publicId) {
+ if (State_->HybridInFlightOprations.empty())
+ State_->HybridStartTime = NMonotonic::TMonotonic::Now();
+ YQL_ENSURE(State_->HybridInFlightOprations.emplace(*publicId).second, "Operation " << *publicId << " already started.");
+ }
return RunDqWrite(input, ctx, tmpTable);
- } else if(result.IsAtom("FallbackOnError")) {
- return SyncOk();
} else {
+ if (publicId) {
+ YQL_ENSURE(State_->HybridInFlightOprations.erase(*publicId), "Operation " << *publicId << " not found.");
+ if (State_->HybridInFlightOprations.empty())
+ State_->TimeSpentInHybrid += NMonotonic::TMonotonic::Now() - State_->HybridStartTime;
+ }
+
+ if (result.IsAtom("FallbackOnError"))
+ return SyncOk();
+
+ YQL_ENSURE(result.IsAtom("DQ_completed"), "Unexpected result atom: " << result.Content());
// Third iteration: collect temporary table statistics.
- Y_ENSURE(result.IsAtom("DQ_completed"), "Unexpected result atom: " << result.Content());
return CollectDqWrittenTableStats(input, ctx);
}
}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp
index 81116065b5b..52ce13a53a8 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp
@@ -509,7 +509,9 @@ bool TYtState::IsHybridEnabled() const {
}
bool TYtState::IsHybridEnabledForCluster(const std::string_view& cluster) const {
- return Configuration->_EnableDq.Get(TString(cluster)).GetOrElse(true);
+ return Configuration->_EnableDq.Get(TString(cluster)).GetOrElse(true)
+ && TimeSpentInHybrid + (HybridInFlightOprations.empty() ? TDuration::Zero() : NMonotonic::TMonotonic::Now() - HybridStartTime)
+ < Configuration->HybridDqTimeSpentLimit.Get().GetOrElse(TDuration::Minutes(20));
}
}
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider.h b/ydb/library/yql/providers/yt/provider/yql_yt_provider.h
index 24d76a4fe7c..94e55d44c7c 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_provider.h
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider.h
@@ -11,6 +11,7 @@
#include <ydb/library/yql/core/yql_execution.h>
#include <ydb/library/yql/ast/yql_constraint.h>
+#include <library/cpp/time_provider/monotonic.h>
#include <library/cpp/yson/writer.h>
#include <util/generic/string.h>
@@ -98,6 +99,9 @@ struct TYtState : public TThrRefBase {
THolder<IDqIntegration> DqIntegration_;
ui32 NextEpochId = 1;
bool OnlyNativeExecution = false;
+ TDuration TimeSpentInHybrid;
+ NMonotonic::TMonotonic HybridStartTime;
+ std::unordered_set<ui32> HybridInFlightOprations;
private:
std::unordered_map<ui64, TYtVersionedConfiguration::TState> ConfigurationEvalStates_;
std::unordered_map<ui64, ui32> EpochEvalStates_;