diff options
author | Ivan <5627721+abyss7@users.noreply.github.com> | 2024-08-23 13:15:46 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-23 13:15:46 +0300 |
commit | fe50fe3015efab1c711b830b2027fbc0018729ab (patch) | |
tree | d282254120a9fa4961b13cd75f2c573b7c81db77 | |
parent | 1869eedd3e49984e0b363ceda467bc1fc7b16214 (diff) | |
download | ydb-fe50fe3015efab1c711b830b2027fbc0018729ab.tar.gz |
Wait for all CAs inside Executer before shutdown (#7829)
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 78 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 57 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_limits_ut.cpp | 117 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 30 |
8 files changed, 257 insertions, 48 deletions
diff --git a/.gitignore b/.gitignore index ce49d73b45..87b7b54d29 100644 --- a/.gitignore +++ b/.gitignore @@ -26,6 +26,9 @@ __pycache__/ *.pb.h *.pb.cc +# Other generated +*.fbs.h + # MacOS specific .DS_Store diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index fd16b7b441..1d18d034ac 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -205,6 +205,8 @@ public: } void Finalize() { + YQL_ENSURE(!AlreadyReplied); + if (LocksBroken) { TString message = "Transaction locks invalidated."; @@ -278,8 +280,7 @@ public: ExecuterSpan.EndOk(); Request.Transactions.crop(0); - LOG_D("Sending response to: " << Target << ", results: " << ResponseEv->ResultsSize()); - Send(Target, ResponseEv.release()); + AlreadyReplied = true; PassAway(); } @@ -319,6 +320,8 @@ private: return "WaitSnapshotState"; } else if (func == &TThis::WaitResolveState) { return "WaitResolveState"; + } else if (func == &TThis::WaitShutdownState) { + return "WaitShutdownState"; } else { return TBase::CurrentStateFuncName(); } @@ -2595,6 +2598,22 @@ private: } } + void Shutdown() override { + if (Planner) { + if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { + LOG_I("Shutdown immediately - nothing to wait"); + PassAway(); + } else { + this->Become(&TThis::WaitShutdownState); + LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and " + << Planner->GetPendingComputeActors().size() << " compute actors"); + TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison)); + } + } else { + PassAway(); + } + } + void PassAway() override { auto totalTime = TInstant::Now() - StartTime; Counters->Counters->DataTxTotalTimeHistogram->Collect(totalTime.MilliSeconds()); @@ -2612,6 +2631,61 @@ private: TBase::PassAway(); } + STATEFN(WaitShutdownState) { + switch(ev->GetTypeRewrite()) { + hFunc(TEvDqCompute::TEvState, HandleShutdown); + hFunc(TEvInterconnect::TEvNodeDisconnected, HandleShutdown); + hFunc(TEvents::TEvPoison, HandleShutdown); + default: + LOG_E("Unexpected event: " << ev->GetTypeName()); // ignore all other events + } + } + + void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) { + if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) { + YQL_ENSURE(Planner); + + TActorId actor = ev->Sender; + ui64 taskId = ev->Get()->Record.GetTaskId(); + + Planner->CompletedCA(taskId, actor); + + if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { + PassAway(); + } + } + } + + void HandleShutdown(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { + const auto nodeId = ev->Get()->NodeId; + LOG_N("Node has disconnected while shutdown: " << nodeId); + + YQL_ENSURE(Planner); + + for (const auto& task : TasksGraph.GetTasks()) { + if (task.Meta.NodeId == nodeId && !task.Meta.Completed) { + if (task.ComputeActorId) { + Planner->CompletedCA(task.Id, task.ComputeActorId); + } else { + Planner->TaskNotStarted(task.Id); + } + } + } + + if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { + PassAway(); + } + } + + void HandleShutdown(TEvents::TEvPoison::TPtr& ev) { + // Self-poison means timeout - don't wait anymore. + LOG_I("Timed out on waiting for Compute Actors to finish - forcing shutdown"); + + if (ev->Sender == SelfId()) { + PassAway(); + } + } + private: void ReplyTxStateUnknown(ui64 shardId) { auto message = TStringBuilder() << "Tx state unknown for shard " << shardId << ", txid " << TxId; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 37f8bec828..2915775ea3 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -667,7 +667,7 @@ protected: if (statusCode == Ydb::StatusIds::INTERNAL_ERROR) { InternalError(issues); } else if (statusCode == Ydb::StatusIds::TIMEOUT) { - AbortExecutionAndDie(ev->Sender, NYql::NDqProto::StatusIds::TIMEOUT, "Request timeout exceeded"); + TimeoutError(ev->Sender); } else { RuntimeError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), issues); } @@ -1624,14 +1624,14 @@ protected: protected: void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) { for (const auto& task : this->TasksGraph.GetTasks()) { - if (task.ComputeActorId) { + if (task.ComputeActorId && !task.Meta.Completed) { LOG_I("aborting compute actor execution, message: " << issues.ToOneLineString() << ", compute actor: " << task.ComputeActorId << ", task: " << task.Id); auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(code), issues); this->Send(task.ComputeActorId, ev.Release()); } else { - LOG_I("task: " << task.Id << ", does not have Compute ActorId yet"); + LOG_I("task: " << task.Id << ", does not have the CA id yet or is already complete"); } } } @@ -1649,7 +1649,6 @@ protected: void InternalError(const NYql::TIssues& issues) { LOG_E(issues.ToOneLineString()); - TerminateComputeActors(Ydb::StatusIds::INTERNAL_ERROR, issues); auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::UNEXPECTED, "Internal error while executing transaction."); for (const NYql::TIssue& i : issues) { issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(i)); @@ -1663,7 +1662,6 @@ protected: void ReplyUnavailable(const TString& message) { LOG_E("UNAVAILABLE: " << message); - TerminateComputeActors(Ydb::StatusIds::UNAVAILABLE, message); auto issue = NYql::YqlIssue({}, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE); issue.AddSubIssue(new NYql::TIssue(message)); ReplyErrorAndDie(Ydb::StatusIds::UNAVAILABLE, issue); @@ -1671,7 +1669,6 @@ protected: void RuntimeError(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) { LOG_E(Ydb::StatusIds_StatusCode_Name(code) << ": " << issues.ToOneLineString()); - TerminateComputeActors(code, issues); ReplyErrorAndDie(code, issues); } @@ -1687,11 +1684,19 @@ protected: ReplyErrorAndDie(status, &issues); } - void AbortExecutionAndDie(TActorId abortSender, NYql::NDqProto::StatusIds::StatusCode status, const TString& message) { + void TimeoutError(TActorId abortSender) { if (AlreadyReplied) { + LOG_E("Timeout when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl); return; } + const auto status = NYql::NDqProto::StatusIds::TIMEOUT; + const TString message = "Request timeout exceeded"; + + TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message); + + AlreadyReplied = true; + LOG_E("Abort execution: " << NYql::NDqProto::StatusIds_StatusCode_Name(status) << "," << message); if (ExecuterSpan) { ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status)); @@ -1701,17 +1706,14 @@ protected: // TEvAbortExecution can come from either ComputeActor or SessionActor (== Target). if (abortSender != Target) { - auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, "Request timeout exceeded"); + auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, message); this->Send(Target, abortEv.Release()); } - AlreadyReplied = true; LOG_E("Sending timeout response to: " << Target); - this->Send(Target, ResponseEv.release()); Request.Transactions.crop(0); - TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message); - this->PassAway(); + this->Shutdown(); } void FillResponseStats(Ydb::StatusIds::StatusCode status) { @@ -1746,17 +1748,11 @@ protected: google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>* issues) { if (AlreadyReplied) { + LOG_E("Error when we already replied - not good" << Endl << TBackTrace().PrintToString() << Endl); return; } - if (Planner) { - for (auto computeActor : Planner->GetPendingComputeActors()) { - LOG_D("terminate compute actor " << computeActor.first); - - auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDq::YdbStatusToDqStatus(status), "Terminate execution"); - this->Send(computeActor.first, ev.Release()); - } - } + TerminateComputeActors(status, "Terminate execution"); AlreadyReplied = true; auto& response = *ResponseEv->Record.MutableResponse(); @@ -1782,8 +1778,7 @@ protected: ExecuterStateSpan.EndError(response.DebugString()); Request.Transactions.crop(0); - this->Send(Target, ResponseEv.release()); - this->PassAway(); + this->Shutdown(); } protected: @@ -1851,7 +1846,16 @@ protected: } protected: + // Introduced separate method from `PassAway()` - to not get confused with expectations from other actors, + // that `PassAway()` should kill actor immediately. + virtual void Shutdown() { + PassAway(); + } + void PassAway() override { + YQL_ENSURE(AlreadyReplied && ResponseEv); + this->Send(Target, ResponseEv.release()); + for (auto channelPair: ResultChannelProxies) { LOG_D("terminate result channel " << channelPair.first << " proxy at " << channelPair.second->SelfId()); @@ -1872,12 +1876,11 @@ protected: if (KqpTableResolverId) { this->Send(KqpTableResolverId, new TEvents::TEvPoison); - this->Send(this->SelfId(), new TEvents::TEvPoison); - LOG_T("Terminate, become ZombieState"); - this->Become(&TKqpExecuterBase::ZombieState); - } else { - IActor::PassAway(); } + + this->Send(this->SelfId(), new TEvents::TEvPoison); + LOG_T("Terminate, become ZombieState"); + this->Become(&TKqpExecuterBase::ZombieState); } STATEFN(ZombieState) { diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 3a13c9fb7f..926dda2700 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -554,7 +554,19 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) { YQL_ENSURE(it != PendingComputeActors.end()); LastStats.emplace_back(std::move(it->second)); PendingComputeActors.erase(it); - return; + + LOG_I("Compute actor has finished execution: " << computeActor.ToString()); +} + +void TKqpPlanner::TaskNotStarted(ui64 taskId) { + // NOTE: should be invoked only while shutting down - when node is disconnected. + + auto& task = TasksGraph.GetTask(taskId); + + YQL_ENSURE(!task.ComputeActorId); + YQL_ENSURE(!task.Meta.Completed); + + PendingComputeTasks.erase(taskId); } TProgressStat::TEntry TKqpPlanner::CalculateConsumptionUpdate() { diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index 639c537370..eed887d6e9 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -74,6 +74,7 @@ public: std::unique_ptr<IEventHandle> AssignTasksToNodes(); bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state); void CompletedCA(ui64 taskId, TActorId computeActor); + void TaskNotStarted(ui64 taskId); TProgressStat::TEntry CalculateConsumptionUpdate(); void ShiftConsumption(); void Submit(); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 277e77da71..c04d5e7573 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -273,6 +273,9 @@ private: public: void Finalize() { + YQL_ENSURE(!AlreadyReplied); + AlreadyReplied = true; + FillResponseStats(Ydb::StatusIds::SUCCESS); LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize()); @@ -281,8 +284,6 @@ public: ExecuterSpan.EndOk(); } - LOG_D("Sending response to: " << Target); - Send(Target, ResponseEv.release()); PassAway(); } diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp index 6883fab44a..b0ac3f9dab 100644 --- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp @@ -1,7 +1,10 @@ #include <ydb/core/kqp/ut/common/kqp_ut_common.h> +#include <ydb/core/kqp/common/events/events.h> #include <ydb/core/kqp/counters/kqp_counters.h> +#include <ydb/core/kqp/executer_actor/kqp_executer.h> #include <ydb/library/ydb_issue/proto/issue_id.pb.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> #include <ydb/core/tablet/resource_broker.h> #include <util/random/random.h> @@ -832,6 +835,120 @@ Y_UNIT_TEST_SUITE(KqpLimits) { UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::TIMEOUT); } + /* Scenario: + - prepare and run query + - observe first EvState event from CA to Executer and replace it with EvAbortExecution + - count all EvState events from all CAs + - wait for final event EvTxResponse from Executer + - expect it to happen strictly after all EvState events + */ + Y_UNIT_TEST(WaitCAsStateOnAbort) { + TKikimrRunner kikimr(TKikimrSettings().SetUseRealThreads(false)); + auto db = kikimr.RunCall([&] { return kikimr.GetTableClient(); } ); + auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); } ); + + auto prepareResult = kikimr.RunCall([&] { return session.PrepareDataQuery(Q_(R"( + SELECT COUNT(*) FROM `/Root/TwoShard`; + )")).GetValueSync(); + }); + UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString()); + auto dataQuery = prepareResult.GetQuery(); + + bool firstEvState = false; + ui32 totalEvState = 0; + TActorId executerId; + ui32 actorCount = 3; // TODO: get number of actors properly. + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == NYql::NDq::TEvDqCompute::TEvState::EventType) { + ++totalEvState; + if (!firstEvState) { + executerId = ev->Recipient; + ev = new IEventHandle(ev->Recipient, ev->Sender, + new NKikimr::NKqp::TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssues())); + firstEvState = true; + } + } else if (ev->GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType && ev->Sender == executerId) { + UNIT_ASSERT_C(totalEvState == actorCount*2, "Executer sent response before waiting for CAs"); + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + auto settings = TExecDataQuerySettings().OperationTimeout(TDuration::MilliSeconds(500)); + kikimr.RunInThreadPool([&] { return dataQuery.Execute(TTxControl::BeginTx().CommitTx(), settings).GetValueSync(); }); + + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&](IEventHandle& ev) { + return ev.GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType + && ev.Sender == executerId && totalEvState == actorCount*2; + }); + + UNIT_ASSERT(runtime.DispatchEvents(opts)); + } + + /* Scenario: + - prepare and run query + - observe first EvState event from CA to Executer and replace it with EvAbortExecution + - count all EvState events from all CAs + - drop final EvState event from last CA + - wait for final event EvTxResponse from Executer after timeout poison + - expect it to happen strictly after all EvState events + */ + Y_UNIT_TEST(WaitCAsTimeout) { + TKikimrRunner kikimr(TKikimrSettings().SetUseRealThreads(false)); + auto db = kikimr.RunCall([&] { return kikimr.GetTableClient(); } ); + auto session = kikimr.RunCall([&] { return db.CreateSession().GetValueSync().GetSession(); } ); + + auto prepareResult = kikimr.RunCall([&] { return session.PrepareDataQuery(Q_(R"( + SELECT COUNT(*) FROM `/Root/TwoShard`; + )")).GetValueSync(); + }); + UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), EStatus::SUCCESS, prepareResult.GetIssues().ToString()); + auto dataQuery = prepareResult.GetQuery(); + + bool firstEvState = false; + bool timeoutPoison = false; + ui32 totalEvState = 0; + TActorId executerId; + ui32 actorCount = 3; // TODO: get number of actors properly. + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == NYql::NDq::TEvDqCompute::TEvState::EventType) { + ++totalEvState; + if (!firstEvState) { + executerId = ev->Recipient; + ev = new IEventHandle(ev->Recipient, ev->Sender, + new NKikimr::NKqp::TEvKqp::TEvAbortExecution(NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssues())); + firstEvState = true; + } else { + return TTestActorRuntime::EEventAction::DROP; + } + } else if (ev->GetTypeRewrite() == TEvents::TEvPoison::EventType && totalEvState == actorCount*2 && + ev->Sender == executerId && ev->Recipient == executerId) + { + timeoutPoison = true; + } else if (ev->GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType && ev->Sender == executerId) { + UNIT_ASSERT_C(timeoutPoison, "Executer sent response before waiting for CAs"); + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + auto settings = TExecDataQuerySettings().OperationTimeout(TDuration::MilliSeconds(500)); + kikimr.RunInThreadPool([&] { return dataQuery.Execute(TTxControl::BeginTx().CommitTx(), settings).GetValueSync(); }); + + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&](IEventHandle& ev) { + return ev.GetTypeRewrite() == NKikimr::NKqp::TEvKqpExecuter::TEvTxResponse::EventType + && ev.Sender == executerId && totalEvState == actorCount*2 && timeoutPoison; + }); + + UNIT_ASSERT(runtime.DispatchEvents(opts)); + } + Y_UNIT_TEST(ReplySizeExceeded) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetTableClient(); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index be7db0bca5..666d33b343 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -554,7 +554,7 @@ protected: } } - void ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues) + void ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::StatusCode statusCode, const TIssues& issues, bool forceTerminate = false) { auto execEv = MakeHolder<TEvDqCompute::TEvState>(); auto& record = execEv->Record; @@ -575,7 +575,7 @@ protected: this->Send(ExecuterId, execEv.Release()); - if (Checkpoints && State == NDqProto::COMPUTE_STATE_FINISHED) { + if (!forceTerminate && Checkpoints && State == NDqProto::COMPUTE_STATE_FINISHED) { // checkpointed CAs must not self-destroy return; } @@ -1032,10 +1032,6 @@ protected: auto tag = (EEvWakeupTag) ev->Get()->Tag; switch (tag) { case EEvWakeupTag::TimeoutTag: { - auto abortEv = MakeHolder<TEvDq::TEvAbortExecution>(NYql::NDqProto::StatusIds::TIMEOUT, TStringBuilder() - << "Timeout event from compute actor " << this->SelfId() - << ", TxId: " << TxId << ", task: " << Task.GetId()); - if (ComputeActorSpan) { ComputeActorSpan.EndError( TStringBuilder() @@ -1044,10 +1040,8 @@ protected: ); } - this->Send(ExecuterId, abortEv.Release()); - - TerminateSources("timeout exceeded", false); - Terminate(false, "timeout exceeded"); + State = NDqProto::COMPUTE_STATE_FAILURE; + ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::TIMEOUT, {TIssue("timeout exceeded")}, true); break; } case EEvWakeupTag::PeriodicStatsTag: { @@ -1071,8 +1065,9 @@ protected: switch (lostEventType) { case TEvDqCompute::TEvState::EventType: { CA_LOG_E("Handle undelivered TEvState event, abort execution"); - this->TerminateSources("executer lost", false); - Terminate(false, "executer lost"); + + TerminateSources("executer lost", false); + Terminate(false, "executer lost"); // Executer lost - no need to report state break; } default: { @@ -1118,14 +1113,17 @@ protected: InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, *ev->Get()->GetIssues().begin()); return; } + TIssues issues = ev->Get()->GetIssues(); CA_LOG_E("Handle abort execution event from: " << ev->Sender << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(ev->Get()->Record.GetStatusCode()) << ", reason: " << issues.ToOneLineString()); - bool success = ev->Get()->Record.GetStatusCode() == NYql::NDqProto::StatusIds::SUCCESS; - - this->TerminateSources(issues, success); + if (ev->Get()->Record.GetStatusCode() == NYql::NDqProto::StatusIds::SUCCESS) { + State = NDqProto::COMPUTE_STATE_FINISHED; + } else { + State = NDqProto::COMPUTE_STATE_FAILURE; + } if (ev->Sender != ExecuterId) { if (ComputeActorSpan) { @@ -1135,7 +1133,7 @@ protected: NActors::TActivationContext::Send(ev->Forward(ExecuterId)); } - Terminate(success, issues); + ReportStateAndMaybeDie(ev->Get()->Record.GetStatusCode(), issues, true); } void HandleExecuteBase(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { |