aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-09-08 17:41:51 +0300
committerhor911 <hor911@ydb.tech>2023-09-08 18:17:10 +0300
commite15df84b09a86f909520fe070387fb6fe660c28e (patch)
tree54b1dbfe41a974ea213fc4a1cafbbbb635f9dd66
parenta64d975db7116478ef4e319124e6e3bfc67efd3c (diff)
downloadydb-e15df84b09a86f909520fe070387fb6fe660c28e.tar.gz
Push DQ StatusCode as YDB StatusCode to YQv2 UI
--Чтобы доставить до FQ Proxy статус выполнения, понадобилось новое поле в метадате. Иначе никак, ибо текущий статус у вызова Get<Operation> - это статус самого вызова, а не (асинхронно выполнявшимся скрипта)--
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/events.h4
-rw-r--r--ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp14
-rw-r--r--ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp4
-rw-r--r--ydb/core/grpc_services/rpc_get_operation.cpp4
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.cpp13
-rw-r--r--ydb/core/kqp/ut/federated_query/common/common.cpp15
-rw-r--r--ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp17
-rw-r--r--ydb/library/yql/dq/actors/dq.cpp1
8 files changed, 40 insertions, 32 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h
index f2e5b6592f1..5dd538cddee 100644
--- a/ydb/core/fq/libs/compute/ydb/events/events.h
+++ b/ydb/core/fq/libs/compute/ydb/events/events.h
@@ -101,8 +101,9 @@ struct TEvYdbCompute {
, Status(status)
{}
- TEvGetOperationResponse(NYdb::NQuery::EExecStatus execStatus, const TVector<Ydb::Query::ResultSetMeta>& resultSetsMeta, const Ydb::TableStats::QueryStats& queryStats, NYql::TIssues issues)
+ TEvGetOperationResponse(NYdb::NQuery::EExecStatus execStatus, Ydb::StatusIds::StatusCode statusCode, const TVector<Ydb::Query::ResultSetMeta>& resultSetsMeta, const Ydb::TableStats::QueryStats& queryStats, NYql::TIssues issues)
: ExecStatus(execStatus)
+ , StatusCode(statusCode)
, ResultSetsMeta(resultSetsMeta)
, QueryStats(queryStats)
, Issues(std::move(issues))
@@ -110,6 +111,7 @@ struct TEvYdbCompute {
{}
NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified;
+ Ydb::StatusIds::StatusCode StatusCode = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
TVector<Ydb::Query::ResultSetMeta> ResultSetsMeta;
Ydb::TableStats::QueryStats QueryStats;
NYql::TIssues Issues;
diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
index ab5b8d0c638..e9e34f53862 100644
--- a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
@@ -10,6 +10,7 @@
#include <ydb/core/util/backoff.h>
#include <ydb/library/services/services.pb.h>
+#include <ydb/library/yql/dq/actors/dq.h>
#include <ydb/library/yql/providers/common/metrics/service_counters.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
@@ -126,7 +127,18 @@ public:
case NYdb::NQuery::EExecStatus::Aborted:
case NYdb::NQuery::EExecStatus::Canceled:
case NYdb::NQuery::EExecStatus::Failed:
- Issues = response.Issues;
+ if (response.ExecStatus == NYdb::NQuery::EExecStatus::Failed) {
+ TStringBuilder builder;
+ builder << "Query failed with code " << NYql::NDqProto::StatusIds_StatusCode_Name(
+ NYql::NDq::YdbStatusToDqStatus(response.StatusCode));
+ auto issue = NYql::TIssue(builder);
+ for (auto& subIssue : response.Issues) {
+ issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
+ }
+ Issues.AddIssue(issue);
+ } else {
+ Issues = response.Issues;
+ }
Status = response.Status;
ExecStatus = response.ExecStatus;
QueryStats = response.QueryStats;
diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
index f5d746b98ce..737a02f4a48 100644
--- a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
@@ -72,8 +72,8 @@ public:
.Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) {
try {
auto response = future.ExtractValueSync();
- if (response.Status().IsSuccess()) {
- actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Metadata().ExecStatus, response.Metadata().ResultSetsMeta, response.Metadata().ExecStats, response.Status().GetIssues()), 0, cookie);
+ if (response.Id().GetKind() != Ydb::TOperationId::UNUSED) {
+ actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Metadata().ExecStatus, static_cast<Ydb::StatusIds::StatusCode>(response.Status().GetStatus()), response.Metadata().ResultSetsMeta, response.Metadata().ExecStats, response.Status().GetIssues()), 0, cookie);
} else {
actorSystem->Send(recipient, new TEvYdbCompute::TEvGetOperationResponse(response.Status().GetIssues(), response.Status().GetStatus()), 0, cookie);
}
diff --git a/ydb/core/grpc_services/rpc_get_operation.cpp b/ydb/core/grpc_services/rpc_get_operation.cpp
index 84f62e39a78..d4bf26dd398 100644
--- a/ydb/core/grpc_services/rpc_get_operation.cpp
+++ b/ydb/core/grpc_services/rpc_get_operation.cpp
@@ -243,9 +243,7 @@ private:
void Handle(NKqp::TEvGetScriptExecutionOperationResponse::TPtr& ev, const TActorContext& ctx) {
TEvGetOperationRequest::TResponse resp;
auto deferred = resp.mutable_operation();
- if (ev->Get()->Ready) {
- deferred->set_id(GetProtoRequest()->id());
- }
+ deferred->set_id(GetProtoRequest()->id());
deferred->set_ready(ev->Get()->Ready);
deferred->set_status(ev->Get()->Status);
if (ev->Get()->Issues) {
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
index c62b299e66e..7cfc0450eca 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
@@ -1118,7 +1118,7 @@ public:
const TMaybe<i32> operationStatus = result.ColumnParser("operation_status").GetOptionalInt32();
if (operationStatus) {
- Ready = true;
+ OperationStatus = static_cast<Ydb::StatusIds::StatusCode>(*operationStatus);
}
Metadata.set_execution_id(*ScriptExecutionIdFromOperation(OperationId));
@@ -1185,7 +1185,7 @@ public:
ScriptExecutionRunnerActorIdFromString(*runScriptActorIdString, RunScriptActorId);
}
- if (!operationStatus) {
+ if (!OperationStatus) {
// Check lease deadline
NYdb::TResultSetParser deadlineResult(ResultSets[1]);
if (deadlineResult.RowsCount() == 0) {
@@ -1227,7 +1227,7 @@ public:
}
void OnFinishOperation() {
- Ready = true;
+ OperationStatus = Ydb::StatusIds::ABORTED;
Issues = LeaseExpiredIssues();
Metadata.set_exec_status(Ydb::Query::EXEC_STATUS_ABORTED);
@@ -1235,11 +1235,10 @@ public:
}
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
- if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::ABORTED && LeaseExpired) {
+ if (OperationStatus) {
TMaybe<google::protobuf::Any> metadata;
metadata.ConstructInPlace().PackFrom(Metadata);
-
- Send(Owner, new TEvGetScriptExecutionOperationResponse(Ready, LeaseExpired, RunScriptActorId, Ydb::StatusIds::SUCCESS, std::move(Issues), std::move(metadata)));
+ Send(Owner, new TEvGetScriptExecutionOperationResponse(true, LeaseExpired, RunScriptActorId, *OperationStatus, std::move(Issues), std::move(metadata)));
} else {
Send(Owner, new TEvGetScriptExecutionOperationResponse(false, LeaseExpired, RunScriptActorId, status, std::move(issues), Nothing()));
}
@@ -1251,7 +1250,7 @@ private:
bool FinishIfLeaseExpired;
TInstant StartActorTime;
TString ExecutionId;
- bool Ready = false;
+ TMaybe<Ydb::StatusIds::StatusCode> OperationStatus;
bool LeaseExpired = false;
TActorId RunScriptActorId;
NYql::TIssues Issues;
diff --git a/ydb/core/kqp/ut/federated_query/common/common.cpp b/ydb/core/kqp/ut/federated_query/common/common.cpp
index c221217f728..98584a83a18 100644
--- a/ydb/core/kqp/ut/federated_query/common/common.cpp
+++ b/ydb/core/kqp/ut/federated_query/common/common.cpp
@@ -6,15 +6,14 @@ namespace NKikimr::NKqp::NFederatedQueryTest {
NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver) {
NYdb::NOperation::TOperationClient client(ydbDriver);
- NThreading::TFuture<NYdb::NQuery::TScriptExecutionOperation> op;
- do {
- if (op.Initialized()) {
- Sleep(TDuration::MilliSeconds(10));
+ while (1) {
+ auto op = client.Get<NYdb::NQuery::TScriptExecutionOperation>(operationId).GetValueSync();
+ if (op.Ready()) {
+ return op;
}
- op = client.Get<NYdb::NQuery::TScriptExecutionOperation>(operationId);
- UNIT_ASSERT_C(op.GetValueSync().Status().IsSuccess(), TStringBuilder() << op.GetValueSync().Status().GetStatus() << ":" << op.GetValueSync().Status().GetIssues().ToString());
- } while (!op.GetValueSync().Ready());
- return op.GetValueSync();
+ UNIT_ASSERT_C(op.Status().IsSuccess(), TStringBuilder() << op.Status().GetStatus() << ":" << op.Status().GetIssues().ToString());
+ Sleep(TDuration::MilliSeconds(10));
+ }
}
std::shared_ptr<TKikimrRunner> MakeKikimrRunner(
diff --git a/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp
index 4575a63eb2e..8aaafae20fa 100644
--- a/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp
@@ -16,18 +16,17 @@ using namespace NYdb::NQuery;
Y_UNIT_TEST_SUITE(KqpQueryServiceScripts) {
NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionOperation(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver, i32 tries = -1) {
NYdb::NOperation::TOperationClient client(ydbDriver);
- NThreading::TFuture<NYdb::NQuery::TScriptExecutionOperation> op;
- do {
- if (!op.Initialized()) {
- Sleep(TDuration::MilliSeconds(10));
+ while(1) {
+ auto op = client.Get<NYdb::NQuery::TScriptExecutionOperation>(operationId).GetValueSync();
+ if (op.Ready() || tries == 0) {
+ return op;
}
+ UNIT_ASSERT_C(op.Status().IsSuccess(), op.Status().GetStatus() << ":" << op.Status().GetIssues().ToString());
if (tries > 0) {
--tries;
}
- op = client.Get<NYdb::NQuery::TScriptExecutionOperation>(operationId);
- UNIT_ASSERT_C(op.GetValueSync().Status().IsSuccess(), op.GetValueSync().Status().GetStatus() << ":" << op.GetValueSync().Status().GetIssues().ToString());
- } while (!op.GetValueSync().Ready() && tries != 0);
- return op.GetValueSync();
+ Sleep(TDuration::MilliSeconds(10));
+ }
}
Y_UNIT_TEST(ExecuteScript) {
@@ -434,7 +433,6 @@ Y_UNIT_TEST_SUITE(KqpQueryServiceScripts) {
}
auto op = opClient.Get<NYdb::NQuery::TScriptExecutionOperation>(scriptExecutionOperation.Id()).ExtractValueSync();
- UNIT_ASSERT_C(op.Status().IsSuccess(), op.Status().GetIssues().ToString());
UNIT_ASSERT_C(op.Ready(), op.Status().GetIssues().ToString());
UNIT_ASSERT_C(op.Metadata().ExecStatus == EExecStatus::Completed || op.Metadata().ExecStatus == EExecStatus::Canceled, op.Status().GetIssues().ToString());
UNIT_ASSERT_EQUAL(op.Metadata().ExecutionId, scriptExecutionOperation.Metadata().ExecutionId);
@@ -459,7 +457,6 @@ Y_UNIT_TEST_SUITE(KqpQueryServiceScripts) {
void ExpectExecStatus(EExecStatus status, const TScriptExecutionOperation op, const NYdb::TDriver& ydbDriver) {
auto readyOp = WaitScriptExecutionOperation(op.Id(), ydbDriver);
- UNIT_ASSERT_C(readyOp.Status().IsSuccess(), readyOp.Status().GetIssues().ToString());
UNIT_ASSERT_C(readyOp.Ready(), readyOp.Status().GetIssues().ToString());
UNIT_ASSERT(readyOp.Metadata().ExecStatus == status);
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecutionId, op.Metadata().ExecutionId);
diff --git a/ydb/library/yql/dq/actors/dq.cpp b/ydb/library/yql/dq/actors/dq.cpp
index 66493ca6c35..2bc52e4b727 100644
--- a/ydb/library/yql/dq/actors/dq.cpp
+++ b/ydb/library/yql/dq/actors/dq.cpp
@@ -38,6 +38,7 @@ NYql::NDqProto::StatusIds::StatusCode YdbStatusToDqStatus(Ydb::StatusIds::Status
case Ydb::StatusIds::SUCCESS:
return NYql::NDqProto::StatusIds::SUCCESS;
case Ydb::StatusIds::BAD_REQUEST:
+ return NYql::NDqProto::StatusIds::BAD_REQUEST;
case Ydb::StatusIds::UNAUTHORIZED:
case Ydb::StatusIds::INTERNAL_ERROR:
return NYql::NDqProto::StatusIds::INTERNAL_ERROR;