diff options
author | hor911 <hor911@ydb.tech> | 2023-09-08 17:41:51 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-09-08 18:17:10 +0300 |
commit | e15df84b09a86f909520fe070387fb6fe660c28e (patch) | |
tree | 54b1dbfe41a974ea213fc4a1cafbbbb635f9dd66 | |
parent | a64d975db7116478ef4e319124e6e3bfc67efd3c (diff) | |
download | ydb-e15df84b09a86f909520fe070387fb6fe660c28e.tar.gz |
Push DQ StatusCode as YDB StatusCode to YQv2 UI
--Чтобы доставить до FQ Proxy статус выполнения, понадобилось новое поле в метадате. Иначе никак, ибо текущий статус у вызова Get<Operation> - это статус самого вызова, а не (асинхронно выполнявшимся скрипта)--
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/events/events.h | 4 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp | 14 | ||||
-rw-r--r-- | ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_get_operation.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_script_executions.cpp | 13 | ||||
-rw-r--r-- | ydb/core/kqp/ut/federated_query/common/common.cpp | 15 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp | 17 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/dq.cpp | 1 |
8 files changed, 40 insertions, 32 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h index f2e5b6592f1..5dd538cddee 100644 --- a/ydb/core/fq/libs/compute/ydb/events/events.h +++ b/ydb/core/fq/libs/compute/ydb/events/events.h @@ -101,8 +101,9 @@ struct TEvYdbCompute { , Status(status) {} - TEvGetOperationResponse(NYdb::NQuery::EExecStatus execStatus, const TVector<Ydb::Query::ResultSetMeta>& resultSetsMeta, const Ydb::TableStats::QueryStats& queryStats, NYql::TIssues issues) + TEvGetOperationResponse(NYdb::NQuery::EExecStatus execStatus, Ydb::StatusIds::StatusCode statusCode, const TVector<Ydb::Query::ResultSetMeta>& resultSetsMeta, const Ydb::TableStats::QueryStats& queryStats, NYql::TIssues issues) : ExecStatus(execStatus) + , StatusCode(statusCode) , ResultSetsMeta(resultSetsMeta) , QueryStats(queryStats) , Issues(std::move(issues)) @@ -110,6 +111,7 @@ struct TEvYdbCompute { {} NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified; + Ydb::StatusIds::StatusCode StatusCode = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED; TVector<Ydb::Query::ResultSetMeta> ResultSetsMeta; Ydb::TableStats::QueryStats QueryStats; NYql::TIssues Issues; diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp index ab5b8d0c638..e9e34f53862 100644 --- a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp @@ -10,6 +10,7 @@ #include <ydb/core/util/backoff.h> #include <ydb/library/services/services.pb.h> +#include <ydb/library/yql/dq/actors/dq.h> #include <ydb/library/yql/providers/common/metrics/service_counters.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> @@ -126,7 +127,18 @@ public: case NYdb::NQuery::EExecStatus::Aborted: case NYdb::NQuery::EExecStatus::Canceled: case NYdb::NQuery::EExecStatus::Failed: - Issues = response.Issues; + if (response.ExecStatus == NYdb::NQuery::EExecStatus::Failed) { + TStringBuilder builder; + builder << "Query failed with code " << NYql::NDqProto::StatusIds_StatusCode_Name( + NYql::NDq::YdbStatusToDqStatus(response.StatusCode)); + auto issue = NYql::TIssue(builder); + for (auto& subIssue : response.Issues) { + issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); + } + Issues.AddIssue(issue); + } else { + Issues = response.Issues; + } Status = response.Status; ExecStatus = response.ExecStatus; QueryStats = response.QueryStats; diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp index f5d746b98ce..737a02f4a48 100644 --- a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp @@ -72,8 +72,8 @@ public: .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) { try { auto response = future.ExtractValueSync(); - if (response.Status().IsSuccess()) { - actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Metadata().ExecStatus, response.Metadata().ResultSetsMeta, response.Metadata().ExecStats, response.Status().GetIssues()), 0, cookie); + if (response.Id().GetKind() != Ydb::TOperationId::UNUSED) { + actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Metadata().ExecStatus, static_cast<Ydb::StatusIds::StatusCode>(response.Status().GetStatus()), response.Metadata().ResultSetsMeta, response.Metadata().ExecStats, response.Status().GetIssues()), 0, cookie); } else { actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Status().GetIssues(), response.Status().GetStatus()), 0, cookie); } diff --git a/ydb/core/grpc_services/rpc_get_operation.cpp b/ydb/core/grpc_services/rpc_get_operation.cpp index 84f62e39a78..d4bf26dd398 100644 --- a/ydb/core/grpc_services/rpc_get_operation.cpp +++ b/ydb/core/grpc_services/rpc_get_operation.cpp @@ -243,9 +243,7 @@ private: void Handle(NKqp::TEvGetScriptExecutionOperationResponse::TPtr& ev, const TActorContext& ctx) { TEvGetOperationRequest::TResponse resp; auto deferred = resp.mutable_operation(); - if (ev->Get()->Ready) { - deferred->set_id(GetProtoRequest()->id()); - } + deferred->set_id(GetProtoRequest()->id()); deferred->set_ready(ev->Get()->Ready); deferred->set_status(ev->Get()->Status); if (ev->Get()->Issues) { diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index c62b299e66e..7cfc0450eca 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -1118,7 +1118,7 @@ public: const TMaybe<i32> operationStatus = result.ColumnParser("operation_status").GetOptionalInt32(); if (operationStatus) { - Ready = true; + OperationStatus = static_cast<Ydb::StatusIds::StatusCode>(*operationStatus); } Metadata.set_execution_id(*ScriptExecutionIdFromOperation(OperationId)); @@ -1185,7 +1185,7 @@ public: ScriptExecutionRunnerActorIdFromString(*runScriptActorIdString, RunScriptActorId); } - if (!operationStatus) { + if (!OperationStatus) { // Check lease deadline NYdb::TResultSetParser deadlineResult(ResultSets[1]); if (deadlineResult.RowsCount() == 0) { @@ -1227,7 +1227,7 @@ public: } void OnFinishOperation() { - Ready = true; + OperationStatus = Ydb::StatusIds::ABORTED; Issues = LeaseExpiredIssues(); Metadata.set_exec_status(Ydb::Query::EXEC_STATUS_ABORTED); @@ -1235,11 +1235,10 @@ public: } void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { - if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::ABORTED && LeaseExpired) { + if (OperationStatus) { TMaybe<google::protobuf::Any> metadata; metadata.ConstructInPlace().PackFrom(Metadata); - - Send(Owner, new TEvGetScriptExecutionOperationResponse(Ready, LeaseExpired, RunScriptActorId, Ydb::StatusIds::SUCCESS, std::move(Issues), std::move(metadata))); + Send(Owner, new TEvGetScriptExecutionOperationResponse(true, LeaseExpired, RunScriptActorId, *OperationStatus, std::move(Issues), std::move(metadata))); } else { Send(Owner, new TEvGetScriptExecutionOperationResponse(false, LeaseExpired, RunScriptActorId, status, std::move(issues), Nothing())); } @@ -1251,7 +1250,7 @@ private: bool FinishIfLeaseExpired; TInstant StartActorTime; TString ExecutionId; - bool Ready = false; + TMaybe<Ydb::StatusIds::StatusCode> OperationStatus; bool LeaseExpired = false; TActorId RunScriptActorId; NYql::TIssues Issues; diff --git a/ydb/core/kqp/ut/federated_query/common/common.cpp b/ydb/core/kqp/ut/federated_query/common/common.cpp index c221217f728..98584a83a18 100644 --- a/ydb/core/kqp/ut/federated_query/common/common.cpp +++ b/ydb/core/kqp/ut/federated_query/common/common.cpp @@ -6,15 +6,14 @@ namespace NKikimr::NKqp::NFederatedQueryTest { NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver) { NYdb::NOperation::TOperationClient client(ydbDriver); - NThreading::TFuture<NYdb::NQuery::TScriptExecutionOperation> op; - do { - if (op.Initialized()) { - Sleep(TDuration::MilliSeconds(10)); + while (1) { + auto op = client.Get<NYdb::NQuery::TScriptExecutionOperation>(operationId).GetValueSync(); + if (op.Ready()) { + return op; } - op = client.Get<NYdb::NQuery::TScriptExecutionOperation>(operationId); - UNIT_ASSERT_C(op.GetValueSync().Status().IsSuccess(), TStringBuilder() << op.GetValueSync().Status().GetStatus() << ":" << op.GetValueSync().Status().GetIssues().ToString()); - } while (!op.GetValueSync().Ready()); - return op.GetValueSync(); + UNIT_ASSERT_C(op.Status().IsSuccess(), TStringBuilder() << op.Status().GetStatus() << ":" << op.Status().GetIssues().ToString()); + Sleep(TDuration::MilliSeconds(10)); + } } std::shared_ptr<TKikimrRunner> MakeKikimrRunner( diff --git a/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp index 4575a63eb2e..8aaafae20fa 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp @@ -16,18 +16,17 @@ using namespace NYdb::NQuery; Y_UNIT_TEST_SUITE(KqpQueryServiceScripts) { NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver, i32 tries = -1) { NYdb::NOperation::TOperationClient client(ydbDriver); - NThreading::TFuture<NYdb::NQuery::TScriptExecutionOperation> op; - do { - if (!op.Initialized()) { - Sleep(TDuration::MilliSeconds(10)); + while(1) { + auto op = client.Get<NYdb::NQuery::TScriptExecutionOperation>(operationId).GetValueSync(); + if (op.Ready() || tries == 0) { + return op; } + UNIT_ASSERT_C(op.Status().IsSuccess(), op.Status().GetStatus() << ":" << op.Status().GetIssues().ToString()); if (tries > 0) { --tries; } - op = client.Get<NYdb::NQuery::TScriptExecutionOperation>(operationId); - UNIT_ASSERT_C(op.GetValueSync().Status().IsSuccess(), op.GetValueSync().Status().GetStatus() << ":" << op.GetValueSync().Status().GetIssues().ToString()); - } while (!op.GetValueSync().Ready() && tries != 0); - return op.GetValueSync(); + Sleep(TDuration::MilliSeconds(10)); + } } Y_UNIT_TEST(ExecuteScript) { @@ -434,7 +433,6 @@ Y_UNIT_TEST_SUITE(KqpQueryServiceScripts) { } auto op = opClient.Get<NYdb::NQuery::TScriptExecutionOperation>(scriptExecutionOperation.Id()).ExtractValueSync(); - UNIT_ASSERT_C(op.Status().IsSuccess(), op.Status().GetIssues().ToString()); UNIT_ASSERT_C(op.Ready(), op.Status().GetIssues().ToString()); UNIT_ASSERT_C(op.Metadata().ExecStatus == EExecStatus::Completed || op.Metadata().ExecStatus == EExecStatus::Canceled, op.Status().GetIssues().ToString()); UNIT_ASSERT_EQUAL(op.Metadata().ExecutionId, scriptExecutionOperation.Metadata().ExecutionId); @@ -459,7 +457,6 @@ Y_UNIT_TEST_SUITE(KqpQueryServiceScripts) { void ExpectExecStatus(EExecStatus status, const TScriptExecutionOperation op, const NYdb::TDriver& ydbDriver) { auto readyOp = WaitScriptExecutionOperation(op.Id(), ydbDriver); - UNIT_ASSERT_C(readyOp.Status().IsSuccess(), readyOp.Status().GetIssues().ToString()); UNIT_ASSERT_C(readyOp.Ready(), readyOp.Status().GetIssues().ToString()); UNIT_ASSERT(readyOp.Metadata().ExecStatus == status); UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecutionId, op.Metadata().ExecutionId); diff --git a/ydb/library/yql/dq/actors/dq.cpp b/ydb/library/yql/dq/actors/dq.cpp index 66493ca6c35..2bc52e4b727 100644 --- a/ydb/library/yql/dq/actors/dq.cpp +++ b/ydb/library/yql/dq/actors/dq.cpp @@ -38,6 +38,7 @@ NYql::NDqProto::StatusIds::StatusCode YdbStatusToDqStatus(Ydb::StatusIds::Status case Ydb::StatusIds::SUCCESS: return NYql::NDqProto::StatusIds::SUCCESS; case Ydb::StatusIds::BAD_REQUEST: + return NYql::NDqProto::StatusIds::BAD_REQUEST; case Ydb::StatusIds::UNAUTHORIZED: case Ydb::StatusIds::INTERNAL_ERROR: return NYql::NDqProto::StatusIds::INTERNAL_ERROR; |