diff options
author | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-04-11 00:09:11 +0300 |
---|---|---|
committer | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-04-11 00:09:11 +0300 |
commit | 6e11f648a116d97e271e6dad640070a37e3960e4 (patch) | |
tree | 74e9b8dd205f3e9de207ddce67a8e2d6e96f207c | |
parent | d0d27ff451c16dbec7fbc99206cba32803c52cc6 (diff) | |
download | ydb-6e11f648a116d97e271e6dad640070a37e3960e4.tar.gz |
Better RetryRate instead of RetryCounter
Better RetryRate instead of RetryCounter
Limit faulty queries with RetryRate instead of RetryCounter
ref:0ef73932e06018143b2452ecab70232e5e96bf24
4 files changed, 35 insertions, 23 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp index a5aea82148d..dd73e9f078e 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp @@ -15,6 +15,7 @@ struct TTaskInternal { TEvControlPlaneStorage::TTask Task; ui64 RetryCounter = 0; TInstant RetryCounterUpdatedAt = TInstant::Zero(); + double RetryRate = 0.0; bool ShouldAbortTask = false; TString TablePathPrefix; TString Owner; @@ -26,24 +27,20 @@ struct TTaskInternal { std::pair<TString, NYdb::TParams> MakeSql(const TTaskInternal& taskInternal, const TInstant& nowTimestamp, const TInstant& taskLeaseUntil) { const auto& task = taskInternal.Task; - const auto& retryCounter = taskInternal.RetryCounter; - const auto& retryCounterUpdatedAt = taskInternal.RetryCounterUpdatedAt; - const auto& hostName = taskInternal.HostName; - const auto& owner = taskInternal.Owner; - TSqlQueryBuilder queryBuilder(taskInternal.TablePathPrefix, "GetTask(write)"); queryBuilder.AddString("tenant", taskInternal.TenantName); queryBuilder.AddString("scope", task.Scope); queryBuilder.AddString("query_id", task.QueryId); queryBuilder.AddString("query", task.Query.SerializeAsString()); queryBuilder.AddString("internal", task.Internal.SerializeAsString()); - queryBuilder.AddString("host", hostName); - queryBuilder.AddString("owner", owner); + queryBuilder.AddString("host", taskInternal.HostName); + queryBuilder.AddString("owner", taskInternal.Owner); queryBuilder.AddTimestamp("now", nowTimestamp); queryBuilder.AddTimestamp("ttl", taskLeaseUntil); - queryBuilder.AddUint64("retry_counter", retryCounter); + queryBuilder.AddUint64("retry_counter", taskInternal.RetryCounter); queryBuilder.AddUint64("generation", task.Generation); - queryBuilder.AddTimestamp("retry_counter_update_time", retryCounterUpdatedAt); + queryBuilder.AddTimestamp("retry_counter_update_time", taskInternal.RetryCounterUpdatedAt); + queryBuilder.AddDouble("retry_rate", taskInternal.RetryRate); // update queries queryBuilder.AddText( @@ -54,8 +51,8 @@ std::pair<TString, NYdb::TParams> MakeSql(const TTaskInternal& taskInternal, con // update pending small queryBuilder.AddText( "UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl,\n" - "`" RETRY_COUNTER_COLUMN_NAME "` = $retry_counter, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "` = $retry_counter_update_time, `" IS_RESIGN_QUERY_COLUMN_NAME "` = false,\n" - "`" HOST_NAME_COLUMN_NAME "` = $host, `" OWNER_COLUMN_NAME "` = $owner\n" + "`" RETRY_COUNTER_COLUMN_NAME "` = $retry_counter, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "` = $retry_counter_update_time, `" RETRY_RATE_COLUMN_NAME "` = $retry_rate,\n" + "`" IS_RESIGN_QUERY_COLUMN_NAME "` = false, `" HOST_NAME_COLUMN_NAME "` = $host, `" OWNER_COLUMN_NAME "` = $owner\n" "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" ); @@ -172,13 +169,13 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ queryBuilder.AddUint64("tasks_limit", tasksBatchSize); queryBuilder.AddText( "SELECT `" SCOPE_COLUMN_NAME "`, `" QUERY_ID_COLUMN_NAME "`,\n" - "`" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "`, `" IS_RESIGN_QUERY_COLUMN_NAME "`\n" + "`" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" RETRY_RATE_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "`, `" IS_RESIGN_QUERY_COLUMN_NAME "`\n" "FROM `" PENDING_SMALL_TABLE_NAME "`\n" "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" ASSIGNED_UNTIL_COLUMN_NAME "` < $from ORDER BY `" QUERY_ID_COLUMN_NAME "` DESC LIMIT $tasks_limit;\n" ); auto responseTasks = std::make_shared<TResponseTasks>(); - auto prepareParams = [=, responseTasks=responseTasks](const TVector<TResultSet>& resultSets) mutable { + auto prepareParams = [=, actorSystem=NActors::TActivationContext::ActorSystem(), responseTasks=responseTasks](const TVector<TResultSet>& resultSets) mutable { TVector<TTaskInternal> tasks; TVector<TPickTaskParams> pickTaskParams; const auto now = TInstant::Now(); @@ -202,26 +199,35 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ bool isResignQuery = parser.ColumnParser(IS_RESIGN_QUERY_COLUMN_NAME).GetOptionalBool().GetOrElse(false); taskInternal.RetryCounter = parser.ColumnParser(RETRY_COUNTER_COLUMN_NAME).GetOptionalUint64().GetOrElse(0); taskInternal.RetryCounterUpdatedAt = parser.ColumnParser(RETRY_COUNTER_UPDATE_COLUMN_NAME).GetOptionalTimestamp().GetOrElse(TInstant::Zero()); + taskInternal.RetryRate = parser.ColumnParser(RETRY_RATE_COLUMN_NAME).GetOptionalDouble().GetOrElse(0.0); YandexQuery::QueryContent::QueryType queryType = static_cast<YandexQuery::QueryContent::QueryType>(parser.ColumnParser(QUERY_TYPE_COLUMN_NAME).GetOptionalInt64().GetOrElse(0)); const auto retryCounterLimit = queryType == YandexQuery::QueryContent::ANALYTICS ? Config.Proto.GetAnalyticsRetryCounterLimit() : Config.Proto.GetStreamingRetryCounterLimit(); const auto retryCounterUpdateTime = queryType == YandexQuery::QueryContent::ANALYTICS ? Config.AnalyticsRetryCounterUpdateTime : Config.StreamingRetryCounterUpdateTime; - if (isResignQuery) { - continue; - } - - ++taskInternal.RetryCounter; - - if (now > taskInternal.RetryCounterUpdatedAt + retryCounterUpdateTime) { - taskInternal.RetryCounter = 0; - taskInternal.RetryCounterUpdatedAt = now; + auto lastPeriod = now - taskInternal.RetryCounterUpdatedAt; + if (lastPeriod >= retryCounterUpdateTime) { + taskInternal.RetryRate = 0.0; + } else { + taskInternal.RetryRate += 1.0; + auto rate = lastPeriod / retryCounterUpdateTime * retryCounterLimit; + if (taskInternal.RetryRate > rate) { + taskInternal.RetryRate -= rate; + } else { + taskInternal.RetryRate = 0.0; + } } + taskInternal.RetryCounter += 1; + taskInternal.RetryCounterUpdatedAt = now; - if (taskInternal.RetryCounter >= retryCounterLimit) { + if (!isResignQuery && taskInternal.RetryRate >= retryCounterLimit) { taskInternal.ShouldAbortTask = true; } + + CPS_LOG_AS_D(*actorSystem, "Task (Query): " << task.QueryId << " RetryRate: " << taskInternal.RetryRate + << " RetryCounter: " << taskInternal.RetryCounter << " At: " << taskInternal.RetryCounterUpdatedAt + << (taskInternal.ShouldAbortTask ? " ABORTED" : "")); } std::shuffle(tasks.begin(), tasks.end(), std::default_random_engine()); diff --git a/ydb/core/yq/libs/control_plane_storage/schema.h b/ydb/core/yq/libs/control_plane_storage/schema.h index 9d4b6719a9a..6677be5a306 100644 --- a/ydb/core/yq/libs/control_plane_storage/schema.h +++ b/ydb/core/yq/libs/control_plane_storage/schema.h @@ -67,5 +67,6 @@ namespace NYq { #define OWNER_COLUMN_NAME "owner" #define LAST_SEEN_AT_COLUMN_NAME "last_seen_at" #define ASSIGNED_UNTIL_COLUMN_NAME "assigned_until" +#define RETRY_RATE_COLUMN_NAME "retry_rate" } // namespace NYq diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp index 8168e98fe93..42ac23f6e5c 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp @@ -134,6 +134,7 @@ void TYdbControlPlaneStorageActor::CreatePendingSmallTable() .AddNullableColumn(OWNER_COLUMN_NAME, EPrimitiveType::String) .AddNullableColumn(TENANT_COLUMN_NAME, EPrimitiveType::String) .AddNullableColumn(ASSIGNED_UNTIL_COLUMN_NAME, EPrimitiveType::Timestamp) + .AddNullableColumn(RETRY_RATE_COLUMN_NAME, EPrimitiveType::Double) .SetPrimaryKeyColumns({TENANT_COLUMN_NAME, SCOPE_COLUMN_NAME, QUERY_ID_COLUMN_NAME}) .Build(); diff --git a/ydb/core/yq/libs/db_schema/db_schema.h b/ydb/core/yq/libs/db_schema/db_schema.h index c14a7d21c01..8faf8278f7e 100644 --- a/ydb/core/yq/libs/db_schema/db_schema.h +++ b/ydb/core/yq/libs/db_schema/db_schema.h @@ -51,6 +51,10 @@ public: return AddValue(name, NYdb::TValueBuilder().Bool(value).Build()); } + TSqlQueryBuilder& AddDouble(const TString& name, double value) { + return AddValue(name, NYdb::TValueBuilder().Double(value).Build()); + } + TSqlQueryBuilder& AddValue(const TString& name, const NYdb::TValue& value, const TString& pred); TSqlQueryBuilder& AddValue(const TString& name, const TString& value, const TString& pred) { |