diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-11 13:38:05 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-02-11 13:38:05 +0300 |
commit | cddbd0caebf2457b54caac7522bfc5979358698a (patch) | |
tree | d0c50cdfd55aef154f9d27d6fd4e36639044fb4c | |
parent | dadcaf26d43e922c5e58646758872dde2c328f95 (diff) | |
download | ydb-cddbd0caebf2457b54caac7522bfc5979358698a.tar.gz |
YQ-356 Pass issues in TEvAbortExecution
Pass issues everywhere
Pass subissues to other functions
Improve TEvAbortExecution
ref:bda713c8e075db5cde7e1a27f7c7b39f6d357fc0
22 files changed, 164 insertions, 98 deletions
diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp index 5f95f263c6..69d14f9ad7 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp @@ -442,13 +442,13 @@ private: void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) { auto& record = ev->Get()->Record; + NYql::TIssues issues = ev->Get()->GetIssues(); LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Got abort execution event, from: " << ev->Sender << ", code: " << Ydb::StatusIds::StatusCode_Name(record.GetStatusCode()) - << ", message: " << record.GetMessage()); + << ", message: " << issues.ToOneLineString()); - NYql::TIssue issue(record.GetMessage()); - ReplyFinishStream(record.GetStatusCode(), issue, ctx); + ReplyFinishStream(record.GetStatusCode(), issues, ctx); } void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) { diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index f8e3cd0c77..cb12128559 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -905,10 +905,7 @@ private: void TerminateExpiredScan(const TActorId& actorId, TStringBuf msg) { CA_LOG_W(msg); - auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(); - abortEv->Record.SetStatusCode(Ydb::StatusIds::CANCELLED); - abortEv->Record.SetMessage("Cancel unexpected/expired scan"); - + auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(Ydb::StatusIds::CANCELLED, "Cancel unexpected/expired scan"); Send(actorId, abortEv.Release()); } @@ -995,7 +992,7 @@ private: } } - void TerminateSources(const TString& message, bool success) override { + void TerminateSources(const TIssues& issues, bool success) override { if (!ScanData || Shards.empty()) { return; } @@ -1004,10 +1001,10 @@ private: auto& state = Shards.front(); if (state.ActorId) { CA_LOG(prio, "Send abort execution event to scan over tablet: " << state.TabletId << ", table: " - << ScanData->TablePath << ", scan actor: " << state.ActorId << ", message: " << message); + << ScanData->TablePath << ", scan actor: " << state.ActorId << ", message: " << issues.ToOneLineString()); Send(state.ActorId, new TEvKqp::TEvAbortExecution( - success ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::ABORTED, message)); + success ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::ABORTED, issues)); } else { CA_LOG(prio, "Table: " << ScanData->TablePath << ", scan has not been started yet"); } diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h index 6ecf78ab05..c2e872c4e8 100644 --- a/ydb/core/kqp/executer/kqp_executer_impl.h +++ b/ydb/core/kqp/executer/kqp_executer_impl.h @@ -172,10 +172,11 @@ protected: protected: void HandleAbortExecution(TEvKqp::TEvAbortExecution::TPtr& ev) { auto& msg = ev->Get()->Record; + NYql::TIssues issues = ev->Get()->GetIssues(); LOG_D("Got EvAbortExecution, status: " << Ydb::StatusIds_StatusCode_Name(msg.GetStatusCode()) - << ", message: " << msg.GetMessage()); + << ", message: " << issues.ToOneLineString()); if (msg.GetStatusCode() == Ydb::StatusIds::INTERNAL_ERROR) { - InternalError(msg.GetMessage()); + InternalError(issues); } else if (msg.GetStatusCode() == Ydb::StatusIds::TIMEOUT) { auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(Ydb::StatusIds::TIMEOUT, "Request timeout exceeded"); this->Send(Target, abortEv.Release()); @@ -183,7 +184,7 @@ protected: TerminateComputeActors(Ydb::StatusIds::TIMEOUT, "timeout"); this->PassAway(); } else { - RuntimeError(msg.GetStatusCode(), msg.GetMessage()); + RuntimeError(msg.GetStatusCode(), issues); } } @@ -473,13 +474,13 @@ protected: } protected: - void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const TString& message) { + void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) { for (const auto& task : this->TasksGraph.GetTasks()) { if (task.ComputeActorId) { - LOG_I("aborting compute actor execution, message: " << message + LOG_I("aborting compute actor execution, message: " << issues.ToOneLineString() << ", compute actor: " << task.ComputeActorId << ", task: " << task.Id); - auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(code, message); + auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(code, issues); this->Send(task.ComputeActorId, ev.Release()); } else { LOG_I("task: " << task.Id << ", does not have Compute ActorId yet"); @@ -487,6 +488,10 @@ protected: } } + void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const TString& message) { + TerminateComputeActors(code, NYql::TIssues({NYql::TIssue(message)})); + } + protected: void UnexpectedEvent(const TString& state, ui32 eventType) { LOG_C("TKqpExecuter, unexpected event: " << eventType << ", at state:" << state << ", selfID: " << this->SelfId()); @@ -494,14 +499,20 @@ protected: << ", event: " << eventType); } - void InternalError(const TString& message) { - LOG_E(message); - TerminateComputeActors(Ydb::StatusIds::INTERNAL_ERROR, message); + void InternalError(const NYql::TIssues& issues) { + LOG_E(issues.ToOneLineString()); + TerminateComputeActors(Ydb::StatusIds::INTERNAL_ERROR, issues); auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::UNEXPECTED, "Internal error while executing transaction."); - issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(message)); + for (const NYql::TIssue& i : issues) { + issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i)); + } ReplyErrorAndDie(Ydb::StatusIds::INTERNAL_ERROR, issue); } + void InternalError(const TString& message) { + InternalError(NYql::TIssues({NYql::TIssue(message)})); + } + void ReplyUnavailable(const TString& message) { LOG_E("UNAVAILABLE: " << message); TerminateComputeActors(Ydb::StatusIds::UNAVAILABLE, message); @@ -510,10 +521,10 @@ protected: ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue); } - void RuntimeError(Ydb::StatusIds::StatusCode code, const TString& message) { - LOG_E(Ydb::StatusIds_StatusCode_Name(code) << ": " << message); - TerminateComputeActors(code, message); - ReplyErrorAndDie(code, NYql::TIssue(message)); + void RuntimeError(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) { + LOG_E(Ydb::StatusIds_StatusCode_Name(code) << ": " << issues.ToOneLineString()); + TerminateComputeActors(code, issues); + ReplyErrorAndDie(code, issues); } void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) { diff --git a/ydb/core/kqp/executer/kqp_planner.cpp b/ydb/core/kqp/executer/kqp_planner.cpp index 6bcfa85c66..920785ce05 100644 --- a/ydb/core/kqp/executer/kqp_planner.cpp +++ b/ydb/core/kqp/executer/kqp_planner.cpp @@ -82,7 +82,7 @@ void TKqpPlanner::HandleWait(TEvPrivate::TEvResourcesSnapshot::TPtr& ev) { } void TKqpPlanner::HandleWait(TEvKqp::TEvAbortExecution::TPtr& ev) { - LOG_E("Terminate KqpPlanner, reason: " << ev->Get()->Record.GetMessage()); + LOG_E("Terminate KqpPlanner, reason: " << ev->Get()->GetIssues().ToOneLineString()); PassAway(); } @@ -326,4 +326,3 @@ IActor* CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto:: } } // namespace NKikimr::NKqp - diff --git a/ydb/core/kqp/kqp_ic_gateway.cpp b/ydb/core/kqp/kqp_ic_gateway.cpp index 47489a9bd5..a82fd4c5a2 100644 --- a/ydb/core/kqp/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/kqp_ic_gateway.cpp @@ -234,11 +234,11 @@ public: } void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) { - auto& record = ev->Get()->Record; + const TString msg = ev->Get()->GetIssues().ToOneLineString(); LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId() - << "Received abort execution event for scan query: " << record.GetMessage()); + << "Received abort execution event for scan query: " << msg); - TBase::HandleError(record.GetMessage(), ctx); + TBase::HandleError(msg, ctx); } using TBase::HandleResponse; @@ -350,11 +350,11 @@ public: } void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) { - auto& record = ev->Get()->Record; + const TString msg = ev->Get()->GetIssues().ToOneLineString(); LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, this->SelfId() - << "Received abort execution event for data query: " << record.GetMessage()); + << "Received abort execution event for data query: " << msg); - TBase::HandleError(record.GetMessage(), ctx); + TBase::HandleError(msg, ctx); } using TBase::Handle; @@ -437,11 +437,11 @@ public: } void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) { - auto& record = ev->Get()->Record; + const TString msg = ev->Get()->GetIssues().ToOneLineString(); LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId() - << "Received abort execution event for scan query: " << record.GetMessage()); + << "Received abort execution event for scan query: " << msg); - TBase::HandleError(record.GetMessage(), ctx); + TBase::HandleError(msg, ctx); } using TBase::HandleResponse; @@ -892,14 +892,21 @@ private: void Handle(TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) { auto& msg = ev->Get()->Record; + NYql::TIssues issues = ev->Get()->GetIssues(); LOG_ERROR_S(ctx, NKikimrServices::KQP_GATEWAY, "TKqpExecPhysicalRequestHandler, got EvAbortExecution event." << " Code: " << Ydb::StatusIds_StatusCode_Name(msg.GetStatusCode()) - << ", reason: " << msg.GetMessage()); + << ", reason: " << issues.ToOneLineString()); auto issueCode = NYql::YqlStatusFromYdbStatus(msg.GetStatusCode()); - Promise.SetValue(ResultFromError<TResult>(YqlIssue({}, issueCode, msg.GetMessage()))); + NYql::TIssues resultIssues; + for (const auto& i : issues) { + NYql::TIssue issue(i); + NYql::SetIssueCode(issueCode, issue); + resultIssues.AddIssue(std::move(issue)); + } + Promise.SetValue(ResultFromError<TResult>(std::move(resultIssues))); this->PassAway(); } diff --git a/ydb/core/kqp/node/kqp_node.cpp b/ydb/core/kqp/node/kqp_node.cpp index 24f8f2ac79..a7e96aea1d 100644 --- a/ydb/core/kqp/node/kqp_node.cpp +++ b/ydb/core/kqp/node/kqp_node.cpp @@ -292,8 +292,8 @@ private: auto runtimeSettings = runtimeSettingsBase; runtimeSettings.TerminateHandler = [actorSystem, rm = SelfId(), txId, taskId = dqTask.GetId()] - (bool success, const TString& message) { - actorSystem->Send(rm, new TEvKqpNode::TEvFinishKqpTask(txId, taskId, success, message)); + (bool success, const NYql::TIssues& issues) { + actorSystem->Send(rm, new TEvKqpNode::TEvFinishKqpTask(txId, taskId, success, issues)); }; ETableKind tableKind = ETableKind::Unknown; @@ -347,7 +347,7 @@ private: auto& msg = *ev->Get(); LOG_D("TxId: " << msg.TxId << ", finish compute task: " << msg.TaskId << ", success: " << msg.Success - << ", message: " << msg.Message); + << ", message: " << msg.Issues.ToOneLineString()); auto task = State.RemoveTask(msg.TxId, msg.TaskId, msg.Success, [this, &msg] (const TActorId& requester, const NKqpNode::TTasksRequest& request, const NKqpNode::TTaskContext&, bool finishTx) { diff --git a/ydb/core/kqp/node/kqp_node.h b/ydb/core/kqp/node/kqp_node.h index 8f1096ed31..0c75086fdf 100644 --- a/ydb/core/kqp/node/kqp_node.h +++ b/ydb/core/kqp/node/kqp_node.h @@ -5,6 +5,7 @@ #include <ydb/core/protos/config.pb.h> #include <ydb/library/yql/dq/runtime/dq_tasks_runner.h> +#include <ydb/library/yql/public/issue/yql_issue.h> #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/event_pb.h> @@ -40,13 +41,13 @@ struct TEvKqpNode { const ui64 TxId; const ui64 TaskId; const bool Success; - const TString Message; + const NYql::TIssues Issues; - TEvFinishKqpTask(ui64 txId, ui64 taskId, bool success, const TString& message = "") + TEvFinishKqpTask(ui64 txId, ui64 taskId, bool success, const NYql::TIssues& issues = {}) : TxId(txId) , TaskId(taskId) , Success(success) - , Message(message) {} + , Issues(issues) {} }; struct TEvCancelKqpTasksRequest : public TEventPB<TEvCancelKqpTasksRequest, diff --git a/ydb/core/kqp/node/kqp_node_ut.cpp b/ydb/core/kqp/node/kqp_node_ut.cpp index 4decee6bd3..6023a0b817 100644 --- a/ydb/core/kqp/node/kqp_node_ut.cpp +++ b/ydb/core/kqp/node/kqp_node_ut.cpp @@ -204,7 +204,7 @@ public: void SendFinishTask(const TActorId& computeActorId, ui64 txId, ui64 taskId, bool success = true, const TString& message = "") { - auto ev = new TEvKqpNode::TEvFinishKqpTask(txId, taskId, success, message); + auto ev = new TEvKqpNode::TEvFinishKqpTask(txId, taskId, success, NYql::TIssues({NYql::TIssue(message)})); Runtime->Send(new IEventHandle(KqpNodeActorId, computeActorId, ev)); } @@ -618,7 +618,7 @@ void KqpNode::ExecuterLost() { for (auto& [taskId, computeActor] : CompFactory->Task2Actor) { auto abortEvent = Runtime->GrabEdgeEvent<TEvKqp::TEvAbortExecution>(computeActor.ActorId); - UNIT_ASSERT_VALUES_EQUAL("executer lost", abortEvent->Get()->Record.GetMessage()); + UNIT_ASSERT_VALUES_EQUAL("executer lost", abortEvent->Get()->Record.GetLegacyMessage()); } UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 0); @@ -668,7 +668,7 @@ void KqpNode::TerminateTx() { for (auto&[taskId, computeActor] : CompFactory->Task2Actor) { auto abortEvent = Runtime->GrabEdgeEvent<TEvKqp::TEvAbortExecution>(computeActor.ActorId); - UNIT_ASSERT_VALUES_EQUAL("terminate", abortEvent->Get()->Record.GetMessage()); + UNIT_ASSERT_VALUES_EQUAL("terminate", abortEvent->Get()->Record.GetLegacyMessage()); } } diff --git a/ydb/core/sys_view/common/scan_actor_base_impl.h b/ydb/core/sys_view/common/scan_actor_base_impl.h index 298a8efbc0..88f7c1ba83 100644 --- a/ydb/core/sys_view/common/scan_actor_base_impl.h +++ b/ydb/core/sys_view/common/scan_actor_base_impl.h @@ -74,7 +74,7 @@ protected: << ", scan id: " << ScanId << ", table id: " << TableId << ", code: " << Ydb::StatusIds::StatusCode_Name(ev->Get()->Record.GetStatusCode()) - << ", error: " << ev->Get()->Record.GetMessage()); + << ", error: " << ev->Get()->GetIssues().ToOneLineString()); this->PassAway(); } diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index e2f7ba5a51..697114c0b8 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -254,15 +254,16 @@ private: void HandleScan(TEvKqp::TEvAbortExecution::TPtr& ev) { auto& msg = ev->Get()->Record; + TString reason = ev->Get()->GetIssues().ToOneLineString(); auto prio = msg.GetStatusCode() == Ydb::StatusIds::SUCCESS ? NActors::NLog::PRI_DEBUG : NActors::NLog::PRI_WARN; LOG_LOG_S(*TlsActivationContext, prio, NKikimrServices::TX_COLUMNSHARD_SCAN, "Got AbortExecution" << ", at: " << ScanActorId << ", txId: " << TxId << ", scanId: " << ScanId << ", gen: " << ScanGen << ", table: " << TablePath << ", code: " << Ydb::StatusIds_StatusCode_Name(msg.GetStatusCode()) - << ", reason: " << msg.GetMessage()); + << ", reason: " << reason); - AbortReason = std::move(msg.GetMessage()); + AbortReason = std::move(reason); SendError(NKikimrProto::EReplyStatus::ERROR); // TODO: better status? Finish(); } diff --git a/ydb/core/tx/datashard/datashard__kqp_scan.cpp b/ydb/core/tx/datashard/datashard__kqp_scan.cpp index b6b6b7e337..182e9882c0 100644 --- a/ydb/core/tx/datashard/datashard__kqp_scan.cpp +++ b/ydb/core/tx/datashard/datashard__kqp_scan.cpp @@ -143,7 +143,7 @@ private: << ", at: " << ScanActorId << ", tablet: " << DatashardActorId << ", scanId: " << ScanId << ", table: " << TablePath << ", code: " << Ydb::StatusIds_StatusCode_Name(msg.GetStatusCode()) - << ", reason: " << msg.GetMessage()); + << ", reason: " << ev->Get()->GetIssues().ToOneLineString()); AbortEvent = ev->Release(); Driver->Touch(EScan::Final); @@ -342,7 +342,10 @@ private: if (AbortEvent) { ev->Record.SetStatus(AbortEvent->Record.GetStatusCode()); auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED, TStringBuilder() - << "Table " << TablePath << " scan failed, reason: " << AbortEvent->Record.GetMessage()); + << "Table " << TablePath << " scan failed"); + for (const NYql::TIssue& i : AbortEvent->GetIssues()) { + issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i)); + } IssueToMessage(issue, ev->Record.MutableIssues()->Add()); } else { ev->Record.SetStatus(Ydb::StatusIds::ABORTED); diff --git a/ydb/core/tx/datashard/datashard_ut_kqp.cpp b/ydb/core/tx/datashard/datashard_ut_kqp.cpp index d3145951a4..d2584ec2e8 100644 --- a/ydb/core/tx/datashard/datashard_ut_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_ut_kqp.cpp @@ -199,7 +199,7 @@ private: } if (auto* ev = std::get<TEvKqp::TEvAbortExecution*>(replies)) { - UNIT_FAIL(ev->Record.GetMessage()); + UNIT_FAIL(ev->GetIssues().ToOneLineString()); } if (auto* ev = std::get<TEvKqpExecuter::TEvStreamData*>(replies)) { @@ -469,7 +469,7 @@ void KqpStabilityTests::AbortOnDisconnect() { } if (auto* ev = std::get<TEvKqp::TEvAbortExecution*>(replies)) { - UNIT_FAIL(ev->Record.GetMessage()); + UNIT_FAIL(ev->GetIssues().ToOneLineString()); } if (std::get<TEvKqpExecuter::TEvStreamData*>(replies)) { @@ -481,4 +481,3 @@ void KqpStabilityTests::AbortOnDisconnect() { } } } - diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp index d9d354c2e5..a9138ef40f 100644 --- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp +++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp @@ -110,10 +110,9 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvRegisterCoord CC_LOG_D("Got TEvRegisterCoordinatorResponse; issues: " << ev->Get()->Issues.ToOneLineString()); const auto& issues = ev->Get()->Issues; if (issues) { - auto message = "Can't register in storage: " + issues.ToOneLineString(); - CC_LOG_E(message); + CC_LOG_E("Can't register in storage: " + issues.ToOneLineString()); ++*Metrics.StorageError; - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(message)); + Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Can't register in storage", issues)); return; } @@ -167,9 +166,8 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvGetCheckpoint if (event->Issues) { ++*Metrics.StorageError; - auto message = "Can't get checkpoints to restore: " + event->Issues.ToOneLineString(); - CC_LOG_E(message); - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(message)); + CC_LOG_E("Can't get checkpoints to restore: " + event->Issues.ToOneLineString()); + Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Can't get checkpoints to restore", event->Issues)); return; } @@ -230,7 +228,7 @@ void TCheckpointCoordinator::TryToRestoreOffsetsFromForeignCheckpoint(const TChe } if (!result) { - Send(TaskControllerId, new NYql::NDq::TEvDq::TEvAbortExecution(Ydb::StatusIds::BAD_REQUEST, issues.ToString())); + Send(TaskControllerId, new NYql::NDq::TEvDq::TEvAbortExecution(Ydb::StatusIds::BAD_REQUEST, issues)); return; } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index 84eeda2edb..a3afe7a55b 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -8,6 +8,7 @@ #include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h> #include <ydb/library/yql/dq/runtime/dq_tasks_runner.h> #include <ydb/library/yql/dq/runtime/dq_transport.h> +#include <ydb/library/yql/public/issue/yql_issue.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> @@ -229,7 +230,7 @@ struct TComputeRuntimeSettings { bool UseLLVM = false; bool UseSpilling = false; - std::function<void(bool success, const TString& reason)> TerminateHandler; + std::function<void(bool success, const TIssues& issues)> TerminateHandler; TMaybe<NDqProto::TRlPath> RlPath; }; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index deaadefb42..14d64c5a86 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -387,8 +387,7 @@ protected: } protected: - void Terminate(bool success, const TString& message) { - + void Terminate(bool success, const TIssues& issues) { if (MkqlMemoryLimit && MemoryLimits.FreeMemoryFn) { MemoryLimits.FreeMemoryFn(TxId, Task.GetId(), MkqlMemoryLimit); MkqlMemoryLimit = 0; @@ -425,17 +424,21 @@ protected: } if (RuntimeSettings.TerminateHandler) { - RuntimeSettings.TerminateHandler(success, message); + RuntimeSettings.TerminateHandler(success, issues); } this->PassAway(); } + void Terminate(bool success, const TString& message) { + Terminate(success, TIssues({TIssue(message)})); + } + void ReportStateAndMaybeDie(TIssue&& issue) { ReportStateAndMaybeDie( State == NDqProto::COMPUTE_STATE_FINISHED ? Ydb::StatusIds::STATUS_CODE_UNSPECIFIED - : Ydb::StatusIds::ABORTED, TIssues({issue})); + : Ydb::StatusIds::ABORTED, TIssues({std::move(issue)})); } void ReportStateAndDie(NDqProto::EComputeState state, TIssue&& issue) { @@ -459,8 +462,8 @@ protected: this->Send(ExecuterId, execEv.Release()); - TerminateSources(issue.Message, state == NDqProto::COMPUTE_STATE_FINISHED); - Terminate(state == NDqProto::COMPUTE_STATE_FINISHED, issue.Message); + TerminateSources(TIssues({issue}), state == NDqProto::COMPUTE_STATE_FINISHED); + Terminate(state == NDqProto::COMPUTE_STATE_FINISHED, TIssues({std::move(issue)})); } void ReportStateAndMaybeDie(Ydb::StatusIds::StatusCode status, const TIssues& issues) @@ -751,7 +754,11 @@ protected: virtual void PollSources(THolder<NKikimr::IDestructable> /* state */) { } - virtual void TerminateSources(const TString& /* message */, bool /* success */) { + virtual void TerminateSources(const TIssues& /* issues */, bool /* success */) { + } + + void TerminateSources(const TString& message, bool success) { + TerminateSources(TIssues({TIssue(message)}), success); } virtual TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator() { @@ -906,20 +913,20 @@ protected: } void HandleExecuteBase(TEvDq::TEvAbortExecution::TPtr& ev) { - TString message = ev->Get()->Record.GetMessage(); + TIssues issues = ev->Get()->GetIssues(); CA_LOG_E("Handle abort execution event from: " << ev->Sender << ", status: " << Ydb::StatusIds_StatusCode_Name(ev->Get()->Record.GetStatusCode()) - << ", reason: " << message); + << ", reason: " << issues.ToOneLineString()); bool success = ev->Get()->Record.GetStatusCode() == Ydb::StatusIds::SUCCESS; - this->TerminateSources(message, success); + this->TerminateSources(issues, success); if (ev->Sender != ExecuterId) { NActors::TActivationContext::Send(ev->Forward(ExecuterId)); } - Terminate(success, message); + Terminate(success, issues); } void HandleExecuteBase(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { diff --git a/ydb/library/yql/dq/actors/compute/ya.make b/ydb/library/yql/dq/actors/compute/ya.make index c4b20bd502..fc2d5cfaba 100644 --- a/ydb/library/yql/dq/actors/compute/ya.make +++ b/ydb/library/yql/dq/actors/compute/ya.make @@ -30,6 +30,7 @@ PEERDIR( ydb/library/yql/dq/tasks ydb/library/yql/minikql/comp_nodes ydb/library/yql/minikql/computation + ydb/library/yql/public/issue ) YQL_LAST_ABI_VERSION() diff --git a/ydb/library/yql/dq/actors/dq.h b/ydb/library/yql/dq/actors/dq.h index 025626d05f..61355cf78d 100644 --- a/ydb/library/yql/dq/actors/dq.h +++ b/ydb/library/yql/dq/actors/dq.h @@ -2,23 +2,24 @@ #include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> #include <ydb/library/yql/dq/actors/dq_events_ids.h> +#include <ydb/library/yql/public/issue/yql_issue.h> +#include <ydb/library/yql/public/issue/yql_issue_message.h> -namespace NYql { -namespace NDq { +namespace NYql::NDq { struct TEvDq { struct TEvAbortExecution : public NActors::TEventPB<TEvAbortExecution, NDqProto::TEvAbortExecution, TDqEvents::EvAbortExecution> { - static THolder <TEvAbortExecution> Unavailable(const TString& s) { - return MakeHolder<TEvAbortExecution>(Ydb::StatusIds::UNAVAILABLE, s); + static THolder<TEvAbortExecution> Unavailable(const TString& s, const TIssues& subIssues = {}) { + return MakeHolder<TEvAbortExecution>(Ydb::StatusIds::UNAVAILABLE, s, subIssues); } - static THolder <TEvAbortExecution> InternalError(const TString& s) { - return MakeHolder<TEvAbortExecution>(Ydb::StatusIds::INTERNAL_ERROR, s); + static THolder<TEvAbortExecution> InternalError(const TString& s, const TIssues& subIssues = {}) { + return MakeHolder<TEvAbortExecution>(Ydb::StatusIds::INTERNAL_ERROR, s, subIssues); } - static THolder <TEvAbortExecution> Aborted(const TString& s) { - return MakeHolder<TEvAbortExecution>(Ydb::StatusIds::ABORTED, s); + static THolder<TEvAbortExecution> Aborted(const TString& s, const TIssues& subIssues = {}) { + return MakeHolder<TEvAbortExecution>(Ydb::StatusIds::ABORTED, s, subIssues); } TEvAbortExecution() = default; @@ -27,13 +28,34 @@ struct TEvDq { TEvAbortExecution(const TEvAbortExecution&) = default; - TEvAbortExecution(Ydb::StatusIds::StatusCode code, const TString& message) { + TEvAbortExecution(Ydb::StatusIds::StatusCode code, const TIssues& issues) { Record.SetStatusCode(code); - Record.SetMessage(message); + IssuesToMessage(issues, Record.MutableIssues()); + } + + TEvAbortExecution(Ydb::StatusIds::StatusCode code, const TString& message, const TIssues& subIssues = {}) { + Record.SetStatusCode(code); + Record.SetLegacyMessage(message); + TIssue issue(message); + for (const TIssue& i : subIssues) { + issue.AddSubIssue(MakeIntrusive<TIssue>(i)); + } + TIssues issues; + issues.AddIssue(std::move(issue)); + IssuesToMessage(issues, Record.MutableIssues()); + } + + TIssues GetIssues() const { + TIssues issues; + if (Record.IssuesSize()) { + IssuesFromMessage(Record.GetIssues(), issues); + } else if (const TString& msg = Record.GetLegacyMessage()) { + issues.AddIssue(msg); + } + return issues; } }; }; -} // namespace NDq -} // namespace NYql +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/protos/dq_events.proto b/ydb/library/yql/dq/actors/protos/dq_events.proto index 0e38ab3bbc..d6c22b4509 100644 --- a/ydb/library/yql/dq/actors/protos/dq_events.proto +++ b/ydb/library/yql/dq/actors/protos/dq_events.proto @@ -75,7 +75,8 @@ message TEvComputeChannelDataAck { message TEvAbortExecution { optional Ydb.StatusIds.StatusCode StatusCode = 1; - optional string Message = 2; + optional string LegacyMessage = 2; // TODO: remove + repeated Ydb.Issue.IssueMessage Issues = 4; } message TRlPath { diff --git a/ydb/library/yql/providers/common/gateway/yql_provider_gateway.h b/ydb/library/yql/providers/common/gateway/yql_provider_gateway.h index c4b498e9a1..0d47e3c910 100644 --- a/ydb/library/yql/providers/common/gateway/yql_provider_gateway.h +++ b/ydb/library/yql/providers/common/gateway/yql_provider_gateway.h @@ -66,6 +66,14 @@ TResult ResultFromError(const TIssue& error) { } template<typename TResult> +TResult ResultFromError(const TIssues& error) { + TResult result; + result.AddIssues(error); + + return result; +} + +template<typename TResult> TResult ResultFromError(const TString& error, TPosition pos = TPosition()) { return ResultFromError<TResult>(TIssue(pos, error)); } diff --git a/ydb/library/yql/providers/dq/actors/events.cpp b/ydb/library/yql/providers/dq/actors/events.cpp index 892c5cccd7..5ebe75db67 100644 --- a/ydb/library/yql/providers/dq/actors/events.cpp +++ b/ydb/library/yql/providers/dq/actors/events.cpp @@ -7,12 +7,17 @@ namespace NYql::NDqs { *Record.MutableTask() = std::move(task); } - TEvDqFailure::TEvDqFailure(const TIssue& issue, bool retriable, bool needFallback) { - IssuesToMessage({issue}, Record.MutableIssues()); + TEvDqFailure::TEvDqFailure(const TIssues& issues, bool retriable, bool needFallback) { + IssuesToMessage(issues, Record.MutableIssues()); Record.SetRetriable(retriable); Record.SetNeedFallback(needFallback); } + TEvDqFailure::TEvDqFailure(const TIssue& issue, bool retriable, bool needFallback) + : TEvDqFailure(TIssues({issue}), retriable, needFallback) + { + } + TEvQueryResponse::TEvQueryResponse(NDqProto::TQueryResponse&& queryResult) { Record = std::move(queryResult); } diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h index 594921560a..bd03b26483 100644 --- a/ydb/library/yql/providers/dq/actors/events.h +++ b/ydb/library/yql/providers/dq/actors/events.h @@ -21,6 +21,7 @@ namespace NYql::NDqs { struct TEvDqFailure : NActors::TEventPB<TEvDqFailure, NDqProto::TDqFailure, TDqExecuterEvents::ES_DQ_FAILURE> { TEvDqFailure() = default; + explicit TEvDqFailure(const TIssues& issues, bool retriable = false, bool needFallback = false); explicit TEvDqFailure(const TIssue& issue, bool retriable = false, bool needFallback = false); }; diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp index 238828e453..75c1923197 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller.cpp +++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp @@ -105,9 +105,9 @@ private: void OnAbortExecution(NDq::TEvDq::TEvAbortExecution::TPtr& ev) { YQL_LOG_CTX_SCOPE(TraceId); auto ydbStatusId = ev->Get()->Record.GetStatusCode(); - auto message = ev->Get()->Record.GetMessage(); - YQL_LOG(DEBUG) << "AbortExecution from " << ev->Sender << ":" << ydbStatusId << " " << message; - OnError(message, ydbStatusId == Ydb::StatusIds::UNAVAILABLE, false); // TODO: check fallback + TIssues issues = ev->Get()->GetIssues(); + YQL_LOG(DEBUG) << "AbortExecution from " << ev->Sender << ":" << ydbStatusId << " " << issues.ToOneLineString(); + OnError(issues, ydbStatusId == Ydb::StatusIds::UNAVAILABLE, false); // TODO: check fallback } void OnComputeActorState(NDq::TEvDqCompute::TEvState::TPtr& ev) { @@ -139,7 +139,7 @@ private: case NDqProto::COMPUTE_STATE_FAILURE: { // TODO: don't convert issues to string NYql::IssuesFromMessage(state.GetIssues(), Issues); - OnError(Issues.ToString(), false, false); + OnError(Issues, false, false); break; } case NDqProto::COMPUTE_STATE_EXECUTING: { @@ -472,23 +472,27 @@ private: } } - void OnError(const TString& message, bool retriable, bool needFallback) { + void OnError(const TIssues& issues, bool retriable, bool needFallback) { YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << "OnError " << message; + YQL_LOG(DEBUG) << "OnError " << issues.ToOneLineString(); if (Finished) { YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(WARN) << "OnError IGNORED when Finished, Retriable=" << retriable << ", NeedFallback=" << needFallback; } else { - auto issueCode = needFallback - ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR - : TIssuesIds::DQ_GATEWAY_ERROR; - auto req = MakeHolder<TEvDqFailure>(TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR), retriable, needFallback); + auto req = MakeHolder<TEvDqFailure>(issues, retriable, needFallback); FinalStat().FlushCounters(req->Record); Send(ExecuterId, req.Release()); Finished = true; } } + void OnError(const TString& message, bool retriable, bool needFallback) { + auto issueCode = needFallback + ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR + : TIssuesIds::DQ_GATEWAY_ERROR; + OnError(TIssues({TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR)}), retriable, needFallback); + } + void Finish() { if (ServiceCounters.Counters && AggrPeriod) { ExportStats(AggregateQueryStatsByStage(TaskStat, Stages), 0); // force metrics upload on Finish when Aggregated |