diff options
author | hcpp <hcpp@ydb.tech> | 2023-10-06 18:35:31 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-10-06 21:45:27 +0300 |
commit | 3340d1540a7b74501a690c0d11ab865e5a0aac92 (patch) | |
tree | 614b613e2bb8506bf774dede796e0193cc334542 | |
parent | 0cfe96c728046e95090fd53fcf4814ebfcf40f49 (diff) | |
download | ydb-3340d1540a7b74501a690c0d11ab865e5a0aac92.tar.gz |
fq proxy restarts have been added
11 files changed, 138 insertions, 63 deletions
diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.cpp b/ydb/core/fq/libs/compute/common/run_actor_params.cpp index 3bd287188fe..1fa989e1aca 100644 --- a/ydb/core/fq/libs/compute/common/run_actor_params.cpp +++ b/ydb/core/fq/libs/compute/common/run_actor_params.cpp @@ -134,7 +134,7 @@ IOutputStream& operator<<(IOutputStream& out, const TRunActorParams& params) { << " OperationId: " << (params.OperationId.GetKind() != Ydb::TOperationId::UNUSED ? ProtoToString(params.OperationId) : "<empty>") << " ComputeConnection: " << params.ComputeConnection.ShortDebugString() << " ResultTtl: " << params.ResultTtl - << " }"; + << "}"; } } /* NFq */ diff --git a/ydb/core/fq/libs/compute/ydb/actors_factory.cpp b/ydb/core/fq/libs/compute/ydb/actors_factory.cpp index 240ae157714..9db333a97da 100644 --- a/ydb/core/fq/libs/compute/ydb/actors_factory.cpp +++ b/ydb/core/fq/libs/compute/ydb/actors_factory.cpp @@ -69,11 +69,12 @@ struct TActorFactory : public IActorFactory { return CreateResourcesCleanerActor(Params, parent, connector, operationId, Counters); } - std::unique_ptr<NActors::IActor> CreateFinalizer(const NActors::TActorId& parent, + std::unique_ptr<NActors::IActor> CreateFinalizer(const NFq::TRunActorParams& params, + const NActors::TActorId& parent, const NActors::TActorId& pinger, NYdb::NQuery::EExecStatus execStatus, FederatedQuery::QueryMeta::ComputeStatus status) const override { - return CreateFinalizerActor(Params, parent, pinger, execStatus, status, Counters); + return CreateFinalizerActor(params, parent, pinger, execStatus, status, Counters); } std::unique_ptr<NActors::IActor> CreateStopper(const NActors::TActorId& parent, diff --git a/ydb/core/fq/libs/compute/ydb/actors_factory.h b/ydb/core/fq/libs/compute/ydb/actors_factory.h index f6b8d90e2c0..ae85da060f7 100644 --- a/ydb/core/fq/libs/compute/ydb/actors_factory.h +++ b/ydb/core/fq/libs/compute/ydb/actors_factory.h @@ -32,7 +32,8 @@ struct IActorFactory : public TThrRefBase { virtual std::unique_ptr<NActors::IActor> CreateResourcesCleaner(const NActors::TActorId& parent, const NActors::TActorId& connector, const NYdb::TOperation::TOperationId& operationId) const = 0; - virtual std::unique_ptr<NActors::IActor> CreateFinalizer(const NActors::TActorId& parent, + virtual std::unique_ptr<NActors::IActor> CreateFinalizer(const NFq::TRunActorParams& params, + const NActors::TActorId& parent, const NActors::TActorId& pinger, NYdb::NQuery::EExecStatus execStatus, FederatedQuery::QueryMeta::ComputeStatus status) const = 0; diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h index caa27ee3c88..a2cf3cd8610 100644 --- a/ydb/core/fq/libs/compute/ydb/events/events.h +++ b/ydb/core/fq/libs/compute/ydb/events/events.h @@ -236,33 +236,36 @@ struct TEvYdbCompute { }; struct TEvExecuterResponse : public NActors::TEventLocal<TEvExecuterResponse, EvExecuterResponse> { - TEvExecuterResponse(NYdb::TOperation::TOperationId operationId, const TString& executionId) + TEvExecuterResponse(NYdb::TOperation::TOperationId operationId, const TString& executionId, NYdb::EStatus status) : OperationId(operationId) , ExecutionId(executionId) - , Success(true) + , Status(status) {} - explicit TEvExecuterResponse(NYql::TIssues issues) - : Success(false) + explicit TEvExecuterResponse(NYql::TIssues issues, NYdb::EStatus status) + : Status(status) , Issues(std::move(issues)) + {} NYdb::TOperation::TOperationId OperationId; TString ExecutionId; - bool Success = true; + NYdb::EStatus Status; NYql::TIssues Issues; }; struct TEvStatusTrackerResponse : public NActors::TEventLocal<TEvStatusTrackerResponse, EvStatusTrackerResponse> { - TEvStatusTrackerResponse(NYql::TIssues issues, NYdb::EStatus status, NYdb::NQuery::EExecStatus execStatus) + TEvStatusTrackerResponse(NYql::TIssues issues, NYdb::EStatus status, NYdb::NQuery::EExecStatus execStatus, FederatedQuery::QueryMeta::ComputeStatus computeStatus) : Issues(std::move(issues)) , Status(status) , ExecStatus(execStatus) + , ComputeStatus(computeStatus) {} NYql::TIssues Issues; NYdb::EStatus Status; NYdb::NQuery::EExecStatus ExecStatus; + FederatedQuery::QueryMeta::ComputeStatus ComputeStatus; }; struct TEvResultWriterResponse : public NActors::TEventLocal<TEvResultWriterResponse, EvResultWriterResponse> { diff --git a/ydb/core/fq/libs/compute/ydb/executer_actor.cpp b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp index 27325504bfb..76c6ac79300 100644 --- a/ydb/core/fq/libs/compute/ydb/executer_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp @@ -88,13 +88,13 @@ public: if (ev.Get()->Get()->Success) { pingCounters->Ok->Inc(); LOG_I("Information about the operation id and execution id is stored. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId)); - Send(Parent, new TEvYdbCompute::TEvExecuterResponse(OperationId, ExecutionId)); + Send(Parent, new TEvYdbCompute::TEvExecuterResponse(OperationId, ExecutionId, NYdb::EStatus::SUCCESS)); CompleteAndPassAway(); } else { pingCounters->Error->Inc(); // Without the idempotency key, we lose the running operation here LOG_E("Error saving information about the operation id and execution id. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId)); - Send(Parent, new TEvYdbCompute::TEvExecuterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the operation id and execution id. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId)}})); + Send(Parent, new TEvYdbCompute::TEvExecuterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the operation id and execution id. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR)); FailedAndPassAway(); } } @@ -103,7 +103,7 @@ public: const auto& response = *ev.Get()->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_E("Can't execute script: " << ev->Get()->Issues.ToOneLineString()); - Send(Parent, new TEvYdbCompute::TEvExecuterResponse(ev->Get()->Issues)); + Send(Parent, new TEvYdbCompute::TEvExecuterResponse(ev->Get()->Issues, response.Status)); FailedAndPassAway(); return; } diff --git a/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp b/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp index 3d1cb3ec655..c2f5cf489b2 100644 --- a/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp @@ -77,13 +77,12 @@ public: auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); pingCounters->InFly->Inc(); Become(&TFinalizerActor::StateFunc); - Fq::Private::PingTaskRequest pingTaskRequest; - if (ExecStatus == NYdb::NQuery::EExecStatus::Completed || Status == FederatedQuery::QueryMeta::COMPLETING) { - pingTaskRequest.mutable_result_id()->set_value(Params.ResultId); + if (IsResignQuery()) { + SendResignQuery(); + } else { + SendFinalPing(); } - pingTaskRequest.set_status(GetFinalStatus()); - *pingTaskRequest.mutable_finished_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(TInstant::Now().MilliSeconds()); - Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest, true)); + } FederatedQuery::QueryMeta::ComputeStatus GetFinalStatus() const { @@ -123,6 +122,40 @@ public: } } + bool IsResignQuery() const { + switch (ExecStatus) { + case NYdb::NQuery::EExecStatus::Completed: + return false; + case NYdb::NQuery::EExecStatus::Unspecified: + case NYdb::NQuery::EExecStatus::Starting: + case NYdb::NQuery::EExecStatus::Aborted: + case NYdb::NQuery::EExecStatus::Canceled: + case NYdb::NQuery::EExecStatus::Failed: + break; + } + + switch (Status) { + case FederatedQuery::QueryMeta::COMPLETING: + case FederatedQuery::QueryMeta::COMPLETED: + case FederatedQuery::QueryMeta::ABORTING_BY_USER: + case FederatedQuery::QueryMeta::ABORTED_BY_USER: + case FederatedQuery::QueryMeta::ABORTING_BY_SYSTEM: + case FederatedQuery::QueryMeta::ABORTED_BY_SYSTEM: + case FederatedQuery::QueryMeta::STARTING: + case FederatedQuery::QueryMeta::FAILED: + case FederatedQuery::QueryMeta::RESUMING: + case FederatedQuery::QueryMeta::FAILING: + case FederatedQuery::QueryMeta::COMPUTE_STATUS_UNSPECIFIED: + case FederatedQuery::QueryMeta_ComputeStatus_QueryMeta_ComputeStatus_INT_MIN_SENTINEL_DO_NOT_USE_: + case FederatedQuery::QueryMeta_ComputeStatus_QueryMeta_ComputeStatus_INT_MAX_SENTINEL_DO_NOT_USE_: + case FederatedQuery::QueryMeta::PAUSING: + case FederatedQuery::QueryMeta::PAUSED: + return false; + case FederatedQuery::QueryMeta::RUNNING: + return true; + } + } + STRICT_STFUNC(StateFunc, hFunc(TEvents::TEvForwardPingResponse, Handle); ) @@ -144,6 +177,22 @@ public: } } + void SendResignQuery() { + Fq::Private::PingTaskRequest pingTaskRequest; + pingTaskRequest.set_resign_query(true); + Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest, true)); + } + + void SendFinalPing() { + Fq::Private::PingTaskRequest pingTaskRequest; + if (ExecStatus == NYdb::NQuery::EExecStatus::Completed || Status == FederatedQuery::QueryMeta::COMPLETING) { + pingTaskRequest.mutable_result_id()->set_value(Params.ResultId); + } + pingTaskRequest.set_status(GetFinalStatus()); + *pingTaskRequest.mutable_finished_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(TInstant::Now().MilliSeconds()); + Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest, true)); + } + private: TRunActorParams Params; TActorId Parent; diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp index 83c4208a3c3..c2f3b47ae83 100644 --- a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp @@ -100,12 +100,12 @@ public: if (ev.Get()->Get()->Success) { pingCounters->Ok->Inc(); LOG_I("Information about the status of operation is stored"); - Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(Issues, Status, ExecStatus)); + Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(Issues, Status, ExecStatus, ComputeStatus)); CompleteAndPassAway(); } else { pingCounters->Error->Inc(); LOG_E("Error saving information about the status of operation"); - Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the status of operation: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR, ExecStatus)); + Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the status of operation: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR, ExecStatus, ComputeStatus)); FailedAndPassAway(); } } @@ -114,7 +114,7 @@ public: const auto& response = *ev.Get()->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_E("Can't get operation: " << ev->Get()->Issues.ToOneLineString()); - Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(ev->Get()->Issues, ev->Get()->Status, ExecStatus)); + Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(ev->Get()->Issues, ev->Get()->Status, ExecStatus, ComputeStatus)); FailedAndPassAway(); return; } @@ -129,21 +129,11 @@ public: case NYdb::NQuery::EExecStatus::Aborted: case NYdb::NQuery::EExecStatus::Canceled: case NYdb::NQuery::EExecStatus::Failed: - if (response.ExecStatus == NYdb::NQuery::EExecStatus::Failed) { - TStringBuilder builder; - builder << "Query failed with code " << NYql::NDqProto::StatusIds_StatusCode_Name( - NYql::NDq::YdbStatusToDqStatus(response.StatusCode)); - auto issue = NYql::TIssue(builder); - for (auto& subIssue : response.Issues) { - issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); - } - Issues.AddIssue(issue); - } else { - Issues = response.Issues; - } + Issues = response.Issues; Status = response.Status; ExecStatus = response.ExecStatus; QueryStats = response.QueryStats; + StatusCode = NYql::NDq::YdbStatusToDqStatus(response.StatusCode); Failed(); break; case NYdb::NQuery::EExecStatus::Completed: @@ -161,23 +151,24 @@ public: } void Failed() { - LOG_I("Execution status: Failed, " << Status); + LOG_I("Execution status: Failed, Status: " << Status << ", StatusCode: " << NYql::NDqProto::StatusIds::StatusCode_Name(StatusCode) << " Issues: " << Issues.ToOneLineString()); auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); pingCounters->InFly->Inc(); Fq::Private::PingTaskRequest pingTaskRequest; NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues()); - pingTaskRequest.set_status(::FederatedQuery::QueryMeta::FAILING); + pingTaskRequest.set_pending_status_code(StatusCode); PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast()); Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest)); } void Complete() { - LOG_I("Execution status: Complete" << Status); + LOG_I("Execution status: Complete " << Status << ", StatusCode: " << NYql::NDqProto::StatusIds::StatusCode_Name(StatusCode) << " Issues: " << Issues.ToOneLineString()); auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); pingCounters->InFly->Inc(); Fq::Private::PingTaskRequest pingTaskRequest; NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues()); - pingTaskRequest.set_status(::FederatedQuery::QueryMeta::COMPLETING); + ComputeStatus = ::FederatedQuery::QueryMeta::COMPLETING; + pingTaskRequest.set_status(ComputeStatus); PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast()); try { pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan())); @@ -213,9 +204,11 @@ private: NYql::TIssues Issues; NYdb::EStatus Status = NYdb::EStatus::SUCCESS; NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified; + NYql::NDqProto::StatusIds::StatusCode StatusCode = NYql::NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_UNSPECIFIED; Ydb::TableStats::QueryStats QueryStats; NKikimr::TBackoffTimer BackoffTimer; const TCompressor Compressor; + FederatedQuery::QueryMeta::ComputeStatus ComputeStatus = FederatedQuery::QueryMeta::RUNNING; }; std::unique_ptr<NActors::IActor> CreateStatusTrackerActor(const TRunActorParams& params, diff --git a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp index 84c000551ee..174bd6284db 100644 --- a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp @@ -19,10 +19,11 @@ #include <ydb/core/fq/libs/private_client/events.h> #include <ydb/core/fq/libs/ydb/ydb.h> #include <ydb/library/services/services.pb.h> +#include <ydb/library/yql/dq/actors/dq.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> -#include <ydb/public/sdk/cpp/client/ydb_query/client.h> #include <ydb/public/sdk/cpp/client/ydb_operation/operation.h> #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> +#include <ydb/public/sdk/cpp/client/ydb_query/client.h> #define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] QueryId: " << Params.QueryId << " " << stream) @@ -75,7 +76,7 @@ public: auto& response = *ev->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_I("InitializerResponse (failed). Issues: " << response.Issues.ToOneLineString()); - ResignAndPassAway(response.Issues); + ResignAndPassAway(response.Status, response.Issues); return; } @@ -85,9 +86,9 @@ public: void Handle(const TEvYdbCompute::TEvExecuterResponse::TPtr& ev) { auto& response = *ev->Get(); - if (!response.Success) { + if (response.Status != NYdb::EStatus::SUCCESS) { LOG_I("ExecuterResponse (failed). Issues: " << response.Issues.ToOneLineString()); - ResignAndPassAway(response.Issues); + ResignAndPassAway(response.Status, response.Issues); return; } Params.ExecutionId = response.ExecutionId; @@ -100,10 +101,11 @@ public: auto& response = *ev->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_I("StatusTrackerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); - ResignAndPassAway(response.Issues); + ResignAndPassAway(response.Status, response.Issues); return; } ExecStatus = response.ExecStatus; + Params.Status = response.ComputeStatus; LOG_I("StatusTrackerResponse (success) " << response.Status << " ExecStatus: " << static_cast<int>(response.ExecStatus) << " Issues: " << response.Issues.ToOneLineString()); if (response.ExecStatus == NYdb::NQuery::EExecStatus::Completed) { Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.OperationId).release()); @@ -116,7 +118,7 @@ public: auto& response = *ev->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_I("ResultWriterResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); - ResignAndPassAway(response.Issues); + ResignAndPassAway(response.Status, response.Issues); return; } LOG_I("ResultWriterResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString()); @@ -127,18 +129,18 @@ public: auto& response = *ev->Get(); if (response.Status != NYdb::EStatus::SUCCESS && response.Status != NYdb::EStatus::UNSUPPORTED) { LOG_I("ResourcesCleanerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); - ResignAndPassAway(response.Issues); + ResignAndPassAway(response.Status, response.Issues); return; } LOG_I("ResourcesCleanerResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString()); - Register(ActorFactory->CreateFinalizer(SelfId(), Pinger, ExecStatus, IsAborted ? FederatedQuery::QueryMeta::ABORTING_BY_USER : Params.Status).release()); + Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, IsAborted ? FederatedQuery::QueryMeta::ABORTING_BY_USER : Params.Status).release()); } void Handle(const TEvYdbCompute::TEvFinalizerResponse::TPtr ev) { auto& response = *ev->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_I("FinalizerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); - ResignAndPassAway(response.Issues); + ResignAndPassAway(response.Status, response.Issues); return; } LOG_I("FinalizerResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString()); @@ -157,46 +159,52 @@ public: auto& response = *ev->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_I("StopperResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); - ResignAndPassAway(response.Issues); + ResignAndPassAway(response.Status, response.Issues); return; } LOG_I("StopperResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString()); Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release()); } - void Run() { + void Run() { // recover points switch (Params.Status) { - case FederatedQuery::QueryMeta::ABORTING_BY_USER: - case FederatedQuery::QueryMeta::ABORTING_BY_SYSTEM: - case FederatedQuery::QueryMeta::FAILING: - if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) { - Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release()); + case FederatedQuery::QueryMeta::STARTING: + Register(ActorFactory->CreateInitializer(SelfId(), Pinger).release()); + break; + case FederatedQuery::QueryMeta::RUNNING: + if (Params.OperationId.GetKind() == Ydb::TOperationId::UNUSED) { + Register(ActorFactory->CreateExecuter(SelfId(), Connector, Pinger).release()); // restart query } else { - Register(ActorFactory->CreateFinalizer(SelfId(), Pinger, ExecStatus, Params.Status).release()); + Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release()); } break; case FederatedQuery::QueryMeta::COMPLETING: if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) { Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.OperationId).release()); } else { - Register(ActorFactory->CreateFinalizer(SelfId(), Pinger, ExecStatus, Params.Status).release()); + Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release()); } break; - case FederatedQuery::QueryMeta::STARTING: - Register(ActorFactory->CreateInitializer(SelfId(), Pinger).release()); - break; - case FederatedQuery::QueryMeta::RUNNING: - Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release()); + case FederatedQuery::QueryMeta::FAILING: + case FederatedQuery::QueryMeta::ABORTING_BY_USER: + case FederatedQuery::QueryMeta::ABORTING_BY_SYSTEM: + if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) { + Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release()); + } else { + Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release()); + } break; default: break; } } - void ResignAndPassAway(const NYql::TIssues& issues) { + void ResignAndPassAway(NYdb::EStatus status, const NYql::TIssues& issues) { Fq::Private::PingTaskRequest pingTaskRequest; NYql::IssuesToMessage(issues, pingTaskRequest.mutable_transient_issues()); pingTaskRequest.set_resign_query(true); + + pingTaskRequest.set_status_code(NYql::NDq::YdbStatusToDqStatus(static_cast<Ydb::StatusIds::StatusCode>(status))); Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest, true)); FinishAndPassAway(); } diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp index dce989fb15b..6af5f7b05d1 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp @@ -73,7 +73,7 @@ TPingTaskParams ConstructHardPingTask( auto meteringRecords = std::make_shared<std::vector<TString>>(); - auto prepareParams = [=, counters=counters, actorSystem = NActors::TActivationContext::ActorSystem()](const TVector<TResultSet>& resultSets) { + auto prepareParams = [=, counters=counters, actorSystem = NActors::TActivationContext::ActorSystem(), request=request](const TVector<TResultSet>& resultSets) mutable { TString jobId; FederatedQuery::Query query; FederatedQuery::Internal::QueryInternal internal; @@ -145,6 +145,13 @@ TPingTaskParams ConstructHardPingTask( TDuration backoff = taskLeaseTtl; if (request.resign_query()) { + if (request.status_code() == NYql::NDqProto::StatusIds::UNSPECIFIED && internal.pending_status_code() != NYql::NDqProto::StatusIds::UNSPECIFIED) { + request.set_status_code(internal.pending_status_code()); + internal.clear_pending_status_code(); + internal.clear_execution_id(); + internal.clear_operation_id(); + } + TRetryPolicyItem policy(0, TDuration::Seconds(1), TDuration::Zero()); auto it = retryPolicies.find(request.status_code()); auto policyFound = it != retryPolicies.end(); @@ -179,6 +186,13 @@ TPingTaskParams ConstructHardPingTask( } builder << " at " << Now(); auto issue = NYql::TIssue(builder); + if (query.issue().size() > 0 && request.issues().empty()) { + NYql::TIssues queryIssues; + NYql::IssuesFromMessage(query.issue(), queryIssues); + for (auto& subIssue : queryIssues) { + issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); + } + } if (transientIssues) { for (auto& subIssue : *transientIssues) { issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); @@ -199,6 +213,10 @@ TPingTaskParams ConstructHardPingTask( internal.set_status_code(request.status_code()); } + if (request.pending_status_code() != NYql::NDqProto::StatusIds::UNSPECIFIED) { + internal.set_pending_status_code(request.pending_status_code()); + } + if (issues) { NYql::IssuesToMessage(*issues, query.mutable_issue()); NYql::IssuesToMessage(*issues, job.mutable_issue()); diff --git a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto index 1d7f9f3b2f6..9821789d05f 100644 --- a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto +++ b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto @@ -49,6 +49,7 @@ message QueryInternal { string execution_id = 25; NFq.NConfig.TYdbStorageConfig compute_connection = 26; google.protobuf.Duration result_ttl = 27; + NYql.NDqProto.StatusIds.StatusCode pending_status_code = 28; } message JobInternal { diff --git a/ydb/core/fq/libs/protos/fq_private.proto b/ydb/core/fq/libs/protos/fq_private.proto index 5513f3a3eb5..bba6f0ffdb4 100644 --- a/ydb/core/fq/libs/protos/fq_private.proto +++ b/ydb/core/fq/libs/protos/fq_private.proto @@ -161,6 +161,7 @@ message PingTaskRequest { repeated Ydb.Issue.IssueMessage internal_issues = 34; string operation_id = 35; string execution_id = 36; + NYql.NDqProto.StatusIds.StatusCode pending_status_code = 37; } message PingTaskResult { |