diff options
author | Ivan <5627721+abyss7@users.noreply.github.com> | 2024-08-29 18:09:32 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-29 18:09:32 +0300 |
commit | 93089eecc92947f748b20c17c2fa0b3fa7d2b18a (patch) | |
tree | 70cef038a518826bace90f23f2d02095f4c4b7fa | |
parent | 0e39c53a25f06037e9c94b5ee5cbfa00d65b5ee6 (diff) | |
download | ydb-93089eecc92947f748b20c17c2fa0b3fa7d2b18a.tar.gz |
Apply the last stats received from terminated CAs (#8356)
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 18 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 57 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 2 |
5 files changed, 51 insertions, 34 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 6f365c44e2..32353e34cd 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -533,7 +533,7 @@ private: if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) { CancelProposal(0); } - HandleComputeStats(ev); + HandleComputeState(ev); } void HandlePrepare(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { @@ -1015,7 +1015,7 @@ private: hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected); hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse); hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute); - hFunc(TEvDqCompute::TEvState, HandleComputeStats); + hFunc(TEvDqCompute::TEvState, HandleComputeState); hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData); hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck); hFunc(TEvKqp::TEvAbortExecution, HandleExecute); @@ -2646,6 +2646,7 @@ private: this->Become(&TThis::WaitShutdownState); LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and " << Planner->GetPendingComputeActors().size() << " compute actors"); + // TODO(ilezhankin): the CA awaiting timeout should be configurable. TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison)); } } else { @@ -2681,17 +2682,10 @@ private: } void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) { - if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) { - YQL_ENSURE(Planner); - - TActorId actor = ev->Sender; - ui64 taskId = ev->Get()->Record.GetTaskId(); - - Planner->CompletedCA(taskId, actor); + HandleComputeStats(ev); - if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { - PassAway(); - } + if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) { + PassAway(); } } diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 2915775ea3..297e247e14 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -380,7 +380,7 @@ protected: this->Send(channelComputeActorId, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId); } - void HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) { + bool HandleComputeStats(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) { TActorId computeActor = ev->Sender; auto& state = ev->Get()->Record; ui64 taskId = state.GetTaskId(); @@ -409,7 +409,39 @@ protected: } YQL_ENSURE(Planner); - bool populateChannels = Planner->AcknowledgeCA(taskId, computeActor, &state); + bool ack = Planner->AcknowledgeCA(taskId, computeActor, &state); + + switch (state.GetState()) { + case NYql::NDqProto::COMPUTE_STATE_FAILURE: + case NYql::NDqProto::COMPUTE_STATE_FINISHED: + // Don't finalize stats twice. + if (Planner->CompletedCA(taskId, computeActor)) { + ExtraData[computeActor].Swap(state.MutableExtraData()); + + if (Stats) { + Stats->AddComputeActorStats( + computeActor.NodeId(), + std::move(*state.MutableStats()), + TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs()) + ); + } + + LastTaskId = taskId; + LastComputeActorId = computeActor.ToString(); + } + default: + ; // ignore all other states. + } + + return ack; + } + + void HandleComputeState(NYql::NDq::TEvDqCompute::TEvState::TPtr& ev) { + TActorId computeActor = ev->Sender; + auto& state = ev->Get()->Record; + ui64 taskId = state.GetTaskId(); + + bool populateChannels = HandleComputeStats(ev); switch (state.GetState()) { case NYql::NDqProto::COMPUTE_STATE_UNKNOWN: { @@ -427,22 +459,8 @@ protected: break; } - case NYql::NDqProto::COMPUTE_STATE_FAILURE: - case NYql::NDqProto::COMPUTE_STATE_FINISHED: { - ExtraData[computeActor].Swap(state.MutableExtraData()); - if (Stats) { - Stats->AddComputeActorStats( - computeActor.NodeId(), - std::move(*state.MutableStats()), - TDuration::MilliSeconds(AggregationSettings.GetCollectLongTasksStatsTimeoutMs()) - ); - } - - LastTaskId = taskId; - LastComputeActorId = computeActor.ToString(); - YQL_ENSURE(Planner); - Planner->CompletedCA(taskId, computeActor); - } + default: + ; // ignore all other states. } if (state.GetState() == NYql::NDqProto::COMPUTE_STATE_FAILURE) { @@ -1854,6 +1872,9 @@ protected: void PassAway() override { YQL_ENSURE(AlreadyReplied && ResponseEv); + + // Actualize stats with the last stats from terminated CAs, but keep the status. + FillResponseStats(ResponseEv->Record.GetResponse().GetStatus()); this->Send(Target, ResponseEv.release()); for (auto channelPair: ResultChannelProxies) { diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 44243f0882..997964a60f 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -592,11 +592,11 @@ bool TKqpPlanner::AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql:: return false; } -void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) { +bool TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) { auto& task = TasksGraph.GetTask(taskId); if (task.Meta.Completed) { YQL_ENSURE(!PendingComputeActors.contains(computeActor)); - return; + return false; } task.Meta.Completed = true; @@ -606,6 +606,8 @@ void TKqpPlanner::CompletedCA(ui64 taskId, TActorId computeActor) { PendingComputeActors.erase(it); LOG_I("Compute actor has finished execution: " << computeActor.ToString()); + + return true; } void TKqpPlanner::TaskNotStarted(ui64 taskId) { diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index df3c3fdd39..29facb855f 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -75,7 +75,7 @@ public: std::unique_ptr<IEventHandle> PlanExecution(); std::unique_ptr<IEventHandle> AssignTasksToNodes(); bool AcknowledgeCA(ui64 taskId, TActorId computeActor, const NYql::NDqProto::TEvComputeActorState* state); - void CompletedCA(ui64 taskId, TActorId computeActor); + bool CompletedCA(ui64 taskId, TActorId computeActor); void TaskNotStarted(ui64 taskId); TProgressStat::TEntry CalculateConsumptionUpdate(); void ShiftConsumption(); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index c04d5e7573..f7357744e1 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -110,7 +110,7 @@ private: STATEFN(ExecuteState) { try { switch (ev->GetTypeRewrite()) { - hFunc(TEvDqCompute::TEvState, HandleComputeStats); + hFunc(TEvDqCompute::TEvState, HandleComputeState); hFunc(TEvDqCompute::TEvChannelData, HandleChannelData); // from CA hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck); hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); |