aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-10-06 18:35:31 +0300
committerhcpp <hcpp@ydb.tech>2023-10-06 21:45:27 +0300
commit3340d1540a7b74501a690c0d11ab865e5a0aac92 (patch)
tree614b613e2bb8506bf774dede796e0193cc334542
parent0cfe96c728046e95090fd53fcf4814ebfcf40f49 (diff)
downloadydb-3340d1540a7b74501a690c0d11ab865e5a0aac92.tar.gz
fq proxy restarts have been added
-rw-r--r--ydb/core/fq/libs/compute/common/run_actor_params.cpp2
-rw-r--r--ydb/core/fq/libs/compute/ydb/actors_factory.cpp5
-rw-r--r--ydb/core/fq/libs/compute/ydb/actors_factory.h3
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/events.h15
-rw-r--r--ydb/core/fq/libs/compute/ydb/executer_actor.cpp6
-rw-r--r--ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp61
-rw-r--r--ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp31
-rw-r--r--ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp56
-rw-r--r--ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp20
-rw-r--r--ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto1
-rw-r--r--ydb/core/fq/libs/protos/fq_private.proto1
11 files changed, 138 insertions, 63 deletions
diff --git a/ydb/core/fq/libs/compute/common/run_actor_params.cpp b/ydb/core/fq/libs/compute/common/run_actor_params.cpp
index 3bd287188fe..1fa989e1aca 100644
--- a/ydb/core/fq/libs/compute/common/run_actor_params.cpp
+++ b/ydb/core/fq/libs/compute/common/run_actor_params.cpp
@@ -134,7 +134,7 @@ IOutputStream& operator<<(IOutputStream& out, const TRunActorParams& params) {
<< " OperationId: " << (params.OperationId.GetKind() != Ydb::TOperationId::UNUSED ? ProtoToString(params.OperationId) : "<empty>")
<< " ComputeConnection: " << params.ComputeConnection.ShortDebugString()
<< " ResultTtl: " << params.ResultTtl
- << " }";
+ << "}";
}
} /* NFq */
diff --git a/ydb/core/fq/libs/compute/ydb/actors_factory.cpp b/ydb/core/fq/libs/compute/ydb/actors_factory.cpp
index 240ae157714..9db333a97da 100644
--- a/ydb/core/fq/libs/compute/ydb/actors_factory.cpp
+++ b/ydb/core/fq/libs/compute/ydb/actors_factory.cpp
@@ -69,11 +69,12 @@ struct TActorFactory : public IActorFactory {
return CreateResourcesCleanerActor(Params, parent, connector, operationId, Counters);
}
- std::unique_ptr<NActors::IActor> CreateFinalizer(const NActors::TActorId& parent,
+ std::unique_ptr<NActors::IActor> CreateFinalizer(const NFq::TRunActorParams& params,
+ const NActors::TActorId& parent,
const NActors::TActorId& pinger,
NYdb::NQuery::EExecStatus execStatus,
FederatedQuery::QueryMeta::ComputeStatus status) const override {
- return CreateFinalizerActor(Params, parent, pinger, execStatus, status, Counters);
+ return CreateFinalizerActor(params, parent, pinger, execStatus, status, Counters);
}
std::unique_ptr<NActors::IActor> CreateStopper(const NActors::TActorId& parent,
diff --git a/ydb/core/fq/libs/compute/ydb/actors_factory.h b/ydb/core/fq/libs/compute/ydb/actors_factory.h
index f6b8d90e2c0..ae85da060f7 100644
--- a/ydb/core/fq/libs/compute/ydb/actors_factory.h
+++ b/ydb/core/fq/libs/compute/ydb/actors_factory.h
@@ -32,7 +32,8 @@ struct IActorFactory : public TThrRefBase {
virtual std::unique_ptr<NActors::IActor> CreateResourcesCleaner(const NActors::TActorId& parent,
const NActors::TActorId& connector,
const NYdb::TOperation::TOperationId& operationId) const = 0;
- virtual std::unique_ptr<NActors::IActor> CreateFinalizer(const NActors::TActorId& parent,
+ virtual std::unique_ptr<NActors::IActor> CreateFinalizer(const NFq::TRunActorParams& params,
+ const NActors::TActorId& parent,
const NActors::TActorId& pinger,
NYdb::NQuery::EExecStatus execStatus,
FederatedQuery::QueryMeta::ComputeStatus status) const = 0;
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h
index caa27ee3c88..a2cf3cd8610 100644
--- a/ydb/core/fq/libs/compute/ydb/events/events.h
+++ b/ydb/core/fq/libs/compute/ydb/events/events.h
@@ -236,33 +236,36 @@ struct TEvYdbCompute {
};
struct TEvExecuterResponse : public NActors::TEventLocal<TEvExecuterResponse, EvExecuterResponse> {
- TEvExecuterResponse(NYdb::TOperation::TOperationId operationId, const TString& executionId)
+ TEvExecuterResponse(NYdb::TOperation::TOperationId operationId, const TString& executionId, NYdb::EStatus status)
: OperationId(operationId)
, ExecutionId(executionId)
- , Success(true)
+ , Status(status)
{}
- explicit TEvExecuterResponse(NYql::TIssues issues)
- : Success(false)
+ explicit TEvExecuterResponse(NYql::TIssues issues, NYdb::EStatus status)
+ : Status(status)
, Issues(std::move(issues))
+
{}
NYdb::TOperation::TOperationId OperationId;
TString ExecutionId;
- bool Success = true;
+ NYdb::EStatus Status;
NYql::TIssues Issues;
};
struct TEvStatusTrackerResponse : public NActors::TEventLocal<TEvStatusTrackerResponse, EvStatusTrackerResponse> {
- TEvStatusTrackerResponse(NYql::TIssues issues, NYdb::EStatus status, NYdb::NQuery::EExecStatus execStatus)
+ TEvStatusTrackerResponse(NYql::TIssues issues, NYdb::EStatus status, NYdb::NQuery::EExecStatus execStatus, FederatedQuery::QueryMeta::ComputeStatus computeStatus)
: Issues(std::move(issues))
, Status(status)
, ExecStatus(execStatus)
+ , ComputeStatus(computeStatus)
{}
NYql::TIssues Issues;
NYdb::EStatus Status;
NYdb::NQuery::EExecStatus ExecStatus;
+ FederatedQuery::QueryMeta::ComputeStatus ComputeStatus;
};
struct TEvResultWriterResponse : public NActors::TEventLocal<TEvResultWriterResponse, EvResultWriterResponse> {
diff --git a/ydb/core/fq/libs/compute/ydb/executer_actor.cpp b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
index 27325504bfb..76c6ac79300 100644
--- a/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/executer_actor.cpp
@@ -88,13 +88,13 @@ public:
if (ev.Get()->Get()->Success) {
pingCounters->Ok->Inc();
LOG_I("Information about the operation id and execution id is stored. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId));
- Send(Parent, new TEvYdbCompute::TEvExecuterResponse(OperationId, ExecutionId));
+ Send(Parent, new TEvYdbCompute::TEvExecuterResponse(OperationId, ExecutionId, NYdb::EStatus::SUCCESS));
CompleteAndPassAway();
} else {
pingCounters->Error->Inc();
// Without the idempotency key, we lose the running operation here
LOG_E("Error saving information about the operation id and execution id. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId));
- Send(Parent, new TEvYdbCompute::TEvExecuterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the operation id and execution id. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId)}}));
+ Send(Parent, new TEvYdbCompute::TEvExecuterResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the operation id and execution id. ExecutionId: " << ExecutionId << " OperationId: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR));
FailedAndPassAway();
}
}
@@ -103,7 +103,7 @@ public:
const auto& response = *ev.Get()->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_E("Can't execute script: " << ev->Get()->Issues.ToOneLineString());
- Send(Parent, new TEvYdbCompute::TEvExecuterResponse(ev->Get()->Issues));
+ Send(Parent, new TEvYdbCompute::TEvExecuterResponse(ev->Get()->Issues, response.Status));
FailedAndPassAway();
return;
}
diff --git a/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp b/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
index 3d1cb3ec655..c2f5cf489b2 100644
--- a/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/finalizer_actor.cpp
@@ -77,13 +77,12 @@ public:
auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
pingCounters->InFly->Inc();
Become(&TFinalizerActor::StateFunc);
- Fq::Private::PingTaskRequest pingTaskRequest;
- if (ExecStatus == NYdb::NQuery::EExecStatus::Completed || Status == FederatedQuery::QueryMeta::COMPLETING) {
- pingTaskRequest.mutable_result_id()->set_value(Params.ResultId);
+ if (IsResignQuery()) {
+ SendResignQuery();
+ } else {
+ SendFinalPing();
}
- pingTaskRequest.set_status(GetFinalStatus());
- *pingTaskRequest.mutable_finished_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(TInstant::Now().MilliSeconds());
- Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest, true));
+
}
FederatedQuery::QueryMeta::ComputeStatus GetFinalStatus() const {
@@ -123,6 +122,40 @@ public:
}
}
+ bool IsResignQuery() const {
+ switch (ExecStatus) {
+ case NYdb::NQuery::EExecStatus::Completed:
+ return false;
+ case NYdb::NQuery::EExecStatus::Unspecified:
+ case NYdb::NQuery::EExecStatus::Starting:
+ case NYdb::NQuery::EExecStatus::Aborted:
+ case NYdb::NQuery::EExecStatus::Canceled:
+ case NYdb::NQuery::EExecStatus::Failed:
+ break;
+ }
+
+ switch (Status) {
+ case FederatedQuery::QueryMeta::COMPLETING:
+ case FederatedQuery::QueryMeta::COMPLETED:
+ case FederatedQuery::QueryMeta::ABORTING_BY_USER:
+ case FederatedQuery::QueryMeta::ABORTED_BY_USER:
+ case FederatedQuery::QueryMeta::ABORTING_BY_SYSTEM:
+ case FederatedQuery::QueryMeta::ABORTED_BY_SYSTEM:
+ case FederatedQuery::QueryMeta::STARTING:
+ case FederatedQuery::QueryMeta::FAILED:
+ case FederatedQuery::QueryMeta::RESUMING:
+ case FederatedQuery::QueryMeta::FAILING:
+ case FederatedQuery::QueryMeta::COMPUTE_STATUS_UNSPECIFIED:
+ case FederatedQuery::QueryMeta_ComputeStatus_QueryMeta_ComputeStatus_INT_MIN_SENTINEL_DO_NOT_USE_:
+ case FederatedQuery::QueryMeta_ComputeStatus_QueryMeta_ComputeStatus_INT_MAX_SENTINEL_DO_NOT_USE_:
+ case FederatedQuery::QueryMeta::PAUSING:
+ case FederatedQuery::QueryMeta::PAUSED:
+ return false;
+ case FederatedQuery::QueryMeta::RUNNING:
+ return true;
+ }
+ }
+
STRICT_STFUNC(StateFunc,
hFunc(TEvents::TEvForwardPingResponse, Handle);
)
@@ -144,6 +177,22 @@ public:
}
}
+ void SendResignQuery() {
+ Fq::Private::PingTaskRequest pingTaskRequest;
+ pingTaskRequest.set_resign_query(true);
+ Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest, true));
+ }
+
+ void SendFinalPing() {
+ Fq::Private::PingTaskRequest pingTaskRequest;
+ if (ExecStatus == NYdb::NQuery::EExecStatus::Completed || Status == FederatedQuery::QueryMeta::COMPLETING) {
+ pingTaskRequest.mutable_result_id()->set_value(Params.ResultId);
+ }
+ pingTaskRequest.set_status(GetFinalStatus());
+ *pingTaskRequest.mutable_finished_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(TInstant::Now().MilliSeconds());
+ Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest, true));
+ }
+
private:
TRunActorParams Params;
TActorId Parent;
diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
index 83c4208a3c3..c2f3b47ae83 100644
--- a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
@@ -100,12 +100,12 @@ public:
if (ev.Get()->Get()->Success) {
pingCounters->Ok->Inc();
LOG_I("Information about the status of operation is stored");
- Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(Issues, Status, ExecStatus));
+ Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(Issues, Status, ExecStatus, ComputeStatus));
CompleteAndPassAway();
} else {
pingCounters->Error->Inc();
LOG_E("Error saving information about the status of operation");
- Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the status of operation: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR, ExecStatus));
+ Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the status of operation: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR, ExecStatus, ComputeStatus));
FailedAndPassAway();
}
}
@@ -114,7 +114,7 @@ public:
const auto& response = *ev.Get()->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_E("Can't get operation: " << ev->Get()->Issues.ToOneLineString());
- Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(ev->Get()->Issues, ev->Get()->Status, ExecStatus));
+ Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(ev->Get()->Issues, ev->Get()->Status, ExecStatus, ComputeStatus));
FailedAndPassAway();
return;
}
@@ -129,21 +129,11 @@ public:
case NYdb::NQuery::EExecStatus::Aborted:
case NYdb::NQuery::EExecStatus::Canceled:
case NYdb::NQuery::EExecStatus::Failed:
- if (response.ExecStatus == NYdb::NQuery::EExecStatus::Failed) {
- TStringBuilder builder;
- builder << "Query failed with code " << NYql::NDqProto::StatusIds_StatusCode_Name(
- NYql::NDq::YdbStatusToDqStatus(response.StatusCode));
- auto issue = NYql::TIssue(builder);
- for (auto& subIssue : response.Issues) {
- issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
- }
- Issues.AddIssue(issue);
- } else {
- Issues = response.Issues;
- }
+ Issues = response.Issues;
Status = response.Status;
ExecStatus = response.ExecStatus;
QueryStats = response.QueryStats;
+ StatusCode = NYql::NDq::YdbStatusToDqStatus(response.StatusCode);
Failed();
break;
case NYdb::NQuery::EExecStatus::Completed:
@@ -161,23 +151,24 @@ public:
}
void Failed() {
- LOG_I("Execution status: Failed, " << Status);
+ LOG_I("Execution status: Failed, Status: " << Status << ", StatusCode: " << NYql::NDqProto::StatusIds::StatusCode_Name(StatusCode) << " Issues: " << Issues.ToOneLineString());
auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
pingCounters->InFly->Inc();
Fq::Private::PingTaskRequest pingTaskRequest;
NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues());
- pingTaskRequest.set_status(::FederatedQuery::QueryMeta::FAILING);
+ pingTaskRequest.set_pending_status_code(StatusCode);
PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast());
Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
}
void Complete() {
- LOG_I("Execution status: Complete" << Status);
+ LOG_I("Execution status: Complete " << Status << ", StatusCode: " << NYql::NDqProto::StatusIds::StatusCode_Name(StatusCode) << " Issues: " << Issues.ToOneLineString());
auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
pingCounters->InFly->Inc();
Fq::Private::PingTaskRequest pingTaskRequest;
NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues());
- pingTaskRequest.set_status(::FederatedQuery::QueryMeta::COMPLETING);
+ ComputeStatus = ::FederatedQuery::QueryMeta::COMPLETING;
+ pingTaskRequest.set_status(ComputeStatus);
PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast());
try {
pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan()));
@@ -213,9 +204,11 @@ private:
NYql::TIssues Issues;
NYdb::EStatus Status = NYdb::EStatus::SUCCESS;
NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified;
+ NYql::NDqProto::StatusIds::StatusCode StatusCode = NYql::NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_UNSPECIFIED;
Ydb::TableStats::QueryStats QueryStats;
NKikimr::TBackoffTimer BackoffTimer;
const TCompressor Compressor;
+ FederatedQuery::QueryMeta::ComputeStatus ComputeStatus = FederatedQuery::QueryMeta::RUNNING;
};
std::unique_ptr<NActors::IActor> CreateStatusTrackerActor(const TRunActorParams& params,
diff --git a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
index 84c000551ee..174bd6284db 100644
--- a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp
@@ -19,10 +19,11 @@
#include <ydb/core/fq/libs/private_client/events.h>
#include <ydb/core/fq/libs/ydb/ydb.h>
#include <ydb/library/services/services.pb.h>
+#include <ydb/library/yql/dq/actors/dq.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
-#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
+#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::FQ_RUN_ACTOR, "[ydb] QueryId: " << Params.QueryId << " " << stream)
@@ -75,7 +76,7 @@ public:
auto& response = *ev->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_I("InitializerResponse (failed). Issues: " << response.Issues.ToOneLineString());
- ResignAndPassAway(response.Issues);
+ ResignAndPassAway(response.Status, response.Issues);
return;
}
@@ -85,9 +86,9 @@ public:
void Handle(const TEvYdbCompute::TEvExecuterResponse::TPtr& ev) {
auto& response = *ev->Get();
- if (!response.Success) {
+ if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_I("ExecuterResponse (failed). Issues: " << response.Issues.ToOneLineString());
- ResignAndPassAway(response.Issues);
+ ResignAndPassAway(response.Status, response.Issues);
return;
}
Params.ExecutionId = response.ExecutionId;
@@ -100,10 +101,11 @@ public:
auto& response = *ev->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_I("StatusTrackerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
- ResignAndPassAway(response.Issues);
+ ResignAndPassAway(response.Status, response.Issues);
return;
}
ExecStatus = response.ExecStatus;
+ Params.Status = response.ComputeStatus;
LOG_I("StatusTrackerResponse (success) " << response.Status << " ExecStatus: " << static_cast<int>(response.ExecStatus) << " Issues: " << response.Issues.ToOneLineString());
if (response.ExecStatus == NYdb::NQuery::EExecStatus::Completed) {
Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.OperationId).release());
@@ -116,7 +118,7 @@ public:
auto& response = *ev->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_I("ResultWriterResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
- ResignAndPassAway(response.Issues);
+ ResignAndPassAway(response.Status, response.Issues);
return;
}
LOG_I("ResultWriterResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString());
@@ -127,18 +129,18 @@ public:
auto& response = *ev->Get();
if (response.Status != NYdb::EStatus::SUCCESS && response.Status != NYdb::EStatus::UNSUPPORTED) {
LOG_I("ResourcesCleanerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
- ResignAndPassAway(response.Issues);
+ ResignAndPassAway(response.Status, response.Issues);
return;
}
LOG_I("ResourcesCleanerResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString());
- Register(ActorFactory->CreateFinalizer(SelfId(), Pinger, ExecStatus, IsAborted ? FederatedQuery::QueryMeta::ABORTING_BY_USER : Params.Status).release());
+ Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, IsAborted ? FederatedQuery::QueryMeta::ABORTING_BY_USER : Params.Status).release());
}
void Handle(const TEvYdbCompute::TEvFinalizerResponse::TPtr ev) {
auto& response = *ev->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_I("FinalizerResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
- ResignAndPassAway(response.Issues);
+ ResignAndPassAway(response.Status, response.Issues);
return;
}
LOG_I("FinalizerResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString());
@@ -157,46 +159,52 @@ public:
auto& response = *ev->Get();
if (response.Status != NYdb::EStatus::SUCCESS) {
LOG_I("StopperResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString());
- ResignAndPassAway(response.Issues);
+ ResignAndPassAway(response.Status, response.Issues);
return;
}
LOG_I("StopperResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString());
Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release());
}
- void Run() {
+ void Run() { // recover points
switch (Params.Status) {
- case FederatedQuery::QueryMeta::ABORTING_BY_USER:
- case FederatedQuery::QueryMeta::ABORTING_BY_SYSTEM:
- case FederatedQuery::QueryMeta::FAILING:
- if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) {
- Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release());
+ case FederatedQuery::QueryMeta::STARTING:
+ Register(ActorFactory->CreateInitializer(SelfId(), Pinger).release());
+ break;
+ case FederatedQuery::QueryMeta::RUNNING:
+ if (Params.OperationId.GetKind() == Ydb::TOperationId::UNUSED) {
+ Register(ActorFactory->CreateExecuter(SelfId(), Connector, Pinger).release()); // restart query
} else {
- Register(ActorFactory->CreateFinalizer(SelfId(), Pinger, ExecStatus, Params.Status).release());
+ Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release());
}
break;
case FederatedQuery::QueryMeta::COMPLETING:
if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) {
Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.OperationId).release());
} else {
- Register(ActorFactory->CreateFinalizer(SelfId(), Pinger, ExecStatus, Params.Status).release());
+ Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release());
}
break;
- case FederatedQuery::QueryMeta::STARTING:
- Register(ActorFactory->CreateInitializer(SelfId(), Pinger).release());
- break;
- case FederatedQuery::QueryMeta::RUNNING:
- Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release());
+ case FederatedQuery::QueryMeta::FAILING:
+ case FederatedQuery::QueryMeta::ABORTING_BY_USER:
+ case FederatedQuery::QueryMeta::ABORTING_BY_SYSTEM:
+ if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) {
+ Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release());
+ } else {
+ Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release());
+ }
break;
default:
break;
}
}
- void ResignAndPassAway(const NYql::TIssues& issues) {
+ void ResignAndPassAway(NYdb::EStatus status, const NYql::TIssues& issues) {
Fq::Private::PingTaskRequest pingTaskRequest;
NYql::IssuesToMessage(issues, pingTaskRequest.mutable_transient_issues());
pingTaskRequest.set_resign_query(true);
+
+ pingTaskRequest.set_status_code(NYql::NDq::YdbStatusToDqStatus(static_cast<Ydb::StatusIds::StatusCode>(status)));
Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest, true));
FinishAndPassAway();
}
diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp
index dce989fb15b..6af5f7b05d1 100644
--- a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp
+++ b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp
@@ -73,7 +73,7 @@ TPingTaskParams ConstructHardPingTask(
auto meteringRecords = std::make_shared<std::vector<TString>>();
- auto prepareParams = [=, counters=counters, actorSystem = NActors::TActivationContext::ActorSystem()](const TVector<TResultSet>& resultSets) {
+ auto prepareParams = [=, counters=counters, actorSystem = NActors::TActivationContext::ActorSystem(), request=request](const TVector<TResultSet>& resultSets) mutable {
TString jobId;
FederatedQuery::Query query;
FederatedQuery::Internal::QueryInternal internal;
@@ -145,6 +145,13 @@ TPingTaskParams ConstructHardPingTask(
TDuration backoff = taskLeaseTtl;
if (request.resign_query()) {
+ if (request.status_code() == NYql::NDqProto::StatusIds::UNSPECIFIED && internal.pending_status_code() != NYql::NDqProto::StatusIds::UNSPECIFIED) {
+ request.set_status_code(internal.pending_status_code());
+ internal.clear_pending_status_code();
+ internal.clear_execution_id();
+ internal.clear_operation_id();
+ }
+
TRetryPolicyItem policy(0, TDuration::Seconds(1), TDuration::Zero());
auto it = retryPolicies.find(request.status_code());
auto policyFound = it != retryPolicies.end();
@@ -179,6 +186,13 @@ TPingTaskParams ConstructHardPingTask(
}
builder << " at " << Now();
auto issue = NYql::TIssue(builder);
+ if (query.issue().size() > 0 && request.issues().empty()) {
+ NYql::TIssues queryIssues;
+ NYql::IssuesFromMessage(query.issue(), queryIssues);
+ for (auto& subIssue : queryIssues) {
+ issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
+ }
+ }
if (transientIssues) {
for (auto& subIssue : *transientIssues) {
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
@@ -199,6 +213,10 @@ TPingTaskParams ConstructHardPingTask(
internal.set_status_code(request.status_code());
}
+ if (request.pending_status_code() != NYql::NDqProto::StatusIds::UNSPECIFIED) {
+ internal.set_pending_status_code(request.pending_status_code());
+ }
+
if (issues) {
NYql::IssuesToMessage(*issues, query.mutable_issue());
NYql::IssuesToMessage(*issues, job.mutable_issue());
diff --git a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto
index 1d7f9f3b2f6..9821789d05f 100644
--- a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto
+++ b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto
@@ -49,6 +49,7 @@ message QueryInternal {
string execution_id = 25;
NFq.NConfig.TYdbStorageConfig compute_connection = 26;
google.protobuf.Duration result_ttl = 27;
+ NYql.NDqProto.StatusIds.StatusCode pending_status_code = 28;
}
message JobInternal {
diff --git a/ydb/core/fq/libs/protos/fq_private.proto b/ydb/core/fq/libs/protos/fq_private.proto
index 5513f3a3eb5..bba6f0ffdb4 100644
--- a/ydb/core/fq/libs/protos/fq_private.proto
+++ b/ydb/core/fq/libs/protos/fq_private.proto
@@ -161,6 +161,7 @@ message PingTaskRequest {
repeated Ydb.Issue.IssueMessage internal_issues = 34;
string operation_id = 35;
string execution_id = 36;
+ NYql.NDqProto.StatusIds.StatusCode pending_status_code = 37;
}
message PingTaskResult {