aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Khoroshilov <hor911@gmail.com>2022-04-11 00:09:11 +0300
committerAleksandr Khoroshilov <hor911@gmail.com>2022-04-11 00:09:11 +0300
commit6e11f648a116d97e271e6dad640070a37e3960e4 (patch)
tree74e9b8dd205f3e9de207ddce67a8e2d6e96f207c
parentd0d27ff451c16dbec7fbc99206cba32803c52cc6 (diff)
downloadydb-6e11f648a116d97e271e6dad640070a37e3960e4.tar.gz
Better RetryRate instead of RetryCounter
Better RetryRate instead of RetryCounter Limit faulty queries with RetryRate instead of RetryCounter ref:0ef73932e06018143b2452ecab70232e5e96bf24
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp52
-rw-r--r--ydb/core/yq/libs/control_plane_storage/schema.h1
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp1
-rw-r--r--ydb/core/yq/libs/db_schema/db_schema.h4
4 files changed, 35 insertions, 23 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
index a5aea82148d..dd73e9f078e 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
@@ -15,6 +15,7 @@ struct TTaskInternal {
TEvControlPlaneStorage::TTask Task;
ui64 RetryCounter = 0;
TInstant RetryCounterUpdatedAt = TInstant::Zero();
+ double RetryRate = 0.0;
bool ShouldAbortTask = false;
TString TablePathPrefix;
TString Owner;
@@ -26,24 +27,20 @@ struct TTaskInternal {
std::pair<TString, NYdb::TParams> MakeSql(const TTaskInternal& taskInternal, const TInstant& nowTimestamp, const TInstant& taskLeaseUntil) {
const auto& task = taskInternal.Task;
- const auto& retryCounter = taskInternal.RetryCounter;
- const auto& retryCounterUpdatedAt = taskInternal.RetryCounterUpdatedAt;
- const auto& hostName = taskInternal.HostName;
- const auto& owner = taskInternal.Owner;
-
TSqlQueryBuilder queryBuilder(taskInternal.TablePathPrefix, "GetTask(write)");
queryBuilder.AddString("tenant", taskInternal.TenantName);
queryBuilder.AddString("scope", task.Scope);
queryBuilder.AddString("query_id", task.QueryId);
queryBuilder.AddString("query", task.Query.SerializeAsString());
queryBuilder.AddString("internal", task.Internal.SerializeAsString());
- queryBuilder.AddString("host", hostName);
- queryBuilder.AddString("owner", owner);
+ queryBuilder.AddString("host", taskInternal.HostName);
+ queryBuilder.AddString("owner", taskInternal.Owner);
queryBuilder.AddTimestamp("now", nowTimestamp);
queryBuilder.AddTimestamp("ttl", taskLeaseUntil);
- queryBuilder.AddUint64("retry_counter", retryCounter);
+ queryBuilder.AddUint64("retry_counter", taskInternal.RetryCounter);
queryBuilder.AddUint64("generation", task.Generation);
- queryBuilder.AddTimestamp("retry_counter_update_time", retryCounterUpdatedAt);
+ queryBuilder.AddTimestamp("retry_counter_update_time", taskInternal.RetryCounterUpdatedAt);
+ queryBuilder.AddDouble("retry_rate", taskInternal.RetryRate);
// update queries
queryBuilder.AddText(
@@ -54,8 +51,8 @@ std::pair<TString, NYdb::TParams> MakeSql(const TTaskInternal& taskInternal, con
// update pending small
queryBuilder.AddText(
"UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl,\n"
- "`" RETRY_COUNTER_COLUMN_NAME "` = $retry_counter, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "` = $retry_counter_update_time, `" IS_RESIGN_QUERY_COLUMN_NAME "` = false,\n"
- "`" HOST_NAME_COLUMN_NAME "` = $host, `" OWNER_COLUMN_NAME "` = $owner\n"
+ "`" RETRY_COUNTER_COLUMN_NAME "` = $retry_counter, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "` = $retry_counter_update_time, `" RETRY_RATE_COLUMN_NAME "` = $retry_rate,\n"
+ "`" IS_RESIGN_QUERY_COLUMN_NAME "` = false, `" HOST_NAME_COLUMN_NAME "` = $host, `" OWNER_COLUMN_NAME "` = $owner\n"
"WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
);
@@ -172,13 +169,13 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
queryBuilder.AddUint64("tasks_limit", tasksBatchSize);
queryBuilder.AddText(
"SELECT `" SCOPE_COLUMN_NAME "`, `" QUERY_ID_COLUMN_NAME "`,\n"
- "`" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "`, `" IS_RESIGN_QUERY_COLUMN_NAME "`\n"
+ "`" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" RETRY_RATE_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "`, `" IS_RESIGN_QUERY_COLUMN_NAME "`\n"
"FROM `" PENDING_SMALL_TABLE_NAME "`\n"
"WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" ASSIGNED_UNTIL_COLUMN_NAME "` < $from ORDER BY `" QUERY_ID_COLUMN_NAME "` DESC LIMIT $tasks_limit;\n"
);
auto responseTasks = std::make_shared<TResponseTasks>();
- auto prepareParams = [=, responseTasks=responseTasks](const TVector<TResultSet>& resultSets) mutable {
+ auto prepareParams = [=, actorSystem=NActors::TActivationContext::ActorSystem(), responseTasks=responseTasks](const TVector<TResultSet>& resultSets) mutable {
TVector<TTaskInternal> tasks;
TVector<TPickTaskParams> pickTaskParams;
const auto now = TInstant::Now();
@@ -202,26 +199,35 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
bool isResignQuery = parser.ColumnParser(IS_RESIGN_QUERY_COLUMN_NAME).GetOptionalBool().GetOrElse(false);
taskInternal.RetryCounter = parser.ColumnParser(RETRY_COUNTER_COLUMN_NAME).GetOptionalUint64().GetOrElse(0);
taskInternal.RetryCounterUpdatedAt = parser.ColumnParser(RETRY_COUNTER_UPDATE_COLUMN_NAME).GetOptionalTimestamp().GetOrElse(TInstant::Zero());
+ taskInternal.RetryRate = parser.ColumnParser(RETRY_RATE_COLUMN_NAME).GetOptionalDouble().GetOrElse(0.0);
YandexQuery::QueryContent::QueryType queryType = static_cast<YandexQuery::QueryContent::QueryType>(parser.ColumnParser(QUERY_TYPE_COLUMN_NAME).GetOptionalInt64().GetOrElse(0));
const auto retryCounterLimit = queryType == YandexQuery::QueryContent::ANALYTICS ? Config.Proto.GetAnalyticsRetryCounterLimit() : Config.Proto.GetStreamingRetryCounterLimit();
const auto retryCounterUpdateTime = queryType == YandexQuery::QueryContent::ANALYTICS ? Config.AnalyticsRetryCounterUpdateTime : Config.StreamingRetryCounterUpdateTime;
- if (isResignQuery) {
- continue;
- }
-
- ++taskInternal.RetryCounter;
-
- if (now > taskInternal.RetryCounterUpdatedAt + retryCounterUpdateTime) {
- taskInternal.RetryCounter = 0;
- taskInternal.RetryCounterUpdatedAt = now;
+ auto lastPeriod = now - taskInternal.RetryCounterUpdatedAt;
+ if (lastPeriod >= retryCounterUpdateTime) {
+ taskInternal.RetryRate = 0.0;
+ } else {
+ taskInternal.RetryRate += 1.0;
+ auto rate = lastPeriod / retryCounterUpdateTime * retryCounterLimit;
+ if (taskInternal.RetryRate > rate) {
+ taskInternal.RetryRate -= rate;
+ } else {
+ taskInternal.RetryRate = 0.0;
+ }
}
+ taskInternal.RetryCounter += 1;
+ taskInternal.RetryCounterUpdatedAt = now;
- if (taskInternal.RetryCounter >= retryCounterLimit) {
+ if (!isResignQuery && taskInternal.RetryRate >= retryCounterLimit) {
taskInternal.ShouldAbortTask = true;
}
+
+ CPS_LOG_AS_D(*actorSystem, "Task (Query): " << task.QueryId << " RetryRate: " << taskInternal.RetryRate
+ << " RetryCounter: " << taskInternal.RetryCounter << " At: " << taskInternal.RetryCounterUpdatedAt
+ << (taskInternal.ShouldAbortTask ? " ABORTED" : ""));
}
std::shuffle(tasks.begin(), tasks.end(), std::default_random_engine());
diff --git a/ydb/core/yq/libs/control_plane_storage/schema.h b/ydb/core/yq/libs/control_plane_storage/schema.h
index 9d4b6719a9a..6677be5a306 100644
--- a/ydb/core/yq/libs/control_plane_storage/schema.h
+++ b/ydb/core/yq/libs/control_plane_storage/schema.h
@@ -67,5 +67,6 @@ namespace NYq {
#define OWNER_COLUMN_NAME "owner"
#define LAST_SEEN_AT_COLUMN_NAME "last_seen_at"
#define ASSIGNED_UNTIL_COLUMN_NAME "assigned_until"
+#define RETRY_RATE_COLUMN_NAME "retry_rate"
} // namespace NYq
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
index 8168e98fe93..42ac23f6e5c 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
@@ -134,6 +134,7 @@ void TYdbControlPlaneStorageActor::CreatePendingSmallTable()
.AddNullableColumn(OWNER_COLUMN_NAME, EPrimitiveType::String)
.AddNullableColumn(TENANT_COLUMN_NAME, EPrimitiveType::String)
.AddNullableColumn(ASSIGNED_UNTIL_COLUMN_NAME, EPrimitiveType::Timestamp)
+ .AddNullableColumn(RETRY_RATE_COLUMN_NAME, EPrimitiveType::Double)
.SetPrimaryKeyColumns({TENANT_COLUMN_NAME, SCOPE_COLUMN_NAME, QUERY_ID_COLUMN_NAME})
.Build();
diff --git a/ydb/core/yq/libs/db_schema/db_schema.h b/ydb/core/yq/libs/db_schema/db_schema.h
index c14a7d21c01..8faf8278f7e 100644
--- a/ydb/core/yq/libs/db_schema/db_schema.h
+++ b/ydb/core/yq/libs/db_schema/db_schema.h
@@ -51,6 +51,10 @@ public:
return AddValue(name, NYdb::TValueBuilder().Bool(value).Build());
}
+ TSqlQueryBuilder& AddDouble(const TString& name, double value) {
+ return AddValue(name, NYdb::TValueBuilder().Double(value).Build());
+ }
+
TSqlQueryBuilder& AddValue(const TString& name, const NYdb::TValue& value, const TString& pred);
TSqlQueryBuilder& AddValue(const TString& name, const TString& value, const TString& pred) {