about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Aleksandr Khoroshilov <hor911@gmail.com> 2022-04-26 12:18:24 +0300
committer Aleksandr Khoroshilov <hor911@gmail.com> 2022-04-26 12:18:24 +0300
commit e42e29335e2001970f95535e7557d5c27519f29f (patch)
tree 026a086ae687a5b6d695f84ce1659758b65b9b8b
parent 47168758392c01c13b54f9d307a5af423cf8a4a0 (diff)
download ydb-e42e29335e2001970f95535e7557d5c27519f29f.tar.gz
Use StatusCode in yellow zone
ref:d9ad15810d9cb77dbb1c7fb2960db69ea50fab36
-rw-r--r-- CMakeLists.darwin.txt 4
-rw-r--r-- CMakeLists.linux.txt 4
-rw-r--r-- ydb/core/yq/libs/actors/run_actor.cpp 4
-rw-r--r-- ydb/core/yq/libs/actors/task_ping.cpp 1
-rw-r--r-- ydb/core/yq/libs/config/protos/CMakeLists.txt 1
-rw-r--r-- ydb/core/yq/libs/config/protos/control_plane_storage.proto 28
-rw-r--r-- ydb/core/yq/libs/control_plane_storage/events/events.h 1
-rw-r--r-- ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp 2
-rw-r--r-- ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp 44
-rw-r--r-- ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp 171
-rw-r--r-- ydb/core/yq/libs/control_plane_storage/message_builders.h 13
-rw-r--r-- ydb/core/yq/libs/control_plane_storage/util.cpp 43
-rw-r--r-- ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp 20
-rw-r--r-- ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h 17
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h 77
-rw-r--r-- ydb/library/yql/dq/actors/protos/dq_status_codes.proto 1
-rw-r--r-- ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp 2
-rw-r--r-- ydb/library/yql/providers/dq/actors/task_controller.cpp 13
18 files changed, 285 insertions, 161 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index bcb44994755..6847794e711 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -366,13 +366,13 @@ add_subdirectory(library/cpp/digest/old_crc/gencrc)
add_subdirectory(ydb/core/protos)
add_subdirectory(ydb/core/yq/libs/config/protos)
add_subdirectory(ydb/library/folder_service/proto)
+add_subdirectory(ydb/library/yql/dq/actors/protos)
+add_subdirectory(ydb/library/yql/dq/proto)
add_subdirectory(ydb/library/yql/providers/s3/proto)
add_subdirectory(ydb/library/login/protos)
add_subdirectory(ydb/library/mkql_proto/protos)
add_subdirectory(contrib/tools/protoc/plugins/grpc_cpp)
add_subdirectory(contrib/libs/grpc/src/compiler/grpc_plugin_support)
-add_subdirectory(ydb/library/yql/dq/actors/protos)
-add_subdirectory(ydb/library/yql/dq/proto)
add_subdirectory(ydb/core/protos/out)
add_subdirectory(ydb/library/aclib)
add_subdirectory(ydb/library/aclib/protos)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 3d1e7471772..a3ffccb62a6 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -446,13 +446,13 @@ add_subdirectory(library/cpp/digest/old_crc/gencrc)
add_subdirectory(ydb/core/protos)
add_subdirectory(ydb/core/yq/libs/config/protos)
add_subdirectory(ydb/library/folder_service/proto)
+add_subdirectory(ydb/library/yql/dq/actors/protos)
+add_subdirectory(ydb/library/yql/dq/proto)
add_subdirectory(ydb/library/yql/providers/s3/proto)
add_subdirectory(ydb/library/login/protos)
add_subdirectory(ydb/library/mkql_proto/protos)
add_subdirectory(contrib/tools/protoc/plugins/grpc_cpp)
add_subdirectory(contrib/libs/grpc/src/compiler/grpc_plugin_support)
-add_subdirectory(ydb/library/yql/dq/actors/protos)
-add_subdirectory(ydb/library/yql/dq/proto)
add_subdirectory(ydb/core/protos/out)
add_subdirectory(ydb/library/aclib)
add_subdirectory(ydb/library/aclib/protos)
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index 568c581656a..418bfdd6ab1 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -685,7 +685,7 @@ private:
const bool finalize = failure || DqGraphIndex + 1 >= static_cast<i32>(DqGraphParams.size());
if (finalize) {
- if (RetryNeeded) {
+ if (failure) {
ResignQuery(ev->Get()->Record.GetStatusCode());
return;
}
@@ -987,7 +987,7 @@ private:
}
void Finish(YandexQuery::QueryMeta::ComputeStatus status) {
- LOG_D("Is about to finish query with status " << YandexQuery::QueryMeta::ComputeStatus_Name(status));;
+ LOG_D("Is about to finish query with status " << YandexQuery::QueryMeta::ComputeStatus_Name(status));
Finishing = true;
RetryNeeded = false;
FinalQueryStatus = status;
diff --git a/ydb/core/yq/libs/actors/task_ping.cpp b/ydb/core/yq/libs/actors/task_ping.cpp
index 138317a9b69..43193914ae8 100644
--- a/ydb/core/yq/libs/actors/task_ping.cpp
+++ b/ydb/core/yq/libs/actors/task_ping.cpp
@@ -154,6 +154,7 @@ private:
event->FinishedAt = TInstant::FromValue(google::protobuf::util::TimeUtil::TimestampToMicroseconds(req.finished_at()));
}
event->ResignQuery = req.resign_query();
+ event->StatusCode = req.status_code();
event->CreatedTopicConsumers.reserve(req.created_topic_consumers_size());
for (const auto& topicConsumerProto : req.created_topic_consumers()) {
diff --git a/ydb/core/yq/libs/config/protos/CMakeLists.txt b/ydb/core/yq/libs/config/protos/CMakeLists.txt
index bcd88b57e38..2ba9a7fb892 100644
--- a/ydb/core/yq/libs/config/protos/CMakeLists.txt
+++ b/ydb/core/yq/libs/config/protos/CMakeLists.txt
@@ -12,6 +12,7 @@ target_link_libraries(libs-config-protos PUBLIC
contrib-libs-cxxsupp
yutil
library-folder_service-proto
+ dq-actors-protos
providers-common-proto
providers-s3-proto
contrib-libs-protobuf
diff --git a/ydb/core/yq/libs/config/protos/control_plane_storage.proto b/ydb/core/yq/libs/config/protos/control_plane_storage.proto
index 6ed1169fae2..4e83ef3b727 100644
--- a/ydb/core/yq/libs/config/protos/control_plane_storage.proto
+++ b/ydb/core/yq/libs/config/protos/control_plane_storage.proto
@@ -6,6 +6,8 @@ option java_package = "ru.yandex.kikimr.proto";
import "ydb/core/yq/libs/config/protos/storage.proto";
+import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto";
+
////////////////////////////////////////////////////////////
message TMapItem {
@@ -19,6 +21,22 @@ message TQueryMapping {
repeated string CommonTenantName = 3;
}
+// 1. StatusCode(s) are handled with the defined policies; a StatusCode that is not unique across all policies is UB
+// 2. RetryCount and RetryPeriod are used to calculate the actual RetryRate; if it exceeds RetryCount, the query is aborted
+// 3. BackoffPeriod is multiplied by RetryRate to delay query execution before the next retry
+// 4. There is no default retry policy; all unhandled statuses are fatal
+
+message TRetryPolicy {
+ uint64 RetryCount = 1;
+ string RetryPeriod = 2;
+ string BackoffPeriod = 3;
+}
+
+message TRetryPolicyMapping {
+ repeated NYql.NDqProto.StatusIds.StatusCode StatusCode = 1;
+ TRetryPolicy Policy = 2;
+}
+
message TControlPlaneStorageConfig {
bool Enabled = 1;
NYq.NConfig.TYdbStorageConfig Storage = 2; // TODO: remove
@@ -35,14 +53,16 @@ message TControlPlaneStorageConfig {
bool UseInMemory = 13;
uint64 TasksBatchSize = 14;
uint64 NumTasksProportion = 15;
- uint64 AnalyticsRetryCounterLimit = 16;
- uint64 StreamingRetryCounterLimit = 17;
- string AnalyticsRetryCounterUpdateTime = 18;
- string StreamingRetryCounterUpdateTime = 19;
+ uint64 AnalyticsRetryCounterLimit = 16; // DEPRECATED by RetryPolicy::RetryCount
+ uint64 StreamingRetryCounterLimit = 17; // DEPRECATED by RetryPolicy::RetryCount
+ string AnalyticsRetryCounterUpdateTime = 18; // DEPRECATED by RetryPolicy::RetryPeriod
+ string StreamingRetryCounterUpdateTime = 19; // DEPRECATED by RetryPolicy::RetryPeriod
string AutomaticQueriesTtl = 20;
string TaskLeaseTtl = 21;
+ TRetryPolicy TaskLeaseRetryPolicy = 27;
repeated string AvailableConnection = 22;
repeated string AvailableBinding = 23;
string ResultSetsTtl = 24;
TQueryMapping Mapping = 25;
+ repeated TRetryPolicyMapping RetryPolicyMapping = 26;
}
diff --git a/ydb/core/yq/libs/control_plane_storage/events/events.h b/ydb/core/yq/libs/control_plane_storage/events/events.h
index 578d859405f..0e867a47610 100644
--- a/ydb/core/yq/libs/control_plane_storage/events/events.h
+++ b/ydb/core/yq/libs/control_plane_storage/events/events.h
@@ -1591,6 +1591,7 @@ struct TEvControlPlaneStorage {
TMaybe<TInstant> StartedAt;
TMaybe<TInstant> FinishedAt;
bool ResignQuery = false;
+ ui64 StatusCode = 0;
TVector<TTopicConsumer> CreatedTopicConsumers;
TVector<TString> DqGraphs;
i32 DqGraphIndex = 0;
diff --git a/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp
index 783b9242385..8742d91df52 100644
--- a/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp
@@ -36,8 +36,6 @@ class TInMemoryControlPlaneStorageActor : public NActors::TActor<TInMemoryContro
, IdempotencyKeyTtl(GetDuration(Proto.GetIdempotencyKeysTtl(), TDuration::Minutes(10)))
, AutomaticQueriesTtl(GetDuration(Proto.GetAutomaticQueriesTtl(), TDuration::Days(1)))
, ResultSetsTtl(GetDuration(Proto.GetResultSetsTtl(), TDuration::Days(1)))
- , AnalyticsRetryCounterUpdateTime(GetDuration(Proto.GetAnalyticsRetryCounterUpdateTime(), TDuration::Days(1)))
- , StreamingRetryCounterUpdateTime(GetDuration(Proto.GetAnalyticsRetryCounterUpdateTime(), TDuration::Days(1)))
, TaskLeaseTtl(GetDuration(Proto.GetTaskLeaseTtl(), TDuration::Seconds(30)))
{
}
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
index dd73e9f078e..14c2074064b 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
@@ -52,7 +52,7 @@ std::pair<TString, NYdb::TParams> MakeSql(const TTaskInternal& taskInternal, con
queryBuilder.AddText(
"UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl,\n"
"`" RETRY_COUNTER_COLUMN_NAME "` = $retry_counter, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "` = $retry_counter_update_time, `" RETRY_RATE_COLUMN_NAME "` = $retry_rate,\n"
- "`" IS_RESIGN_QUERY_COLUMN_NAME "` = false, `" HOST_NAME_COLUMN_NAME "` = $host, `" OWNER_COLUMN_NAME "` = $owner\n"
+ "`" HOST_NAME_COLUMN_NAME "` = $host, `" OWNER_COLUMN_NAME "` = $owner\n"
"WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
);
@@ -168,8 +168,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
queryBuilder.AddTimestamp("from", now);
queryBuilder.AddUint64("tasks_limit", tasksBatchSize);
queryBuilder.AddText(
- "SELECT `" SCOPE_COLUMN_NAME "`, `" QUERY_ID_COLUMN_NAME "`,\n"
- "`" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" RETRY_RATE_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "`, `" IS_RESIGN_QUERY_COLUMN_NAME "`\n"
+ "SELECT `" SCOPE_COLUMN_NAME "`, `" QUERY_ID_COLUMN_NAME "`, `" OWNER_COLUMN_NAME "`, `" LAST_SEEN_AT_COLUMN_NAME "`,\n"
+ "`" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" RETRY_RATE_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "`\n"
"FROM `" PENDING_SMALL_TABLE_NAME "`\n"
"WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" ASSIGNED_UNTIL_COLUMN_NAME "` < $from ORDER BY `" QUERY_ID_COLUMN_NAME "` DESC LIMIT $tasks_limit;\n"
);
@@ -195,34 +195,36 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
task.Scope = *parser.ColumnParser(SCOPE_COLUMN_NAME).GetOptionalString();
task.QueryId = *parser.ColumnParser(QUERY_ID_COLUMN_NAME).GetOptionalString();
+ auto previousOwner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString();
- bool isResignQuery = parser.ColumnParser(IS_RESIGN_QUERY_COLUMN_NAME).GetOptionalBool().GetOrElse(false);
taskInternal.RetryCounter = parser.ColumnParser(RETRY_COUNTER_COLUMN_NAME).GetOptionalUint64().GetOrElse(0);
taskInternal.RetryCounterUpdatedAt = parser.ColumnParser(RETRY_COUNTER_UPDATE_COLUMN_NAME).GetOptionalTimestamp().GetOrElse(TInstant::Zero());
taskInternal.RetryRate = parser.ColumnParser(RETRY_RATE_COLUMN_NAME).GetOptionalDouble().GetOrElse(0.0);
+ auto lastSeenAt = parser.ColumnParser(LAST_SEEN_AT_COLUMN_NAME).GetOptionalTimestamp().GetOrElse(TInstant::Zero());
- YandexQuery::QueryContent::QueryType queryType = static_cast<YandexQuery::QueryContent::QueryType>(parser.ColumnParser(QUERY_TYPE_COLUMN_NAME).GetOptionalInt64().GetOrElse(0));
+ if (previousOwner) { // task lease timeout case only, other cases are updated at ping time
- const auto retryCounterLimit = queryType == YandexQuery::QueryContent::ANALYTICS ? Config.Proto.GetAnalyticsRetryCounterLimit() : Config.Proto.GetStreamingRetryCounterLimit();
- const auto retryCounterUpdateTime = queryType == YandexQuery::QueryContent::ANALYTICS ? Config.AnalyticsRetryCounterUpdateTime : Config.StreamingRetryCounterUpdateTime;
+ CPS_LOG_AS_D(*actorSystem, "Task (Query): " << task.QueryId << " Lease TIMEOUT, RetryCounterUpdatedAt " << taskInternal.RetryCounterUpdatedAt
+ << " LastSeenAt: " << lastSeenAt);
- auto lastPeriod = now - taskInternal.RetryCounterUpdatedAt;
- if (lastPeriod >= retryCounterUpdateTime) {
- taskInternal.RetryRate = 0.0;
- } else {
- taskInternal.RetryRate += 1.0;
- auto rate = lastPeriod / retryCounterUpdateTime * retryCounterLimit;
- if (taskInternal.RetryRate > rate) {
- taskInternal.RetryRate -= rate;
- } else {
+ auto lastPeriod = lastSeenAt - taskInternal.RetryCounterUpdatedAt;
+ if (lastPeriod >= Config.TaskLeaseRetryPolicy.RetryPeriod) {
taskInternal.RetryRate = 0.0;
+ } else {
+ taskInternal.RetryRate += 1.0;
+ auto rate = lastPeriod / Config.TaskLeaseRetryPolicy.RetryPeriod * Config.TaskLeaseRetryPolicy.RetryCount;
+ if (taskInternal.RetryRate > rate) {
+ taskInternal.RetryRate -= rate;
+ } else {
+ taskInternal.RetryRate = 0.0;
+ }
}
- }
- taskInternal.RetryCounter += 1;
- taskInternal.RetryCounterUpdatedAt = now;
+ taskInternal.RetryCounter += 1;
+ taskInternal.RetryCounterUpdatedAt = now;
- if (!isResignQuery && taskInternal.RetryRate >= retryCounterLimit) {
- taskInternal.ShouldAbortTask = true;
+ if (taskInternal.RetryRate >= Config.TaskLeaseRetryPolicy.RetryCount) {
+ taskInternal.ShouldAbortTask = true;
+ }
}
CPS_LOG_AS_D(*actorSystem, "Task (Query): " << task.QueryId << " RetryRate: " << taskInternal.RetryRate
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
index 7a7b84f1b71..b2ff0bfcd49 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
@@ -19,7 +19,7 @@ bool IsFinishedStatus(YandexQuery::QueryMeta::ComputeStatus status) {
std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>> ConstructHardPingTask(
const TEvControlPlaneStorage::TEvPingTaskRequest* request, std::shared_ptr<YandexQuery::QueryAction> response,
- const TString& tablePathPrefix, const TDuration& automaticQueriesTtl, const TDuration& taskLeaseTtl) {
+ const TString& tablePathPrefix, const TDuration& automaticQueriesTtl, const TDuration& taskLeaseTtl, const THashMap<ui64, TYdbControlPlaneStorageActor::TRetryPolicyItem>& retryPolicies) {
TSqlQueryBuilder readQueryBuilder(tablePathPrefix, "HardPingTask(read)");
readQueryBuilder.AddString("tenant", request->TenantName);
@@ -32,8 +32,8 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
" WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
"SELECT `" JOB_ID_COLUMN_NAME "`, `" JOB_COLUMN_NAME "` FROM `" JOBS_TABLE_NAME "`\n"
" WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id AND `" JOB_ID_COLUMN_NAME "` = $last_job_id;\n"
- "SELECT `" OWNER_COLUMN_NAME "` FROM `" PENDING_SMALL_TABLE_NAME "`\n"
- " WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
+ "SELECT `" OWNER_COLUMN_NAME "`, `" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" RETRY_RATE_COLUMN_NAME "`\n"
+ "FROM `" PENDING_SMALL_TABLE_NAME "` WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
);
auto prepareParams = [=](const TVector<TResultSet>& resultSets) {
@@ -41,57 +41,119 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
YandexQuery::Query query;
YandexQuery::Internal::QueryInternal internal;
YandexQuery::Job job;
- TString selectedOwner;
+ TString owner;
+ ui64 retryCounter = 0;
+ TInstant retryCounterUpdatedAt = TInstant::Zero();
+ double retryRate = 0.0;
if (resultSets.size() != 3) {
- ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 3 but equal " << resultSets.size() << ". Please contact internal support";
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "RESULT SET SIZE of " << resultSets.size() << " != 3";
}
{
TResultSetParser parser(resultSets[0]);
- if (parser.TryNextRow()) {
- if (!query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) {
- ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query. Please contact internal support";
- }
- if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
- ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support";
- }
+ if (!parser.TryNextRow()) {
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"";
+ }
+ if (!query.ParseFromString(*parser.ColumnParser(QUERY_COLUMN_NAME).GetOptionalString())) {
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." QUERY_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\"";
+ }
+ if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\"";
}
}
{
TResultSetParser parser(resultSets[1]);
- if (parser.TryNextRow()) {
- if (!job.ParseFromString(*parser.ColumnParser(JOB_COLUMN_NAME).GetOptionalString())) {
- ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for job. Please contact internal support";
- }
- jobId = *parser.ColumnParser(JOB_ID_COLUMN_NAME).GetOptionalString();
+ if (!parser.TryNextRow()) {
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " JOBS_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"";
+ }
+ if (!job.ParseFromString(*parser.ColumnParser(JOB_COLUMN_NAME).GetOptionalString())) {
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " JOBS_TABLE_NAME "." JOB_COLUMN_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"";
}
+ jobId = *parser.ColumnParser(JOB_ID_COLUMN_NAME).GetOptionalString();
}
{
TResultSetParser parser(resultSets[2]);
- if (parser.TryNextRow()) {
- selectedOwner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString();
+ if (!parser.TryNextRow()) {
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request->TenantName << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"" ;
}
+ owner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString();
+ if (owner != request->Owner) {
+ ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request->QueryId << "\" MISMATCHED: \"" << request->Owner << "\" (received) != \"" << owner << "\" (selected)";
+ }
+ retryCounter = parser.ColumnParser(RETRY_COUNTER_COLUMN_NAME).GetOptionalUint64().GetOrElse(0);
+ retryCounterUpdatedAt = parser.ColumnParser(RETRY_COUNTER_UPDATE_COLUMN_NAME).GetOptionalTimestamp().GetOrElse(TInstant::Zero());
+ retryRate = parser.ColumnParser(RETRY_RATE_COLUMN_NAME).GetOptionalDouble().GetOrElse(0.0);
}
- if (selectedOwner != request->Owner) {
- ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "Query with the specified Owner: \"" << request->Owner << "\" does not exist. Selected owner: \"" << selectedOwner << "\"";
+ TMaybe<YandexQuery::QueryMeta::ComputeStatus> queryStatus = request->Status;
+ TMaybe<NYql::TIssues> issues = request->Issues;
+ TMaybe<NYql::TIssues> transientIssues = request->TransientIssues;
+
+ // running query is locked for lease period
+ TDuration backoff = taskLeaseTtl;
+
+ if (request->ResignQuery) {
+
+ TYdbControlPlaneStorageActor::TRetryPolicyItem policy(0, TDuration::Seconds(1), TDuration::Zero());
+ auto it = retryPolicies.find(request->StatusCode);
+ if (it != retryPolicies.end()) {
+ policy = it->second;
+ }
+
+ auto lastPeriod = Now() - retryCounterUpdatedAt;
+ if (lastPeriod >= policy.RetryPeriod) {
+ retryRate = 0.0;
+ } else {
+ retryRate += 1.0;
+ auto rate = lastPeriod / policy.RetryPeriod * policy.RetryCount;
+ if (retryRate > rate) {
+ retryRate -= rate;
+ } else {
+ retryRate = 0.0;
+ }
+ }
+
+ if (retryRate < policy.RetryCount) {
+ queryStatus.Clear();
+ retryCounter += 1;
+ retryCounterUpdatedAt = Now();
+ // failing query is throttled for backoff period
+ backoff = policy.BackoffPeriod * retryRate;
+ owner = "";
+ } else {
+ Cerr << "PING FAILURE for code " << request->StatusCode << Endl;
+ // a failing query should be processed immediately
+ queryStatus = YandexQuery::QueryMeta::FAILING;
+ backoff = TDuration::Zero();
+ // all transient issues become final
+ if (transientIssues) {
+ if (issues) {
+ issues->AddIssues(*transientIssues);
+ transientIssues.Clear();
+ } else {
+ issues.Swap(transientIssues);
+ }
+ }
+ }
+
+ Cerr << "PING " << retryCounter << " " << retryCounterUpdatedAt << " " << backoff << Endl;
}
- if (request->Status) {
- query.mutable_meta()->set_status(*request->Status);
- job.mutable_query_meta()->set_status(*request->Status);
+ if (queryStatus) {
+ query.mutable_meta()->set_status(*queryStatus);
+ job.mutable_query_meta()->set_status(*queryStatus);
}
- if (request->Issues) {
- NYql::IssuesToMessage(*request->Issues, query.mutable_issue());
- NYql::IssuesToMessage(*request->Issues, job.mutable_issue());
+ if (issues) {
+ NYql::IssuesToMessage(*issues, query.mutable_issue());
+ NYql::IssuesToMessage(*issues, job.mutable_issue());
}
- if (request->TransientIssues) {
- NYql::TIssues issues = *request->TransientIssues;
+ if (transientIssues) {
+ NYql::TIssues issues = *transientIssues;
for (const auto& issue: *query.mutable_transient_issue()) {
issues.AddIssue(NYql::IssueFromMessage(issue));
}
@@ -223,11 +285,16 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
);
} else {
// update pending small
- writeQueryBuilder.AddTimestamp("now", request->ResignQuery ? TInstant::Zero() : TInstant::Now());
- writeQueryBuilder.AddTimestamp("ttl", request->ResignQuery ? TInstant::Zero() : TInstant::Now() + taskLeaseTtl);
- const TString updateResignQueryFlag = request->ResignQuery ? ", `" IS_RESIGN_QUERY_COLUMN_NAME "` = true" : "";
+ writeQueryBuilder.AddTimestamp("now", TInstant::Now());
+ writeQueryBuilder.AddTimestamp("ttl", TInstant::Now() + backoff);
+ writeQueryBuilder.AddTimestamp("retry_counter_update_time", retryCounterUpdatedAt);
+ writeQueryBuilder.AddDouble("retry_rate", retryRate);
+ writeQueryBuilder.AddUint64("retry_counter", retryCounter);
+ writeQueryBuilder.AddString("owner", owner);
writeQueryBuilder.AddText(
- "UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl" + updateResignQueryFlag + "\n"
+ "UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl,\n"
+ "`" RETRY_COUNTER_COLUMN_NAME "` = $retry_counter, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "` = $retry_counter_update_time, `" RETRY_RATE_COLUMN_NAME "` = $retry_rate,\n"
+ "`" OWNER_COLUMN_NAME "` = $owner\n"
"WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
);
}
@@ -291,45 +358,47 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
);
auto prepareParams = [=](const TVector<TResultSet>& resultSets) {
- TString selectedOwner;
+ TString owner;
YandexQuery::Internal::QueryInternal internal;
if (resultSets.size() != 2) {
- ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Result set size is not equal to 2 but equal " << resultSets.size() << ". Please contact internal support";
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "RESULT SET SIZE of " << resultSets.size() << " != 2";
}
{
TResultSetParser parser(resultSets[0]);
- if (parser.TryNextRow()) {
- if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
- ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for pending internal query. Please contact internal support";
- }
+ if (!parser.TryNextRow()) {
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " QUERIES_TABLE_NAME " where " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"" ;
+ }
+ if (!internal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "ERROR PARSING " QUERIES_TABLE_NAME "." INTERNAL_COLUMN_NAME " where " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\"";
}
}
{
TResultSetParser parser(resultSets[1]);
- if (parser.TryNextRow()) {
- selectedOwner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString();
+ if (!parser.TryNextRow()) {
+ ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "NOT FOUND " PENDING_SMALL_TABLE_NAME " where " TENANT_COLUMN_NAME " = \"" << request->TenantName << "\" and " SCOPE_COLUMN_NAME " = \"" << request->Scope << "\" and " QUERY_ID_COLUMN_NAME " = \"" << request->QueryId << "\"" ;
+ }
+ owner = *parser.ColumnParser(OWNER_COLUMN_NAME).GetOptionalString();
+ if (owner != request->Owner) {
+ ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "OWNER of QUERY ID = \"" << request->QueryId << "\" MISMATCHED: \"" << request->Owner << "\" (received) != \"" << owner << "\" (selected)";
}
}
- *response = internal.action();
- if (selectedOwner != request->Owner) {
- ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "query with the specified Owner: \"" << request->Owner << "\" does not exist. Selected owner: \"" << selectedOwner << "\"";
- }
+ *response = internal.action();
TSqlQueryBuilder writeQueryBuilder(tablePathPrefix, "SoftPingTask(write)");
- writeQueryBuilder.AddTimestamp("now", request->ResignQuery ? TInstant::Zero() : TInstant::Now());
- writeQueryBuilder.AddTimestamp("ttl", request->ResignQuery ? TInstant::Zero() : TInstant::Now() + taskLeaseTtl);
+ writeQueryBuilder.AddTimestamp("now", TInstant::Now());
+ writeQueryBuilder.AddTimestamp("ttl", TInstant::Now() + taskLeaseTtl);
writeQueryBuilder.AddString("tenant", request->TenantName);
writeQueryBuilder.AddString("scope", request->Scope);
writeQueryBuilder.AddString("query_id", request->QueryId);
- writeQueryBuilder.AddString("owner", request->Owner);
- const TString updateResignQueryFlag = request->ResignQuery ? ", `" IS_RESIGN_QUERY_COLUMN_NAME "` = true" : "";
+ Cerr << "PingTask " << TInstant::Now() << " " << request->TenantName << " " << request->Scope << " " << request->QueryId << Endl;
+
writeQueryBuilder.AddText(
- "UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl " + updateResignQueryFlag + "\n"
+ "UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now, `" ASSIGNED_UNTIL_COLUMN_NAME "` = $ttl\n"
"WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
);
@@ -371,7 +440,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq
if (request->Status)
Counters.GetFinalStatusCounters(cloudId, scope)->IncByStatus(*request->Status);
auto pingTaskParams = DoesPingTaskUpdateQueriesTable(request) ?
- ConstructHardPingTask(request, response, YdbConnection->TablePathPrefix, Config.AutomaticQueriesTtl, Config.TaskLeaseTtl) :
+ ConstructHardPingTask(request, response, YdbConnection->TablePathPrefix, Config.AutomaticQueriesTtl, Config.TaskLeaseTtl, Config.RetryPolicies) :
ConstructSoftPingTask(request, response, YdbConnection->TablePathPrefix, Config.TaskLeaseTtl);
auto readQuery = std::get<0>(pingTaskParams); // Use std::get for win compiler
auto readParams = std::get<1>(pingTaskParams);
diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders.h b/ydb/core/yq/libs/control_plane_storage/message_builders.h
index c8724306426..51985c9795e 100644
--- a/ydb/core/yq/libs/control_plane_storage/message_builders.h
+++ b/ydb/core/yq/libs/control_plane_storage/message_builders.h
@@ -2,6 +2,7 @@
#include <util/datetime/base.h>
+#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
#include <ydb/public/api/protos/yq.pb.h>
#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
@@ -1014,6 +1015,7 @@ class TPingTaskBuilder {
TMaybe<TInstant> StartedAt;
TMaybe<TInstant> FinishedAt;
bool ResignQuery = false;
+ NYql::NDqProto::StatusIds::StatusCode StatusCode = NYql::NDqProto::StatusIds::UNSPECIFIED;
TVector<NYq::TEvControlPlaneStorage::TTopicConsumer> CreatedTopicConsumers;
TVector<TString> DqGraphs;
i32 DqGraphIndex = 0;
@@ -1121,9 +1123,15 @@ public:
return *this;
}
- TPingTaskBuilder& SetResignQuery(bool resignQuery = true)
+ TPingTaskBuilder& SetResignQuery(bool resignQuery = true)
+ {
+ ResignQuery = resignQuery;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetStatusCode(NYql::NDqProto::StatusIds::StatusCode statusCode = NYql::NDqProto::StatusIds::UNSPECIFIED)
{
- ResignQuery = resignQuery;
+ StatusCode = statusCode;
return *this;
}
@@ -1158,6 +1166,7 @@ public:
request->StartedAt = StartedAt;
request->FinishedAt = FinishedAt;
request->ResignQuery = ResignQuery;
+ request->StatusCode = StatusCode;
request->CreatedTopicConsumers = CreatedTopicConsumers;
request->DqGraphs = DqGraphs;
request->DqGraphIndex = DqGraphIndex;
diff --git a/ydb/core/yq/libs/control_plane_storage/util.cpp b/ydb/core/yq/libs/control_plane_storage/util.cpp
index 288f25a1cb4..ca53572412b 100644
--- a/ydb/core/yq/libs/control_plane_storage/util.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/util.cpp
@@ -56,22 +56,6 @@ NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlane
config.SetNumTasksProportion(4);
}
- if (!config.GetAnalyticsRetryCounterLimit()) {
- config.SetAnalyticsRetryCounterLimit(20);
- }
-
- if (!config.GetStreamingRetryCounterLimit()) {
- config.SetStreamingRetryCounterLimit(20);
- }
-
- if (!config.GetAnalyticsRetryCounterUpdateTime()) {
- config.SetAnalyticsRetryCounterUpdateTime("1d");
- }
-
- if (!config.GetStreamingRetryCounterUpdateTime()) {
- config.SetStreamingRetryCounterUpdateTime("1d");
- }
-
if (!config.GetAutomaticQueriesTtl()) {
config.SetAutomaticQueriesTtl("1d");
}
@@ -80,6 +64,32 @@ NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlane
config.SetTaskLeaseTtl("30s");
}
+ if (!config.HasTaskLeaseRetryPolicy()) {
+ auto& taskLeaseRetryPolicy = *config.MutableTaskLeaseRetryPolicy();
+ taskLeaseRetryPolicy.SetRetryCount(20);
+ taskLeaseRetryPolicy.SetRetryPeriod("1d");
+ }
+
+ if (!config.RetryPolicyMappingSize()) {
+ {
+ auto& policyMapping = *config.AddRetryPolicyMapping();
+ policyMapping.AddStatusCode(NYql::NDqProto::StatusIds::UNAVAILABLE);
+ auto& policy = *policyMapping.MutablePolicy();
+ policy.SetRetryCount(20);
+ policy.SetRetryPeriod("1d");
+ // backoff is Zero()
+ }
+ {
+ auto& policyMapping = *config.AddRetryPolicyMapping();
+ policyMapping.AddStatusCode(NYql::NDqProto::StatusIds::OVERLOADED);
+ policyMapping.AddStatusCode(NYql::NDqProto::StatusIds::EXTERNAL_ERROR);
+ auto& policy = *policyMapping.MutablePolicy();
+ policy.SetRetryCount(10);
+ policy.SetRetryPeriod("1m");
+ policy.SetBackoffPeriod("1s");
+ }
+ }
+
if (!config.GetStorage().GetToken() && config.GetStorage().GetOAuthFile()) {
config.MutableStorage()->SetToken(StripString(TFileInput(config.GetStorage().GetOAuthFile()).ReadAll()));
}
@@ -104,6 +114,7 @@ bool DoesPingTaskUpdateQueriesTable(const TEvControlPlaneStorage::TEvPingTaskReq
request->Plan ||
request->StartedAt ||
request->FinishedAt ||
+ request->ResignQuery ||
!request->CreatedTopicConsumers.empty() ||
!request->DqGraphs.empty() ||
request->DqGraphIndex ||
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
index 42ac23f6e5c..acc2fc3543d 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
@@ -67,8 +67,6 @@ TYdbControlPlaneStorageActor::TConfig::TConfig(const NConfig::TControlPlaneStora
, IdempotencyKeyTtl(GetDuration(Proto.GetIdempotencyKeysTtl(), TDuration::Minutes(10)))
, AutomaticQueriesTtl(GetDuration(Proto.GetAutomaticQueriesTtl(), TDuration::Days(1)))
, ResultSetsTtl(GetDuration(Proto.GetResultSetsTtl(), TDuration::Days(1)))
- , AnalyticsRetryCounterUpdateTime(GetDuration(Proto.GetAnalyticsRetryCounterUpdateTime(), TDuration::Days(1)))
- , StreamingRetryCounterUpdateTime(GetDuration(Proto.GetAnalyticsRetryCounterUpdateTime(), TDuration::Days(1)))
, TaskLeaseTtl(GetDuration(Proto.GetTaskLeaseTtl(), TDuration::Seconds(30)))
{
for (const auto& availableConnection: Proto.GetAvailableConnection()) {
@@ -78,6 +76,24 @@ TYdbControlPlaneStorageActor::TConfig::TConfig(const NConfig::TControlPlaneStora
for (const auto& availableBinding: Proto.GetAvailableBinding()) {
AvailableBindings.insert(GetBindingType(availableBinding));
}
+
+ for (const auto& mapping: Proto.GetRetryPolicyMapping()) {
+ auto& retryPolicy = mapping.GetPolicy();
+ auto retryCount = retryPolicy.GetRetryCount();
+ auto retryPeriod = GetDuration(retryPolicy.GetRetryPeriod(), TDuration::Hours(1));
+ auto backoffPeriod = GetDuration(retryPolicy.GetBackoffPeriod(), TDuration::Zero());
+ for (const auto statusCode: mapping.GetStatusCode()) {
+ RetryPolicies.emplace(statusCode, TRetryPolicyItem(retryCount, retryPeriod, backoffPeriod));
+ }
+ }
+
+ if (Proto.HasTaskLeaseRetryPolicy()) {
+ TaskLeaseRetryPolicy.RetryCount = Proto.GetTaskLeaseRetryPolicy().GetRetryCount();
+ TaskLeaseRetryPolicy.RetryPeriod = GetDuration(Proto.GetTaskLeaseRetryPolicy().GetRetryPeriod(), TDuration::Days(1));
+ } else {
+ TaskLeaseRetryPolicy.RetryCount = 20;
+ TaskLeaseRetryPolicy.RetryPeriod = TDuration::Days(1);
+ }
}
/*
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
index d528d0db20b..a8519ac050a 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
@@ -217,17 +217,30 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont
}
};
+public:
+ struct TRetryPolicyItem {
+ ui64 RetryCount = 0;
+ TDuration RetryPeriod = TDuration::Zero();
+ TDuration BackoffPeriod = TDuration::Zero();
+
+ TRetryPolicyItem() = default;
+ TRetryPolicyItem(ui64 retryCount, const TDuration& retryPeriod, const TDuration& backoffPeriod)
+ : RetryCount(retryCount), RetryPeriod(retryPeriod), BackoffPeriod(backoffPeriod)
+ { }
+ };
+
+private:
struct TConfig {
NConfig::TControlPlaneStorageConfig Proto;
TString IdsPrefix;
TDuration IdempotencyKeyTtl;
TDuration AutomaticQueriesTtl;
TDuration ResultSetsTtl;
- TDuration AnalyticsRetryCounterUpdateTime;
- TDuration StreamingRetryCounterUpdateTime;
TDuration TaskLeaseTtl;
TSet<YandexQuery::ConnectionSetting::ConnectionCase> AvailableConnections;
TSet<YandexQuery::BindingSetting::BindingCase> AvailableBindings;
+ THashMap<ui64, TRetryPolicyItem> RetryPolicies;
+ TRetryPolicyItem TaskLeaseRetryPolicy;
TConfig(const NConfig::TControlPlaneStorageConfig& config, const NConfig::TCommonConfig& common);
};
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 3cea1fe0241..e7cee5283c9 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -101,12 +101,12 @@ public:
static_cast<TDerived*>(this)->DoBootstrap();
} catch (const NKikimr::TMemoryLimitExceededException& e) {
- InternalError(TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder()
+ InternalError(NYql::NDqProto::StatusIds::OVERLOADED, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder()
<< "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit()
<< ", host: " << HostName()
<< ", canAllocateExtraMemory: " << CanAllocateExtraMemory);
} catch (const std::exception& e) {
- InternalError(TIssuesIds::UNEXPECTED, e.what());
+ InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssuesIds::UNEXPECTED, e.what());
}
ReportEventElapsedTime();
@@ -198,11 +198,11 @@ protected:
hFunc(IDqSourceActor::TEvSourceError, OnSourceError);
default: {
CA_LOG_C("TDqComputeActorBase, unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")");
- InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")");
+ InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Unexpected event: " << ev->GetTypeRewrite() << " (" << GetEventTypeString(ev) << ")");
}
}
} catch (const NKikimr::TMemoryLimitExceededException& e) {
- InternalError(TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder()
+ InternalError(NYql::NDqProto::StatusIds::OVERLOADED, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder()
<< "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit()
<< ", host: " << HostName()
<< ", canAllocateExtraMemory: " << CanAllocateExtraMemory);
@@ -210,7 +210,7 @@ protected:
if (PassExceptions) {
throw;
}
- InternalError(TIssuesIds::UNEXPECTED, e.what());
+ InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssuesIds::UNEXPECTED, e.what());
}
if (reportTime) {
@@ -381,7 +381,7 @@ protected:
if (Channels->CheckInFlight("Tasks execution finished")) {
State = NDqProto::COMPUTE_STATE_FINISHED;
CA_LOG_D("Compute state finished. All channels finished");
- ReportStateAndMaybeDie(TIssue("success"));
+ ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::SUCCESS, {TIssue("success")});
}
}
}
@@ -434,38 +434,6 @@ protected:
Terminate(success, TIssues({TIssue(message)}));
}
- void ReportStateAndMaybeDie(TIssue&& issue) {
- ReportStateAndMaybeDie(
- State == NDqProto::COMPUTE_STATE_FINISHED ?
- NYql::NDqProto::StatusIds::UNSPECIFIED
- : NYql::NDqProto::StatusIds::ABORTED, TIssues({std::move(issue)}));
- }
-
- void ReportStateAndDie(NDqProto::EComputeState state, TIssue&& issue) {
- auto execEv = MakeHolder<TEvDqCompute::TEvState>();
- auto& record = execEv->Record;
-
- record.SetState(state);
- if (state != NDqProto::COMPUTE_STATE_FINISHED) {
- record.SetStatusCode(NYql::NDqProto::StatusIds::ABORTED);
- } else {
- record.SetStatusCode(NYql::NDqProto::StatusIds::UNSPECIFIED);
- }
- record.SetTaskId(Task.GetId());
- if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) {
- FillStats(record.MutableStats(), /* last */ true);
- }
-
- if (issue.Message) {
- IssueToMessage(issue, record.MutableIssues()->Add());
- }
-
- this->Send(ExecuterId, execEv.Release());
-
- TerminateSources(TIssues({issue}), state == NDqProto::COMPUTE_STATE_FINISHED);
- Terminate(state == NDqProto::COMPUTE_STATE_FINISHED, TIssues({std::move(issue)}));
- }
-
void ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues)
{
auto execEv = MakeHolder<TEvDqCompute::TEvState>();
@@ -496,13 +464,28 @@ protected:
SetIssueCode(issueCode, issue);
std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> guard = MaybeBindAllocator();
State = NDqProto::COMPUTE_STATE_FAILURE;
- ReportStateAndMaybeDie(std::move(issue));
+ ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, {std::move(issue)});
+ }
+
+ void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssuesIds::EIssueCode issueCode, const TString& message) {
+ CA_LOG_E("InternalError: " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode) << " " << TIssuesIds::EIssueCode_Name(issueCode) << ": " << message << ".");
+ TIssue issue(message);
+ SetIssueCode(issueCode, issue);
+ std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> guard = MaybeBindAllocator();
+ State = NDqProto::COMPUTE_STATE_FAILURE;
+ ReportStateAndMaybeDie(statusCode, TIssues({std::move(issue)}));
+ }
+
+ void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssue issue) {
+ std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> guard = MaybeBindAllocator();
+ State = NDqProto::COMPUTE_STATE_FAILURE;
+ ReportStateAndMaybeDie(statusCode, TIssues({std::move(issue)}));
}
- void InternalError(TIssue issue) {
+ void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssues issues) {
std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> guard = MaybeBindAllocator();
State = NDqProto::COMPUTE_STATE_FAILURE;
- ReportStateAndMaybeDie(std::move(issue));
+ ReportStateAndMaybeDie(statusCode, issues);
}
void ContinueExecute() {
@@ -945,7 +928,7 @@ protected:
void HandleExecuteBase(TEvDq::TEvAbortExecution::TPtr& ev) {
if (ev->Get()->Record.GetStatusCode() == NYql::NDqProto::StatusIds::INTERNAL_ERROR) {
Y_VERIFY(ev->Get()->GetIssues().Size() == 1);
- InternalError(*ev->Get()->GetIssues().begin());
+ InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, *ev->Get()->GetIssues().begin());
return;
}
TIssues issues = ev->Get()->GetIssues();
@@ -1298,9 +1281,8 @@ protected:
return;
}
- TString desc = ev->Get()->Issues.ToString();
- CA_LOG_E("Source[" << ev->Get()->InputIndex << "] fatal error: " << desc);
- InternalError(TIssuesIds::DEFAULT_ERROR, desc);
+ CA_LOG_E("Source[" << ev->Get()->InputIndex << "] fatal error: " << ev->Get()->Issues.ToString());
+ InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, ev->Get()->Issues);
}
void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) override {
@@ -1309,9 +1291,8 @@ protected:
return;
}
- TString desc = issues.ToString();
- CA_LOG_E("Sink[" << outputIndex << "] fatal error: " << desc);
- InternalError(TIssuesIds::DEFAULT_ERROR, desc);
+ CA_LOG_E("Sink[" << outputIndex << "] fatal error: " << issues.ToString());
+ InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, issues);
}
virtual ui64 CalcMkqlMemoryLimit() {
diff --git a/ydb/library/yql/dq/actors/protos/dq_status_codes.proto b/ydb/library/yql/dq/actors/protos/dq_status_codes.proto
index f0cde1171e0..db04330fcf2 100644
--- a/ydb/library/yql/dq/actors/protos/dq_status_codes.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_status_codes.proto
@@ -17,5 +17,6 @@ message StatusIds {
OVERLOADED = 9;
LIMIT_EXCEEDED = 10;
UNSUPPORTED = 11;
+ EXTERNAL_ERROR = 12;
}
}
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index 862576fe78d..663b6177a71 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -403,7 +403,7 @@ private:
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, Sprintf("TMemoryLimitExceededException: %s", err.c_str()));
TIssue issue(err);
SetIssueCode(TIssuesIds::KIKIMR_PRECONDITION_FAILED, issue);
- return MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TVector<TIssue>{issue});
+ return MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::OVERLOADED, TVector<TIssue>{issue});
}
THolder<TEvDq::TEvAbortExecution> GetError(const TString& message) {
diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp
index 57946cc174c..a7b30fa5286 100644
--- a/ydb/library/yql/providers/dq/actors/task_controller.cpp
+++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp
@@ -117,10 +117,11 @@ private:
YQL_LOG_CTX_SCOPE(TraceId);
YQL_LOG(DEBUG)
<< SelfId()
- << " TaskId: " << taskId
- << " State: " << ev->Get()->Record.GetState()
- << " PingCookie: " << ev->Cookie;
-
+ << " EvState TaskId: " << taskId
+ << " State: " << state.GetState()
+ << " PingCookie: " << ev->Cookie
+ << " StatusCode: " << NYql::NDqProto::StatusIds_StatusCode_Name(state.GetStatusCode());
+
if (state.HasStats() && state.GetStats().GetTasks().size()) {
YQL_LOG(DEBUG) << " " << SelfId() << " AddStats " << taskId;
AddStats(state.GetStats());
@@ -129,7 +130,7 @@ private:
}
}
- switch (ev->Get()->Record.GetState()) {
+ switch (state.GetState()) {
case NDqProto::COMPUTE_STATE_UNKNOWN: {
// TODO: use issues
TString message = "unexpected state from " + ToString(computeActor) + ", task: " + ToString(taskId);
@@ -491,7 +492,7 @@ private:
void OnError(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool retriable, bool needFallback) {
YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << "OnError " << issues.ToOneLineString();
+ YQL_LOG(DEBUG) << "OnError " << issues.ToOneLineString() << " " << NYql::NDqProto::StatusIds_StatusCode_Name(statusCode);
if (Finished) {
YQL_LOG_CTX_SCOPE(TraceId);
YQL_LOG(WARN) << "OnError IGNORED when Finished, Retriable=" << retriable << ", NeedFallback=" << needFallback;