aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Vasily Gerasimov <UgnineSirdis@gmail.com> 2022-02-11 13:38:05 +0300
committer Vasily Gerasimov <UgnineSirdis@gmail.com> 2022-02-11 13:38:05 +0300
commit cddbd0caebf2457b54caac7522bfc5979358698a (patch)
tree d0c50cdfd55aef154f9d27d6fd4e36639044fb4c
parent dadcaf26d43e922c5e58646758872dde2c328f95 (diff)
download ydb-cddbd0caebf2457b54caac7522bfc5979358698a.tar.gz
YQ-356 Pass issues in TEvAbortExecution
Pass issues everywhere. Pass subissues to other functions. Improve TEvAbortExecution. ref:bda713c8e075db5cde7e1a27f7c7b39f6d357fc0
-rw-r--r-- ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp 6
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp 11
-rw-r--r-- ydb/core/kqp/executer/kqp_executer_impl.h 39
-rw-r--r-- ydb/core/kqp/executer/kqp_planner.cpp 3
-rw-r--r-- ydb/core/kqp/kqp_ic_gateway.cpp 29
-rw-r--r-- ydb/core/kqp/node/kqp_node.cpp 6
-rw-r--r-- ydb/core/kqp/node/kqp_node.h 7
-rw-r--r-- ydb/core/kqp/node/kqp_node_ut.cpp 6
-rw-r--r-- ydb/core/sys_view/common/scan_actor_base_impl.h 2
-rw-r--r-- ydb/core/tx/columnshard/columnshard__scan.cpp 5
-rw-r--r-- ydb/core/tx/datashard/datashard__kqp_scan.cpp 7
-rw-r--r-- ydb/core/tx/datashard/datashard_ut_kqp.cpp 5
-rw-r--r-- ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp 12
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor.h 3
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h 29
-rw-r--r-- ydb/library/yql/dq/actors/compute/ya.make 1
-rw-r--r-- ydb/library/yql/dq/actors/dq.h 46
-rw-r--r-- ydb/library/yql/dq/actors/protos/dq_events.proto 3
-rw-r--r-- ydb/library/yql/providers/common/gateway/yql_provider_gateway.h 8
-rw-r--r-- ydb/library/yql/providers/dq/actors/events.cpp 9
-rw-r--r-- ydb/library/yql/providers/dq/actors/events.h 1
-rw-r--r-- ydb/library/yql/providers/dq/actors/task_controller.cpp 24
22 files changed, 164 insertions, 98 deletions
diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
index 5f95f263c6..69d14f9ad7 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
@@ -442,13 +442,13 @@ private:
void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;
+ NYql::TIssues issues = ev->Get()->GetIssues();
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Got abort execution event, from: "
<< ev->Sender << ", code: " << Ydb::StatusIds::StatusCode_Name(record.GetStatusCode())
- << ", message: " << record.GetMessage());
+ << ", message: " << issues.ToOneLineString());
- NYql::TIssue issue(record.GetMessage());
- ReplyFinishStream(record.GetStatusCode(), issue, ctx);
+ ReplyFinishStream(record.GetStatusCode(), issues, ctx);
}
void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) {
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index f8e3cd0c77..cb12128559 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -905,10 +905,7 @@ private:
void TerminateExpiredScan(const TActorId& actorId, TStringBuf msg) {
CA_LOG_W(msg);
- auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>();
- abortEv->Record.SetStatusCode(Ydb::StatusIds::CANCELLED);
- abortEv->Record.SetMessage("Cancel unexpected/expired scan");
-
+ auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(Ydb::StatusIds::CANCELLED, "Cancel unexpected/expired scan");
Send(actorId, abortEv.Release());
}
@@ -995,7 +992,7 @@ private:
}
}
- void TerminateSources(const TString& message, bool success) override {
+ void TerminateSources(const TIssues& issues, bool success) override {
if (!ScanData || Shards.empty()) {
return;
}
@@ -1004,10 +1001,10 @@ private:
auto& state = Shards.front();
if (state.ActorId) {
CA_LOG(prio, "Send abort execution event to scan over tablet: " << state.TabletId << ", table: "
- << ScanData->TablePath << ", scan actor: " << state.ActorId << ", message: " << message);
+ << ScanData->TablePath << ", scan actor: " << state.ActorId << ", message: " << issues.ToOneLineString());
Send(state.ActorId, new TEvKqp::TEvAbortExecution(
- success ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::ABORTED, message));
+ success ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::ABORTED, issues));
} else {
CA_LOG(prio, "Table: " << ScanData->TablePath << ", scan has not been started yet");
}
diff --git a/ydb/core/kqp/executer/kqp_executer_impl.h b/ydb/core/kqp/executer/kqp_executer_impl.h
index 6ecf78ab05..c2e872c4e8 100644
--- a/ydb/core/kqp/executer/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer/kqp_executer_impl.h
@@ -172,10 +172,11 @@ protected:
protected:
void HandleAbortExecution(TEvKqp::TEvAbortExecution::TPtr& ev) {
auto& msg = ev->Get()->Record;
+ NYql::TIssues issues = ev->Get()->GetIssues();
LOG_D("Got EvAbortExecution, status: " << Ydb::StatusIds_StatusCode_Name(msg.GetStatusCode())
- << ", message: " << msg.GetMessage());
+ << ", message: " << issues.ToOneLineString());
if (msg.GetStatusCode() == Ydb::StatusIds::INTERNAL_ERROR) {
- InternalError(msg.GetMessage());
+ InternalError(issues);
} else if (msg.GetStatusCode() == Ydb::StatusIds::TIMEOUT) {
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(Ydb::StatusIds::TIMEOUT, "Request timeout exceeded");
this->Send(Target, abortEv.Release());
@@ -183,7 +184,7 @@ protected:
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, "timeout");
this->PassAway();
} else {
- RuntimeError(msg.GetStatusCode(), msg.GetMessage());
+ RuntimeError(msg.GetStatusCode(), issues);
}
}
@@ -473,13 +474,13 @@ protected:
}
protected:
- void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const TString& message) {
+ void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
for (const auto& task : this->TasksGraph.GetTasks()) {
if (task.ComputeActorId) {
- LOG_I("aborting compute actor execution, message: " << message
+ LOG_I("aborting compute actor execution, message: " << issues.ToOneLineString()
<< ", compute actor: " << task.ComputeActorId << ", task: " << task.Id);
- auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(code, message);
+ auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(code, issues);
this->Send(task.ComputeActorId, ev.Release());
} else {
LOG_I("task: " << task.Id << ", does not have Compute ActorId yet");
@@ -487,6 +488,10 @@ protected:
}
}
+ void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const TString& message) {
+ TerminateComputeActors(code, NYql::TIssues({NYql::TIssue(message)}));
+ }
+
protected:
void UnexpectedEvent(const TString& state, ui32 eventType) {
LOG_C("TKqpExecuter, unexpected event: " << eventType << ", at state:" << state << ", selfID: " << this->SelfId());
@@ -494,14 +499,20 @@ protected:
<< ", event: " << eventType);
}
- void InternalError(const TString& message) {
- LOG_E(message);
- TerminateComputeActors(Ydb::StatusIds::INTERNAL_ERROR, message);
+ void InternalError(const NYql::TIssues& issues) {
+ LOG_E(issues.ToOneLineString());
+ TerminateComputeActors(Ydb::StatusIds::INTERNAL_ERROR, issues);
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::UNEXPECTED, "Internal error while executing transaction.");
- issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(message));
+ for (const NYql::TIssue& i : issues) {
+ issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i));
+ }
ReplyErrorAndDie(Ydb::StatusIds::INTERNAL_ERROR, issue);
}
+ void InternalError(const TString& message) {
+ InternalError(NYql::TIssues({NYql::TIssue(message)}));
+ }
+
void ReplyUnavailable(const TString& message) {
LOG_E("UNAVAILABLE: " << message);
TerminateComputeActors(Ydb::StatusIds::UNAVAILABLE, message);
@@ -510,10 +521,10 @@ protected:
ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue);
}
- void RuntimeError(Ydb::StatusIds::StatusCode code, const TString& message) {
- LOG_E(Ydb::StatusIds_StatusCode_Name(code) << ": " << message);
- TerminateComputeActors(code, message);
- ReplyErrorAndDie(code, NYql::TIssue(message));
+ void RuntimeError(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
+ LOG_E(Ydb::StatusIds_StatusCode_Name(code) << ": " << issues.ToOneLineString());
+ TerminateComputeActors(code, issues);
+ ReplyErrorAndDie(code, issues);
}
void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) {
diff --git a/ydb/core/kqp/executer/kqp_planner.cpp b/ydb/core/kqp/executer/kqp_planner.cpp
index 6bcfa85c66..920785ce05 100644
--- a/ydb/core/kqp/executer/kqp_planner.cpp
+++ b/ydb/core/kqp/executer/kqp_planner.cpp
@@ -82,7 +82,7 @@ void TKqpPlanner::HandleWait(TEvPrivate::TEvResourcesSnapshot::TPtr& ev) {
}
void TKqpPlanner::HandleWait(TEvKqp::TEvAbortExecution::TPtr& ev) {
- LOG_E("Terminate KqpPlanner, reason: " << ev->Get()->Record.GetMessage());
+ LOG_E("Terminate KqpPlanner, reason: " << ev->Get()->GetIssues().ToOneLineString());
PassAway();
}
@@ -326,4 +326,3 @@ IActor* CreateKqpPlanner(ui64 txId, const TActorId& executer, TVector<NDqProto::
}
} // namespace NKikimr::NKqp
-
diff --git a/ydb/core/kqp/kqp_ic_gateway.cpp b/ydb/core/kqp/kqp_ic_gateway.cpp
index 47489a9bd5..a82fd4c5a2 100644
--- a/ydb/core/kqp/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/kqp_ic_gateway.cpp
@@ -234,11 +234,11 @@ public:
}
void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) {
- auto& record = ev->Get()->Record;
+ const TString msg = ev->Get()->GetIssues().ToOneLineString();
LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId()
- << "Received abort execution event for scan query: " << record.GetMessage());
+ << "Received abort execution event for scan query: " << msg);
- TBase::HandleError(record.GetMessage(), ctx);
+ TBase::HandleError(msg, ctx);
}
using TBase::HandleResponse;
@@ -350,11 +350,11 @@ public:
}
void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) {
- auto& record = ev->Get()->Record;
+ const TString msg = ev->Get()->GetIssues().ToOneLineString();
LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, this->SelfId()
- << "Received abort execution event for data query: " << record.GetMessage());
+ << "Received abort execution event for data query: " << msg);
- TBase::HandleError(record.GetMessage(), ctx);
+ TBase::HandleError(msg, ctx);
}
using TBase::Handle;
@@ -437,11 +437,11 @@ public:
}
void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) {
- auto& record = ev->Get()->Record;
+ const TString msg = ev->Get()->GetIssues().ToOneLineString();
LOG_DEBUG_S(ctx, NKikimrServices::KQP_GATEWAY, SelfId()
- << "Received abort execution event for scan query: " << record.GetMessage());
+ << "Received abort execution event for scan query: " << msg);
- TBase::HandleError(record.GetMessage(), ctx);
+ TBase::HandleError(msg, ctx);
}
using TBase::HandleResponse;
@@ -892,14 +892,21 @@ private:
void Handle(TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) {
auto& msg = ev->Get()->Record;
+ NYql::TIssues issues = ev->Get()->GetIssues();
LOG_ERROR_S(ctx, NKikimrServices::KQP_GATEWAY,
"TKqpExecPhysicalRequestHandler, got EvAbortExecution event."
<< " Code: " << Ydb::StatusIds_StatusCode_Name(msg.GetStatusCode())
- << ", reason: " << msg.GetMessage());
+ << ", reason: " << issues.ToOneLineString());
auto issueCode = NYql::YqlStatusFromYdbStatus(msg.GetStatusCode());
- Promise.SetValue(ResultFromError<TResult>(YqlIssue({}, issueCode, msg.GetMessage())));
+ NYql::TIssues resultIssues;
+ for (const auto& i : issues) {
+ NYql::TIssue issue(i);
+ NYql::SetIssueCode(issueCode, issue);
+ resultIssues.AddIssue(std::move(issue));
+ }
+ Promise.SetValue(ResultFromError<TResult>(std::move(resultIssues)));
this->PassAway();
}
diff --git a/ydb/core/kqp/node/kqp_node.cpp b/ydb/core/kqp/node/kqp_node.cpp
index 24f8f2ac79..a7e96aea1d 100644
--- a/ydb/core/kqp/node/kqp_node.cpp
+++ b/ydb/core/kqp/node/kqp_node.cpp
@@ -292,8 +292,8 @@ private:
auto runtimeSettings = runtimeSettingsBase;
runtimeSettings.TerminateHandler = [actorSystem, rm = SelfId(), txId, taskId = dqTask.GetId()]
- (bool success, const TString& message) {
- actorSystem->Send(rm, new TEvKqpNode::TEvFinishKqpTask(txId, taskId, success, message));
+ (bool success, const NYql::TIssues& issues) {
+ actorSystem->Send(rm, new TEvKqpNode::TEvFinishKqpTask(txId, taskId, success, issues));
};
ETableKind tableKind = ETableKind::Unknown;
@@ -347,7 +347,7 @@ private:
auto& msg = *ev->Get();
LOG_D("TxId: " << msg.TxId << ", finish compute task: " << msg.TaskId << ", success: " << msg.Success
- << ", message: " << msg.Message);
+ << ", message: " << msg.Issues.ToOneLineString());
auto task = State.RemoveTask(msg.TxId, msg.TaskId, msg.Success, [this, &msg]
(const TActorId& requester, const NKqpNode::TTasksRequest& request, const NKqpNode::TTaskContext&, bool finishTx) {
diff --git a/ydb/core/kqp/node/kqp_node.h b/ydb/core/kqp/node/kqp_node.h
index 8f1096ed31..0c75086fdf 100644
--- a/ydb/core/kqp/node/kqp_node.h
+++ b/ydb/core/kqp/node/kqp_node.h
@@ -5,6 +5,7 @@
#include <ydb/core/protos/config.pb.h>
#include <ydb/library/yql/dq/runtime/dq_tasks_runner.h>
+#include <ydb/library/yql/public/issue/yql_issue.h>
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/event_pb.h>
@@ -40,13 +41,13 @@ struct TEvKqpNode {
const ui64 TxId;
const ui64 TaskId;
const bool Success;
- const TString Message;
+ const NYql::TIssues Issues;
- TEvFinishKqpTask(ui64 txId, ui64 taskId, bool success, const TString& message = "")
+ TEvFinishKqpTask(ui64 txId, ui64 taskId, bool success, const NYql::TIssues& issues = {})
: TxId(txId)
, TaskId(taskId)
, Success(success)
- , Message(message) {}
+ , Issues(issues) {}
};
struct TEvCancelKqpTasksRequest : public TEventPB<TEvCancelKqpTasksRequest,
diff --git a/ydb/core/kqp/node/kqp_node_ut.cpp b/ydb/core/kqp/node/kqp_node_ut.cpp
index 4decee6bd3..6023a0b817 100644
--- a/ydb/core/kqp/node/kqp_node_ut.cpp
+++ b/ydb/core/kqp/node/kqp_node_ut.cpp
@@ -204,7 +204,7 @@ public:
void SendFinishTask(const TActorId& computeActorId, ui64 txId, ui64 taskId, bool success = true,
const TString& message = "")
{
- auto ev = new TEvKqpNode::TEvFinishKqpTask(txId, taskId, success, message);
+ auto ev = new TEvKqpNode::TEvFinishKqpTask(txId, taskId, success, NYql::TIssues({NYql::TIssue(message)}));
Runtime->Send(new IEventHandle(KqpNodeActorId, computeActorId, ev));
}
@@ -618,7 +618,7 @@ void KqpNode::ExecuterLost() {
for (auto& [taskId, computeActor] : CompFactory->Task2Actor) {
auto abortEvent = Runtime->GrabEdgeEvent<TEvKqp::TEvAbortExecution>(computeActor.ActorId);
- UNIT_ASSERT_VALUES_EQUAL("executer lost", abortEvent->Get()->Record.GetMessage());
+ UNIT_ASSERT_VALUES_EQUAL("executer lost", abortEvent->Get()->Record.GetLegacyMessage());
}
UNIT_ASSERT_VALUES_EQUAL(KqpCounters->RmComputeActors->Val(), 0);
@@ -668,7 +668,7 @@ void KqpNode::TerminateTx() {
for (auto&[taskId, computeActor] : CompFactory->Task2Actor) {
auto abortEvent = Runtime->GrabEdgeEvent<TEvKqp::TEvAbortExecution>(computeActor.ActorId);
- UNIT_ASSERT_VALUES_EQUAL("terminate", abortEvent->Get()->Record.GetMessage());
+ UNIT_ASSERT_VALUES_EQUAL("terminate", abortEvent->Get()->Record.GetLegacyMessage());
}
}
diff --git a/ydb/core/sys_view/common/scan_actor_base_impl.h b/ydb/core/sys_view/common/scan_actor_base_impl.h
index 298a8efbc0..88f7c1ba83 100644
--- a/ydb/core/sys_view/common/scan_actor_base_impl.h
+++ b/ydb/core/sys_view/common/scan_actor_base_impl.h
@@ -74,7 +74,7 @@ protected:
<< ", scan id: " << ScanId
<< ", table id: " << TableId
<< ", code: " << Ydb::StatusIds::StatusCode_Name(ev->Get()->Record.GetStatusCode())
- << ", error: " << ev->Get()->Record.GetMessage());
+ << ", error: " << ev->Get()->GetIssues().ToOneLineString());
this->PassAway();
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index e2f7ba5a51..697114c0b8 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -254,15 +254,16 @@ private:
void HandleScan(TEvKqp::TEvAbortExecution::TPtr& ev) {
auto& msg = ev->Get()->Record;
+ TString reason = ev->Get()->GetIssues().ToOneLineString();
auto prio = msg.GetStatusCode() == Ydb::StatusIds::SUCCESS ? NActors::NLog::PRI_DEBUG : NActors::NLog::PRI_WARN;
LOG_LOG_S(*TlsActivationContext, prio, NKikimrServices::TX_COLUMNSHARD_SCAN, "Got AbortExecution"
<< ", at: " << ScanActorId
<< ", txId: " << TxId << ", scanId: " << ScanId << ", gen: " << ScanGen << ", table: " << TablePath
<< ", code: " << Ydb::StatusIds_StatusCode_Name(msg.GetStatusCode())
- << ", reason: " << msg.GetMessage());
+ << ", reason: " << reason);
- AbortReason = std::move(msg.GetMessage());
+ AbortReason = std::move(reason);
SendError(NKikimrProto::EReplyStatus::ERROR); // TODO: better status?
Finish();
}
diff --git a/ydb/core/tx/datashard/datashard__kqp_scan.cpp b/ydb/core/tx/datashard/datashard__kqp_scan.cpp
index b6b6b7e337..182e9882c0 100644
--- a/ydb/core/tx/datashard/datashard__kqp_scan.cpp
+++ b/ydb/core/tx/datashard/datashard__kqp_scan.cpp
@@ -143,7 +143,7 @@ private:
<< ", at: " << ScanActorId << ", tablet: " << DatashardActorId
<< ", scanId: " << ScanId << ", table: " << TablePath
<< ", code: " << Ydb::StatusIds_StatusCode_Name(msg.GetStatusCode())
- << ", reason: " << msg.GetMessage());
+ << ", reason: " << ev->Get()->GetIssues().ToOneLineString());
AbortEvent = ev->Release();
Driver->Touch(EScan::Final);
@@ -342,7 +342,10 @@ private:
if (AbortEvent) {
ev->Record.SetStatus(AbortEvent->Record.GetStatusCode());
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_OPERATION_ABORTED, TStringBuilder()
- << "Table " << TablePath << " scan failed, reason: " << AbortEvent->Record.GetMessage());
+ << "Table " << TablePath << " scan failed");
+ for (const NYql::TIssue& i : AbortEvent->GetIssues()) {
+ issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i));
+ }
IssueToMessage(issue, ev->Record.MutableIssues()->Add());
} else {
ev->Record.SetStatus(Ydb::StatusIds::ABORTED);
diff --git a/ydb/core/tx/datashard/datashard_ut_kqp.cpp b/ydb/core/tx/datashard/datashard_ut_kqp.cpp
index d3145951a4..d2584ec2e8 100644
--- a/ydb/core/tx/datashard/datashard_ut_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_kqp.cpp
@@ -199,7 +199,7 @@ private:
}
if (auto* ev = std::get<TEvKqp::TEvAbortExecution*>(replies)) {
- UNIT_FAIL(ev->Record.GetMessage());
+ UNIT_FAIL(ev->GetIssues().ToOneLineString());
}
if (auto* ev = std::get<TEvKqpExecuter::TEvStreamData*>(replies)) {
@@ -469,7 +469,7 @@ void KqpStabilityTests::AbortOnDisconnect() {
}
if (auto* ev = std::get<TEvKqp::TEvAbortExecution*>(replies)) {
- UNIT_FAIL(ev->Record.GetMessage());
+ UNIT_FAIL(ev->GetIssues().ToOneLineString());
}
if (std::get<TEvKqpExecuter::TEvStreamData*>(replies)) {
@@ -481,4 +481,3 @@ void KqpStabilityTests::AbortOnDisconnect() {
}
}
}
-
diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp
index d9d354c2e5..a9138ef40f 100644
--- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp
+++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp
@@ -110,10 +110,9 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvRegisterCoord
CC_LOG_D("Got TEvRegisterCoordinatorResponse; issues: " << ev->Get()->Issues.ToOneLineString());
const auto& issues = ev->Get()->Issues;
if (issues) {
- auto message = "Can't register in storage: " + issues.ToOneLineString();
- CC_LOG_E(message);
+ CC_LOG_E("Can't register in storage: " + issues.ToOneLineString());
++*Metrics.StorageError;
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(message));
+ Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Can't register in storage", issues));
return;
}
@@ -167,9 +166,8 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvGetCheckpoint
if (event->Issues) {
++*Metrics.StorageError;
- auto message = "Can't get checkpoints to restore: " + event->Issues.ToOneLineString();
- CC_LOG_E(message);
- Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(message));
+ CC_LOG_E("Can't get checkpoints to restore: " + event->Issues.ToOneLineString());
+ Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError("Can't get checkpoints to restore", event->Issues));
return;
}
@@ -230,7 +228,7 @@ void TCheckpointCoordinator::TryToRestoreOffsetsFromForeignCheckpoint(const TChe
}
if (!result) {
- Send(TaskControllerId, new NYql::NDq::TEvDq::TEvAbortExecution(Ydb::StatusIds::BAD_REQUEST, issues.ToString()));
+ Send(TaskControllerId, new NYql::NDq::TEvDq::TEvAbortExecution(Ydb::StatusIds::BAD_REQUEST, issues));
return;
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
index 84eeda2edb..a3afe7a55b 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h
@@ -8,6 +8,7 @@
#include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h>
#include <ydb/library/yql/dq/runtime/dq_tasks_runner.h>
#include <ydb/library/yql/dq/runtime/dq_transport.h>
+#include <ydb/library/yql/public/issue/yql_issue.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
@@ -229,7 +230,7 @@ struct TComputeRuntimeSettings {
bool UseLLVM = false;
bool UseSpilling = false;
- std::function<void(bool success, const TString& reason)> TerminateHandler;
+ std::function<void(bool success, const TIssues& issues)> TerminateHandler;
TMaybe<NDqProto::TRlPath> RlPath;
};
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index deaadefb42..14d64c5a86 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -387,8 +387,7 @@ protected:
}
protected:
- void Terminate(bool success, const TString& message) {
-
+ void Terminate(bool success, const TIssues& issues) {
if (MkqlMemoryLimit && MemoryLimits.FreeMemoryFn) {
MemoryLimits.FreeMemoryFn(TxId, Task.GetId(), MkqlMemoryLimit);
MkqlMemoryLimit = 0;
@@ -425,17 +424,21 @@ protected:
}
if (RuntimeSettings.TerminateHandler) {
- RuntimeSettings.TerminateHandler(success, message);
+ RuntimeSettings.TerminateHandler(success, issues);
}
this->PassAway();
}
+ void Terminate(bool success, const TString& message) {
+ Terminate(success, TIssues({TIssue(message)}));
+ }
+
void ReportStateAndMaybeDie(TIssue&& issue) {
ReportStateAndMaybeDie(
State == NDqProto::COMPUTE_STATE_FINISHED ?
Ydb::StatusIds::STATUS_CODE_UNSPECIFIED
- : Ydb::StatusIds::ABORTED, TIssues({issue}));
+ : Ydb::StatusIds::ABORTED, TIssues({std::move(issue)}));
}
void ReportStateAndDie(NDqProto::EComputeState state, TIssue&& issue) {
@@ -459,8 +462,8 @@ protected:
this->Send(ExecuterId, execEv.Release());
- TerminateSources(issue.Message, state == NDqProto::COMPUTE_STATE_FINISHED);
- Terminate(state == NDqProto::COMPUTE_STATE_FINISHED, issue.Message);
+ TerminateSources(TIssues({issue}), state == NDqProto::COMPUTE_STATE_FINISHED);
+ Terminate(state == NDqProto::COMPUTE_STATE_FINISHED, TIssues({std::move(issue)}));
}
void ReportStateAndMaybeDie(Ydb::StatusIds::StatusCode status, const TIssues& issues)
@@ -751,7 +754,11 @@ protected:
virtual void PollSources(THolder<NKikimr::IDestructable> /* state */) {
}
- virtual void TerminateSources(const TString& /* message */, bool /* success */) {
+ virtual void TerminateSources(const TIssues& /* issues */, bool /* success */) {
+ }
+
+ void TerminateSources(const TString& message, bool success) {
+ TerminateSources(TIssues({TIssue(message)}), success);
}
virtual TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator() {
@@ -906,20 +913,20 @@ protected:
}
void HandleExecuteBase(TEvDq::TEvAbortExecution::TPtr& ev) {
- TString message = ev->Get()->Record.GetMessage();
+ TIssues issues = ev->Get()->GetIssues();
CA_LOG_E("Handle abort execution event from: " << ev->Sender
<< ", status: " << Ydb::StatusIds_StatusCode_Name(ev->Get()->Record.GetStatusCode())
- << ", reason: " << message);
+ << ", reason: " << issues.ToOneLineString());
bool success = ev->Get()->Record.GetStatusCode() == Ydb::StatusIds::SUCCESS;
- this->TerminateSources(message, success);
+ this->TerminateSources(issues, success);
if (ev->Sender != ExecuterId) {
NActors::TActivationContext::Send(ev->Forward(ExecuterId));
}
- Terminate(success, message);
+ Terminate(success, issues);
}
void HandleExecuteBase(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
diff --git a/ydb/library/yql/dq/actors/compute/ya.make b/ydb/library/yql/dq/actors/compute/ya.make
index c4b20bd502..fc2d5cfaba 100644
--- a/ydb/library/yql/dq/actors/compute/ya.make
+++ b/ydb/library/yql/dq/actors/compute/ya.make
@@ -30,6 +30,7 @@ PEERDIR(
ydb/library/yql/dq/tasks
ydb/library/yql/minikql/comp_nodes
ydb/library/yql/minikql/computation
+ ydb/library/yql/public/issue
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/library/yql/dq/actors/dq.h b/ydb/library/yql/dq/actors/dq.h
index 025626d05f..61355cf78d 100644
--- a/ydb/library/yql/dq/actors/dq.h
+++ b/ydb/library/yql/dq/actors/dq.h
@@ -2,23 +2,24 @@
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/dq/actors/dq_events_ids.h>
+#include <ydb/library/yql/public/issue/yql_issue.h>
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
-namespace NYql {
-namespace NDq {
+namespace NYql::NDq {
struct TEvDq {
struct TEvAbortExecution : public NActors::TEventPB<TEvAbortExecution, NDqProto::TEvAbortExecution, TDqEvents::EvAbortExecution> {
- static THolder <TEvAbortExecution> Unavailable(const TString& s) {
- return MakeHolder<TEvAbortExecution>(Ydb::StatusIds::UNAVAILABLE, s);
+ static THolder<TEvAbortExecution> Unavailable(const TString& s, const TIssues& subIssues = {}) {
+ return MakeHolder<TEvAbortExecution>(Ydb::StatusIds::UNAVAILABLE, s, subIssues);
}
- static THolder <TEvAbortExecution> InternalError(const TString& s) {
- return MakeHolder<TEvAbortExecution>(Ydb::StatusIds::INTERNAL_ERROR, s);
+ static THolder<TEvAbortExecution> InternalError(const TString& s, const TIssues& subIssues = {}) {
+ return MakeHolder<TEvAbortExecution>(Ydb::StatusIds::INTERNAL_ERROR, s, subIssues);
}
- static THolder <TEvAbortExecution> Aborted(const TString& s) {
- return MakeHolder<TEvAbortExecution>(Ydb::StatusIds::ABORTED, s);
+ static THolder<TEvAbortExecution> Aborted(const TString& s, const TIssues& subIssues = {}) {
+ return MakeHolder<TEvAbortExecution>(Ydb::StatusIds::ABORTED, s, subIssues);
}
TEvAbortExecution() = default;
@@ -27,13 +28,34 @@ struct TEvDq {
TEvAbortExecution(const TEvAbortExecution&) = default;
- TEvAbortExecution(Ydb::StatusIds::StatusCode code, const TString& message) {
+ TEvAbortExecution(Ydb::StatusIds::StatusCode code, const TIssues& issues) {
Record.SetStatusCode(code);
- Record.SetMessage(message);
+ IssuesToMessage(issues, Record.MutableIssues());
+ }
+
+ TEvAbortExecution(Ydb::StatusIds::StatusCode code, const TString& message, const TIssues& subIssues = {}) {
+ Record.SetStatusCode(code);
+ Record.SetLegacyMessage(message);
+ TIssue issue(message);
+ for (const TIssue& i : subIssues) {
+ issue.AddSubIssue(MakeIntrusive<TIssue>(i));
+ }
+ TIssues issues;
+ issues.AddIssue(std::move(issue));
+ IssuesToMessage(issues, Record.MutableIssues());
+ }
+
+ TIssues GetIssues() const {
+ TIssues issues;
+ if (Record.IssuesSize()) {
+ IssuesFromMessage(Record.GetIssues(), issues);
+ } else if (const TString& msg = Record.GetLegacyMessage()) {
+ issues.AddIssue(msg);
+ }
+ return issues;
}
};
};
-} // namespace NDq
-} // namespace NYql
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/protos/dq_events.proto b/ydb/library/yql/dq/actors/protos/dq_events.proto
index 0e38ab3bbc..d6c22b4509 100644
--- a/ydb/library/yql/dq/actors/protos/dq_events.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_events.proto
@@ -75,7 +75,8 @@ message TEvComputeChannelDataAck {
message TEvAbortExecution {
optional Ydb.StatusIds.StatusCode StatusCode = 1;
- optional string Message = 2;
+ optional string LegacyMessage = 2; // TODO: remove
+ repeated Ydb.Issue.IssueMessage Issues = 4;
}
message TRlPath {
diff --git a/ydb/library/yql/providers/common/gateway/yql_provider_gateway.h b/ydb/library/yql/providers/common/gateway/yql_provider_gateway.h
index c4b498e9a1..0d47e3c910 100644
--- a/ydb/library/yql/providers/common/gateway/yql_provider_gateway.h
+++ b/ydb/library/yql/providers/common/gateway/yql_provider_gateway.h
@@ -66,6 +66,14 @@ TResult ResultFromError(const TIssue& error) {
}
template<typename TResult>
+TResult ResultFromError(const TIssues& error) {
+ TResult result;
+ result.AddIssues(error);
+
+ return result;
+}
+
+template<typename TResult>
TResult ResultFromError(const TString& error, TPosition pos = TPosition()) {
return ResultFromError<TResult>(TIssue(pos, error));
}
diff --git a/ydb/library/yql/providers/dq/actors/events.cpp b/ydb/library/yql/providers/dq/actors/events.cpp
index 892c5cccd7..5ebe75db67 100644
--- a/ydb/library/yql/providers/dq/actors/events.cpp
+++ b/ydb/library/yql/providers/dq/actors/events.cpp
@@ -7,12 +7,17 @@ namespace NYql::NDqs {
*Record.MutableTask() = std::move(task);
}
- TEvDqFailure::TEvDqFailure(const TIssue& issue, bool retriable, bool needFallback) {
- IssuesToMessage({issue}, Record.MutableIssues());
+ TEvDqFailure::TEvDqFailure(const TIssues& issues, bool retriable, bool needFallback) {
+ IssuesToMessage(issues, Record.MutableIssues());
Record.SetRetriable(retriable);
Record.SetNeedFallback(needFallback);
}
+ TEvDqFailure::TEvDqFailure(const TIssue& issue, bool retriable, bool needFallback)
+ : TEvDqFailure(TIssues({issue}), retriable, needFallback)
+ {
+ }
+
TEvQueryResponse::TEvQueryResponse(NDqProto::TQueryResponse&& queryResult) {
Record = std::move(queryResult);
}
diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h
index 594921560a..bd03b26483 100644
--- a/ydb/library/yql/providers/dq/actors/events.h
+++ b/ydb/library/yql/providers/dq/actors/events.h
@@ -21,6 +21,7 @@ namespace NYql::NDqs {
struct TEvDqFailure : NActors::TEventPB<TEvDqFailure, NDqProto::TDqFailure, TDqExecuterEvents::ES_DQ_FAILURE> {
TEvDqFailure() = default;
+ explicit TEvDqFailure(const TIssues& issues, bool retriable = false, bool needFallback = false);
explicit TEvDqFailure(const TIssue& issue, bool retriable = false, bool needFallback = false);
};
diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp
index 238828e453..75c1923197 100644
--- a/ydb/library/yql/providers/dq/actors/task_controller.cpp
+++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp
@@ -105,9 +105,9 @@ private:
void OnAbortExecution(NDq::TEvDq::TEvAbortExecution::TPtr& ev) {
YQL_LOG_CTX_SCOPE(TraceId);
auto ydbStatusId = ev->Get()->Record.GetStatusCode();
- auto message = ev->Get()->Record.GetMessage();
- YQL_LOG(DEBUG) << "AbortExecution from " << ev->Sender << ":" << ydbStatusId << " " << message;
- OnError(message, ydbStatusId == Ydb::StatusIds::UNAVAILABLE, false); // TODO: check fallback
+ TIssues issues = ev->Get()->GetIssues();
+ YQL_LOG(DEBUG) << "AbortExecution from " << ev->Sender << ":" << ydbStatusId << " " << issues.ToOneLineString();
+ OnError(issues, ydbStatusId == Ydb::StatusIds::UNAVAILABLE, false); // TODO: check fallback
}
void OnComputeActorState(NDq::TEvDqCompute::TEvState::TPtr& ev) {
@@ -139,7 +139,7 @@ private:
case NDqProto::COMPUTE_STATE_FAILURE: {
// TODO: don't convert issues to string
NYql::IssuesFromMessage(state.GetIssues(), Issues);
- OnError(Issues.ToString(), false, false);
+ OnError(Issues, false, false);
break;
}
case NDqProto::COMPUTE_STATE_EXECUTING: {
@@ -472,23 +472,27 @@ private:
}
}
- void OnError(const TString& message, bool retriable, bool needFallback) {
+ void OnError(const TIssues& issues, bool retriable, bool needFallback) {
YQL_LOG_CTX_SCOPE(TraceId);
- YQL_LOG(DEBUG) << "OnError " << message;
+ YQL_LOG(DEBUG) << "OnError " << issues.ToOneLineString();
if (Finished) {
YQL_LOG_CTX_SCOPE(TraceId);
YQL_LOG(WARN) << "OnError IGNORED when Finished, Retriable=" << retriable << ", NeedFallback=" << needFallback;
} else {
- auto issueCode = needFallback
- ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR
- : TIssuesIds::DQ_GATEWAY_ERROR;
- auto req = MakeHolder<TEvDqFailure>(TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR), retriable, needFallback);
+ auto req = MakeHolder<TEvDqFailure>(issues, retriable, needFallback);
FinalStat().FlushCounters(req->Record);
Send(ExecuterId, req.Release());
Finished = true;
}
}
+ void OnError(const TString& message, bool retriable, bool needFallback) {
+ auto issueCode = needFallback
+ ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR
+ : TIssuesIds::DQ_GATEWAY_ERROR;
+ OnError(TIssues({TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR)}), retriable, needFallback);
+ }
+
void Finish() {
if (ServiceCounters.Counters && AggrPeriod) {
ExportStats(AggregateQueryStatsByStage(TaskStat, Stages), 0); // force metrics upload on Finish when Aggregated