about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Ivan <5627721+abyss7@users.noreply.github.com>, 2024-08-23 13:15:46 +0300
committer: GitHub <noreply@github.com>, 2024-08-23 13:15:46 +0300
commit: fe50fe3015efab1c711b830b2027fbc0018729ab (patch)
tree: d282254120a9fa4961b13cd75f2c573b7c81db77
parent: 1869eedd3e49984e0b363ceda467bc1fc7b16214 (diff)
download: ydb-fe50fe3015efab1c711b830b2027fbc0018729ab.tar.gz
Wait for all CAs inside Executer before shutdown (#7829)
-rw-r--r--  .gitignore  3
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_data_executer.cpp  78
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_executer_impl.h  57
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_planner.cpp  14
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_planner.h  1
-rw-r--r--  ydb/core/kqp/executer_actor/kqp_scan_executer.cpp  5
-rw-r--r--  ydb/core/kqp/ut/query/kqp_limits_ut.cpp  117
-rw-r--r--  ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h  30
8 files changed, 257 insertions, 48 deletions
diff --git a/.gitignore b/.gitignore
index ce49d73b45..87b7b54d29 100644
--- a/.gitignore
+++ b/.gitignore
@@ -26,6 +26,9 @@ __pycache__/
*.pb.h
*.pb.cc
+# Other generated
+*.fbs.h
+
# MacOS specific
.DS_Store
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index fd16b7b441..1d18d034ac 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -205,6 +205,8 @@ public:
}
void Finalize() {
+ YQL_ENSURE(!AlreadyReplied);
+
if (LocksBroken) {
TString message = "Transaction locks invalidated.";
@@ -278,8 +280,7 @@ public:
ExecuterSpan.EndOk();
Request.Transactions.crop(0);
- LOG_D("Sending response to: " << Target << ", results: " << ResponseEv->ResultsSize());
- Send(Target, ResponseEv.release());
+ AlreadyReplied = true;
PassAway();
}
@@ -319,6 +320,8 @@ private:
return "WaitSnapshotState";
} else if (func == &TThis::WaitResolveState) {
return "WaitResolveState";
+ } else if (func == &TThis::WaitShutdownState) {
+ return "WaitShutdownState";
} else {
return TBase::CurrentStateFuncName();
}
@@ -2595,6 +2598,22 @@ private:
}
}
+    // Stops the executer. Without a planner, or with no compute tasks/actors still pending, it passes
+    // away at once. Otherwise it switches to WaitShutdownState and schedules a TEvPoison from itself
+    // to itself in 10 seconds, so the wait is bounded.
+    void Shutdown() override {
+        if (!Planner) {
+            PassAway();
+            return;
+        }
+
+        const bool nothingPending =
+            Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty();
+        if (nothingPending) {
+            LOG_I("Shutdown immediately - nothing to wait");
+            PassAway();
+            return;
+        }
+
+        this->Become(&TThis::WaitShutdownState);
+        LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and "
+            << Planner->GetPendingComputeActors().size() << " compute actors");
+        TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison));
+    }
+
void PassAway() override {
auto totalTime = TInstant::Now() - StartTime;
Counters->Counters->DataTxTotalTimeHistogram->Collect(totalTime.MilliSeconds());
@@ -2612,6 +2631,61 @@ private:
TBase::PassAway();
}
+ // State function entered from Shutdown() while compute tasks/actors are still pending.
+ // Only three event types are dispatched (each to a HandleShutdown overload);
+ // every other event is logged as an error and otherwise ignored.
+ STATEFN(WaitShutdownState) {
+ switch(ev->GetTypeRewrite()) {
+ hFunc(TEvDqCompute::TEvState, HandleShutdown);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, HandleShutdown);
+ hFunc(TEvents::TEvPoison, HandleShutdown);
+ default:
+ LOG_E("Unexpected event: " << ev->GetTypeName()); // ignore all other events
+ }
+ }
+
+    // Handles a compute actor (CA) state report that arrives while the executer waits for shutdown.
+    // A terminal report - failure or normal finish - marks the sender's task as completed in the
+    // planner; once no tasks or actors are pending the executer passes away.
+    // Reports with any other state are ignored.
+    void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
+        const auto state = ev->Get()->Record.GetState();
+
+        // A CA may finish normally just before the abort reaches it. If only COMPUTE_STATE_FAILURE
+        // were counted, such a CA would stay pending until the shutdown timeout fires.
+        const bool terminal =
+            state == NDqProto::COMPUTE_STATE_FAILURE || state == NDqProto::COMPUTE_STATE_FINISHED;
+        if (!terminal) {
+            return;
+        }
+
+        YQL_ENSURE(Planner);
+
+        const TActorId actor = ev->Sender;
+        const ui64 taskId = ev->Get()->Record.GetTaskId();
+
+        // NOTE(review): presumably CompletedCA() tolerates a task that is already marked completed
+        // (a CA could report twice) - confirm against the planner.
+        Planner->CompletedCA(taskId, actor);
+
+        if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
+            PassAway();
+        }
+    }
+
+    // Handles a node disconnect that happens while waiting for shutdown: every not-yet-completed task
+    // placed on that node is written off in the planner, then the executer passes away if nothing
+    // is left pending.
+    void HandleShutdown(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
+        const auto nodeId = ev->Get()->NodeId;
+        LOG_N("Node has disconnected while shutdown: " << nodeId);
+
+        YQL_ENSURE(Planner);
+
+        for (const auto& task : TasksGraph.GetTasks()) {
+            if (task.Meta.NodeId != nodeId) {
+                continue;
+            }
+            if (task.Meta.Completed) {
+                continue;
+            }
+
+            // A task with a compute actor is finished off as that actor; one without is simply dropped.
+            if (!task.ComputeActorId) {
+                Planner->TaskNotStarted(task.Id);
+                continue;
+            }
+            Planner->CompletedCA(task.Id, task.ComputeActorId);
+        }
+
+        const bool tasksDone = Planner->GetPendingComputeTasks().empty();
+        if (tasksDone && Planner->GetPendingComputeActors().empty()) {
+            PassAway();
+        }
+    }
+
+    // Handles TEvPoison while waiting for shutdown. Shutdown() schedules a poison sent by the executer
+    // to itself as the wait timeout; only that self-sent poison ends the wait.
+    void HandleShutdown(TEvents::TEvPoison::TPtr& ev) {
+        if (ev->Sender != SelfId()) {
+            // Not the timeout: keep waiting, and do not log a timeout that did not happen.
+            LOG_I("Ignoring poison from " << ev->Sender << " while waiting for Compute Actors to finish");
+            return;
+        }
+
+        // Self-poison means timeout - don't wait anymore.
+        LOG_I("Timed out on waiting for Compute Actors to finish - forcing shutdown");
+        PassAway();
+    }
+
private:
void ReplyTxStateUnknown(ui64 shardId) {
auto message = TStringBuilder() << "Tx state unknown for shard " << shardId << ", txid " << TxId;
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 37f8bec828..2915775ea3 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -667,7 +667,7 @@ protected:
if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) {
InternalError(issues);
} else if (statusCode == Ydb::StatusIds::TIMEOUT) {
- AbortExecutionAndDie(ev->Sender, NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded");
+ TimeoutError(ev->Sender);
} else {
RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues);
}
@@ -1624,14 +1624,14 @@ protected:
protected:
void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
for (const auto& task : this->TasksGraph.GetTasks()) {
- if (task.ComputeActorId) {
+ if (task.ComputeActorId && !task.Meta.Completed) {
LOG_I("aborting compute actor execution, message: " << issues.ToOneLineString()
<< ", compute actor: " << task.ComputeActorId << ", task: " << task.Id);
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(code), issues);
this->Send(task.ComputeActorId, ev.Release());
} else {
- LOG_I("task: " << task.Id << ", does not have Compute ActorId yet");
+ LOG_I("task: " << task.Id << ", does not have the CA id yet or is already complete");
}
}
}
@@ -1649,7 +1649,6 @@ protected:
void InternalError(const NYql::TIssues& issues) {
LOG_E(issues.ToOneLineString());
- TerminateComputeActors(Ydb::StatusIds::INTERNAL_ERROR, issues);
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::UNEXPECTED, "Internal error while executing transaction.");
for (const NYql::TIssue& i : issues) {
issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i));
@@ -1663,7 +1662,6 @@ protected:
void ReplyUnavailable(const TString& message) {
LOG_E("UNAVAILABLE: " << message);
- TerminateComputeActors(Ydb::StatusIds::UNAVAILABLE, message);
auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE);
issue.AddSubIssue(new NYql::TIssue(message));
ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue);
@@ -1671,7 +1669,6 @@ protected:
void RuntimeError(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
LOG_E(Ydb::StatusIds_StatusCode_Name(code) << ": " << issues.ToOneLineString());
- TerminateComputeActors(code, issues);
ReplyErrorAndDie(code, issues);
}
@@ -1687,11 +1684,19 @@ protected:
ReplyErrorAndDie(status, &issues);
}
- void AbortExecutionAndDie(TActorId abortSender, NYql::NDqProto::StatusIds::StatusCode status, const TString& message) {
+ void TimeoutError(TActorId abortSender) {
if (AlreadyReplied) {
+ LOG_E("Timeout when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl);
return;
}
+ const auto status = NYql::NDqProto::StatusIds::TIMEOUT;
+ const TString message = "Request timeout exceeded";
+
+ TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
+
+ AlreadyReplied = true;
+
LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message);
if (ExecuterSpan) {
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
@@ -1701,17 +1706,14 @@ protected:
// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
if (abortSender != Target) {
- auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, "Request timeout exceeded");
+ auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, message);
this->Send(Target, abortEv.Release());
}
- AlreadyReplied = true;
LOG_E("Sending timeout response to: " << Target);
- this->Send(Target, ResponseEv.release());
Request.Transactions.crop(0);
- TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
- this->PassAway();
+ this->Shutdown();
}
void FillResponseStats(Ydb::StatusIds::StatusCode status) {
@@ -1746,17 +1748,11 @@ protected:
google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues)
{
if (AlreadyReplied) {
+ LOG_E("Error when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl);
return;
}
- if (Planner) {
- for (auto computeActor : Planner->GetPendingComputeActors()) {
- LOG_D("terminate compute actor " << computeActor.first);
-
- auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(status), "Terminate execution");
- this->Send(computeActor.first, ev.Release());
- }
- }
+ TerminateComputeActors(status, "Terminate execution");
AlreadyReplied = true;
auto& response = *ResponseEv->Record.MutableResponse();
@@ -1782,8 +1778,7 @@ protected:
ExecuterStateSpan.EndError(response.DebugString());
Request.Transactions.crop(0);
- this->Send(Target, ResponseEv.release());
- this->PassAway();
+ this->Shutdown();
}
protected:
@@ -1851,7 +1846,16 @@ protected:
}
protected:
+ // Shutdown() is deliberately separate from PassAway(): other actors expect PassAway() to kill the
+ // actor immediately, so any waiting before termination belongs here instead.
+ // The default implementation just calls PassAway(); the method is virtual so it can be overridden.
+ virtual void Shutdown() {
+ PassAway();
+ }
+
void PassAway() override {
+ YQL_ENSURE(AlreadyReplied && ResponseEv);
+ this->Send(Target, ResponseEv.release());
+
for (auto channelPair: ResultChannelProxies) {
LOG_D("terminate result channel " << channelPair.first << " proxy at " << channelPair.second->SelfId());
@@ -1872,12 +1876,11 @@ protected:
if (KqpTableResolverId) {
this->Send(KqpTableResolverId, new TEvents::TEvPoison);
- this->Send(this->SelfId(), new TEvents::TEvPoison);
- LOG_T("Terminate, become ZombieState");
- this->Become(&TKqpExecuterBase::ZombieState);
- } else {
- IActor::PassAway();
}
+
+ this->Send(this->SelfId(), new TEvents::TEvPoison);
+ LOG_T("Terminate, become ZombieState");
+ this->Become(&TKqpExecuterBase::ZombieState);
}
STATEFN(ZombieState) {
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index 3a13c9fb7f..926dda2700 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -554,7 +554,19 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) {
YQL_ENSURE(it != PendingComputeActors.end());
LastStats.emplace_back(std::move(it->second));
PendingComputeActors.erase(it);
- return;
+
+ LOG_I("Compute actor has finished execution: " << computeActor.ToString());
+}
+
+// Removes a task from PendingComputeTasks after checking that it has no compute actor
+// and is not marked completed.
+void TKqpPlanner::TaskNotStarted(ui64 taskId) {
+ // NOTE: should be invoked only while shutting down - when node is disconnected.
+
+ auto& task = TasksGraph.GetTask(taskId);
+
+ // Both checks guard the precondition: the task never got a compute actor and never completed.
+ YQL_ENSURE(!task.ComputeActorId);
+ YQL_ENSURE(!task.Meta.Completed);
+
+ PendingComputeTasks.erase(taskId);
}
TProgressStat::TEntry TKqpPlanner::CalculateConsumptionUpdate() {
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h
index 639c537370..eed887d6e9 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.h
+++ b/ydb/core/kqp/executer_actor/kqp_planner.h
@@ -74,6 +74,7 @@ public:
std::unique_ptr<IEventHandle> AssignTasksToNodes();
bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state);
void CompletedCA(ui64 taskId, TActorId computeActor);
+ void TaskNotStarted(ui64 taskId);
TProgressStat::TEntry CalculateConsumptionUpdate();
void ShiftConsumption();
void Submit();
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 277e77da71..c04d5e7573 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -273,6 +273,9 @@ private:
public:
void Finalize() {
+ YQL_ENSURE(!AlreadyReplied);
+ AlreadyReplied = true;
+
FillResponseStats(Ydb::StatusIds::SUCCESS);
LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize());
@@ -281,8 +284,6 @@ public:
ExecuterSpan.EndOk();
}
- LOG_D("Sending response to: " << Target);
- Send(Target, ResponseEv.release());
PassAway();
}
diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
index 6883fab44a..b0ac3f9dab 100644
--- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
@@ -1,7 +1,10 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+#include <ydb/core/kqp/common/events/events.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
+#include <ydb/core/kqp/executer_actor/kqp_executer.h>
#include <ydb/library/ydb_issue/proto/issue_id.pb.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/core/tablet/resource_broker.h>
#include <util/random/random.h>
@@ -832,6 +835,120 @@ Y_UNIT_TEST_SUITE(KqpLimits) {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::TIMEOUT);
}
+ /* Scenario:
+ - prepare and run query
+ - observe first EvState event from CA to Executer and replace it with EvAbortExecution
+ - count all EvState events from all CAs
+ - wait for final event EvTxResponse from Executer
+ - expect it to happen strictly after all EvState events
+ */
+ // Checks that the Executer sends TEvTxResponse only after the expected number of TEvState events
+ // has been observed, when the first TEvState is replaced by TEvAbortExecution.
+ Y_UNIT_TEST(WaitCAsStateOnAbort) {
+ TKikimrRunner kikimr(TKikimrSettings().SetUseRealThreads(false));
+ auto db = kikimr.RunCall([&] { return kikimr.GetTableClient(); } );
+ auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); } );
+
+ auto prepareResult = kikimr.RunCall([&] { return session.PrepareDataQuery(Q_(R"(
+ SELECT COUNT(*) FROM `/Root/TwoShard`;
+ )")).GetValueSync();
+ });
+ UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString());
+ auto dataQuery = prepareResult.GetQuery();
+
+ // Observer bookkeeping: whether the first TEvState was already replaced, how many TEvState events
+ // were seen, and the recipient of the first TEvState (taken to be the Executer).
+ bool firstEvState = false;
+ ui32 totalEvState = 0;
+ TActorId executerId;
+ ui32 actorCount = 3; // TODO: get number of actors properly.
+
+ auto& runtime = *kikimr.GetTestServer().GetRuntime();
+ runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == NYql::NDq::TEvDqCompute::TEvState::EventType) {
+ ++totalEvState;
+ if (!firstEvState) {
+ // Swap the very first TEvState for an abort built from the same Recipient and Sender.
+ executerId = ev->Recipient;
+ ev = new IEventHandle(ev->Recipient, ev->Sender,
+ new NKikimr::NKqp::TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssues()));
+ firstEvState = true;
+ }
+ } else if (ev->GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType && ev->Sender == executerId) {
+ // NOTE(review): actorCount*2 presumably means two TEvState events per CA - confirm.
+ UNIT_ASSERT_C(totalEvState == actorCount*2, "Executer sent response before waiting for CAs");
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ auto settings = TExecDataQuerySettings().OperationTimeout(TDuration::MilliSeconds(500));
+ kikimr.RunInThreadPool([&] { return dataQuery.Execute(TTxControl::BeginTx().CommitTx(), settings).GetValueSync(); });
+
+ // Dispatch until the Executer's TEvTxResponse is seen together with the full TEvState count.
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&](IEventHandle& ev) {
+ return ev.GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType
+ && ev.Sender == executerId && totalEvState == actorCount*2;
+ });
+
+ UNIT_ASSERT(runtime.DispatchEvents(opts));
+ }
+
+ /* Scenario:
+ - prepare and run query
+ - observe first EvState event from CA to Executer and replace it with EvAbortExecution
+ - count all EvState events from all CAs
+ - drop every EvState event after the first one, so the Executer never sees the CAs report back
+ - wait for final event EvTxResponse from Executer after timeout poison
+ - expect it to happen strictly after all EvState events
+ */
+ // Checks that, when TEvState events are dropped, the Executer sends TEvTxResponse only after its
+ // self-addressed TEvPoison has been observed.
+ Y_UNIT_TEST(WaitCAsTimeout) {
+ TKikimrRunner kikimr(TKikimrSettings().SetUseRealThreads(false));
+ auto db = kikimr.RunCall([&] { return kikimr.GetTableClient(); } );
+ auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); } );
+
+ auto prepareResult = kikimr.RunCall([&] { return session.PrepareDataQuery(Q_(R"(
+ SELECT COUNT(*) FROM `/Root/TwoShard`;
+ )")).GetValueSync();
+ });
+ UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString());
+ auto dataQuery = prepareResult.GetQuery();
+
+ // Observer bookkeeping: first TEvState replaced, self-poison seen, TEvState events counted
+ // (dropped ones included), and the recipient of the first TEvState (taken to be the Executer).
+ bool firstEvState = false;
+ bool timeoutPoison = false;
+ ui32 totalEvState = 0;
+ TActorId executerId;
+ ui32 actorCount = 3; // TODO: get number of actors properly.
+
+ auto& runtime = *kikimr.GetTestServer().GetRuntime();
+ runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == NYql::NDq::TEvDqCompute::TEvState::EventType) {
+ ++totalEvState;
+ if (!firstEvState) {
+ // Swap the very first TEvState for an abort built from the same Recipient and Sender.
+ executerId = ev->Recipient;
+ ev = new IEventHandle(ev->Recipient, ev->Sender,
+ new NKikimr::NKqp::TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssues()));
+ firstEvState = true;
+ } else {
+ // Every later TEvState is dropped.
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ } else if (ev->GetTypeRewrite() == TEvents::TEvPoison::EventType && totalEvState == actorCount*2 &&
+ ev->Sender == executerId && ev->Recipient == executerId)
+ {
+ // A poison from the Executer to itself, seen after all expected TEvState events were counted.
+ timeoutPoison = true;
+ } else if (ev->GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType && ev->Sender == executerId) {
+ UNIT_ASSERT_C(timeoutPoison, "Executer sent response before waiting for CAs");
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ auto settings = TExecDataQuerySettings().OperationTimeout(TDuration::MilliSeconds(500));
+ kikimr.RunInThreadPool([&] { return dataQuery.Execute(TTxControl::BeginTx().CommitTx(), settings).GetValueSync(); });
+
+ // Dispatch until the Executer's TEvTxResponse is seen after the self-poison and the full TEvState count.
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&](IEventHandle& ev) {
+ return ev.GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType
+ && ev.Sender == executerId && totalEvState == actorCount*2 && timeoutPoison;
+ });
+
+ UNIT_ASSERT(runtime.DispatchEvents(opts));
+ }
+
Y_UNIT_TEST(ReplySizeExceeded) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetTableClient();
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index be7db0bca5..666d33b343 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -554,7 +554,7 @@ protected:
}
}
- void ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues)
+ void ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool forceTerminate = false)
{
auto execEv = MakeHolder<TEvDqCompute::TEvState>();
auto& record = execEv->Record;
@@ -575,7 +575,7 @@ protected:
this->Send(ExecuterId, execEv.Release());
- if (Checkpoints && State == NDqProto::COMPUTE_STATE_FINISHED) {
+ if (!forceTerminate && Checkpoints && State == NDqProto::COMPUTE_STATE_FINISHED) {
// checkpointed CAs must not self-destroy
return;
}
@@ -1032,10 +1032,6 @@ protected:
auto tag = (EEvWakeupTag) ev->Get()->Tag;
switch (tag) {
case EEvWakeupTag::TimeoutTag: {
- auto abortEv = MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::TIMEOUT, TStringBuilder()
- << "Timeout event from compute actor " << this->SelfId()
- << ", TxId: " << TxId << ", task: " << Task.GetId());
-
if (ComputeActorSpan) {
ComputeActorSpan.EndError(
TStringBuilder()
@@ -1044,10 +1040,8 @@ protected:
);
}
- this->Send(ExecuterId, abortEv.Release());
-
- TerminateSources("timeout exceeded", false);
- Terminate(false, "timeout exceeded");
+ State = NDqProto::COMPUTE_STATE_FAILURE;
+ ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::TIMEOUT, {TIssue("timeout exceeded")}, true);
break;
}
case EEvWakeupTag::PeriodicStatsTag: {
@@ -1071,8 +1065,9 @@ protected:
switch (lostEventType) {
case TEvDqCompute::TEvState::EventType: {
CA_LOG_E("Handle undelivered TEvState event, abort execution");
- this->TerminateSources("executer lost", false);
- Terminate(false, "executer lost");
+
+ TerminateSources("executer lost", false);
+ Terminate(false, "executer lost"); // Executer lost - no need to report state
break;
}
default: {
@@ -1118,14 +1113,17 @@ protected:
InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, *ev->Get()->GetIssues().begin());
return;
}
+
TIssues issues = ev->Get()->GetIssues();
CA_LOG_E("Handle abort execution event from: " << ev->Sender
<< ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(ev->Get()->Record.GetStatusCode())
<< ", reason: " << issues.ToOneLineString());
- bool success = ev->Get()->Record.GetStatusCode() == NYql::NDqProto::StatusIds::SUCCESS;
-
- this->TerminateSources(issues, success);
+ if (ev->Get()->Record.GetStatusCode() == NYql::NDqProto::StatusIds::SUCCESS) {
+ State = NDqProto::COMPUTE_STATE_FINISHED;
+ } else {
+ State = NDqProto::COMPUTE_STATE_FAILURE;
+ }
if (ev->Sender != ExecuterId) {
if (ComputeActorSpan) {
@@ -1135,7 +1133,7 @@ protected:
NActors::TActivationContext::Send(ev->Forward(ExecuterId));
}
- Terminate(success, issues);
+ ReportStateAndMaybeDie(ev->Get()->Record.GetStatusCode(), issues, true);
}
void HandleExecuteBase(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {