diff options
author | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-04-26 12:18:24 +0300 |
---|---|---|
committer | Aleksandr Khoroshilov <hor911@gmail.com> | 2022-04-26 12:18:24 +0300 |
commit | e42e29335e2001970f95535e7557d5c27519f29f (patch) | |
tree | 026a086ae687a5b6d695f84ce1659758b65b9b8b | |
parent | 47168758392c01c13b54f9d307a5af423cf8a4a0 (diff) | |
download | ydb-e42e29335e2001970f95535e7557d5c27519f29f.tar.gz |
Use StatusCode in yellow zone
ref:d9ad15810d9cb77dbb1c7fb2960db69ea50fab36
18 files changed, 285 insertions, 161 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index bcb44994755..6847794e711 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -366,13 +366,13 @@ add_subdirectory(library/cpp/digest/old_crc/gencrc) add_subdirectory(ydb/core/protos) add_subdirectory(ydb/core/yq/libs/config/protos) add_subdirectory(ydb/library/folder_service/proto) +add_subdirectory(ydb/library/yql/dq/actors/protos) +add_subdirectory(ydb/library/yql/dq/proto) add_subdirectory(ydb/library/yql/providers/s3/proto) add_subdirectory(ydb/library/login/protos) add_subdirectory(ydb/library/mkql_proto/protos) add_subdirectory(contrib/tools/protoc/plugins/grpc_cpp) add_subdirectory(contrib/libs/grpc/src/compiler/grpc_plugin_support) -add_subdirectory(ydb/library/yql/dq/actors/protos) -add_subdirectory(ydb/library/yql/dq/proto) add_subdirectory(ydb/core/protos/out) add_subdirectory(ydb/library/aclib) add_subdirectory(ydb/library/aclib/protos) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 3d1e7471772..a3ffccb62a6 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -446,13 +446,13 @@ add_subdirectory(library/cpp/digest/old_crc/gencrc) add_subdirectory(ydb/core/protos) add_subdirectory(ydb/core/yq/libs/config/protos) add_subdirectory(ydb/library/folder_service/proto) +add_subdirectory(ydb/library/yql/dq/actors/protos) +add_subdirectory(ydb/library/yql/dq/proto) add_subdirectory(ydb/library/yql/providers/s3/proto) add_subdirectory(ydb/library/login/protos) add_subdirectory(ydb/library/mkql_proto/protos) add_subdirectory(contrib/tools/protoc/plugins/grpc_cpp) add_subdirectory(contrib/libs/grpc/src/compiler/grpc_plugin_support) -add_subdirectory(ydb/library/yql/dq/actors/protos) -add_subdirectory(ydb/library/yql/dq/proto) add_subdirectory(ydb/core/protos/out) add_subdirectory(ydb/library/aclib) add_subdirectory(ydb/library/aclib/protos) diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index 568c581656a..418bfdd6ab1 
100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -685,7 +685,7 @@ private: const bool finalize = failure || DqGraphIndex + 1 >= static_cast<i32>(DqGraphParams.size()); if (finalize) { - if (RetryNeeded) { + if (failure) { ResignQuery(ev->Get()->Record.GetStatusCode()); return; } @@ -987,7 +987,7 @@ private: } void Finish(YandexQuery::QueryMeta::ComputeStatus status) { - LOG_D("Is about to finish query with status " << YandexQuery::QueryMeta::ComputeStatus_Name(status));; + LOG_D("Is about to finish query with status " << YandexQuery::QueryMeta::ComputeStatus_Name(status)); Finishing = true; RetryNeeded = false; FinalQueryStatus = status; diff --git a/ydb/core/yq/libs/actors/task_ping.cpp b/ydb/core/yq/libs/actors/task_ping.cpp index 138317a9b69..43193914ae8 100644 --- a/ydb/core/yq/libs/actors/task_ping.cpp +++ b/ydb/core/yq/libs/actors/task_ping.cpp @@ -154,6 +154,7 @@ private: event->FinishedAt = TInstant::FromValue(google::protobuf::util::TimeUtil::TimestampToMicroseconds(req.finished_at())); } event->ResignQuery = req.resign_query(); + event->StatusCode = req.status_code(); event->CreatedTopicConsumers.reserve(req.created_topic_consumers_size()); for (const auto& topicConsumerProto : req.created_topic_consumers()) { diff --git a/ydb/core/yq/libs/config/protos/CMakeLists.txt b/ydb/core/yq/libs/config/protos/CMakeLists.txt index bcd88b57e38..2ba9a7fb892 100644 --- a/ydb/core/yq/libs/config/protos/CMakeLists.txt +++ b/ydb/core/yq/libs/config/protos/CMakeLists.txt @@ -12,6 +12,7 @@ target_link_libraries(libs-config-protos PUBLIC contrib-libs-cxxsupp yutil library-folder_service-proto + dq-actors-protos providers-common-proto providers-s3-proto contrib-libs-protobuf diff --git a/ydb/core/yq/libs/config/protos/control_plane_storage.proto b/ydb/core/yq/libs/config/protos/control_plane_storage.proto index 6ed1169fae2..4e83ef3b727 100644 --- a/ydb/core/yq/libs/config/protos/control_plane_storage.proto +++ 
b/ydb/core/yq/libs/config/protos/control_plane_storage.proto @@ -6,6 +6,8 @@ option java_package = "ru.yandex.kikimr.proto"; import "ydb/core/yq/libs/config/protos/storage.proto"; +import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto"; + //////////////////////////////////////////////////////////// message TMapItem { @@ -19,6 +21,22 @@ message TQueryMapping { repeated string CommonTenantName = 3; } +// 1. StatusCode(s) are handled with the defined policies; a StatusCode that appears in more than one policy is UB +// 2. RetryCount and RetryPeriod are used to calculate the actual RetryRate; if it reaches RetryCount, the query is aborted +// 3. BackoffPeriod is multiplied by RetryRate to delay query execution before the next retry +// 4. There is no default retry policy; all unhandled statuses are fatal + +message TRetryPolicy { + uint64 RetryCount = 1; + string RetryPeriod = 2; + string BackoffPeriod = 3; +} + +message TRetryPolicyMapping { + repeated NYql.NDqProto.StatusIds.StatusCode StatusCode = 1; + TRetryPolicy Policy = 2; +} + message TControlPlaneStorageConfig { bool Enabled = 1; NYq.NConfig.TYdbStorageConfig Storage = 2; // TODO: remove @@ -35,14 +53,16 @@ message TControlPlaneStorageConfig { bool UseInMemory = 13; uint64 TasksBatchSize = 14; uint64 NumTasksProportion = 15; - uint64 AnalyticsRetryCounterLimit = 16; - uint64 StreamingRetryCounterLimit = 17; - string AnalyticsRetryCounterUpdateTime = 18; - string StreamingRetryCounterUpdateTime = 19; + uint64 AnalyticsRetryCounterLimit = 16; // DEPRECATED by TRetryPolicy::RetryCount + uint64 StreamingRetryCounterLimit = 17; // DEPRECATED by TRetryPolicy::RetryCount + string AnalyticsRetryCounterUpdateTime = 18; // DEPRECATED by TRetryPolicy::RetryPeriod + string StreamingRetryCounterUpdateTime = 19; // DEPRECATED by TRetryPolicy::RetryPeriod string AutomaticQueriesTtl = 20; string TaskLeaseTtl = 21; + TRetryPolicy TaskLeaseRetryPolicy = 27; repeated string AvailableConnection = 22; repeated string AvailableBinding = 23; 
string ResultSetsTtl = 24; TQueryMapping Mapping = 25; + repeated TRetryPolicyMapping RetryPolicyMapping = 26; } diff --git a/ydb/core/yq/libs/control_plane_storage/events/events.h b/ydb/core/yq/libs/control_plane_storage/events/events.h index 578d859405f..0e867a47610 100644 --- a/ydb/core/yq/libs/control_plane_storage/events/events.h +++ b/ydb/core/yq/libs/control_plane_storage/events/events.h @@ -1591,6 +1591,7 @@ struct TEvControlPlaneStorage { TMaybe<TInstant> StartedAt; TMaybe<TInstant> FinishedAt; bool ResignQuery = false; + ui64 StatusCode = 0; TVector<TTopicConsumer> CreatedTopicConsumers; TVector<TString> DqGraphs; i32 DqGraphIndex = 0; diff --git a/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp index 783b9242385..8742d91df52 100644 --- a/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp +++ b/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp @@ -36,8 +36,6 @@ class TInMemoryControlPlaneStorageActor : public NActors::TActor<TInMemoryContro , IdempotencyKeyTtl(GetDuration(Proto.GetIdempotencyKeysTtl(), TDuration::Minutes(10))) , AutomaticQueriesTtl(GetDuration(Proto.GetAutomaticQueriesTtl(), TDuration::Days(1))) , ResultSetsTtl(GetDuration(Proto.GetResultSetsTtl(), TDuration::Days(1))) - , AnalyticsRetryCounterUpdateTime(GetDuration(Proto.GetAnalyticsRetryCounterUpdateTime(), TDuration::Days(1))) - , StreamingRetryCounterUpdateTime(GetDuration(Proto.GetAnalyticsRetryCounterUpdateTime(), TDuration::Days(1))) , TaskLeaseTtl(GetDuration(Proto.GetTaskLeaseTtl(), TDuration::Seconds(30))) { } diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp index dd73e9f078e..14c2074064b 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp @@ -52,7 +52,7 
@@ std::pair<TString, NYdb::TParams> MakeSql(const TTaskInternal& taskInternal, con queryBuilder.AddText( "UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl,\n" "`" RETRY_COUNTER_COLUMN_NAME "` = $retry_counter, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "` = $retry_counter_update_time, `" RETRY_RATE_COLUMN_NAME "` = $retry_rate,\n" - "`" IS_RESIGN_QUERY_COLUMN_NAME "` = false, `" HOST_NAME_COLUMN_NAME "` = $host, `" OWNER_COLUMN_NAME "` = $owner\n" + "`" HOST_NAME_COLUMN_NAME "` = $host, `" OWNER_COLUMN_NAME "` = $owner\n" "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" ); @@ -168,8 +168,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ queryBuilder.AddTimestamp("from", now); queryBuilder.AddUint64("tasks_limit", tasksBatchSize); queryBuilder.AddText( - "SELECT `" SCOPE_COLUMN_NAME "`, `" QUERY_ID_COLUMN_NAME "`,\n" - "`" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" RETRY_RATE_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "`, `" IS_RESIGN_QUERY_COLUMN_NAME "`\n" + "SELECT `" SCOPE_COLUMN_NAME "`, `" QUERY_ID_COLUMN_NAME "`, `" OWNER_COLUMN_NAME "`, `" LAST_SEEN_AT_COLUMN_NAME "`,\n" + "`" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" RETRY_RATE_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "`\n" "FROM `" PENDING_SMALL_TABLE_NAME "`\n" "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" ASSIGNED_UNTIL_COLUMN_NAME "` < $from ORDER BY `" QUERY_ID_COLUMN_NAME "` DESC LIMIT $tasks_limit;\n" ); @@ -195,34 +195,36 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ task.Scope = *parser.ColumnParser(SCOPE_COLUMN_NAME).GetOptionalString(); task.QueryId = *parser.ColumnParser(QUERY_ID_COLUMN_NAME).GetOptionalString(); + auto previousOwner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString(); - bool isResignQuery = 
parser.ColumnParser(IS_RESIGN_QUERY_COLUMN_NAME).GetOptionalBool().GetOrElse(false); taskInternal.RetryCounter = parser.ColumnParser(RETRY_COUNTER_COLUMN_NAME).GetOptionalUint64().GetOrElse(0); taskInternal.RetryCounterUpdatedAt = parser.ColumnParser(RETRY_COUNTER_UPDATE_COLUMN_NAME).GetOptionalTimestamp().GetOrElse(TInstant::Zero()); taskInternal.RetryRate = parser.ColumnParser(RETRY_RATE_COLUMN_NAME).GetOptionalDouble().GetOrElse(0.0); + auto lastSeenAt = parser.ColumnParser(LAST_SEEN_AT_COLUMN_NAME).GetOptionalTimestamp().GetOrElse(TInstant::Zero()); - YandexQuery::QueryContent::QueryType queryType = static_cast<YandexQuery::QueryContent::QueryType>(parser.ColumnParser(QUERY_TYPE_COLUMN_NAME).GetOptionalInt64().GetOrElse(0)); + if (previousOwner) { // task lease timeout case only, other cases are updated at ping time - const auto retryCounterLimit = queryType == YandexQuery::QueryContent::ANALYTICS ? Config.Proto.GetAnalyticsRetryCounterLimit() : Config.Proto.GetStreamingRetryCounterLimit(); - const auto retryCounterUpdateTime = queryType == YandexQuery::QueryContent::ANALYTICS ? 
Config.AnalyticsRetryCounterUpdateTime : Config.StreamingRetryCounterUpdateTime; + CPS_LOG_AS_D(*actorSystem, "Task (Query): " << task.QueryId << " Lease TIMEOUT, RetryCounterUpdatedAt " << taskInternal.RetryCounterUpdatedAt + << " LastSeenAt: " << lastSeenAt); - auto lastPeriod = now - taskInternal.RetryCounterUpdatedAt; - if (lastPeriod >= retryCounterUpdateTime) { - taskInternal.RetryRate = 0.0; - } else { - taskInternal.RetryRate += 1.0; - auto rate = lastPeriod / retryCounterUpdateTime * retryCounterLimit; - if (taskInternal.RetryRate > rate) { - taskInternal.RetryRate -= rate; - } else { + auto lastPeriod = lastSeenAt - taskInternal.RetryCounterUpdatedAt; + if (lastPeriod >= Config.TaskLeaseRetryPolicy.RetryPeriod) { taskInternal.RetryRate = 0.0; + } else { + taskInternal.RetryRate += 1.0; + auto rate = lastPeriod / Config.TaskLeaseRetryPolicy.RetryPeriod * Config.TaskLeaseRetryPolicy.RetryCount; + if (taskInternal.RetryRate > rate) { + taskInternal.RetryRate -= rate; + } else { + taskInternal.RetryRate = 0.0; + } } - } - taskInternal.RetryCounter += 1; - taskInternal.RetryCounterUpdatedAt = now; + taskInternal.RetryCounter += 1; + taskInternal.RetryCounterUpdatedAt = now; - if (!isResignQuery && taskInternal.RetryRate >= retryCounterLimit) { - taskInternal.ShouldAbortTask = true; + if (taskInternal.RetryRate >= Config.TaskLeaseRetryPolicy.RetryCount) { + taskInternal.ShouldAbortTask = true; + } } CPS_LOG_AS_D(*actorSystem, "Task (Query): " << task.QueryId << " RetryRate: " << taskInternal.RetryRate diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp index 7a7b84f1b71..b2ff0bfcd49 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp @@ -19,7 +19,7 @@ bool IsFinishedStatus(YandexQuery::QueryMeta::ComputeStatus status) { std::tuple<TString, TParams, const std::function<std::pair<TString, 
NYdb::TParams>(const TVector<NYdb::TResultSet>&)>> ConstructHardPingTask( const TEvControlPlaneStorage::TEvPingTaskRequest* request, std::shared_ptr<YandexQuery::QueryAction> response, - const TString& tablePathPrefix, const TDuration& automaticQueriesTtl, const TDuration& taskLeaseTtl) { + const TString& tablePathPrefix, const TDuration& automaticQueriesTtl, const TDuration& taskLeaseTtl, const THashMap<ui64, TYdbControlPlaneStorageActor::TRetryPolicyItem>& retryPolicies) { TSqlQueryBuilder readQueryBuilder(tablePathPrefix, "HardPingTask(read)"); readQueryBuilder.AddString("tenant", request->TenantName); @@ -32,8 +32,8 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam " WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" "SELECT `" JOB_ID_COLUMN_NAME "`, `" JOB_COLUMN_NAME "` FROM `" JOBS_TABLE_NAME "`\n" " WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id AND `" JOB_ID_COLUMN_NAME "` = $last_job_id;\n" - "SELECT `" OWNER_COLUMN_NAME "` FROM `" PENDING_SMALL_TABLE_NAME "`\n" - " WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" + "SELECT `" OWNER_COLUMN_NAME "`, `" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" RETRY_RATE_COLUMN_NAME "`\n" + "FROM `" PENDING_SMALL_TABLE_NAME "` WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" ); auto prepareParams = [=](const TVector<TResultSet>& resultSets) { @@ -41,57 +41,119 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam YandexQuery::Query query; YandexQuery::Internal::QueryInternal internal; YandexQuery::Job job; - TString selectedOwner; + TString owner; + ui64 retryCounter = 0; + TInstant retryCounterUpdatedAt = TInstant::Zero(); + double retryRate = 0.0; if (resultSets.size() != 3) { - ythrow 
TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 3 but equal " << resultSets.size() << ". Please contact internal support"; + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "RESULT SET SIZE of " << resultSets.size() << " != 3"; } { TResultSetParser parser(resultSets[0]); - if (parser.TryNextRow()) { - if (!query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) { - ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query. Please contact internal support"; - } - if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { - ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support"; - } + if (!parser.TryNextRow()) { + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\""; + } + if (!query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) { + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." QUERY_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\""; + } + if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." 
INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\""; } } { TResultSetParser parser(resultSets[1]); - if (parser.TryNextRow()) { - if (!job.ParseFromString(*parser.ColumnParser(JOB_COLUMN_NAME).GetOptionalString())) { - ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for job. Please contact internal support"; - } - jobId = *parser.ColumnParser(JOB_ID_COLUMN_NAME).GetOptionalString(); + if (!parser.TryNextRow()) { + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " JOBS_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\""; + } + if (!job.ParseFromString(*parser.ColumnParser(JOB_COLUMN_NAME).GetOptionalString())) { + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " JOBS_TABLE_NAME "." JOB_COLUMN_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\""; } + jobId = *parser.ColumnParser(JOB_ID_COLUMN_NAME).GetOptionalString(); } { TResultSetParser parser(resultSets[2]); - if (parser.TryNextRow()) { - selectedOwner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString(); + if (!parser.TryNextRow()) { + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request->TenantName << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"" ; } + owner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString(); + if (owner != request->Owner) { + ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request->QueryId << "\" MISMATCHED: \"" << request->Owner << "\" (received) != \"" << owner << "\" (selected)"; + } + 
retryCounter = parser.ColumnParser(RETRY_COUNTER_COLUMN_NAME).GetOptionalUint64().GetOrElse(0); + retryCounterUpdatedAt = parser.ColumnParser(RETRY_COUNTER_UPDATE_COLUMN_NAME).GetOptionalTimestamp().GetOrElse(TInstant::Zero()); + retryRate = parser.ColumnParser(RETRY_RATE_COLUMN_NAME).GetOptionalDouble().GetOrElse(0.0); } - if (selectedOwner != request->Owner) { - ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "Query with the specified Owner: \"" << request->Owner << "\" does not exist. Selected owner: \"" << selectedOwner << "\""; + TMaybe<YandexQuery::QueryMeta::ComputeStatus> queryStatus = request->Status; + TMaybe<NYql::TIssues> issues = request->Issues; + TMaybe<NYql::TIssues> transientIssues = request->TransientIssues; + + // a running query is locked for the lease period + TDuration backoff = taskLeaseTtl; + + if (request->ResignQuery) { + + TYdbControlPlaneStorageActor::TRetryPolicyItem policy(0, TDuration::Seconds(1), TDuration::Zero()); + auto it = retryPolicies.find(request->StatusCode); + if (it != retryPolicies.end()) { + policy = it->second; + } + + auto lastPeriod = Now() - retryCounterUpdatedAt; + if (lastPeriod >= policy.RetryPeriod) { + retryRate = 0.0; + } else { + retryRate += 1.0; + auto rate = lastPeriod / policy.RetryPeriod * policy.RetryCount; + if (retryRate > rate) { + retryRate -= rate; + } else { + retryRate = 0.0; + } + } + + if (retryRate < policy.RetryCount) { + queryStatus.Clear(); + retryCounter += 1; + retryCounterUpdatedAt = Now(); + // a failing query is throttled for the backoff period + backoff = policy.BackoffPeriod * retryRate; + owner = ""; + } else { + Cerr << "PING FAILURE for code " << request->StatusCode << Endl; + // a failed query should be processed immediately + queryStatus = YandexQuery::QueryMeta::FAILING; + backoff = TDuration::Zero(); + // all transient issues become final + if (transientIssues) { + if (issues) { + issues->AddIssues(*transientIssues); + transientIssues.Clear(); + } else { + 
issues.Swap(transientIssues); + } + } + } + + Cerr << "PING " << retryCounter << " " << retryCounterUpdatedAt << " " << backoff << Endl; } - if (request->Status) { - query.mutable_meta()->set_status(*request->Status); - job.mutable_query_meta()->set_status(*request->Status); + if (queryStatus) { + query.mutable_meta()->set_status(*queryStatus); + job.mutable_query_meta()->set_status(*queryStatus); } - if (request->Issues) { - NYql::IssuesToMessage(*request->Issues, query.mutable_issue()); - NYql::IssuesToMessage(*request->Issues, job.mutable_issue()); + if (issues) { + NYql::IssuesToMessage(*issues, query.mutable_issue()); + NYql::IssuesToMessage(*issues, job.mutable_issue()); } - if (request->TransientIssues) { - NYql::TIssues issues = *request->TransientIssues; + if (transientIssues) { + NYql::TIssues issues = *transientIssues; for (const auto& issue: *query.mutable_transient_issue()) { issues.AddIssue(NYql::IssueFromMessage(issue)); } @@ -223,11 +285,16 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam ); } else { // update pending small - writeQueryBuilder.AddTimestamp("now", request->ResignQuery ? TInstant::Zero() : TInstant::Now()); - writeQueryBuilder.AddTimestamp("ttl", request->ResignQuery ? TInstant::Zero() : TInstant::Now() + taskLeaseTtl); - const TString updateResignQueryFlag = request->ResignQuery ? 
", `" IS_RESIGN_QUERY_COLUMN_NAME "` = true" : ""; + writeQueryBuilder.AddTimestamp("now", TInstant::Now()); + writeQueryBuilder.AddTimestamp("ttl", TInstant::Now() + backoff); + writeQueryBuilder.AddTimestamp("retry_counter_update_time", retryCounterUpdatedAt); + writeQueryBuilder.AddDouble("retry_rate", retryRate); + writeQueryBuilder.AddUint64("retry_counter", retryCounter); + writeQueryBuilder.AddString("owner", owner); writeQueryBuilder.AddText( - "UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl" + updateResignQueryFlag + "\n" + "UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl,\n" + "`" RETRY_COUNTER_COLUMN_NAME "` = $retry_counter, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "` = $retry_counter_update_time, `" RETRY_RATE_COLUMN_NAME "` = $retry_rate,\n" + "`" OWNER_COLUMN_NAME "` = $owner\n" "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" ); } @@ -291,45 +358,47 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam ); auto prepareParams = [=](const TVector<TResultSet>& resultSets) { - TString selectedOwner; + TString owner; YandexQuery::Internal::QueryInternal internal; if (resultSets.size() != 2) { - ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 2 but equal " << resultSets.size() << ". Please contact internal support"; + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "RESULT SET SIZE of " << resultSets.size() << " != 2"; } { TResultSetParser parser(resultSets[0]); - if (parser.TryNextRow()) { - if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { - ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for pending internal query. 
Please contact internal support"; - } + if (!parser.TryNextRow()) { + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"" ; + } + if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) { + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\""; } } { TResultSetParser parser(resultSets[1]); - if (parser.TryNextRow()) { - selectedOwner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString(); + if (!parser.TryNextRow()) { + ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request->TenantName << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"" ; + } + owner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString(); + if (owner != request->Owner) { + ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request->QueryId << "\" MISMATCHED: \"" << request->Owner << "\" (received) != \"" << owner << "\" (selected)"; } } - *response = internal.action(); - if (selectedOwner != request->Owner) { - ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "query with the specified Owner: \"" << request->Owner << "\" does not exist. Selected owner: \"" << selectedOwner << "\""; - } + *response = internal.action(); TSqlQueryBuilder writeQueryBuilder(tablePathPrefix, "SoftPingTask(write)"); - writeQueryBuilder.AddTimestamp("now", request->ResignQuery ? TInstant::Zero() : TInstant::Now()); - writeQueryBuilder.AddTimestamp("ttl", request->ResignQuery ? 
TInstant::Zero() : TInstant::Now() + taskLeaseTtl); + writeQueryBuilder.AddTimestamp("now", TInstant::Now()); + writeQueryBuilder.AddTimestamp("ttl", TInstant::Now() + taskLeaseTtl); writeQueryBuilder.AddString("tenant", request->TenantName); writeQueryBuilder.AddString("scope", request->Scope); writeQueryBuilder.AddString("query_id", request->QueryId); - writeQueryBuilder.AddString("owner", request->Owner); - const TString updateResignQueryFlag = request->ResignQuery ? ", `" IS_RESIGN_QUERY_COLUMN_NAME "` = true" : ""; + Cerr << "PingTask " << TInstant::Now() << " " << request->TenantName << " " << request->Scope << " " << request->QueryId << Endl; + writeQueryBuilder.AddText( - "UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl " + updateResignQueryFlag + "\n" + "UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl\n" "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n" ); @@ -371,7 +440,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq if (request->Status) Counters.GetFinalStatusCounters(cloudId, scope)->IncByStatus(*request->Status); auto pingTaskParams = DoesPingTaskUpdateQueriesTable(request) ? 
- ConstructHardPingTask(request, response, YdbConnection->TablePathPrefix, Config.AutomaticQueriesTtl, Config.TaskLeaseTtl) : + ConstructHardPingTask(request, response, YdbConnection->TablePathPrefix, Config.AutomaticQueriesTtl, Config.TaskLeaseTtl, Config.RetryPolicies) : ConstructSoftPingTask(request, response, YdbConnection->TablePathPrefix, Config.TaskLeaseTtl); auto readQuery = std::get<0>(pingTaskParams); // Use std::get for win compiler auto readParams = std::get<1>(pingTaskParams); diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders.h b/ydb/core/yq/libs/control_plane_storage/message_builders.h index c8724306426..51985c9795e 100644 --- a/ydb/core/yq/libs/control_plane_storage/message_builders.h +++ b/ydb/core/yq/libs/control_plane_storage/message_builders.h @@ -2,6 +2,7 @@ #include <util/datetime/base.h> +#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h> #include <ydb/public/api/protos/yq.pb.h> #include <ydb/core/yq/libs/control_plane_storage/events/events.h> @@ -1014,6 +1015,7 @@ class TPingTaskBuilder { TMaybe<TInstant> StartedAt; TMaybe<TInstant> FinishedAt; bool ResignQuery = false; + NYql::NDqProto::StatusIds::StatusCode StatusCode = NYql::NDqProto::StatusIds::UNSPECIFIED; TVector<NYq::TEvControlPlaneStorage::TTopicConsumer> CreatedTopicConsumers; TVector<TString> DqGraphs; i32 DqGraphIndex = 0; @@ -1121,9 +1123,15 @@ public: return *this; } - TPingTaskBuilder& SetResignQuery(bool resignQuery = true) + TPingTaskBuilder& SetResignQuery(bool resignQuery = true) + { + ResignQuery = resignQuery; + return *this; + } + + TPingTaskBuilder& SetStatusCode(NYql::NDqProto::StatusIds::StatusCode statusCode = NYql::NDqProto::StatusIds::UNSPECIFIED) { - ResignQuery = resignQuery; + StatusCode = statusCode; return *this; } @@ -1158,6 +1166,7 @@ public: request->StartedAt = StartedAt; request->FinishedAt = FinishedAt; request->ResignQuery = ResignQuery; + request->StatusCode = StatusCode; request->CreatedTopicConsumers = 
CreatedTopicConsumers; request->DqGraphs = DqGraphs; request->DqGraphIndex = DqGraphIndex; diff --git a/ydb/core/yq/libs/control_plane_storage/util.cpp b/ydb/core/yq/libs/control_plane_storage/util.cpp index 288f25a1cb4..ca53572412b 100644 --- a/ydb/core/yq/libs/control_plane_storage/util.cpp +++ b/ydb/core/yq/libs/control_plane_storage/util.cpp @@ -56,22 +56,6 @@ NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlane config.SetNumTasksProportion(4); } - if (!config.GetAnalyticsRetryCounterLimit()) { - config.SetAnalyticsRetryCounterLimit(20); - } - - if (!config.GetStreamingRetryCounterLimit()) { - config.SetStreamingRetryCounterLimit(20); - } - - if (!config.GetAnalyticsRetryCounterUpdateTime()) { - config.SetAnalyticsRetryCounterUpdateTime("1d"); - } - - if (!config.GetStreamingRetryCounterUpdateTime()) { - config.SetStreamingRetryCounterUpdateTime("1d"); - } - if (!config.GetAutomaticQueriesTtl()) { config.SetAutomaticQueriesTtl("1d"); } @@ -80,6 +64,32 @@ NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlane config.SetTaskLeaseTtl("30s"); } + if (!config.HasTaskLeaseRetryPolicy()) { + auto& taskLeaseRetryPolicy = *config.MutableTaskLeaseRetryPolicy(); + taskLeaseRetryPolicy.SetRetryCount(20); + taskLeaseRetryPolicy.SetRetryPeriod("1d"); + } + + if (!config.RetryPolicyMappingSize()) { + { + auto& policyMapping = *config.AddRetryPolicyMapping(); + policyMapping.AddStatusCode(NYql::NDqProto::StatusIds::UNAVAILABLE); + auto& policy = *policyMapping.MutablePolicy(); + policy.SetRetryCount(20); + policy.SetRetryPeriod("1d"); + // backoff is Zero() + } + { + auto& policyMapping = *config.AddRetryPolicyMapping(); + policyMapping.AddStatusCode(NYql::NDqProto::StatusIds::OVERLOADED); + policyMapping.AddStatusCode(NYql::NDqProto::StatusIds::EXTERNAL_ERROR); + auto& policy = *policyMapping.MutablePolicy(); + policy.SetRetryCount(10); + policy.SetRetryPeriod("1m"); + policy.SetBackoffPeriod("1s"); + } + } + if 
(!config.GetStorage().GetToken() && config.GetStorage().GetOAuthFile()) { config.MutableStorage()->SetToken(StripString(TFileInput(config.GetStorage().GetOAuthFile()).ReadAll())); } @@ -104,6 +114,7 @@ bool DoesPingTaskUpdateQueriesTable(const TEvControlPlaneStorage::TEvPingTaskReq request->Plan || request->StartedAt || request->FinishedAt || + request->ResignQuery || !request->CreatedTopicConsumers.empty() || !request->DqGraphs.empty() || request->DqGraphIndex || diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp index 42ac23f6e5c..acc2fc3543d 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp @@ -67,8 +67,6 @@ TYdbControlPlaneStorageActor::TConfig::TConfig(const NConfig::TControlPlaneStora , IdempotencyKeyTtl(GetDuration(Proto.GetIdempotencyKeysTtl(), TDuration::Minutes(10))) , AutomaticQueriesTtl(GetDuration(Proto.GetAutomaticQueriesTtl(), TDuration::Days(1))) , ResultSetsTtl(GetDuration(Proto.GetResultSetsTtl(), TDuration::Days(1))) - , AnalyticsRetryCounterUpdateTime(GetDuration(Proto.GetAnalyticsRetryCounterUpdateTime(), TDuration::Days(1))) - , StreamingRetryCounterUpdateTime(GetDuration(Proto.GetAnalyticsRetryCounterUpdateTime(), TDuration::Days(1))) , TaskLeaseTtl(GetDuration(Proto.GetTaskLeaseTtl(), TDuration::Seconds(30))) { for (const auto& availableConnection: Proto.GetAvailableConnection()) { @@ -78,6 +76,24 @@ TYdbControlPlaneStorageActor::TConfig::TConfig(const NConfig::TControlPlaneStora for (const auto& availableBinding: Proto.GetAvailableBinding()) { AvailableBindings.insert(GetBindingType(availableBinding)); } + + for (const auto& mapping: Proto.GetRetryPolicyMapping()) { + auto& retryPolicy = mapping.GetPolicy(); + auto retryCount = retryPolicy.GetRetryCount(); + auto retryPeriod = GetDuration(retryPolicy.GetRetryPeriod(), 
TDuration::Hours(1)); + auto backoffPeriod = GetDuration(retryPolicy.GetBackoffPeriod(), TDuration::Zero()); + for (const auto statusCode: mapping.GetStatusCode()) { + RetryPolicies.emplace(statusCode, TRetryPolicyItem(retryCount, retryPeriod, backoffPeriod)); + } + } + + if (Proto.HasTaskLeaseRetryPolicy()) { + TaskLeaseRetryPolicy.RetryCount = Proto.GetTaskLeaseRetryPolicy().GetRetryCount(); + TaskLeaseRetryPolicy.RetryPeriod = GetDuration(Proto.GetTaskLeaseRetryPolicy().GetRetryPeriod(), TDuration::Days(1)); + } else { + TaskLeaseRetryPolicy.RetryCount = 20; + TaskLeaseRetryPolicy.RetryPeriod = TDuration::Days(1); + } } /* diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h index d528d0db20b..a8519ac050a 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h @@ -217,17 +217,30 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont } }; +public: + struct TRetryPolicyItem { + ui64 RetryCount = 0; + TDuration RetryPeriod = TDuration::Zero(); + TDuration BackoffPeriod = TDuration::Zero(); + + TRetryPolicyItem() = default; + TRetryPolicyItem(ui64 retryCount, const TDuration& retryPeriod, const TDuration& backoffPeriod) + : RetryCount(retryCount), RetryPeriod(retryPeriod), BackoffPeriod(backoffPeriod) + { } + }; + +private: struct TConfig { NConfig::TControlPlaneStorageConfig Proto; TString IdsPrefix; TDuration IdempotencyKeyTtl; TDuration AutomaticQueriesTtl; TDuration ResultSetsTtl; - TDuration AnalyticsRetryCounterUpdateTime; - TDuration StreamingRetryCounterUpdateTime; TDuration TaskLeaseTtl; TSet<YandexQuery::ConnectionSetting::ConnectionCase> AvailableConnections; TSet<YandexQuery::BindingSetting::BindingCase> AvailableBindings; + THashMap<ui64, TRetryPolicyItem> RetryPolicies; + TRetryPolicyItem TaskLeaseRetryPolicy; 
TConfig(const NConfig::TControlPlaneStorageConfig& config, const NConfig::TCommonConfig& common); }; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 3cea1fe0241..e7cee5283c9 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -101,12 +101,12 @@ public: static_cast<TDerived*>(this)->DoBootstrap(); } catch (const NKikimr::TMemoryLimitExceededException& e) { - InternalError(TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder() + InternalError(NYql::NDqProto::StatusIds::OVERLOADED, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder() << "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit() << ", host: " << HostName() << ", canAllocateExtraMemory: " << CanAllocateExtraMemory); } catch (const std::exception& e) { - InternalError(TIssuesIds::UNEXPECTED, e.what()); + InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssuesIds::UNEXPECTED, e.what()); } ReportEventElapsedTime(); @@ -198,11 +198,11 @@ protected: hFunc(IDqSourceActor::TEvSourceError, OnSourceError); default: { CA_LOG_C("TDqComputeActorBase, unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")"); - InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")"); + InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")"); } } } catch (const NKikimr::TMemoryLimitExceededException& e) { - InternalError(TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder() + InternalError(NYql::NDqProto::StatusIds::OVERLOADED, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder() << "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit() << ", host: " << HostName() << ", 
canAllocateExtraMemory: " << CanAllocateExtraMemory); @@ -210,7 +210,7 @@ protected: if (PassExceptions) { throw; } - InternalError(TIssuesIds::UNEXPECTED, e.what()); + InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssuesIds::UNEXPECTED, e.what()); } if (reportTime) { @@ -381,7 +381,7 @@ protected: if (Channels->CheckInFlight("Tasks execution finished")) { State = NDqProto::COMPUTE_STATE_FINISHED; CA_LOG_D("Compute state finished. All channels finished"); - ReportStateAndMaybeDie(TIssue("success")); + ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::SUCCESS, {TIssue("success")}); } } } @@ -434,38 +434,6 @@ protected: Terminate(success, TIssues({TIssue(message)})); } - void ReportStateAndMaybeDie(TIssue&& issue) { - ReportStateAndMaybeDie( - State == NDqProto::COMPUTE_STATE_FINISHED ? - NYql::NDqProto::StatusIds::UNSPECIFIED - : NYql::NDqProto::StatusIds::ABORTED, TIssues({std::move(issue)})); - } - - void ReportStateAndDie(NDqProto::EComputeState state, TIssue&& issue) { - auto execEv = MakeHolder<TEvDqCompute::TEvState>(); - auto& record = execEv->Record; - - record.SetState(state); - if (state != NDqProto::COMPUTE_STATE_FINISHED) { - record.SetStatusCode(NYql::NDqProto::StatusIds::ABORTED); - } else { - record.SetStatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED); - } - record.SetTaskId(Task.GetId()); - if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) { - FillStats(record.MutableStats(), /* last */ true); - } - - if (issue.Message) { - IssueToMessage(issue, record.MutableIssues()->Add()); - } - - this->Send(ExecuterId, execEv.Release()); - - TerminateSources(TIssues({issue}), state == NDqProto::COMPUTE_STATE_FINISHED); - Terminate(state == NDqProto::COMPUTE_STATE_FINISHED, TIssues({std::move(issue)})); - } - void ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) { auto execEv = MakeHolder<TEvDqCompute::TEvState>(); @@ -496,13 +464,28 @@ protected: SetIssueCode(issueCode, issue); 
std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> guard = MaybeBindAllocator(); State = NDqProto::COMPUTE_STATE_FAILURE; - ReportStateAndMaybeDie(std::move(issue)); + ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, {std::move(issue)}); + } + + void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssuesIds::EIssueCode issueCode, const TString& message) { + CA_LOG_E("InternalError: " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) << " " << TIssuesIds::EIssueCode_Name(issueCode) << ": " << message << "."); + TIssue issue(message); + SetIssueCode(issueCode, issue); + std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> guard = MaybeBindAllocator(); + State = NDqProto::COMPUTE_STATE_FAILURE; + ReportStateAndMaybeDie(statusCode, TIssues({std::move(issue)})); + } + + void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssue issue) { + std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> guard = MaybeBindAllocator(); + State = NDqProto::COMPUTE_STATE_FAILURE; + ReportStateAndMaybeDie(statusCode, TIssues({std::move(issue)})); } - void InternalError(TIssue issue) { + void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssues issues) { std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> guard = MaybeBindAllocator(); State = NDqProto::COMPUTE_STATE_FAILURE; - ReportStateAndMaybeDie(std::move(issue)); + ReportStateAndMaybeDie(statusCode, issues); } void ContinueExecute() { @@ -945,7 +928,7 @@ protected: void HandleExecuteBase(TEvDq::TEvAbortExecution::TPtr& ev) { if (ev->Get()->Record.GetStatusCode() == NYql::NDqProto::StatusIds::INTERNAL_ERROR) { Y_VERIFY(ev->Get()->GetIssues().Size() == 1); - InternalError(*ev->Get()->GetIssues().begin()); + InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, *ev->Get()->GetIssues().begin()); return; } TIssues issues = ev->Get()->GetIssues(); @@ -1298,9 +1281,8 @@ protected: return; } - TString desc = ev->Get()->Issues.ToString(); - 
CA_LOG_E("Source[" << ev->Get()->InputIndex << "] fatal error: " << desc); - InternalError(TIssuesIds::DEFAULT_ERROR, desc); + CA_LOG_E("Source[" << ev->Get()->InputIndex << "] fatal error: " << ev->Get()->Issues.ToString()); + InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, ev->Get()->Issues); } void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) override { @@ -1309,9 +1291,8 @@ protected: return; } - TString desc = issues.ToString(); - CA_LOG_E("Sink[" << outputIndex << "] fatal error: " << desc); - InternalError(TIssuesIds::DEFAULT_ERROR, desc); + CA_LOG_E("Sink[" << outputIndex << "] fatal error: " << issues.ToString()); + InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, issues); } virtual ui64 CalcMkqlMemoryLimit() { diff --git a/ydb/library/yql/dq/actors/protos/dq_status_codes.proto b/ydb/library/yql/dq/actors/protos/dq_status_codes.proto index f0cde1171e0..db04330fcf2 100644 --- a/ydb/library/yql/dq/actors/protos/dq_status_codes.proto +++ b/ydb/library/yql/dq/actors/protos/dq_status_codes.proto @@ -17,5 +17,6 @@ message StatusIds { OVERLOADED = 9; LIMIT_EXCEEDED = 10; UNSUPPORTED = 11; + EXTERNAL_ERROR = 12; } } diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index 862576fe78d..663b6177a71 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -403,7 +403,7 @@ private: LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, Sprintf("TMemoryLimitExceededException: %s", err.c_str())); TIssue issue(err); SetIssueCode(TIssuesIds::KIKIMR_PRECONDITION_FAILED, issue); - return MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TVector<TIssue>{issue}); + return MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::OVERLOADED, TVector<TIssue>{issue}); } THolder<TEvDq::TEvAbortExecution> 
GetError(const TString& message) { diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp index 57946cc174c..a7b30fa5286 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller.cpp +++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp @@ -117,10 +117,11 @@ private: YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(DEBUG) << SelfId() - << " TaskId: " << taskId - << " State: " << ev->Get()->Record.GetState() - << " PingCookie: " << ev->Cookie; - + << " EvState TaskId: " << taskId + << " State: " << state.GetState() + << " PingCookie: " << ev->Cookie + << " StatusCode: " << NYql::NDqProto::StatusIds_StatusCode_Name(state.GetStatusCode()); + if (state.HasStats() && state.GetStats().GetTasks().size()) { YQL_LOG(DEBUG) << " " << SelfId() << " AddStats " << taskId; AddStats(state.GetStats()); @@ -129,7 +130,7 @@ private: } } - switch (ev->Get()->Record.GetState()) { + switch (state.GetState()) { case NDqProto::COMPUTE_STATE_UNKNOWN: { // TODO: use issues TString message = "unexpected state from " + ToString(computeActor) + ", task: " + ToString(taskId); @@ -491,7 +492,7 @@ private: void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool retriable, bool needFallback) { YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << "OnError " << issues.ToOneLineString(); + YQL_LOG(DEBUG) << "OnError " << issues.ToOneLineString() << " " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode); if (Finished) { YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(WARN) << "OnError IGNORED when Finished, Retriable=" << retriable << ", NeedFallback=" << needFallback; |