diff options
author | hor911 <hor911@ydb.tech> | 2022-09-06 22:37:14 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-09-06 22:37:14 +0300 |
commit | 636ec676a65d8a309a4cf03a90ed3d7c9115f990 (patch) | |
tree | 688562bb28b286699753850624be415666985f32 | |
parent | 56457622e8898a32a4c9265ef4ab1a4a4a6f6585 (diff) | |
download | ydb-636ec676a65d8a309a4cf03a90ed3d7c9115f990.tar.gz |
Pass JobId and RetryCount to RunActor
6 files changed, 20 insertions, 5 deletions
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp index a42d3a19888..bc6a65ba994 100644 --- a/ydb/core/yq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp @@ -393,7 +393,9 @@ private: TenantName, task.result_limit(), NProtoInterop::CastFromProto(task.execution_limit()), - NProtoInterop::CastFromProto(task.request_started_at()) + NProtoInterop::CastFromProto(task.request_started_at()), + task.restart_count(), + task.job_id().value() ); auto runActorId = Register(CreateRunActor(SelfId(), queryCounters, std::move(params))); diff --git a/ydb/core/yq/libs/actors/run_actor_params.cpp b/ydb/core/yq/libs/actors/run_actor_params.cpp index 9dcbd8fb04c..154133637f0 100644 --- a/ydb/core/yq/libs/actors/run_actor_params.cpp +++ b/ydb/core/yq/libs/actors/run_actor_params.cpp @@ -51,7 +51,9 @@ TRunActorParams::TRunActorParams( const TString& tenantName, uint64_t resultBytesLimit, TDuration executionTtl, - TInstant requestStartedAt + TInstant requestStartedAt, + ui32 restartCount, + const TString& jobId ) : YqSharedResources(yqSharedResources) , CredentialsProviderFactory(credentialsProviderFactory) @@ -100,6 +102,8 @@ TRunActorParams::TRunActorParams( , ResultBytesLimit(resultBytesLimit) , ExecutionTtl(executionTtl) , RequestStartedAt(requestStartedAt) + , RestartCount(restartCount) + , JobId(jobId) { } diff --git a/ydb/core/yq/libs/actors/run_actor_params.h b/ydb/core/yq/libs/actors/run_actor_params.h index 3e095104b4f..f5faf21c883 100644 --- a/ydb/core/yq/libs/actors/run_actor_params.h +++ b/ydb/core/yq/libs/actors/run_actor_params.h @@ -66,7 +66,9 @@ struct TRunActorParams { // TODO2 : Change name const TString& tenantName, uint64_t resultBytesLimit, TDuration executionTtl, - TInstant requestStartedAt + TInstant requestStartedAt, + ui32 restartCount, + const TString& jobId ); TRunActorParams(const TRunActorParams& params) = default; @@ -122,6 +124,8 @@ struct TRunActorParams { // TODO2 : Change name uint64_t ResultBytesLimit; TDuration ExecutionTtl; TInstant RequestStartedAt; + ui32 RestartCount; + const TString JobId; }; } /* NYq */ diff --git a/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.h b/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.h index 29580cb2d0c..2f7aa74f3a4 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.h +++ b/ydb/core/yq/libs/control_plane_storage/internal/response_tasks.h @@ -8,6 +8,7 @@ namespace NYq { struct TTask { TString Scope; TString QueryId; + ui32 RetryCount; YandexQuery::Query Query; YandexQuery::Internal::QueryInternal Internal; ui64 Generation = 0; diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp index 208a2646480..8b79ba31720 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp @@ -277,6 +277,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ << " LastSeenAt: " << lastSeenAt); taskInternal.ShouldAbortTask = !taskInternal.RetryLimiter.UpdateOnRetry(lastSeenAt, Config.TaskLeaseRetryPolicy, now); } + task.RetryCount = taskInternal.RetryLimiter.RetryCount; *rootCounters->GetSubgroup("scope", task.Scope)->GetSubgroup("query_id", task.QueryId)->GetCounter("RetryCount") = taskInternal.RetryLimiter.RetryCount; @@ -369,8 +370,6 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ << " unsupported"; } - - auto* newTask = result.add_tasks(); newTask->set_query_type(queryType); newTask->set_execute_mode(task.Query.meta().execute_mode()); @@ -396,6 +395,10 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ *newTask->mutable_execution_limit() = NProtoInterop::CastToProto(ExtractLimit(task)); *newTask->mutable_request_started_at() = task.Query.meta().started_at(); + newTask->set_restart_count(task.RetryCount); + auto* jobId = newTask->mutable_job_id(); + jobId->set_value(task.Query.meta().last_job_id()); + for (const auto& connection: task.Internal.connection()) { const auto serviceAccountId = ExtractServiceAccountId(connection); if (!serviceAccountId) { diff --git a/ydb/core/yq/libs/protos/fq_private.proto b/ydb/core/yq/libs/protos/fq_private.proto index 2f6208ff158..629e757ec69 100644 --- a/ydb/core/yq/libs/protos/fq_private.proto +++ b/ydb/core/yq/libs/protos/fq_private.proto @@ -89,6 +89,7 @@ message GetTaskResult { google.protobuf.Duration execution_limit = 29; google.protobuf.Timestamp request_started_at = 30; repeated CompressedData dq_graph_compressed = 31; + uint32 restart_count = 32; } repeated Task tasks = 1; } |